
Commit

Merge in master
holdenk committed Jul 16, 2015
2 parents aa5b093 + fec10f0 commit 100a39b
Showing 602 changed files with 17,425 additions and 7,060 deletions.
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -91,3 +91,5 @@ help/*
html/*
INDEX
.lintr
+gen-java.*
+.*avpr
5 changes: 5 additions & 0 deletions R/install-dev.bat
@@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\

+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd
8 changes: 6 additions & 2 deletions R/install-dev.sh
@@ -34,12 +34,16 @@ LIB_DIR="$FWDIR/lib"

mkdir -p $LIB_DIR

-pushd $FWDIR
+pushd $FWDIR > /dev/null

# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'

# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

-popd
+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
+popd > /dev/null
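Note: `jar cfM` creates the archive without adding a manifest, so `sparkr.zip` is a plain zip file that YARN can ship to and unpack on worker nodes.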
1 change: 0 additions & 1 deletion R/pkg/DESCRIPTION
@@ -32,4 +32,3 @@ Collate:
'serialize.R'
'sparkR.R'
'utils.R'
-'zzz.R'
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -77,6 +77,7 @@ exportMethods("abs",
"atan",
"atan2",
"avg",
"between",
"cast",
"cbrt",
"ceiling",
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -1328,7 +1328,7 @@ setMethod("write.df",
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
-options[['path']] = path
+options[['path']] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
})
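For context, write.df forwards extra options (including path) to the data source. A minimal usage sketch, not part of this diff, assuming an existing DataFrame df (path and source values illustrative):

# Save df as Parquet, overwriting any existing output
write.df(df, path = "/tmp/people.parquet", source = "parquet", mode = "overwrite")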
2 changes: 0 additions & 2 deletions R/pkg/R/RDD.R
@@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
serializedFuncArr,
rdd@env$prev_serializedMode,
packageNamesArr,
-as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
} else {
@@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
rdd@env$prev_serializedMode,
serializedMode,
packageNamesArr,
-as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
}
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -86,7 +86,9 @@ infer_type <- function(x) {
createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
if (is.data.frame(data)) {
# get the names of columns, they will be put into RDD
-schema <- names(data)
+if (is.null(schema)) {
+  schema <- names(data)
+}
n <- nrow(data)
m <- ncol(data)
# get rid of factor type
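With this guard, a caller-supplied schema for a local data.frame is no longer overwritten by the column names. A sketch mirroring the tests added below (values illustrative, assuming a sqlContext from sparkRSQL.init):

localDF <- data.frame(name = c("John", "Smith"), age = c(19L, 23L))
schema <- structType(structField("name", "string"), structField("age", "integer"))
df <- createDataFrame(sqlContext, localDF, schema)  # schema is respected, not replaced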
4 changes: 2 additions & 2 deletions R/pkg/R/client.R
@@ -36,9 +36,9 @@ connectBackend <- function(hostname, port, timeout = 6000) {

determineSparkSubmitBin <- function() {
if (.Platform$OS.type == "unix") {
-sparkSubmitBinName = "spark-submit"
+sparkSubmitBinName <- "spark-submit"
} else {
-sparkSubmitBinName = "spark-submit.cmd"
+sparkSubmitBinName <- "spark-submit.cmd"
}
sparkSubmitBinName
}
17 changes: 17 additions & 0 deletions R/pkg/R/column.R
@@ -187,6 +187,23 @@ setMethod("substr", signature(x = "Column"),
column(jc)
})

+#' between
+#'
+#' Test if the column is between the lower bound and upper bound, inclusive.
+#'
+#' @rdname column
+#'
+#' @param bounds lower and upper bounds
+setMethod("between", signature(x = "Column"),
+          function(x, bounds) {
+            if (is.vector(bounds) && length(bounds) == 2) {
+              jc <- callJMethod(x@jc, "between", bounds[1], bounds[2])
+              column(jc)
+            } else {
+              stop("bounds should be a vector of lower and upper bounds")
+            }
+          })

#' Casts the column to a different data type.
#'
#' @rdname column
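Illustrative usage of the new between method, mirroring the tests added below (a sketch, assuming a DataFrame df with columns age and name):

# Boolean Column: TRUE where 20 <= age <= 30 (bounds are inclusive)
df2 <- select(df, between(df$age, c(20, 30)))
# String bounds are compared as well, as with the SQL BETWEEN operator
df3 <- select(df, between(df$name, c("Apache", "Spark")))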
1 change: 1 addition & 0 deletions R/pkg/R/deserialize.R
@@ -23,6 +23,7 @@
# Int -> integer
# String -> character
# Boolean -> logical
+# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -567,6 +567,10 @@ setGeneric("asc", function(x) { standardGeneric("asc") })
#' @export
setGeneric("avg", function(x, ...) { standardGeneric("avg") })

+#' @rdname column
+#' @export
+setGeneric("between", function(x, bounds) { standardGeneric("between") })

#' @rdname column
#' @export
setGeneric("cast", function(x, dataType) { standardGeneric("cast") })
4 changes: 2 additions & 2 deletions R/pkg/R/group.R
@@ -87,7 +87,7 @@ setMethod("count",
setMethod("agg",
signature(x = "GroupedData"),
function(x, ...) {
-cols = list(...)
+cols <- list(...)
stopifnot(length(cols) > 0)
if (is.character(cols[[1]])) {
cols <- varargsToEnv(...)
@@ -97,7 +97,7 @@ setMethod("agg",
if (!is.null(ns)) {
for (n in ns) {
if (n != "") {
-cols[[n]] = alias(cols[[n]], n)
+cols[[n]] <- alias(cols[[n]], n)
}
}
}
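As the aliasing loop above shows, a named Column expression passed to agg is aliased to its name. A brief sketch (illustrative, not part of this diff, assuming a DataFrame df with columns name and age):

gd <- groupBy(df, "name")
result <- agg(gd, avgAge = avg(df$age))  # output column is named "avgAge"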
1 change: 0 additions & 1 deletion R/pkg/R/pairRDD.R
@@ -215,7 +215,6 @@ setMethod("partitionBy",
serializedHashFuncBytes,
getSerializedMode(x),
packageNamesArr,
-as.character(.sparkREnv$libname),
broadcastArr,
callJMethod(jrdd, "classTag"))

1 change: 1 addition & 0 deletions R/pkg/R/schema.R
@@ -123,6 +123,7 @@ structField.character <- function(x, type, nullable = TRUE) {
}
options <- c("byte",
"integer",
"float",
"double",
"numeric",
"character",
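"float" is now accepted as a field type when building schemas, complementing the Float -> double mapping added in deserialize.R. For example (a sketch, mirroring the tests added below):

field <- structField("height", "float")
schema <- structType(structField("name", "string"), field)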
10 changes: 0 additions & 10 deletions R/pkg/R/sparkR.R
@@ -17,10 +17,6 @@

.sparkREnv <- new.env()

-sparkR.onLoad <- function(libname, pkgname) {
-  .sparkREnv$libname <- libname
-}
-
# Utility function that returns TRUE if we have an active connection to the
# backend and FALSE otherwise
connExists <- function(env) {
@@ -80,7 +76,6 @@ sparkR.stop <- function() {
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
-#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkPackages Character string vector of packages from spark-packages.org
#' @export
#' @examples
@@ -101,7 +96,6 @@ sparkR.init <- function(
sparkEnvir = list(),
sparkExecutorEnv = list(),
sparkJars = "",
-sparkRLibDir = "",
sparkPackages = "") {

if (exists(".sparkRjsc", envir = .sparkREnv)) {
@@ -170,10 +164,6 @@ sparkR.init <- function(
sparkHome <- normalizePath(sparkHome)
}

-if (nchar(sparkRLibDir) != 0) {
-  .sparkREnv$libname <- sparkRLibDir
-}
-
sparkEnvirMap <- new.env()
for (varname in names(sparkEnvir)) {
sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
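With sparkRLibDir removed, workers locate the SparkR package via the distributed sparkr.zip (see R/install-dev.sh above) and the SPARKR_PACKAGE_DIR profile change below, so an init call is now simply, e.g., sc <- sparkR.init(master = "yarn-client") (illustrative).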
4 changes: 2 additions & 2 deletions R/pkg/R/utils.R
@@ -41,8 +41,8 @@ convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
if (isInstanceOf(obj, "scala.Tuple2")) {
# JavaPairRDD[Array[Byte], Array[Byte]].

-keyBytes = callJMethod(obj, "_1")
-valBytes = callJMethod(obj, "_2")
+keyBytes <- callJMethod(obj, "_1")
+valBytes <- callJMethod(obj, "_2")
res <- list(unserialize(keyBytes),
unserialize(valBytes))
} else {
4 changes: 2 additions & 2 deletions R/pkg/inst/profile/general.R
@@ -16,7 +16,7 @@
#

.First <- function() {
-home <- Sys.getenv("SPARK_HOME")
-.libPaths(c(file.path(home, "R", "lib"), .libPaths()))
+packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
+.libPaths(c(packageDir, .libPaths()))
Sys.setenv(NOAWT=1)
}
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_binaryFile.R
@@ -20,7 +20,7 @@ context("functions on binary files")
# JavaSparkContext handle
sc <- sparkR.init()

-mockFile = c("Spark is pretty.", "Spark is awesome.")
+mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("saveAsObjectFile()/objectFile() following textFile() works", {
fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_binary_function.R
@@ -76,7 +76,7 @@ test_that("zipPartitions() on RDDs", {
expect_equal(actual,
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))

-mockFile = c("Spark is pretty.", "Spark is awesome.")
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_rdd.R
@@ -447,7 +447,7 @@ test_that("zipRDD() on RDDs", {
expect_equal(actual,
list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))

-mockFile = c("Spark is pretty.", "Spark is awesome.")
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

@@ -483,7 +483,7 @@ test_that("cartesian() on RDDs", {
actual <- collect(cartesian(rdd, emptyRdd))
expect_equal(actual, list())

-mockFile = c("Spark is pretty.", "Spark is awesome.")
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

38 changes: 38 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -108,6 +108,32 @@ test_that("create DataFrame from RDD", {
expect_equal(count(df), 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

+df <- jsonFile(sqlContext, jsonPathNa)
+hiveCtx <- tryCatch({
+  newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
+}, error = function(err) {
+  skip("Hive is not build with SparkSQL, skipped")
+})
+sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
+insertInto(df, "people")
+expect_equal(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"), c(16))
+expect_equal(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"), c(176.5))
+
+schema <- structType(structField("name", "string"), structField("age", "integer"),
+                     structField("height", "float"))
+df2 <- createDataFrame(sqlContext, df.toRDD, schema)
+expect_equal(columns(df2), c("name", "age", "height"))
+expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
+expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))
+
+localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7))
+df <- createDataFrame(sqlContext, localDF, schema)
+expect_is(df, "DataFrame")
+expect_equal(count(df), 3)
+expect_equal(columns(df), c("name", "age", "height"))
+expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
+expect_equal(collect(where(df, df$name == "John")), c("John", 19, 164.10))
})

test_that("convert NAs to null type in DataFrames", {
@@ -612,6 +638,18 @@ test_that("column functions", {
c7 <- floor(c) + log(c) + log10(c) + log1p(c) + rint(c)
c8 <- sign(c) + sin(c) + sinh(c) + tan(c) + tanh(c)
c9 <- toDegrees(c) + toRadians(c)

+df <- jsonFile(sqlContext, jsonPath)
+df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
+expect_equal(collect(df2)[[2, 1]], TRUE)
+expect_equal(collect(df2)[[2, 2]], FALSE)
+expect_equal(collect(df2)[[3, 1]], FALSE)
+expect_equal(collect(df2)[[3, 2]], TRUE)
+
+df3 <- select(df, between(df$name, c("Apache", "Spark")))
+expect_equal(collect(df3)[[1, 1]], TRUE)
+expect_equal(collect(df3)[[2, 1]], FALSE)
+expect_equal(collect(df3)[[3, 1]], TRUE)
})

test_that("column binary mathfunctions", {
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_textFile.R
@@ -20,7 +20,7 @@ context("the textFile() function")
# JavaSparkContext handle
sc <- sparkR.init()

-mockFile = c("Spark is pretty.", "Spark is awesome.")
+mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("textFile() on a local file returns an RDD", {
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_utils.R
@@ -119,7 +119,7 @@ test_that("cleanClosure on R functions", {
# Test for overriding variables in base namespace (Issue: SparkR-196).
nums <- as.list(1:10)
rdd <- parallelize(sc, nums, 2L)
-t = 4 # Override base::t in .GlobalEnv.
+t <- 4 # Override base::t in .GlobalEnv.
f <- function(x) { x > t }
newF <- cleanClosure(f)
env <- environment(newF)
11 changes: 10 additions & 1 deletion build/mvn
@@ -112,10 +112,17 @@ install_scala() {
# the environment
ZINC_PORT=${ZINC_PORT:-"3030"}

+# Check for the `--force` flag dictating that `mvn` should be downloaded
+# regardless of whether the system already has a `mvn` install
+if [ "$1" == "--force" ]; then
+  FORCE_MVN=1
+  shift
+fi

# Install Maven if necessary
MVN_BIN="$(command -v mvn)"

-if [ ! "$MVN_BIN" ]; then
+if [ ! "$MVN_BIN" -o -n "$FORCE_MVN" ]; then
install_mvn
fi

@@ -139,5 +146,7 @@ fi
# Set any `mvn` options if not already present
export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}

+echo "Using \`mvn\` from path: $MVN_BIN"

# Last, call the `mvn` command as usual
${MVN_BIN} "$@"
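Usage note (illustrative): `build/mvn --force clean package` downloads and runs the bundled Maven even when `mvn` is already on the PATH; without `--force`, an existing system `mvn` is preferred.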
