# Introduction to Spark

## Introduction to Resilient Distributed Datasets (RDDs)


***


* Spark is a distributed computing framework for big data.
* Clusters of worker machines operate on Resilient Distributed Datasets (RDDs) in parallel.
* Spark is optimized to keep data in worker memory as much as possible, which allows fast computation on large datasets.

<img src="figs/RDD_1.png">
#### A Spark driver, running either as an application or in a shell, connects to a cluster of worker machines. (Databricks Workshop 2015)


***

* RDDs are collections of data that either come from a central source and are then parallelized (e.g. a file on the local machine) or are read from a distributed data store (HDFS, Cassandra, etc.).
* RDDs can hold common data types as well as arbitrary objects.
* An RDD is split into several partitions, which are distributed across the worker machines.
<img src="figs/RDD_2.png">
#### Large dataset partitioned between different worker machines in a Spark cluster (Databricks Workshop 2015) 


***

* An example RDD consisting of log data

<img src="figs/RDD_3.png">
#### (Databricks Workshop 2015)


***

* Creating an RDD from a collection of data on the driver and distributing it to the worker machines

<img src="figs/RDD_4.png">
#### (Databricks Workshop 2015)

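```python
from pyspark import SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.ml.linalg import Vectors

# Name the application and run Spark locally
conf = SparkConf().setAppName("testapp").setMaster("local")

# Build (or reuse) a SparkSession; it creates the underlying SparkContext
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# The SparkContext (sc) is the entry point for the RDD API used below
sc = spark.sparkContext
```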

```python
# Create an RDD from a Python list
wordsRDD = sc.parallelize(["fish", "cats", "dogs"])

# View the object type
print(wordsRDD)

# Get the number of elements in the RDD
print(wordsRDD.count())

# Get the first element in the RDD
print(wordsRDD.first())

# Collect all elements from Spark back to the driver (this notebook)
print(wordsRDD.collect())
```

```
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
3
fish
['fish', 'cats', 'dogs']
```



***

* Creating an RDD from a file
  * Small files can be read from the local machine
  * Data can also be read directly from distributed stores such as HDFS (and from Cassandra through a connector)
    * Example: `newRDD = sc.textFile("hdfs://localhost:9000/user/data/file.txt")`

<img src="figs/RDD_5.png">
#### (Databricks Workshop 2015)

```python
# Read the Moby Dick text file (one RDD element per line)
mobyDickRDD = sc.textFile("./data/MobyDick.txt")

# Count how many lines are in the RDD
print("There is a total of %d lines in Moby Dick" % mobyDickRDD.count())

# Take the first 50 lines from the RDD
mobyDickRDD.take(50)
```

```
There is a total of 22933 lines in Moby Dick

['MOBY DICK; OR THE WHALE ',
 '',
 'by Herman Melville',
 '',
 '',
 '',
 '',
 'ETYMOLOGY.',
 '',
 '(Supplied by a Late Consumptive Usher to a Grammar School)',
 '',
 'The pale Usher--threadbare in coat, heart, body, and brain; I see him',
 'now.  He was ever dusting his old lexicons and grammars, with a queer',
 'handkerchief, mockingly embellished with all the gay flags of all the',
 'known nations of the world.  He loved to dust his old grammars; it',
 'somehow mildly reminded him of his mortality.',
 '',
 '"While you take in hand to school others, and to teach them by what',
 'name a whale-fish is to be called in our tongue leaving out, through',
 'ignorance, the letter H, which almost alone maketh the signification',
 'of the word, you deliver that which is not true." --HACKLUYT',
 '',
 '"WHALE. ... Sw. and Dan. HVAL.  This animal is named from roundness',
 'or rolling; for in Dan. HVALT is arched or vaulted." --WEBSTER\'S',
 'DICTIONARY',
 '',
 '"WHALE. ... It is more immediately 
```


***

* Typical workflow
  * Data is read from external sources (HDFS, Cassandra, local file...) into an RDD
  * Transformations are applied to an RDD to create new RDDs
    * In this example, the main RDD is filtered to create an RDD that contains only error messages

<img src="figs/RDD_6.png">
#### (Databricks Workshop 2015)


***

* A filtered RDD can be coalesced to reduce the number of partitions that make it up
  * This is useful when filtering leaves many partitions nearly empty compared to the parent RDD

<img src="figs/RDD_7.png">
#### (Databricks Workshop 2015)

```python
# The number of partitions can be specified when an RDD is created;
# if it is omitted, Spark uses a default (sc.defaultParallelism)
N_partitions = 4
team = sc.parallelize(["Al", "Ani", "Jackie", "Lalitha", "Mark", "Neil", "Nick", "Shirin"], N_partitions)
print(team.getNumPartitions())
```

```
4
```



***

* Standard workflow for a Spark application

<img src="figs/RDD_8.png">
#### (Databricks Workshop 2015)