
Commit

Merge branch 'master' into pr4229
zsxwing committed Jul 31, 2015
2 parents 126608a + 9307f56 commit 734db99
Showing 1,061 changed files with 53,714 additions and 18,466 deletions.
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -91,3 +91,5 @@ help/*
html/*
INDEX
.lintr
+gen-java.*
+.*avpr
2 changes: 1 addition & 1 deletion R/README.md
@@ -6,7 +6,7 @@ SparkR is an R package that provides a light-weight frontend to use Spark from R

#### Build Spark

-Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-PsparkR` profile to build the R package. For example to use the default Hadoop versions you can run
+Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
```
build/mvn -DskipTests -Psparkr package
```
5 changes: 5 additions & 0 deletions R/install-dev.bat
@@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd
8 changes: 6 additions & 2 deletions R/install-dev.sh
@@ -34,12 +34,16 @@ LIB_DIR="$FWDIR/lib"

mkdir -p $LIB_DIR

-pushd $FWDIR
+pushd $FWDIR > /dev/null

# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'

# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

-popd
+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
+popd > /dev/null
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -29,7 +29,7 @@ Collate:
'client.R'
'context.R'
'deserialize.R'
+'mllib.R'
'serialize.R'
'sparkR.R'
'utils.R'
-'zzz.R'
7 changes: 7 additions & 0 deletions R/pkg/NAMESPACE
@@ -10,6 +10,11 @@ export("sparkR.init")
export("sparkR.stop")
export("print.jobj")

+# MLlib integration
+exportMethods("glm",
+              "predict",
+              "summary")
+
# Job group lifecycle management methods
export("setJobGroup",
"clearJobGroup",
@@ -22,6 +27,7 @@ exportMethods("arrange",
"collect",
"columns",
"count",
"crosstab",
"describe",
"distinct",
"dropna",
@@ -77,6 +83,7 @@ exportMethods("abs",
"atan",
"atan2",
"avg",
"between",
"cast",
"cbrt",
"ceiling",
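For context, a rough sketch of how the newly exported MLlib methods fit together, assuming the formula-based `glm` interface this merge brings in on top of Spark DataFrames (the `sqlContext`, data, and column names below are illustrative, not part of this commit):

```
# Illustrative sketch only: fit, inspect, and score a Gaussian GLM in SparkR.
df <- createDataFrame(sqlContext, iris)  # assumes an initialized sqlContext

# R-style formula over DataFrame columns (underscore names assumed here)
model <- glm(Sepal_Length ~ Sepal_Width, data = df, family = "gaussian")

summary(model)            # coefficients of the fitted model
head(predict(model, df))  # scored DataFrame, expected to carry a "prediction" column
```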
38 changes: 33 additions & 5 deletions R/pkg/R/DataFrame.R
@@ -1314,7 +1314,7 @@ setMethod("except",
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
-signature(df = "DataFrame", path = 'character'),
+signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -1328,7 +1328,7 @@ setMethod("write.df",
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
-options[['path']] = path
+options[["path"]] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
})
@@ -1337,7 +1337,7 @@ setMethod("write.df",
#' @aliases saveDF
#' @export
setMethod("saveDF",
-signature(df = "DataFrame", path = 'character'),
+signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})
@@ -1375,8 +1375,8 @@ setMethod("saveDF",
#' saveAsTable(df, "myfile")
#' }
setMethod("saveAsTable",
-signature(df = "DataFrame", tableName = 'character', source = 'character',
-mode = 'character'),
+signature(df = "DataFrame", tableName = "character", source = "character",
+mode = "character"),
function(df, tableName, source = NULL, mode="append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -1554,3 +1554,31 @@ setMethod("fillna",
}
dataFrame(sdf)
})
+
+#' crosstab
+#'
+#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
+#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
+#' non-zero pair frequencies will be returned.
+#'
+#' @param col1 name of the first column. Distinct items will make the first item of each row.
+#' @param col2 name of the second column. Distinct items will make the column names of the output.
+#' @return a local R data.frame representing the contingency table. The first column of each row
+#'         will be the distinct values of `col1` and the column names will be the distinct values
+#'         of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
+#'         occurrences will have zero as their counts.
+#'
+#' @rdname statfunctions
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- jsonFile(sqlCtx, "/path/to/file.json")
+#' ct = crosstab(df, "title", "gender")
+#' }
+setMethod("crosstab",
+          signature(x = "DataFrame", col1 = "character", col2 = "character"),
+          function(x, col1, col2) {
+            statFunctions <- callJMethod(x@sdf, "stat")
+            sct <- callJMethod(statFunctions, "crosstab", col1, col2)
+            collect(dataFrame(sct))
+          })
2 changes: 0 additions & 2 deletions R/pkg/R/RDD.R
@@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
serializedFuncArr,
rdd@env$prev_serializedMode,
packageNamesArr,
-as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
} else {
@@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
rdd@env$prev_serializedMode,
serializedMode,
packageNamesArr,
-as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
}
8 changes: 5 additions & 3 deletions R/pkg/R/SQLContext.R
@@ -86,7 +86,9 @@ infer_type <- function(x) {
createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
if (is.data.frame(data)) {
# get the names of columns, they will be put into RDD
-    schema <- names(data)
+    if (is.null(schema)) {
+      schema <- names(data)
+    }
n <- nrow(data)
m <- ncol(data)
# get rid of factor type
@@ -455,7 +457,7 @@ dropTempTable <- function(sqlContext, tableName) {
read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
-options[['path']] <- path
+options[["path"]] <- path
}
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -504,7 +506,7 @@ loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
-options[['path']] <- path
+options[["path"]] <- path
}
sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
dataFrame(sdf)
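The `createDataFrame` fix above makes an explicit `schema` win over `names(data)` when converting a local data.frame; previously it was silently overwritten. A hypothetical before/after (names and data are illustrative):

```
# With the fix, the supplied schema (here a character vector of column
# names) is honored instead of being replaced by names(localDF).
localDF <- data.frame(x = 1:3, y = c("a", "b", "c"))
df <- createDataFrame(sqlContext, localDF, schema = c("id", "label"))
columns(df)  # expected: "id" "label" (previously: "x" "y")
```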
4 changes: 3 additions & 1 deletion R/pkg/R/backend.R
@@ -110,6 +110,8 @@ invokeJava <- function(isStatic, objId, methodName, ...) {

# TODO: check the status code to output error information
returnStatus <- readInt(conn)
-  stopifnot(returnStatus == 0)
+  if (returnStatus != 0) {
+    stop(readString(conn))
+  }
readObject(conn)
}
6 changes: 3 additions & 3 deletions R/pkg/R/client.R
@@ -36,9 +36,9 @@ connectBackend <- function(hostname, port, timeout = 6000) {

determineSparkSubmitBin <- function() {
if (.Platform$OS.type == "unix") {
sparkSubmitBinName = "spark-submit"
sparkSubmitBinName <- "spark-submit"
} else {
sparkSubmitBinName = "spark-submit.cmd"
sparkSubmitBinName <- "spark-submit.cmd"
}
sparkSubmitBinName
}
@@ -48,7 +48,7 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
jars <- paste("--jars", jars)
}

if (packages != "") {
if (!identical(packages, "")) {
packages <- paste("--packages", packages)
}

17 changes: 17 additions & 0 deletions R/pkg/R/column.R
@@ -187,6 +187,23 @@ setMethod("substr", signature(x = "Column"),
column(jc)
})

+#' between
+#'
+#' Test if the column is between the lower bound and upper bound, inclusive.
+#'
+#' @rdname column
+#'
+#' @param bounds lower and upper bounds
+setMethod("between", signature(x = "Column"),
+          function(x, bounds) {
+            if (is.vector(bounds) && length(bounds) == 2) {
+              jc <- callJMethod(x@jc, "between", bounds[1], bounds[2])
+              column(jc)
+            } else {
+              stop("bounds should be a vector of lower and upper bounds")
+            }
+          })
+
#' Casts the column to a different data type.
#'
#' @rdname column
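A quick hypothetical use of the new `between` Column method (data and column names invented); `bounds` is a length-2 vector `c(lower, upper)`, inclusive on both ends:

```
df <- createDataFrame(sqlContext, data.frame(age = c(10, 25, 40)))
# Keep rows where 18 <= age <= 35
adults <- filter(df, between(df$age, c(18, 35)))
collect(adults)  # expected: the single row with age == 25
```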
5 changes: 3 additions & 2 deletions R/pkg/R/deserialize.R
@@ -23,6 +23,7 @@
# Int -> integer
# String -> character
# Boolean -> logical
+# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
@@ -101,11 +102,11 @@ readList <- function(con) {

readRaw <- function(con) {
dataLen <- readInt(con)
-data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
+readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readRawLen <- function(con, dataLen) {
-data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
+readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readDeserialize <- function(con) {
29 changes: 21 additions & 8 deletions R/pkg/R/generics.R
@@ -20,7 +20,8 @@
# @rdname aggregateRDD
# @seealso reduce
# @export
setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
setGeneric("aggregateRDD",
function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })

# @rdname cache-methods
# @export
@@ -58,6 +59,10 @@ setGeneric("count", function(x) { standardGeneric("count") })
# @export
setGeneric("countByValue", function(x) { standardGeneric("countByValue") })

+# @rdname statfunctions
+# @export
+setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })
+
# @rdname distinct
# @export
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
@@ -249,8 +254,10 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")

# @rdname intersection
# @export
setGeneric("intersection", function(x, other, numPartitions = 1) {
standardGeneric("intersection") })
setGeneric("intersection",
function(x, other, numPartitions = 1) {
standardGeneric("intersection")
})

# @rdname keys
# @export
@@ -484,9 +491,7 @@ setGeneric("sample",
#' @rdname sample
#' @export
setGeneric("sample_frac",
-function(x, withReplacement, fraction, seed) {
-  standardGeneric("sample_frac")
-})
+function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })

#' @rdname saveAsParquetFile
#' @export
@@ -548,8 +553,8 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn

#' @rdname withColumnRenamed
#' @export
setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
standardGeneric("withColumnRenamed") })
setGeneric("withColumnRenamed",
function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })


###################### Column Methods ##########################
@@ -566,6 +571,10 @@ setGeneric("asc", function(x) { standardGeneric("asc") })
#' @export
setGeneric("avg", function(x, ...) { standardGeneric("avg") })

+#' @rdname column
+#' @export
+setGeneric("between", function(x, bounds) { standardGeneric("between") })
+
#' @rdname column
#' @export
setGeneric("cast", function(x, dataType) { standardGeneric("cast") })
@@ -656,3 +665,7 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
#' @rdname column
#' @export
setGeneric("upper", function(x) { standardGeneric("upper") })
+
+#' @rdname glm
+#' @export
+setGeneric("glm")
4 changes: 2 additions & 2 deletions R/pkg/R/group.R
@@ -87,7 +87,7 @@ setMethod("count",
setMethod("agg",
signature(x = "GroupedData"),
function(x, ...) {
-cols = list(...)
+cols <- list(...)
stopifnot(length(cols) > 0)
if (is.character(cols[[1]])) {
cols <- varargsToEnv(...)
@@ -97,7 +97,7 @@ setMethod("agg",
if (!is.null(ns)) {
for (n in ns) {
if (n != "") {
-cols[[n]] = alias(cols[[n]], n)
+cols[[n]] <- alias(cols[[n]], n)
}
}
}