hoxo-m Merge pull request #1 from saurfang/dplyr
Override dplyr generics instead
Latest commit 74adba9 Oct 30, 2015

SparkRext - SparkR extension for closer to dplyr

Koji MAKIYAMA (@hoxo_m)

1. Overview

Apache Spark is one of the hottest products in data science.
Spark 1.4.0 formally adopted the SparkR package, which lets you handle Spark DataFrames from R. (See this article)

SparkR is very useful and powerful.
One of the reasons is that SparkR DataFrames present an API similar to dplyr.

For example:

df <- createDataFrame(sqlContext, iris)
df %>%
  select("Sepal_Length", "Species") %>%
  filter(df$Sepal_Length >= 5.5) %>%
  group_by(df$Species) %>%
  summarize(count = n(df$Sepal_Length), mean = mean(df$Sepal_Length)) %>%
  collect  

This is very cool, but I am a little dissatisfied.

One of the reasons dplyr is so popular is that its functions adopt NSE (non-standard evaluation).

library(dplyr)
iris %>%
  select(Sepal.Length, Species) %>%
  filter(Sepal.Length >= 5.5) %>%
  group_by(Species) %>%
  summarize(count = n(), mean = mean(Sepal.Length))
## Source: local data frame [3 x 3]
## 
##      Species count     mean
##       (fctr) (int)    (dbl)
## 1     setosa     5 5.640000
## 2 versicolor    44 6.050000
## 3  virginica    49 6.622449

It's very smart.
With NSE, you don't need to type quotation marks or the name of the DataFrame that the columns belong to.

The SparkRext package was created to bring SparkR closer to dplyr.

library(SparkRext)
df <- createDataFrame(sqlContext, iris)
df %>%
  select(Sepal_Length, Species) %>%
  filter(Sepal_Length >= 5.5) %>%
  group_by(Species) %>%
  summarize(count = n(Sepal_Length), mean = mean(Sepal_Length)) %>%
  collect  
##      Species count     mean
## 1 versicolor    44 6.050000
## 2     setosa     5 5.640000
## 3  virginica    49 6.622449

SparkRext redefines SparkR functions so that they accept NSE inputs.
As a result, the functions can be used in the same way as in dplyr.
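To make the idea of NSE concrete, here is a minimal base R sketch on a local data.frame. The helper `nse_filter()` is hypothetical and is not SparkRext's actual implementation; it only illustrates how a function can capture a bare column expression with `substitute()` and evaluate it against the data.

```r
# Hypothetical helper (not part of SparkRext): a filter that accepts
# bare column names through non-standard evaluation.
nse_filter <- function(data, condition) {
  # Capture the condition as an unevaluated expression instead of a value.
  condition_expr <- substitute(condition)
  # Evaluate the expression with the data frame's columns in scope.
  keep <- eval(condition_expr, data, parent.frame())
  # Drop rows where the condition is FALSE or NA.
  data[!is.na(keep) & keep, , drop = FALSE]
}

# Standard evaluation: the data frame must be named for every column.
se_result <- iris[iris$Sepal.Length >= 5.5, ]

# Non-standard evaluation: the bare column name is enough.
nse_result <- nse_filter(iris, Sepal.Length >= 5.5)

# Both approaches keep the same rows (5 + 44 + 49 = 98, matching the
# per-species counts shown above).
nrow(se_result)
nrow(nse_result)
```

The same capture-then-evaluate pattern is what lets a wrapper translate `Sepal_Length >= 5.5` into a column expression without the caller writing `df$`.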

2. How to install

The source code for the SparkRext package is available on GitHub in the hoxo-m/SparkRext repository.

You can install the package from there.

install.packages("devtools") # if you have not installed "devtools" package
devtools::install_github("hoxo-m/SparkRext")

3. Functions

SparkRext redefines six functions of SparkR.

  • filter()
  • select()
  • mutate()
  • arrange()
  • summarize()
  • group_by()

This section explains these functions.

For illustration, let’s prepare data.

library(dplyr)
library(nycflights13)

set.seed(123)
data <- sample_n(flights, 10000)

library(SparkRext)
prior_library(SparkRext)

df <- createDataFrame(sqlContext, data.frame(data))
df %>% head
##   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013    12  15     2124        -4     2322         1      UA  N801UA
## 2 2013     7  17      651        -9      936       -28      DL  N194DN
## 3 2013     3   2     1636         1     1800         0      WN  N475WN
## 4 2013     8  19     1058        -2     1203       -32      WN  N765SW
## 5 2013     9   9     1251        -9     1412         3      US  N963UW
## 6 2013     1  18     1259        -1     1556       -14      WN  N654SW
##   flight origin dest air_time distance hour minute
## 1    289    EWR  DTW       88      488   21     24
## 2    763    JFK  LAX      306     2475    6     51
## 3   1501    LGA  MKE      103      738   16     36
## 4     51    LGA  MDW      107      725   10     58
## 5   2148    LGA  BOS       38      184   12     51
## 6   2239    EWR  HOU      222     1411   12     59

3-1. filter()

filter() extracts the rows that satisfy the specified conditions.

df %>% filter(month == 12, day == 31) %>% head
##   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013    12  31     1155        -5     1257       -11      B6  N216JB
## 2 2013    12  31     2211        12      100        15      B6  N715JB
## 3 2013    12  31     1504         9     1620        -5      MQ  N501MQ
## 4 2013    12  31     2328        -2      412         3      B6  N651JB
## 5 2013    12  31     1922        -8     2116         1      MQ  N501MQ
## 6 2013    12  31      849        -1     1225        -3      B6  N834JB
##   flight origin dest air_time distance hour minute
## 1    316    JFK  SYR       50      209   11     55
## 2   1183    JFK  MCO      148      944   22     11
## 3   3425    JFK  DCA       52      213   15      4
## 4   1389    EWR  SJU      198     1608   23     28
## 5   3535    JFK  CMH       82      483   19     22
## 6     15    JFK  SFO      371     2586    8     49
df %>% filter(month == 12 | day == 31) %>% head
##   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013    12  15     2124        -4     2322         1      UA  N801UA
## 2 2013    12  30     2031        -4     2351         4      DL  N3743H
## 3 2013    12  16     1248        -4     1407        -4      EV  N11548
## 4 2013    12  27      928        28     1307        48      DL  N901DE
## 5 2013    12   7     1719       -10     2008         8      F9  N209FR
## 6 2013    12  10       NA        NA       NA        NA      EV  N717EV
##   flight origin dest air_time distance hour minute
## 1    289    EWR  DTW       88      488   21     24
## 2   2065    JFK  FLL      173     1069   20     31
## 3   6054    EWR  IAD       49      212   12     48
## 4   2446    LGA  FLL      186     1076    9     28
## 5    507    LGA  DEN      269     1620   17     19
## 6   5245    LGA  PIT       NA      335   NA     NA

Note that filter() of SparkR cannot accept multiple conditions at once.
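One plausible way to support several conditions is to join them with `&` into a single expression before handing it to a function that takes only one condition. The sketch below shows that idea in base R; `combine_conditions()` is a hypothetical name and this is an assumption about the approach, not SparkRext's actual code.

```r
# Hypothetical helper: fold several captured conditions into one
# expression joined by `&`.
combine_conditions <- function(...) {
  # Capture each argument as an unevaluated expression.
  conditions <- as.list(substitute(list(...)))[-1]
  # Build `cond1 & cond2 & ...` as a single call object.
  Reduce(function(lhs, rhs) call("&", lhs, rhs), conditions)
}

combined <- combine_conditions(month == 12, day == 31)
print(combined)

# The combined expression can then be evaluated against a data frame.
flights_small <- data.frame(month = c(12, 12, 7), day = c(31, 15, 31))
keep <- eval(combined, flights_small)
flights_small[keep, ]
```

Only the first row of `flights_small` satisfies both conditions, which mirrors how `filter(month == 12, day == 31)` treats comma-separated conditions as a logical AND.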

3-2. select()

select() extracts the specified columns.

df %>% select(year, month, day) %>% head
##   year month day
## 1 2013    12  15
## 2 2013     7  17
## 3 2013     3   2
## 4 2013     8  19
## 5 2013     9   9
## 6 2013     1  18

Consecutive columns can be extracted using a colon :.

df %>% select(year:day) %>% head
##   year month day
## 1 2013    12  15
## 2 2013     7  17
## 3 2013     3   2
## 4 2013     8  19
## 5 2013     9   9
## 6 2013     1  18

You can use the minus sign - to extract all columns except the ones specified.

df %>% select(-year, -month, -day) %>% head
##   dep_time dep_delay arr_time arr_delay carrier tailnum flight origin dest
## 1     2124        -4     2322         1      UA  N801UA    289    EWR  DTW
## 2      651        -9      936       -28      DL  N194DN    763    JFK  LAX
## 3     1636         1     1800         0      WN  N475WN   1501    LGA  MKE
## 4     1058        -2     1203       -32      WN  N765SW     51    LGA  MDW
## 5     1251        -9     1412         3      US  N963UW   2148    LGA  BOS
## 6     1259        -1     1556       -14      WN  N654SW   2239    EWR  HOU
##   air_time distance hour minute
## 1       88      488   21     24
## 2      306     2475    6     51
## 3      103      738   16     36
## 4      107      725   10     58
## 5       38      184   12     51
## 6      222     1411   12     59

You can also extract columns by using column numbers.

df %>% select(1, 2, 3) %>% head
##   year month day
## 1 2013    12  15
## 2 2013     7  17
## 3 2013     3   2
## 4 2013     8  19
## 5 2013     9   9
## 6 2013     1  18

You can use the select utility functions in dplyr such as starts_with().

df %>% select(starts_with("arr")) %>% head
##   arr_time arr_delay
## 1     2322         1
## 2      936       -28
## 3     1800         0
## 4     1203       -32
## 5     1412         3
## 6     1556       -14

All of the select utility functions are listed below.

  • starts_with(match, ignore.case = TRUE)
  • ends_with(match, ignore.case = TRUE)
  • contains(match, ignore.case = TRUE)
  • matches(match, ignore.case = TRUE)
  • num_range(prefix, range, width = NULL)
  • one_of(...)
  • everything()

Note that select() of SparkR cannot accept this variety of inputs.
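The colon, minus sign, and column-number forms can all be understood as arithmetic on column positions. The following base R sketch illustrates that idea; `resolve_columns()` is a hypothetical helper written for this explanation and is not taken from SparkRext.

```r
# Hypothetical helper: resolve select-style arguments to column names
# by evaluating them against a name -> position lookup.
resolve_columns <- function(column_names, ...) {
  # Map each column name to its position, e.g. year = 1, month = 2.
  positions <- as.list(seq_along(column_names))
  names(positions) <- column_names
  # Capture the selections without evaluating them.
  selections <- as.list(substitute(list(...)))[-1]
  # `year:day` becomes 1:3, `-year` becomes -1, and `2` stays 2.
  indices <- unlist(lapply(selections, eval,
                           envir = positions, enclos = baseenv()))
  # Positive indices keep columns, negative indices drop them.
  column_names[indices]
}

columns <- c("year", "month", "day", "dep_time", "dep_delay")

by_range    <- resolve_columns(columns, year:day)
by_negation <- resolve_columns(columns, -year, -month, -day)
by_number   <- resolve_columns(columns, 1, 2, 3)
```

Once the names are resolved, they can be passed as plain strings to a function that accepts only column names.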

3-3. mutate()

mutate() is used to add new columns.

df %>% mutate(gain = arr_delay - dep_delay, speed = distance / air_time * 60) %>% head
##   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013    12  15     2124        -4     2322         1      UA  N801UA
## 2 2013     7  17      651        -9      936       -28      DL  N194DN
## 3 2013     3   2     1636         1     1800         0      WN  N475WN
## 4 2013     8  19     1058        -2     1203       -32      WN  N765SW
## 5 2013     9   9     1251        -9     1412         3      US  N963UW
## 6 2013     1  18     1259        -1     1556       -14      WN  N654SW
##   flight origin dest air_time distance hour minute gain    speed
## 1    289    EWR  DTW       88      488   21     24    5 332.7273
## 2    763    JFK  LAX      306     2475    6     51  -19 485.2941
## 3   1501    LGA  MKE      103      738   16     36   -1 429.9029
## 4     51    LGA  MDW      107      725   10     58  -30 406.5421
## 5   2148    LGA  BOS       38      184   12     51   12 290.5263
## 6   2239    EWR  HOU      222     1411   12     59  -13 381.3514

Note that mutate() of SparkR cannot accept multiple inputs at once.
Furthermore, unlike the example below, mutate() of SparkR cannot reuse a column added earlier in the same call.

df %>% mutate(gain = arr_delay - dep_delay, gain_per_hour = gain/(air_time/60)) %>% head
##   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013    12  15     2124        -4     2322         1      UA  N801UA
## 2 2013     7  17      651        -9      936       -28      DL  N194DN
## 3 2013     3   2     1636         1     1800         0      WN  N475WN
## 4 2013     8  19     1058        -2     1203       -32      WN  N765SW
## 5 2013     9   9     1251        -9     1412         3      US  N963UW
## 6 2013     1  18     1259        -1     1556       -14      WN  N654SW
##   flight origin dest air_time distance hour minute gain gain_per_hour
## 1    289    EWR  DTW       88      488   21     24    5     3.4090909
## 2    763    JFK  LAX      306     2475    6     51  -19    -3.7254902
## 3   1501    LGA  MKE      103      738   16     36   -1    -0.5825243
## 4     51    LGA  MDW      107      725   10     58  -30   -16.8224299
## 5   2148    LGA  BOS       38      184   12     51   12    18.9473684
## 6   2239    EWR  HOU      222     1411   12     59  -13    -3.5135135
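Reusing gain in the second expression above implies that the expressions are applied one after another, each seeing the columns created before it. Here is a minimal base R sketch of that sequential behavior on a local data.frame; `sequential_mutate()` is a hypothetical helper, not SparkRext's implementation.

```r
# Hypothetical helper: add columns one at a time so that later
# expressions can refer to columns created by earlier ones.
sequential_mutate <- function(data, ...) {
  expressions <- as.list(substitute(list(...)))[-1]
  for (new_name in names(expressions)) {
    # Evaluate against the data frame as it stands right now.
    data[[new_name]] <- eval(expressions[[new_name]], data, parent.frame())
  }
  data
}

# First two rows of the sample shown above.
flights_small <- data.frame(
  arr_delay = c(1, -28),
  dep_delay = c(-4, -9),
  air_time  = c(88, 306)
)

result <- sequential_mutate(
  flights_small,
  gain = arr_delay - dep_delay,
  gain_per_hour = gain / (air_time / 60)
)
result
```

For these two rows the computed gain values are 5 and -19, in line with the output above.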

3-4. arrange()

arrange() sorts rows by the specified columns.

df %>% arrange(month, day) %>% head
##   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013     1   1     1353        -4     1549        24      EV  N14105
## 2 2013     1   1     1832         4     2144         0      UA  N18220
## 3 2013     1   1      602        -3      821        16      MQ  N730MQ
## 4 2013     1   1     1416         5     1603        14      UA  N456UA
## 5 2013     1   1     1127        -2     1303        -6      EV  N14180
## 6 2013     1   1     2323        83       22        69      EV  N13538
##   flight origin dest air_time distance hour minute
## 1   4171    EWR  MSN      152      799   13     53
## 2   1075    EWR  SNA      342     2434   18     32
## 3   4401    LGA  DTW      105      502    6      2
## 4    683    EWR  ORD      136      719   14     16
## 5   4294    EWR  RDU       73      416   11     27
## 6   4257    EWR  BTV       44      266   23     23

Rows are sorted in ascending order if you write just the column names.
If you want to sort in descending order, you can use desc().

df %>% arrange(month, desc(day)) %>% head
##   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013     1  31     1424        -5     1752        -2      UA  N512UA
## 2 2013     1  31     1853        -2     2149         7      DL  N175DZ
## 3 2013     1  31      958        -2     1251       -30      UA  N464UA
## 4 2013     1  31     1448       105     1635       131      B6  N292JB
## 5 2013     1  31       NA        NA       NA        NA      US        
## 6 2013     1  31     1358        -7     1717        12      B6  N554JB
##   flight origin dest air_time distance hour minute
## 1    257    JFK  SFO      355     2586   14     24
## 2    951    JFK  ATL      129      760   18     53
## 3    499    EWR  SEA      324     2402    9     58
## 4     32    JFK  ROC       54      264   14     48
## 5   1625    LGA  CLT       NA      544   NA     NA
## 6     63    JFK  TPA      164     1005   13     58

You can also sort by values that are transformed from columns.

df %>% arrange(abs(dep_delay)) %>% head
##   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013     1  28       NA        NA       NA        NA      EV  N13969
## 2 2013     2   9       NA        NA       NA        NA      UA        
## 3 2013     1  25       NA        NA       NA        NA      US        
## 4 2013    12  10       NA        NA       NA        NA      EV  N717EV
## 5 2013    11  26       NA        NA       NA        NA      EV  N755EV
## 6 2013     7   1       NA        NA       NA        NA      MQ  N711MQ
##   flight origin dest air_time distance hour minute
## 1   4588    EWR  MHT       NA      209   NA     NA
## 2   1274    EWR  SJU       NA     1608   NA     NA
## 3   2138    LGA  BOS       NA      184   NA     NA
## 4   5245    LGA  PIT       NA      335   NA     NA
## 5   5048    LGA  RIC       NA      292   NA     NA
## 6   3388    LGA  CMH       NA      479   NA     NA
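In the output above, the rows with NA come first. This is consistent with Spark placing nulls first in an ascending sort, whereas base R's order() places NA last by default. The small base R sketch below, using a made-up vector, shows the difference in case you compare Spark results with a local sort.

```r
# A made-up vector of departure delays with one missing value.
dep_delay <- c(-4, NA, 1, -9)

# Base R default: missing values are placed last.
nulls_last <- order(abs(dep_delay))

# Placing missing values first mimics the ordering seen above.
nulls_first <- order(abs(dep_delay), na.last = FALSE)

dep_delay[nulls_last]
dep_delay[nulls_first]
```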

3-5. summarize()

summarize() is used to collapse a DataFrame to a single row.

df %>% summarize(count = n(year)) %>% collect
##   count
## 1 10000

Typically, summarize() is used with group_by() to collapse each group to a single row.

As far as I know, you can use the following functions in summarize().

  • n()
  • n_distinct()
  • approxCountDistinct()
  • mean()
  • first()
  • last()

It seems that other aggregate functions are available in Scala (See docs).

Like dplyr, you can use summarise() instead of summarize().

3-6. group_by()

group_by() is used to describe how to break a DataFrame down into groups of rows.
Usually it is used with summarize() to collapse each group to a single row.

df %>% 
  group_by(tailnum) %>%
  summarize(mean_distance = mean(distance)) %>% 
  head
##   tailnum mean_distance
## 1  N8580A      245.6667
## 2  N554JB     1274.5000
## 3  N600LR      695.0000
## 4  N3HAAA     1075.0000
## 5  N871AS      617.0000
## 6  N77518      642.5000

You can specify multiple columns.

df %>% 
  group_by(year, month, day) %>%
  summarize(count = n(year)) %>% 
  arrange(year, month, day) %>%
  head
##   year month day count
## 1 2013     1   1    25
## 2 2013     1   2    29
## 3 2013     1   3    24
## 4 2013     1   4    30
## 5 2013     1   5    16
## 6 2013     1   6    35

Unlike dplyr, only summarize() can receive the results of group_by().

4. How to use

To install SparkR 1.4.0, the following articles may be useful.

Once you can load the SparkR package, you will also be able to use the SparkRext package.

# Preparation of data
library(dplyr)
library(nycflights13)
set.seed(123)
data <- sample_n(flights, 10000)

# Load library
library(SparkRext)

# Create Spark context and SQL context
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)

# Create DataFrame
df <- createDataFrame(sqlContext, data.frame(data))

# Forcing to use the functions of SparkRext
prior_library(SparkRext)

# Play with DataFrame
result <- df %>%
  select(year:day, flight, distance) %>%
  group_by(year, month, day) %>%
  summarize(flight_mean = mean(flight), distance_mean = mean(distance)) %>%
  filter(flight_mean >= 2000, distance_mean >= 1000) %>%
  arrange(year, month, day) %>%
  collect

# Print result
head(result)
##   year month day flight_mean distance_mean
## 1 2013     1  13    2271.043      1025.261
## 2 2013     1  24    2331.600      1092.850
## 3 2013     2  25    2055.471      1074.529
## 4 2013     3   5    2003.000      1003.448
## 5 2013     3   8    2167.138      1048.448
## 6 2013     3   9    2023.280      1168.400

5. Caution points

5-1. prior_library()

SparkRext defines functions with the same names as functions in SparkR.
Therefore, SparkRext is sensitive to the order in which libraries are loaded.

To avoid confusion, SparkRext provides the prior_library() function.

prior_library(SparkRext)

By doing this, the functions of SparkRext will be called with the highest priority.
You can confirm this by checking the search path:

head(search())
## [1] ".GlobalEnv"           "package:SparkRext"    "package:SparkR"      
## [4] "package:nycflights13" "package:dplyr"        "package:stats"

If you want to switch back to SparkR, you can do that too.

prior_library(SparkR)
head(search())
## [1] ".GlobalEnv"           "package:SparkR"       "package:SparkRext"   
## [4] "package:nycflights13" "package:dplyr"        "package:stats"

You can also switch to dplyr.

prior_library(dplyr)
head(search())
## [1] ".GlobalEnv"           "package:dplyr"        "package:SparkR"      
## [4] "package:SparkRext"    "package:nycflights13" "package:stats"
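One way a helper like this could work is to detach the package and attach it again, because library() attaches a package at position 2 of the search path by default. The sketch below is only an assumption about the mechanism, not necessarily how prior_library() is implemented; `move_to_front()` is a hypothetical name, and base R packages are used so the example runs without Spark.

```r
# Hypothetical helper: move a package to the front of the search path
# (right after .GlobalEnv) by detaching and re-attaching it.
move_to_front <- function(package_name) {
  entry <- paste0("package:", package_name)
  # Detach only if the package is currently attached.
  if (entry %in% search()) {
    detach(entry, character.only = TRUE)
  }
  # library() attaches at position 2 by default.
  library(package_name, character.only = TRUE)
  invisible(search())
}

move_to_front("tools")
move_to_front("datasets")

# "package:datasets" is now directly behind ".GlobalEnv".
head(search(), 3)
```

Because R looks up functions by walking the search path in order, the package attached most recently masks same-named functions in the packages behind it.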

5-2. Interoperability between SparkR and dplyr

The appeal of SparkR is operating on a large-scale dataset with familiar R syntax. However, it would be a shame to limit ourselves to SparkR for all data manipulation. For example, R visualization is very powerful and easy to use. You might want to produce a medium-sized aggregated local data.frame using SparkR and then slice and dice it further while charting.

For example, we can look at the average departure delay along a few different dimensions:

library(ggplot2)

aggDF <- df %>%
  group_by(year, month, carrier, origin, dest) %>%
  summarise(n = n(dep_delay), dep_delay = mean(dep_delay)) %>%
  collect()

aggDF %>%
  group_by(period = as.Date(paste0(year, "-", month, "-01")), carrier) %>%
  summarise(total = n(), dep_delay = weighted.mean(dep_delay, n)) %>%
  # keep period/carrier with more than 20 observations
  filter(total > 20) %>%
  ggplot(aes(period, dep_delay, color = carrier)) +
  geom_line() +
  theme_bw()

aggDF %>%
  group_by(origin, dest) %>%
  summarise(total = n(), dep_delay = weighted.mean(dep_delay, n)) %>%
  filter(total > 30) %>%
  ggplot(aes(origin, dest, fill = dep_delay)) +
  geom_tile(colour = "white") +
  scale_fill_gradient(low = "white", high = "steelblue")

As you can see, the seamless transition from a large DataFrame to a small data.frame can be very powerful. Data science is not exclusively a big data or a small data endeavor. Having the same set of functions to handle both ends of the spectrum in the same project makes for a really enjoyable experience.
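The local re-aggregation above uses weighted.mean(dep_delay, n) rather than a plain mean(). The following base R sketch, with made-up numbers, shows why: averaging pre-aggregated group means only reproduces the overall mean when each group mean is weighted by its row count.

```r
# Made-up raw delays for one carrier across two months.
raw <- data.frame(
  month     = c(1, 1, 2),
  dep_delay = c(10, 20, 30)
)

# Step 1: per-month aggregates, as the Spark side would return them.
group_means  <- tapply(raw$dep_delay, raw$month, mean)    # 15 and 30
group_counts <- tapply(raw$dep_delay, raw$month, length)  # 2 and 1

# Step 2: combine the aggregates locally.
unweighted <- mean(group_means)
recombined <- weighted.mean(group_means, group_counts)

# Only the weighted version agrees with the mean of the raw data.
overall <- mean(raw$dep_delay)
c(unweighted = unweighted, recombined = recombined, overall = overall)
```

This is why the Spark aggregation keeps the count n alongside the mean before collect().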

6. Bug reports