# Testing Spark + `sparklyr` - part 2

## Table of Contents

- [Kickstart](#Kickstart)
- [Uploading the data in JSON format](#Uploading-the-data-in-JSON-format)
- [Install libraries in SparkR](#Install-libraries-in-SparkR)
- [Spark and R in Jupyter](#Spark-and-R-in-Jupyter)
    - [Configuring Spark and R](#Configuring-Spark-and-R)
    - [Loading spark context (RDDs)](#Loading-spark-context-(RDDs))
- [Reading JSON into Spark context: `reviews_Books_5.json`](#Reading-JSON-into-Spark-context:-reviews_Books_5.json)
    - [Prepare dataset](#Prepare-dataset)
    - [Split Data](#Split-Data)
    - [Binarize - dichotomize](#Binarize---dichotomize)
    - [Tokenize](#Tokenize)
- [Workaround: `bin_reviews` in PySpark](#Workaround:-bin_reviews-in-PySpark)
- [Returning to `sparklyr`](#Returning-to-sparklyr)
    - [Remove stop-words](#Remove-stop-words)
    - [Fit data](#Fit-data)
    - [Predict](#Predict)
    - [Plot data](#Plot-data)
- [Session](#Session)
    - [Paths recognised by sparkR](#Paths-recognised-by-sparkR)
    - [R session info](#R-session-info)
- [References](#References)

## Kickstart

For an introduction to the tools, see the [PySpark course](https://github.com/javicacheiro/pyspark_course/blob/master/unit_1_tools.ipynb).

## Uploading the data in JSON format 

First, we need to upload our data from our HOME to the HDFS home directory.  
- You can add a single file, or an entire folder.
- You can also list or delete files, folders.

    # upload a file to the HDFS home directory
    $ hdfs dfs -put /mnt/gluster/...../data/amazon/reviews_Books_5.json .
    # list data
    $ hdfs dfs -ls
    # delete files or folders
    $ hdfs dfs -rm -r -f data/jscars.json

## Install libraries in SparkR

First, start `sparkR` from your HOME directory:

    $ sparkR

Then, install the libraries as needed. When you finish, exit the interactive `sparkR` session.

    > install.packages(c("sparklyr", "dplyr", "knitr", "ggplot2", "repr", "tidytext", "tidyr", "purrr"))
    > q()

## Spark and R in Jupyter

### Configuring Spark and R 

Add this variable to `.bashrc` to avoid calling `Sys.setenv` in `R`.

    $ cd $HOME
    # add this line to .bashrc
    export R_PROFILE_USER=/usr/hdp/2.4.2.0-258/spark/R/lib/SparkR/profile/shell.R
    # reload the configuration
    $ source .bashrc

Set `R` environment variables (needed for Jupyter notebooks in the cluster `sparkR` installation).

In [2]:
# Sys.setenv(SPARK_HOME='/usr/hdp/2.4.2.0-258/spark') # commented after being added to .bashrc
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))

Load libraries:

In [4]:
x <- c("sparklyr", "dplyr", "knitr", "ggplot2", "repr", "tidytext", "tidyr", "purrr")
lapply(x, require, character.only = TRUE, quietly = TRUE)
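
Since `require` only returns `FALSE` when a package is missing, a failed load is easy to overlook. The following is a minimal sketch of a check that could be run before loading the libraries. The helper name `missing_packages` is hypothetical and not part of the original notebook.

```r
# Hypothetical helper (an assumption, not from the original notebook):
# return the names of the packages that cannot be found in the library paths.
missing_packages <- function(pkgs) {
  found <- vapply(pkgs, requireNamespace, logical(1), quietly = TRUE)
  pkgs[!found]
}

needed <- c("sparklyr", "dplyr", "knitr", "ggplot2", "repr", "tidytext", "tidyr", "purrr")
not_installed <- missing_packages(needed)

# Report missing packages instead of failing later in the notebook
if (length(not_installed) > 0) {
  message("Not installed: ", paste(not_installed, collapse = ", "))
}
```

Any package listed by the message would need to be installed from the interactive `sparkR` session described above.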

### Loading spark context (RDDs)

Currently there are three types of contexts:

- Local context: 
    - Interactive. 
    - If the user exits session, the tasks are terminated (use `screen` to run after session close).
    - All processes reside in the LOGIN node (drivers and executors).
    - Can only be used for tasks that require very few resources.
- YARN-client: 
    - Interactive.
    - If the user exits session, the tasks are terminated (use `screen` to run after session close).
    - The driver resides in the LOGIN node, but the executors are in the CLUSTER nodes. Thus, executors can use all the memory available for the task in the CLUSTER nodes.
    - Can be used for memory-intensive tasks.
- YARN-cluster:
    - Not interactive.
    - Both the driver and the executors reside in the CLUSTER nodes.
    - Can be used for memory-intensive tasks.
    - Currently doesn't seem to be available for this version of `R/sparklyr`.
    
Defining a new context (`sc`) overwrites the previous one.

In [5]:
# Initiating spark context: yarn (for loading bigger datasets)
sc <- spark_connect(master = "yarn-client", spark_home = "/usr/hdp/2.4.2.0-258/spark")

We can disable warnings with:

In [4]:
options(warn = -1)

## Reading JSON into Spark context: `reviews_Books_5.json`

We will use a [dataset of Amazon Product Data](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books_5.json.gz) [1] that contains 8.9M book reviews from Amazon, spanning May 1996 - July 2014.

Dataset characteristics:

- Number of reviews: 8.9M
- Size: 8.8GB (uncompressed)
- HDFS blocks: 70 (each with 3 replicas)

[1] J. McAuley, C. Targett, J. Shi, A. van den Hengel. Image-based recommendations on styles and substitutes. SIGIR, 2015. http://jmcauley.ucsd.edu/data/amazon/.  

I am translating into R the following tutorial: Sentiment analysis with Spark ML. [Material for Machine Learning Workshop Galicia 2016](http://nbviewer.jupyter.org/github/javicacheiro/machine_learning_galicia_2016/blob/master/notebooks/sentiment_analysis-amazon_books.ipynb).

In [6]:
books <- spark_read_json(sc, name = "books", path = "amazon/reviews_Books_5.json") 
# very big dataset

In [7]:
books %>%
  head(2) %>%
  collect()

asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
000100039X,"0, 0",5,Spiritually and mentally inspiring! A book that allows you to question your morals and will help you discover who you really are!,"12 16, 2012",A10000012B7CGYKOMPQ4L,Adam,Wonderful!,1355616000
000100039X,"0, 2",5,"This is one my must have books. It is a masterpiece of spirituality. I'll be the first to admit, its literary quality isn't much. It is rather simplistically written, but the message behind it is so powerful that you have to read it. It will take you to enlightenment.","12 11, 2003",A2S166WSCFIFP5,"adead_poet@hotmail.com ""adead_poet@hotmail.com""",close to god,1071100800


In [8]:
# sdf_schema(books)
print(sdf_schema(books))

$asin
$asin$name
[1] "asin"

$asin$type
[1] "StringType"


$helpful
$helpful$name
[1] "helpful"

$helpful$type
[1] "ArrayType(LongType,true)"


$overall
$overall$name
[1] "overall"

$overall$type
[1] "DoubleType"


$reviewText
$reviewText$name
[1] "reviewText"

$reviewText$type
[1] "StringType"


$reviewTime
$reviewTime$name
[1] "reviewTime"

$reviewTime$type
[1] "StringType"


$reviewerID
$reviewerID$name
[1] "reviewerID"

$reviewerID$type
[1] "StringType"


$reviewerName
$reviewerName$name
[1] "reviewerName"

$reviewerName$type
[1] "StringType"


$summary
$summary$name
[1] "summary"

$summary$type
[1] "StringType"


$unixReviewTime
$unixReviewTime$name
[1] "unixReviewTime"

$unixReviewTime$type
[1] "LongType"




Here I used the Hive function `rpad` to truncate the variable `reviewText` to 30 characters, which keeps the table readable.  
See more Hive functions in the [**References**](#References) section below, and in [this list of Hive string functions](http://www.folkstalk.com/2011/11/string-functions-in-hive.html):

In [14]:
books %>%
  mutate(reviewText_trunc = as.character(rpad(reviewText, 30, '...'))) %>%
  select(reviewText_trunc, overall) %>% 
  head() %>%
  collect()

reviewText_trunc,overall
Spiritually and mentally inspi,5
This is one my must have books,5
This book provides a reflectio,5
I first read THE PROPHET in co,5
A timeless classic. It is a v,5
Reading this made my mind feel,5
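
To make the behaviour of `rpad` concrete, here is a base-R approximation. It is only an illustration of the documented Hive semantics (pad short strings on the right, cut long strings to the requested length), not the code that Spark runs. The function name `rpad_sketch` is an assumption, and it handles a single string at a time.

```r
# Base-R approximation of Hive's rpad(str, len, pad); illustration only.
rpad_sketch <- function(s, len, pad) {
  if (nchar(s) >= len) {
    return(substr(s, 1, len))               # long strings are truncated
  }
  n_rep <- ceiling((len - nchar(s)) / nchar(pad))
  padded <- paste0(s, strrep(pad, n_rep))   # short strings are padded ...
  substr(padded, 1, len)                    # ... and cut to the exact length
}

# A review longer than 30 characters is cut, as in the table above
rpad_sketch("Spiritually and mentally inspiring! A book", 30, "...")

# A short string is padded with the pad text instead
rpad_sketch("Wonderful!", 15, "...")
```

Because every review shown in the table is longer than 30 characters, the `'...'` padding never appears in the output.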


In [11]:
books %>%
  count(overall, sort = TRUE) %>%
  collect()

overall,n
5,4980815
4,2223094
3,955189
2,415110
1,323833


### Prepare dataset

We will avoid neutral reviews by keeping only reviews with 1 or 5 stars overall score. We will also filter out the reviews that contain no text.

In [9]:
reviews <- books %>%
  filter(overall == 1 | overall == 5) %>%
  filter(reviewText != '')

In [10]:
reviews %>%
  head(1) %>%
  collect()

asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
000100039X,"0, 0",5,Spiritually and mentally inspiring! A book that allows you to question your morals and will help you discover who you really are!,"12 16, 2012",A10000012B7CGYKOMPQ4L,Adam,Wonderful!,1355616000


Caching is useful when the lineage of an RDD branches out or when an RDD is used multiple times, as in a loop. The calls are left commented out here:

In [17]:
# tbl_cache(sc, reviews, force = TRUE)
## tbl_uncache(sc, books)

In [10]:
reviews %>%
  count() %>%
  collect()

n
5304187


So far, so good.
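
As a quick sanity check, the row count can be compared with the per-score counts printed earlier. The sketch below only redoes that arithmetic in plain R. The numbers are copied from the outputs above and the variable names are illustrative.

```r
# Counts copied from the outputs above
n_five_star <- 4980815
n_one_star  <- 323833
n_kept      <- 5304187   # rows in `reviews` after both filters

n_extreme <- n_five_star + n_one_star   # reviews with 1 or 5 stars
n_dropped <- n_extreme - n_kept         # removed by the text filter

c(extreme = n_extreme, dropped = n_dropped)
```

The difference is small (461 reviews), which suggests that very few 1-star or 5-star reviews lack text.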

### Binarize - dichotomize

We will convert the numerical variable `overall` into a binary (0/1) `label`: with a threshold of 2.5, 5-star reviews become 1 and 1-star reviews become 0.

In [11]:
bin_reviews <- ft_binarizer(reviews, threshold = 2.5, input.col = 'overall', output.col = 'label')
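
The binarization step follows the rule of Spark's Binarizer: values strictly greater than the threshold map to 1, everything else maps to 0. A minimal base-R sketch of the same rule, with a hypothetical function name and made-up scores:

```r
# Base-R illustration of thresholding; `binarize_sketch` is a hypothetical
# name and does not exist in sparklyr.
binarize_sketch <- function(x, threshold) {
  as.numeric(x > threshold)   # strictly greater than the threshold gives 1
}

overall <- c(1, 5, 5, 1)      # made-up scores for illustration
binarize_sketch(overall, threshold = 2.5)
```

Any threshold between 1 and 5 would give the same labels here, because only 1-star and 5-star reviews remain after filtering.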

In [20]:
bin_reviews %>%
  select(reviewText, overall, label) %>% 
  head(1) %>%
  collect()

reviewText,overall,label
Spiritually and mentally inspiring! A book that allows you to question your morals and will help you discover who you really are!,5,1


In [13]:
bin_reviews %>%
  mutate(reviewText_trunc = as.character(rpad(reviewText, 30, '...'))) %>%
  select(reviewText_trunc, overall) %>% 
  head(3) %>%
  collect()

reviewText_trunc,overall
Spiritually and mentally inspi,5
This is one my must have books,5
This book provides a reflectio,5


### Tokenize

In [12]:
tokenized_reviews <- ft_tokenizer(bin_reviews, input.col = 'reviewText', output.col = 'word')
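
`ft_tokenizer` wraps Spark's Tokenizer, which lowercases the text and splits it on whitespace. The following base-R sketch approximates that behaviour for a single string. The name `tokenize_sketch` is an assumption, and splitting on runs of whitespace is a simplification.

```r
# Base-R approximation of a lowercase + whitespace tokenizer; illustration only.
tokenize_sketch <- function(text) {
  strsplit(tolower(text), "[[:space:]]+")[[1]]
}

review <- paste(
  "Spiritually and mentally inspiring! A book that allows you to question",
  "your morals and will help you discover who you really are!"
)

tokens <- tokenize_sketch(review)
length(tokens)    # number of tokens in the first review
head(tokens, 4)   # punctuation stays attached to the words
```

Note that punctuation is not removed, so tokens such as `inspiring!` and `are!` will not match entries in a stop-word or sentiment lexicon.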

In [13]:
tokenized_reviews %>%
  select(reviewText, label, word) %>% 
  head(1) %>%
  collect()

reviewText,label,word
Spiritually and mentally inspiring! A book that allows you to question your morals and will help you discover who you really are!,1,"spiritually, and , mentally , inspiring! , a , book , that , allows , you , to , question , your , morals , and , will , help , you , discover , who , you , really , are!"


In [14]:
tokenized_reviews %>%
  select(word) %>% 
  head(1) 

# Source:   lazy query [?? x 1]
# Database: spark_connection
         word
       <list>
1 <list [22]>

Here we see that the variable `word` is a **list-column**. To process it we would have to use [`tidyr::unnest`](https://www.rdocumentation.org/packages/tidyr/versions/0.6.3/topics/unnest).  
However, R methods such as `unnest` require the dataset to be collected into local memory, which is infeasible given its size.  
Unfortunately, at the time of writing there is no method for unnesting in `sparklyr` (see [here](https://campus.datacamp.com/courses/introduction-to-spark-in-r-using-sparklyr/going-native-use-the-native-interface-to-manipulate-spark-dataframes?ex=6)).  
As an example, this is what `unnest` does in R on a small sample: 

In [3]:
# Collect a small sample locally, then give each word its own row
token_reviews <- tokenized_reviews %>%
  select(word) %>%
  head(10) %>%
  collect() %>%
  mutate(word = lapply(word, as.character)) %>%
  unnest(word)

In [22]:
token_reviews %>% 
  head(5)

word
spiritually
and
mentally
inspiring!
a


## Workaround: `bin_reviews` in PySpark

When a method is not present in `sparklyr`, the easiest solution is resorting to [**PySpark**](https://spark.apache.org/docs/0.9.0/python-programming-guide.html), which exposes the Spark (Scala) programming model to Python.  
A great tutorial to start is this [PySpark Course](https://github.com/javicacheiro/pyspark_course), by @javicacheiro.  
We will start feeding some data to PySpark, so first we must save our dataset `bin_reviews` in HDFS:

In [12]:
spark_write_json(bin_reviews, "amazon/bin_reviews.json", mode = NULL, options = list())
# spark_write_json(tokenized_reviews, "amazon/tokenized_reviews.json", mode = NULL, options = list())

Just to be sure, we can check that the dataset has been saved:

    $ hdfs dfs -ls amazon

Follow the PySpark transformations of the data in the notebook [sparklyr_python.ipynb](./sparklyr_python.ipynb): 
- In PySpark, we will tokenize the review text into a `words` column.  
- We will remove the stop words.
- We will 'flatten' or 'explode' our `words` column to have each word in its own row.
- Then, we'll reload the transformed dataset! (Thanks, PySpark!)

## Returning to `sparklyr`

Import the transformed data into `sparklyr` again: 

In [None]:
unnested_reviews <- spark_read_json(sc, name = "unnested_reviews", path = "amazon/unnested_reviews.json") 

### Remove stop-words

In [23]:
# data("stop_words")
spk_stop_words <- sdf_copy_to(sc, stop_words, "spk_stop_words", overwrite = TRUE)

In [70]:
spk_stop_words %>%
  head(5) %>%
  collect()

word,lexicon
a,SMART
a's,SMART
able,SMART
about,SMART
above,SMART


In [None]:
removed_reviews <- tokenized_reviews %>%
  anti_join(spk_stop_words, by="word") %>%
  head(2)
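
An `anti_join` keeps the rows of the left table that have no match in the right table. The base-R sketch below shows the same idea on toy data. The data frames are made up, and the join key is assumed to be a plain character column with one word per row.

```r
# Toy word table (one word per row) and a toy stop-word list
words <- data.frame(
  word = c("a", "timeless", "classic", "about", "love"),
  stringsAsFactors = FALSE
)
stop_words_local <- data.frame(
  word = c("a", "about", "above"),
  stringsAsFactors = FALSE
)

# Keep only the words that do not appear in the stop-word list
kept <- words[!(words$word %in% stop_words_local$word), , drop = FALSE]
kept$word
```

This presumably only works once each word sits in its own row, which is why the list-column has to be flattened before the stop words can be removed.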

### Split Data

In [11]:
partitions <- reviews %>%
  sdf_partition(trainingData = 0.8, testData = 0.2)

In [15]:
partitions 

$trainingData
# Source:   table<sparklyr_tmp_730d6c85138b> [?? x 9]
# Database: spark_connection
         asin    helpful overall
        <chr>     <list>   <dbl>
 1 000100039X <list [2]>       5
 2 000100039X <list [2]>       5
 3 000100039X <list [2]>       5
 4 000100039X <list [2]>       5
 5 000100039X <list [2]>       5
 6 000100039X <list [2]>       5
 7 000100039X <list [2]>       5
 8 000100039X <list [2]>       5
 9 000100039X <list [2]>       5
10 000100039X <list [2]>       5
# ... with 4.243e+06 more rows, and 6 more variables: reviewText <chr>,
#   reviewTime <chr>, reviewerID <chr>, reviewerName <chr>, summary <chr>,
#   unixReviewTime <dbl>

$testData
# Source:   table<sparklyr_tmp_730d515d0327> [?? x 9]
# Database: spark_connection
         asin    helpful overall
        <chr>     <list>   <dbl>
 1 000100039X <list [2]>       5
 2 000100039X <list [2]>       5
 3 000100039X <list [2]>       5
 4 000100039X <list [2]>       5
 5 000100039X <list [2]>       5
 6 0001000
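
The idea behind a weighted random split can be sketched in base R: each row is assigned to the training set with probability 0.8, so the resulting sizes are close to, but not exactly, 80% and 20%. This is an illustration with made-up row indices and an arbitrary seed, not the implementation used by `sdf_partition`.

```r
set.seed(42)   # arbitrary seed, used only to make this sketch reproducible

n <- 10000                       # toy number of rows
is_training <- runif(n) < 0.8    # each row goes to training with probability 0.8

training_idx <- which(is_training)
test_idx     <- which(!is_training)

# Sizes are approximately 80% / 20% of n
c(training = length(training_idx), test = length(test_idx))

# Expected training size for the 5304187 filtered reviews
0.8 * 5304187
```

The expected training size of about 4.24 million rows is consistent with the row count reported for `trainingData` above.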

### Fit data

### Predict

### Plot data

## Session 

### Paths recognised by sparkR 

In [29]:
.libPaths()
Sys.getenv("R_HOME")

### R session info

In [86]:
sessionInfo()

R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

locale:
 [1] LC_CTYPE=es_ES.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=es_ES.UTF-8        LC_COLLATE=es_ES.UTF-8    
 [5] LC_MONETARY=es_ES.UTF-8    LC_MESSAGES=es_ES.UTF-8   
 [7] LC_PAPER=es_ES.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=es_ES.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] purrr_0.2.2.2  tidytext_0.1.3 repr_0.10      ggplot2_2.2.0  knitr_1.15.1  
[6] dplyr_0.7.1    sparklyr_0.5.6 SparkR_1.6.1  

loaded via a namespace (and not attached):
 [1] pbdZMQ_0.2-4      reshape2_1.4.2    lattice_0.20-34   colorspace_1.3-1 
 [5] htmltools_0.3.5   SnowballC_0.5.1   yaml_2.1.14       base64enc_0.1-3  
 [9] rlang_0.1.1       withr_1.0.2       foreign_0.8-67    glue_1.1.1       
[13]

## References 

- [`sparklyr` presentation](https://cdn.oreillystatic.com/en/assets/1/event/193/Sparklyr_%20An%20R%20interface%20for%20Apache%20Spark%20Presentation.pdf).
- [`sparklyr` tutorial](http://spark.rstudio.com/).
- [`sparklyr` cheatsheet](http://spark.rstudio.com/images/sparklyr-cheatsheet.pdf).
- [`sparklyr`: creating extensions](http://spark.rstudio.com/extensions.html).
- [Differences between `sparkr` and `sparklyr`](https://stackoverflow.com/questions/39494484/sparkr-vs-sparklyr).
- [Hive Operators and UDFs](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF).