
Merge pull request #2 from zsxwing/pr4229
Pr4229
prabeesh committed Aug 2, 2015
2 parents 126608a + abf5f18 commit 03f3e88
Showing 1,117 changed files with 60,374 additions and 19,311 deletions.
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -91,3 +91,5 @@ help/*
html/*
INDEX
.lintr
gen-java.*
.*avpr
2 changes: 1 addition & 1 deletion R/README.md
@@ -6,7 +6,7 @@ SparkR is an R package that provides a light-weight frontend to use Spark from R

#### Build Spark

Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-PsparkR` profile to build the R package. For example to use the default Hadoop versions you can run
Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
```
build/mvn -DskipTests -Psparkr package
```
5 changes: 5 additions & 0 deletions R/install-dev.bat
@@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\

rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
pushd %SPARK_HOME%\R\lib
%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
popd
8 changes: 6 additions & 2 deletions R/install-dev.sh
@@ -34,12 +34,16 @@ LIB_DIR="$FWDIR/lib"

mkdir -p $LIB_DIR

pushd $FWDIR
pushd $FWDIR > /dev/null

# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'

# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

popd
# Zip the SparkR package so that it can be distributed to worker nodes on YARN
cd $LIB_DIR
jar cfM "$LIB_DIR/sparkr.zip" SparkR

popd > /dev/null
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -29,7 +29,7 @@ Collate:
'client.R'
'context.R'
'deserialize.R'
'mllib.R'
'serialize.R'
'sparkR.R'
'utils.R'
'zzz.R'
15 changes: 15 additions & 0 deletions R/pkg/NAMESPACE
@@ -10,6 +10,11 @@ export("sparkR.init")
export("sparkR.stop")
export("print.jobj")

# MLlib integration
exportMethods("glm",
"predict",
"summary")

# Job group lifecycle management methods
export("setJobGroup",
"clearJobGroup",
@@ -22,7 +27,9 @@ exportMethods("arrange",
"collect",
"columns",
"count",
"crosstab",
"describe",
"dim",
"distinct",
"dropna",
"dtypes",
@@ -39,11 +46,16 @@ exportMethods("arrange",
"isLocal",
"join",
"limit",
"merge",
"names",
"ncol",
"nrow",
"orderBy",
"mutate",
"names",
"persist",
"printSchema",
"rbind",
"registerTempTable",
"rename",
"repartition",
@@ -58,8 +70,10 @@ exportMethods("arrange",
"show",
"showDF",
"summarize",
"summary",
"take",
"unionAll",
"unique",
"unpersist",
"where",
"withColumn",
@@ -77,6 +91,7 @@ exportMethods("abs",
"atan",
"atan2",
"avg",
"between",
"cast",
"cbrt",
"ceiling",
154 changes: 149 additions & 5 deletions R/pkg/R/DataFrame.R
@@ -255,6 +255,16 @@ setMethod("names",
columns(x)
})

#' @rdname columns
setMethod("names<-",
signature(x = "DataFrame"),
function(x, value) {
if (!is.null(value)) {
sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
dataFrame(sdf)
}
})
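
Because `names<-` builds a new DataFrame with `toDF`, the usual R replacement syntax works; a minimal sketch, with the path and column names purely illustrative:

```
df <- jsonFile(sqlContext, "path/to/file.json")
names(df) <- c("newName1", "newName2")  # rebinds df to the renamed DataFrame
columns(df)                             # "newName1" "newName2"
```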

#' Register Temporary Table
#'
#' Registers a DataFrame as a Temporary Table in the SQLContext
@@ -473,6 +483,18 @@ setMethod("distinct",
dataFrame(sdf)
})

#' @title Distinct rows in a DataFrame
#'
#' @description Returns a new DataFrame containing distinct rows in this DataFrame
#'
#' @rdname unique
#' @aliases unique
setMethod("unique",
signature(x = "DataFrame"),
function(x) {
distinct(x)
})
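
Since `unique` is a direct alias for `distinct`, either spelling drops duplicate rows:

```
deduped <- unique(df)   # identical to distinct(df)
```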

#' Sample
#'
#' Return a sampled subset of this DataFrame using a random seed.
@@ -534,6 +556,58 @@ setMethod("count",
callJMethod(x@sdf, "count")
})

#' @title Number of rows for a DataFrame
#' @description Returns the number of rows in a DataFrame
#'
#' @name nrow
#'
#' @rdname nrow
#' @aliases count
setMethod("nrow",
signature(x = "DataFrame"),
function(x) {
count(x)
})

#' Returns the number of columns in a DataFrame
#'
#' @param x a SparkSQL DataFrame
#'
#' @rdname ncol
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' ncol(df)
#' }
setMethod("ncol",
signature(x = "DataFrame"),
function(x) {
length(columns(x))
})

#' Returns the dimensions (number of rows and columns) of a DataFrame
#' @param x a SparkSQL DataFrame
#'
#' @rdname dim
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' dim(df)
#' }
setMethod("dim",
signature(x = "DataFrame"),
function(x) {
c(count(x), ncol(x))
})
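
Taken together, these methods give DataFrames the familiar base R dimension helpers; how they relate, reading straight off the code above:

```
nrow(df)   # delegates to count(df)
ncol(df)   # length(columns(df))
dim(df)    # c(nrow(df), ncol(df)), i.e. rows then columns
```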

#' Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
#'
#' @param x A SparkSQL DataFrame
@@ -1205,6 +1279,15 @@ setMethod("join",
dataFrame(sdf)
})

#' @rdname merge
#' @aliases join
setMethod("merge",
signature(x = "DataFrame", y = "DataFrame"),
function(x, y, joinExpr = NULL, joinType = NULL, ...) {
join(x, y, joinExpr, joinType)
})
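
`merge` forwards straight to `join`, so a join expression and join type can be passed positionally; a hedged sketch, with `df1`, `df2`, and the key column purely illustrative:

```
merged <- merge(df1, df2, df1$key == df2$key, "inner")
```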


#' UnionAll
#'
#' Return a new DataFrame containing the union of rows in this DataFrame
@@ -1231,6 +1314,22 @@ setMethod("unionAll",
dataFrame(unioned)
})

#' @title Union two or more DataFrames
#'
#' @description Returns a new DataFrame containing rows of all parameters.
#'
#' @rdname rbind
#' @aliases unionAll
setMethod("rbind",
signature(... = "DataFrame"),
function(x, ..., deparse.level = 1) {
if (nargs() == 3) {
unionAll(x, ...)
} else {
unionAll(x, Recall(..., deparse.level = 1))
}
})
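
`rbind` folds its arguments into nested `unionAll` calls via `Recall`, so a three-DataFrame call behaves like `unionAll(df1, unionAll(df2, df3))`; an illustrative use (duplicate rows are kept, matching `unionAll` semantics):

```
combined <- rbind(df1, df2, df3)
count(combined)   # sum of the three individual row counts
```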

#' Intersect
#'
#' Return a new DataFrame containing rows only in both this DataFrame
@@ -1314,21 +1413,23 @@ setMethod("except",
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
signature(df = "DataFrame", path = 'character'),
signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
# nolint start
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"')
}
# nolint end
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] = path
options[["path"]] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
})
@@ -1337,7 +1438,7 @@ setMethod("write.df",
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character'),
signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})
@@ -1375,18 +1476,20 @@ setMethod("saveDF",
#' saveAsTable(df, "myfile")
#' }
setMethod("saveAsTable",
signature(df = "DataFrame", tableName = 'character', source = 'character',
mode = 'character'),
signature(df = "DataFrame", tableName = "character", source = "character",
mode = "character"),
function(df, tableName, source = NULL, mode="append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
# nolint start
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"')
}
# nolint end
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
@@ -1430,6 +1533,19 @@ setMethod("describe",
dataFrame(sdf)
})

#' @title Summary
#'
#' @description Computes statistics for numeric columns of the DataFrame
#'
#' @rdname summary
#' @aliases describe
setMethod("summary",
signature(x = "DataFrame"),
function(x) {
describe(x)
})
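
Because `summary` delegates to `describe`, the statistics arrive as a DataFrame and can be pulled into a local data.frame:

```
collect(summary(df))   # count, mean, stddev, min, max per numeric column
```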


#' dropna
#'
#' Returns a new DataFrame omitting rows with null values.
@@ -1554,3 +1670,31 @@ setMethod("fillna",
}
dataFrame(sdf)
})

#' crosstab
#'
#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
#' non-zero pair frequencies will be returned.
#'
#' @param col1 name of the first column. Distinct items will make the first item of each row.
#' @param col2 name of the second column. Distinct items will make the column names of the output.
#' @return a local R data.frame representing the contingency table. The first column of each row
#' will be the distinct values of `col1` and the column names will be the distinct values
#' of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
#' occurrences will have zero as their counts.
#'
#' @rdname statfunctions
#' @export
#' @examples
#' \dontrun{
#' df <- jsonFile(sqlCtx, "/path/to/file.json")
#' ct <- crosstab(df, "title", "gender")
#' }
setMethod("crosstab",
signature(x = "DataFrame", col1 = "character", col2 = "character"),
function(x, col1, col2) {
statFunctions <- callJMethod(x@sdf, "stat")
sct <- callJMethod(statFunctions, "crosstab", col1, col2)
collect(dataFrame(sct))
})
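
A worked sketch of the output shape on hypothetical data: for a DataFrame whose `title` and `gender` columns hold job titles and "male"/"female", the result is a local data.frame whose first column is named `title_gender`:

```
ct <- crosstab(df, "title", "gender")
# ct (hypothetical counts):
#   title_gender  male  female
#   engineer         3       2
#   analyst          1       4
```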