## Spark 

---
__Elo notes__

#### RDD (Resilient Distributed Dataset)
- Resilient -- if the data in memory is lost, it can be recreated

- Distributed -- stored in memory across the cluster

- Dataset -- initial data can come from a file or be created programmatically


- RDDs are the fundamental unit of data in Spark

- An RDD is a read-only, partitioned collection of records

- Most of Spark programming is performing operations on RDDs
 
#### Two types of RDD operations

- __Actions__ - return values
  - count
  - take(n)
  - reduce

- __Transformations__ - define new RDDs based on the current one
  - filter
  - map
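The difference between the two kinds of operation can be sketched without a cluster. The snippet below is only a plain-Python analogy, not the Spark API: list comprehensions stand in for transformations (they build a new collection and leave the original untouched), while `len` and slicing stand in for actions (they hand back an ordinary value). The sample data is made up for illustration.

```python
# Plain-Python stand-ins for RDD operations (an analogy, not the Spark API).
data = [1, 2, 3, 4, 5, 6]  # made-up sample data

# "Transformations" describe a new dataset derived from the current one.
evens = [x for x in data if x % 2 == 0]   # like rdd.filter(lambda x: x % 2 == 0)
squares = [x * x for x in evens]          # like rdd.map(lambda x: x * x)

# "Actions" return a plain value to the caller.
n = len(squares)          # like rdd.count()   -> 3
first_two = squares[:2]   # like rdd.take(2)   -> [4, 16]

# The source collection is never modified, mirroring RDD immutability.
unchanged = (data == [1, 2, 3, 4, 5, 6])
```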


---
## Spark Basics 
#### RDD 

In [17]:
import pyspark as ps
import multiprocessing
import json

In [16]:
from sklearn.datasets import fetch_20newsgroups

In [6]:
multiprocessing.cpu_count()

4

In [7]:
# Initialize the SparkContext (local mode, 4 worker threads)
sparkc = ps.SparkContext('local[4]')

In [8]:
# Creating RDDs from a list
rddl = sparkc.parallelize([1, 2, 3])

In [9]:
# Read RDD from a text file
rddf = sparkc.textFile('data/cookie_data.txt')

Use first() or take(n) to quickly check the first few entries of a potentially enormous RDD without accessing all of the partitions and loading all of the data into memory.

In [10]:
# First entry only
rddf.first()

u'{"Jane": "2"}'

In [11]:
rddf.take(2) 

[u'{"Jane": "2"}', u'{"Jane": "1"}']

Keep in mind that collect() returns the entire RDD to the driver, so to call it (as we do below) your whole dataset must fit in memory on a single machine. In general, avoid calling collect() on very large datasets. The standard workflow when working with RDDs is to perform all the big-data operations/transformations first and only then retrieve the (much smaller) results. If the results still can't be collected onto a single machine, it's common to write them out to a distributed storage system, like HDFS or S3.

In [14]:
# Note: don't call collect() on very large datasets
rddf.collect()

[u'{"Jane": "2"}',
 u'{"Jane": "1"}',
 u'{"Pete": "20"}',
 u'{"Tyler": "3"}',
 u'{"Duncan": "4"}',
 u'{"Yuki": "5"}',
 u'{"Duncan": "6"}',
 u'{"Duncan": "4"}',
 u'{"Duncan": "5"}']

In [15]:
rddl.collect()

[1, 2, 3]

RDD objects are immutable: any time we apply a transformation to an RDD (such as map() or filter()), the transformation returns another RDD instead of modifying the original. Actions such as reduce() return a plain value rather than an RDD.

In [44]:
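# RDD file to (key, value) pairs; parse each JSON line only once.
# list(...) keeps this working on Python 3, where keys()/values() are views.
rddf_ = rddf.map(json.loads).map(lambda d: (list(d.keys())[0], int(list(d.values())[0])))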

In [45]:
rddf_.collect()

[(u'Jane', 2),
 (u'Jane', 1),
 (u'Pete', 20),
 (u'Tyler', 3),
 (u'Duncan', 4),
 (u'Yuki', 5),
 (u'Duncan', 6),
 (u'Duncan', 4),
 (u'Duncan', 5)]
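The mapping step above can also be written as a named function, which is easier to read and to test without a cluster. This is a minimal sketch, assuming every line is a JSON object with exactly one key; `parse_line` is a hypothetical helper name, not part of PySpark.

```python
import json

def parse_line(line):
    """Turn one JSON line such as '{"Jane": "2"}' into a (name, int) pair.

    Assumption: each line holds a JSON object with exactly one key.
    """
    record = json.loads(line)                  # parse the line once
    name, count = next(iter(record.items()))   # the single key/value pair
    return name, int(count)

# Quick local check on two lines in the format of the sample file.
pairs = [parse_line(s) for s in ['{"Jane": "2"}', '{"Pete": "20"}']]
```

Under that assumption, `rddf.map(parse_line)` should produce the same pairs as the lambda version.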

In [46]:
# Keep pairs whose value is greater than 5 (tuple-unpacking lambdas are Python 2 only)
rddf_.filter(lambda kv: kv[1] > 5).collect()

[(u'Pete', 20), (u'Duncan', 6)]

In [48]:
rddf_.reduceByKey(lambda v1, v2: max(v1, v2)).collect()

[(u'Jane', 2), (u'Pete', 20), (u'Yuki', 5), (u'Tyler', 3), (u'Duncan', 6)]
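What reduceByKey computes can be sketched in plain Python: the values that share a key are folded together with the supplied function, leaving one value per key. The helper `reduce_by_key` below is a hypothetical illustration of that logic, not how Spark implements it. Spark performs the same fold across partitions, and the order of keys in its output depends on partitioning, so the result is compared here as a dict.

```python
# The (key, value) pairs from the example, written as plain tuples.
pairs = [("Jane", 2), ("Jane", 1), ("Pete", 20), ("Tyler", 3), ("Duncan", 4),
         ("Yuki", 5), ("Duncan", 6), ("Duncan", 4), ("Duncan", 5)]

def reduce_by_key(pairs, func):
    """Fold the values of each key with func (a local stand-in for reduceByKey)."""
    result = {}
    for key, value in pairs:
        # First value seen for a key is kept as-is; later ones are combined.
        result[key] = func(result[key], value) if key in result else value
    return result

# Equivalent in spirit to reduceByKey(lambda v1, v2: max(v1, v2)).
max_per_key = reduce_by_key(pairs, max)
```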

In [49]:
cost = 1.

In [51]:
rddf_.values().reduce(lambda v1, v2: v1 + v2)*cost

50.0
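Spark applies the function passed to reduce() within each partition and then combines the partial results, which is why that function should be associative and commutative. The plain-Python sketch below mimics this with a made-up three-way split of the values above; the real partition layout is chosen by Spark and is only assumed here.

```python
from functools import reduce
from operator import add

values = [2, 1, 20, 3, 4, 5, 6, 4, 5]   # the values from the example
cost = 1.

# Hypothetical split into three partitions; Spark decides the real layout.
partitions = [values[0:3], values[3:6], values[6:9]]

# Reduce inside each partition, then combine the partial results.
partials = [reduce(add, part) for part in partitions]   # [23, 12, 15]
total = reduce(add, partials) * cost                    # 50.0

# Because addition is associative and commutative, the split does not matter.
same_as_single_pass = (total == reduce(add, values) * cost)
```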