# Resilient Distributed Dataset


* The records of an RDD are divided into logical partitions, which can be computed on different nodes of the cluster

* An RDD is computed by several processes spread across multiple physical servers, called nodes
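
As a rough illustration of what "logical partitions" means, the plain-Python sketch below slices a collection into contiguous chunks that could each be processed independently and then combined. It is not Spark's partitioner; `split_into_partitions` is a hypothetical helper introduced only for this example.

```python
def split_into_partitions(records, num_partitions):
    """Split records into num_partitions contiguous slices (hypothetical helper)."""
    n = len(records)
    return [
        records[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]

# Each slice stands in for a partition that a different node could work on
partitions = split_into_partitions(list(range(1, 13)), 4)

# Every "node" computes a partial result; the partial results are then combined
partial_sums = [sum(p) for p in partitions]
total = sum(partial_sums)
print(partitions, total)
```

The final result is the same as summing the whole list at once, which is what makes partition-wise computation safe for this kind of operation.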

## Advantages

* In-Memory Processing
    * loads data from disk, processes it in memory, and keeps the data in memory
    * can cache an RDD in memory for reuse

* Immutability
    
* Fault Tolerance

* Lazy Evaluation

* Partitioning


## Limitations

RDDs are not suitable for applications that make updates to a state store, such as the storage system for a web app.


In [1]:
# RDDs are mainly created in two ways:

# Parallelizing an existing collection
# Referencing a dataset in external storage

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
        .appName("RDD") \
        .getOrCreate()
spark



In [3]:
# Parallelizing

data = list(range(1,13))
rdd = spark.sparkContext.parallelize(data)
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [4]:
# Referencing a dataset in external storage (here, a local file)

rdd2 = spark.sparkContext.textFile("endomondoHR.json")
rdd2

endomondoHR.json MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
# Creating an empty RDD

rdd3 = spark.sparkContext.emptyRDD()
rdd3

EmptyRDD[3] at emptyRDD at NativeMethodAccessorImpl.java:0

In [6]:
# Empty RDD with partitions

rdd4 = spark.sparkContext.parallelize([], 10) # 10 partitions
rdd4

ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:274

In [7]:
# When no partition count is given, parallelize() creates partitions automatically based on resource availability

In [8]:
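# repartition() and coalesce()

# repartition() -> full shuffle: redistributes data from all nodes; can increase or decrease the partition count
# coalesce() -> merges existing partitions, moving data from as few nodes as possible; without a shuffle it can only decrease the partition count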

reparRdd = rdd.repartition(4)
reparRdd.getNumPartitions()

4
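
To make the difference between the two concrete, here is a simplified plain-Python model. `coalesce_model` and `repartition_model` are hypothetical helpers, not Spark functions, and the real `repartition()` does not promise this exact round-robin layout; the sketch only shows that coalescing keeps existing partitions intact while repartitioning moves every record.

```python
def coalesce_model(partitions, n):
    """Merge whole neighbouring partitions into n groups; records stay with their partition."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged

def repartition_model(partitions, n):
    """Redistribute every record round-robin, ignoring the old layout (a full shuffle)."""
    records = [r for part in partitions for r in part]
    return [records[i::n] for i in range(n)]

parts = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]

coalesced = coalesce_model(parts, 2)      # old partitions are glued together
reshuffled = repartition_model(parts, 3)  # every old partition feeds every new one
print(coalesced)
print(reshuffled)
```

In the coalesced result each original partition ends up whole inside one new partition, which is why coalescing is usually the cheaper way to reduce the partition count.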

## RDD Transformations

In [9]:
# RDD transformations are lazy: nothing is computed until an action is called
# They also return a new RDD instead of updating the current one, because RDDs are immutable
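
Laziness can be imitated in plain Python with iterators. The sketch below is only an analogy, not how Spark schedules work: the built-in `map` plays the role of a transformation, `list()` plays the role of an action, and `evaluated` is a hypothetical log used to show when the function really runs.

```python
evaluated = []

def double(x):
    evaluated.append(x)  # record that the function really ran
    return x * 2

source = [1, 2, 3]

# "Transformation": builds a lazy iterator, runs nothing yet
transformed = map(double, source)
ran_before_action = list(evaluated)

# "Action": forces the computation
result = list(transformed)

print(ran_before_action, result, source)
```

The source list is left untouched, which mirrors the way a transformation yields a new RDD rather than changing the one it was called on.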

In [10]:
# df = spark.read.json("./endomondoHR.json")

In [11]:
# df

In [12]:
rdd = spark.sparkContext.textFile("./test.txt")


In [13]:
# flatMap() applies the function to every element and flattens the results into a new RDD

rdd2 = rdd.flatMap(lambda x: x.split(" "))
rdd2

PythonRDD[12] at RDD at PythonRDD.scala:53

In [14]:
# View the first records of the original rdd
rdd.collect()[:10]

['Project Gutenberg’s',
 'Alice’s Adventures in Wonderland',
 'by Lewis Carroll',
 'This eBook is for the use',
 'of anyone anywhere',
 'at no cost and with',
 'Alice’s Adventures in Wonderland',
 'by Lewis Carroll',
 'This eBook is for the use',
 'of anyone anywhere']

In [15]:
rdd2.collect()[:10]

['Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'by',
 'Lewis',
 'Carroll',
 'This']

In [16]:
# map() keeps one output per input, so each line becomes its own list of words

rdd3 = rdd.map(lambda x: x.split(" "))
rdd3.collect()[:10]

[['Project', 'Gutenberg’s'],
 ['Alice’s', 'Adventures', 'in', 'Wonderland'],
 ['by', 'Lewis', 'Carroll'],
 ['This', 'eBook', 'is', 'for', 'the', 'use'],
 ['of', 'anyone', 'anywhere'],
 ['at', 'no', 'cost', 'and', 'with'],
 ['Alice’s', 'Adventures', 'in', 'Wonderland'],
 ['by', 'Lewis', 'Carroll'],
 ['This', 'eBook', 'is', 'for', 'the', 'use'],
 ['of', 'anyone', 'anywhere']]

In [17]:
# Pair every word with a count of 1

rdd4 = rdd2.map(lambda x: (x,1))
rdd4.collect()[:10]

[('Project', 1),
 ('Gutenberg’s', 1),
 ('Alice’s', 1),
 ('Adventures', 1),
 ('in', 1),
 ('Wonderland', 1),
 ('by', 1),
 ('Lewis', 1),
 ('Carroll', 1),
 ('This', 1)]

In [18]:
# reduceByKey() merges the values of each key using the provided function

rdd5 = rdd4.reduceByKey(lambda x,y: x+y)
rdd5.collect()[:10]

[('Project', 9),
 ('Gutenberg’s', 9),
 ('Alice’s', 18),
 ('Adventures', 18),
 ('in', 18),
 ('Wonderland', 18),
 ('by', 18),
 ('Lewis', 18),
 ('Carroll', 18),
 ('This', 27)]
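
To make the three steps above concrete, the following plain-Python sketch mirrors `flatMap`, `map`, and `reduceByKey` on a three-line sample. It is a single-machine analogy with no Spark API involved; the sample lines are a small assumed subset of the text shown earlier.

```python
from itertools import chain

lines = ["by Lewis Carroll", "of anyone anywhere", "by Lewis Carroll"]

# flatMap: split every line and flatten the pieces into one sequence
words = list(chain.from_iterable(line.split(" ") for line in lines))

# map: pair every word with a count of 1
pairs = [(word, 1) for word in words]

# reduceByKey: merge the values of each key with x + y
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

print(counts)
```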

In [20]:
# sortByKey() sorts RDD elements by key

# The pair RDD above holds (string, int) records,
# so we first swap each record to (int, string)
# and then call sortByKey(), which sorts on the int count
rdd6 = rdd5.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)
rdd6.collect()[:10]

[(27, 'This'),
 (27, 'eBook'),
 (27, 'is'),
 (27, 'for'),
 (27, 'the'),
 (27, 'use'),
 (27, 'of'),
 (27, 'anyone'),
 (27, 'anywhere'),
 (27, 'at')]

In [25]:
# filter() keeps only the records for which the function returns True

rdd5 = rdd4.filter(lambda x: 'an' in x[0])
rdd5.collect()[:10]


[('Wonderland', 1),
 ('anyone', 1),
 ('anywhere', 1),
 ('and', 1),
 ('Wonderland', 1),
 ('anyone', 1),
 ('anywhere', 1),
 ('and', 1),
 ('anyone', 1),
 ('anywhere', 1)]

## RDD Actions

RDD actions return values from an RDD to the driver. Any RDD function that returns something other than an RDD is considered an action.

In [27]:
# count() returns the number of records

rdd6.count()

23

In [28]:
# first() returns the first record

rdd6.first()

(27, 'This')

In [29]:
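# max() returns the largest record; tuples compare element by element, so ties on the count are broken by the word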

rdd6.max()

(27, 'with')

In [31]:
# reduce() combines all records into a single value; it can be used to count or sum

totalWordCount = rdd6.reduce(lambda a,b : (a[0]+b[0],''))
totalWordCount

(522, '')
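
The pairwise folding that `reduce` performs can be tried out in plain Python with `functools.reduce`. This is a sketch on three assumed sample records, not the RDD itself. Spark may combine records in any order across partitions, so the function passed to `reduce` should be associative and commutative, which addition on the counts is.

```python
from functools import reduce

sample = [(27, 'This'), (27, 'eBook'), (18, 'in')]  # assumed sample records

# Same lambda as above: add the counts and drop the word
folded = reduce(lambda a, b: (a[0] + b[0], ''), sample)

# An equivalent and simpler formulation: extract the counts first, then sum them
simple_total = sum(count for count, _ in sample)

print(folded, simple_total)
```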

In [34]:
# take(n) returns the first n records, where n is passed as an argument

data3 = rdd6.take(3)
data3

[(27, 'This'), (27, 'eBook'), (27, 'is')]

In [35]:
# collect() returns all the data to the driver

## Shuffle Operations

A shuffle redistributes data across different executors, or even across machines.

Shuffling is expensive because it involves:

* Disk I/O
* Data serialization and deserialization
* Network I/O

### What happens when we first run reduceByKey

When an RDD is first created, the values for a single key are not necessarily stored in the same partition.

* PySpark runs map tasks on all partitions, which group all values for a single key
* Results of the map tasks are kept in memory
* When the results do not fit in memory, PySpark stores the data on disk
* PySpark shuffles the mapped data across partitions; sometimes it also stores the shuffled data on disk for reuse
* Runs garbage collection
* Finally, runs reduce tasks on each partition, based on key
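
The steps above can be sketched in plain Python as a map-side combine, a routing step, and a reduce-side merge. This is a simplified single-process model, not Spark's shuffle implementation: `combine_locally`, `target_partition`, and `shuffle_and_reduce` are hypothetical helpers, and `zlib.crc32` is only a deterministic stand-in for a hash partitioner.

```python
import zlib
from collections import defaultdict

def combine_locally(partition):
    """Map side: sum the values of each key inside one partition."""
    combined = defaultdict(int)
    for key, value in partition:
        combined[key] += value
    return dict(combined)

def target_partition(key, num_partitions):
    """Deterministic stand-in for a hash partitioner (hypothetical)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle_and_reduce(partitions, num_partitions):
    buckets = [defaultdict(int) for _ in range(num_partitions)]
    for partition in partitions:
        for key, value in combine_locally(partition).items():
            # Shuffle: route the partial result to the partition that owns the key,
            # then merge it with partial results from other partitions (reduce side)
            buckets[target_partition(key, num_partitions)][key] += value
    return [dict(bucket) for bucket in buckets]

input_partitions = [
    [("by", 1), ("Lewis", 1), ("by", 1)],
    [("Lewis", 1), ("Carroll", 1), ("by", 1)],
]
reduced = shuffle_and_reduce(input_partitions, 2)
totals = {key: value for part in reduced for key, value in part.items()}
print(reduced)
```

After the shuffle every key lives in exactly one output partition, so each reduce task can finish its keys without looking at any other partition.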

## RDD Persistence

Even though PySpark is often cited as being up to 100x faster than traditional MapReduce jobs, performance still drops if jobs are not designed to reuse repeated computations.

PySpark provides the ```cache()``` and ```persist()``` methods as an optimization mechanism for storing intermediate computations so that they can be reused.

### Advantages

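* Cost efficient: expensive computations are not repeated
* Time efficient: reusing stored results saves time
* Shorter execution time for the job as a whole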
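
The effect of reusing an intermediate result can be shown with a small plain-Python sketch. It is an analogy only: `expensive_square` and the `counter` dictionary are hypothetical stand-ins for a costly transformation and for measuring how often it runs.

```python
counter = {"calls": 0}

def expensive_square(x):
    counter["calls"] += 1  # count how often the costly step runs
    return x * x

data = [1, 2, 3, 4]

# Without caching: every "action" recomputes the whole lineage
total = sum(expensive_square(x) for x in data)
largest = max(expensive_square(x) for x in data)
calls_without_cache = counter["calls"]

# With caching: compute once, keep the result, reuse it for both "actions"
counter["calls"] = 0
cached = [expensive_square(x) for x in data]
total_cached = sum(cached)
largest_cached = max(cached)
calls_with_cache = counter["calls"]

print(calls_without_cache, calls_with_cache)
```

Both variants produce the same answers, but the cached one runs the costly step half as often here, and the saving grows with every additional action.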

In [38]:
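# cache() saves the RDD computation at the default storage level, MEMORY_ONLY, meaning the data is kept in memory
# (in the Scala/Java API as deserialized objects on the JVM heap; in PySpark, stored objects are always serialized with pickle)

# cache() internally calls persist()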

cachedRdd = rdd.cache()
cachedRdd

./test.txt MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:0

In [40]:
# persist() stores the RDD at one of several storage levels

import pyspark

dfPersist = rdd.persist(pyspark.StorageLevel.MEMORY_ONLY)
dfPersist

./test.txt MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:0

In [42]:
# PySpark monitors cache usage and automatically evicts persisted data that is not used, in least-recently-used (LRU) order
# Persisted data can also be removed manually with unpersist()

rddPersist2 = dfPersist.unpersist()
rddPersist2

./test.txt MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:0

## PySpark Shared Variables

When PySpark runs operations such as ```map()``` or ```reduce()```, it executes them on remote nodes using copies of the variables that are shipped with the tasks, and updates to these copies are not sent back to the PySpark driver. Hence, ordinary variables cannot be shared across tasks.

PySpark provides two kinds of shared variables to solve this:
* Broadcast Variables (read-only)
* Accumulator Variables (updatable)
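
The problem with ordinary variables can be reproduced in plain Python by giving each task its own copy, as if the variable had been serialized and shipped to an executor. This is a simplified model, not Spark's task serialization; `run_task` is a hypothetical stand-in for a task and `copy.deepcopy` stands in for shipping.

```python
import copy

counter = {"value": 0}  # lives on the "driver"

def run_task(shipped_counter, record):
    """Stand-in for a task that updates its own copy of the variable."""
    shipped_counter["value"] += record
    return shipped_counter["value"]

task_results = []
for record in [1, 2, 3, 4]:
    shipped = copy.deepcopy(counter)  # each task receives a copy, not the original
    task_results.append(run_task(shipped, record))

driver_value = counter["value"]
print(task_results, driver_value)
```

Every task sees only its own update, and the driver's variable never changes, which is the gap that broadcast variables and accumulators are meant to fill.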

### Broadcast

* read-only
* cached and available on all nodes in a cluster
* instead of sending this data along with every task, PySpark distributes broadcast variables to the machines using efficient broadcast algorithms

In [43]:
broadcastVar = spark.sparkContext.broadcast([0,1,2,3])
broadcastVar.value

[0, 1, 2, 3]

### Accumulators

* only 'added' to, through an associative and commutative operation
* used to implement counters and sums
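
Why the operation must be associative and commutative can be checked in plain Python: updates from different tasks may arrive in any order, and only such operations give the same result regardless. This is a sketch with assumed sample values; it does not use the accumulator API.

```python
from functools import reduce
from itertools import permutations
import operator

partial_updates = [1, 2, 3, 4]  # contributions arriving from different tasks

# Addition: every arrival order produces the same total
totals = {reduce(operator.add, order) for order in permutations(partial_updates)}

# Subtraction is neither associative nor commutative: the result depends on the order
unstable = {reduce(operator.sub, order) for order in permutations(partial_updates)}

print(totals, sorted(unstable))
```

Only one total survives for addition, while subtraction yields several different results, so it would make a poor accumulator operation.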

Types
* named: shown in the Spark web UI (per the Spark programming guide, not yet supported in Python)
* unnamed: not shown


In [55]:
accum = spark.sparkContext.accumulator(0.0)
spark.sparkContext.parallelize([1,2,3,4]).foreach(lambda x: accum.add(x))
accum.value

10.0