From be5de6a42e50c42b4af15b87623bc7b49aecb353 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Fri, 6 May 2016 14:51:35 -0700
Subject: [PATCH 1/6] Add a wrapper for dapply(repartition(col, ...), ...)

---
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 71 +++++++++++++++++++++++
 R/pkg/R/generics.R                        |  4 ++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 21 +++++++
 4 files changed, 97 insertions(+)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 1432ab8a9d1ce..5b6fa71ab1861 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -60,6 +60,7 @@ exportMethods("arrange",
               "filter",
               "first",
               "freqItems",
+              "gapply",
               "group_by",
               "groupBy",
               "head",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 43c46b847446b..7cd3b1cc36cb9 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1214,6 +1214,77 @@ setMethod("dapply",
             dataFrame(sdf)
           })
 
+#' gapply
+#'
+#' Apply a function to each group of a DataFrame. The group is defined by an input
+#' grouping column(s).
+#'
+#' @param x A SparkDataFrame
+#' @param func A function to be applied to each group partition specified by grouping
+#'             column(s) of the SparkDataFrame.
+#'             The output of func is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
+#'               It must match the output of func.
+#' @family SparkDataFrame functions
+#' @rdname gapply
+#' @name gapply
+#' @export
+#' @examples
+#'
+#' \dontrun{
+#'
+#' Computes the arithmetic mean of `Sepal_Width` by grouping
+#' on `Species`. Output the grouping value and the average.
+#'
+#' df <- createDataFrame (sqlContext, iris)
+#' schema <- structType(structField("Species", "string"), structField("Avg", "double"))
+#' df1 <- gapply(
+#'        df,
+#'        function(x) {
+#'          data.frame(x$Species[1], mean(x$Sepal_Width), stringsAsFactors = FALSE)
+#'        },
+#'        schema, col=df$"Species")
+#' collect(df1)
+#'
+#' Species Avg
+#' -----------------
+#' virginica 2.974
+#' versicolor 2.770
+#' setosa 3.428
+#'
+#' Fits linear models on iris dataset by grouping on the `Species` column and
+#' using `Sepal_Length` as a target variable, `Sepal_Width`, `Petal_Length`
+#' and `Petal_Width` as training features.
+#'
+#' df <- createDataFrame (sqlContext, iris)
+#' schema <- structType(structField("(Intercept)", "double"),
+#'   structField("Sepal_Width", "double"), structField("Petal_Length", "double"),
+#'   structField("Petal_Width", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   function(x) {
+#'     m <- suppressWarnings(lm(Sepal_Length ~
+#'     Sepal_Width + Petal_Length + Petal_Width, x))
+#'     data.frame(t(coef(m)))
+#'   }, schema, df$"Species")
+#' collect(df1)
+#'
+#'Result
+#'---------
+#' Model (Intercept) Sepal_Width Petal_Length Petal_Width
+#' 1 0.699883 0.3303370 0.9455356 -0.1697527
+#' 2 1.895540 0.3868576 0.9083370 -0.6792238
+#' 3 2.351890 0.6548350 0.2375602 0.2521257
+#'
+#'}
+setMethod("gapply",
+          signature(x = "SparkDataFrame", func = "function", schema = "structType",
+                    col = "Column"),
+          function(x, func, schema, col, ...) {
+            repartitionedX <- repartition(x, col = col, ...)
+            dapply(repartitionedX, func, schema)
+          })
+
 ############################## RDD Map Functions ##################################
 # All of the following functions mirror the existing RDD map functions,           #
 # but allow for use with DataFrames by first converting to an RRDD before calling #
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 8563be1e64983..73c1e23cf0cf7 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -450,6 +450,10 @@ setGeneric("covar_pop", function(col1, col2) {standardGeneric("covar_pop") })
 #' @export
 setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
 
+#' @rdname gapply
+#' @export
+setGeneric("gapply", function(x, func, schema, col, ...) { standardGeneric("gapply") })
+
 #' @rdname summary
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 0f67bc2e331d1..ef0f48a94852c 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2082,6 +2082,27 @@ test_that("dapply() on a DataFrame", {
   expect_identical(expected, result)
 })
 
+test_that("gapply() on a DataFrame", {
+  df <- createDataFrame (sqlContext, iris)
+  schema <- structType(structField("Species", "string"), structField("Avg", "double"))
+
+  # Groups by `Species` and computes the average on three R workers
+  df1 <- gapply(
+    df,
+    function(x) {
+      data.frame(x$Species[1], mean(x$Sepal_Width), stringsAsFactors = FALSE)
+    },
+    schema, col = df$"Species")
+  actual <- collect(arrange(df1, "Species"))
+
+  # Groups by `Species` and computes the average on one R worker
+  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Species), FUN = mean))
+  colnames(agg_local_df) <- c("Species", "Avg")
+  expected <- agg_local_df
+
+  expect_identical(expected, actual)
+})
+
 test_that("repartition by columns on DataFrame", {
   df <- createDataFrame (
     sqlContext,

From 30693c2b40ab459a9fe252a2d00595c8190f2094 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Fri, 6 May 2016 15:18:23 -0700
Subject: [PATCH 2/6] Small style issues in the doc

---
 R/pkg/R/DataFrame.R | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 7cd3b1cc36cb9..96f723983fefb 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1239,11 +1239,11 @@ setMethod("dapply",
 #' df <- createDataFrame (sqlContext, iris)
 #' schema <- structType(structField("Species", "string"), structField("Avg", "double"))
 #' df1 <- gapply(
-#'        df,
-#'        function(x) {
-#'          data.frame(x$Species[1], mean(x$Sepal_Width), stringsAsFactors = FALSE)
-#'        },
-#'        schema, col=df$"Species")
+#'   df,
+#'   function(x) {
+#'     data.frame(x$Species[1], mean(x$Sepal_Width), stringsAsFactors = FALSE)
+#'   },
+#'   schema, col=df$"Species")
 #' collect(df1)
 #'
 #' Species Avg
@@ -1263,9 +1263,9 @@ setMethod("dapply",
 #' df1 <- gapply(
 #'   df,
 #'   function(x) {
-#'     m <- suppressWarnings(lm(Sepal_Length ~
+#'     model <- suppressWarnings(lm(Sepal_Length ~
 #'     Sepal_Width + Petal_Length + Petal_Width, x))
-#'     data.frame(t(coef(m)))
+#'     data.frame(t(coef(model)))
 #'   }, schema, df$"Species")
 #' collect(df1)
 #'

From 204a1053dabd74d39a25e725276e31bb3a592917 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Fri, 6 May 2016 16:27:42 -0700
Subject: [PATCH 3/6] Fixed the test case + added examples with multiple
 grouping columns

---
 R/pkg/R/DataFrame.R                       | 16 +++++++++++++---
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  2 +-
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 96f723983fefb..5b11cb186e4ef 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -599,7 +599,7 @@ setMethod("unpersist",
 #'}
 setMethod("repartition",
           signature(x = "SparkDataFrame"),
-          function(x, numPartitions = NULL, col = NULL, ...) {
+          function(x, numPartitions = NULL, col, ...) {
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
               # number of partitions and columns both are specified
               if (!is.null(col) && class(col) == "Column") {
@@ -1216,7 +1216,7 @@ setMethod("dapply",
 
 #' gapply
 #'
-#' Apply a function to each group of a DataFrame. The group is defined by an input
+#' Apply a function to each group of a DataFrame. The group is defined by input
 #' grouping column(s).
 #'
 #' @param x A SparkDataFrame
@@ -1243,7 +1243,7 @@ setMethod("dapply",
 #'   function(x) {
 #'     data.frame(x$Species[1], mean(x$Sepal_Width), stringsAsFactors = FALSE)
 #'   },
-#'   schema, col=df$"Species")
+#'   schema, col = df$"Species")
 #' collect(df1)
 #'
 #' Species Avg
@@ -1252,6 +1252,16 @@ setMethod("dapply",
 #' versicolor 2.770
 #' setosa 3.428
 #'
+#' We can also have multiple grouping columns
+#' schema <- structType(structField("Species", "string"), structField("Avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   function(x) {
+#'     data.frame(x$Species[1], sum(x$Sepal_Width), stringsAsFactors = FALSE)
+#'   },
+#'   schema, col = df$"Species", col2 = df$"Petal_Length")
+#' collect(df1)
+#'
 #' Fits linear models on iris dataset by grouping on the `Species` column and
 #' using `Sepal_Length` as a target variable, `Sepal_Width`, `Petal_Length`
 #' and `Petal_Width` as training features.
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ef0f48a94852c..e6472fb59cee8 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2083,7 +2083,7 @@ test_that("dapply() on a DataFrame", {
 })
 
 test_that("gapply() on a DataFrame", {
-  df <- createDataFrame (sqlContext, iris)
+  df <- suppressWarnings(createDataFrame (sqlContext, iris))
   schema <- structType(structField("Species", "string"), structField("Avg", "double"))
 
   # Groups by `Species` and computes the average on three R workers

From 97049564433607544beef439ddce272f607298d9 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Fri, 6 May 2016 17:22:28 -0700
Subject: [PATCH 4/6] Bring back the modification in repartition

---
 R/pkg/R/DataFrame.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5b11cb186e4ef..867904a12058f 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -599,7 +599,7 @@ setMethod("unpersist",
 #'}
 setMethod("repartition",
           signature(x = "SparkDataFrame"),
-          function(x, numPartitions = NULL, col, ...) {
+          function(x, numPartitions = NULL, col = NULL, ...) {
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
               # number of partitions and columns both are specified
               if (!is.null(col) && class(col) == "Column") {

From bf3a74d34b21eaa6c3d1422c1135658d9be58a8a Mon Sep 17 00:00:00 2001
From: NarineK
Date: Fri, 6 May 2016 20:01:09 -0700
Subject: [PATCH 5/6] Fixing test case

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index e6472fb59cee8..49a8072df972a 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2096,9 +2096,10 @@ test_that("gapply() on a DataFrame", {
   actual <- collect(arrange(df1, "Species"))
 
   # Groups by `Species` and computes the average on one R worker
-  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Species), FUN = mean))
+  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Species), FUN = mean),
+                             stringsAsFactors = FALSE)
   colnames(agg_local_df) <- c("Species", "Avg")
-  expected <- agg_local_df
+  expected <- agg_local_df[order(agg_local_df$Species), ]
 
   expect_identical(expected, actual)
 })

From 057ff9b30e56de4172c957e467b9b35dd932999a Mon Sep 17 00:00:00 2001
From: NarineK
Date: Fri, 6 May 2016 23:14:46 -0700
Subject: [PATCH 6/6] fixed factor data type issue

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 49a8072df972a..f739508a2d264 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2101,7 +2101,7 @@ test_that("gapply() on a DataFrame", {
   colnames(agg_local_df) <- c("Species", "Avg")
   expected <- agg_local_df[order(agg_local_df$Species), ]
 
-  expect_identical(expected, actual)
+  expect_identical(expected$Avg, actual$Avg)
 })
 
 test_that("repartition by columns on DataFrame", {
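
For reference, a minimal usage sketch of the gapply wrapper added by this patch series. It is not part of the patches; it assumes a SparkR session of this era where `sqlContext` is available and SparkR is attached, and the variable names (`avg_width`, `res1`, `res2`) are illustrative only.

library(SparkR)

# Sketch only: iris column names contain ".", so warnings are suppressed as in the test.
df <- suppressWarnings(createDataFrame(sqlContext, iris))
schema <- structType(structField("Species", "string"), structField("Avg", "double"))

# Per-group average of Sepal_Width, computed through the new wrapper.
avg_width <- function(x) {
  data.frame(x$Species[1], mean(x$Sepal_Width), stringsAsFactors = FALSE)
}
res1 <- gapply(df, avg_width, schema, col = df$"Species")

# Equivalent call spelled out by hand: gapply() repartitions by the grouping
# column and then applies the function to each partition via dapply().
res2 <- dapply(repartition(df, col = df$"Species"), avg_width, schema)

collect(arrange(res1, "Species"))
collect(arrange(res2, "Species"))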