In [1]:
# The code was removed by DSX for sharing.

# Actions And Transformations

RDDs support two types of operations: <b>transformations</b>, which create a new RDD from an existing one, and <b>actions</b>, which return a value to the driver program after running a computation on the dataset. 

All transformations in Spark are lazy: they do not compute their results right away. Instead, Spark just remembers the transformations applied to some base dataset and only runs them when an action needs a result. This allows Spark to optimize the whole chain of transformations, which it represents as a DAG (directed acyclic graph).
<br>

<img src="http://image.prntscr.com/image/86fad7aa29dc4af1a85fae8a6a181eba.png">
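
The lazy behaviour described above can be imitated in a few lines of plain Python. This is only a toy sketch, not how Spark is implemented: the class `LazyDataset` and the helper `square` are hypothetical names made up for illustration. Transformations are merely recorded, and nothing runs until the action `collect()` is called.

```python
class LazyDataset(object):
    """Toy stand-in for an RDD (hypothetical, not a Spark class)."""

    def __init__(self, source, steps=()):
        self.source = source        # base dataset (any iterable)
        self.steps = tuple(steps)   # recorded (kind, function) pairs

    def map(self, func):
        # Transformation: returns a new dataset, computes nothing yet.
        return LazyDataset(self.source, self.steps + (("map", func),))

    def filter(self, pred):
        # Transformation: also only recorded.
        return LazyDataset(self.source, self.steps + (("filter", pred),))

    def collect(self):
        # Action: replay every recorded step over the base data.
        items = list(self.source)
        for kind, func in self.steps:
            if kind == "map":
                items = [func(x) for x in items]
            else:
                items = [x for x in items if func(x)]
        return items


calls = []

def square(x):
    calls.append(x)   # record that the function really ran
    return x ** 2

lazy = LazyDataset([1, 2, 3, 4]).map(square).filter(lambda x: x % 2 != 0)
calls_before_action = len(calls)   # nothing has been computed yet
result = lazy.collect()            # the action triggers the computation
```

Here `calls_before_action` is 0 because building the chain does not invoke `square`; only `collect()` does.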

# Persistence
By default, each transformed RDD <b>may be</b> recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
<br>
<img src="http://image.prntscr.com/image/71bdbbd1c3b34870ad0a0956d5e6a551.jpeg">

In [2]:
from pyspark.storagelevel import StorageLevel
import time

In [3]:
filtered_rdd = sc.textFile("swift://CS340." + name + "/ratings.csv")\
                .map(lambda x: x.split(","))\
                .map(lambda lis: lis[2])\
                .map(float)\
                .map(lambda x: x**2)\
                .map(int)\
                .filter(lambda x: x%2!=0)
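
To see what this chain of transformations does to each record, the same steps can be run on a few lines with plain Python lists. The sample rows below are hypothetical: the notebook does not show the contents of ratings.csv, so the sketch assumes comma-separated rows without a header whose third column is a numeric rating.

```python
# Hypothetical sample rows; the real ratings.csv is not shown in this notebook.
sample_lines = [
    "1,31,2.5,1260759144",
    "1,1029,3.0,1260759179",
    "2,10,5.0,835355493",
    "2,17,4.0,835355681",
]

def odd_squared_ratings(lines):
    """Mirror the RDD pipeline step by step with plain Python lists."""
    fields = [line.split(",") for line in lines]   # .map(lambda x: x.split(","))
    ratings = [float(f[2]) for f in fields]        # .map(lambda lis: lis[2]).map(float)
    squared = [int(r ** 2) for r in ratings]       # .map(lambda x: x**2).map(int)
    return [s for s in squared if s % 2 != 0]      # .filter(lambda x: x%2!=0)

local_result = odd_squared_ratings(sample_lines)
```

For these rows only the ratings 3.0 and 5.0 survive, because their truncated squares (9 and 25) are odd.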

In [6]:
# Time the first collect(); the RDD has not been persisted.
start = time.time()
filtered_rdd.collect()
print(time.time() - start)

0.42010307312


In [7]:
# Time the same action a second time for comparison.
start = time.time()
filtered_rdd.collect()
print(time.time() - start)

0.404487848282


Since the RDD is not persisted explicitly, we would expect both actions to take the same amount of time. However, that is not quite the case: the second run (about 0.40 s) is slightly faster than the first (about 0.42 s). There can be several reasons for this.

1) Spark will automatically evict RDD partitions from Workers in an LRU (Least Recently Used) manner. The LRU eviction happens independently on each Worker and depends on the memory available in that Worker. (Chris Fregly)

2) Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist.
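
The LRU eviction mentioned in point 1 can be pictured with a small toy cache. This is a sketch of the general LRU idea only, not Spark's memory manager; `LRUPartitionCache` and the partition names are hypothetical.

```python
from collections import OrderedDict

class LRUPartitionCache(object):
    """Toy cache that keeps at most `capacity` partitions (hypothetical)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.store = OrderedDict()   # oldest entry first, newest last

    def get(self, key):
        if key not in self.store:
            return None              # miss: the caller would have to recompute
        value = self.store.pop(key)  # re-insert to mark as most recently used
        self.store[key] = value
        return value

    def put(self, key, value):
        if key in self.store:
            self.store.pop(key)
        elif len(self.store) >= self.capacity:
            self.store.popitem(last=False)   # evict the least recently used
        self.store[key] = value


cache = LRUPartitionCache(capacity=2)
cache.put("part-0", [1, 9])
cache.put("part-1", [25])
cache.get("part-0")         # touch part-0, so part-1 becomes the oldest
cache.put("part-2", [49])   # cache is full, so part-1 is evicted
remaining = list(cache.store.keys())
```

After the last `put`, only `part-0` and `part-2` remain; a later request for `part-1` would miss and force a recomputation.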

# StorageLevel.MEMORY_ONLY 
<br> Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed.

# StorageLevel.MEMORY_AND_DISK
<br> Almost the same as MEMORY_ONLY; the only difference is that partitions that do not fit in memory are stored on disk and read from there when needed, so they do not have to be recomputed each time.
# StorageLevel.MEMORY_ONLY_SER
<br> Same as MEMORY_ONLY; the only difference is that it stores the RDD as <b>serialized</b> Java objects in the JVM.
# StorageLevel.MEMORY_AND_DISK_SER
<br> A combination of MEMORY_ONLY_SER and MEMORY_AND_DISK: partitions are stored serialized, and those that do not fit in memory are written to disk.
# StorageLevel.DISK_ONLY
<br> As the name suggests, it stores the partitions only on disk.
# MEMORY_ONLY_2, MEMORY_AND_DISK_2
<br> Same as the levels above, but replicate each partition on two cluster nodes.

<br>
<b>Note</b>: In Python, stored objects will always be serialized with the <b>Pickle</b> library.
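
As a rough picture of what "serialized" means here, the snippet below pickles a small list and restores it. It only illustrates the Pickle round trip; it does not show how Spark calls the library internally, and the list `partition` is a hypothetical stand-in for the contents of one partition.

```python
import pickle

partition = [1, 9, 25, 49]             # hypothetical partition contents
serialized = pickle.dumps(partition)   # one byte string for the whole list
restored = pickle.loads(serialized)    # deserialized back into Python objects
```

The serialized form is an opaque byte string, which has to be unpickled before the elements can be used again.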

In [None]:
filtered_rdd.persist(StorageLevel.DISK_ONLY)
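
To compare timings before and after persisting, the repeated timing cells above could be wrapped in a small helper. This is a minimal sketch: `time_collect` is a hypothetical name, and it only assumes that the object passed in has a `collect()` method, as an RDD does.

```python
import time

def time_collect(rdd, runs=2):
    """Call rdd.collect() `runs` times and return the elapsed seconds per run."""
    timings = []
    for _ in range(runs):
        start = time.time()
        rdd.collect()                       # the action being measured
        timings.append(time.time() - start)
    return timings
```

With the notebook's RDD this could be called as `time_collect(filtered_rdd)` after the `persist` call. Note that `persist` is itself lazy: the partitions are only stored when the first action runs, so any benefit would show up from the second run onward.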