Merge branch 'master' into SPARK-16272
Marcelo Vanzin committed Jul 13, 2016
2 parents 392bddc + f376c37 commit b928a55
Showing 440 changed files with 16,999 additions and 5,688 deletions.
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -69,6 +69,7 @@ exportMethods("arrange",
"first",
"freqItems",
"gapply",
"gapplyCollect",
"group_by",
"groupBy",
"head",
@@ -234,6 +235,7 @@ exportMethods("%in%",
"over",
"percent_rank",
"pmod",
"posexplode",
"quarter",
"rand",
"randn",
118 changes: 105 additions & 13 deletions R/pkg/R/DataFrame.R
@@ -1344,7 +1344,7 @@ setMethod("dapplyCollect",

#' gapply
#'
#' Group the SparkDataFrame using the specified columns and apply the R function to each
#' Groups the SparkDataFrame using the specified columns and applies the R function to each
#' group.
#'
#' @param x A SparkDataFrame
@@ -1356,9 +1356,11 @@ setMethod("dapplyCollect",
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' The schema must match the output of `func`. It has to be defined for each
#' output column with preferred output column name and corresponding data type.
#' @return a SparkDataFrame
#' @family SparkDataFrame functions
#' @rdname gapply
#' @name gapply
#' @seealso \link{gapplyCollect}
#' @export
#' @examples
#'
@@ -1374,14 +1376,22 @@ setMethod("dapplyCollect",
#' columns with data types integer and string and the mean which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' df1 <- gapply(
#' result <- gapply(
#' df,
#' list("a", "c"),
#' c("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' },
#' schema)
#' collect(df1)
#' }, schema)
#'
#' We can also group the data and afterwards call gapply on GroupedData.
#' For example:
#' gdf <- group_by(df, "a", "c")
#' result <- gapply(
#' gdf,
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' }, schema)
#' collect(result)
#'
#' Result
#' ------
@@ -1399,16 +1409,16 @@ setMethod("dapplyCollect",
#' structField("Petal_Width", "double"))
#' df1 <- gapply(
#' df,
#' list(df$"Species"),
#' df$"Species",
#' function(key, x) {
#' m <- suppressWarnings(lm(Sepal_Length ~
#' Sepal_Width + Petal_Length + Petal_Width, x))
#' data.frame(t(coef(m)))
#' }, schema)
#' collect(df1)
#'
#'Result
#'---------
#' Result
#' ---------
#' Model (Intercept) Sepal_Width Petal_Length Petal_Width
#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
@@ -1423,6 +1433,89 @@ setMethod("gapply",
gapply(grouped, func, schema)
})
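
A minimal usage sketch (not part of this commit) of the delegation above: the SparkDataFrame method just groups by the supplied columns and forwards to the GroupedData method, so the two calls below should give the same result. The toy data, schema, and function name are assumed for illustration.

df <- createDataFrame(list(list(1L, 1.0), list(1L, 3.0), list(2L, 5.0)), c("a", "b"))
schema <- structType(structField("a", "integer"), structField("sum_b", "double"))
sumFunc <- function(key, x) {
  # key holds the grouping values, x holds the group's rows as a local data.frame
  data.frame(key, sum(x$b), stringsAsFactors = FALSE)
}
r1 <- gapply(df, "a", sumFunc, schema)            # SparkDataFrame method
r2 <- gapply(group_by(df, "a"), sumFunc, schema)  # equivalent GroupedData method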

#' gapplyCollect
#'
#' Groups the SparkDataFrame using the specified columns, applies the R function to each
#' group and collects the result back to R as a data.frame.
#'
#' @param x A SparkDataFrame
#' @param cols Grouping columns
#' @param func A function to be applied to each group partition specified by grouping
#' column of the SparkDataFrame. The function `func` takes as argument
#' a key - grouping columns and a data frame - a local R data.frame.
#' The output of `func` is a local R data.frame.
#' @return a data.frame
#' @family SparkDataFrame functions
#' @rdname gapplyCollect
#' @name gapplyCollect
#' @seealso \link{gapply}
#' @export
#' @examples
#'
#' \dontrun{
#' Computes the arithmetic mean of the second column by grouping
#' on the first and third columns. Output the grouping values and the average.
#'
#' df <- createDataFrame (
#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
#' c("a", "b", "c", "d"))
#'
#' result <- gapplyCollect(
#' df,
#' c("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' colnames(y) <- c("key_a", "key_c", "mean_b")
#' y
#' })
#'
#' We can also group the data and afterwards call gapplyCollect on GroupedData.
#' For example:
#' gdf <- group_by(df, "a", "c")
#' result <- gapplyCollect(
#' gdf,
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' colnames(y) <- c("key_a", "key_c", "mean_b")
#' y
#' })
#'
#' Result
#' ------
#' key_a key_c mean_b
#' 3 3 3.0
#' 1 1 1.5
#'
#' Fits linear models on iris dataset by grouping on the 'Species' column and
#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
#' and 'Petal_Width' as training features.
#'
#' df <- createDataFrame (iris)
#' result <- gapplyCollect(
#' df,
#' df$"Species",
#' function(key, x) {
#' m <- suppressWarnings(lm(Sepal_Length ~
#' Sepal_Width + Petal_Length + Petal_Width, x))
#' data.frame(t(coef(m)))
#' })
#'
#' Result
#'---------
#' Model X.Intercept. Sepal_Width Petal_Length Petal_Width
#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
#' 3 2.351890 0.6548350 0.2375602 0.2521257
#'
#'}
#' @note gapplyCollect(SparkDataFrame) since 2.0.0
setMethod("gapplyCollect",
signature(x = "SparkDataFrame"),
function(x, cols, func) {
grouped <- do.call("groupBy", c(x, cols))
gapplyCollect(grouped, func)
})

############################## RDD Map Functions ##################################
# All of the following functions mirror the existing RDD map functions, #
# but allow for use with DataFrames by first converting to an RRDD before calling #
@@ -2494,8 +2587,8 @@ setMethod("saveAsTable",

#' summary
#'
#' Computes statistics for numeric columns.
#' If no columns are given, this function computes statistics for all numerical columns.
#' Computes statistics for numeric and string columns.
#' If no columns are given, this function computes statistics for all numerical or string columns.
#'
#' @param x A SparkDataFrame to be computed.
#' @param col A string of name
@@ -2529,8 +2622,7 @@ setMethod("describe",
setMethod("describe",
signature(x = "SparkDataFrame"),
function(x) {
colList <- as.list(c(columns(x)))
sdf <- callJMethod(x@sdf, "describe", colList)
sdf <- callJMethod(x@sdf, "describe", list())
dataFrame(sdf)
})
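
A rough usage sketch of the behavior after this change (assuming an active SparkR session; not part of the diff): with no columns passed to the JVM, Spark picks all numeric and string columns, so string columns now appear in the describe/summary output, with lexicographic min and max.

df <- createDataFrame(iris)   # 'Species' becomes a string column
collect(describe(df))         # count/mean/stddev/min/max; Species now gets count, min, max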

10 changes: 8 additions & 2 deletions R/pkg/R/SQLContext.R
@@ -714,11 +714,14 @@ dropTempView <- function(viewName) {
#'
#' The data source is specified by the `source` and a set of options(...).
#' If `source` is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#' "spark.sql.sources.default" will be used. \cr
#' Similar to R read.csv, when `source` is "csv", by default, a value of "NA" will be interpreted
#' as NA.
#'
#' @param path The path of files to load
#' @param source The name of external data source
#' @param schema The data schema defined in structType
#' @param na.strings Default string value for NA when source is "csv"
#' @return SparkDataFrame
#' @rdname read.df
#' @name read.df
@@ -735,7 +738,7 @@ dropTempView <- function(viewName) {
#' @name read.df
#' @method read.df default
#' @note read.df since 1.4.0
read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
if (!is.null(path)) {
@@ -744,6 +747,9 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
if (is.null(source)) {
source <- getDefaultSqlSource()
}
if (source == "csv" && is.null(options[["nullValue"]])) {
options[["nullValue"]] <- na.strings
}
if (!is.null(schema)) {
stopifnot(class(schema) == "structType")
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession, source,
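
The new branch above only fills in the csv source's "nullValue" option from `na.strings` when the caller has not already supplied it. A hedged sketch of the two equivalent call styles; the file path below is a placeholder and "header" is a standard csv data source option.

df1 <- read.df("/tmp/people.csv", source = "csv", header = "true", na.strings = "NA")
df2 <- read.df("/tmp/people.csv", source = "csv", header = "true", nullValue = "NA")  # same effect
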
17 changes: 17 additions & 0 deletions R/pkg/R/functions.R
@@ -2934,3 +2934,20 @@ setMethod("sort_array",
jc <- callJStatic("org.apache.spark.sql.functions", "sort_array", x@jc, asc)
column(jc)
})

#' posexplode
#'
#' Creates a new row for each element with position in the given array or map column.
#'
#' @rdname posexplode
#' @name posexplode
#' @family collection_funcs
#' @export
#' @examples \dontrun{posexplode(df$c)}
#' @note posexplode since 2.1.0
setMethod("posexplode",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "posexplode", x@jc)
column(jc)
})
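
A small usage sketch, assuming an active session (not part of this commit): like `explode`, the generator can be used inside `select`, and Spark names the resulting columns "pos" and "col" by default.

df <- sql("SELECT array(10, 20, 30) AS v")
collect(select(df, posexplode(df$v)))
#   pos col
# 1   0  10
# 2   1  20
# 3   2  30
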
8 changes: 8 additions & 0 deletions R/pkg/R/generics.R
@@ -469,6 +469,10 @@ setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect")
#' @export
setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })

#' @rdname gapplyCollect
#' @export
setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })

#' @rdname summary
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
@@ -1050,6 +1054,10 @@ setGeneric("percent_rank", function(x) { standardGeneric("percent_rank") })
#' @export
setGeneric("pmod", function(y, x) { standardGeneric("pmod") })

#' @rdname posexplode
#' @export
setGeneric("posexplode", function(x) { standardGeneric("posexplode") })

#' @rdname quarter
#' @export
setGeneric("quarter", function(x) { standardGeneric("quarter") })
93 changes: 40 additions & 53 deletions R/pkg/R/group.R
@@ -196,64 +196,51 @@ createMethods()

#' gapply
#'
#' Applies a R function to each group in the input GroupedData
#'
#' @param x a GroupedData
#' @param func A function to be applied to each group partition specified by GroupedData.
#' The function `func` takes as argument a key - grouping columns and
#' a data frame - a local R data.frame.
#' The output of `func` is a local R data.frame.
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' The schema must match to output of `func`. It has to be defined for each
#' output column with preferred output column name and corresponding data type.
#' @return a SparkDataFrame
#' @param x A GroupedData
#' @rdname gapply
#' @name gapply
#' @export
#' @examples
#' \dontrun{
#' Computes the arithmetic mean of the second column by grouping
#' on the first and third columns. Output the grouping values and the average.
#'
#' df <- createDataFrame (
#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
#' c("a", "b", "c", "d"))
#'
#' Here our output contains three columns, the key which is a combination of two
#' columns with data types integer and string and the mean which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' df1 <- gapply(
#' df,
#' list("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' },
#' schema)
#' collect(df1)
#'
#' Result
#' ------
#' a c avg
#' 3 3 3.0
#' 1 1 1.5
#' }
#' @note gapply(GroupedData) since 2.0.0
setMethod("gapply",
signature(x = "GroupedData"),
function(x, func, schema) {
try(if (is.null(schema)) stop("schema cannot be NULL"))
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
sdf <- callJStatic(
"org.apache.spark.sql.api.r.SQLUtils",
"gapply",
x@sgd,
serialize(cleanClosure(func), connection = NULL),
packageNamesArr,
broadcastArr,
schema$jobj)
dataFrame(sdf)
if (is.null(schema)) stop("schema cannot be NULL")
gapplyInternal(x, func, schema)
})

#' gapplyCollect
#'
#' @param x A GroupedData
#' @rdname gapplyCollect
#' @name gapplyCollect
#' @export
#' @note gapplyCollect(GroupedData) since 2.0.0
setMethod("gapplyCollect",
signature(x = "GroupedData"),
function(x, func) {
gdf <- gapplyInternal(x, func, NULL)
content <- callJMethod(gdf@sdf, "collect")
# content is a list of items of struct type. Each item has a single field
# which is a serialized data.frame that corresponds to one group of the
# SparkDataFrame.
ldfs <- lapply(content, function(x) { unserialize(x[[1]]) })
ldf <- do.call(rbind, ldfs)
row.names(ldf) <- NULL
ldf
})

gapplyInternal <- function(x, func, schema) {
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
sdf <- callJStatic(
"org.apache.spark.sql.api.r.SQLUtils",
"gapply",
x@sgd,
serialize(cleanClosure(func), connection = NULL),
packageNamesArr,
broadcastArr,
if (class(schema) == "structType") { schema$jobj } else { NULL })
dataFrame(sdf)
}
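
In other words, gapplyCollect behaves like gapply followed by collect, except that no output schema is required: each group's result comes back as a serialized R data.frame and is rbind-ed on the driver. A toy sketch (data and names assumed, not from this commit):

gdf <- group_by(createDataFrame(mtcars), "cyl")
countFunc <- function(key, x) { data.frame(key, n = nrow(x)) }
ldf <- gapplyCollect(gdf, countFunc)   # local data.frame, no schema needed
# gapply on the same grouped data would also need an explicit schema, e.g.:
# schema <- structType(structField("cyl", "double"), structField("n", "integer"))
# ldf2 <- collect(gapply(gdf, countFunc, schema))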
