merge release branch rmr2

2 parents 611f283 + 9de68b9 commit 74e578d06f98e2f39cb7054ef235dac43f2c32e3 @piccolbo committed Sep 28, 2012
Showing with 4,461 additions and 4,833 deletions.
  1. +5 −5 .gitignore
  2. +11 −0 quickcheck/DESCRIPTION
  3. +1 −0 quickcheck/NAMESPACE
  4. +100 −0 quickcheck/R/quickcheck.R
  5. +120 −0 quickcheck/man/generators.Rd
  6. +32 −0 quickcheck/man/quickcheck-package.Rd
  7. +85 −0 quickcheck/man/unit.test.Rd
  8. +61 −0 quickcheck/tests/quickcheck.R
  9. +0 −9 rmr/pkg/NAMESPACE
  10. +0 −154 rmr/pkg/R/IO.R
  11. +0 −904 rmr/pkg/R/mapreduce.R
  12. +0 −16 rmr/pkg/docs/compatibility.Rmd
  13. +0 −15 rmr/pkg/docs/compatibility.md
  14. BIN rmr/pkg/docs/compatibility.pdf
  15. +0 −57 rmr/pkg/docs/fast-k-means.Rmd
  16. +0 −93 rmr/pkg/docs/fast-k-means.md
  17. BIN rmr/pkg/docs/fast-k-means.pdf
  18. BIN rmr/pkg/docs/getting-data-in-and-out.pdf
  19. +0 −111 rmr/pkg/docs/introduction-to-vectorized-API.Rmd
  20. +0 −387 rmr/pkg/docs/introduction-to-vectorized-API.html
  21. +0 −230 rmr/pkg/docs/introduction-to-vectorized-API.md
  22. BIN rmr/pkg/docs/introduction-to-vectorized-API.pdf
  23. +0 −4 rmr/pkg/docs/new-in-this-release.Rmd
  24. +0 −156 rmr/pkg/docs/new-in-this-release.html
  25. +0 −4 rmr/pkg/docs/new-in-this-release.md
  26. BIN rmr/pkg/docs/new-in-this-release.pdf
  27. +0 −307 rmr/pkg/docs/tutorial.Rmd
  28. +0 −550 rmr/pkg/docs/tutorial.html
  29. +0 −419 rmr/pkg/docs/tutorial.md
  30. BIN rmr/pkg/docs/tutorial.pdf
  31. +0 −21 rmr/pkg/man/dfs.empty.Rd
  32. +0 −48 rmr/pkg/man/equijoin.Rd
  33. +0 −31 rmr/pkg/man/fromdfstodfs.Rd
  34. +0 −34 rmr/pkg/man/keyval.Rd
  35. +0 −32 rmr/pkg/man/make.io.format.Rd
  36. +0 −122 rmr/pkg/man/mapreduce.Rd
  37. +0 −21 rmr/pkg/man/scatter.Rd
  38. +0 −154 rmr/pkg/tests/invariants.R
  39. +0 −138 rmr/pkg/tests/kmeans.R
  40. +0 −75 rmr/pkg/tests/linear-least-squares.R
  41. +0 −109 rmr/pkg/tests/quickcheck.R
  42. +0 −230 rmr/pkg/tests/vectorized-API.R
  43. +0 −65 rmr/pkg/tests/weighted-linear-least-squares.R
  44. +1 −0 rmr2/.gitignore
  45. +25 −0 rmr2/docs/IO-speed-tests.Rmd
  46. +25 −61 rmr/pkg/docs/fast-k-means.html → rmr2/docs/IO-speed-tests.html
  47. +25 −0 rmr2/docs/IO-speed-tests.md
  48. 0 {rmr/pkg → rmr2}/docs/Makefile
  49. +12 −0 rmr2/docs/compatibility.Rmd
  50. +23 −4 {rmr/pkg → rmr2}/docs/compatibility.html
  51. +12 −0 rmr2/docs/compatibility.md
  52. +37 −12 {rmr/pkg → rmr2}/docs/getting-data-in-and-out.Rmd
  53. +93 −35 {rmr/pkg → rmr2}/docs/getting-data-in-and-out.html
  54. +105 −33 {rmr/pkg → rmr2}/docs/getting-data-in-and-out.md
  55. 0 {rmr/pkg → rmr2}/docs/kmeans.gif
  56. +24 −0 rmr2/docs/new-in-this-release.Rmd
  57. +180 −0 rmr2/docs/new-in-this-release.html
  58. +24 −0 rmr2/docs/new-in-this-release.md
  59. +192 −0 rmr2/docs/tutorial.Rmd
  60. +443 −0 rmr2/docs/tutorial.html
  61. +331 −0 rmr2/docs/tutorial.md
  62. +6 −5 {rmr → rmr2}/pkg/DESCRIPTION
  63. +11 −0 rmr2/pkg/NAMESPACE
  64. +223 −0 rmr2/pkg/R/IO.R
  65. +91 −0 rmr2/pkg/R/basic.R
  66. +101 −0 rmr2/pkg/R/extras.R
  67. +125 −0 rmr2/pkg/R/keyval.R
  68. +52 −0 rmr2/pkg/R/local.R
  69. +339 −0 rmr2/pkg/R/mapreduce.R
  70. +18 −0 rmr2/pkg/R/quickcheck-rmr.R
  71. +330 −0 rmr2/pkg/R/streaming.R
  72. +36 −0 rmr2/pkg/examples/counts.R
  73. +2 −2 {rmr → rmr2}/pkg/examples/large-kmeans-test.R
  74. +23 −15 {rmr → rmr2}/pkg/examples/ngram.R
  75. 0 {rmr → rmr2}/pkg/examples/stats.R
  76. +13 −0 rmr2/pkg/man/bigdataobject.Rd
  77. +25 −0 rmr2/pkg/man/dfs.empty.Rd
  78. +45 −0 rmr2/pkg/man/equijoin.Rd
  79. +29 −0 rmr2/pkg/man/fromdfstodfs.Rd
  80. +36 −0 rmr2/pkg/man/keyval.Rd
  81. +34 −0 rmr2/pkg/man/make.io.format.Rd
  82. +44 −0 rmr2/pkg/man/mapreduce.Rd
  83. +1 −1 {rmr → rmr2}/pkg/man/rmr-package.Rd
  84. +13 −15 rmr/pkg/man/rmr.options.setget.Rd → rmr2/pkg/man/rmr.options.Rd
  85. +18 −0 rmr2/pkg/man/rmr.sample.Rd
  86. +28 −0 rmr2/pkg/man/rmr.str.Rd
  87. +25 −0 rmr2/pkg/man/scatter.Rd
  88. +3 −7 {rmr → rmr2}/pkg/man/tomaptoreduce.Rd
  89. 0 {rmr → rmr2}/pkg/src/Makevars
  90. 0 {rmr → rmr2}/pkg/src/dataframe-to-list.cpp
  91. 0 {rmr → rmr2}/pkg/src/dataframe-to-list.h
  92. +148 −79 {rmr → rmr2}/pkg/src/typed-bytes.cpp
  93. +1 −1 {rmr → rmr2}/pkg/src/typed-bytes.h
  94. +29 −0 rmr2/pkg/tests/IO.R
  95. +10 −12 {rmr → rmr2}/pkg/tests/basic-examples.R
  96. +72 −0 rmr2/pkg/tests/basic.R
  97. +102 −0 rmr2/pkg/tests/benchmarks.R
  98. +49 −18 {rmr → rmr2}/pkg/tests/getting-data-in-and-out.R
  99. +44 −0 rmr2/pkg/tests/keyval.R
  100. +73 −0 rmr2/pkg/tests/kmeans.R
  101. +45 −0 rmr2/pkg/tests/linear-least-squares.R
  102. +29 −10 {rmr → rmr2}/pkg/tests/logistic-regression.R
  103. +191 −0 rmr2/pkg/tests/mapreduce.R
  104. +1 −1 {rmr → rmr2}/pkg/tests/naive-bayes.R
  105. +29 −21 {rmr → rmr2}/pkg/tests/wordcount.R
  106. 0 {rmr → rmr2}/pkg/tools/whirr/README
  107. 0 {rmr → rmr2}/pkg/tools/whirr/hadoop-ec2-centos.properties
  108. 0 {rmr → rmr2}/pkg/tools/whirr/hadoop-ec2.properties
  109. 0 {rmr → rmr2}/pkg/tools/whirr/lzo.sh
  110. 0 {rmr → rmr2}/pkg/tools/whirr/rmr-dev.sh
  111. 0 {rmr → rmr2}/pkg/tools/whirr/rmr-master-centos.sh
  112. 0 {rmr → rmr2}/pkg/tools/whirr/rmr-master.sh
10 .gitignore
@@ -3,25 +3,25 @@
#for R users
.RData
.Rhistory
-pkg.Rcheck
+*.Rcheck
+Rprof.out
#for RStudio users
.Rproj.user
*.Rproj
#for rmr users
-rhstr.*
-rmr-*-env
+rmr-*-env*
+rmr-streaming-*
#for emacs users
*~
#for whirr users
whirr.log*
#for emerge users
*.orig
-
rhbase/pkg/config.log
-
#Compilation artifacts
src-i386
src-x86_64
*.o
*.so
*.rds
+Rprof.out
11 quickcheck/DESCRIPTION
@@ -0,0 +1,11 @@
+Package: quickcheck
+Type: Package
+Title: Support writing randomized unit tests
+Version: 1.0
+Date: 2012-08-15
+Author: Revolution Analytics
+Depends: digest
+Collate: quickcheck.R
+Maintainer: Revolution Analytics <rhadoop@revolutionanalytics.com>
+Description: Provides a unit.test function that generates randomized tests and many simple data generators
+License: Apache License (== 2.0)
1 quickcheck/NAMESPACE
@@ -0,0 +1 @@
+exportPattern("^[[:alpha:]]+")
100 quickcheck/R/quickcheck.R
@@ -0,0 +1,100 @@
+# Copyright 2011 Revolution Analytics
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+##main function
+unit.test = function(predicate, generators = list(), sample.size = 10, precondition = function(...) TRUE, stop = TRUE) {
+ set.seed(0)
+ options(warning.length = 8125) #as big as allowed
+ results = sapply(1:sample.size, function(i) {
+ args = lapply(generators, function(a) a())
+ if(do.call(precondition, args) && !do.call(function(...){tryCatch(predicate(...), error = function(e){traceback(); print(e); FALSE})}, args)){
+ print(paste("FAIL: predicate:",
+ paste(deparse(predicate), collapse = " ")))
+ list(predicate = predicate, args = args)
+ }}, simplify = TRUE)
+ if(is.null(unlist(results)))
+ print(paste ("Pass ", paste(deparse(predicate), "\n", collapse = " ")))
+ else {if (stop) stop(results) else results}}
+
+## for short
+catch.out = function(...) capture.output(invisible(...))
+## test data generator generators (i.e. meta-generators)
+
+## basic types
+tdgg.logical = function(p.true = .5, lambda = 8) function() rbinom(1 + rpois(1, lambda),1,p.true) == 1
+
+tdgg.integer =
+ function(elem.lambda = 100, len.lambda = 8)
+ function() as.integer(rpois(1 + rpois(1, len.lambda), elem.lambda)) #why poisson? Why not? Why 100?
+
+tdgg.double = function(min = -1, max = 1, lambda = 8) function() runif(1 + rpois(1, lambda), min, max)
+
+##tdgg.complex NAY
+
+library(digest)
+tdgg.character =
+ function(str.lambda = 8, len.lambda = 8)
+ function()
+ sapply(runif(1 + rpois(1, len.lambda)), function(x) substr(digest(x), 1, rpois(1, str.lambda)))
+
+tdgg.raw = function(lambda = 8) {tdg = tdgg.character(1, lambda); function() unlist(sapply(tdg(), charToRaw))}
+
+tdgg.list = function(tdg = tdgg.any(list.tdg = tdg, len.lambda = lambda, max.level = max.level),
+ lambda = 10, max.level = 20)
+ function() {
+ if(sys.nframe() < max.level) replicate(rpois(1, lambda),tdg(), simplify = FALSE) else list()}
+
+tdgg.data.frame =
+ function(row.lambda = 20,
+ col.lambda = 5){
+ function() {
+ ncol = 1 + rpois(1, col.lambda)
+ nrow = 1 + rpois(1, row.lambda)
+ gens = list(tdgg.logical(),
+ tdgg.integer(),
+ tdgg.double(),
+ tdgg.character())
+ columns = lapply(sample(gens,ncol, replace=TRUE),
+ function(g) replicate(nrow, g()[1], simplify = TRUE))
+ names(columns) = paste("col", 1:ncol)
+ do.call(data.frame, columns)}}
+
+## special distributions
+tdgg.numeric.list = function(lambda = 100) function() lapply(1:rpois(1,lambda), function(i) runif(1))
+tdgg.fixed.list = function(...) function() lapply(list(...), function(tdg) tdg())
+tdgg.prototype = function(prototype, generator = tdgg.any()) function() rapply(prototype, function(x) generator(), how = "list")
+
+tdgg.prototype.list =
+ function(prototype, lambda) {
+ tdg = tdgg.prototype(prototype)
+ function() replicate(rpois(1, lambda), tdg(), simplify = FALSE)}
+
+tdgg.constant = function(const) function() const
+tdgg.select = function(l) function() sample(l,1)[[1]]
+tdgg.distribution = function(distribution, ...) function() distribution(1, ...)
+
+##combiners
+tdgg.mixture = function(...) function() sample(list(...),1)[[1]]()
+
+## combine everything
+tdgg.any =
+ function(p.true = .5, int.lambda = 100, min = -1, max = 1,
+ list.tdg = tdgg.any(), max.level = 20, len.lambda = 10)
+ tdgg.mixture(tdgg.logical(p.true, len.lambda),
+ tdgg.integer(int.lambda, len.lambda),
+ tdgg.double(min, max, len.lambda),
+ tdgg.character(len.lambda),
+ tdgg.raw(len.lambda),
+ tdgg.list(list.tdg, len.lambda, max.level))
+
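For orientation, the package's own tests (quickcheck/tests/quickcheck.R, further down in this commit) are the authoritative usage examples; the following is only a minimal sketch of the calling pattern, with made-up predicates:

```r
library(quickcheck)

# Each generator is an argument-less function; unit.test calls the generators,
# feeds the results to the predicate, and stops (by default) if any case fails.
unit.test(
  function(x, y) length(c(x, y)) == length(x) + length(y),
  generators = list(tdgg.character(), tdgg.character()),
  sample.size = 20)

# tdgg.fixed.list builds a composite generator, one sub-generator per list slot.
unit.test(
  function(l) length(l) == 2,
  generators = list(tdgg.fixed.list(tdgg.integer(), tdgg.logical())))
```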
120 quickcheck/man/generators.Rd
@@ -0,0 +1,120 @@
+\name{tdgg.any}
+\name{tdgg.character}
+\name{tdgg.constant}
+\name{tdgg.data.frame}
+\name{tdgg.distribution}
+\name{tdgg.double}
+\name{tdgg.fixed.list}
+\name{tdgg.integer}
+\name{tdgg.list}
+\name{tdgg.logical}
+\name{tdgg.mixture}
+\name{tdgg.numeric.list}
+\name{tdgg.prototype}
+\name{tdgg.prototype.list}
+\name{tdgg.raw}
+\name{tdgg.select}
+\name{tdgg.vector}
+\alias{tdgg.any}
+\alias{tdgg.character}
+\alias{tdgg.constant}
+\alias{tdgg.data.frame}
+\alias{tdgg.distribution}
+\alias{tdgg.double}
+\alias{tdgg.fixed.list}
+\alias{tdgg.integer}
+\alias{tdgg.list}
+\alias{tdgg.logical}
+\alias{tdgg.mixture}
+\alias{tdgg.numeric.list}
+\alias{tdgg.prototype}
+\alias{tdgg.prototype.list}
+\alias{tdgg.raw}
+\alias{tdgg.select}
+\alias{tdgg.vector}
+%- Also NEED an '\alias' for EACH other topic documented here.
+\title{
+Meta-generators for random data of all types
+}
+\description{
+These functions return random data generators, argument-less functions that generate all sorts of random data, to be used for the \code{generators} argument to the \code{\link{unit.test}} function.
+}
+\usage{
+catch.out(...)
+tdgg.any(p.true = 0.5, lambda.int = 100, min = -1, max = 1, len.char = 8, len.raw = 8, lambda.list = 10, list.tdg = tdgg.any(), lambda.vector = 10, max.level = 20,
+ vector.tdg = tdgg.double())
+tdgg.character(str.lambda = 8, len.lambda = 8)
+tdgg.constant(const)
+tdgg.data.frame(row.lambda = 20, col.lambda = 5)
+tdgg.distribution(distribution, ...)
+tdgg.double(min = -1, max = 1, lambda = 8)
+tdgg.fixed.list(...)
+tdgg.integer(elem.lambda = 100, len.lambda = 8)
+tdgg.list(tdg = tdgg.any(list.tdg = tdg, lambda.list = lambda, max.level = max.level), lambda = 10, max.level = 20)
+tdgg.logical(p.true = 0.5, lambda = 8)
+tdgg.mixture(...)
+tdgg.numeric.list(lambda = 100)
+tdgg.prototype(prototype)
+tdgg.prototype.list(prototype, lambda)
+tdgg.raw(lambda = 8)
+tdgg.select(l) }
+%- maybe also 'usage' for other objects documented here.
+\arguments{
+ \item{p.true}{
+
+}
+ \item{lambda.int}{
+
+}
+ \item{min}{
+
+}
+ \item{max}{
+
+}
+ \item{len.char}{
+
+}
+ \item{len.raw}{
+
+}
+ \item{lambda.list}{
+
+}
+ \item{list.tdg}{
+
+}
+ \item{lambda.vector}{
+
+}
+ \item{max.level}{
+
+}
+ \item{vector.tdg}{
+
+}
+}
+\details{
+
+}
+\value{
+}
+\references{
+}
+\author{
+
+}
+\note{
+
+}
+
+%% ~Make other sections like Warning with \section{Warning }{....} ~
+
+\seealso{
+
+}
+\examples{
+}
+
+\keyword{ }
+\keyword{ }
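The \examples{} section above is empty; purely as an illustration (not part of the package sources), a meta-generator can be sampled directly:

```r
library(quickcheck)

gen = tdgg.integer(elem.lambda = 50, len.lambda = 5)  # returns a generator
gen()              # each call produces a random integer vector
is.integer(gen())  # TRUE, as the package's own tests verify
```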
32 quickcheck/man/quickcheck-package.Rd
@@ -0,0 +1,32 @@
+\name{quickcheck-package}
+\alias{quickcheck-package}
+\alias{quickcheck}
+\docType{package}
+\title{
+Randomized unit testing
+}
+\description{
+Supports the generation of randomized unit tests by defining the \code{\link{unit.test}} function and a collection of meta-generators for data of all types.
+}
+\details{
+\tabular{ll}{
+Package: \tab quickcheck\cr
+Type: \tab Package\cr
+Version: \tab 1.0\cr
+Date: \tab 2012-08-15\cr
+License: \tab Apache License (== 2.0)\cr
+}
+
+}
+\author{
+Revolution Analytics <rhadoop@revolutionanalytics.com>
+}
+\references{
+}
+\keyword{testing }
+\seealso{
+
+}
+\examples{
+
+}
85 quickcheck/man/unit.test.Rd
@@ -0,0 +1,85 @@
+\name{unit.test}
+\alias{unit.test}
+%- Also NEED an '\alias' for EACH other topic documented here.
+\title{
+Generate unit tests}
+\description{
+Evaluates a predicate on arguments obtained by evaluating the generators; on failure it either stops or returns the failing test cases.
+}
+\usage{
+unit.test(predicate, generators = list(), sample.size = 10, precondition = function(...) TRUE, stop = TRUE)
+}
+%- maybe also 'usage' for other objects documented here.
+\arguments{
+ \item{predicate}{
+
+}
+ \item{generators}{
+
+}
+ \item{sample.size}{
+
+}
+ \item{precondition}{
+
+}
+}
+\details{
+
+}
+\value{
+%% ~Describe the value returned
+%% If it is a LIST, use
+%% \item{comp1 }{Description of 'comp1'}
+%% \item{comp2 }{Description of 'comp2'}
+%% ...
+}
+\references{
+%% ~put references to the literature/web site here ~
+}
+\author{
+
+}
+\note{
+
+}
+
+%% ~Make other sections like Warning with \section{Warning }{....} ~
+
+\seealso{
+
+}
+\examples{
+##---- Should be DIRECTLY executable !! ----
+##-- ==> Define data, use random,
+##-- or do help(data=index) for the standard data sets.
+
+## The function is currently defined as
+function (predicate, generators, sample.size = 10, precondition = function(...) T)
+{
+ set.seed(0)
+ options(warning.length = 8125)
+ results = sapply(1:sample.size, function(i) {
+ args = lapply(generators, function(a) a())
+ if (do.call(precondition, args) && !do.call(function(...) {
+ tryCatch(predicate(...), error = function(e) {
+ traceback()
+ print(e)
+ FALSE
+ })
+ }, args)) {
+ print(paste("FAIL: predicate:", paste(deparse(predicate),
+ collapse = " ")))
+ list(predicate = predicate, args = args)
+ }
+ }, simplify = TRUE)
+ if (length(results) == 0)
+ print(paste("Pass ", paste(deparse(predicate), "\n",
+ collapse = " ")))
+ else results
+ }
+}
+% Add one or more standard keywords, see file 'KEYWORDS' in the
+% R documentation directory.
+\keyword{ ~kwd1 }
+\keyword{ ~kwd2 }% __ONLY ONE__ keyword per line
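The precondition argument is documented above only through the function body; a hedged illustration of how it can filter generated cases (the predicate is made up):

```r
library(quickcheck)

# Cases for which the precondition returns FALSE are skipped rather than counted
# as failures; here it guards the predicate against zero-length input.
unit.test(
  function(x) max(x) >= min(x),
  generators   = list(tdgg.double()),
  precondition = function(x) length(x) > 0)
```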
61 quickcheck/tests/quickcheck.R
@@ -0,0 +1,61 @@
+# Copyright 2011 Revolution Analytics
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+library(quickcheck)
+## generator test thyself
+##tdgg.logical
+unit.test(function(p.true) {
+ sample = tdgg.logical(p.true,lambda=1000)()
+ binom.test(
+ sum(sample),
+ length(sample),
+ p.true,"two.sided")$p.value > 0.001},
+ generators = list(tdgg.distribution(runif, min = .1, max = .9)))
+##tdgg.integer
+unit.test(is.integer,
+ generators = list(tdgg.integer()))
+##tdgg.double
+unit.test(is.double,
+ generators = list(tdgg.double()))
+##tdgg.complex NAY
+##tdgg.character:
+unit.test(is.character,
+ generators = list(tdgg.character()))
+
+##tdgg.raw
+unit.test(is.raw,
+ generators = list(tdgg.raw()))
+
+#tdgconstant
+unit.test(function(x) tdgg.constant(x)() == x, generators = list(tdgg.distribution(runif)))
+#tdgselect
+unit.test(function(l) is.element(tdgg.select(l)(), l), generators = list(tdgg.numeric.list(10)))
+#tdgmixture
+unit.test(function(n) is.element(tdgg.mixture(tdgg.constant(n), tdgg.constant(2*n))(), list(n,2*n)),
+ generators = list(tdgg.distribution(runif)))
+#tdgdistribution
+unit.test(function(d) {
+ tdgd = tdgg.distribution(d)
+ ks.test(d(10000), sapply(1:10000, function(i) tdgd()))$p > 0.001},
+ generators = list(tdgg.select(list(runif, rnorm))))
+# tdgg.list
+# tdgg.data.frame
+# tdgg.numeric.list
+# tdgg.fixed.list
+# tdgg.prototype
+# tdgg.prototype.list
+# tdgg.constant
+# tdgg.select
+# tdgg.mixture
+# tdgg.any
9 rmr/pkg/NAMESPACE
@@ -1,9 +0,0 @@
-useDynLib(rmr)
-export(mapreduce)
-export(from.dfs, to.dfs)
-export(equijoin, scatter)
-export(dfs.empty)
-export(rmr.options.set, rmr.options.get)
-export(keyval, keys, values)
-export(make.input.format, make.output.format)
-export(to.map, to.reduce, to.reduce.all)
154 rmr/pkg/R/IO.R
@@ -1,154 +0,0 @@
-# Copyright 2011 Revolution Analytics
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-lapply.nrecs = function(..., nrecs) {
- out = lapply(...)
- if(nrecs == 1) out[[1]] else out}
-
-native.text.input.format = function(con, nrecs) {
- lines = readLines(con, nrecs)
- if (length(lines) == 0) NULL
- else {
- splits = strsplit(lines, "\t")
- deser.one = function(x) unserialize(charToRaw(gsub("\\\\n", "\n", x)))
- keyval(lapply.nrecs(splits, function(x) de(x[1]), nrecs = nrecs),
- lapply.nrecs(splits, function(x) de(x[2]), nrecs = nrecs),
- vectorized = nrecs > 1)}}
-
-native.text.output.format = function(k, v, con, vectorized) {
- ser = function(x) gsub("\n", "\\\\n", rawToChar(serialize(x, ascii=T, connection = NULL)))
- ser.pair = function(k,v) paste(ser(k), ser(v), sep = "\t")
- out =
- if(vectorized)
- mapply(ser.pair, k, v)
- else ser.pair(k,v)
- writeLines(out, con = con, sep = "\n")}
-
-json.input.format = function(con, nrecs) {
- lines = readLines(con, nrecs)
- if (length(lines) == 0) NULL
- else {
- splits = strsplit(lines, "\t")
- if(length(splits[[1]]) == 1) keyval(NULL, lapply.nrecs(splits, function(x) fromJSON(x[1], asText = TRUE), nrecs = nrecs))
- else keyval(lapply.nrecs(splits, function(x) fromJSON(x[1], asText = TRUE), nrecs = nrecs),
- lapply.nrecs(splits, function(x) fromJSON(x[2], asText = TRUE), nrecs = nrecs),
- vectorized = nrecs > 1)}}
-
-json.output.format = function(k, v, con, vectorized) {
- ser = function(k, v) paste(gsub("\n", "", toJSON(k, .escapeEscapes=TRUE, collapse = "")),
- gsub("\n", "", toJSON(v, .escapeEscapes=TRUE, collapse = "")),
- sep = "\t")
- out =
- if(vectorized)
- mapply(k,v, ser)
- else
- ser(k,v)
- writeLines(out, con = con, sep = "\n")}
-
-text.input.format = function(con, nrecs) {
- lines = readLines(con, nrecs)
- if (length(lines) == 0) NULL
- else keyval(NULL, lines, vectorized = nrecs > 1)}
-
-text.output.format = function(k, v, con, vectorized) {
- ser = function(k,v) paste(k, v, collapse = "", sep = "\t")
- out = if(vectorized)
- mapply(ser, k, v)
- else
- ser(k,v)
- writeLines(out, sep = "\n", con = con)}
-
-csv.input.format = function(...) function(con, nrecs) {
- df =
- tryCatch(
- read.table(file = con, nrows = nrecs, header = FALSE, ...),
- error = function(e) NULL)
- if(is.null(df) || dim(df)[[1]] == 0) NULL
- else keyval(NULL, df, vectorized = nrecs > 1)}
-
-csv.output.format = function(...) function(k, v, con, vectorized)
- # this is vectorized only, need to think what that means
- write.table(file = con,
- x = if(is.null(k)) v else cbind(k,v),
- ...,
- row.names = FALSE,
- col.names = FALSE)
-
-typed.bytes.reader = function(data, nobjs) {
- if(is.null(data)) NULL
- else
- .Call("typed_bytes_reader", data, nobjs, PACKAGE = "rmr")
-}
-typed.bytes.writer = function(objects) {
- .Call("typed_bytes_writer", objects, PACKAGE = "rmr")
-}
-
-typed.bytes.input.format = function() {
- obj.buffer = list()
- raw.buffer = raw()
- read.size = 1000
- function(con, nrecs) {
- nobjs = 2*nrecs
- while(length(obj.buffer) < nobjs) {
- raw.buffer <<- c(raw.buffer, readBin(con, raw(), read.size))
- if(length(raw.buffer) == 0) break;
- parsed = typed.bytes.reader(raw.buffer, as.integer(read.size/2))
- obj.buffer <<- c(obj.buffer, parsed$objects)
- if(parsed$length != 0) raw.buffer <<- raw.buffer[-(1:parsed$length)]
- read.size = as.integer(1.2 * read.size)}
- read.size = as.integer(read.size/1.2)
- actual.recs = min(nrecs, length(obj.buffer)/2)
- retval = if(length(obj.buffer) == 0) NULL
- else {
- if(nrecs == 1)
- keyval(obj.buffer[[1]], obj.buffer[[2]], vectorized = FALSE)
- else keyval(obj.buffer[2*(1:actual.recs) - 1],
- obj.buffer[2*(1:actual.recs)],
- vectorized = TRUE)}
- if(actual.recs > 0) obj.buffer <<- obj.buffer[-(1:(2*actual.recs))]
- retval}}
-
-typed.bytes.output.format = function(k, v, con, vectorized){
- writeBin(
- typed.bytes.writer(
- if(vectorized){
- k = to.list(k)
- v = to.list(v)
- tmp = list()
- tmp[2*(1:length(k)) - 1] = k
- tmp[2*(1:length(k))] = v
- tmp}
- else {
- list(k,v)}),
- con)}
-
-native.input.format = typed.bytes.input.format
-
-native.writer = function(value, con) {
- w = function(x, size = NA_integer_) writeBin(x, con, size = size, endian = "big")
- write.code = function(x) w(as.integer(x), size = 1)
- write.length = function(x) w(as.integer(x), size = 4)
- bytes = serialize(value, NULL)
- write.code(144)
- write.length(length(bytes))
- w(bytes)
- TRUE}
-
-native.output.format = function(k, v, con, vectorized){
- if(vectorized)
- typed.bytes.output.format(k, v, con, vectorized)
- else {
- native.writer(k, con)
- native.writer(v, con)}}
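For readers skimming this deletion: the rmr 1.x formats defined above were wrapped by make.input.format/make.output.format and passed to mapreduce and from.dfs, both defined in rmr/pkg/R/mapreduce.R (also removed below). A sketch with illustrative paths, not taken from the package documentation:

```r
library(rmr)

csv.in  = make.input.format("csv", sep = ",")   # wraps csv.input.format(...)
csv.out = make.output.format("csv", sep = ",")  # wraps csv.output.format(...)

out = mapreduce(
  input         = "/tmp/some-input.csv",        # hypothetical HDFS path
  input.format  = csv.in,
  output.format = csv.out,
  map           = function(k, v) keyval(NULL, v))

from.dfs(out, format = csv.in)
```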
904 rmr/pkg/R/mapreduce.R
@@ -1,904 +0,0 @@
-# Copyright 2011 Revolution Analytics
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-#data structures
-
-make.fast.list = function(l = list()) {
- l1 = l
- l2 = list(NULL)
- i = 1
- function(els = NULL){
- if(missing(els)) c(l1, l2[!sapply(l2, is.null)])
- else{
- if(i + length(els) - 1 > length(l2)) {
- l1 <<- c(l1, l2[!sapply(l2, is.null)])
- i <<- 1
- l2 <<- rep(list(NULL), length(l1) + length(els))}
- l2[i:(i + length(els) - 1)] <<- els
- i <<- i + length(els)}}}
-
-#list manip
-
-catply = function(x, fun) do.call(c, lapply(x, fun))
-
-#options
-
-rmr.options = new.env(parent=emptyenv())
-rmr.options$backend = "hadoop"
-rmr.options$vectorized.nrows = 1000
-rmr.options$profile.nodes = FALSE
-rmr.options$depend.check = FALSE
-#rmr.options$managed.dir = "/var/rmr/managed"
-
-rmr.options.get = function(...) {
- opts = as.list(rmr.options)
- if(missing(...))
- opts
- else {
- args = c(...)
- if (length(args) > 1)
- opts[args]
- else
- opts[[args]]}}
-
-rmr.options.set = function(backend = c("hadoop", "local"),
- profile.nodes = NULL,
- vectorized.nrows = NULL#,
- #depend.check = NULL,
- #managed.dir = NULL
- ) {
- this.call = match.call()
- backend = match.arg(backend) #this doesn't do anything, fix
- lapply(names(this.call)[-1],
- function(x)
- assign(x, eval(this.call[[x]]), envir = rmr.options))
- as.list(rmr.options)}
-
-# additional hadoop features, disabled for now
-counter = function(group="r-stream", family, value) {
- cat(sprintf("report:counter:%s, $s, %s",
- as.character(group),
- as.character(family),
- as.integer(value)),
- stderr())
-}
-
-status = function(what) {
- cat(sprintf("report:status:%s",
- as.character(what)),
- stderr())
-}
-
-## could think of this as a utils section
-## keyval manip
-keyval = function(k, v, vectorized = FALSE) {
- kv = list(key = k, val = v)
- attr(kv, 'rmr.keyval') = TRUE
- if(vectorized) {
- attr(kv, 'rmr.vectorized') = TRUE
- }
- kv}
-
-is.keyval = function(kv) !is.null(attr(kv, 'rmr.keyval', exact = TRUE))
-is.vectorized.keyval = function(kv) !is.null(attr(kv, 'rmr.vectorized', exact = TRUE))
-
-keyval.project = function(i) {
- list.singleton =
- function(kv){
- if(is.keyval(kv))
- list(kv)
- else
- kv}
- function(kvl) {
- catply(list.singleton(kvl),
- function(kv) {
- if(is.vectorized.keyval(kv))
- kv[[i]]
- else
- list(kv[[i]])})}}
-
-keys = keyval.project(1)
-values = keyval.project(2)
-
-keyval.to.list = function(kvl) {l = values(kvl); names(l) = keys(kvl); l}
-
-to.list = function(x) {
- if(is.data.frame(x)) {
- .Call('dataframe_to_list', x, nrow(x), ncol(x), replicate(nrow(x), as.list(1:ncol(x)), simplify=F))}
- else {
- if(is.matrix(x))
- lapply(1:nrow(x), function(i) as.list(x[i,]))
- else
- as.list(x)}}
-
-to.data.frame = function(x, col.names = names(x[[1]])) {
- if(is.data.frame(x)) x
- else {
- df = as.data.frame(do.call(rbind, as.list(x)))
- if(ncol(df) > 0)
- for(col in 1:ncol(df)){
- df[,col] = unlist(df[,col ])}
- if(!is.null(col.names)) names(df) = col.names
- df}}
-
-keyval.list.to.data.frame =
- function(x) {
- kk = to.data.frame(keys(x))
- vv = to.data.frame(values(x))
- if(!is.null(nrow(kk)) && nrow(kk) == nrow(vv))
- keyval(kk, vv, vectorized = TRUE)
- else {
- warning("dropping keys")
- vv}}
-
-## map and reduce function generation
-
-to.map = function(fun1, fun2 = identity) {
- if (missing(fun2)) {
- function(k, v) fun1(keyval(k, v))}
- else {
- function(k, v) keyval(fun1(k), fun2(v))}}
-
-to.reduce = to.map
-
-to.reduce.all = function(fun1, fun2 = identity) {
- if (missing(fun2)) {
- function(k, vv) lapply(vv, function(v) fun1(keyval(k, v)))}
- else {
- function(k, vv) lapply(vv, function(v) keyval(fun1(k), fun2(v)))}}
-
-## mapred combinators
-wrap.keyval = function(kv) {
- if(is.null(kv)) list()
- else if (is.keyval(kv)) list(kv)
- else kv}
-
-compose.mapred = function(mapred, map) function(k, v) {
- out = mapred(k, v)
- if (is.null(out)) NULL
- else if (is.keyval(out)) map(out$key, out$val)
- else do.call(c,
- lapply(out,
- function(x)
- wrap.keyval(map(x$key, x$val))))}
-
-union.mapred = function(mr1, mr2) function(k, v) {
- out = c(wrap.keyval(mr1(k, v)), wrap.keyval(mr2(k, v)))
- if (length(out) == 0) NULL else out}
-
-
-
-#some option formatting utils
-
-paste.options = function(optlist) {
- optlist = unlist(sapply(optlist, function(x) if (is.logical(x)) {if(x) "" else NULL} else x))
- if(is.null(optlist)) ""
- else paste(unlist(rbind(paste("-", names(optlist), sep = ""), optlist)), collapse = " ")}
-
-make.input.files = function(infiles) {
- if(length(infiles) == 0) return(" ")
- paste(sapply(infiles,
- function(r) {
- sprintf("-input %s ", r)}),
- collapse=" ")}
-
-# I/O
-
-make.record.reader = function(mode = NULL, format = NULL, con = NULL, nrecs = 1) {
- default = make.input.format()
- if(is.null(mode)) mode = default$mode
- if(is.null(format)) format = default$format
- if(mode == "text") {
- if(is.null(con)) con = file("stdin", "r")} #not stdin() which is parsed by the interpreter
- else {
- if(is.null(con)) con = pipe("cat", "rb")}
- function() format(con, nrecs)}
-
-make.record.writer = function(mode = NULL, format = NULL, con = NULL) {
- default = make.output.format()
- if(is.null(mode)) mode = default$mode
- if(is.null(format)) format = default$format
- if(mode == "text") {
- if(is.null(con)) con = stdout()}
- else {
- if(is.null(con)) con = pipe("cat", "wb")}
- function(k, v, vectorized) format(k, v, con, vectorized)}
-
-IO.formats = c("text", "json", "csv", "native", "native.text",
- "sequence.typedbytes")
-
-make.input.format = function(format = native.input.format(),
- mode = c("binary", "text"),
- streaming.format = NULL, ...) {
- mode = match.arg(mode)
- if(is.character(format)) {
- format = match.arg(format, IO.formats)
- switch(format,
- text = {format = text.input.format
- mode = "text"},
- json = {format = json.input.format
- mode = "text"},
- csv = {format = csv.input.format(...)
- mode = "text"},
- native.text = {format = native.text.input.format()
- mode = "text"
- warning("The native.text format is deprecated in favor of native. Please switch and convert your data.")},
- native = {format = native.input.format()
- mode = "binary"},
- sequence.typedbytes = {format = typed.bytes.input.format()
- mode = "binary"})}
- if(is.null(streaming.format) && mode == "binary")
- streaming.format = "org.apache.hadoop.streaming.AutoInputFormat"
- list(mode = mode, format = format, streaming.format = streaming.format)}
-
-make.output.format = function(format = native.output.format,
- mode = c("binary", "text"),
- streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat",
- ...) {
- mode = match.arg(mode)
- if(is.character(format)) {
- format = match.arg(format, IO.formats)
- switch(format,
- text = {format = text.output.format
- mode = "text"
- streaming.format = NULL},
- json = {format = json.output.format
- mode = "text"
- streaming.format = NULL},
- csv = {format = csv.output.format(...)
- mode = "text"
- streaming.format = NULL},
- native.text = {format = native.text.output.format
- mode = "text"
- streaming.format = NULL
- warning("The native.text format is deprecated in favor of native. Please switch and convert your data.")},
- native = {format = native.output.format
- mode = "binary"
- streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"},
- sequence.typedbytes = {format = typed.bytes.output.format
- mode = "binary"
- streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"})}
- mode = match.arg(mode)
- list(mode = mode, format = format, streaming.format = streaming.format)}
-
-
-
-#output cmp
-cmp = function(x, y) {
- kx = keys(x)
- ky = keys(y)
- vx = values(x)
- vy = values(y)
- ox = order(sapply(kx, digest), sapply(vx, function(z){attr(z, "rmr.input") = NULL; digest(z)}))
- oy = order(sapply(ky, digest), sapply(vy, function(z){attr(z, "rmr.input") = NULL; digest(z)}))
- isTRUE(all.equal(kx[ox], ky[oy], check.attributes = FALSE)) &&
- isTRUE(all.equal(vx[ox], vy[oy], check.attributes = FALSE))}
-
-#hdfs section
-
-hdfs = function(cmd, intern, ...) {
- if (is.null(names(list(...)))) {
- argnames = sapply(1:length(list(...)), function(i) "")}
- else {
- argnames = names(list(...))}
- system(paste(hadoop.cmd(), " dfs -", cmd, " ",
- paste(
- apply(cbind(argnames, list(...)), 1,
- function(x) paste(
- if(x[[1]] == "") {""} else {"-"},
- x[[1]],
- " ",
- to.dfs.path(x[[2]]),
- sep = ""))[
- order(argnames, decreasing = T)],
- collapse = " "),
- sep = ""),
- intern = intern)}
-
-getcmd = function(matched.call)
- strsplit(tail(as.character(as.list(matched.call)[[1]]), 1), "\\.")[[1]][[2]]
-
-hdfs.match.sideeffect = function(...) {
- hdfs(getcmd(match.call()), FALSE, ...) == 0}
-
-#this returns a character matrix, individual cmds may benefit from additional transformations
-hdfs.match.out = function(...) {
- oldwarn = options("warn")[[1]]
- options(warn = -1)
- retval = do.call(rbind, strsplit(hdfs(getcmd(match.call()), TRUE, ...), " +"))
- options(warn = oldwarn)
- retval}
-
-mkhdfsfun = function(hdfscmd, out)
- eval(parse(text = paste ("hdfs.", hdfscmd, " = hdfs.match.", if(out) "out" else "sideeffect", sep = "")),
- envir = parent.env(environment()))
-
-for (hdfscmd in c("ls", "lsr", "df", "du", "dus", "count", "cat", "text", "stat", "tail", "help"))
- mkhdfsfun(hdfscmd, TRUE)
-
-for (hdfscmd in c("mv", "cp", "rm", "rmr", "expunge", "put", "copyFromLocal", "moveFromLocal", "get", "getmerge",
- "copyToLocal", "moveToLocal", "mkdir", "setrep", "touchz", "test", "chmod", "chown", "chgrp"))
- mkhdfsfun(hdfscmd, FALSE)
-
-pretty.hdfs.ls = function(...) {
- ls.out = hdfs.ls(...)
- crud = grep("Found", ls.out[,1])
- if(length(crud) > 0)
- ls.out = ls.out[-crud,]
- if(class(ls.out) == "character") ls.out = t(ls.out)
- df = as.data.frame(ls.out,stringsAsFactors=F)
- names(df) = c("mode", "links", "owner", "group", "size", "last.modified.date", "last.modified.time", "path")
- df$links = as.numeric(sapply(as.character(df$links), function(x) if (x=="-") 0 else x))
- df$size = as.numeric(as.character(df$size))
- df}
-
-# backend independent dfs section
-part.list = function(fname) {
- if(rmr.options.get('backend') == "local") fname
- else {
- if(dfs.is.dir(fname))
- pretty.hdfs.ls(paste(fname, "part*", sep = "/"))$path
- else fname}}
-
-dfs.exists = function(f) {
- if (rmr.options.get('backend') == 'hadoop')
- hdfs.test(e = f)
- else file.exists(f)}
-
-dfs.rm = function(f) {
- if(rmr.options.get('backend') == 'hadoop')
- hdfs.rm(f)
- else file.remove(f)}
-
-dfs.is.dir = function(f) {
- if (rmr.options.get('backend') == 'hadoop')
- hdfs.test(d = f)
- else file.info(f)['isdir']}
-
-dfs.empty = function(f) {
- f = to.dfs.path(f)
- if(rmr.options.get('backend') == 'hadoop') {
- if(dfs.is.dir(f)) {
- all(
- lapply(
- part.list(f),
- function(x) hdfs.test(z = x)))}
- else {hdfs.test(z = f)}}
- else file.info(f)['size'] == 0}
-
-# dfs bridge
-
-to.dfs.path = function(input) {
- if (is.character(input)) {
- input}
- else {
- if(is.function(input)) {
- input()}}}
-
-to.dfs = function(object, output = dfs.tempfile(), format = "native") {
- object = to.list(object)
- tmp = tempfile()
- dfsOutput = to.dfs.path(output)
- if(is.character(format)) format = make.output.format(format)
-
- write.file =
- function(obj, f) {
- con = file(f, if(format$mode == "text") "w" else "wb")
- record.writer = make.record.writer(format$mode,
- format$format,
- con)
- if(is.vectorized.keyval(obj))
- record.writer(obj$key, obj$val, TRUE)
- else
- lapply(obj,
- function(x) {
- kv = if(is.keyval(x)) x else keyval(NULL, x)
- record.writer(kv$key, kv$val, FALSE)})
- close(con)}
-
- write.file(object, tmp)
- if(rmr.options.get('backend') == 'hadoop') {
- if(format$mode == "binary")
- system(paste(hadoop.streaming(), "loadtb", dfsOutput, "<", tmp))
- else hdfs.put(tmp, dfsOutput)}
- else file.copy(tmp, dfsOutput)
- file.remove(tmp)
- output}
-
-from.dfs = function(input, format = "native", to.data.frame = FALSE, vectorized = FALSE, structured = FALSE) {
- if(is.logical(vectorized)) nrecs = if(vectorized) rmr.options$vectorized.nrows else 1
- else nrecs = vectorized
- read.file = function(f) {
- con = file(f, if(format$mode == "text") "r" else "rb")
- record.reader = make.record.reader(format$mode, format$format, con, nrecs)
- retval = make.fast.list()
- rec = record.reader()
- while(!is.null(rec)) {
- retval(if(is.keyval(rec)) list(rec) else rec)
- rec = record.reader()}
- close(con)
- retval()}
-
- dumptb = function(src, dest){
- lapply(src, function(x) system(paste(hadoop.streaming(), "dumptb", x, ">>", dest)))}
-
- getmerge = function(src, dest) {
- on.exit(unlink(tmp))
- tmp = tempfile()
- lapply(src, function(x) {
- hdfs.get(as.character(x), tmp)
- system(paste('cat', tmp, '>>' , dest))
- unlink(tmp)})
- dest}
-
- fname = to.dfs.path(input)
- if(is.character(format)) format = make.input.format(format)
- if(rmr.options.get("backend") == "hadoop") {
- tmp = tempfile()
- if(format$mode == "binary") dumptb(part.list(fname), tmp)
- else getmerge(part.list(fname), tmp)}
- else
- tmp = fname
- retval = read.file(tmp)
- if(rmr.options.get("backend") == "hadoop") unlink(tmp)
- if(to.data.frame) warning("to.data.frame deprecated, use structured instead")
- if(to.data.frame || structured)
- keyval.list.to.data.frame(retval)
- else if(vectorized)
- keyval(do.call(c, lapply(retval,keys)), do.call(c, lapply(retval, values)), vectorized = TRUE)
- else(retval)}
-
-# mapreduce
-
-dfs.tempfile = function(pattern = "file", tmpdir = tempdir()) {
- fname = tempfile(pattern, tmpdir)
- namefun = function() {fname}
- reg.finalizer(environment(namefun),
- function(e) {
- fname = eval(expression(fname), envir = e)
- if(Sys.getenv("mapred_task_id") != "" && dfs.exists(fname)) dfs.rm(fname)
- })
- namefun
-}
-
-dfs.managed.file = function(call, managed.dir = rmr.options.get('managed.dir')) {
- file.path(managed.dir, digest(lapply(call, eval)))}
-
-mapreduce = function(
- input,
- output = NULL,
- map = to.map(identity),
- reduce = NULL,
- combine = NULL,
- reduce.on.data.frame = FALSE,
- input.format = "native",
- output.format = "native",
- vectorized = list(map = FALSE, reduce = FALSE),
- structured = list(map = FALSE, reduce = FALSE),
- backend.parameters = list(),
- verbose = TRUE) {
-
- on.exit(expr = gc(), add = TRUE) #this is here to trigger cleanup of tempfiles
- if (is.null(output)) output =
- if(rmr.options.get('depend.check'))
- dfs.managed.file(match.call())
- else
- dfs.tempfile()
- if(is.character(input.format)) input.format = make.input.format(input.format)
- if(is.character(output.format)) output.format = make.output.format(output.format)
- if(is.logical(vectorized)) vectorized = list(map = vectorized, reduce = vectorized)
- if(is.logical(vectorized$map)){
- vectorized$map = if (vectorized$map) rmr.options$vectorized.nrows else 1}
- if(is.logical(structured)) structured = list(map = structured, reduce = structured)
- structured$map = !is.null(structured$map) && structured$map && (vectorized$map != 1)
- if(!missing(reduce.on.data.frame)) {
- warning("reduce.on.data.frame deprecated, use structured instead")
- structured$reduce = reduce.on.data.frame}
- if(is.null(structured$reduce)) structured$reduce = FALSE
-
- backend = rmr.options.get('backend')
-
- profile.nodes = rmr.options.get('profile.nodes')
-
- mr = switch(backend, hadoop = rhstream, local = mr.local, stop("Unsupported backend: ", backend))
-
- mr(map = map,
- reduce = reduce,
- combine = combine,
- in.folder = if(is.list(input)) {lapply(input, to.dfs.path)} else to.dfs.path(input),
- out.folder = to.dfs.path(output),
- profile.nodes = profile.nodes,
- input.format = input.format,
- output.format = output.format,
- vectorized = vectorized,
- structured = structured,
- backend.parameters = backend.parameters[[backend]],
- verbose = verbose)
- output
-}
-
-# backends
-
-#local
-
-mr.local = function(map,
- reduce,
- combine,
- in.folder,
- out.folder,
- profile.nodes,
- input.format,
- output.format,
- vectorized,
- structured,
- backend.parameters,
- verbose = verbose) {
- if(is.null(reduce)) reduce = function(k, vv) lapply(vv, function(v) keyval(k, v))
-
- apply.map =
- function(kv) {
- retval = map(if(structured$map) to.data.frame(kv$key) else kv$key,
- if(structured$map) to.data.frame(kv$val) else kv$val)
-
- if(is.keyval(retval) && !is.vectorized.keyval(retval)) list(retval)
- else {
- if(is.vectorized.keyval(retval)){
- kr = keys(retval)
- vr = values(retval)
- lapply(seq_along(kr), function(i) keyval(kr[[i]], vr[[i]]))}
- else retval}}
- get.data =
- function(fname) {
- in.data = from.dfs(fname, format = input.format, vectorized = vectorized$map > 1)
- if(vectorized$map == 1) {
- lapply(in.data, function(rec) {attr(rec$val, 'rmr.input') = fname; rec})}
- else {
- attr(in.data$val, 'rmr.input') = fname
- list(in.data)}}
- map.out =
- catply(
- catply(
- in.folder,
- get.data),
- apply.map)
- map.out = from.dfs(to.dfs(map.out))
- reduce.out = as.list(tapply(X = map.out,
- INDEX = sapply(keys(map.out), digest),
- FUN = function(x) reduce(x[[1]]$key,
- if(structured$reduce) to.data.frame(values(x)) else values(x)),
- simplify = FALSE))
- if(!is.keyval(reduce.out[[1]]))
- reduce.out = do.call(c, reduce.out)
- names(reduce.out) = replicate(n=length(names(reduce.out)), "")
- to.dfs(reduce.out, out.folder, format = output.format)}
-
-#hadoop
-## loops section, or what runs on the nodes
-
-activate.profiling = function() {
- dir = file.path("/tmp/Rprof", Sys.getenv('mapred_job_id'), Sys.getenv('mapred_tip_id'))
- dir.create(dir, recursive = T)
- Rprof(file.path(dir, paste(Sys.getenv('mapred_task_id'), Sys.time())))}
-
-close.profiling = function() Rprof(NULL)
-
-
-map.loop = function(map, record.reader, record.writer, structured, profile) {
- if(profile) activate.profiling()
- kv = record.reader()
- while(!is.null(kv)) {
- out = map(if(structured) to.data.frame(kv$key) else kv$key,
- if(structured) to.data.frame(kv$val) else kv$val)
- if(!is.null(out)) {
- if (is.keyval(out)) {record.writer(out$key, out$val, is.vectorized.keyval(out))}
- else {lapply(out, function(o) record.writer(o$key, o$val, is.vectorized.keyval(o)))}}
- kv = record.reader()}
- if(profile) close.profiling()
- invisible()}
-
-list.cmp = function(ll, e) sapply(ll, function(l) isTRUE(all.equal(e, l, check.attributes = FALSE)))
-## using isTRUE(all.equal(x)) because identical() was too strict, but on paper it should be it
-
-reduce.loop = function(reduce, record.reader, record.writer, structured, profile) {
- reduce.flush = function(current.key, vv) {
- out = reduce(current.key,
- if(structured) {
- to.data.frame(vv)}
- else {vv})
- if(!is.null(out)) {
- if(is.keyval(out)) {record.writer(out$key, out$val, is.vectorized.keyval(out))}
- else {lapply(out, function(o) record.writer(o$key, o$val, is.vectorized.keyval(o)))}}}
- if(profile) activate.profiling()
- kv = record.reader()
- current.key = kv$key
- vv = make.fast.list()
- while(!is.null(kv)) {
- if(identical(kv$key, current.key)) vv(if(is.vectorized.keyval(kv)) kv$val else list(kv$val))
- else {
- reduce.flush(current.key, vv())
- current.key = kv$key
- vv = make.fast.list(if(is.vectorized.keyval(kv)) kv$val else list(kv$val))}
- kv = record.reader()}
- if(length(vv()) > 0) reduce.flush(current.key, vv())
- if(profile) close.profiling()
- invisible()
-}
-
-# the main function for the hadoop backend
-
-hadoop.cmd = function() {
- hadoop_cmd = Sys.getenv("HADOOP_CMD")
- if( hadoop_cmd == "") {
- hadoop_home = Sys.getenv("HADOOP_HOME")
- if(hadoop_home == "") stop("Please make sure that the env. variable HADOOP_CMD or HADOOP_HOME are set")
- file.path(hadoop_home, "bin", "hadoop")}
- else hadoop_cmd}
-
-hadoop.streaming = function() {
- hadoop_streaming = Sys.getenv("HADOOP_STREAMING")
- if(hadoop_streaming == ""){
- hadoop_home = Sys.getenv("HADOOP_HOME")
- if(hadoop_home == "") stop("Please make sure that the env. variable HADOOP_STREAMING or HADOOP_HOME are set")
- stream.jar = list.files(path=sprintf("%s/contrib/streaming", hadoop_home), pattern="jar$", full.names = TRUE)
- sprintf("%s jar %s ", hadoop.cmd(), stream.jar)}
- else sprintf("%s jar %s ", hadoop.cmd(), hadoop_streaming)}
-
-rhstream = function(
- map,
- reduce,
- combine,
- in.folder,
- out.folder,
- profile.nodes,
- input.format,
- output.format,
- vectorized,
- structured,
- backend.parameters,
- verbose = TRUE,
- debug = FALSE) {
- ## prepare map and reduce executables
- lines = 'options(warn=1)
-
-library(rmr)
-load("rmr-local-env")
-load("rmr-global-env")
-invisible(lapply(libs, function(l) require(l, character.only = T)))
-'
- map.line = ' rmr:::map.loop(map = map,
- record.reader = rmr:::make.record.reader(input.format$mode,
- input.format$format,
- nrecs = vectorized$map),
- record.writer = if(is.null(reduce)) {
- rmr:::make.record.writer(output.format$mode,
- output.format$format)}
- else {
- rmr:::make.record.writer()},
- structured$map,
- profile = profile.nodes)'
- reduce.line = ' rmr:::reduce.loop(reduce = reduce,
- record.reader = rmr:::make.record.reader(),
- record.writer = rmr:::make.record.writer(output.format$mode,
- output.format$format),
- structured = structured$reduce,
- profile = profile.nodes)'
- combine.line = ' rmr:::reduce.loop(reduce = combine,
- record.reader = rmr:::make.record.reader(),
- record.writer = rmr:::make.record.writer(),
- structured = structured$reduce,
- profile = profile.nodes)'
-
- map.file = tempfile(pattern = "rhstr.map")
- writeLines(c(lines, map.line), con = map.file)
- reduce.file = tempfile(pattern = "rhstr.reduce")
- writeLines(c(lines, reduce.line), con = reduce.file)
- combine.file = tempfile(pattern = "rhstr.combine")
- writeLines(c(lines, combine.line), con = combine.file)
-
- ## set up the execution environment for map and reduce
- if (!is.null(combine) && is.logical(combine) && combine) {
- combine = reduce}
-
- save.env = function(fun = NULL, name) {
- fun.env = file.path(tempdir(), name)
- envir =
- if(is.null(fun)) parent.env(environment()) else {
- if (is.function(fun)) environment(fun)
- else fun}
- save(list = ls(all.names = TRUE, envir = envir), file = fun.env, envir = envir)
- fun.env}
-
- libs = sub("package:", "", grep("package", search(), value = T))
- image.cmd.line = paste("-file",
- c(save.env(name = "rmr-local-env"),
- save.env(.GlobalEnv, "rmr-global-env")),
- collapse = " ")
- ## prepare hadoop streaming command
- hadoop.command = hadoop.streaming()
- input = make.input.files(in.folder)
- output = if(!missing(out.folder)) sprintf("-output %s", out.folder) else " "
- input.format.opt = if(is.null(input.format$streaming.format)) {
- ' ' # default is TextInputFormat
- }else {
- sprintf(" -inputformat %s", input.format$streaming.format)
- }
- output.format.opt = if(is.null(output.format$streaming.format)) {
- ' '}
- else {
- sprintf(" -outputformat %s", output.format$streaming.format)
- }
- stream.map.input =
- if(input.format$mode == "binary") {
- " -D stream.map.input=typedbytes"}
- else {''}
- stream.map.output =
- if(is.null(reduce) && output.format$mode == "text") ""
- else " -D stream.map.output=typedbytes"
- stream.reduce.input = " -D stream.reduce.input=typedbytes"
- stream.reduce.output =
- if(output.format$mode == "binary") " -D stream.reduce.output=typedbytes"
- else ''
- stream.mapred.io = paste(stream.map.input,
- stream.map.output,
- stream.reduce.input,
- stream.reduce.output)
- mapper = sprintf('-mapper "Rscript %s" ', tail(strsplit(map.file, "/")[[1]], 1))
- m.fl = sprintf("-file %s ", map.file)
- if(!is.null(reduce) ) {
- reducer = sprintf('-reducer "Rscript %s" ', tail(strsplit(reduce.file, "/")[[1]], 1))
- r.fl = sprintf("-file %s ", reduce.file)}
- else {
- reducer=" ";r.fl = " "}
- if(!is.null(combine) && is.function(combine)) {
- combiner = sprintf('-combiner "Rscript %s" ', tail(strsplit(combine.file, "/")[[1]], 1))
- c.fl = sprintf("-file %s ", combine.file)}
- else {
- combiner = " "
- c.fl = " "}
-
- #debug.opts = "-mapdebug kdfkdfld -reducexdebug jfkdlfkja"
-
- final.command =
- paste(
- hadoop.command,
- stream.mapred.io,
- paste.options(backend.parameters),
- input,
- output,
- mapper,
- combiner,
- reducer,
- image.cmd.line,
- m.fl,
- r.fl,
- c.fl,
- input.format.opt,
- output.format.opt,
- "2>&1")
- if(verbose) {
- retval = system(final.command)
- if (retval != 0) stop("hadoop streaming failed with error code ", retval, "\n")}
- else {
- console.output = tryCatch(system(final.command, intern=TRUE),
- warning = function(e) stop(e))
- 0
- }
-}
-##special jobs
-
-## a sort of relational join very useful in a variety of map reduce algorithms
-
-## to.dfs(lapply(1:10, function(i) keyval(i, i^2)), "/tmp/reljoin.left")
-## to.dfs(lapply(1:10, function(i) keyval(i, i^3)), "/tmp/reljoin.right")
-## equijoin(left.input="/tmp/reljoin.left", right.input="/tmp/reljoin.right", output = "/tmp/reljoin.out")
-## from.dfs("/tmp/reljoin.out")
-
-equijoin = function(
- left.input = NULL,
- right.input = NULL,
- input = NULL,
- output = NULL,
- outer = c("", "left", "right", "full"),
- map.left = to.map(identity),
- map.right = to.map(identity),
- reduce = function(k, values.left, values.right)
- do.call(c,
- lapply(values.left,
- function(vl) lapply(values.right,
- function(vr) reduce.all(k, vl, vr)))),
- reduce.all = function(k, vl, vr) keyval(k, list(left = vl, right = vr)))
- {
- stopifnot(xor(!is.null(left.input), !is.null(input) &&
- (is.null(left.input)==is.null(right.input))))
- outer = match.arg(outer)
- left.outer = outer == "left"
- right.outer = outer == "right"
- full.outer = outer == "full"
- if (is.null(left.input)) {
- left.input = input}
- mark.side =
- function(kv, isleft) keyval(kv$key, list(val = kv$val, isleft = isleft))
- is.left.side =
- function(left.input) {
- leftin = strsplit(to.dfs.path(left.input), "/+")[[1]]
- mapin = strsplit(Sys.getenv("map_input_file"), "/+")[[1]]
- leftin = leftin[-1]
- mapin = mapin[if(is.element(mapin[1], c("hdfs:", "maprfs:"))) c(-1, -2) else -1]
- all(mapin[1:length(leftin)] == leftin)}
- reduce.split =
- function(vv) tapply(lapply(vv, function(v) v$val), sapply(vv, function(v) v$isleft), identity, simplify = FALSE)
- pad.side =
- function(vv, side.outer, full.outer) if (length(vv) == 0 && (side.outer || full.outer)) c(NA) else vv
- map = if (is.null(input)) {
- function(k, v) {
- ils = switch(rmr.options.get('backend'),
- hadoop = is.left.side(left.input),
- local = attr(v, 'rmr.input') == to.dfs.path(left.input),
- stop("Unsupported backend: ", rmr.options.get('backend')))
- mark.side(if(ils) map.left(k, v) else map.right(k, v), ils)}}
- else {
- function(k, v) {
- list(mark.side(map.left(k, v), TRUE),
- mark.side(map.right(k, v), FALSE))}}
- eqj.reduce = reduce
- mapreduce(map = map,
- reduce =
- function(k, vv) {
- rs = reduce.split(vv)
- eqj.reduce(k,
- pad.side(rs$`TRUE`, right.outer, full.outer),
- pad.side(rs$`FALSE`, left.outer, full.outer))},
- input = c(left.input, right.input),
- output = output)}
-
-
-
-## push a file through this to get as many partitions as possible (depending on system settings)
-## data is unchanged
-
-scatter = function(input, output = NULL)
- mapreduce(input,
- output,
- map = function(k, v) keyval(sample(1:1000, 1), keyval(k, v)),
- reduce = function(k, vv) vv)
-
-##optimizer
-
-is.mapreduce = function(x) {
- is.call(x) && x[[1]] == "mapreduce"}
-
-mapreduce.arg = function(x, arg) {
- match.call(mapreduce, x) [[arg]]}
-
-optimize = function(mrex) {
- mrin = mapreduce.arg(mrex, 'input')
- if (is.mapreduce(mrex) &&
- is.mapreduce(mrin) &&
- is.null(mapreduce.arg(mrin, 'output')) &&
- is.null(mapreduce.arg(mrin, 'reduce'))) {
- bquote(
- mapreduce(input = .(mapreduce.arg(mrin, 'input')),
- output = .(mapreduce.arg(mrex, 'output')),
- map = .(compose.mapred)(.(mapreduce.arg(mrex, 'map')),
- .(mapreduce.arg(mrin, 'map'))),
- reduce = .(mapreduce.arg(mrex, 'reduce'))))}
- else mrex }
-
-
-##other
-
-reload = function() {
- detach("package:rmr", unload=T)
- library.dynam.unload("rmr",system.file(package="rmr"))
- library(rmr)}
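The comment block above the equijoin definition already contains a small worked example; for completeness, here is the minimal end-to-end pattern the removed rmr 1.x API supported, sketched from the signatures above:

```r
library(rmr)

# Write a small key-value dataset to the DFS, run a map-only job over it,
# and read the result back.
small.ints = to.dfs(lapply(1:10, function(i) keyval(i, i^2)))

out = mapreduce(
  input = small.ints,
  map   = function(k, v) keyval(k, v + 1))

from.dfs(out)
```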
16 rmr/pkg/docs/compatibility.Rmd
@@ -1,16 +0,0 @@
-# Compatibility testing for rmr 1.3.x
-Please contribute additional reports. To claim compatibility, you need to run `R CMD check path-to-rmr` successfully.
-As with any new release, testing on additional platforms is under way. If you build your own Hadoop, see [Which Hadoop for rmr](https://github.com/RevolutionAnalytics/RHadoop/wiki/Which-Hadoop-for-rmr).
-
-<table>
-<thead>
-<tr><th>rmr</th><th>Hadoop</th><th>R</th><th>OS</th><th>Compatibility</th><th>Reporter</th></tr>
-</thead>
-<tbody>
-<tr><td>1.3.1</td><td>mr1-cdh4.0.0</td><td>Revolution R Enterprise 6.0</td><td>64-bit CentOS 5.6</td><td>only x86_64 and mr1</td><td>Revolution Analytics</td></tr>
-<tr><td>1.3.1</td><td>CDH3u4</td><td>Revolution R Enterprise 6.0</td><td>64-bit CentOS 5.6</td><td>only x86_64</td><td>Revolution Analytics</td></tr>
-<tr><td>1.3.1</td><td>Apache Hadoop 1.0.2</td><td>Revolution R Enterprise 6.0</td><td>64-bit CentOS 5.6</td><td>only x86_64</td><td>Revolution Analytics</td></tr>
-<tr><td>1.3.1</td><td>CDH3u4</td><td>Open Source R 2.14.1</td><td>64-bit Ubuntu 10.10</td><td>only x86_64</td><td>@wdavidw</td></tr>
-
-</tbody>
-</table>
15 rmr/pkg/docs/compatibility.md
@@ -1,15 +0,0 @@
-# Compatibility testing for rmr 1.3.x
-Please contribute additional reports. To claim compatibility, you need to run `R CMD check path-to-rmr` successfully.
-As with any new release, testing on additional platforms is under way. If you build your own Hadoop, see [Which Hadoop for rmr](https://github.com/RevolutionAnalytics/RHadoop/wiki/Which-Hadoop-for-rmr).
-
-<table>
-<thead>
-<tr><th>rmr</th><th>Hadoop</th><th>R</th><th>OS</th><th>Compatibility</th><th>Reporter</th></tr>
-</thead>
-<tbody>
-<tr><td>1.3.1</td><td>mr1-cdh4.0.0</td><td>Revolution R Enterprise 6.0</td><td>64-bit CentOS 5.6</td><td>only x86_64 and mr1</td><td>Revolution Analytics</td></tr>
-<tr><td>1.3.1</td><td>CDH3u4</td><td>Revolution R Enterprise 6.0</td><td>64-bit CentOS 5.6</td><td>only x86_64</td><td>Revolution Analytics</td></tr>
-<tr><td>1.3.1</td><td>Apache Hadoop 1.0.2</td><td>Revolution R Enterprise 6.0</td><td>64-bit CentOS 5.6</td><td>only x86_64</td><td>Revolution Analytics</td></tr>
-<tr><td>1.3.1</td><td>CDH3u4</td><td>Open Source R 2.14.1</td><td>64-bit Ubuntu 10.10</td><td>only x86_64</td><td>@wdavidw</td></tr>
-</tbody>
-</table>
BIN rmr/pkg/docs/compatibility.pdf
Binary file not shown.
57 rmr/pkg/docs/fast-k-means.Rmd
@@ -1,57 +0,0 @@
-`r read_chunk('../tests/kmeans.R')`
-`r opts_chunk$set(echo=TRUE, eval=FALSE, cache=FALSE, tidy=FALSE)`
-
-We have covered a basic k-means implementation with `rmr` in the [Tutorial](tutorial.md). If you tried it out, though, you probably noticed that its performance leaves something to be desired and wondered whether anything can be done about it. Or you have read [Efficient rmr techniques](https://github.com/RevolutionAnalytics/RHadoop/wiki/Efficient-rmr-techniques) and would like to see those suggestions put to work beyond the toy "large sums" example used therein. Then this document should be of interest to you, since we will cover an implementation that is modestly more complex and two orders of magnitude faster. To make the most of it, it is recommended that you read the other two documents first.
-
-First we need to reorganize our data representation a little, creating bulkier records that contain a sizeable subset of the data set each, as opposed to a single point. To this end, instead of storing one point per record we will store a matrix, with one data point per row of this matrix. We'll set the number of rows to 1000 which is enough to reap the benefits of using "vectorised" functions in R but not big enough to hit memory limits in most cases.
-
-```{r kmeans.data.fast}
-```
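This chunk is pulled from ../tests/kmeans.R by read_chunk and is not part of this diff; one plausible shape for it, given the surrounding description (matrix-valued records of 1000 points each), is sketched below as an assumption:

```r
# Illustrative only: 100 records, each holding a 1000-row matrix of 2-D points.
input = to.dfs(
  lapply(1:100, function(i)
    keyval(i, matrix(rnorm(1000 * 2), ncol = 2))))
```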
-
-This is what a sample call looks like, with the first argument being a sample dataset.
-
-```{r kmeans.run.fast}
-```
-
-This creates and processes a dataset with 100,000 data points, organized in 100 records. For a larger data set you would only need to increase the number of records; the size of each record can stay the same. As you may recall, the implementation of k-means we described in the tutorial was organized in two functions, one containing the main iteration loop and the other computing distances and new centers. The good news is that the first function can stay largely the same, but for the addition of a flag that tells whether to use the optimized version of the "inner" function, so we don't need to cover it here (the code is in the source under `tests`, only in the dev branch for now), and a different default for the distance function &mdash; more on this soon. The important changes are in the `kmeans.iter.fast` function, which provides an alternative to the `kmeans.iter` function in the original implementation. Let's first discuss why we need a different default distance function, and in general why the distance function has a different signature in the fast version. One of the most CPU-intensive tasks in this algorithm is computing distances between a candidate center and each data point. If we don't implement this one in an efficient way, we can't hope for an overall efficient implementation. Since it takes about a microsecond to call the simplest function in R (vs. ~10 nanoseconds in C), we need to get a significant amount of work done for each call. Therefore, instead of specifying the distance function as a function of two points, we will switch to a function of one point and a set thereof that returns the distance between the first argument and each element of the second. In this implementation we will use a matrix instead of a set, since there are powerful primitives available to operate on matrices. The following is the default distance function with this new signature, where we can see that we avoided any explicit loops over the rows of the matrix `yy`. There are two implicit loops, `Reduce` and `lapply`, but internally they use vectorized operators, that is, the overhead of those implicit loops is small compared to the time taken by the vectorized operators.
-
-```{r kmeans.fast.dist}
-```
-
-With fast distance computation taken care of, at least for the euclidean case, let's look at the fast implementation of the kmeans iteration.
-
-```{r kmeans.iter.fast.signature}
-```
-
-There is no news here as far as the signature goes but for a different distance default, so we can move on to the body. The following is a conversion function that allows us to work around a limitation in the RJSONIO library we are using to serialize R objects. Deserializing a serialized matrix returns a list of vectors, which we can easily turn into a matrix again. Whenever you have doubts whether the R object you intend to use as an argument or return value of a mapper or reducer will be encoded and decoded correctly, an option is to try `RJSONIO::fromJSON(RJSONIO::toJSON(x))` where `x` is the object of interest. This is a price to pay for using a language-agnostic serialization scheme.
-
-```{r kmeans.iter.fast.list.to.matrix}
-```
-
-The next is the main mapreduce call, which, as in the Tutorial, can have two different map functions: let's look at each in detail.
-
-```{r kmeans.iter.fast.mapreduce}
-```
-
-The first of the two map functions is used only for the first iteration, when no set of cluster centers is available, only a number; it randomly assigns each point to a center, just as in the Tutorial, but here the matrix argument `v` represents multiple data points and we need to assign each of them to a center efficiently. Moreover, we are going to switch from computing the means of data points in a cluster to computing their sums and counts, delaying taking the ratio of the two as much as possible. This way we can apply early data reduction, as will be clear soon. To achieve this, the first step in the mapper is to extend the matrix of points with a column of counts, all initialized to one. The next line assigns points to clusters using `sample`. This assignment is then supplied to the function `by`, which applies a column sum to each group of rows in the matrix `v` of data points that have been assigned to the same center. This is where we apply the `sum` operation at the earliest possible stage &mdash; you can see it as an in-map combiner. Also, since the first column of the matrix is now devoted to counts, we are calculating those as well. In the last line, the only thing left is to generate a list of key-value pairs, one per center, and return it.
-
-```{r kmeans.iter.fast.map.first}
-```
-
-For all iterations after the first, the assignment of points to centers follows a minimum-distance criterion. The first line back-converts `v` to a matrix, whereas the second uses the aforementioned `fast.dist` function in combination with `apply` to generate a data points &times; centers matrix of distances. The next assignment, which takes a few lines, computes the row-by-row minimum of this distance matrix and returns, for each row, the index of the column containing that minimum. We cannot use the customary function `min` to accomplish this, as it returns only one number, hence we would need to call it once per data point. So we use its parallel, lesser-known relative `pmin` and apply it to the columns of the distance matrix using the combination of `do.call` and `lapply`. Matching these row minima back against the distance matrix then yields a two-column matrix where each row contains the index of a row and the column index of the minimum for that row. The following assignment sorts this matrix so that the results are in the same order as the `v` matrix. The last few steps are the same as for the first type of map and implement the early reduction we talked about.
-
-```{r kmeans.iter.fast.map.rest}
-```
-
-In the reduce function, we simply sum over the columns of the matrix of points associated with the same cluster center. Actually, since we have started adding over subgroups of points in the mapper itself, what we are adding here are already partial sums and partial counts (which we store in the first column, as you may recall). Since this is an associative and commutative operation, it can only help to also switch the combiner on. There is one subtle change necessary to do so successfully, which required some debugging even for the author of this document, allegedly a mapreduce expert. In a first version, the reducer returned key-value pairs with a NULL key. After all, the reduce phase happens after the shuffle phase, so what use are keys? Not so if the combiner is turned on, as records are first shuffled into the combiner and then re-shuffled into the reducer. So the reducer has to set a key, usually keeping the one set by the mapper (the reducer has to be key-idempotent for the combiner to work).
-
-```{r kmeans.iter.fast.reduce}
-```
-
-The last few lines comprise an optional argument to `from.dfs` that converts the result from a list to a data frame when possible, the selection of centers with at least one associated point and, at the very last step, the conversion of sums into averages.
-
-```{r kmeans.iter.fast.newcenters}
-```
-
-
-To recap, we started by using a slightly different representation with a performance-tunable record size. We re-implemented essentially the same mapper and reducer functions as in the plain-vanilla version, but using only vectorised, efficient library functions that act on all the data points in one record within a single call. Finally, we delayed the computation of means and replaced it with the computation of (sum, count) pairs, which, thanks to the associativity and commutativity of sums, can be computed on arbitrary subgroups as soon as they are formed, effecting an early data reduction. These transformations exemplify only a subset of the topics covered in [[Efficient rmr techniques]] but are enough to produce a dramatic speedup in this case.
View
93 rmr/pkg/docs/fast-k-means.md
@@ -1,93 +0,0 @@
-
-
-
-We have covered a basic k-means implementation with `rmr` in the [Tutorial](tutorial.md). If you tried it out, though, you have probably noticed that its performance leaves much to be desired and wondered whether anything can be done about it. Or you have read [Efficient rmr techniques](https://github.com/RevolutionAnalytics/RHadoop/wiki/Efficient-rmr-techniques) and would like to see those suggestions put to work beyond the toy "large sums" example used therein. Then this document should be of interest to you, since we will cover an implementation that is modestly more complex but two orders of magnitude faster. To make the most of it, it's recommended that you read the other two documents first.
-
-First we need to reorganize our data representation a little, creating bulkier records that each contain a sizeable subset of the data set, as opposed to a single point. To this end, instead of storing one point per record we will store a matrix, with one data point per row. We'll set the number of rows to 1000, which is enough to reap the benefits of "vectorised" functions in R but not big enough to hit memory limits in most cases.
-
-
-```r
- recsize = 1000
- input = to.dfs(lapply(1:100,
- function(i) keyval(NULL, cbind(sample(0:2, recsize, replace = T) + rnorm(recsize, sd = .1),
- sample(0:3, recsize, replace = T) + rnorm(recsize, sd = .1)))))
-```
-
-
-This is what a sample call would look like, with the first argument being a sample dataset.
-
-
-```r
- kmeans(input, 12, iterations = 5, fast = T)
-```
-
-
-This creates and processes a dataset with 100,000 data points, organized in 100 records. For a larger data set you would only need to increase the number of records; the size of each record can stay the same. As you may recall, the implementation of kmeans we described in the tutorial was organized in two functions, one containing the main iteration loop and the other computing distances and new centers. The good news is that the first function can stay largely the same, except for the addition of a flag that says whether to use the optimized version of the "inner" function and a different default for the distance function &mdash; more on this soon &mdash; so we don't need to cover it here (the code is in the source under `tests`, only in the dev branch for now). The important changes are in the `kmeans.iter.fast` function, which provides an alternative to the `kmeans.iter` function in the original implementation. Let's first discuss why we need a different default distance function, and in general why the distance function has a different signature in the fast version. One of the most CPU-intensive tasks in this algorithm is computing distances between a candidate center and each data point. If we don't implement this one efficiently, we can't hope for an overall efficient implementation. Since it takes about a microsecond to call the simplest function in R (vs. ~10 nanoseconds in C), we need to get a significant amount of work done for each call. Therefore, instead of specifying the distance function as a function of two points, we will switch to a function of one point and a set thereof that returns the distance between the first argument and each element of the second. In this implementation we will use a matrix instead of a set, since there are powerful primitives available to operate on matrices. The following is the default distance function with this new signature, where we can see that we avoided any explicit loops over the rows of the matrix `yy`. There are two implicit loops, `Reduce` and `lapply`, but internally they use vectorized operators, that is, the overhead of those implicit loops is small compared to the time taken by the vectorised operators.
-
-
-```r
-fast.dist = function(yy, x) { #compute all the distances between x and rows of yy
- squared.diffs = (t(t(yy) - x))^2
- ##sum the columns, take the root, loop on dimension
- sqrt(Reduce(`+`, lapply(1:dim(yy)[2], function(d) squared.diffs[,d])))}
-```
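-
-As a quick sanity check, `fast.dist` should return one Euclidean distance per row of its first argument; the tiny matrix below is just an illustration, not part of the original test suite.
-
-```r
-yy = rbind(c(0, 0), c(3, 4), c(1, 1))
-fast.dist(yy, c(0, 0))
-## 0 5 1.414214 -- the distance from c(0, 0) to each row of yy
-```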
-
-
-With fast distance computation taken care of, at least for the euclidean case, let's look at the fast implementation of the kmeans iteration.
-
-
-```r
-kmeans.iter.fast =
- function(points, distfun, ncenters = dim(centers)[1], centers = NULL) {
-```
-
-
-There is no news here as far as the signature goes but for a different distance default, so we can move on to the body. The following is a conversion function that allows us to work around a limitation in the RJSONIO library we are using to serialize R objects. Deserializing a serialized matrix returns a list of vectors, which we can easily turn into a matrix again. Whenever you have doubts whether the R object you intend to use as an argument or return value of a mapper or reducer will be encoded and decoded correctly, an option is to try `RJSONIO::fromJSON(RJSONIO::toJSON(x))` where `x` is the object of interest. This is a price to pay for using a language-agnostic serialization scheme.
-
-
-```r
- list.to.matrix = function(l) do.call(rbind,l)
-```
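-
-If you want to try the round trip described above for yourself, a minimal probe looks like this (the small matrix is chosen purely for illustration):
-
-```r
-library(RJSONIO)
-x = matrix(1:6, nrow = 2)
-## inspect what a mapper would actually receive after the JSON round trip
-str(fromJSON(toJSON(x)))
-## if it comes back as a list of row vectors, list.to.matrix rebuilds the matrix
-```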
-
-
-The next is the main mapreduce call, which, as in the Tutorial, can have two different map functions: let's look at each in detail.
-
-
-```r
- newCenters = from.dfs(
- mapreduce(
- input = points,
-```
-
-
-The first of the two map functions is used only for the first iteration, when no set of cluster centers is available, only a number; it randomly assigns each point to a center, just as in the Tutorial, but here the matrix argument `v` represents multiple data points and we need to assign each of them to a center efficiently. Moreover, we are going to switch from computing the means of data points in a cluster to computing their sums and counts, delaying taking the ratio of the two as much as possible. This way we can apply early data reduction, as will be clear soon. To achieve this, the first step in the mapper is to extend the matrix of points with a column of counts, all initialized to one. The next line assigns points to clusters using `sample`. This assignment is then supplied to the function `by`, which applies a column sum to each group of rows in the matrix `v` of data points that have been assigned to the same center. This is where we apply the `sum` operation at the earliest possible stage &mdash; you can see it as an in-map combiner. Also, since the first column of the matrix is now devoted to counts, we are calculating those as well. In the last line, the only thing left is to generate a list of key-value pairs, one per center, and return it. A sketch of such a mapper appears below.
-
-
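-In code, the steps just described could look something like this; it is an illustrative sketch, not the original chunk, and `ncenters`, `list.to.matrix` and the function name are taken or guessed from the surrounding function:
-
-```r
-## illustrative sketch only -- the original chunk is not shown in this document
-kmeans.map.first = function(k, v) {
-  v = cbind(1, list.to.matrix(v))                            # prepend the column of counts
-  clusters = sample(1:ncenters, dim(v)[1], replace = TRUE)   # random assignment of points to centers
-  sums = by(v, clusters, function(rows) apply(rows, 2, sum)) # in-map combine: per-cluster column sums
-  lapply(names(sums), function(cl) keyval(as.numeric(cl), sums[[cl]]))}
-```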
-
-
-For all iterations after the first, the assignment of points to centers follows a minimum-distance criterion. The first line back-converts `v` to a matrix, whereas the second uses the aforementioned `fast.dist` function in combination with `apply` to generate a data points &times; centers matrix of distances. The next assignment, which takes a few lines, computes the row-by-row minimum of this distance matrix and returns, for each row, the index of the column containing that minimum. We cannot use the customary function `min` to accomplish this, as it returns only one number, hence we would need to call it once per data point. So we use its parallel, lesser-known relative `pmin` and apply it to the columns of the distance matrix using the combination of `do.call` and `lapply`. Matching these row minima back against the distance matrix then yields a two-column matrix where each row contains the index of a row and the column index of the minimum for that row. The following assignment sorts this matrix so that the results are in the same order as the `v` matrix. The last few steps are the same as for the first type of map and implement the early reduction we talked about, as in the sketch below.
-
-
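-The chunk for this second mapper is likewise sketched here rather than reproduced; names are illustrative, and in the real code the distance function is presumably the `distfun` argument rather than `fast.dist` directly:
-
-```r
-## illustrative sketch only -- the original chunk is not shown in this document
-kmeans.map.rest = function(k, v) {
-  v = list.to.matrix(v)
-  dist.matrix = apply(centers, 1, function(center) fast.dist(v, center))  # points x centers
-  row.minima = do.call(pmin, lapply(1:dim(dist.matrix)[2], function(j) dist.matrix[, j]))
-  closest = which(dist.matrix == row.minima, arr.ind = TRUE)  # (row, column-of-minimum) pairs
-  closest = closest[order(closest[, 1]), ]                    # back in the same order as v
-  v = cbind(1, v)                                             # prepend the column of counts
-  sums = by(v, closest[, 2], function(rows) apply(rows, 2, sum))
-  lapply(names(sums), function(cl) keyval(as.numeric(cl), sums[[cl]]))}
-```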
-
-
-In the reduce function, we simply sum over the columns of the matrix of points associated with the same cluster center. Actually, since we have started adding over subgroups of points in the mapper itself, what we are adding here are already partial sums and partial counts (which we store in the first column, as you may recall). Since this is an associative and commutative operation, it can only help to also switch the combiner on. There is one subtle change necessary to do so successfully, which required some debugging even for the author of this document, allegedly a mapreduce expert. In a first version, the reducer returned key-value pairs with a NULL key. After all, the reduce phase happens after the shuffle phase, so what use are keys? Not so if the combiner is turned on, as records are first shuffled into the combiner and then re-shuffled into the reducer. So the reducer has to set a key, usually keeping the one set by the mapper (the reducer has to be key-idempotent for the combiner to work).
-
-
-```r
- reduce = function(k, vv) {
- keyval(k, apply(list.to.matrix(vv), 2, sum))},
-```
-
-
-The last few lines comprise an optional argument to `from.dfs` that converts the result from a list to a data frame when possible, the selection of centers with at least one associated point and, at the very last step, the conversion of sums into averages.
-
-
-```r
- newCenters = cbind(newCenters$key, newCenters$val)
- newCenters = newCenters[newCenters[,2] > 0, -1]
- (newCenters/newCenters[,1])[,-1]}
-```
-
-
-
-To recap, we started by using a slightly different representation with a performance-tunable record size. We re-implemented essentially the same mapper and reducer functions as in the plain-vanilla version, but using only vectorised, efficient library functions that act on all the data points in one record within a single call. Finally, we delayed the computation of means and replaced it with the computation of (sum, count) pairs, which, thanks to the associativity and commutativity of sums, can be computed on arbitrary subgroups as soon as they are formed, effecting an early data reduction. These transformations exemplify only a subset of the topics covered in [[Efficient rmr techniques]] but are enough to produce a dramatic speedup in this case.
View
BIN rmr/pkg/docs/fast-k-means.pdf
Binary file not shown.
View
BIN rmr/pkg/docs/getting-data-in-and-out.pdf
Binary file not shown.
View
111 rmr/pkg/docs/introduction-to-vectorized-API.Rmd
@@ -1,111 +0,0 @@
-
-`r read_chunk('../tests/vectorized-API.R')`
-`r opts_chunk$set(echo=TRUE, eval=FALSE, cache=FALSE, tidy = FALSE)`
-
-# Introduction to the `vectorized` and `structured` options in `rmr` 1.3
-
-## Goals for 1.3
-The main goal for 1.3 was to do a performance review and implement changes to eliminate the main performance bottlenecks. Following the review, we determined that better support for a vectorized programming style was necessary to allow writing efficient R and hence efficient `rmr` programs in the case of small record size &mdash; for large record size the vectorization can happen at the record level. We also concluded that the native format parser was unacceptably slow for small objects and other parsers could benefit from a vectorized version.
-We selected a number of very basic use cases to exercise different aspects of the library, and the focus was on speed gains on those cases while minimizing code changes. Since API changes were necessary, it was decided to tackle at the same time the issue of better supporting the structured data case, to try and make API changes as soon and as rarely as possible, and to keep the `vectorized` and `structured` options consistent. The structured data features are mostly implemented, but not necessarily with great attention to speed, and backward-incompatible changes are possible in the future. The goal was to try and get an expanded, consistent API to the users as quickly as possible, as well as speed-enhancing features.
-
-## Changes Overview
-We have added a `vectorized` option to `from.dfs`, `mapreduce` and `keyval`. The precise syntax and semantics will be clear from the examples and are described in the R `help()`, but the main idea is to instruct the library that the user intends to process or generate multiple key-value pairs in one call. We have also provided vectorized implementations for the typedbytes, CSV and text formats. Finally, we have added a `structured` option to the same functions, which in general means that key-value pairs are provided or expected by the user as data frames.
-
-## Data types
-The main data type in rmr is the key-value pair, a two-element list with some attributes, generated with the function `keyval`. Any R object can take the role of key or value (but see [Caveats](##Caveats) below). Collections of key-value pairs can be represented as lists thereof. This is a row-first representation that is close to how Hadoop does things, but not very natural or efficient in R. Therefore we support an alternative, the *vectorized* key-value pair, which is created by calling `keyval` with the `vectorized` option set to `TRUE`. The arguments passed to `keyval` then represent not one but many keys or values and need to have the same length, or the same number of rows if this is defined. In the unstructured case, keys and values can be provided to `keyval` as lists; otherwise atomic vectors or data frames are also accepted, and we plan to extend support to other data types (matrices in particular). These data types are accepted by `to.dfs` and generated by `from.dfs`, controlled by the options `structured` and `vectorized`, and they are also used as arguments and return values for the `map` and `reduce` functions, which are arguments to `mapreduce`.
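-
-For instance, a vectorized key-value pair holding a thousand pairs at once could be created along these lines; this is a sketch based on the description above, with made-up keys and values, and the exact `to.dfs` call may differ:
-```r
-k = as.list(1:1000)                     # one key per pair
-v = as.list(rnorm(1000))                # one value per pair, same length as the keys
-kv = keyval(k, v, vectorized = TRUE)    # a single object representing 1000 pairs
-to.dfs(list(kv))                        # written out with the simplified serialization
-```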
-
-## Caveats
-Because of the inefficiency of the native R serialization on small objects, even using a C implementation, we decided to switch automatically to a different serialization format, typedbytes, when the `vectorized` option is on. This means that users are not going to enjoy the same kind of absolute compatibility with R, but this usually is not a huge drawback when dealing with small records, typically scalars. For example, if each record contains a 100x100 submatrix of a larger matrix, the `vectorized` API doesn't support matrices very well, but it's also not going to give a speed boost. If one is processing graphs and each record is just an integer pair `(start, stop)`, that's where the `vectorized` interface gives the biggest speed boost and typedbytes serialization is adequate for simple records. There may be "in between" cases, for instance when keys or values are very small matrices, where neither option is ideal. In consideration of that, in the future we may expand typedbytes with additional types to be more R-friendly.
-
-## Examples
-First let's create some input. Input size is arbitrary but it is the one used in the automated tests included in the package and to obtain the timings. Here we are assuming a `"hadoop"` backend. By setting `vectorized` to `TRUE` we instruct `keyval` to consider its arguments collections of keys and values, not individual ones. At the same time `to.dfs` automatically switches to a simplified serialization format. The nice thing is that we don't have to remember if a data set was written out with one or the other serialization, as they are compatible on the read side. For testing purposes we need to create a data set in this format to maximize the benefits of the `vectorized` option.
-
-```{r input}
-```
-
-### Read it back
-The simplest possible task is to read back what we just wrote out, and we know how to do that already.
-```{r read-write}
-```
-In the next code block we switch on vectorization. That is, instead of reading in a list of key-value pairs, we are going to have a list of vectorized key-value pairs, that is, every pair contains a list of keys and a list of values of the same length. The `vectorized` argument can be set to an integer as well, for more precise control of how many keys and values should be stored in a key-value pair. With this change alone, from my limited, standalone-mode testing, we have achieved an almost 7X speed-up (raw timing data is in [`vectorized-API.R`](../tests/vectorized-API.R)).
-```{r read-write-vec}
-```
-
-### Pass-through
-Next on the complexity scale, or lack thereof, is the pass-through or identity map-reduce job. First in its plain-vanilla incarnation, same as `rmr`-1.2.
-
-```{r pass-through}
-```
-
-Next we turn on vectorization. Vectorization can be turned on independently on the map and reduce side, but the reduce side is not implemented yet, nor is it completely clear what the equivalent should be; we are going to let real use cases accumulate before working on vectorization on the reduce side. By turning on map-side vectorization, the arguments `k` and `v` are going to be equal-sized lists of keys and values. To write them back as they are, we need to also turn on vectorization in the `keyval` function, lest we make one key out of many or have to resort to inefficient `apply`-family calls. The speed-up here is about 4X.
-
-```{r pass-through-vec}
-```
-
-### General Filter
-Let's now look at a very simple but potentially useful example, filtering. We have a function returning `logical`, for instance the following:
-```{r predicate}
-```
-with the goal of dropping all records for which `predicate` returns `FALSE`. This particular one is handy for this document because it works equally well for the vectorized and unvectorized cases. The plain-vanilla implementation is the following:
-```{r filter}
-```
-For the vectorized version, we want to switch on vectorization for the map function and in `keyval` at the same time. Remember, this is not always the right choice; it makes sense when we have small records in and small records out. The speed-up here is about 3X.
-```{r filter-vec}
-```
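-To give a concrete feel for the shape of such a job, here is a sketch of a vectorized filter; it is not the chunk used for the timings, and the spelling of the `vectorized` options simply follows the description above, so the released API may differ:
-```r
-filter.vec = function(input, predicate)
-  mapreduce(
-    input = input,
-    map = function(k, v) {
-      keep = predicate(v)                          # one logical per value in the vectorized pair
-      keyval(k[keep], v[keep], vectorized = TRUE)},
-    vectorized = TRUE)
-```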
-This is the first case where we can show what changes by turning on the `structured` option. On the map side it makes sense only in conjunction with `vectorized`; otherwise, with only one row of data, there isn't much to turn into a data frame. On the reduce side the `structured` option is equivalent to the now deprecated `reduce.on.data.frame`, which turns the second argument of the reduce function into a data frame before it is evaluated. In this case we have a single-column data frame, so it is not particularly interesting, but it is possible.
-```{r filter-vec-struct}
-```
-
-### Select Columns
-In this example we want to select specific elements of each record, or columns if we are dealing with structured data. We need to generate slightly more complex input data so that this operation makes sense.
-```{r select-input}
-```
-The selection function takes slightly different forms for the unvectorized and vectorized cases. Here we are picking the second column just for illustration purposes.
-```{r select-fun}
-```
-```{r select-fun-vec}
-```
-In the latter case we do not have an option to refer to the column by name, as in `do.call(rbind,v)[,'b']`. Unfortunately names are not preserved by the simplified serialization method used when vectorization is on, a shortcoming we plan to address in the future.
-In the structured case we don't even need a function to perform a column selection, just an index number.
-```{r select-fun-vec-struct}
-```
-As usual, we start by showing the plain-vanilla implementation. Nothing major to report here.
-```{r select}
-```
-In the vectorized version, we turn on vectorization both in input and output and we switch to a vectorized selection function. The speed-up is 5X.
-```{r select-vec}
-```
-As selecting a column is a typical operation that is well defined and natural on structured data, in the structured version we don't even need a selection function, just a column number. Unfortunately column names are lost in the current implementation, something we would like to address in the future.
-```{r select-vec-struct}
-```
-
-### Big Sum
-We now move on to the first example including a reduce. It's an extreme case of data reduction as our only goal is to perform a large sum. Let's start by generating some data.
-```{r bigsum-input}
-```
-This is the plain-vanilla implementation. Turning on the combiner when possible is always recommended, but in this case it is mandatory: without it the reduce process would likely run out of memory.
-```{r bigsum}
-```
-In its vectorized form, this program applies an additional trick, which is to start summing in the map function, turning it into an early reduce of sorts. In fact, it would be possible to use the same function for both map and reduce. Like a combiner, this early reduction happens locally, near the data, but in addition it doesn't require the data to be serialized and deserialized in between. It's an extreme application of the _mantra_ "reduce early, reduce often", for a speed gain in excess of 6X.
-```{r bigsum-vec}
-```
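-The in-map early reduction can be sketched as follows; again, this is an illustration rather than the timed chunk, and the `vectorized` option spelling is an assumption based on the description above:
-```r
-sum.em = function(k, vv) keyval(1, sum(unlist(vv)))  # same shape works for map and reduce
-big.sum = function(input)
-  from.dfs(
-    mapreduce(
-      input = input,
-      map = sum.em,       # early reduction: each map call emits one partial sum
-      reduce = sum.em,
-      combine = TRUE,
-      vectorized = TRUE))
-```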
-In the structured version we rely on the implicit conversion to data frame to save a couple of `unlist` calls, for a cleaner look.
-```{r bigsum-vec-struct}
-```
-
-### Group and Aggregate
-This is an example of a more realistic aggregation on a user-defined number of groups expressed in a more generic form with `group` and `aggregate` functions. First let's generate some data.
-```{r group-aggregate-input}
-```
-Then pick specific group and aggregate functions to make the example fully specified and runnable. For simplicity's sake, these are written to work in all cases, plain, vectorized and structured. Some further optimizations are possible.
-```{r group-aggregate-functions}
-```
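-As an illustration of what such functions could look like (not the actual chunk; the modulus and the `unlist` calls are just one possible choice), consider:
-```r
-group = function(v) unlist(v) %% 100       # maps each value to one of 100 groups
-aggregate = function(x) sum(unlist(x))     # per-group aggregation, here a sum
-```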
-What this means is that we are again calculating sums of numbers, but this time we are going to have a separate sum for each of 100 different groups. Let's start with the plain vanilla one.
-```{r group-aggregate}
-```
-In the vectorized version we could again apply the trick of in-map aggregation, but it wouldn't buy us as much as in the previous example. The speedup here is 2.5X.
-```{r group-aggregate-vec}
-```
-Finally, the structured version to complete these test cases.
-```{r group-aggregate-vec-struct}
-```
View
387 rmr/pkg/docs/introduction-to-vectorized-API.html
@@ -1,387 +0,0 @@
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
-"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
-<!-- saved from url=(0014)about:internet -->
-<html>
-<head>
-<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
-
-<title>Introduction to the <code>vectorized</code> and <code>structured</code> options in <code>rmr</code> 1.3</title>
-
-<base target="_blank"/>
-
-<style type="text/css">
-body, td {
- font-family: sans-serif;
- background-color: white;
- font-size: 12px;
- margin: 8px;
-}
-
-tt, code, pre {
- font-family: 'DejaVu Sans Mono', 'Droid Sans Mono', 'Lucida Console', Consolas, Monaco, monospace;
-}
-
-h1 {
- font-size:2.2em;
-}
-
-h2 {
- font-size:1.8em;
-}
-
-h3 {
- font-size:1.4em;
-}
-
-h4 {
- font-size:1.0em;
-}
-
-h5 {
- font-size:0.9em;
-}
-
-h6 {
- font-size:0.8em;
-}
-
-a:visited {
- color: rgb(50%, 0%, 50%);
-}
-
-pre {
- margin-top: 0;
- max-width: 95%;
- border: 1px solid #ccc;
-}
-
-pre code {
- display: block; padding: 0.5em;
-}
-
-code.r {
- background-color: #F8F8F8;
-}
-
-table, td, th {
- border: none;
-}
-
-blockquote {
- color:#666666;
- margin:0;
- padding-left: 1em;
- border-left: 0.5em #EEE solid;
-}
-
-hr {
- height: 0px;
- border-bottom: none;
- border-top-width: thin;
- border-top-style: dotted;
- border-top-color: #999999;
-}
-
-@media print {
- * {
- background: transparent !important;
- color: black !important;
- filter:none !important;
- -ms-filter: none !important;
- }
-
- body {
- font-size:12pt;
- max-width:100%;
- }
-
- a, a:visited {
- text-decoration: underline;
- }
-
- hr {
- visibility: hidden;
-<