# Spark RDDs

## Success Criteria
Today I will be successful if I can...
- Define/Explain Big data
- Define what an RDD is by its properties and operations
- Explain Lazy Evaluation
- Explain the difference between Transformations and Actions on an RDD
- Implement .map(funct) and .filter(funct) transformations on an RDD
- Implement .collect() and .take(n) Actions on an RDD

# Key Concepts

## Spark is a tool for parallelized computation
- Highly efficient distributed operations
- Spark runs in memory and on disk
- Can be up to 100x faster than Hadoop MapReduce in memory, and 10x faster on disk.
- Spark keeps data in memory whenever possible, so it can use a lot of it.

<img src="images/spark_ecosystem.png" width="500">

## Parallelizing data storage: Resilient Distributed Datasets (RDD)

<img src="images/rdd_on_cluster.png" width="200" align="right">

[Image Source](http://horicky.blogspot.com/2015/02/big-data-processing-in-spark.html)

 
RDDs are the primary class introduced by Spark. An RDD is a data container whose contents are split into partitions and distributed across the machines of a cluster. RDDs are:

- **immutable** 
- **lazily evaluated**
- **cacheable**

## Parallelizing computation: a "functional programming paradigm" and DAGs

Spark provides many **transformation** functions. By programming these functions, you construct a **Directed Acyclic Graph** (DAG) of steps to execute the transformation.

<img src="images/dag.png">

When you use them, these functions are passed from the **client** to the **master** (the machine on which the driver program runs), which then distributes them to the workers. Each worker applies them to its own partitions of the RDD.
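To make the idea of "building a plan first, running it later" concrete, here is a minimal plain-Python sketch. It is only an analogy and not how Spark is implemented: `LazyPipeline` and `noisy_double` are hypothetical names, and everything runs in a single process. Each transformation merely records a step, and nothing is computed until `collect()` is called.

```python
class LazyPipeline:
    """Toy stand-in for an RDD: records steps, runs them only on collect()."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)  # the recorded plan (a linear DAG)

    def map(self, func):
        # Return a NEW pipeline; the original is left untouched (immutability).
        return LazyPipeline(self._data, self._steps + (("map", func),))

    def filter(self, func):
        return LazyPipeline(self._data, self._steps + (("filter", func),))

    def collect(self):
        # The "action": only now are the recorded steps executed.
        rows = iter(self._data)
        for kind, func in self._steps:
            rows = map(func, rows) if kind == "map" else filter(func, rows)
        return list(rows)


calls = []

def noisy_double(x):
    calls.append(x)  # side effect that reveals when the function really runs
    return 2 * x

base = LazyPipeline([1, 2, 3, 4])
plan = base.map(noisy_double).filter(lambda x: x > 4)
calls_before_action = len(calls)  # nothing has run yet
result = plan.collect()           # runs the whole plan
```

In this toy model, `calls` stays empty until `collect()` is invoked, which mirrors the split between transformations and actions.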


<!-- 
Remember the image from slides:

<img src="images/spark_components.png">
 -->

## Spark architecture : from your coding hands to the cluster

<img src="images/from_rdd_to_cluster.png">

References: 
 - https://trongkhoanguyen.com/spark/understand-rdd-operations-transformations-and-actions/ 

You construct your sequence of transformations in Python.

Spark's functional programming interface builds up a **DAG**.

This DAG is sent by the **driver** to the **cluster manager** for execution.

# Walkthrough: Spark/RDD in Python

<img src="images/spark_flow.png" width="500">

We'll proceed along the usual Spark flow (see above):
1. create the environment to run Spark from Python
2. extract RDDs from files
3. run some transformations
4. execute actions to obtain values (local objects in Python)

## 1.1. Initializing a `SparkSession`

IPython / IPython notebook can be a *client* to interact with the *master*.

The client will have a `SparkSession` that:

1. Acts as a gateway between the client and Spark master
2. Sends code/data from IPython to the master (who then sends it to the workers)

<img src="images/spark_driver_etc.png"/>

Using:

```python
import pyspark as ps

spark = ps.sql.SparkSession.builder \
            .master("local[2]") \
            .appName("df lecture") \
            .getOrCreate()
```

will create a *"local"* cluster made of the driver using 2 cores (the `2` in `local[2]`).

In [128]:
import pyspark as ps    # for the pyspark suite

spark = ps.sql.SparkSession.builder \
            .master("local[2]") \
            .appName("Spark Session Intro") \
            .getOrCreate()

sc = spark.sparkContext 

In [2]:
sc.getConf().getAll()

[('spark.app.startTime', '1616300522023'),
 ('spark.driver.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '42317'),
 ('spark.executor.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.app.name', 'Spark Session Intro'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', 'dc66c93859ea'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1616300523226'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[2]'),
 ('spark.sql.warehouse.dir',
  'file:/home/jovyan/work/Desktop/dsi/notes/spark-rdds/spark-warehouse')]

## 1.2. Creating an RDD

RDDs are **immutable**. Once created, you cannot modify them directly. You can only transform them into another RDD. 

Functions for creating an RDD from an external source are methods of the SparkContext object `sc`.

| Method | Description |
| - | - |
| [`sc.parallelize(array)`]() | Create an RDD from a python array or list |
| [`sc.textFile(path)`]() | Create an RDD from a text file |
| [`sc.pickleFile(path)`]() | Create an RDD from a pickle file |

### 1.2.1. Creating RDDs from local files

#### `sc.parallelize()` : create an RDD from a python array/list

In [19]:
# creating an adhoc list
data_array = [['matthew', 4],
              ['jorge', 8],
              ['josh', 15],
              ['evangeline', 16],
              ['emilie', 23],
              ['yunjin', 42]]

# reading the array/list using SparkContext
rdd = sc.parallelize(data_array)

In [20]:
rdd

ParallelCollectionRDD[5] at readRDDFromFile at PythonRDD.scala:274

In [21]:
# to output the content in python [irl, use with great care]
print(rdd.collect())
type(rdd.collect())

[['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]


list

#### `sc.textFile()` : from a text file !

The import will give you an rdd made of **strings which are lines of the text file**.

In [7]:
ls spark_data/

aapl.csv                  input.txt       sales.json     toy_data.pkl/
airline-data-extract.csv  sales2.json.gz  sales.txt      toy_data.txt
cookie_data.txt           sales.csv       toy_dataB.txt


In [16]:
f = open('spark_data/toy_data.txt')
print(f.read())
type(f)

matthew,4
jorge,8
josh,15
evangeline,16
emilie,23
yunjin,42



_io.TextIOWrapper

In [22]:
# displaying the content of the file in stdout
with open('spark_data/toy_data.txt', 'r') as data:
    print(data.read(), type(data))

matthew,4
jorge,8
josh,15
evangeline,16
emilie,23
yunjin,42
 <class '_io.TextIOWrapper'>


In [23]:
# reading the file using SparkContext
rdd = sc.textFile('spark_data/toy_data.txt')

# to output the content in python [irl, use collect() with great care]
print(rdd.collect())
type(rdd.collect())

['matthew,4', 'jorge,8', 'josh,15', 'evangeline,16', 'emilie,23', 'yunjin,42']


list
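Because `sc.textFile()` yields raw strings, a parsing step is usually the next transformation. Below is a minimal sketch of a function one might pass to `.map(...)`. `parse_line` is a hypothetical name, the sample lines are copied from the output above, and Python's built-in `map` stands in for the RDD method so the snippet runs without Spark.

```python
def parse_line(line):
    """Turn a raw 'name,number' string into a [name, int] pair."""
    name, number = line.split(",")
    return [name.strip(), int(number)]

raw_lines = ['matthew,4', 'jorge,8', 'josh,15']

# With Spark, the equivalent call would be rdd.map(parse_line).
parsed = list(map(parse_line, raw_lines))
```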

#### <span style="color:red">`sc.pickleFile()` : from a HDFS pickle file

The import will give you an rdd composed of whatever table was stored into that file.</span>

In [25]:
%ls spark_data/toy_data.pkl

part-00000  part-00001  _SUCCESS


In [26]:
ls spark_data/toy_data.pkl/

part-00000  part-00001  _SUCCESS


In [27]:
# reading the file using SparkContext
rdd = sc.pickleFile('spark_data/toy_data.pkl')

# to output the content in python [irl, use with great care]
rdd.collect()

['matthew,4', 'jorge,8', 'josh,15', 'evangeline,16', 'emilie,23', 'yunjin,42']

### 1.2.2. Creating RDDs from S3

You can read from S3 too, but it takes some configuration to make it work.  See this [post](https://medium.com/@bogdan.cojocar/how-to-read-json-files-from-s3-using-pyspark-and-the-jupyter-notebook-275dcb27e124) as a guide.

In [29]:
# link to the S3 repository

link = 's3a://mortar-example-data/airline-data'
# rdd = sc.textFile(link)  # this won't work, but give it a shot

rdd = sc.textFile('spark_data/airline-data-extract.csv')

In [32]:
# rdd.collect()

In [33]:
rdd.count()

100

In [34]:
rdd.getNumPartitions()

2

## 1.3. Transformations : transforming an RDD into another

- They are **lazy**: Spark doesn't apply the transformation right away; it just adds it to the **DAG**.
- They transform an RDD into another RDD because RDDs are **immutable**.
- They can be **wide** or **narrow** (whether or not they shuffle data across partitions).
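The narrow/wide distinction can be sketched in plain Python by modelling partitions as a list of lists. This is an illustration only: `toy_partitioner` is a hypothetical, deterministic stand-in for whatever partitioner Spark actually uses, and the rows are made up for the example.

```python
partitions = [
    [("matthew", 4), ("jorge", 8), ("josh", 15)],
    [("matthew", 7), ("jorge", 2), ("josh", 10)],
]

# Narrow: each output partition depends on exactly one input partition,
# so partitions are processed independently and no data moves between them.
filtered = [[row for row in part if row[1] % 2 == 0] for part in partitions]

def toy_partitioner(key, num_partitions):
    # Hypothetical deterministic stand-in for a hash partitioner.
    return sum(map(ord, key)) % num_partitions

# Wide: rows sharing a key must end up in the same partition, so rows are
# redistributed ("shuffled") across partitions before being combined.
shuffled = [[] for _ in partitions]
for part in partitions:
    for key, value in part:
        shuffled[toy_partitioner(key, len(partitions))].append((key, value))

# After the shuffle, each partition can compute the max per key on its own.
max_per_key = []
for part in shuffled:
    best = {}
    for key, value in part:
        best[key] = max(value, best.get(key, value))
    max_per_key.append(sorted(best.items()))
```

The filter never looks outside its own partition, whereas the per-key maximum is only correct once all rows for a key have been brought together.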

<img src="images/rdd_narrow_vs_wide_transformations.png" width="400"/>
<h5><center>http://horicky.blogspot.com/2013/12/spark-low-latency-massively-parallel.html</center></h5>




In [35]:
data_array = [['matthew', 4],
              ['jorge', 8],
              ['josh', 15],
              ['evangeline', 16],
              ['emilie', 23],
              ['yunjin', 42],
              ['matthew', 7],
              ['jorge', 2],
              ['josh', 10],
              ['evangeline', 90],
              ['emilie', -4],
              ['yunjin', 65]]

another_rdd = sc.parallelize(data_array)

In [36]:
another_rdd.getNumPartitions()

2

In [37]:
another_rdd.glom().collect()

[[['matthew', 4],
  ['jorge', 8],
  ['josh', 15],
  ['evangeline', 16],
  ['emilie', 23],
  ['yunjin', 42]],
 [['matthew', 7],
  ['jorge', 2],
  ['josh', 10],
  ['evangeline', 90],
  ['emilie', -4],
  ['yunjin', 65]]]

In [38]:
another_rdd.collect()

[['matthew', 4],
 ['jorge', 8],
 ['josh', 15],
 ['evangeline', 16],
 ['emilie', 23],
 ['yunjin', 42],
 ['matthew', 7],
 ['jorge', 2],
 ['josh', 10],
 ['evangeline', 90],
 ['emilie', -4],
 ['yunjin', 65]]

In [39]:
#narrow transformation
another_rdd2 = another_rdd.filter(lambda x: x[1]%2==0)
another_rdd2.collect()

[['matthew', 4],
 ['jorge', 8],
 ['evangeline', 16],
 ['yunjin', 42],
 ['jorge', 2],
 ['josh', 10],
 ['evangeline', 90],
 ['emilie', -4]]

In [41]:
#take another look at the partitions
another_rdd2.glom().collect()

[[['matthew', 4], ['jorge', 8], ['evangeline', 16], ['yunjin', 42]],
 [['jorge', 2], ['josh', 10], ['evangeline', 90], ['emilie', -4]]]

In [42]:
#wide transformation
another_rdd3 = another_rdd.reduceByKey(max)
another_rdd3.collect()

[('josh', 15),
 ('evangeline', 90),
 ('matthew', 7),
 ('jorge', 8),
 ('emilie', 23),
 ('yunjin', 65)]

| Method | Type | Category | Description |
| - | - | - | - |
| [`.map(func)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map) | transformation | mapping | Return a new RDD by applying a function to each element of this RDD. |
| [`.flatMap(func)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.flatMap) | transformation | mapping | Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. |
| [`.filter(func)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.filter) | transformation | reduction |  Return a new RDD containing only the elements that satisfy a predicate. |
| [`.sample()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.sample) | transformation | reduction | Return a sampled subset of this RDD. |
| [`.distinct()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.distinct) | transformation | reduction |  Return a new RDD containing the distinct elements in this RDD. |
| [`.keys()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.keys) | transformation | `<k,v>` | Return an RDD with the keys of each tuple. |
| [`.values()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.values) | transformation | `<k,v>` | Return an RDD with the values of each tuple. |
| [`.join(rddB)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.join) | transformation | `<k,v>` | Return an RDD containing all pairs of elements with matching keys in self and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other. |
| [`.reduceByKey()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) | transformation | `<k,v>` | Merge the values for each key using an associative and commutative reduce function. |
| [`.groupByKey()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) | transformation | `<k,v>` | Group the values for each key into a single sequence, to which any function (e.g. a mean) can then be applied. |
| [`.sortBy(keyfunc)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.sortBy) | transformation | sorting |  Sorts this RDD by the given keyfunc. |
| [`.sortByKey()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.sortByKey) | transformation | sorting/`<k,v>` | Sorts this RDD, which is assumed to consist of (key, value) pairs. |



### 1.3.1. Applying transformations and chaining them

Recall the spark flow:

<img src="images/spark_flow.png" width="500">

In the cells below, we will:
1. read an RDD from a text file
2. transform by applying `split`
3. transform by filtering
4. transform by casting some columns to their corresponding type.
5. use an action to output the results

Each transformation is a method of an RDD, and returns another RDD.

In [43]:
ls spark_data/

aapl.csv                  input.txt       sales.json     toy_data.pkl/
airline-data-extract.csv  sales2.json.gz  sales.txt      toy_data.txt
cookie_data.txt           sales.csv       toy_dataB.txt


In [49]:
# displaying the content of the file in stdout
with open('spark_data/sales.txt', 'r') as fin:
    print(fin.read())

#ID    Date           Store   State  Product    Amount
101    11/13/2014     100     WA     331        300.00
104    11/18/2014     700     OR     329        450.00
102    11/15/2014     203     CA     321        200.00
106    11/19/2014     202     CA     331        330.00
103    11/17/2014     101     WA     373        750.00
105    11/19/2014     202     CA     321        200.00



*Recall: Input functions, reading RDDs from files, are functions of the SparkContext.*

In [50]:
# reads a text file line by line
rdd1 = sc.textFile('spark_data/sales.txt')

rdd1.collect()

['#ID    Date           Store   State  Product    Amount',
 '101    11/13/2014     100     WA     331        300.00',
 '104    11/18/2014     700     OR     329        450.00',
 '102    11/15/2014     203     CA     321        200.00',
 '106    11/19/2014     202     CA     331        330.00',
 '103    11/17/2014     101     WA     373        750.00',
 '105    11/19/2014     202     CA     321        200.00']

In [51]:
# applies split() to each row
rdd2 = rdd1.map(lambda rowstr : rowstr.split())

rdd2.collect()

[['#ID', 'Date', 'Store', 'State', 'Product', 'Amount'],
 ['101', '11/13/2014', '100', 'WA', '331', '300.00'],
 ['104', '11/18/2014', '700', 'OR', '329', '450.00'],
 ['102', '11/15/2014', '203', 'CA', '321', '200.00'],
 ['106', '11/19/2014', '202', 'CA', '331', '330.00'],
 ['103', '11/17/2014', '101', 'WA', '373', '750.00'],
 ['105', '11/19/2014', '202', 'CA', '321', '200.00']]

In [52]:
rdd2.collect()[1][0]

'101'

In [57]:
# filters rows
rdd3 = rdd2.filter(lambda row: not row[0].startswith('#'))

rdd3.collect()

[['101', '11/13/2014', '100', 'WA', '331', '300.00'],
 ['104', '11/18/2014', '700', 'OR', '329', '450.00'],
 ['102', '11/15/2014', '203', 'CA', '321', '200.00'],
 ['106', '11/19/2014', '202', 'CA', '331', '330.00'],
 ['103', '11/17/2014', '101', 'WA', '373', '750.00'],
 ['105', '11/19/2014', '202', 'CA', '321', '200.00']]

In [58]:
def casting_function(row):
    id, date, store, state, product, amount = row
    return((int(id), date, int(store), state, int(product), float(amount)))

# applies casting_function to rows
rdd4 = rdd3.map(casting_function)

# shows the result
rdd4.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

**Now, let's see the canonical way to write that in Python...**

In [59]:
def casting_function(row):
    id, date, store, state, product, amount = row
    return((int(id), date, int(store), state, int(product), float(amount)))

rdd_sales = sc.textFile('spark_data/sales.txt')\
        .map(lambda rowstr : rowstr.split())\
        .filter(lambda row: not row[0].startswith('#'))\
        .map(casting_function)  

rdd_sales.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]
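Since the functions passed to `.map()` and `.filter()` are ordinary Python callables, one way to check them before running on a cluster is to apply them to a few sample lines with Python's built-in `map` and `filter`. The sketch below assumes that approach. The sample lines are copied from `sales.txt` above, and `cast_row` is a hypothetical copy of `casting_function` so that the snippet runs on its own.

```python
def cast_row(row):
    """Same logic as casting_function above, repeated to keep this runnable."""
    id_, date, store, state, product, amount = row
    return (int(id_), date, int(store), state, int(product), float(amount))

sample_lines = [
    "#ID    Date           Store   State  Product    Amount",
    "101    11/13/2014     100     WA     331        300.00",
    "104    11/18/2014     700     OR     329        450.00",
]

# Same three steps as the chained pipeline, using built-ins instead of an RDD.
split_rows = map(str.split, sample_lines)
data_rows = filter(lambda row: not row[0].startswith("#"), split_rows)
local_result = list(map(cast_row, data_rows))
```

Built-in `map` and `filter` are themselves lazy iterators, so here too nothing is evaluated until `list(...)` consumes them.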

<span style="color:black">From now on we'll rely on 2 rdds, names and sales</span>

In [60]:
# creating an adhoc list
data_array = [['matthew', 4],
              ['jorge', 8],
              ['josh', 15],
              ['evangeline', 16],
              ['emilie', 23],
              ['yunjin', 42]]

# reading the array/list using SparkContext
rdd_names = sc.parallelize(data_array)

# to output the content in python [irl, use with great care]
rdd_names.collect()

[['matthew', 4],
 ['jorge', 8],
 ['josh', 15],
 ['evangeline', 16],
 ['emilie', 23],
 ['yunjin', 42]]

In [61]:
def casting_function(row):
    id, date, store, state, product, amount = row
    return((int(id), date, int(store), state, int(product), float(amount)))

rdd_sales = sc.textFile('spark_data/sales.txt')\
        .map(lambda x : x.split())\
        .filter(lambda x: not x[0].startswith('#'))\
        .map(casting_function)

rdd_sales.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

### 1.3.2. Mapping

#### `.map(func)` : applying a function on every row

In [62]:
rdd_names.collect()

[['matthew', 4],
 ['jorge', 8],
 ['josh', 15],
 ['evangeline', 16],
 ['emilie', 23],
 ['yunjin', 42]]

In [66]:
# applying a lambda function to an rdd
rddout = rdd_names.map(lambda x : [x[0] , len(x[0])])
# print out the original rdd
print("before: {}".format(rdd_names.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))

before: [['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]
after: [['matthew', 7], ['jorge', 5], ['josh', 4], ['evangeline', 10], ['emilie', 6], ['yunjin', 6]]


#### `.flatMap(func)` : applying a function on every row and flattening the resulting lists

In [67]:
# applying a lambda function to an rdd (because why not)
rddout = rdd_names.flatMap(lambda row : [row[0],len(row[0])])

# print out the original rdd
print("before: {}".format(rdd_names.collect()))

# print out the new rdd generated
print("after: {}".format(rddout.collect()))
# print()
# print(rdd_names.glom().collect())
# print(rddout.glom().collect())

before: [['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]
after: ['matthew', 7, 'jorge', 5, 'josh', 4, 'evangeline', 10, 'emilie', 6, 'yunjin', 6]
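The difference between the two outputs above can be reproduced in plain Python, which may help when reasoning about what a function passed to `.flatMap()` should return. This is a sketch, with `name_and_length` as a hypothetical helper and `itertools.chain.from_iterable` standing in for the flattening step.

```python
from itertools import chain

rows = [['matthew', 4], ['jorge', 8]]

def name_and_length(row):
    """Return a two-element list: the name and its length."""
    return [row[0], len(row[0])]

# map: exactly one output element per input row (here, each one is a list).
mapped = list(map(name_and_length, rows))

# flatMap: the per-row lists are concatenated into one flat sequence.
flat_mapped = list(chain.from_iterable(map(name_and_length, rows)))
```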


### 1.3.3. Row reduction

#### `.filter(func)`: filters an RDD using a function that returns boolean values

In [69]:
rdd_sales.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

In [70]:
# filtering an rdd
rddout = rdd_sales.filter(lambda row: (row[3] == 'CA'))

# print out the original rdd
print("before:")
for record in rdd_sales.collect():
    print(record)

# print out the new rdd generated
print("\nafter: ")
for record in rddout.collect():
    print(record)

before:
(101, '11/13/2014', 100, 'WA', 331, 300.0)
(104, '11/18/2014', 700, 'OR', 329, 450.0)
(102, '11/15/2014', 203, 'CA', 321, 200.0)
(106, '11/19/2014', 202, 'CA', 331, 330.0)
(103, '11/17/2014', 101, 'WA', 373, 750.0)
(105, '11/19/2014', 202, 'CA', 321, 200.0)

after: 
(102, '11/15/2014', 203, 'CA', 321, 200.0)
(106, '11/19/2014', 202, 'CA', 331, 330.0)
(105, '11/19/2014', 202, 'CA', 321, 200.0)


#### `.sample(withReplacement, fraction, seed)`: sampling an RDD !!  Very useful if you don't want to work with the whole dataset at first.

In [72]:
rdd_sales.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

In [79]:
# sampling an rdd
rddout = rdd_sales.sample(False, 0.55, 42)

# print out the original rdd
print("before:")
for record in rdd_sales.collect():
    print(record)

# print out the new rdd generated
print("\nafter: ")
for record in rddout.collect():
    print(record)

before:
(101, '11/13/2014', 100, 'WA', 331, 300.0)
(104, '11/18/2014', 700, 'OR', 329, 450.0)
(102, '11/15/2014', 203, 'CA', 321, 200.0)
(106, '11/19/2014', 202, 'CA', 331, 330.0)
(103, '11/17/2014', 101, 'WA', 373, 750.0)
(105, '11/19/2014', 202, 'CA', 321, 200.0)

after: 
(101, '11/13/2014', 100, 'WA', 331, 300.0)
(104, '11/18/2014', 700, 'OR', 329, 450.0)
(102, '11/15/2014', 203, 'CA', 321, 200.0)


#### `.distinct()`: obtaining distinct rows

In [80]:
# obtaining distinct values of the "state" column of rdd_sales
rddout = rdd_sales.map(lambda row: row[3])\
                    .distinct()

# print out the original rdd
print("before:")
for record in rdd_sales.collect():
    print(record)

# print out the new rdd generated
print("\nafter: ")
for record in rddout.collect():
    print(record)

before:
(101, '11/13/2014', 100, 'WA', 331, 300.0)
(104, '11/18/2014', 700, 'OR', 329, 450.0)
(102, '11/15/2014', 203, 'CA', 321, 200.0)
(106, '11/19/2014', 202, 'CA', 331, 330.0)
(103, '11/17/2014', 101, 'WA', 373, 750.0)
(105, '11/19/2014', 202, 'CA', 321, 200.0)

after: 
CA
WA
OR


### 1.3.4. Methods with a `<k,v>` paradigm

#### `.values()`: returns the values of a RDD made of `<k,v>` pairs

In [81]:
# extracting the value of each <k,v> pair
rddout = rdd_names.values()

# print out the original rdd
print("before:")
for record in rdd_names.collect():
    print(record)

# print out the new rdd generated
print("\nafter: ")
for record in rddout.collect():
    print(record)

before:
['matthew', 4]
['jorge', 8]
['josh', 15]
['evangeline', 16]
['emilie', 23]
['yunjin', 42]

after: 
4
8
15
16
23
42


#### `.keys()`: returns the keys of a RDD made of `<k,v>` pairs

In [82]:
# extracting the key of each <k,v> pair
rddout = rdd_names.keys()

# print out the original rdd
print("before:")
for record in rdd_names.collect():
    print(record)

# print out the new rdd generated
print("\nafter: ")
for record in rddout.collect():
    print(record)

before:
['matthew', 4]
['jorge', 8]
['josh', 15]
['evangeline', 16]
['emilie', 23]
['yunjin', 42]

after: 
matthew
jorge
josh
evangeline
emilie
yunjin


For data that's not an obvious key-value pair, you can get strange behavior:

In [84]:
rdd_sales.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

In [83]:
rddout_sales = rdd_sales.keys()
print("Keys of rdd_sales: {}".format(rddout_sales.collect()))

rddout_sales_values = rdd_sales.values()
print("\nValues of rdd_sales: {}".format(rddout_sales_values.collect()))
print("We are missing other columns of data.  To get them, we would need to transform the data into one column.")

Keys of rdd_sales: [101, 104, 102, 106, 103, 105]

Values of rdd_sales: ['11/13/2014', '11/18/2014', '11/15/2014', '11/19/2014', '11/17/2014', '11/19/2014']
We are missing other columns of data.  To get them, we would need to transform the data into one column.


#### `rddA.join(rddB)`: join another RDD

In [85]:
rdd_salesperstate = rdd_sales.map(lambda row: (row[3],row[5]))

print(rdd_salesperstate.collect())
# rdd_sales.collect()

[('WA', 300.0), ('OR', 450.0), ('CA', 200.0), ('CA', 330.0), ('WA', 750.0), ('CA', 200.0)]


In [88]:
# creating an adhoc list of managers for each state
data_array = [['CA', 'matthew'],
              ['OR', 'jorge'],
              ['WA','yunjin'],
              ['TX', 'emilie']]

# reading the array/list using SparkContext
rdd_managers = sc.parallelize(data_array)
rdd_managers.collect()

[['CA', 'matthew'], ['OR', 'jorge'], ['WA', 'yunjin'], ['TX', 'emilie']]

In [89]:
# to output the content in python [irl, use with great care]
rdd_salesperstate.join(rdd_managers).collect()

[('CA', (200.0, 'matthew')),
 ('CA', (330.0, 'matthew')),
 ('CA', (200.0, 'matthew')),
 ('WA', (300.0, 'yunjin')),
 ('WA', (750.0, 'yunjin')),
 ('OR', (450.0, 'jorge'))]
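Note that `'TX'` does not appear in the joined result: `.join()` keeps only keys present in both RDDs. The plain-Python sketch below reproduces that inner-join behaviour on a subset of the data above. It illustrates the semantics only, not Spark's implementation, and `managers_by_state` is a hypothetical helper name.

```python
from collections import defaultdict

sales = [('WA', 300.0), ('OR', 450.0), ('CA', 200.0), ('CA', 330.0)]
managers = [('CA', 'matthew'), ('OR', 'jorge'), ('WA', 'yunjin'), ('TX', 'emilie')]

# Index one side by key so that matches can be looked up quickly.
managers_by_state = defaultdict(list)
for state, manager in managers:
    managers_by_state[state].append(manager)

# Emit one (k, (v1, v2)) tuple per matching pair; unmatched keys are dropped.
joined = [
    (state, (amount, manager))
    for state, amount in sales
    for manager in managers_by_state.get(state, [])
]
```

When unmatched keys must be kept, PySpark RDDs also provide `.leftOuterJoin()`, `.rightOuterJoin()` and `.fullOuterJoin()`.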

#### `.reduceByKey(func)`: reduce `v`s by their `k` by applying func (what ?)

The `func` here needs to be associative and commutative... can you guess why ?  
Commutative property: a + b = b + a  
Associative property: a + (b + c) = (a + b) + c

In [122]:
# creating an adhoc list
data_array = [['CA', 1],
              ['WA', 1],
              ['CA', 2],
              ['OR', 1],
              ['CA', 5],
              ['OR', 1],
             ['CA', 9],
             ['CA', 14]]

# reading the array/list using SparkContext
rdd = sc.parallelize(data_array)

# to output the content in python [irl, use with great care]
rdd.collect()

[['CA', 1],
 ['WA', 1],
 ['CA', 2],
 ['OR', 1],
 ['CA', 5],
 ['OR', 1],
 ['CA', 9],
 ['CA', 14]]

In [130]:
rdd.reduceByKey(lambda v1,v2 : v1+v2).collect()

[('CA', 31), ('WA', 1), ('OR', 2)]

In [136]:
rdd.reduceByKey(lambda v1,v2: [v1,v2]).collect()

[('CA', [[1, 2], [[5, 9], 14]]), ('WA', 1), ('OR', [1, 1])]
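The nested lists above hint at the answer: values are first reduced within each partition, then the partial results are reduced together, and the grouping depends on how the data happens to be partitioned. The sketch below simulates this in plain Python. `reduce_in_partitions` is a hypothetical helper and the two splits are arbitrary examples.

```python
from functools import reduce
from operator import add, sub

def reduce_in_partitions(func, partitions):
    """Reduce each partition separately, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)

# The same five values, split across two partitions in two different ways.
split_a = [[1, 2], [5, 9, 14]]
split_b = [[1, 2, 5], [9, 14]]

# Addition is associative and commutative: the split does not matter.
add_a = reduce_in_partitions(add, split_a)
add_b = reduce_in_partitions(add, split_b)

# Subtraction is neither: the result changes with the partitioning.
sub_a = reduce_in_partitions(sub, split_a)
sub_b = reduce_in_partitions(sub, split_b)
```

With addition both splits give the same total, while with subtraction they disagree, which is why `.reduceByKey()` is only reliable with associative and commutative functions.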

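#### `.groupByKey()`: group `v`s by their `k`

`.groupByKey()` takes no function: it gathers all the values of each key into an iterable. You can then apply any function to each group with `.map()`, including functions that do not work as a pairwise associative and commutative reduction, such as the mean.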

In [97]:
# creating an adhoc list
data_array = [['CA', 1],
              ['WA', 1],
              ['CA', 2],
              ['OR', 1],
              ['CA', 5],
              ['OR', 1]]

# reading the array/list using SparkContext
rdd = sc.parallelize(data_array)

# to output the content in python [irl, use with great care]
rdd.collect()


[['CA', 1], ['WA', 1], ['CA', 2], ['OR', 1], ['CA', 5], ['OR', 1]]

In [140]:
def mean(args):
    key,iterator = args
    total = 0.0; count = 0
    for x in iterator:
        total += x; count += 1
    return key, total / count

keys = rdd.keys().distinct().collect()

means = rdd.groupByKey().map(mean).collect()

# for k, mn in zip(keys, means):
#     print(f"{k}: {mn:0.2f}")

means

[('CA', 2.6666666666666665), ('WA', 1.0), ('OR', 1.0)]
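A mean can also be computed with a pairwise reduction if each value is first turned into a `(sum, count)` pair, because adding such pairs component-wise is associative and commutative. The plain-Python sketch below shows the idea on the six-row list above. `combine` is a hypothetical name, and the Spark chain in the comment is an assumption about how this would typically be written with `.mapValues()` and `.reduceByKey()`.

```python
pairs = [('CA', 1), ('WA', 1), ('CA', 2), ('OR', 1), ('CA', 5), ('OR', 1)]

def combine(a, b):
    """Add two (sum, count) pairs component-wise."""
    return (a[0] + b[0], a[1] + b[1])

# Plain-Python stand-in for:
#   rdd.mapValues(lambda v: (v, 1)).reduceByKey(combine)
sum_count = {}
for key, value in pairs:
    current = (value, 1)
    sum_count[key] = combine(sum_count[key], current) if key in sum_count else current

# Final step, equivalent to .mapValues(lambda p: p[0] / p[1])
means = {key: total / count for key, (total, count) in sum_count.items()}
```

Compared with grouping, this style only ever carries one small pair per key between steps rather than every value of the group.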

### 1.3.5. Sorting methods

#### `.sortBy(keyfunc)`: sorting by the value of a function on rows

In [142]:
rdd_names.collect()

[['matthew', 4],
 ['jorge', 8],
 ['josh', 15],
 ['evangeline', 16],
 ['emilie', 23],
 ['yunjin', 42]]

In [143]:
# sorting by any function (because why not?)
rddout = rdd_names.sortBy(lambda row : (13-row[1])**2, ascending=True)
rddout.collect()

[['josh', 15],
 ['evangeline', 16],
 ['jorge', 8],
 ['matthew', 4],
 ['emilie', 23],
 ['yunjin', 42]]

In [144]:
# sorting by any function (because why not?)
# rddout = rdd_names.sortBy(lambda row : (13-row[1])**2, ascending=True)

#sorting by value descending
rddout = rdd_names.sortBy(lambda row : row[1], ascending=False)

# print out the original rdd
print(rdd_names.collect())

# print out the new rdd generated
print(rddout.collect())

[['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]
[['yunjin', 42], ['emilie', 23], ['evangeline', 16], ['josh', 15], ['jorge', 8], ['matthew', 4]]


#### `.sortByKey()`: sorting by key on a `<k,v>` RDD

In [145]:
# sorting k,v pairs by key
rddout = rdd_names.sortByKey(ascending=False)

# print out the original rdd
print(rdd_names.collect())

# print out the new rdd generated
print(rddout.collect())

[['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]
[('yunjin', 42), ('matthew', 4), ('josh', 15), ('jorge', 8), ('evangeline', 16), ('emilie', 23)]


## 1.4. Actions : turning your RDD into something else (local object)

Actions are methods of an RDD object that turn an RDD into something else (a Python object or a statistic).

When used/executed in IPython or in a notebook, they **launch the processing of the DAG**. This is where Spark stops being **lazy**. This is where your script will take time to execute.

| Method | Type | Description |
| - | - | - |
| [`.collect()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) | action | Return a list that contains all of the elements in this RDD. Note that this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory. |
| [`.count()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.count) | action | Return the number of elements in this RDD. |
| [`.take(n)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take) | action | Take the first `n` elements of the RDD. |
| [`.top(n)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.top) | action | Get the top `n` elements from a RDD. It returns the list sorted in descending order. |
| [`.first()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.first) | action | Return the first element in a RDD. |
| [`.sum()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.sum) | action | Add up the elements in this RDD. |
| [`.mean()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mean) | action | Compute the mean of this RDD’s elements. |
| [`.stdev()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.stdev) | action | Compute the standard deviation of this RDD’s elements. |

In [146]:
# creating an adhoc list
data_array = [['matthew', 4],
              ['jorge', 8],
              ['josh', 15],
              ['evangeline', 16],
              ['emilie', 23],
              ['yunjin', 42]]

# reading the array/list using SparkContext
rdd_names = sc.parallelize(data_array)

### 1.4.1. Actions that return portions of an RDD

#### `.collect()` : returning the *full* content of an RDD to "python space"

Returns the rows of an RDD as a list. This can be a bad idea if your RDD is gigantic, because `.collect()` will return everything and put it in memory for Python to process.

In [147]:
# to output the content in python
collected = rdd_names.collect()

# let's check the type of RDD
print("type of rdd: {}".format(type(rdd_names)))

# let's check the type of what's collected
print("type of rdd_collected: {}".format(type(collected)))

# let's print the collected content
print(collected)

type of rdd: <class 'pyspark.rdd.RDD'>
type of rdd_collected: <class 'list'>
[['matthew', 4], ['jorge', 8], ['josh', 15], ['evangeline', 16], ['emilie', 23], ['yunjin', 42]]


#### `.take(n)` : returning the first n lines of an RDD

Returns the first `n` rows of an RDD as a list. These rows are not randomly selected: Spark scans the first partition and only reads further partitions if it needs more rows to reach `n`.

In [148]:
rdd_names.collect()

[['matthew', 4],
 ['jorge', 8],
 ['josh', 15],
 ['evangeline', 16],
 ['emilie', 23],
 ['yunjin', 42]]

In [151]:
# to output the content in python
taken = rdd_names.take(3)

# let's check the type of what's collected
print("type of rdd_taken: {}".format(type(taken)))

# let's print the collected content
print(taken)

type of rdd_taken: <class 'list'>
[['matthew', 4], ['jorge', 8], ['josh', 15]]
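The reason `.take(n)` is usually much cheaper than `.collect()` is that it can stop reading early. The plain-Python sketch below is a simplified model of that behaviour, not Spark's actual algorithm: `take` here is a hypothetical function over a list of partitions, and the split of the six rows into two partitions is assumed for illustration.

```python
def take(partitions, n):
    """Collect up to n rows, reading partitions in order and stopping early."""
    taken = []
    partitions_read = 0
    for part in partitions:
        if len(taken) >= n:
            break  # enough rows already: remaining partitions are never read
        partitions_read += 1
        taken.extend(part[: n - len(taken)])
    return taken, partitions_read

partitions = [
    [['matthew', 4], ['jorge', 8], ['josh', 15]],
    [['evangeline', 16], ['emilie', 23], ['yunjin', 42]],
]

first_two, read_for_two = take(partitions, 2)     # satisfied by partition 0
first_four, read_for_four = take(partitions, 4)   # needs partition 1 as well
```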


#### `.first()` : returning the first line of an RDD

In [152]:
print(rdd_names.first())

['matthew', 4]


### 1.4.2. Actions that compute some statistics

#### `.count()` : count the number of lines

In [153]:
print(rdd_names.count())

6


#### `.sum()`: summing every line in an RDD

(The RDD must contain summable values.)

In [154]:
print(rdd_names.values().sum())

108


#### `.mean()`: averaging every line in an RDD

(The RDD must contain summable values.)

In [155]:
print(rdd_names.values().mean())

18.0


#### `.stdev()`: you get that right ?

In [156]:
print(rdd_names.values().stdev())

12.315302134607444
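The value printed above corresponds to the population standard deviation (dividing by `n`), which can be checked with Python's standard library on the same six values. This is only a cross-check sketch. For the sample version (dividing by `n - 1`), PySpark RDDs provide `.sampleStdev()`.

```python
import statistics

values = [4, 8, 15, 16, 23, 42]

population_sd = statistics.pstdev(values)  # divides by n
sample_sd = statistics.stdev(values)       # divides by n - 1, so it is larger
```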


# 2. Let's design chains of transformations together !

## 2.1. Computing sales per state

### Input RDD

In [157]:
def casting_function(row):
    id, date, store, state, product, amount = row
    return((int(id), date, int(store), state, int(product), float(amount)))

rdd_sales = sc.textFile('spark_data/sales.txt')\
        .map(lambda x : x.split())\
        .filter(lambda x: not x[0].startswith('#'))\
        .map(casting_function)

rdd_sales.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

### Task

You want to obtain an RDD of the states, sorted by their total sales in decreasing order.

Which transformations do you need to apply?

If you had to draw a workflow of these transformations, what would it look like?

### Code

In [175]:
rdd_sales1 = rdd_sales.map(lambda row: [row[3], int(row[5])])
rdd_sales1.collect()

[['WA', 300], ['OR', 450], ['CA', 200], ['CA', 330], ['WA', 750], ['CA', 200]]

In [179]:
rdd_sales2 = rdd_sales1.reduceByKey(lambda v1,v2: v1+v2)
rdd_sales2.collect()

[('CA', 730), ('WA', 1050), ('OR', 450)]

In [181]:
rdd_sales2.sortBy(lambda row: row[1]).collect()

[('OR', 450), ('CA', 730), ('WA', 1050)]

In [183]:
# one method:
#   1. pull out just the state and the amount
#   2. sum the amounts per state
#   3. sort by the summed amount

rddout = rdd_sales.map(lambda row: [row[3], int(row[5])])\
                .reduceByKey(lambda v1,v2: v1+v2)\
                .sortBy(lambda row: row[1], ascending=False)

rddout.collect()

[('WA', 1050), ('CA', 730), ('OR', 450)]
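
If the `map` / `reduceByKey` / `sortBy` chain feels opaque, the same three steps can be sketched in plain Python on the six rows shown above. This is only a single-machine analogue for building intuition: Spark performs the per-key reduction in parallel across partitions.

```python
from collections import defaultdict

sales = [
    (101, '11/13/2014', 100, 'WA', 331, 300.0),
    (104, '11/18/2014', 700, 'OR', 329, 450.0),
    (102, '11/15/2014', 203, 'CA', 321, 200.0),
    (106, '11/19/2014', 202, 'CA', 331, 330.0),
    (103, '11/17/2014', 101, 'WA', 373, 750.0),
    (105, '11/19/2014', 202, 'CA', 321, 200.0),
]

# Step 1 (like .map): keep only (state, amount)
pairs = [(row[3], row[5]) for row in sales]

# Step 2 (like .reduceByKey): add up the amounts that share a key
totals = defaultdict(float)
for state, amount in pairs:
    totals[state] += amount

# Step 3 (like .sortBy with ascending=False): largest total first
ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)
```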

### Solution


<details>
  <summary>Click here to see the solution below</summary>
```
rddout = rdd_sales.map(lambda x: (x[3],x[5]))\
    .reduceByKey(lambda amount1,amount2: amount1+amount2)\
    .sortBy(lambda state_amount:state_amount[1],ascending=False)

rddout.collect()
```
</details>


## 2.2. Word count

### Input RDD

In [186]:
# displaying the content of the file in stdout
with open('spark_data/input.txt', 'r') as fin:
    print(fin.read())

# reading the file using SparkContext
rdd_text = sc.textFile('spark_data/input.txt')
rdd_text.collect()

hello world
another line
yet another line
yet another another line



['hello world', 'another line', 'yet another line', 'yet another another line']

### Task
You want to create a table of unique words and their occurrences.

Which transformations do you need to apply?

If you had to draw a workflow of these transformations, what would it look like?

### Code

In [245]:
rdd_text.flatMap(lambda row: row.split())\
        .map(lambda word: (word,1))\
        .reduceByKey(lambda v1,v2: v1+v2)\
        .sortBy(lambda v: v[1], ascending=False).collect()

[('another', 4), ('line', 3), ('yet', 2), ('world', 1), ('hello', 1)]

In [248]:
rddout = rdd_text.flatMap(lambda line: line.split())\
                    .map(lambda word: (word,1))\
                    .reduceByKey(lambda v1,v2: v1+v2)
rddout.collect()

[('world', 1), ('line', 3), ('yet', 2), ('hello', 1), ('another', 4)]
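
The key step here is `flatMap`: where `map` would give one list of words per line, `flatMap` flattens those lists into a single stream of words. A plain-Python sketch of the difference, using the four lines of the input file (here `Counter` stands in for the `map` to `(word, 1)` plus `reduceByKey` steps):

```python
from collections import Counter
from itertools import chain

lines = ['hello world', 'another line', 'yet another line', 'yet another another line']

# Like .map(lambda line: line.split()): one list of words per input line
mapped = [line.split() for line in lines]

# Like .flatMap(lambda line: line.split()): all words in one flat sequence
flat = list(chain.from_iterable(mapped))

# Counting the occurrences of each word
counts = Counter(flat)

print(len(mapped), len(flat))
print(counts.most_common())
```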

In [None]:
#hint: Get to here first
# [('hello', 1),
#  ('world', 1),
#  ('another', 1),
#  ('line', 1),
#  ('yet', 1),
#  ('another', 1),
#  ('line', 1),
#  ('yet', 1),
#  ('another', 1),
#  ('another', 1),
#  ('line', 1)]



rddout = rdd_text # put your transformations here...

rddout.collect()

### Solution

<details>
  <summary>Click here to see the solution below</summary>
```
rddout = rdd_text.flatMap(lambda line: line.split())\
            .map(lambda word: (word,1))\
            .reduceByKey(lambda v1,v2: v1+v2)

rddout.collect()
```
</details>

## 2.3. Find the date on which AAPL's stock price was the highest

### Input RDD

In [None]:
rdd_aapl_raw = sc.textFile('spark_data/aapl.csv')

print("lines in file: {}".format(rdd_aapl_raw.count()))

rdd_aapl_raw.take(5)

In [249]:
rdd_appl = sc.textFile('spark_data/aapl.csv')

rdd_appl.take(5)

['Date,Open,High,Low,Close,Volume,Adj Close',
 '2016-10-25,117.949997,118.360001,117.309998,118.25,39190300,118.25',
 '2016-10-24,117.099998,117.739998,117.00,117.650002,23538700,117.650002',
 '2016-10-21,116.809998,116.910004,116.279999,116.599998,23192700,116.599998',
 '2016-10-20,116.860001,117.379997,116.330002,117.059998,24125800,117.059998']

### Task

Now, design a pipeline that would :
1. filter out headers
2. split each line based on comma
3. keep only fields for Date (col 0) and Close (col 4)
4. order by Close in descending order

### Code

In [264]:
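# Caution: Close is still a string here, so sortBy orders it alphabetically
# ('99.98...' comes before '118.25' in descending order); cast it to float to sort numerically.
rdd_appl.filter(lambda row: not row.startswith('D'))\
        .map(lambda row: [row.split(',')[0],row.split(',')[4]])\
        .sortBy(lambda row: row[1], ascending=False)\
        .take(5)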

[['2016-01-26', '99.989998'],
 ['2016-07-20', '99.959999'],
 ['2016-01-12', '99.959999'],
 ['2016-07-19', '99.870003'],
 ['2016-05-31', '99.860001']]
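
Why do prices around 99 come out on top when the file clearly contains prices above 110? The Close field is still a string at this point, and strings are compared character by character. A short plain-Python sketch with three closing prices taken from the outputs above:

```python
closes = ['118.25', '117.650002', '99.989998']

# Sorting strings: '9' > '1', so '99.989998' wins the comparison
as_strings = sorted(closes, reverse=True)

# Sorting by numeric value: cast each string to float for the comparison
as_numbers = sorted(closes, key=float, reverse=True)

print(as_strings)
print(as_numbers)
```

In the Spark pipeline, the equivalent fix is to convert the field with `float(...)` before (or inside) the `sortBy` key.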

In [269]:
# Caution: each tuple is (close, date), so row[1] sorts by date, not by Close;
# use row[0] as the key to sort by closing price.
rddout =  rdd_appl.filter(lambda line: not line.startswith("Date"))\
                        .map(lambda line: line.split(","))\
                        .map(lambda fields: (float(fields[4]),fields[0]))\
                        .sortBy(lambda row: row[1], ascending=False)

rddout.take(5)

[(118.25, '2016-10-25'),
 (117.650002, '2016-10-24'),
 (116.599998, '2016-10-21'),
 (117.059998, '2016-10-20'),
 (117.120003, '2016-10-19')]
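
The output above is ordered by date rather than by Close, because `row[1]` is the date in a `(close, date)` tuple. The plain-Python sketch below uses only the five rows shown and compares the two sort keys. It also shows that, if only the single best day is needed, taking the maximum avoids a full sort (PySpark RDDs offer a similar `.max(key=...)` action).

```python
rows = [
    (118.25, '2016-10-25'),
    (117.650002, '2016-10-24'),
    (116.599998, '2016-10-21'),
    (117.059998, '2016-10-20'),
    (117.120003, '2016-10-19'),
]

# Key row[1] is the date: this reproduces the ordering printed above
by_date = sorted(rows, key=lambda row: row[1], reverse=True)

# Key row[0] is the closing price: this is what the task asks for
by_close = sorted(rows, key=lambda row: row[0], reverse=True)

# Highest close among these five sample rows only (not the whole file)
best = max(rows, key=lambda row: row[0])

print(by_close)
print(best)
```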

### Solution

<details>
  <summary>Click here to see the solution below</summary>
```
rddout = rdd_aapl_raw.filter(lambda line: not line.startswith("Date"))\
    .map(lambda line: line.split(","))\
    .map(lambda fields: (float(fields[4]),fields[0]))\
    .sortBy(lambda row: row[0], ascending=False)  # row[0] is the Close price

rddout.collect()
```
</details>

# 3. Caching / Persistency

- An RDD does no work until an action is called. When an action is called, Spark computes the answer and then discards the intermediate data, so the next action starts from scratch.
- If you have an RDD that you are going to reuse in your computation, you can call `cache()` to make Spark keep it around.
- This is especially useful if you have to run the same computation over and over again on one RDD. One use case? Oh, I don't know, maybe... **MACHINE LEARNING!!!**

## 3.1. Caching

Consider the following job...

In [270]:
import random
num_count = 500*1000
num_list = [random.random() for i in range(num_count)]
rdd1 = sc.parallelize(num_list)
rdd2 = rdd1.sortBy(lambda num: num)

In [273]:
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()

CPU times: user 4.32 ms, sys: 3.67 ms, total: 7.99 ms
Wall time: 81.7 ms
CPU times: user 5.29 ms, sys: 3.53 ms, total: 8.82 ms
Wall time: 105 ms
CPU times: user 6.79 ms, sys: 3.75 ms, total: 10.5 ms
Wall time: 121 ms
CPU times: user 7.34 ms, sys: 1.56 ms, total: 8.9 ms
Wall time: 85.6 ms


500000

- Let's cache it and try again.

In [275]:
rdd2.cache()
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()

CPU times: user 5.07 ms, sys: 567 µs, total: 5.63 ms
Wall time: 76.7 ms
CPU times: user 2.85 ms, sys: 3.92 ms, total: 6.77 ms
Wall time: 111 ms
CPU times: user 8.04 ms, sys: 3.69 ms, total: 11.7 ms
Wall time: 105 ms
CPU times: user 5.1 ms, sys: 1.36 ms, total: 6.46 ms
Wall time: 51.3 ms
CPU times: user 4.44 ms, sys: 371 µs, total: 4.82 ms
Wall time: 57 ms


500000

- Caching the RDD can speed up the job because the RDD does not have to be computed from scratch again. On a dataset this small, the gain in the timings above is modest.
- Calling `cache()` only flips a flag on the RDD.
- The data is not cached until an action is called.
- You can uncache an RDD using `unpersist()`.
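
The "flag now, data later" behaviour described above can be modelled in a few lines of plain Python. This is a toy sketch and not how Spark is implemented: the class `LazyDataset` and its `runs` counter are hypothetical names introduced only to count how often the underlying computation is executed.

```python
class LazyDataset:
    """Toy model of an RDD: recomputes on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute   # the deferred computation
        self._use_cache = False   # the flag that cache() flips
        self._cached = None       # filled in by the first action after cache()
        self.runs = 0             # how many times the computation actually ran

    def cache(self):
        self._use_cache = True    # nothing is computed or stored yet
        return self

    def unpersist(self):
        self._use_cache = False
        self._cached = None
        return self

    def _materialize(self):
        if self._use_cache and self._cached is not None:
            return self._cached   # reuse the stored result
        self.runs += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

    def count(self):              # an "action": forces evaluation
        return len(self._materialize())


ds = LazyDataset(lambda: sorted(range(1000, 0, -1)))

ds.count(); ds.count()
runs_without_cache = ds.runs      # every action recomputed the data

ds.cache()
runs_after_cache_call = ds.runs   # cache() alone triggered no work

ds.count(); ds.count(); ds.count()
runs_with_cache = ds.runs         # only the first action after cache() computed

print(runs_without_cache, runs_after_cache_call, runs_with_cache)
```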

## 3.2. Persist

- `persist()` lets you choose where a cached RDD is stored, for example on disk instead of in memory.
- You can cache RDDs at different storage levels.

| Level | Meaning |
| --- | --- |
| `MEMORY_ONLY` | Same as `cache()` |
| `MEMORY_AND_DISK` | Cache in memory, then overflow to disk |
| `MEMORY_AND_DISK_SER` | Like above, but cached objects are kept serialized instead of live |
| `DISK_ONLY` | Cache to disk, not to memory |

In [276]:
print('Wow this was a long walkthrough.')

Wow this was a long walkthrough.
