# Vectorized Execution in SparkR

First, enable the R cell magic so that R code can be run in Jupyter.

In [None]:
# NOTE(review): presumably imported up front so rpy2's R interface is initialized
# before the extension loads — confirm it is still required.
import rpy2.rinterface
# Load rpy2's IPython extension; the R cells below rely on its %%R cell magic.
%load_ext rpy2.ipython

After that, prepare the data to use. This simple benchmark uses the "100000 Records" dataset.

In [None]:
import urllib.request
from zipfile import ZipFile
from io import BytesIO

# Download the zipped dataset into memory and unpack it into the current
# working directory. Context managers make sure the HTTP response and the
# archive handle are closed even if reading or extraction fails.
url = "http://eforexcel.com/wp/wp-content/uploads/2017/07/100000-Records.zip"
with urllib.request.urlopen(url) as response:
    payload = response.read()
with ZipFile(BytesIO(payload)) as archive:
    archive.extractall()

Initialize SparkR with enough memory and load the libraries used for benchmarking. This benchmark sets both the number of shuffle partitions and the default parallelism to 1 to mimic the case where a large dataset is processed.

In [None]:
%%R
# Load SparkR from the R library directory under SPARK_HOME.
spark_r_lib <- file.path(Sys.getenv("SPARK_HOME"), "R", "lib")
library(SparkR, lib.loc = c(spark_r_lib))

# Start (or reuse) a local session with a single shuffle partition and a
# default parallelism of one.
session_conf <- list(
  spark.sql.shuffle.partitions = 1,
  spark.default.parallelism = 1
)
sparkR.session(master = "local[*]", sparkConfig = session_conf)

# Benchmarking and plotting helpers used by the cells below.
library(microbenchmark)
library(ggplot2)

Prepare the R DataFrame used for testing from the data downloaded above.

In [None]:
%%R
# Read the extracted CSV into a local R data frame.
df <- read.csv("100000 Records.csv")

# object.size() only accepts the object itself; the unit is selected when the
# result is formatted, so `units` has to be passed to format() instead.
print(paste("R DataFrame size:", format(object.size(df), units = "MB")))

# Build the Spark DataFrame, mark it as cached, and run count() so the cache
# is populated before any benchmark reads from it.
spark_df <- cache(createDataFrame(df))
num <- count(spark_df)

## R DataFrame to Spark DataFrame

In [None]:
%%R
# Session settings that switch the SparkR Arrow optimization off and on.
arrow_off <- list(spark.sql.execution.arrow.sparkr.enabled = "false")
arrow_on <- list(spark.sql.execution.arrow.sparkr.enabled = "true")

# Time createDataFrame() on the local R data frame under each setting.
# Every expression is evaluated exactly once (times = 1L).
mbm <- microbenchmark(
  "R to Spark DataFrame without Arrow optimization" = {
    sparkR.session(master = "local[*]", sparkConfig = arrow_off)
    createDataFrame(df)
  },
  "R to Spark DataFrame with Arrow optimization" = {
    sparkR.session(master = "local[*]", sparkConfig = arrow_on)
    createDataFrame(df)
  },
  times = 1L
)

# autoplot(mbm)
mbm

## Spark DataFrame to R DataFrame

In [None]:
%%R
# Session settings that switch the SparkR Arrow optimization off and on.
arrow_off <- list(spark.sql.execution.arrow.sparkr.enabled = "false")
arrow_on <- list(spark.sql.execution.arrow.sparkr.enabled = "true")

# Time collect() on the cached Spark DataFrame under each setting.
# Every expression is evaluated exactly once (times = 1L).
mbm <- microbenchmark(
  "Spark to R DataFrame without Arrow optimization" = {
    sparkR.session(master = "local[*]", sparkConfig = arrow_off)
    collect(spark_df)
  },
  "Spark to R DataFrame with Arrow optimization" = {
    sparkR.session(master = "local[*]", sparkConfig = arrow_on)
    collect(spark_df)
  },
  times = 1L
)

# autoplot(mbm)
mbm

## `dapply`

In [None]:
%%R
# Session settings that switch the SparkR Arrow optimization off and on.
arrow_off <- list(spark.sql.execution.arrow.sparkr.enabled = "false")
arrow_on <- list(spark.sql.execution.arrow.sparkr.enabled = "true")

# Time dapply() with a pass-through function under each setting; count()
# forces the result to be computed. Every expression is evaluated exactly
# once (times = 1L).
mbm <- microbenchmark(
  "dapply without Arrow optimization" = {
    sparkR.session(master = "local[*]", sparkConfig = arrow_off)
    count(dapply(spark_df, function(part) part, schema(spark_df)))
  },
  "dapply with Arrow optimization" = {
    sparkR.session(master = "local[*]", sparkConfig = arrow_on)
    count(dapply(spark_df, function(part) part, schema(spark_df)))
  },
  times = 1L
)

# autoplot(mbm)
mbm

## `gapply`

In [None]:
%%R
# Session settings that switch the SparkR Arrow optimization off and on.
arrow_off <- list(spark.sql.execution.arrow.sparkr.enabled = "false")
arrow_on <- list(spark.sql.execution.arrow.sparkr.enabled = "true")

# Time gapply() grouped by "Month_of_Joining" with a pass-through function
# under each setting; count() forces the result to be computed. Every
# expression is evaluated exactly once (times = 1L).
mbm <- microbenchmark(
  "gapply without Arrow optimization" = {
    sparkR.session(master = "local[*]", sparkConfig = arrow_off)
    count(
      gapply(
        spark_df,
        "Month_of_Joining",
        function(key, rows) rows,
        schema(spark_df)
      )
    )
  },
  "gapply with Arrow optimization" = {
    sparkR.session(master = "local[*]", sparkConfig = arrow_on)
    count(
      gapply(
        spark_df,
        "Month_of_Joining",
        function(key, rows) rows,
        schema(spark_df)
      )
    )
  },
  times = 1L
)

# autoplot(mbm)
mbm