Skip to content

Commit

Permalink
passes on standalone
Browse files Browse the repository at this point in the history
general clean up, preallocate smartly, always protect sexps directly or
indirectly, write more headers etc
  • Loading branch information
piccolbo committed Jan 30, 2014
1 parent 69b1847 commit f8d4b07
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 39 deletions.
32 changes: 25 additions & 7 deletions pkg/R/IO.R
Expand Up @@ -179,10 +179,26 @@ to.list =
if (is.matrix(x)) x = as.data.frame(x)
if (is.data.frame(x))
unname(
t.list(lapply(x, as.list)))
t.list(x))
else
as.list(x)}}

intersperse =
function(a.list, another.list, every.so.many)
splat(c)(
mapply(
lapply(another.list, list),
split(a.list, ceiling(seq_along(a.list)/every.so.many), drop = TRUE),
FUN = c,
SIMPLIFY = FALSE))

intersperse.one =
function(a.list, an.element, every.so.many)
splat(c)(
lapply(
split(a.list, ceiling(seq_along(a.list)/every.so.many)),
function(y) c(list(an.element),y)))

make.native.or.typedbytes.output.format =
function(native, write.size = 10^6) {
template = NULL
Expand All @@ -205,12 +221,14 @@ make.native.or.typedbytes.output.format =
if(is.null(template)) {
template <<-
list(key = rmr.slice(k, 0), val = rmr.slice(v, 0))}
typedbytes.writer(
list(
sample(ks, 1)[[1]],
structure(template, rmr.template = TRUE)),
con,
native)}
N = {
if(length(vs) < 100) 1
else {
r = ceiling((object.size(ks) + object.size(vs))/10^6)
if (r < 100) length(vs) / 100
else r}}
ks = intersperse(ks, sample(ks, ceiling(length(ks)/N)), N)
vs = intersperse.one(vs, structure(template, rmr.template = TRUE), N)}
typedbytes.writer(
interleave(ks, vs),
con,
Expand Down
2 changes: 1 addition & 1 deletion pkg/R/basic.R
Expand Up @@ -75,7 +75,7 @@ t.list =
function(l) {
if(length(l) == 0) l
else
.Call("t_list", l, PACKAGE = "rmr2")}
.Call("t_list", lapply(l, as.list), PACKAGE = "rmr2")}

#data frame manip

Expand Down
2 changes: 1 addition & 1 deletion pkg/R/streaming.R
Expand Up @@ -194,7 +194,7 @@ rmr.stream =
preamble = paste(sep = "", '
sink(file = stderr())
options(warn = 1)
options(error = quote({sink(stderr()); traceback(); q()}))
options(error = quote({sink(stderr()); traceback(); stop()}))
library(functional)
invisible(
if(is.null(formals(load)$verbose)) #recent R change
Expand Down
10 changes: 6 additions & 4 deletions pkg/src/t-list.cpp
Expand Up @@ -22,10 +22,12 @@ using std::endl;

SEXP t_list(SEXP _ll) {
List ll(_ll);
List l_1(as<List>(ll[1]));
vector<vector<RObject> > tll(l_1.size());
List l_0(as<List>(ll[0]));
List tll(l_0.size());
for(int j = 0; j < tll.size(); j++)
tll[j] = List(ll.size());
for(int i = 0; i < ll.size(); i++) {
List l_i(as<List>(ll[i]));
for(int j = 0; j < l_i.size(); j++) {
tll[j].push_back(l_i[j]);};}
for(int j = 0; j < tll.size(); j++) {
as<List>(tll[j])[i] = l_i[j];};}
return wrap(tll);}
56 changes: 31 additions & 25 deletions pkg/src/typed-bytes.cpp
Expand Up @@ -72,12 +72,6 @@ class ReadPastEnd {
type_code = _type_code;
start = _start;}};

class RmrTemplate {
public:
RObject rmr_template;
RmrTemplate(RObject _template) {
rmr_template = _template;}};

