Skip to content

Commit

Permalink
Merge branch 'master' into SPARK-13576
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Mar 14, 2016
2 parents fda639b + 9a1680c commit 4e3d390
Show file tree
Hide file tree
Showing 282 changed files with 2,432 additions and 893 deletions.
5 changes: 3 additions & 2 deletions R/pkg/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ Collate:
'schema.R'
'generics.R'
'jobj.R'
'RDD.R'
'pairRDD.R'
'column.R'
'group.R'
'RDD.R'
'pairRDD.R'
'DataFrame.R'
'SQLContext.R'
'backend.R'
Expand All @@ -36,3 +36,4 @@ Collate:
'stats.R'
'types.R'
'utils.R'
RoxygenNote: 5.0.1
16 changes: 8 additions & 8 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ setMethod("colnames<-",
}

# Check if the column names have . in it
if (any(regexec(".", value, fixed=TRUE)[[1]][1] != -1)) {
if (any(regexec(".", value, fixed = TRUE)[[1]][1] != -1)) {
stop("Colum names cannot contain the '.' symbol.")
}

Expand Down Expand Up @@ -351,7 +351,7 @@ setMethod("coltypes",
types <- sapply(dtypes(x), function(x) {x[[2]]})

# Map Spark data types into R's data types using DATA_TYPES environment
rTypes <- sapply(types, USE.NAMES=F, FUN=function(x) {
rTypes <- sapply(types, USE.NAMES = F, FUN = function(x) {
# Check for primitive types
type <- PRIMITIVE_TYPES[[x]]

Expand Down Expand Up @@ -1779,7 +1779,7 @@ setMethod("merge",
signature(x = "DataFrame", y = "DataFrame"),
function(x, y, by = intersect(names(x), names(y)), by.x = by, by.y = by,
all = FALSE, all.x = all, all.y = all,
sort = TRUE, suffixes = c("_x","_y"), ... ) {
sort = TRUE, suffixes = c("_x", "_y"), ... ) {

if (length(suffixes) != 2) {
stop("suffixes must have length 2")
Expand Down Expand Up @@ -2299,7 +2299,7 @@ setMethod("as.data.frame",
function(x, ...) {
# Check if additional parameters have been passed
if (length(list(...)) > 0) {
stop(paste("Unused argument(s): ", paste(list(...), collapse=", ")))
stop(paste("Unused argument(s): ", paste(list(...), collapse = ", ")))
}
collect(x)
})
Expand Down Expand Up @@ -2395,13 +2395,13 @@ setMethod("str",
# Get the first elements for each column

firstElements <- if (types[i] == "character") {
paste(paste0("\"", localDF[,i], "\""), collapse = " ")
paste(paste0("\"", localDF[, i], "\""), collapse = " ")
} else {
paste(localDF[,i], collapse = " ")
paste(localDF[, i], collapse = " ")
}

# Add the corresponding number of spaces for alignment
spaces <- paste(rep(" ", max(nchar(names) - nchar(names[i]))), collapse="")
spaces <- paste(rep(" ", max(nchar(names) - nchar(names[i]))), collapse = "")

# Get the short type. For 'character', it would be 'chr';
# 'for numeric', it's 'num', etc.
Expand All @@ -2413,7 +2413,7 @@ setMethod("str",
# Concatenate the colnames, coltypes, and first
# elements of each column
line <- paste0(" $ ", names[i], spaces, ": ",
dataType, " ",firstElements)
dataType, " ", firstElements)

# Chop off extra characters if this is too long
cat(substr(line, 1, MAX_CHAR_PER_ROW))
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,

setMethod("show", "RDD",
function(object) {
cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep=""))
cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep = ""))
})

setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
Expand Down
3 changes: 3 additions & 0 deletions R/pkg/R/context.R
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ parallelize <- function(sc, coll, numSlices = 1) {
# TODO: bound/safeguard numSlices
# TODO: unit tests for if the split works for all primitives
# TODO: support matrix, data frame, etc
# nolint start
# suppress lintr warning: Place a space before left parenthesis, except in a function call.
if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
# nolint end
if (is.data.frame(coll)) {
message(paste("context.R: A data frame is parallelized by columns."))
} else {
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/deserialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ readMultipleObjects <- function(inputCon) {
# of the objects, so the number of objects varies, we try to read
# all objects in a loop until the end of the stream.
data <- list()
while(TRUE) {
while (TRUE) {
# If reaching the end of the stream, type returned should be "".
type <- readType(inputCon)
if (type == "") {
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -607,15 +607,15 @@ setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr")

#' @rdname showDF
#' @export
setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
setGeneric("showDF", function(x, ...) { standardGeneric("showDF") })

# @rdname subset
# @export
setGeneric("subset", function(x, ...) { standardGeneric("subset") })

#' @rdname agg
#' @export
setGeneric("summarize", function(x,...) { standardGeneric("summarize") })
setGeneric("summarize", function(x, ...) { standardGeneric("summarize") })

#' @rdname summary
#' @export
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0,
standardize = TRUE, solver = "auto") {
family <- match.arg(family)
formula <- paste(deparse(formula), collapse="")
formula <- paste(deparse(formula), collapse = "")
model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"fitRModelFormula", formula, data@sdf, family, lambda,
alpha, standardize, solver)
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/serialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ writeJobj <- function(con, value) {
writeString <- function(con, value) {
utfVal <- enc2utf8(value)
writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1))
writeBin(utfVal, con, endian = "big", useBytes=TRUE)
writeBin(utfVal, con, endian = "big", useBytes = TRUE)
}

writeInt <- function(con, value) {
Expand Down
6 changes: 3 additions & 3 deletions R/pkg/R/sparkR.R
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ sparkR.init <- function(
if (!file.exists(path)) {
stop("JVM is not ready after 10 seconds")
}
f <- file(path, open="rb")
f <- file(path, open = "rb")
backendPort <- readInt(f)
monitorPort <- readInt(f)
rLibPath <- readString(f)
Expand Down Expand Up @@ -185,9 +185,9 @@ sparkR.init <- function(
}

sparkExecutorEnvMap <- convertNamedListToEnv(sparkExecutorEnv)
if(is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
if (is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
}

# Classpath separator is ";" on Windows
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ wrapInt <- function(value) {
# Multiply `val` by 31 and add `addVal` to the result. Ensures that
# integer-overflows are handled at every step.
mult31AndAdd <- function(val, addVal) {
vec <- c(bitwShiftL(val, c(4,3,2,1,0)), addVal)
vec <- c(bitwShiftL(val, c(4, 3, 2, 1, 0)), addVal)
Reduce(function(a, b) {
wrapInt(as.numeric(a) + as.numeric(b))
},
Expand Down Expand Up @@ -202,7 +202,7 @@ serializeToString <- function(rdd) {
# This function amortizes the allocation cost by doubling
# the size of the list every time it fills up.
addItemToAccumulator <- function(acc, item) {
if(acc$counter == acc$size) {
if (acc$counter == acc$size) {
acc$size <- acc$size * 2
length(acc$data) <- acc$size
}
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/inst/profile/general.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
dirs <- strsplit(packageDir, ",")[[1]]
.libPaths(c(dirs, .libPaths()))
Sys.setenv(NOAWT=1)
Sys.setenv(NOAWT = 1)
}
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/packageInAJarTest.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ run2 <- myfunc(-4L)

sparkR.stop()

if(run1 != 6) quit(save = "no", status = 1)
if (run1 != 6) quit(save = "no", status = 1)

if(run2 != -3) quit(save = "no", status = 1)
if (run2 != -3) quit(save = "no", status = 1)
14 changes: 7 additions & 7 deletions R/pkg/inst/tests/testthat/test_binaryFile.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ sc <- sparkR.init()
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("saveAsObjectFile()/objectFile() following textFile() works", {
fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName1)

rdd <- textFile(sc, fileName1, 1)
Expand All @@ -37,7 +37,7 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
})

test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")

l <- list(1, 2, 3)
rdd <- parallelize(sc, l, 1)
Expand All @@ -49,8 +49,8 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
})

test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName1)

rdd <- textFile(sc, fileName1)
Expand All @@ -73,8 +73,8 @@ test_that("saveAsObjectFile()/objectFile() following RDD transformations works",
})

test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")

rdd1 <- parallelize(sc, "Spark is pretty.")
saveAsObjectFile(rdd1, fileName1)
Expand Down
6 changes: 3 additions & 3 deletions R/pkg/inst/tests/testthat/test_binary_function.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test_that("union on two RDDs", {
actual <- collect(unionRDD(rdd, rdd))
expect_equal(actual, as.list(rep(nums, 2)))

fileName <- tempfile(pattern="spark-test", fileext=".tmp")
fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName)

text.rdd <- textFile(sc, fileName)
Expand Down Expand Up @@ -74,10 +74,10 @@ test_that("zipPartitions() on RDDs", {
actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
func = function(x, y, z) { list(list(x, y, z))} ))
expect_equal(actual,
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
list(list(1, c(1, 2), c(1, 2, 3)), list(2, c(3, 4), c(4, 5, 6))))

mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName)

rdd <- textFile(sc, fileName, 1)
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_broadcast.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ nums <- 1:2
rrdd <- parallelize(sc, nums, 2L)

test_that("using broadcast variable", {
randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
randomMatBr <- broadcast(sc, randomMat)

useBroadcast <- function(x) {
Expand All @@ -37,7 +37,7 @@ test_that("using broadcast variable", {
})

test_that("without using broadcast variable", {
randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))

useBroadcast <- function(x) {
sum(randomMat * x)
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ test_that("summary coefficients match with native glm of family 'binomial'", {
training <- filter(df, df$Species != "setosa")
stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
family = "binomial"))
coefs <- as.vector(stats$coefficients[,1])
coefs <- as.vector(stats$coefficients[, 1])

rTraining <- iris[iris$Species %in% c("versicolor","virginica"),]
rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
rCoefs <- as.vector(coef(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
family = binomial(link = "logit"))))

Expand Down
Loading

0 comments on commit 4e3d390

Please sign in to comment.