
restoring 1.2

1 parent 298d97a commit c79d9534dc36d98a6be2e26cbdd6cd8f64caf6f5 piccolbo committed Feb 24, 2012
1 README.md
@@ -0,0 +1 @@
+Please see the [wiki](https://github.com/RevolutionAnalytics/RHadoop/wiki) for information,
6 rmr/pkg/DESCRIPTION
@@ -1,10 +1,10 @@
Package: rmr
Type: Package
Title: R and Hadoop Streaming Connector
-Version: 1.1
-Date: 2011-08-16
+Version: 1.2
+Date: 2012-02-20
Author: Revolution Analytics
-Depends: R (>= 2.6.0), methods, RJSONIO (>= 0.8-2), itertools, digest
+Depends: R (>= 2.6.0), methods, RJSONIO (>= 0.8-2), itertools, digest
Maintainer: Revolution Analytics <rhadoop@revolutionanalytics.com>
Description: Supports the map reduce programming model on top of hadoop streaming
License: Apache License (== 2.0)
9 rmr/pkg/NAMESPACE
@@ -1,11 +1,8 @@
export(mapreduce)
export(from.dfs, to.dfs)
-export(equijoin)
+export(equijoin, scatter)
export(dfs.empty)
-export(rmr.backend, rmr.profilenodes)
+export(rmr.options.set, rmr.options.get)
export(keyval, keys, values)
-export(rawtextinputformat, rawtextoutputformat,
- csvtextinputformat, csvtextoutputformat,
- jsontextinputformat, jsontextoutputformat,
- nativetextinputformat, nativetextoutputformat)
+export(make.input.format, make.output.format)
export(to.map, to.reduce, to.reduce.all)
1,126 rmr/pkg/R/mapreduce.R
@@ -7,7 +7,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
+# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -16,56 +16,45 @@
rmr.options = new.env(parent=emptyenv())
rmr.options$backend = "hadoop"
-rmr.options$profilenodes = FALSE
-
-rmr.profilenodes = function(on = NULL) {
- if (!is.null(on))
- rmr.options$profilenodes = on
- rmr.options$profilenodes}
-
-rmr.backend = function(backend = c(NULL, "hadoop", "local")) {
- if(!missing(backend)){
- backend = match.arg(backend)
- rmr.options$backend = backend}
- rmr.options$backend}
-
-#I/O
-createReader = function(linebufsize = 2000, textinputformat){
- con = file("stdin", open="r")
- close = function(){
- ## close(con)
- }
- readChunk = function(){
- lines = readLines(con = con, n = linebufsize, warn = FALSE)
- if(length(lines) > 0){
- return(lapply(lines, textinputformat))
- }else{
- return(NULL)
- }
- }
- return(list(close = close, get = readChunk))
-}
-
-send = function(out, textoutputformat = defaulttextoutputformat){
- if (is.keyval(out))
- cat(textoutputformat(out$key, out$val))
- else
- lapply(out, function(o) cat(textoutputformat(o$key, o$val)) )
- TRUE
-}
+rmr.options$profile.nodes = FALSE
+rmr.options$depend.check = FALSE
+#rmr.options$managed.dir = "/var/rmr/managed"
+
+rmr.options.get = function(...) {
+ opts = as.list(rmr.options)
+ if(missing(...))
+ opts
+ else {
+ args = c(...)
+ if (length(args) > 1)
+ opts[args]
+ else
+ opts[[args]]}}
+
+rmr.options.set = function(backend = c("hadoop", "local"),
+ profile.nodes = NULL#,
+ #depend.check = NULL,
+ #managed.dir = NULL
+ ) {
+ this.call = match.call()
+ backend = match.arg(backend) #this doesn't do anything, fix
+ lapply(names(this.call)[-1],
+ function(x)
+ assign(x, eval(this.call[[x]]), envir = rmr.options))
+ as.list(rmr.options)}
# additional hadoop features, disabled for now
-counter = function(group="r-stream",family, value){
- cat(sprintf("report:counter:%s,$s,%s",
- as.character(group),
- as.character(family),
- as.integer(value)),
+counter = function(group="r-stream", family, value) {
+ cat(sprintf("report:counter:%s, $s, %s",
+ as.character(group),
+ as.character(family),
+ as.integer(value)),
stderr())
}
-status = function(what){
- cat(sprintf("report:status:%s",
- as.character(what)),
+status = function(what) {
+ cat(sprintf("report:status:%s",
+ as.character(what)),
stderr())
}
@@ -85,159 +74,304 @@ keyval.to.list = function(kvl) {l = values(kvl); names(l) = keys(kvl); l}
to.map = function(fun1, fun2 = identity) {
if (missing(fun2)) {
- function(k,v) fun1(keyval(k,v))}
+ function(k, v) fun1(keyval(k, v))}
else {
- function(k,v) keyval(fun1(k), fun2(v))}}
+ function(k, v) keyval(fun1(k), fun2(v))}}
to.reduce = to.map
to.reduce.all = function(fun1, fun2 = identity) {
if (missing(fun2)) {
- function(k,vv) lapply(vv, function(v) fun1(keyval(k,v)))}
+ function(k, vv) lapply(vv, function(v) fun1(keyval(k, v)))}
else {
- function(k,vv) lapply(vv, function(v) keyval(fun1(k), fun2(v)))}}
+ function(k, vv) lapply(vv, function(v) keyval(fun1(k), fun2(v)))}}
-mkSeriesMap = function(map1, map2) function(k,v) {out = map1(k,v); map2(out$key, out$val)}
-mkParallelMap = function(...) function (k,v) lapply(list(...), function(map) map(k,v))
+## mapred combinators
+wrap.keyval = function(kv) {
+ if(is.null(kv)) list()
+ else if (is.keyval(kv)) list(kv)
+ else kv}
-## drivers section, or what runs on the nodes
+compose.mapred = function(mapred, map) function(k, v) {
+ out = mapred(k, v)
+ if (is.null(out)) NULL
+ else if (is.keyval(out)) map(out$key, out$val)
+ else do.call(c,
+ lapply(out,
+ function(x)
+ wrap.keyval(map(x$key, x$val))))}
-activateProfiling = function(){
- dir = file.path("/tmp/Rprof", Sys.getenv('mapred_job_id'), Sys.getenv('mapred_tip_id'))
- dir.create(dir, recursive = T)
- Rprof(file.path(dir, paste(Sys.getenv('mapred_task_id'), Sys.time())), interval=0.000000001)}
-
-closeProfiling = function() Rprof(NULL)
-
-mapDriver = function(map, linebufsize, textinputformat, textoutputformat, profile){
- if(profile) activateProfiling()
- k = createReader(linebufsize, textinputformat)
- while( !is.null(d <- k$get())){
- lapply(d,
- function(r) {
- out = map(r[[1]], r[[2]])
- if(!is.null(out))
- send(out, textoutputformat)
- })
- }
- k$close()
- if(profile) closeProfiling()
- invisible()
-}
+union.mapred = function(mr1, mr2) function(k, v) {
+ out = c(wrap.keyval(mr1(k, v)), wrap.keyval(mr2(k, v)))
+ if (length(out) == 0) NULL else out}
-listComp = function(ll,e) sapply(ll, function(l) isTRUE(all.equal(e,l)))
-## using isTRUE(all.equal(x)) because identical() was too strict, but on paper it should be it
-reduceDriver = function(reduce, linebufsize, textinputformat, textoutputformat, reduceondataframe, profile){
- if(profile) activateProfiling()
- k = createReader(linebufsize, textinputformat)
- lastKey = NULL
- lastGroup = list()
- while( !is.null(d <- k$get())){
- d = c(lastGroup,d)
- lastKey = d[[length(d)]][[1]]
- groupKeys = keys(d)
- lastGroup = d[listComp(groupKeys, lastKey)]
- d = d[!listComp(groupKeys, lastKey)]
- if(length(d) > 0) {
- groups = tapply(d, sapply(keys(d), digest), identity, simplify = FALSE)
- lapply(groups,
- function(g) {
- out = NULL
- out = reduce(g[[1]][[1]], if(reduceondataframe) {
- to.data.frame(values(g))}
- else {
- values(g)})
- if(!is.null(out))
- send(out, textoutputformat)
- })
- }
- }
- if (length(lastGroup) > 0) {
- out = reduce(lastKey,
- if(reduceondataframe) {
- to.data.frame(values(lastGroup))}
- else {
- values(lastGroup)})
- send(out, textoutputformat)
- }
- k$close()
- if(profile) closeProfiling()
- invisible()
-}
#some option formatting utils
-make.job.conf = function(m,pfx){
+paste.options = function(optlist) {
+ optlist = unlist(sapply(optlist, function(x) if (is.logical(x)) {if(x) "" else NULL} else x))
+ if(is.null(optlist)) ""
+ else paste(unlist(rbind(paste("-", names(optlist), sep = ""), optlist)), collapse = " ")}
+
+make.job.conf = function(m, pfx) {
N = names(m)
if(length(m) == 0) return(" ")
- paste(unlist(lapply(1:length(N),
- function(i){
- sprintf("%s %s=%s ", pfx, N[[i]],as.character(m[[i]]))})),
+ paste(unlist(lapply(1:length(N),
+ function(i) {
+ sprintf("%s %s=%s ", pfx, N[[i]], as.character(m[[i]]))})),
collapse = " ")}
-make.cache.files = function(caches,pfx,shorten = TRUE){
+make.cache.files = function(caches, pfx, shorten = TRUE) {
if(length(caches) == 0) return(" ")
- sprintf("%s %s",pfx, paste(sapply(caches,
- function(r){
+ sprintf("%s %s", pfx, paste(sapply(caches,
+ function(r) {
if(shorten) {
- path.leaf = tail(strsplit(r,"/")[[1]],1)
- sprintf("'%s#%s'",r,path.leaf)
- }else{
- sprintf("'%s'",r)}}),
- collapse = ","))}
+ path.leaf = tail(strsplit(r, "/")[[1]], 1)
+ sprintf("'%s#%s'", r, path.leaf)
+ }else {
+ sprintf("'%s'", r)}}),
+ collapse = ", "))}
-make.input.files = function(infiles){
+make.input.files = function(infiles) {
if(length(infiles) == 0) return(" ")
- paste(sapply(infiles,
- function(r){
- sprintf("-input %s ", r)}),
+ paste(sapply(infiles,
+ function(r) {
+ sprintf("-input %s ", r)}),
collapse=" ")}
-# formats
-# alternate, native format
-# unserialize(charToRaw(gsub("\\\\n", "\n", gsub("\n", "\\\\n", rawToChar(serialize(matrix(1:20, ncol = 5), ascii=T,conn = NULL))))))
-
-nativetextinputformat = function(line) {
- x = strsplit(line, "\t")[[1]]
- de = function(x) unserialize(charToRaw(gsub("\\\\n", "\n", x)))
- keyval(de(x[1]),de(x[2]))}
-
-nativetextoutputformat = function(k, v) {
- ser = function(x) gsub("\n", "\\\\n", rawToChar(serialize(x, ascii=T,conn = NULL)))
- paste(ser(k), "\t", ser(v), "\n", sep = "")}
+# I/O
+
+make.record.reader = function(mode = NULL, format = NULL, con = NULL) {
+ default = make.input.format()
+ if(is.null(mode)) mode = default$mode
+ if(is.null(format)) format = default$format
+ if(mode == "text") {
+ if(is.null(con)) con = file("stdin", "r") #not stdin() which is parsed by the interpreter
+ function() {
+ line = readLines(con, 1)
+ if(length(line) == 0) NULL
+ else format(line)}}
+ else {
+ if(is.null(con)) con = pipe("cat", "rb")
+ function() format(con)}}
+
+make.record.writer = function(mode = NULL, format = NULL, con = NULL) {
+ default = make.output.format()
+ if(is.null(mode)) mode = default$mode
+ if(is.null(format)) format = default$format
+ if(mode == "text") {
+ if(is.null(con)) con = stdout()
+ function(k, v) writeLines(format(k, v), con)}
+ else {
+ if(is.null(con)) con = pipe("cat", "wb")
+ function(k, v) format(k, v, con)}}
+
+IO.formats = c("text", "json", "csv", "native", "native.text",
+ "sequence.typedbytes")
+
+make.input.format = function(format = native.input.format,
+ mode = c("binary", "text"),
+ streaming.format = NULL, ...) {
+ mode = match.arg(mode)
+ if(is.character(format)) {
+ format = match.arg(format, IO.formats)
+ switch(format,
+ text = {format = text.input.format;
+ mode = "text"},
+ json = {format = json.input.format;
+ mode = "text"},
+ csv = {format = csv.input.format(...);
+ mode = "text"},
+ native.text = {format = native.text.input.format;
+ mode = "text"},
+ native = {format = native.input.format;
+ mode = "binary"},
+ sequence.typedbytes = {format = typed.bytes.input.format;
+ mode = "binary"})}
+ if(is.null(streaming.format) && mode == "binary")
+ streaming.format = "org.apache.hadoop.streaming.AutoInputFormat"
+ list(mode = mode, format = format, streaming.format = streaming.format)}
+
+make.output.format = function(format = native.output.format,
+ mode = c("binary", "text"),
+ streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ ...) {
+ mode = match.arg(mode)
+ if(is.character(format)) {
+ format = match.arg(format, IO.formats)
+ switch(format,
+ text = {format = text.output.format;
+ mode = "text";
+ streaming.format = NULL},
+ json = {format = json.output.format;
+ mode = "text";
+ streaming.format = NULL},
+ csv = {format = csv.output.format(...);
+ mode = "text";
+ streaming.format = NULL},
+ native.text = {format = native.text.output.format;
+ mode = "text";
+ streaming.format = NULL},
+ native = {format = native.output.format;
+ mode = "binary";
+ streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"},
+ sequence.typedbytes = {format = typed.bytes.output.format;
+ mode = "binary";
+ streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"})}
+ mode = match.arg(mode)
+ list(mode = mode, format = format, streaming.format = streaming.format)}
+
+native.text.input.format = function(line) {
+ if (length(line) == 0) NULL
+ else {
+ x = strsplit(line, "\t")[[1]]
+ de = function(x) unserialize(charToRaw(gsub("\\\\n", "\n", x)))
+ keyval(de(x[1]), de(x[2]))}}
+native.text.output.format = function(k, v) {
+ ser = function(x) gsub("\n", "\\\\n", rawToChar(serialize(x, ascii=T, conn = NULL)))
+ paste(ser(k), ser(v), sep = "\t")}
-jsontextinputformat = function(line) {
- decodeString = function(s) gsub("\\\\n","\\\n", gsub("\\\\t","\\\t", s))
+json.input.format = function(line) {
x = strsplit(line, "\t")[[1]]
- keyval(fromJSON(decodeString(x[1]), asText = TRUE),
- fromJSON(decodeString(x[2]), asText = TRUE))}
+ if(length(x) == 1) keyval(NULL, fromJSON(x[1], asText = TRUE))
+ else keyval(fromJSON(x[1], asText = TRUE),
+ fromJSON(x[2], asText = TRUE))}
+
+json.output.format = function(k, v) {
+ paste(gsub("\n", "", toJSON(k, .escapeEscapes=TRUE, collapse = "")),
+ gsub("\n", "", toJSON(v, .escapeEscapes=TRUE, collapse = "")),
+ sep = "\t")}
+
+text.input.format = function(line) {keyval(NULL, line)}
+text.output.format = function(k, v) paste(k, v, collapse = "", sep = "\t")
+
+csv.input.format = function(key = 1, ...) function(line) {
+ tc = textConnection(line)
+ df = tryCatch(read.table(file = tc, header = FALSE, ...),
+ error =
+ function(e) {
+ if (e$message == "no lines available in input")
+ write(x="No data in this line", file=stderr())
+ else stop(e)
+ NULL})
+ close(tc)
+ keyval(df[, key], df[, -key])}
+
+csv.output.format = function(...) function(k, v) {
+ on.exit(close(tc))
+ tc = textConnection(object = NULL, open = "w")
+ args = list(x = c(as.list(k), as.list(v)), file = tc, ..., row.names = FALSE, col.names = FALSE)
+ do.call(write.table, args[unique(names(args))])
+ paste(textConnectionValue(con = tc), sep = "", collapse = "")}
+
+typed.bytes.reader = function (con, type.code = NULL) {
+ r = function(...) {
+ if(is.raw(con))
+ con = rawConnection(con, open = "r")
+ readBin(con, endian = "big", signed = TRUE, ...)}
+ read.code = function() (256 + r(what = "integer", n = 1, size = 1)) %% 256
+ read.length = function() r(what = "integer", n= 1, size = 4)
+ tbr = function() typed.bytes.reader(con)[[1]]
+ two55.terminated.list = function() {
+ ll = list()
+ code = read.code()
+ while(length(code) > 0 && code != 255) {
+ ll = append(ll, typed.bytes.reader(con, code))
+ code = read.code()}
+ ll}#quadratic, fix later
+
+ if (is.null(type.code)) type.code = read.code()
+ if(length(type.code) > 0) {
+ list(switch(as.character(type.code),
+ "0" = r("raw", n = read.length()),
+ "1" = r("raw"),
+ "2" = r("logical", size = 1),
+ "3" = r("integer", size = 4),
+ "4" = r("integer", size = 8),
+ "5" = r("numeric", size = 4),
+ "6" = r("numeric", size = 8),
+ "7" = readChar(con, nchars = read.length(), useBytes=TRUE),
+ "8" = replicate(read.length(), tbr(), simplify=FALSE),
+ "9" = two55.terminated.list(),
+ "10" = replicate(read.length(), keyval(tbr(), tbr()), simplify = FALSE),
+ "11" = r("integer", size = 2), #this and the next implemented only in hive, silly
+ "12" = NULL,
+ "144" = unserialize(r("raw", n = read.length()))))}
+ else NULL}
+
+typed.bytes.writer = function(value, con, native = FALSE) {
+ w = function(x, size = NA_integer_) writeBin(x, con, size = size, endian = "big")
+ write.code = function(x) w(as.integer(x), size = 1)
+ write.length = function(x) w(as.integer(x), size = 4)
+ tbw = function(x) typed.bytes.writer(x, con)
+ if(native) {
+ bytes = serialize(value, NULL)
+ write.code(144); write.length(length(bytes)); w(bytes)
+ }
+ else{
+ if(is.list(value) && all(sapply(value, is.keyval))) {
+ write.code(10)
+ write.length(length(value))
+ lapply(value, function(kv) {lapply(kv, tbw)})}
+ else {
+ if(length(value) == 1) {
+ switch(class(value),
+ raw = {write.code(1); w(value)},
+ logical = {write.code(2); w(value, size = 1)},
+ integer = {write.code(3); w(value)},
+ #doesn't happen in R integer = {write.code(4); w(value)},
+ #doesn't happen in R numeric = {write.code(5); w(value)},
+ numeric = {write.code(6); w(value)},
+ character = {write.code(7); write.length(nchar(value)); writeChar(value, con, eos = NULL)},
+ factor = {value = as.character(value)
+ write.code(7); write.length(nchar(value)); writeChar(value, con, eos = NULL)},
+ stop("not implemented yet"))}
+ else {
+ switch(class(value),
+ raw = {write.code(0); write.length(length(value)); w(value)},
+ #NULL = {write.code(12); write.length(0)}, #this spec was added by the hive folks, but not in streaming HIVE-1029
+ {write.code(8); write.length(length(value)); lapply(value, tbw)})}}}
+ TRUE}
+
+typed.bytes.input.format = function(con) {
+ key = typed.bytes.reader(con)
+ val = typed.bytes.reader(con)
+ if(is.null(key) || is.null(val)) NULL
+ else keyval(key[[1]],val[[1]])}
-jsontextoutputformat = function(k,v) {
- encodeString = function(s) gsub("\\\n","\\\\n", gsub("\\\t","\\\\t", s))
- paste(encodeString(toJSON(k, collapse = "")), "\t", encodeString(toJSON(v, collapse = "")), "\n", sep = "")}
+typed.bytes.output.format = function(k, v, con) {
+ typed.bytes.writer(k, con)
+ typed.bytes.writer(v, con)}
-rawtextinputformat = function(line) {keyval(NULL, line)}
-csvtextinputformat = function(key = 1, ...) function(line) {tc = textConnection(line)
- df = read.table(file = tc, header = FALSE, ...)
- close(tc)
- keyval(df[,key], df[,-key])}
+native.input.format = typed.bytes.input.format
-rawtextoutputformat = function(k,v) paste(c(k,v, "\n"), collapse = "")
-csvtextoutputformat = function(...) function(k,v) {
- tc = textConnection(object = NULL, open = "w")
- args = list(x = c(as.list(k),as.list(v)), file = tc, ..., row.names = FALSE, col.names = FALSE)
- do.call(write.table, args[unique(names(args))])
- paste(textConnectionValue(con = tc), "\n", sep = "", collapse = "")}
+native.output.format = function(k, v, con){
+ typed.bytes.writer(k, con, TRUE)
+ typed.bytes.writer(v, con, TRUE)}
-defaulttextinputformat = nativetextinputformat
-defaulttextoutputformat = nativetextoutputformat
#data frame conversion
-
-flatten = function(x) unlist(list(name = as.name("name"), x))[-1]
-to.data.frame = function(l) data.frame(lapply(data.frame(do.call(rbind,lapply(l, flatten))), unlist))
-from.data.frame = function(df, keycol = 1) lapply(1:dim(df)[[1]], function(i) keyval(df[i,keycol],df[i,] ))
+flatten = function(x) if (is.list(x)) do.call(c, lapply(x, function(y) as.list(flatten(y)))) else if(is.factor(x)) as.character(x) else x
+
+list.to.data.frame =
+ function(x) {
+ mat = do.call(
+ rbind,
+ apply(
+ cbind(rmr.key = keys(x),
+ values(x)),
+ 1,
+ flatten))
+ df = data.frame(
+ lapply(1:dim(mat)[[2]],
+ function(i) unlist(mat[,i])))
+ names(df) = paste("V", 1:dim(df)[[2]], sep = "")
+ df}
+
+from.data.frame = function(df, keycol = NULL)
+ lapply(1:dim(df)[[1]],
+ function(i) keyval(if(is.null(keycol)) NULL else df[i, keycol], df[i, ] ))
#output cmp
cmp = function(x, y) isTRUE(all.equal(x[order(unlist(keys(x)))],
@@ -252,18 +386,18 @@ hdfs = function(cmd, intern, ...) {
else {
argnames = names(list(...))
}
- system(paste(Sys.getenv("HADOOP_HOME"), "/bin/hadoop dfs -", cmd, " ",
+ system(paste(Sys.getenv("HADOOP_HOME"), "/bin/hadoop dfs -", cmd, " ",
paste(
- apply(cbind(argnames, list(...)),1,
+ apply(cbind(argnames, list(...)), 1,
function(x) paste(
- if(x[[1]] == ""){""} else{"-"},
+ if(x[[1]] == "") {""} else {"-"},
x[[1]],
" ",
to.dfs.path(x[[2]]),
sep = ""))[
order(argnames, decreasing = T)],
- collapse = " "),
- sep = ""),
+ collapse = " "),
+ sep = ""),
intern = intern)}
getcmd = function(matched.call)
@@ -272,38 +406,54 @@ getcmd = function(matched.call)
hdfs.match.sideeffect = function(...) {
hdfs(getcmd(match.call()), FALSE, ...) == 0}
-hdfs.match.out = function(...)
- to.data.frame(strsplit(hdfs(getcmd(match.call()), TRUE, ...)[-1], " +"))
+#this returns a character matrix, individual cmds may benefit from additional transformations
+hdfs.match.out = function(...) {
+ oldwarn = options("warn")[[1]]
+ options(warn = -1)
+ retval = do.call(rbind, strsplit(hdfs(getcmd(match.call()), TRUE, ...), " +"))
+ options(warn = oldwarn)
+ retval}
mkhdfsfun = function(hdfscmd, out)
- eval(parse(text = paste ("hdfs.", hdfscmd, " = hdfs.match.", if(out) "out" else "sideeffect", sep = "")),
+ eval(parse(text = paste ("hdfs.", hdfscmd, " = hdfs.match.", if(out) "out" else "sideeffect", sep = "")),
envir = parent.env(environment()))
-for (hdfscmd in c("ls","lsr","df","du","dus","count","cat","text","stat","tail","help"))
+for (hdfscmd in c("ls", "lsr", "df", "du", "dus", "count", "cat", "text", "stat", "tail", "help"))
mkhdfsfun(hdfscmd, TRUE)
-for (hdfscmd in c("mv","cp","rm","rmr","expunge","put","copyFromLocal","moveFromLocal","get","getmerge",
- "copyToLocal","moveToLocal","mkdir","setrep","touchz","test","chmod","chown","chgrp"))
+for (hdfscmd in c("mv", "cp", "rm", "rmr", "expunge", "put", "copyFromLocal", "moveFromLocal", "get", "getmerge",
+ "copyToLocal", "moveToLocal", "mkdir", "setrep", "touchz", "test", "chmod", "chown", "chgrp"))
mkhdfsfun(hdfscmd, FALSE)
+pretty.hdfs.ls = function(...) {
+ ls.out = hdfs.ls(...)
+ if(ls.out[1,1] == "Found")
+ ls.out = ls.out[-1,]
+ if(class(ls.out) == "character") ls.out = t(ls.out)
+ df = as.data.frame(ls.out)
+ names(df) = c("mode", "links", "owner", "group", "size", "last.modified.date", "last.modified.time", "path")
+ df$links = as.numeric(sapply(as.character(df$links), function(x) if (x=="-") 0 else x))
+ df$size = as.numeric(as.character(df$size))
+ df}
+
# backend independent dfs section
dfs.exists = function(f) {
- if (rmr.backend() == 'hadoop')
+ if (rmr.options.get('backend') == 'hadoop')
hdfs.test(e = f)
else file.exists(f)}
-dfs.rm = function(f){
- if(rmr.backend() == 'hadoop')
+dfs.rm = function(f) {
+ if(rmr.options.get('backend') == 'hadoop')
hdfs.rm(f)
else file.remove(f)}
dfs.is.dir = function(f) {
- if (rmr.backend() == 'hadoop')
+ if (rmr.options.get('backend') == 'hadoop')
hdfs.test(d = f)
else file.info(f)['isdir']}
dfs.empty = function(f) {
- if(rmr.backend() == 'hadoop') {
+ if(rmr.options.get('backend') == 'hadoop') {
if(dfs.is.dir(f)) {
hdfs.test(z = file.path(to.dfs.path(f), 'part-00000'))}
else {hdfs.test(z = f)}}
@@ -318,351 +468,495 @@ to.dfs.path = function(input) {
if(is.function(input)) {
input()}}}
-to.dfs = function(object, file = dfs.tempfile(), textoutputformat = defaulttextoutputformat){
+to.dfs = function(object, output = dfs.tempfile(), format = "native") {
if(is.data.frame(object) || is.matrix(object)) {
- object = from.data.frame(object)
- }
+ object = from.data.frame(object)}
tmp = tempfile()
- dfsOutput = to.dfs.path(file)
- cat(paste
- (lapply
- (object,
- function(x) {kv = if (is.keyval(x)) x else keyval(NULL, x)
- textoutputformat(kv$key, kv$val)}),
- collapse=""),
- file = tmp)
- if(rmr.backend() == 'hadoop'){
- hdfs.put(tmp, dfsOutput)
+ dfsOutput = to.dfs.path(output)
+ if(is.character(format)) format = make.output.format(format)
+
+ write.file =
+ function(obj, f) {
+ con = file(f, if(format$mode == "text") "w" else "wb")
+ record.writer = make.record.writer(format$mode,
+ format$format,
+ con)
+ lapply(obj,
+ function(x) {
+ kv = if(is.keyval(x)) x else keyval(NULL, x)
+ record.writer(kv$key, kv$val)})
+ close(con)}
+
+ write.file(object, tmp)
+ if(rmr.options.get('backend') == 'hadoop') {
+ if(format$mode == "binary")
+ system(paste(hadoop.cmd(), "loadtb", dfsOutput, "<", tmp))
+ else hdfs.put(tmp, dfsOutput)
file.remove(tmp)}
- else
- file.rename(tmp, dfsOutput)
- file
-}
-
-from.dfs = function(file, textinputformat = defaulttextinputformat, todataframe = F){
- if(rmr.backend() == 'hadoop') {
+ else file.rename(tmp, dfsOutput)
+ output}
+
+from.dfs = function(input, format = "native", to.data.frame = FALSE) {
+ part.list = function(fname) {
+ if(rmr.options.get('backend') == "local") fname
+ else {
+ if(dfs.is.dir(fname))
+ pretty.hdfs.ls(paste(fname, "part*", sep = "/"))$path
+ else fname}}
+
+ read.file = function(f) {
+ con = file(f, if(format$mode == "text") "r" else "rb")
+ record.reader = make.record.reader(format$mode, format$format, con)
+ retval = list()
+ rec = record.reader()
+ i = 1
+ while(!is.null(rec)) {
+ logi = log2(i)
+ if(round(logi) == logi) retval = c(retval, rep(list(NULL), length(retval)))
+ retval[[i]] = rec
+ rec = record.reader()
+ i = i + 1}
+ close(con)
+ retval[!sapply(retval, is.null)]}
+
+ dumptb = function(src, dest){
+ lapply(src, function(x) system(paste(hadoop.cmd(), "dumptb", x, ">>", dest)))}
+
+ getmerge = function(src, dest) {
+ on.exit(unlink(tmp))
tmp = tempfile()
- hdfs.get(to.dfs.path(file), tmp)}
- else tmp = to.dfs.path(file)
- retval = if(file.info(tmp)[1,'isdir']) {
- do.call(c,
- lapply(list.files(tmp, "part*"),
- function(f) lapply(readLines(file.path(tmp, f)),
- textinputformat)))}
- else {
- lapply(readLines(tmp), textinputformat)}
- if(!todataframe) {
- retval}
- else{
- to.data.frame(retval) }
-}
+ lapply(src, function(x) {
+ hdfs.get(as.character(x), tmp)
+ system(paste('cat', tmp, '>>' , dest))
+ unlink(tmp)})
+ dest}
+
+ fname = to.dfs.path(input)
+ if(is.character(format)) format = make.input.format(format)
+ if(rmr.options.get("backend") == "hadoop") {
+ tmp = tempfile()
+ if(format$mode == "binary") dumptb(part.list(fname), tmp)
+ else getmerge(part.list(fname), tmp)}
+ else
+ tmp = fname
+ retval = read.file(tmp)
+ if(rmr.options.get("backend") == "hadoop") unlink(tmp)
+ if(to.data.frame) list.to.data.frame(retval)
+ else retval}
# mapreduce
-dfs.tempfile <- function(pattern = "file", tmpdir = tempdir()) {
+dfs.tempfile = function(pattern = "file", tmpdir = tempdir()) {
fname = tempfile(pattern, tmpdir)
namefun = function() {fname}
- reg.finalizer(environment(namefun),
+ reg.finalizer(environment(namefun),
function(e) {
fname = eval(expression(fname), envir = e)
if(Sys.getenv("mapred_task_id") != "" && dfs.exists(fname)) dfs.rm(fname)
})
namefun
}
+dfs.managed.file = function(call, managed.dir = rmr.options.get('managed.dir')) {
+ file.path(managed.dir, digest(lapply(call, eval)))}
mapreduce = function(
- input,
- output = NULL,
- map = to.map(identity),
- reduce = NULL,
- combine = NULL,
- reduceondataframe = FALSE,
- inputformat = NULL,
- outputformat = NULL,
- textinputformat = defaulttextinputformat,
- textoutputformat = defaulttextoutputformat,
- verbose = FALSE) {
+ input,
+ output = NULL,
+ map = to.map(identity),
+ reduce = NULL,
+ combine = NULL,
+ reduce.on.data.frame = FALSE,
+ input.format = "native",
+ output.format = "native",
+ backend.parameters = list(),
+ verbose = TRUE) {
on.exit(expr = gc(), add = TRUE) #this is here to trigger cleanup of tempfiles
- if (is.null(output)) output = dfs.tempfile()
+ if (is.null(output)) output =
+ if(rmr.options.get('depend.check'))
+ dfs.managed.file(match.call())
+ else
+ dfs.tempfile()
- backend = rmr.options$backend
+ backend = rmr.options.get('backend')
- profilenodes = rmr.options$profilenodes
+ profile.nodes = rmr.options.get('profile.nodes')
mr = switch(backend, hadoop = rhstream, local = mr.local, stop("Unsupported backend: ", backend))
+ if(is.character(input.format)) input.format = make.input.format(input.format)
+ if(is.character(output.format)) output.format = make.output.format(output.format)
- mr(map = map,
- reduce = reduce,
- reduceondataframe = reduceondataframe,
- combine = combine,
- in.folder = if(is.list(input)) {lapply(input, to.dfs.path)} else to.dfs.path(input),
- out.folder = to.dfs.path(output),
- profilenodes = profilenodes,
- inputformat = inputformat,
- outputformat = outputformat,
- textinputformat = textinputformat,
- textoutputformat = textoutputformat,
+ mr(map = map,
+ reduce = reduce,
+ reduce.on.data.frame = reduce.on.data.frame,
+ combine = combine,
+ in.folder = if(is.list(input)) {lapply(input, to.dfs.path)} else to.dfs.path(input),
+ out.folder = to.dfs.path(output),
+ profile.nodes = profile.nodes,
+ input.format = input.format,
+ output.format = output.format,
+ backend.parameters = backend.parameters[[backend]],
verbose = verbose)
output
}
# backends
-mr.local = function(map,
- reduce,
- reduceondataframe = FALSE,
- combine = NULL,
- in.folder,
- out.folder,
- profilenodes = FALSE,
- inputformat = NULL,
- outputformat = NULL,
- textinputformat = defaulttextinputformat,
- textoutputformat = defaulttextoutputformat,
+#local
+
+mr.local = function(map,
+ reduce,
+ reduce.on.data.frame,
+ combine,
+ in.folder,
+ out.folder,
+ profile.nodes,
+ input.format,
+ output.format,
+ backend.parameters,
verbose = verbose) {
- if(is.null(reduce)) reduce = function(k,vv) lapply(vv, function(v) keyval(k,v))
+ if(is.null(reduce)) reduce = function(k, vv) lapply(vv, function(v) keyval(k, v))
map.out = do.call(c,
- lapply(do.call(c,
+ lapply(do.call(c,
lapply(in.folder,
function(x) lapply(from.dfs(x,
- textinputformat = textinputformat),
- function(y){attr(y$val, 'rmr.input') = x; y}))),
+ format = input.format),
+ function(y) {attr(y$val, 'rmr.input') = x; y}))),
function(kv) {retval = map(kv$key, kv$val)
if(is.keyval(retval)) list(retval)
else retval}))
map.out = from.dfs(to.dfs(map.out))
reduce.out = tapply(X = map.out,
INDEX = sapply(keys(map.out), digest),
FUN = function(x) reduce(x[[1]]$key,
- if(reduceondataframe) to.data.frame(values(x)) else values(x)),
+ if(reduce.on.data.frame) list.to.data.frame(values(x)) else values(x)),
simplify = FALSE)
if(!is.keyval(reduce.out[[1]]))
reduce.out = do.call(c, reduce.out)
names(reduce.out) = replicate(n=length(names(reduce.out)), "")
- to.dfs(reduce.out, out.folder)}
+ to.dfs(reduce.out, out.folder, format = output.format)}
+
+#hadoop
+## drivers section, or what runs on the nodes
+
+activate.profiling = function() {
+ dir = file.path("/tmp/Rprof", Sys.getenv('mapred_job_id'), Sys.getenv('mapred_tip_id'))
+ dir.create(dir, recursive = T)
+ Rprof(file.path(dir, paste(Sys.getenv('mapred_task_id'), Sys.time())))}
+
+close.profiling = function() Rprof(NULL)
+
+
+map.driver = function(map, record.reader, record.writer, profile) {
+ if(profile) activate.profiling()
+ kv = record.reader()
+ while(!is.null(kv)) {
+ out = map(kv$key, kv$val)
+ if(!is.null(out)) {
+ if (is.keyval(out)) {record.writer(out$key, out$val)}
+ else {lapply(out, function(o) record.writer(o$key, o$val))}}
+ kv = record.reader()}
+ if(profile) close.profiling()
+ invisible()}
+
+list.cmp = function(ll, e) sapply(ll, function(l) isTRUE(all.equal(e, l)))
+## using isTRUE(all.equal(x)) because identical() was too strict, but on paper it should be it
+
+reduce.driver = function(reduce, record.reader, record.writer, reduce.on.data.frame, profile) {
+ reduce.flush = function(current.key, vv) {
+ out = reduce(current.key,
+ if(reduce.on.data.frame) {
+ list.to.data.frame(vv)}
+ else {vv})
+ if(!is.null(out)) {
+ if(is.keyval(out)) {record.writer(out$key, out$val)}
+ else {lapply(out, function(o) record.writer(o$key, o$val))}}}
+ if(profile) activate.profiling()
+ kv = record.reader()
+ current.key = kv$key
+ vv = list()
+ i = 1
+ while(!is.null(kv)) {
+ if(identical(kv$key, current.key)) {
+ logi = log2(i)
+ if(round(logi)==logi) vv = c(vv, rep(list(NULL), length(vv)))
+ vv[[i]] = kv$val
+ i = i + 1
+ }
+ else {
+ reduce.flush(current.key,
+ vv[!sapply(vv, is.null)])
+ current.key = kv$key
+ vv = list(kv$val)
+ i = 2
+ }
+ kv = record.reader()
+ }
+ if(length(vv) > 0) reduce.flush(current.key, vv[!sapply(vv, is.null)])
+ if(profile) close.profiling()
+ invisible()
+}
+
+# the main function for the hadoop backend
+
+hadoop.cmd = function() {
+ hadoopHome = Sys.getenv("HADOOP_HOME")
+ if(hadoopHome == "") warning("Environment variable HADOOP_HOME is missing")
+ hadoopBin = file.path(hadoopHome, "bin")
+ stream.jar = list.files(path=sprintf("%s/contrib/streaming", hadoopHome), pattern="jar$", full=TRUE)
+ sprintf("%s/hadoop jar %s ", hadoopBin, stream.jar)}
rhstream = function(
- map,
- reduce = NULL,
- reduceondataframe = F,
- combine = NULL,
- in.folder,
+ map,
+ reduce,
+ reduce.on.data.frame,
+ combine,
+ in.folder,
out.folder,
- linebufsize = 2000,
- profilenodes = FALSE,
- numreduces,
- cachefiles = c(),
- archives = c(),
- jarfiles = c(),
- otherparams = list(HADOOP_HOME = Sys.getenv('HADOOP_HOME'),
- HADOOP_CONF = Sys.getenv("HADOOP_CONF")),
- mapred = list(),
- mpr.out = NULL,
- inputformat = NULL,
- outputformat = NULL,
- textinputformat = defaulttextinputformat,
- textoutputformat = defaulttextoutputformat,
- verbose = FALSE,
+ profile.nodes,
+ cachefiles = NULL,
+ archives = NULL,
+ jarfiles = NULL,
+ otherparams = list(HADOOP_HOME = Sys.getenv('HADOOP_HOME'),
+ HADOOP_CONF = Sys.getenv("HADOOP_CONF")),
+ input.format,
+ output.format,
+ backend.parameters,
+ verbose = TRUE,
debug = FALSE) {
## prepare map and reduce executables
lines = '#! /usr/bin/env Rscript
options(warn=1)
library(rmr)
load("rmr-local-env")
-
+load("rmr-global-env")
+invisible(lapply(libs, function(l) library(l, character.only = T)))
'
-
- mapLine = 'load("rmr-map-env", envir = environment(map))
- rmr:::mapDriver(map = map,
- linebufsize = linebufsize,
- textinputformat = textinputformat,
- textoutputformat = if(is.null(reduce))
- {textoutputformat}
- else {rmr:::defaulttextoutputformat},
- profile = profilenodes)'
- reduceLine = 'load("rmr-reduce-env", envir = environment(reduce))
- rmr:::reduceDriver(reduce = reduce,
- linebufsize = linebufsize,
- textinputformat = rmr:::defaulttextinputformat,
- textoutputformat = textoutputformat,
- reduceondataframe = reduceondataframe,
- profile = profilenodes)'
- combineLine = 'load("rmr-combine-env", envir = environment(combine))
- rmr:::reduceDriver(reduce = combine,
- linebufsize = linebufsize,
- textinputformat = rmr:::defaulttextinputformat,
- textoutputformat = rmr:::defaulttextoutputformat,
- reduceondataframe = reduceondataframe,
- profile = profilenodes)'
+ map.line = ' rmr:::map.driver(map = map,
+ record.reader = rmr:::make.record.reader(input.format$mode,
+ input.format$format),
+ record.writer = if(is.null(reduce)) {
+ rmr:::make.record.writer(output.format$mode,
+ output.format$format)}
+ else {
+ rmr:::make.record.writer()},
+ profile = profile.nodes)'
+ reduce.line = ' rmr:::reduce.driver(reduce = reduce,
+ record.reader = rmr:::make.record.reader(),
+ record.writer = rmr:::make.record.writer(output.format$mode,
+ output.format$format),
+ reduce.on.data.frame = reduce.on.data.frame,
+ profile = profile.nodes)'
+ combine.line = ' rmr:::reduce.driver(reduce = combine,
+ record.reader = rmr:::make.record.reader(),
+ record.writer = rmr:::make.record.writer(),
+ reduce.on.data.frame = reduce.on.data.frame,
+ profile = profile.nodes)'
map.file = tempfile(pattern = "rhstr.map")
- writeLines(c(lines,mapLine), con = map.file)
+ writeLines(c(lines, map.line), con = map.file)
reduce.file = tempfile(pattern = "rhstr.reduce")
- writeLines(c(lines, reduceLine), con = reduce.file)
+ writeLines(c(lines, reduce.line), con = reduce.file)
combine.file = tempfile(pattern = "rhstr.combine")
- writeLines(c(lines, combineLine), con = combine.file)
+ writeLines(c(lines, combine.line), con = combine.file)
## set up the execution environment for map and reduce
if (!is.null(combine) && is.logical(combine) && combine) {
combine = reduce}
-
+
save.env = function(fun = NULL, name) {
fun.env = file.path(tempdir(), name)
- envir = if(is.null(fun)) parent.env(environment()) else environment(fun)
+ envir =
+ if(is.null(fun)) parent.env(environment()) else {
+ if (is.function(fun)) environment(fun)
+ else fun}
save(list = ls(all = TRUE, envir = envir), file = fun.env, envir = envir)
fun.env}
+ libs = sub("package:", "", grep("package", search(), value = T))
image.cmd.line = paste("-file",
c(save.env(name = "rmr-local-env"),
- save.env(map, "rmr-map-env"),
- if(is.function(reduce)) {
- save.env(reduce, "rmr-reduce-env")},
- if(is.function(combine))
- save.env(combine, "rmr-combine-env")),
- collapse=" ")
-
+ save.env(.GlobalEnv, "rmr-global-env")),
+ collapse = " ")
## prepare hadoop streaming command
- hadoopHome = Sys.getenv("HADOOP_HOME")
- if(hadoopHome == "") warning("Environment variable HADOOP_HOME is missing")
- hadoopBin = file.path(hadoopHome, "bin")
- stream.jar = list.files(path=sprintf("%s/contrib/streaming", hadoopHome),pattern="jar$",full=TRUE)
- hadoop.command = sprintf("%s/hadoop jar %s ", hadoopBin,stream.jar)
+ hadoop.command = hadoop.cmd()
input = make.input.files(in.folder)
- output = if(!missing(out.folder)) sprintf("-output %s",out.folder) else " "
- inputformat = if(is.null(inputformat)){
+ output = if(!missing(out.folder)) sprintf("-output %s", out.folder) else " "
+ input.format.opt = if(is.null(input.format$streaming.format)) {
' ' # default is TextInputFormat
- }else{
- sprintf(" -inputformat %s", inputformat)
+ }else {
+ sprintf(" -inputformat %s", input.format$streaming.format)
}
- outputformat = if(is.null(outputformat)){
+ output.format.opt = if(is.null(output.format$streaming.format)) {
' '}
else {
- sprintf(" -outputformat %s", outputformat)
+ sprintf(" -outputformat %s", output.format$streaming.format)
}
- mapper = sprintf('-mapper "Rscript %s" ', tail(strsplit(map.file,"/")[[1]],1))
- m.fl = sprintf("-file %s ",map.file)
- if(!is.null(reduce) ){
- if(is.character(reduce) && reduce=="aggregate"){
- reducer = sprintf('-reducer aggregate ')
- r.fl = " "
- } else{
- reducer = sprintf('-reducer "Rscript %s" ', tail(strsplit(reduce.file,"/")[[1]],1))
- r.fl = sprintf("-file %s ",reduce.file)
- }
- }else {
- reducer=" ";r.fl = " "
- }
+ stream.map.input =
+ if(input.format$mode == "binary") {
+ " -D stream.map.input=typedbytes"}
+ else {''}
+ stream.map.output =
+ if(is.null(reduce) && output.format$mode == "text") ""
+ else " -D stream.map.output=typedbytes"
+ stream.reduce.input = " -D stream.reduce.input=typedbytes"
+ stream.reduce.output =
+ if(output.format$mode == "binary") " -D stream.reduce.output=typedbytes"
+ else ''
+ stream.mapred.io = paste(stream.map.input,
+ stream.map.output,
+ stream.reduce.input,
+ stream.reduce.output)
+ mapper = sprintf('-mapper "Rscript %s" ', tail(strsplit(map.file, "/")[[1]], 1))
+ m.fl = sprintf("-file %s ", map.file)
+ if(!is.null(reduce) ) {
+ reducer = sprintf('-reducer "Rscript %s" ', tail(strsplit(reduce.file, "/")[[1]], 1))
+ r.fl = sprintf("-file %s ", reduce.file)}
+ else {
+ reducer=" ";r.fl = " "}
if(!is.null(combine) && is.function(combine)) {
- combiner = sprintf('-combiner "Rscript %s" ', tail(strsplit(combine.file, "/")[[1]],1))
+ combiner = sprintf('-combiner "Rscript %s" ', tail(strsplit(combine.file, "/")[[1]], 1))
c.fl = sprintf("-file %s ", combine.file)}
else {
combiner = " "
c.fl = " "}
- if(!missing(numreduces)) numreduces = sprintf("-numReduceTasks %s ", numreduces) else numreduces = " "
+
cmds = make.job.conf(otherparams, pfx="-cmdenv")
- if(is.null(mapred$mapred.textoutputformat.separator)){
- if(!is.null(mpr.out)) mapred$mapred.textoutputformat.separator = sprintf("'%s'",mpr.out)
- }
- jobconfstring = make.job.conf(mapred,pfx="-D")
#debug.opts = "-mapdebug kdfkdfld -reducexdebug jfkdlfkja"
- caches = if(length(cachefiles)>0) make.cache.files(cachefiles,"-files") else " " #<0.21
- archives = if(length(archives)>0) make.cache.files(archives,"-archives") else " "
- mkjars = if(length(jarfiles)>0) make.cache.files(jarfiles,"-libjars",shorten=FALSE) else " "
+ caches = if(length(cachefiles)>0) make.cache.files(cachefiles, "-files") else " " #<0.21
+ archives = if(length(archives)>0) make.cache.files(archives, "-archives") else " "
+ mkjars = if(length(jarfiles)>0) make.cache.files(jarfiles, "-libjars", shorten=FALSE) else " "
- verb = if(verbose) "-verbose " else " "
- finalcommand =
+ final.command =
paste(
- hadoop.command,
- archives,
- caches,
- mkjars,
- jobconfstring,
- inputformat,
- input,
- output,
- mapper,
- reducer,
- combiner,
- m.fl,
- r.fl,
- c.fl,
- image.cmd.line,
- cmds,
- numreduces,
- # debug.opts,
- verb)
- retval = system(finalcommand)
-if (retval != 0) stop("hadoop streaming failed with error code ", retval, "\n")
+ hadoop.command,
+ paste.options(backend.parameters),
+ stream.mapred.io,
+ archives,
+ caches,
+ mkjars,
+ input.format.opt,
+ output.format.opt,
+ input,
+ output,
+ mapper,
+ reducer,
+ combiner,
+ m.fl,
+ r.fl,
+ c.fl,
+ image.cmd.line,
+ cmds,
+ "2>&1")
+ if(verbose) {
+ retval = system(final.command)
+ if (retval != 0) stop("hadoop streaming failed with error code ", retval, "\n")}
+ else {
+ console.output = tryCatch(system(final.command, intern=TRUE),
+ warning = function(e) stop(e))
+ 0
+ }
}
-
-
+##special jobs
## a sort of relational join very useful in a variety of map reduce algorithms
## to.dfs(lapply(1:10, function(i) keyval(i, i^2)), "/tmp/reljoin.left")
## to.dfs(lapply(1:10, function(i) keyval(i, i^3)), "/tmp/reljoin.right")
-## equijoin(leftinput="/tmp/reljoin.left", rightinput="/tmp/reljoin.right", output = "/tmp/reljoin.out")
+## equijoin(left.input="/tmp/reljoin.left", right.input="/tmp/reljoin.right", output = "/tmp/reljoin.out")
## from.dfs("/tmp/reljoin.out")
equijoin = function(
- leftinput = NULL,
- rightinput = NULL,
- input = NULL,
- output = NULL,
- outer = c("", "left", "right", "full"),
- map.left = to.map(identity),
- map.right = to.map(identity),
+ left.input = NULL,
+ right.input = NULL,
+ input = NULL,
+ output = NULL,
+ outer = c("", "left", "right", "full"),
+ map.left = to.map(identity),
+ map.right = to.map(identity),
reduce = function(k, values.left, values.right)
- do.call(c,
- lapply(values.left,
- function(vl) lapply(values.right,
- function(vr) reduceall(k, vl, vr)))),
- reduceall = function(k,vl,vr) keyval(k, list(left = vl, right = vr)))
-{
- stopifnot(xor(!is.null(leftinput), !is.null(input) &&
- (is.null(leftinput)==is.null(rightinput))))
+ do.call(c,
+ lapply(values.left,
+ function(vl) lapply(values.right,
+ function(vr) reduce.all(k, vl, vr)))),
+ reduce.all = function(k, vl, vr) keyval(k, list(left = vl, right = vr)))
+ {
+ stopifnot(xor(!is.null(left.input), !is.null(input) &&
+ (is.null(left.input)==is.null(right.input))))
outer = match.arg(outer)
- leftouter = outer == "left"
- rightouter = outer == "right"
- fullouter = outer == "full"
- if (is.null(leftinput)) {
- leftinput = input}
- markSide =
+ left.outer = outer == "left"
+ right.outer = outer == "right"
+ full.outer = outer == "full"
+ if (is.null(left.input)) {
+ left.input = input}
+ mark.side =
function(kv, isleft) keyval(kv$key, list(val = kv$val, isleft = isleft))
- isLeftSide =
- function(leftinput) {
- leftin = strsplit(to.dfs.path(leftinput), "/+")[[1]]
+ is.left.side =
+ function(left.input) {
+ leftin = strsplit(to.dfs.path(left.input), "/+")[[1]]
mapin = strsplit(Sys.getenv("map_input_file"), "/+")[[1]]
leftin = leftin[-1]
- mapin = mapin[if(mapin[1] == "hdfs:") c(-1,-2) else -1]
+ mapin = mapin[if(mapin[1] == "hdfs:") c(-1, -2) else -1]
all(mapin[1:length(leftin)] == leftin)}
reduce.split =
function(vv) tapply(lapply(vv, function(v) v$val), sapply(vv, function(v) v$isleft), identity, simplify = FALSE)
- padSide =
- function(vv, sideouter, fullouter) if (length(vv) == 0 && (sideouter || fullouter)) c(NA) else vv
+ pad.side =
+ function(vv, side.outer, full.outer) if (length(vv) == 0 && (side.outer || full.outer)) c(NA) else vv
map = if (is.null(input)) {
- function(k,v) {
- ils = switch(rmr.backend(),
- hadoop = isLeftSide(leftinput),
- local = attr(v, 'rmr.input') == to.dfs.path(leftinput),
- stop("Unsupported backend: ", rmr.backend()))
- markSide(if(ils) map.left(k,v) else map.right(k,v), ils)}}
+ function(k, v) {
+ ils = switch(rmr.options.get('backend'),
+ hadoop = is.left.side(left.input),
+ local = attr(v, 'rmr.input') == to.dfs.path(left.input),
+ stop("Unsupported backend: ", rmr.options.get('backend')))
+ mark.side(if(ils) map.left(k, v) else map.right(k, v), ils)}}
else {
- function(k,v) {
- list(markSide(map.left(k,v), TRUE),
- markSide(map.right(k,v), FALSE))}}
+ function(k, v) {
+ list(mark.side(map.left(k, v), TRUE),
+ mark.side(map.right(k, v), FALSE))}}
eqj.reduce = reduce
- mapreduce(map = map,
+ mapreduce(map = map,
reduce =
function(k, vv) {
rs = reduce.split(vv)
- eqj.reduce(k,
- padSide(rs$`TRUE`, rightouter, fullouter),
- padSide(rs$`FALSE`, leftouter, fullouter))},
- input = c(leftinput,rightinput),
+ eqj.reduce(k,
+ pad.side(rs$`TRUE`, right.outer, full.outer),
+ pad.side(rs$`FALSE`, left.outer, full.outer))},
+ input = c(left.input, right.input),
output = output)}
+
+
+## push a file through this to get as many partitions as possible (depending on system settings)
+## data is unchanged
+
+scatter = function(input, output = NULL)
+ mapreduce(input, output, map = function(k, v) keyval(runif(1), keyval(k, v)),
+ reduce = function(k, vv) vv)
+
+##optimizer
+
+is.mapreduce = function(x) {
+ is.call(x) && x[[1]] == "mapreduce"}
+
+mapreduce.arg = function(x, arg) {
+ match.call(mapreduce, x) [[arg]]}
+
+optimize = function(mrex) {
+ mrin = mapreduce.arg(mrex, 'input')
+ if (is.mapreduce(mrex) &&
+ is.mapreduce(mrin) &&
+ is.null(mapreduce.arg(mrin, 'output')) &&
+ is.null(mapreduce.arg(mrin, 'reduce'))) {
+ bquote(
+ mapreduce(input = .(mapreduce.arg(mrin, 'input')),
+ output = .(mapreduce.arg(mrex, 'output')),
+ map = .(compose.mapred)(.(mapreduce.arg(mrex, 'map')),
+ .(mapreduce.arg(mrin, 'map'))),
+ reduce = .(mapreduce.arg(mrex, 'reduce'))))}
+ else mrex }
+
+
+
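For illustration, a hypothetical usage sketch of the new helpers introduced above (rmr.options.set/get, scatter and the internal optimize); it assumes a working rmr 1.2 installation and uses made-up toy data:

library(rmr)
rmr.options.set(backend = "local", profile.nodes = FALSE)   # run locally while experimenting
rmr.options.get("backend")                                   # "local"

small = to.dfs(lapply(1:10, function(i) keyval(i %% 2, i)))
scattered = scatter(small)                                   # same records, repartitioned at random

# optimize is not exported in this version, so reach it with :::.
# It fuses a map-only inner job into the outer call and returns the rewritten expression.
job = quote(mapreduce(input = mapreduce(input = small, map = to.map(identity)),
                      map = function(k, v) keyval(k, v * 2)))
rmr:::optimize(job)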
19 rmr/pkg/examples/large-kmeans-test.R
@@ -0,0 +1,19 @@
+input.1000 = mapreduce (input = to.dfs(1:1000),
+ map = function(k,v) keyval(rnorm(1), v),
+ reduce = to.reduce.all(identity))
+
+input.10e6 = mapreduce (input = input.1000,
+ map = function(k,v) lapply(1:1000, function(i) keyval(rnorm(1), v)),
+ reduce = to.reduce.all(identity))
+
+kmeans.input.10e6 = mapreduce(input.1000,
+ map = function(k,v) keyval(rnorm(1), cbind(sample(0:2, recsize, replace = T) +
+ rnorm(recsize, sd = .1),
+ sample(0:3, recsize, replace = T) +
+ rnorm(recsize, sd = .1))))
+
+kmeans.input.10e9 = mapreduce(input.10e6,
+ map = function(k,v) keyval(rnorm(1), cbind(sample(0:2, recsize, replace = T) +
+ rnorm(recsize, sd = .1),
+ sample(0:3, recsize, replace = T) +
+ rnorm(recsize, sd = .1))))
2 rmr/pkg/examples/stats.R
@@ -0,0 +1,2 @@
+rmr.sample = function(input, output, rate, rate = NULL, size = NULL, input.size = NULL)
+ if()
2 rmr/pkg/man/RevoHStream-package.Rd
@@ -2,5 +2,5 @@
\alias{rmr}
\docType{package}
\title{A package to perform Map Reduce computations in R}
-\description{Running on top of Hadoop, this package allows to define an run map reduce jobs, including specifying the mapper and the reducer as R functions and to move data between R and hadoop in a mostly transparent way. The aim is to make writing map reduce jobs very similar and just as easy as writing a lapply and a tapply. Additional features provide easy job composition, transparent intermediate result management, support for mapper and reducer definition and more.
+\description{Running on top of Hadoop, this package allows one to define and run mapreduce jobs, including specifying the mapper and the reducer as R functions, and to move data between R and Hadoop in a mostly transparent way. The aim is to make writing mapreduce jobs very similar to, and just as easy as, writing a lapply and a tapply. Additional features provide easy job composition, transparent intermediate result management, support for mapper and reducer definition and more.
}
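For illustration, a short hypothetical example of the lapply/tapply analogy in the description above (local backend, toy data):

library(rmr)
rmr.options.set(backend = "local")
# roughly the mapreduce counterpart of tapply(1:10, (1:10) %% 2, sum)
out = mapreduce(input = to.dfs(1:10),
                map    = function(k, v) keyval(v %% 2, v),
                reduce = function(k, vv) keyval(k, sum(unlist(vv))))
from.dfs(out)    # two key-value pairs: the sums of the even and odd numbers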
2 rmr/pkg/man/dfs.empty.Rd
@@ -1,7 +1,7 @@
\name{dfs.empty}
\alias{dfs.empty}
\title{
-Check if a dfs file is emptt
+Check if a dfs file is empty
}
\description{
Also takes objects returned by mapreduce
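For illustration, a hypothetical check on a job that may produce no output (hadoop backend, text output; the filter is made up):

out = mapreduce(input = to.dfs(1:10),
                map = function(k, v) if (v > 100) keyval(k, v),
                output.format = "text")
dfs.empty(out)    # expected TRUE here, since no record passes the filter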
12 rmr/pkg/man/equijoin.Rd
@@ -10,12 +10,12 @@ Equijoins using map reduce
A generalized form of equijoin, a hybrid between the SQL brethren and mapreduce
}
-\usage{equijoin(leftinput = NULL, rightinput = NULL, input = NULL, output = NULL, outer = c("", "left", "right", "full"), map.left =
+\usage{equijoin(left.input = NULL, right.input = NULL, input = NULL, output = NULL, outer = c("", "left", "right", "full"), map.left =
to.map(identity), map.right = to.map(identity), reduce = function(k, values.left, values.right) do.call(c, lapply(values.left, function(vl)
-lapply(values.right, function(vr) reduceall(k, vl, vr)))), reduceall = function(k, vl, vr) keyval(k, list(left = vl, right = vr)))}
+lapply(values.right, function(vr) reduce.all(k, vl, vr)))), reduce.all = function(k, vl, vr) keyval(k, list(left = vl, right = vr)))}
-\arguments{\item{leftinput}{The left side input to the join.}
- \item{rightinput}{The right side input to the join.}
+\arguments{\item{left.input}{The left side input to the join.}
+ \item{right.input}{The right side input to the join.}
\item{input}{The only input in case of a self join. Mutually exclusive with the previous two.}
\item{output}{Where to write the output.}
\item{outer}{Whether to perform an outer join, one of the usual three types, left, right or full.}
@@ -26,7 +26,7 @@ will become the join key.}
\item{reduce}{Function to be applied, key by key, on the lists of values associated with that key, one from the left side as produced
by map.left and the other from the right side as produced by map.right. This is a more mapreduce-like API, very much like a reduce but with two lists
of values, all sharing the same key. Returns 0 or more keyval pairs like any reduce function.}
- \item{reduceall}{Function to be applied to each triple comprising a key, a value associated with that key from the left input and a
+ \item{reduce.all}{Function to be applied to each triple comprising a key, a value associated with that key from the left input and a
value associated with that key from the right input. This is more an SQL like interface, like an equijoin. Returns 0 or more keyval pairs
like any reduce function.}
}
@@ -44,5 +44,5 @@ like any reduce function.}
##---- Should be DIRECTLY executable !! ----
##-- ==> Define data, use random,
##-- or do help(data=index) for the standard data sets.
- from.dfs(equijoin(leftinput = to.dfs(lapply(1:10, function(i) keyval(i, i^2))), rightinput = to.dfs(lapply(1:10, function(i) keyval(i, i^3)))), todataframe=TRUE)
+ from.dfs(equijoin(left.input = to.dfs(lapply(1:10, function(i) keyval(i, i^2))), right.input = to.dfs(lapply(1:10, function(i) keyval(i, i^3)))), to.data.frame=TRUE)
}
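A further hypothetical example, a left outer join with the renamed arguments; keys present only on the left get an NA right value:

left  = to.dfs(lapply(1:5, function(i) keyval(i, i^2)))
right = to.dfs(lapply(3:7, function(i) keyval(i, i^3)))
from.dfs(equijoin(left.input = left, right.input = right, outer = "left"))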
20 rmr/pkg/man/fromdfstodfs.Rd
@@ -6,16 +6,16 @@
\description{Utility functions that read and write lists to HDFS}
\usage{
-to.dfs(object, file = dfs.tempfile(), textoutputformat = defaulttextoutputformat)
-from.dfs(file, textinputformat = defaulttextinputformat, todataframe = F)
+to.dfs(object, output = dfs.tempfile(), format = "native")
+from.dfs(input, format = "native", to.data.frame = FALSE)
}
\arguments{
- \item{object}{An list of key-value pairs as returned by keyvalue to write to HDFS; experimentally, a data frame}
- \item{file}{A file in HDFS to read from or write to; for from.dfs, the return value of \code{mapreduce}}
- \item{textoutputformat}{The output format of the file being written (from.dfs only)}
- \item{textinputformat}{The input format of the file being read (from.dfs only)}
- \item{todataframe}{Do what it takes to return a data frame (from.dfs only). }
+ \item{object}{A list of key-value pairs as returned by \code{keyval}, to write to HDFS; experimentally, a data frame}
+ \item{input}{A file in HDFS to read from (\code{from.dfs} only); can also be the return value of a \code{mapreduce} or \code{to.dfs} call}
+ \item{output}{A file in HDFS to write to (\code{to.dfs} only)}
+ \item{format}{For \code{from.dfs} either a string naming a format, the same as those allowed by \code{make.input.format}, or the value returned by \code{\link{make.input.format}}. The same is true for \code{to.dfs}, but refer to \code{\link{make.output.format}} instead.}
+ \item{to.data.frame}{Do what it takes to return a data frame.}
}
@@ -24,8 +24,8 @@ from.dfs(file, textinputformat = defaulttextinputformat, todataframe = F)
different by two or three orders of magnitude, roughly speaking, so the conversion will make sense only in specific situations. These
functions do not perform any size control, so the responsibility is on the user.}
-\value{\code{from.dfs} returns the object whose representation is contained in \code{file}. \code{from.dfs} returns the file it wrote a
-representation of the object provided as argument to or, when \code{file} is missing, an object that can be passed as input to a \code{mapreduce}
+\value{\code{from.dfs} returns the object whose representation is contained in \code{input}. \code{to.dfs} returns the file to which it wrote a
+representation of the object provided as its argument or, when \code{output} is missing, an object that can be passed as input to a \code{mapreduce}
or \code{from.dfs} call. }
-\examples{##See \code{mapreduce} for examples}
+\examples{##See \code{\link{mapreduce}} for examples}
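For illustration, a small hypothetical round trip through the dfs using the default "native" format (sizes must fit in memory, as noted above):

kvs = lapply(1:3, function(i) keyval(i, rnorm(2)))
f = to.dfs(kvs)                       # returns the destination, here a temporary dfs file
from.dfs(f)                           # the same list of key-value pairs
from.dfs(f, to.data.frame = TRUE)     # flattened into a data frame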
6 rmr/pkg/man/keyval.Rd
@@ -19,10 +19,8 @@
}
\details{The keyval helper function should be used to create return values for the map and reduce functions passed as parameters to
-mapReduce, which can also return a list of key-value pairs. Lists of keyvaly pairs are also appropriate arguments for the to.dfs
-function and returned by from.dfs. When provided the first two arguments, the first is the key and the second the value. When the second
-argument is missing, the first must be a list, of which one element will become the key and the rest the value. Which element becomes the
-key is determined by the third, optional argument. keys and values extract a list of keys or values resp. from a list of key value pairs}
+\code{mapreduce}, which can also return NULL or a list of key-value pairs. Lists of keyvalue pairs are also appropriate arguments for the \code{to.dfs}
+function and are returned by \code{from.dfs}. \code{keys} and \code{values} extract a list of keys or values resp. from a list of key value pairs}
\examples{
keyval(1,2)
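keys and values in action (a minimal sketch of the extraction helpers described above):

kv = keyval(1, 2)
kv$key; kv$val                            # 1 and 2
kvs = lapply(1:3, function(i) keyval(i, i^2))
keys(kvs)                                 # the keys 1, 2, 3
values(kvs)                               # the values 1, 4, 9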
47 rmr/pkg/man/make.io.format.Rd
@@ -0,0 +1,47 @@
+\name{make.input.format}
+\alias{make.input.format}
+\alias{make.output.format}
+
+\title{
+ Create combinations of settings for flexible IO
+}
+\description{
+Create combinations of IO settings either from named formats or from a combination of a Java class, a mode and an R function
+}
+\usage{
+make.input.format(format = native.input.format, mode = c("binary", "text"), streaming.format = NULL, ...)
+make.output.format(format = native.output.format, mode = c("binary", "text"), streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat", ...)}
+
+\arguments{
+ \item{format}{
+ Either a string describing a predefined combination of IO settings (possibilities include: "text", "json", "csv", "native", "native.text",
+ "sequence.typedbytes", "raw.typedbytes") or a function. The exact signature and return value for this function depend on the IO direction and mode.
+\preformatted{
+           Arguments:               Return value:
+           input    output          input    output
+text       line     k, v            kv       line
+binary     con      k, v, con       kv       TRUE}
+
+Where \code{k, v} are R objects playing the role of key and value respectively, \code{con} is a connection, \code{kv} is a key-value pair and \code{line} is a string.
+}
+ \item{mode}{
+Mode can be either "text" or "binary", which tells R what type of connection to use when opening the IO connections.
+}
+ \item{streaming.format}{
+Class to pass to hadoop streaming as inputformat or outputformat option. This class is the first in the input chain to perform its duties on the input side and the last on the output side. Right now this option is not honored in local mode.
+}
+\item{\dots}{
+Additional arguments to the format function, for instance for the csv format they detail the specifics of the csv dialect to use, see \code{\link{read.table}} and \code{\link{write.table}} for details}
+}
+\details{
+The goal of these functions is to encapsulate some of the complexity of the IO settings, providing meaningful defaults and predefined combinations. The input processing is the result of the composition of a Java class and an R function, and the same is true on the output side but in reverse order. If you don't want to deal with the full complexity of defining custom IO formats, there are pre-packaged combinations. "text" is free text, useful mostly on the input side for NLP-type applications; "json" is one or two tab-separated JSON objects per record; "csv" is the csv format, configurable through additional arguments; "native.text" uses the internal R serialization in text mode, was the default in previous releases and should be used only for backward compatibility; "native" uses the internal R serialization, offers the highest level of compatibility with R data types and is the default; "sequence.typedbytes" is a sequence file (in the Hadoop sense) where key and value are of type typedbytes, a simple serialization format used in connection with streaming for compatibility with other Hadoop subsystems. Typedbytes is documented here \url{https://hadoop.apache.org/mapreduce/docs/current/api/org/apache/hadoop/typedbytes/package-summary.html}
+}
+\value{
+Returns a list of IO specifications, to be passed as \code{input.format} and \code{output.format} to \code{\link{mapreduce}}, and as \code{format} to \code{\link{from.dfs}} and \code{\link{to.dfs}}; \code{from.dfs} accepts an input format and \code{to.dfs} accepts an output format.
+}
+\examples{
+
+}
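A minimal sketch of the predefined formats in use, assuming rmr 1.2 is loaded and a backend is configured; the paths, the comma separator and the identity map are illustrative assumptions.

library(rmr)

csv.input     = make.input.format("csv", sep = ",")  # extra arguments describe the csv dialect
native.output = make.output.format("native")         # the default, full fidelity for R objects

mapreduce(input = "/tmp/frequencies.csv",             # hypothetical HDFS path
          output = "/tmp/frequencies.native",         # hypothetical HDFS path
          input.format = csv.input,
          output.format = native.output,
          map = function(k, v) keyval(k, v))          # identity map, only converts the format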
View
116 rmr/pkg/man/mapreduce.Rd
@@ -11,90 +11,58 @@
map = to.map(identity),
reduce = NULL,
combine = NULL,
- reduceondataframe = FALSE,
- inputformat = NULL,
- outputformat = NULL,
- textinputformat = defaulttextinputformat,
- textoutputformat = defaulttextoutputformat,
- verbose = FALSE) }
+ reduce.on.data.frame = FALSE,
+ input.format = "native",
+ output.format = "native",
+ backend.parameters = list(),
+ verbose = TRUE) }
\arguments{
\item{input}{Paths to the input folder(s) (on HDFS) or vector thereof
- or or the return value of another mapreduce or to.dfs call}
- \item{output}{A path to the destination folder (on HDFS); if
- missing, use the return value as output}
- \item{map}{An optional R function(k,v), returning the return value
- of keyval or list thereof, that specifies the map operation to
- execute as part of a map reduce job}
- \item{reduce}{An optional R function(k,vv), returning the return
- value of keyval or list thereof, that specifies the reduce
- operation to execute as part of a map reduce job}
- \item{combine}{turn on the combiner; if TRUE the reduce function is
- used, or specify your own}
- \item{reduceondataframe}{flatten the list of values to a data frame
- in the reduce call}
- \item{inputformat}{Can be the fully qualified Java class, in which
- case the JAR file must be passed via \code{jarfiles}. Defaults to
- \code{TextInputFormat}. Another useful class may be
- \code{SequenceFileAsTextInputFormat}.}
- \item{outputformat}{Can be the fully qualified Java class, in which
- case the JAR file must be passed via \code{jarfiles}. Defaults to
- \code{TextOutputFormat}}
- \item{textinputformat}{a function generating a key-value pair from a
- line of text according to some format convention}
- \item{textoutputformat}{a function generating a line of text from two
- arguments, a key and a value, according to some format convention}
+   or the return value of another \code{mapreduce} or a \code{\link{to.dfs}} call}
+\item{output}{A path to the destination folder (on HDFS); if missing, use the return value of \code{mapreduce} itself as output}
+\item{map}{An optional R function of two arguments, a key and a value, returning either NULL or the return value of \code{\link{keyval}} or a list thereof, that specifies the map operation to execute as part of a mapreduce job}
+\item{reduce}{An optional R function of two arguments, a key and a list of all the values associated with that key, returning either NULL or the return value of \code{\link{keyval}} or a list thereof, that specifies the reduce operation to
+ execute as part of a map reduce job}
+\item{combine}{A function with the same signature as the reduce function, or TRUE, in which case the reduce function itself is used as the combiner}
+\item{reduce.on.data.frame}{flatten the list of values to a data frame in the reduce call}
+\item{input.format}{Input specification, see \code{\link{make.input.format}}}
+\item{output.format}{Output specification, see \code{\link{make.output.format}}}
+\item{backend.parameters}{Specify additional, backend-specific options, as in \code{backend.parameters = list(hadoop
+  = list(D = "mapred.reduce.tasks=1"), local = list())}. It is recommended not to use this argument to change the semantics of mapreduce (the output
+  should be independent of it). Each backend sees only the nested list named after the backend itself. For the hadoop backend, each element of the list
+  generates an additional hadoop streaming command line argument of the form "-name value"; if the value is TRUE only "-name" is generated, and if it is
+  FALSE the element is skipped. One possible use is to set the number of mappers and reducers on a per-job basis (see the sketch after this argument
+  list). For the local backend, the list is ignored.}
\item{verbose}{Run hadoop in verbose mode}}
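A minimal sketch of backend.parameters in use, capping the reduce tasks of a single job with the property quoted in the argument description above; the paths and the line-counting job are illustrative assumptions.

mapreduce(input = "/tmp/books",                         # hypothetical HDFS path
          output = "/tmp/linecount",                    # hypothetical HDFS path
          input.format = make.input.format("text"),
          map = function(k, v) keyval("lines", 1),
          reduce = function(k, vv) keyval(k, sum(unlist(vv))),
          backend.parameters = list(
            hadoop = list(D = "mapred.reduce.tasks=1"),  # emitted as "-D mapred.reduce.tasks=1"
            local = list()))                             # ignored by the local backend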
-\value{The value of \code{output}, or, when missing, an object that
- can be used as input to \code{\link{from.dfs}} or \code{mapreduce}}
+ \value{The value of \code{output}, or, when missing, an object that can be used as input to \code{\link{from.dfs}} or \code{mapreduce}, a stub representing the results of the job}
-\details{Defines and executes a map reduce job. Jobs can be chained
- together by simply providing the return value of one as input to
- the other. The map and reduce functions will run in an environment
- that is an approximation (explained below) of the environment of this call, even if
- the actual execution will happen in a different interpreter on a
- different machine. This is work in progress as we aim to provide
- exactly the same environement, but the current implementation seems
- to cover most use cases. Specifically, at \code{map} or
- \code{reduce} execution time, we load a copy of the environment
- inside the \code{mapreduce} call, as if \code{map} and
- \code{reduce} where called from there and its parent environment,
- that is the one in which \code{mapreduce} call is executed. We do not
- follow the environment chain further up or load the libraries that
- are loaded at the time of the call to \code{mapreduce}, but this
- can be improved upon in future versions. Changes
- to the above environemnts performed inside the map and reduce
- functions won't have any effects on the original environments, in a
- departure from established but rarely used R semantics. This is
- unlikely to change in the future because of the challenges inherent
- in adopting reference semantics in a parallel environment.}
+ \details{Defines and executes a mapreduce job. Jobs can be chained together by simply providing the return value of one as input to the
+ other. The map and reduce functions will run in an environment that is an approximation of the environment of this
+ call, even if the actual execution happens in a different interpreter on a different machine. Changes to the outer
+  environments performed inside the map and reduce functions with the \code{<<-} operator will only affect a per-process copy of the
+ environment, not the original one, in a departure from established but rarely used R semantics. This is unlikely to change in the future
+ because of the challenges inherent in adopting reference semantics in a parallel environment. See also the Tutorial
+ \url{https://github.com/RevolutionAnalytics/RHadoop/wiki/Tutorial}}
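A minimal sketch of job chaining as described above: the stub returned by one mapreduce call is fed directly to the next; the toy data and the modulo-10 grouping are illustrative assumptions.

library(rmr)

small.ints = to.dfs(1:100)

# first job: key each square by its residue class mod 10
squares = mapreduce(input = small.ints,
                    map = function(k, v) keyval(v %% 10, v^2))

# second job: consume the first job's stub and sum within each class
sums = mapreduce(input = squares,
                 reduce = function(k, vv) keyval(k, sum(unlist(vv))))

from.dfs(sums)  # a list of key-value pairs, one per residue class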
-\seealso{\code{\link{to.map}} and \code{\link{to.reduce}} can be used to
-convert other functions into suitable arguments for the map and reduce
-arguments; \code{\link{rawtextinputformat}} shows an alternative text
-format specifications. See the inst and tests directories in the
-source for more examples}
+\seealso{\code{\link{to.map}} and \code{\link{to.reduce}} can be used to convert other functions into suitable arguments for the map and
+reduce arguments; see the inst and tests directories in the source package for more examples}
-\examples{
-## Example 1: Word Count
-## classic wordcount
-## input can be any text
+\examples{
+## Example 1: Word Count
+## classic wordcount
+## input can be any text
wordcount = function (input, output, pattern = " ") {
- mapreduce(input = input ,
- output = output,
- textinputformat = rawtextinputformat,
- map = function(k,v) {
- lapply(
- strsplit(
- x = v,
- split = pattern)[[1]],
- function(w) keyval(w,1))},
- reduce = function(k,vv) {
- keyval(k, sum(unlist(vv)))}
- )
-}
+ mapreduce(input = input ,
+ output = output,
+ input.format = make.input.format("text"),
+ map = function(k,v) {
+ lapply(
+ strsplit(
+ x = v,
+ split = pattern)[[1]],
+ function(w) keyval(w,1))},
+ reduce = function(k,vv) {
+ keyval(k, sum(unlist(vv)))})}
## Example 2: Logistic Regression
## see spark implementation http://www.spark-project.org/examples.html
View
48 rmr/pkg/man/rawtextinputformat.Rd
@@ -1,48 +0,0 @@
-\name{rawtextinputformat}
-\alias{rawtextinputformat}
-\alias{rawtextoutputformat}
-\alias{csvtextinputformat}
-\alias{csvtextoutputformat}
-\alias{jsontextinputformat}
-\alias{jsontextoutputformat}
-\alias{nativetextinputformat}
-\alias{nativetextoutputformat}
-
-\title{Alternative formats for text files}
-
-\description{These functions can only be used as arguments to \code{mapreduce}, \code{from.dfs} and \code{to.dfs} when reading or writing
-text files}
-
-\usage{
- rawtextinputformat(line)
- rawtextoutputformat(k, v)
- csvtextinputformat(key = 1, ...)
- csvtextoutputformat(...)
- jsontextinputformat(line)
- jsontextoutputformat(k, v)
- nativetextinputformat(line)
- nativetextoutputformat(k,v)
-}
-
-\arguments{
- \item{line}{The line of text to parse}
- \item{k}{A key in the mapreduce sense}
- \item{v}{A value in the mapreduce sense}
- \item{key}{Which element to use as key}
- \item{...}{Options to control the csv format details}
-}
-
-\details{Any function that reads a line of text and outputs a keyval pair is a suitable input format function; any function that accepts a
-key and a value and returns a line of text is a suitable output format function. \code{rawtextinputformat} and \code{rawtextoutputformat}
-are examples of such functions for the raw text format (a text file without additional structure besides lines). \code{csvtextinputformat}
-and \code{csvtextoutputformat} instead are higher order functions that generate input and output format functions resp. For the former, the
-options are the ones accepted by \code{read.table} with \code{header} defaulting to FALSE, since we are parsing a single line of a csv file
-here, not a complete one. For the latter, the options are the ones accepted by \code{write.table} with \code{row.names = FALSE, col.names =
-FALSE} by default. Again remember that this is a line by line writer, not a complete file writer like
-\code{write.table}. \code{jsontextinputformat} and \code{jsontextoutputformat} use two JSON objects per line, separated by a tab. Support
-for R types can be tested calling \code{fromJSON(toJSON(x))} for some representative \code{x}. \code{nativetextinputformat} and
-\code{nativetextoutputformat} use an ASCII variation of the native R serde and should support any R object. It is also the
-default. Additional flexibility is offered by the \code{inputformat} and \code{outputformat} arguments to \code{mapreduce}}
-
-\examples{
-\dontrun{mapreduce(input = "/somepath/somefile.txt", textinputformat = rawtextinputformat)}}
View
30 rmr/pkg/man/rmr.backend.Rd
@@ -1,30 +0,0 @@
-\name{rmr.backend}
-\alias{rmr.backend}
-
-\title{Sets backend for mapreduce}
-\description{
-Sets the backend used for mapreduce}
-\usage{
-rmr.backend(backend = c(NULL, "hadoop", "local"))
-}
-
-\arguments{
- \item{backend}{
-One of hadoop or local, the latter being implemented entirely in the current R interpreter.
-}
-}
-
-\details{ Mapreduce has come to mean massive, fault tolerant distributed computing because of its use by Google and Hadoop, but it is also
-an abstract model of computation amenable to different implementations. Here we provide access to Hadoop through the hadoop backend and
-provide an all-R, single interpreter implementation that's good for experimentation and debugging, in particular to debug mapper and
-reducers. Can be specified as an argument to mapreduce or globally with this call. }
-
-\value{
-The current backend as a string.
-}
-
-\examples{
-rmr.backend()
-rmr.backend("local")
-rmr.backend("hadoop")
-}
View
35 rmr/pkg/man/rmr.options.setget.Rd
@@ -0,0 +1,35 @@
+\name{rmr.options.get}
+\alias{rmr.options.get}
+\alias{rmr.options.set}
+\title{
+Functions to set and get package options}
+\description{
+set and get package options}
+\usage{
+rmr.options.get(...)
+rmr.options.set(backend = c("hadoop", "local"), profile.nodes = NULL)
+%-, depend.check = NULL, managed.dir = NULL)
+}
+\arguments{
+  \item{...}{Character names of the options whose values should be returned}
+ \item{backend}{One of "hadoop" or "local", the latter being implemented entirely in the current R interpreter, sequentially. Very useful for learning and debugging.}
+ \item{profile.nodes}{Collect profiling information when running additional R interpreters (besides the current one) on the cluster. }
+ %\item{depend.check}{Activate makefile-like dependency checking (under construction)}
+ % \item{managed.dir}{Where to put intermediate result when makefile-like features are activated}
+}
+\details{
+  Mapreduce has come to mean massive, fault-tolerant distributed computing because of its use by Google and Hadoop, but it is also
+an abstract model of computation amenable to different implementations. Here we provide access to Hadoop through the hadoop backend, as well as
+an all-R, single-interpreter implementation (local) that is good for experimentation and debugging, in particular of mappers and
+reducers. Profiling data is collected in the following file: \code{file.path("/tmp/Rprof", Sys.getenv('mapred_job_id'), Sys.getenv('mapreduce_tip_id'))}.
+%Describe dependency checking here
+}
+\value{A named list with the options and their values, or just a value if only one is requested.}
+
+
+\examples{
+old.backend = rmr.options.get("backend")
+rmr.options.set(backend = "hadoop")
+rmr.options.set(backend = old.backend)
+}
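A minimal sketch building on the example above: debug a toy job with the local backend, then restore the previous backend with node profiling enabled; the job itself is an illustrative assumption.

library(rmr)

old.backend = rmr.options.get("backend")

rmr.options.set(backend = "local")              # run in the current interpreter for debugging
from.dfs(mapreduce(input = to.dfs(1:10),
                   map = function(k, v) keyval(v, v + 1)))

rmr.options.set(backend = old.backend, profile.nodes = TRUE)
# node-side profiling data then accumulates under
# file.path("/tmp/Rprof", Sys.getenv('mapred_job_id'), Sys.getenv('mapreduce_tip_id'))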
View
30 rmr/pkg/man/rmr.profilenodes.Rd
@@ -1,30 +0,0 @@
-\name{rmr.profilenodes}
-\alias{rmr.profilenodes}
-\title{
-Turn on profiling on the nodes.
-}
-\description{
-Collect profiling information when running additional R interpreters (besides the current one) on the cluster. Show or change setting.
-}
-
-\usage{
-rmr.profilenodes(on = NULL)