From 8fa19d1b42e47a6fab9e73cb30645aeace52a5de Mon Sep 17 00:00:00 2001
From: Kai Jiang
Date: Sun, 5 Jun 2016 21:41:42 -0700
Subject: [PATCH 1/5] update

---
 docs/sparkr.md | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 61 insertions(+)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index 961bd323fabcb..ee51385ffed13 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -262,6 +262,67 @@ head(df)
 {% endhighlight %}
 </div>
 
+### Applying User-defined Function
+
+#### dapply
+Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` should have only one parameter, to which a `data.frame` corresponding to each partition will be passed. The output of the function should be a `data.frame`.
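A minimal sketch of a `dapply` call that would actually produce the `waiting_secs` column shown in the sample output below; the snippet as added here passes the identity function with the unchanged `schema(df)`, which PATCH 2/5 later reconciles using the same `structType`/`structField` constructors assumed in this sketch:

{% highlight r %}
# Sketch: the output schema must describe the rows the UDF returns,
# so it carries the extra waiting_secs field that the function adds.
schema <- structType(structField("eruptions", "double"),
                     structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { cbind(x, waiting_secs = x$waiting * 60) }, schema)
head(collect(df1), 3)
{% endhighlight %}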
+<div data-lang="r">
+{% highlight r %}
+
+# Convert waiting time from minutes to seconds.
+# Note that we can apply UDF to DataFrame.
+
+df1 <- dapply(df, function(x) {x}, schema(df))
+head(collect(df1), 3)
+## eruptions waiting waiting_secs
+##1 3.600 79 4740
+##2 1.800 54 3240
+##3 3.333 74 4440
+
+{% endhighlight %}
+</div>
+
+#### dapplyCollect
+Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back.
+<div data-lang="r">
+{% highlight r %}
+
+# Convert waiting time from minutes to seconds.
+# Note that we can apply UDF to DataFrame.
+ldf <- dapplyCollect(
+         df,
+         function(x) {
+           x <- cbind(x, "waiting_secs"=x$waiting * 60)
+         })
+head(df, 3)
+## eruptions waiting waiting_secs
+##1 3.600 79 4740
+##2 1.800 54 3240
+##3 3.333 74 4440
+
+{% endhighlight %}
+</div>
+
+#### lapply
+Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark.
+Applies a function in a manner that is similar to `doParallel` or `lapply` to elements of a list.
+<div data-lang="r">
+{% highlight r %}
+
+# Perform distributed training of multiple models with spark.lapply
+families <- c("gaussian", "poisson")
+train <- function(family) {
+  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
+  summary(model)
+}
+model.summaries <- spark.lapply(sc, families, train)
+
+# Print the summary of each model
+print(model.summaries)
+
+{% endhighlight %}
+</div>
+
 ## Running SQL Queries from SparkR
 A SparkR DataFrame can also be registered as a temporary table in Spark SQL, and registering a DataFrame as a table allows you to run SQL queries over its data.
 The `sql` function enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.

From 920c975adf176a1cbce3f3631762073a7131f713 Mon Sep 17 00:00:00 2001
From: Kai Jiang
Date: Sat, 18 Jun 2016 17:23:13 -0700
Subject: [PATCH 2/5] address comments

---
 R/pkg/R/context.R |  2 +-
 docs/sparkr.md    | 34 +++++++++++++++++++++++-----------
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 5c886030ff5c5..ef97f63205226 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -245,7 +245,7 @@ setCheckpointDir <- function(sc, dirName) {
 #' \preformatted{
 #'   train <- function(hyperparam) {
 #'     library(MASS)
-#'     lm.ridge(“y ~ x+z”, data, lambda=hyperparam)
+#'     lm.ridge("y ~ x+z", data, lambda=hyperparam)
 #'     model
 #'   }
 #' }
diff --git a/docs/sparkr.md b/docs/sparkr.md
index ee51385ffed13..40c9a18e42526 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -263,38 +263,46 @@ head(df)
 
 ### Applying User-defined Function
+In SparkR, we support several kinds for User-defined Functions:
 
-#### dapply
-Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` should have only one parameter, to which a `data.frame` corresponding to each partition will be passed. The output of the function should be a `data.frame`.
+#### Run a given function on a large dataset using `dapply` or `dapplyCollect`
+
+##### dapply
+Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame`
+should have only one parameter, to which a `data.frame` corresponding to each partition will be passed. The output of the function
+should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the R function's output.
 <div data-lang="r">
 {% highlight r %}
 
 # Convert waiting time from minutes to seconds.
 # Note that we can apply UDF to DataFrame.
-
-df1 <- dapply(df, function(x) {x}, schema(df))
-head(collect(df1), 3)
+schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
+                     structField("waiting_secs", "double"))
+df1 <- dapply(df, function(x) {x <- cbind(x, x$waiting * 60)}, schema)
+head(collect(df1))
 ## eruptions waiting waiting_secs
 ##1 3.600 79 4740
 ##2 1.800 54 3240
 ##3 3.333 74 4440
+##4 2.283 62 3720
+##5 4.533 85 5100
+##6 2.883 55 3300
 
 {% endhighlight %}
 </div>
 
-#### dapplyCollect
+##### dapplyCollect
 Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back.
 <div data-lang="r">
 {% highlight r %}
 
 # Convert waiting time from minutes to seconds.
-# Note that we can apply UDF to DataFrame.
+# Note that we can apply UDF to DataFrame and return an R data.frame
 ldf <- dapplyCollect(
          df,
          function(x) {
           x <- cbind(x, "waiting_secs"=x$waiting * 60)
          })
-head(df, 3)
+head(ldf, 3)
 ## eruptions waiting waiting_secs
 ##1 3.600 79 4740
 ##2 1.800 54 3240
 ##3 3.333 74 4440
 
 {% endhighlight %}
 </div>
 
-#### lapply
+#### Run many functions in parallel using `spark.lapply`
+
+##### lapply
 Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark.
 Applies a function in a manner that is similar to `doParallel` or `lapply` to elements of a list.
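Because the function passed to `spark.lapply` runs in worker R processes rather than in the driver session, any package it uses must be loaded inside the function itself; the roxygen example patched in `R/pkg/R/context.R` above loads `MASS` inside `train` for exactly this reason. A sketch under that assumption, where the `e1071`/`svm` model is illustrative and not part of this patch set:

{% highlight r %}
# Sketch: attach worker-side dependencies inside the function itself;
# each list element is shipped to a worker, where train() is evaluated.
costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
train <- function(cost) {
  library(e1071)                 # illustrative dependency, loaded per worker
  model <- svm(Species ~ ., iris, cost = cost)
  summary(model)
}
model.summaries <- spark.lapply(sc, costs, train)
{% endhighlight %}

Note that PATCH 3/5 below removes the `SparkContext` argument from this call.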
 <div data-lang="r">
 {% highlight r %}
 
-# Perform distributed training of multiple models with spark.lapply
+# Perform distributed training of multiple models with spark.lapply. Here, we pass
+# a read-only list of arguments which specifies the family the generalized linear model should be.
 families <- c("gaussian", "poisson")
 train <- function(family) {
   model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
   summary(model)
 }
+# Return a list of model summaries
 model.summaries <- spark.lapply(sc, families, train)
 
 # Print the summary of each model
 print(model.summaries)
 
 {% endhighlight %}
 </div>

From 3f2aea9e6c909ec271e146073865b13551d90c27 Mon Sep 17 00:00:00 2001
From: Kai Jiang
Date: Mon, 20 Jun 2016 11:55:15 -0700
Subject: [PATCH 3/5] change spark.lapply api according to #13752

---
 docs/sparkr.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index 40c9a18e42526..20fa067abbb57 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -327,7 +327,7 @@ train <- function(family) {
   summary(model)
 }
 # Return a list of model summaries
-model.summaries <- spark.lapply(sc, families, train)
+model.summaries <- spark.lapply(families, train)
 
 # Print the summary of each model
 print(model.summaries)

From ae26233922c36e747244bd317719b9559a43d38f Mon Sep 17 00:00:00 2001
From: Kai Jiang
Date: Tue, 21 Jun 2016 01:05:45 -0700
Subject: [PATCH 4/5] add comments

---
 docs/sparkr.md | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index 20fa067abbb57..a172c9f9a80a3 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -291,7 +291,9 @@ head(collect(df1))
 </div>
 
 ##### dapplyCollect
-Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back.
+Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back. The output of the function
+should be a `data.frame`, but a schema is not required to be passed. Note that `dapplyCollect` can only be used if the
+output of the UDF run on all the partitions can fit in driver memory.
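When the collected output would be too large for the driver, a sketch of the distributed alternative, rebuilding the `schema` from the `dapply` example in PATCH 2/5:

{% highlight r %}
# Sketch: dapply() keeps the result distributed as a SparkDataFrame;
# head() on it brings back only the first rows, not the whole result.
schema <- structType(structField("eruptions", "double"),
                     structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df2 <- dapply(df, function(x) { cbind(x, waiting_secs = x$waiting * 60) }, schema)
head(df2, 3)
{% endhighlight %}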
 <div data-lang="r">
 {% highlight r %}
@@ -315,7 +317,9 @@ head(ldf, 3)
 
 ##### lapply
 Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark.
-Applies a function in a manner that is similar to `doParallel` or `lapply` to elements of a list.
+Applies a function in a manner that is similar to `doParallel` or `lapply` to elements of a list. The results of all the computations
+should fit on a single machine. If that is not the case, you can do something like `df <- createDataFrame(list)` and then use
+`dapply`.
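A sketch of the workaround just described, with a hypothetical large input `vals` and the `createDataFrame(data)` form used on this branch:

{% highlight r %}
# Sketch: when spark.lapply results would not fit on one machine,
# distribute the inputs as a SparkDataFrame and transform them with
# dapply, so the output also stays distributed.
vals <- data.frame(x = seq_len(1000000))   # hypothetical input list
df <- createDataFrame(vals)
result <- dapply(df, function(p) { p * 2 }, schema(df))   # same schema: doubled numerics
{% endhighlight %}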
 <div data-lang="r">
 {% highlight r %}

From 8d4f16354005be12b932287f01b44a1c99a56f5b Mon Sep 17 00:00:00 2001
From: Kai Jiang
Date: Tue, 21 Jun 2016 13:36:35 -0700
Subject: [PATCH 5/5] fix comments

---
 docs/sparkr.md | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index a172c9f9a80a3..7910857d096fa 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -262,15 +262,15 @@ head(df)
 {% endhighlight %}
 </div>
 
-### Applying User-defined Function
-In SparkR, we support several kinds for User-defined Functions:
+### Applying User-Defined Function
+In SparkR, we support several kinds of User-Defined Functions:
 
 #### Run a given function on a large dataset using `dapply` or `dapplyCollect`
 
 ##### dapply
-Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame`
+Apply a function to each partition of a `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame`
 should have only one parameter, to which a `data.frame` corresponding to each partition will be passed. The output of the function
 should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the R function's output.
 <div data-lang="r">
 {% highlight r %}
@@ -291,7 +291,7 @@ head(collect(df1))
 {% endhighlight %}
 </div>
 
 ##### dapplyCollect
-Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back. The output of the function
+Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of the function
 should be a `data.frame`, but a schema is not required to be passed. Note that `dapplyCollect` can only be used if the
 output of the UDF run on all the partitions can fit in driver memory.
@@ -302,7 +302,7 @@ output of UDF run on all the partitions can fit in driver memory. ldf <- dapplyCollect( df, function(x) { - x <- cbind(x, "waiting_secs"=x$waiting * 60) + x <- cbind(x, "waiting_secs" = x$waiting * 60) }) head(ldf, 3) ## eruptions waiting waiting_secs @@ -313,9 +313,9 @@ head(ldf, 3) {% endhighlight %}
-#### Run many functions in parallel using `spark.lapply` +#### Run local R functions distributed using `spark.lapply` -##### lapply +##### spark.lapply Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark. Applies a function in a manner that is similar to `doParallel` or `lapply` to elements of a list. The results of all the computations should fit in a single machine. If that is not the case they can do something like `df <- createDataFrame(list)` and then use
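After the API change in PATCH 3/5, `spark.lapply` no longer takes a `SparkContext` argument; a sketch of the resulting usage, reusing the model-training example from PATCH 2/5:

{% highlight r %}
# Sketch of the final API: spark.lapply(list, function), with no sc argument.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
model.summaries <- spark.lapply(families, train)
print(model.summaries[[1]])
{% endhighlight %}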