class UnsupportedType{
public:
unsigned char type_code;
Expand Down Expand Up @@ -291,7 +285,7 @@ RObject unserialize(const raw & data, unsigned int & start, int type_code){
for(int i = 0; i < names.size(); i++) {
char * c = names[i]; //workaround Rcpp bug now fixed, remove if assuming 0.10.2 and higher
string s(c);
new_object.attr(s)= attributes[i];}}
new_object.attr(s) = attributes[i];}}
break;
case R_VECTOR: {
int raw_length = get_length(data, start);
Expand Down Expand Up @@ -324,38 +318,50 @@ RObject unserialize(const raw & data, unsigned int & start, int type_code){
new_object = wrap(unserialize_vector<string>(data, start, raw_length));}
break;
case RMR_TEMPLATE: {
throw RmrTemplate(unserialize_native(data, start));}
new_object = unserialize_native(data, start);}
break;
default: {
throw UnsupportedType(type_code);}}
return new_object;}

List supersize(const List& x) {
unsigned int oldsize = x.size() ;
List y(2*oldsize) ;
for(unsigned int i = 0; i < oldsize; i++)
y[i] = x[i] ;
return y ;}

SEXP typedbytes_reader(SEXP data){
std::vector<RObject> objs;
List objs(1);
unsigned int objs_end = 0;
RawVector tmp(data);
raw rd(tmp.begin(), tmp.end());
unsigned int start = 0;
unsigned int parsed_start = 0;
unsigned int parsed_start = 0;
RObject rmr_template = R_NilValue;
while(rd.size() > start) {
try{
objs.push_back(unserialize(rd, start));
parsed_start = start;}
RObject new_object = unserialize(rd, start);
if(new_object.hasAttribute("rmr.template")) {
if(objs_end == 0)
safe_stop("Found template at beginning of buffer. Tough luck");
objs_end--; // discard the key for the template
rmr_template = new_object;}
else {
if(objs_end >= (unsigned int)objs.size())
objs = supersize(objs);
objs[objs_end] = new_object;
objs_end++;}
parsed_start = start;} //if rpe exception occurs parsed start won't move, unlike start
catch (ReadPastEnd rpe){
break;}
catch (UnsupportedType ue) {
safe_stop("Unsupported type: " + to_string((int)ue.type_code));}
catch (NegativeLength nl) {
safe_stop("Negative length exception");}
catch (RmrTemplate rt) {
objs.pop_back();
parsed_start = start;
rmr_template = rt.rmr_template;
}}
List list_tmp(objs.begin(), objs.end());
return wrap(
safe_stop("Negative length exception");}}
return wrap(
List::create(
Named("objects") = list_tmp,
Named("objects") = List(objs.begin(), objs.begin() + objs_end),
Named("length") = parsed_start,
Named("template") = rmr_template));}

Expand Down Expand Up @@ -415,7 +421,7 @@ void serialize_vector(T & data, unsigned char type_code, raw & serialized, bool
void serialize_list(List & data, raw & serialized, bool native){
serialized.push_back(TB_VECTOR);
length_header(data.size(), serialized);
for(unsigned int i = 0; i < data.size(); i++) {
for(unsigned int i = 0; i < (unsigned int)data.size(); i++) {
serialize(as<RObject>(data[i]), serialized, native);}}

void serialize_native(const RObject & object, raw & serialized, type_code tc = R_NATIVE) {
Expand Down Expand Up @@ -508,11 +514,11 @@ void serialize_noattr(const RObject & object, raw & serialized, bool native) {

void serialize_attributes(const RObject & object, raw & serialized) {
vector<string> names = object.attributeNames();
serialize(RObject(wrap(names)), serialized, TRUE);
serialize(wrap(names), serialized, TRUE);
vector<RObject> attributes;
for(unsigned int i = 0; i < names.size(); i++) {
attributes.push_back(object.attr(names[i]));}
serialize(RObject(wrap(attributes)), serialized, TRUE);}
serialize(wrap(attributes), serialized, TRUE);}

void serialize(const RObject & object, raw & serialized, bool native) {
bool has_attr = object.attributeNames().size() > 0;
Expand All @@ -535,7 +541,7 @@ SEXP typedbytes_writer(SEXP objs, SEXP native){
raw serialized(0);
List objects(objs);
LogicalVector is_native(native);
for(unsigned int i = 0; i < objects.size(); i++) {
for(unsigned int i = 0; i < (unsigned int)objects.size(); i++) {
try{
serialize(as<RObject>(objects[i]), serialized, is_native[0]);}
catch(UnsupportedType ut){
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/benchmarks.R
Expand Up @@ -20,7 +20,7 @@ for (be in c("local", "hadoop")) {
## @knitr input
input.size = {
if(rmr.options('backend') == "local")
10^5
10^4
else
10^6}
## @knitr end
Expand Down

0 comments on commit f8d4b07

Please sign in to comment.