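- Spark is an implementation of the MapReduce programming paradigm that operates on in-memory data and allows data reuse across multiple computations.
- For workloads that reuse data, such as iterative algorithms and interactive queries, Spark typically performs significantly better than its predecessor, Hadoop MapReduce, because intermediate results can stay in memory instead of being written back to disk between jobs.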
- Spark's primary data abstraction is called a Resilient Distributed Dataset (RDD):
    - Read-only, partitioned collection of records
    - Created (aka written) through deterministic operations on data:
        - Loading from stable storage
        - Transforming from other RDDs
        - Generating through coarse-grained operations such as map, join, filter ...
    - Do not need to be materialized at all times and are recoverable via **data lineage** (the recorded chain of operations that produced them)
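
To make the lineage idea concrete, here is a minimal plain-Python sketch. `ToyRDD` is a hypothetical stand-in written for this illustration, not part of Spark: each object remembers its parent and the operation that produced it, so a lost in-memory copy can be rebuilt by replaying those operations from stable storage.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: it records how it was derived."""

    def __init__(self, op, source=None, parent=None, fn=None):
        self.op = op                # name of the operation that produced this dataset
        self.source = source        # stable storage (here: a plain list), for base datasets
        self.parent = parent        # the ToyRDD this one was derived from
        self.fn = fn                # coarse-grained operation applied to the parent's records
        self.materialized = None    # in-memory copy; may be dropped at any time

    def map(self, f):
        return ToyRDD("map", parent=self, fn=lambda recs: [f(r) for r in recs])

    def filter(self, pred):
        return ToyRDD("filter", parent=self, fn=lambda recs: [r for r in recs if pred(r)])

    def lineage(self):
        """Chain of operations from stable storage to this dataset."""
        steps = [] if self.parent is None else self.parent.lineage()
        return steps + [self.op]

    def compute(self):
        """Return the records, rebuilding them from the lineage if needed."""
        if self.materialized is None:
            if self.parent is None:
                self.materialized = list(self.source)
            else:
                self.materialized = self.fn(self.parent.compute())
        return self.materialized


base = ToyRDD("load", source=[1, 2, 3, 4, 5, 6])
evens_squared = base.filter(lambda x: x % 2 == 0).map(lambda x: x * x)

first = evens_squared.compute()

# Simulate losing the in-memory copies, then recover by replaying the lineage.
evens_squared.materialized = None
evens_squared.parent.materialized = None
recovered = evens_squared.compute()

print(evens_squared.lineage())
print(first, recovered)
```

Real RDDs track lineage per partition and across a cluster; the sketch only shows why deterministic operations make recomputation safe.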

## 1. Getting Started

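Spark keeps working data in memory across the cluster. The notebook reaches the cluster through a SparkContext object, bound here to the variable **sc**: it holds the connection to the Spark master and is the entry point for creating RDDs.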

In [None]:
import sys
import os

sys.path.insert(0, '/spark/python')
sys.path.insert(0, '/spark/python/lib/py4j-0.10.7-src.zip')

os.environ['SPARK_HOME'] = '/spark'

import pyspark
conf = pyspark.SparkConf()
conf.setMaster("spark://spark-master:7077")
conf.set("spark.driver.memory","1g")
conf.set("spark.executor.memory","1g")
conf.set("spark.executor.instances","1")  # config key behind spark-submit's --num-executors
conf.set("spark.executor.cores","1")

sc = pyspark.SparkContext(conf=conf)

In [None]:
sc

In [None]:
textFile = sc.textFile("/opt/spark-data/gutenberg-shakespeare.txt")

In [None]:
print(textFile)

## 2. What does Spark do with my data?

**Storage Level:**
- Does the RDD use disk?
- Does the RDD use memory?
- Does the RDD use off-heap memory?
- Should the RDD be serialized when it is persisted?
- How many replicas should be kept (default: 1; must be less than 40)?
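
One way to picture a storage level is as a small record with one field per question above. The sketch below is plain Python written for this illustration; `ToyStorageLevel`, `make_level`, and `describe` are hypothetical names, not Spark's own `StorageLevel` class.

```python
from collections import namedtuple

# Hypothetical record: one field per question in the list above.
ToyStorageLevel = namedtuple(
    "ToyStorageLevel",
    ["use_disk", "use_memory", "use_off_heap", "serialized", "replication"],
)


def make_level(use_disk, use_memory, use_off_heap=False, serialized=True, replication=1):
    """Build a level, enforcing the replica limit mentioned above."""
    if replication >= 40:
        raise ValueError("replication must be less than 40")
    return ToyStorageLevel(use_disk, use_memory, use_off_heap, serialized, replication)


def describe(level):
    """Render a level as a short human-readable summary."""
    places = []
    if level.use_disk:
        places.append("disk")
    if level.use_memory:
        places.append("memory")
    if level.use_off_heap:
        places.append("off-heap")
    where = "+".join(places) if places else "nowhere"
    form = "serialized" if level.serialized else "deserialized"
    return f"{where}, {form}, replication={level.replication}"


memory_only = make_level(use_disk=False, use_memory=True)
memory_and_disk_2 = make_level(use_disk=True, use_memory=True, replication=2)

print(describe(memory_only))
print(describe(memory_and_disk_2))
```

The value printed by `getStorageLevel()` in the cells that follow can be read the same way: a set of flags plus a replica count.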

In [None]:
textFile.getStorageLevel()

In [None]:
textFile.getNumPartitions()

In [None]:
textFile.cache()

In [None]:
textFile.getStorageLevel()

- By default, each transformed RDD may be recomputed each time you run an action on it.
- It is also possible to *persist* an RDD using *persist()* or *cache()*, so that later actions reuse the stored result instead of recomputing it.
    - *persist()* lets you specify the storage level for the RDD
    - *cache()* is shorthand for *persist()* with the default memory-only storage level
    - To release a persisted RDD, call *unpersist()*
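
The cost of recomputation is easy to see in a plain-Python analogue. This is only a sketch of the idea, not Spark code: `expensive` and `run_pipeline` are hypothetical stand-ins for a transformation and for evaluating a transformed dataset from scratch.

```python
calls = {"n": 0}


def expensive(x):
    """Stand-in for a costly per-record transformation; counts its invocations."""
    calls["n"] += 1
    return x * 10


data = [1, 2, 3]


def run_pipeline():
    """Stand-in for evaluating the transformed dataset from scratch."""
    return [expensive(x) for x in data]


# Without persistence: every "action" re-runs the whole pipeline.
total = sum(run_pipeline())
largest = max(run_pipeline())
calls_without_cache = calls["n"]

# With persistence: compute once, keep the result, reuse it for both "actions".
calls["n"] = 0
cached = run_pipeline()
total = sum(cached)
largest = max(cached)
calls_with_cache = calls["n"]

del cached  # analogous to unpersist(): drop the stored copy when it is no longer needed

print(calls_without_cache, calls_with_cache)
```

Two actions on three records cost six calls without the stored copy and three with it, which is the effect the repeated `count()` timings in the next section are meant to reveal.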

## 3. WordCount

Data operations in Spark fall into two groups, *transformations* and *actions*.
- A *transformation* creates a new dataset from an existing one. Examples include map, filter, reduceByKey, and sort.
- An *action* runs a computation on the dataset and returns a value to the driver program (here, the Python process behind this notebook). Examples include count, collect, reduce, and save.

"All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program." -- Spark Documentation

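Python 3's built-in `map` behaves in a similar lazy way, which makes for a quick analogy that runs without a cluster. This is a plain-Python sketch, not Spark code; `tag` and `log` are hypothetical names used to observe when the function actually runs.

```python
log = []


def tag(line):
    """Record that the function ran, then transform the line."""
    log.append(line)
    return line.upper()


lines = ["to be", "or not to be"]

pipeline = map(tag, lines)   # like a transformation: only a recipe is recorded
ran_before = list(log)       # nothing has executed yet

result = list(pipeline)      # like an action: consuming the result forces the work
ran_after = list(log)

print(ran_before, ran_after, result)
```

In the same way, defining `wordcount` later in this section returns immediately, and the work happens only when `collect()` or `take()` is called.
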
#### RDD Operations in Spark

**Transformations:**

- *map*(f: T -> U) : RDD[T] -> RDD[U]
- *filter*(f: T -> Bool) : RDD[T] -> RDD[T]
- *flatMap*(f: T -> Seq[U]) : RDD[T] -> RDD[U]
- *sample*(*fraction*: Float) : RDD[T] -> RDD[T] (deterministic sampling)
- *groupByKey*() : RDD[(K,V)] -> RDD[(K, Seq[V])]
- *reduceByKey*(f: (V,V) -> V) : RDD[(K,V)] -> RDD[(K,V)]
- *union*() : (RDD[T], RDD[T]) -> RDD[T]
- *join*() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K,(V,W))]
- *cogroup*() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K, (Seq[V],Seq[W]))]
- *crossProduct*() : (RDD[T], RDD[U]) -> RDD[(T,U)]
- *mapValues*(f: V -> W) : RDD[(K,V)] -> RDD[(K,W)] (preserves partitioning)
- *sort*(c: Comparator[K]) :  RDD[(K,V)] -> RDD[(K,V)]
- *partitionBy*(p: Partitioner[K]) : RDD[(K,V)] -> RDD[(K,V)]

**Actions:**

- *count*() : RDD[T] -> Long
- *collect*() : RDD[T] -> Seq[T]
- *reduce*(f: (T,T) -> T) : RDD[T] -> T
- *lookup*(k : K) : RDD[(K,V)] -> Seq[V] (on hash/range partitioned RDDs)
- *save*(path: String) : Outputs RDD to a storage system 
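
The type signatures above are easier to read next to concrete values. The following plain-Python sketch mimics a few of the operations on ordinary lists. The helpers `group_by_key`, `reduce_by_key`, and `join` are hypothetical local functions written for this illustration, and they ignore partitioning and distribution entirely.

```python
from collections import defaultdict
from functools import reduce
from itertools import chain

lines = ["a b", "b c"]

# map: exactly one output element per input element (here, a list per line)
mapped = [line.split(" ") for line in lines]

# flatMap: each input yields a sequence, and the sequences are flattened
flat_mapped = list(chain.from_iterable(mapped))

pairs = [(word, 1) for word in flat_mapped]


def group_by_key(kv):
    """(K, V) pairs -> {K: [V, ...]}"""
    groups = defaultdict(list)
    for k, v in kv:
        groups[k].append(v)
    return dict(groups)


def reduce_by_key(f, kv):
    """(K, V) pairs -> {K: V}, folding the values of each key with f"""
    return {k: reduce(f, vs) for k, vs in group_by_key(kv).items()}


def join(left, right):
    """Inner join on key: (K, V) and (K, W) pairs -> (K, (V, W)) pairs"""
    right_groups = group_by_key(right)
    return [(k, (v, w)) for k, v in left for w in right_groups.get(k, [])]


grouped = group_by_key(pairs)
reduced = reduce_by_key(lambda a, b: a + b, pairs)
joined = join([("a", 1), ("b", 2)], [("a", "x"), ("a", "y")])

print(mapped)
print(flat_mapped)
print(grouped)
print(reduced)
print(joined)
```

Spark's *reduceByKey* produces the same result as grouping and then folding, but it combines values within each partition before shuffling, so far less data crosses the network than with *groupByKey*.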

In [None]:
%%time
textFile.count()

In [None]:
%%time
textFile.count()

In [None]:
wordcount = textFile.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)

In [None]:
wordcount

In [None]:
wc_result = wordcount.collect()
wc_result
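
Because `collect()` returns an ordinary Python list of `(word, count)` tuples to the driver, the result can be post-processed with plain Python. The sketch below uses a small hypothetical stand-in list, `wc_sample`, instead of the real output, and ranks words by count. It also drops the empty string, which `split(" ")` produces wherever a line contains consecutive spaces.

```python
# Hypothetical stand-in for the list returned by wordcount.collect()
wc_sample = [("the", 5), ("", 9), ("king", 2), ("and", 5)]

# Drop the empty "word", then sort by descending count, breaking ties alphabetically.
ranked = sorted(
    (pair for pair in wc_sample if pair[0]),
    key=lambda pair: (-pair[1], pair[0]),
)
top_two = ranked[:2]

print(top_two)
```

For a large RDD it is usually better to rank on the cluster, for example with `takeOrdered`, so that only the top entries travel to the driver.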

**Step-by-step actions:**

In [None]:
!head -n 100 /opt/spark-data/gutenberg-shakespeare.txt

In [None]:
wordcount_step_01 = textFile.flatMap(lambda line: line.split(" "))

In [None]:
wordcount_step_01.take(20)

In [None]:
wordcount_step_02 = wordcount_step_01.map(lambda word: (word, 1))

In [None]:
wordcount_step_02.take(20)

In [None]:
wordcount_step_03 = wordcount_step_02.reduceByKey(lambda a, b: a + b)

In [None]:
wordcount_step_03.take(20)

### Challenge

- Augment the mapping step of WordCount with a function that strips punctuation and normalizes capitalization, so that variants such as `King`, `king,` and `KING` are counted as the same word
  - Hint: The string module is helpful for removing punctuation.
  - Make sure your solution supports Python version 3.
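
As a starting point for the hint, here is one possible Python 3 way to clean a single word with the string module. `normalize` is a hypothetical helper name. Wiring it into the Spark pipeline, and deciding what to do with words that become empty, is left as the exercise.

```python
import string

# In Python 3, str.translate takes a table built by str.maketrans;
# the third argument lists the characters to delete.
_PUNCT_TABLE = str.maketrans("", "", string.punctuation)


def normalize(word):
    """Remove ASCII punctuation and lower-case the word."""
    return word.translate(_PUNCT_TABLE).lower()


sample = ["Hamlet,", "HAMLET", "hamlet!", "--"]
cleaned = [normalize(w) for w in sample]

print(cleaned)
```

Note that a token made only of punctuation, such as `--`, turns into an empty string.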

In [None]:
import string

In [None]:
# PUT YOUR SOLUTION HERE

To shut down the Spark application and release its cluster resources, call `sc.stop()`.

In [None]:
sc.stop()