In [None]:
import pyspark
sc.version

<p id='#top'>
SparkContext is automatically stored in a variable named sc<br>
The sc.parallelize() method creates a parallelized collection (an RDD) from a local Python collection. This allows Spark
to distribute the data across multiple nodes, instead of depending on a single node to process the data:</p>

In [None]:
myRDD = sc.parallelize(
    [('Mike', 19), ('June', 18), ('Rachel', 16), ('Rob', 18), ('Scott', 17), ('Jack', 25)])

<ul><li>The <b>collect()</b> action returns all values in the RDD from the Spark worker nodes to the driver
<li>A Spark driver (aka an application’s driver process) is a JVM process that hosts SparkContext for a Spark application. It plays the coordinating (master) role in a Spark application and schedules the work on the worker nodes.
<li>Collecting a large RDD has performance implications: all of its data is transferred from the Spark worker nodes to the driver and must fit in the driver's memory.</ul>

In [None]:
myRDD.collect()

The <b>take</b>(n) action returns the first n elements of the RDD to the driver instead of the whole dataset

In [None]:
myRDD.take(3)

In [None]:
myRDD.getNumPartitions() #returns the number of partitions the data was split into

As a rule of thumb, use at least 2 partitions per CPU core in the cluster (the Spark tuning guide recommends 2-3 tasks per core)

<h3>Reading Data From Files</h3>
<table align='left'><tr><td>Storage type <td>Example
<tr><td>Local files <td>sc.textFile('/local folder/filename.csv')
<tr><td>Hadoop HDFS <td>sc.textFile('hdfs://folder/filename.csv')
<tr><td>S3 <td>sc.textFile('s3://bucket/folder/filename.csv')
</table>

In [None]:
airports = sc.textFile('s3://cis4567-salehan/Spark/Data/airport-codes-na.txt') 
# Update this path to match the file's location in your environment
airports.take(5)

In [None]:
airports.getNumPartitions()  # by default textFile() asks for at least min(defaultParallelism, 2) partitions, so small files usually get 2

In [None]:
airports.count() #each line is one item

<h3>Transformations</h3><br>
Use the <b>.map()</b> function to <b>transform</b> the data from an RDD of strings to an RDD of lists<br>
<b>lambda:</b> An anonymous function (that is, a function defined without a name) composed of a single expression<br>
The following code replaces each line with the list of its fields, split on the TAB character
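
As a plain-Python analogy (no Spark required), the same lambda can be applied with the built-in map(). The two sample lines below are made up for illustration and only mimic the tab-separated layout of the airports file.

```python
# Hypothetical tab-separated lines standing in for records of the airports file
sample_lines = ["Seattle\tWA\tUSA\tSEA", "Vancouver\tBC\tCanada\tYVR"]

# Same lambda as in the Spark cell: one string in, one list of fields out
split_line = lambda line: line.split("\t")

rows = list(map(split_line, sample_lines))
print(rows)
# [['Seattle', 'WA', 'USA', 'SEA'], ['Vancouver', 'BC', 'Canada', 'YVR']]
```

The Spark version behaves the same way per element, except that the work is spread over the partitions of the RDD.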

In [None]:
airports = airports.map(
    lambda line: line.split("\t"))
airports.take(5)

In [None]:
flights = sc.textFile('s3://cis4567-salehan/Spark/Data/departuredelays.csv') 
flights.getNumPartitions()

In [None]:
flights.take(5)

In [None]:
flights = sc.textFile(
    's3://cis4567-salehan/Spark/Data/departuredelays.csv', 
    minPartitions=8).map(
    lambda line: line.split(","))

In [None]:
def split_by_comma(line):
    return line.split(",")

In [None]:
sc.textFile(
    's3://cis4567-salehan/Spark/Data/departuredelays.csv', 
    minPartitions=8).map(split_by_comma).take(5)

In [None]:
flights.getNumPartitions()

In [None]:
flights.take(5)

In [None]:
# Use map() to extract out the first two columns
airports.map(lambda c: (c[0], c[1])).take(5)

The <b>filter</b>(f) transformation returns a new RDD based on selecting elements for which
the f function returns true

In [None]:
# Use filter() to keep rows where the second column == "WA"
# Use \ to continue a statement on the next line

airports\
.map(lambda c: (c[0], c[1]))\
.filter(lambda c: c[1] == "WA")\
.take(5)

In [None]:
# Same pattern as above with a named function instead of a lambda
# (this time selecting "CA" and filtering before the map)
def f(x):
    return x[1] == "CA"

(
airports
.filter(f)
.map(lambda c: (c[0], c[1]))
.take(5)
)

The <b>flatMap</b>(f) transformation is similar to map, but f returns a sequence for each element
and the new RDD flattens all of those sequences into a single collection of elements.
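
The difference between map and flatMap can be sketched in plain Python with itertools. The pairs below are hypothetical and only mimic the (city, state) tuples used in this notebook.

```python
from itertools import chain

# Hypothetical (city, state) pairs, as produced by the map() step
pairs = [("Seattle", "WA"), ("Spokane", "WA")]

identity = lambda x: x

# map-like: one output element per input element, structure preserved
mapped = list(map(identity, pairs))

# flatMap-like: each element yields a sequence, and the sequences are concatenated
flattened = list(chain.from_iterable(map(identity, pairs)))

print(mapped)     # [('Seattle', 'WA'), ('Spokane', 'WA')]
print(flattened)  # ['Seattle', 'WA', 'Spokane', 'WA']
```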

In [None]:
# Filter only second column == "WA",
# select first two columns within the RDD,
# and flatten out all values
(
airports
.filter(lambda c: c[1] == "WA")
.map(lambda c: (c[0], c[1]))
.flatMap(lambda x: x)
.take(10)
)

The <b>distinct()</b> transformation returns a new RDD containing the distinct elements of the
source RDD.

In [None]:
# Provide the distinct elements for the
# third column of airports representing
# countries
(
airports
.map(lambda c: c[2])
.distinct()
.take(5)
)

In [None]:
print(flights.count())

The <b>sample</b>(withReplacement, fraction, seed) transformation samples a fraction of the data, with or without replacement (the withReplacement parameter), based on a random seed.

In [None]:
# Sample roughly 0.1% (fraction=0.001) of the
# flights RDD data specific to the fourth
# column (origin city of flight)
# without replacement (False) using random
# seed of 1
(
flights
.map(lambda c: c[3])
.sample(False, 0.001, 1)
.count()
)

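The sample transformation does not return a fixed sample size: without replacement, Spark uses Bernoulli sampling, so each element is selected independently with probability equal to fraction. The fraction argument is therefore the expected fraction of the RDD's size, not a guaranteed one, and the resulting count varies with the seed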
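
The idea behind Bernoulli sampling can be sketched in plain Python. This is not Spark's implementation; the helper name bernoulli_sample and the population of 10,000 integers are assumptions made for illustration.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]

population = list(range(10_000))

# The expected size is 0.001 * 10000 = 10, but the actual size varies with the seed
sizes = [len(bernoulli_sample(population, 0.001, seed)) for seed in range(5)]
print(sizes)
```

Re-running with the same seed reproduces the same sample, which is why a seed is passed when repeatable results are needed.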

The <B>join</B>(RDD) transformation returns an RDD of (key, (val_left, val_right)) when called on
an RDD of (key, val_left) with an RDD of (key, val_right) as its argument. Outer joins are supported through
leftOuterJoin, rightOuterJoin, and fullOuterJoin.<br>
join uses the first element of each record as the key; the second elements of the two RDDs are merged into a tuple representing the value
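
A minimal plain-Python sketch of what an inner join on (key, value) pairs computes. The helper inner_join and the sample records are hypothetical; Spark performs the same pairing across partitions and does not guarantee the output order.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair up values from two lists of (key, value) tuples that share a key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

# Hypothetical records in the same (key, value) shape as the Spark example
flights_sample = [("JFK", "01010900"), ("SEA", "01011200"), ("XXX", "01011300")]
airports_sample = [("JFK", "NY"), ("SEA", "WA")]

print(inner_join(flights_sample, airports_sample))
# [('JFK', ('01010900', 'NY')), ('SEA', ('01011200', 'WA'))]
```

The 'XXX' record has no match on the right side, so an inner join drops it; a left outer join would keep it with None as the right value.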

In [None]:
# Flights data (origin, date)
# e.g. (u'JFK', u'01010900')
flt = flights.map(lambda c: (c[3], c[0]))
# Airports data (IATA, state)
# e.g. (u'JFK', u'NY')
air = airports.map(lambda c: (c[3], c[1]))
# Execute inner join between RDDs
flt.join(air).take(5)

The <b>repartition</b>(n) transformation repartitions the RDD into n partitions by randomly
reshuffling and uniformly distributing data across the network. Increasing the number of partitions can improve
performance by allowing more tasks to run in parallel, but the reshuffle itself moves data across the network and has a cost.

In [None]:
#The flights RDD has 8 partitions
flights.getNumPartitions()

In [None]:
# Let's re-partition this to 16 so we can have 16
# partitions
flights2 = flights.repartition(16)
# Checking the number of partitions for the flights2 RDD
flights2.getNumPartitions()

The <b>zipWithIndex</b>() transformation pairs (or ZIPs) each element of the RDD with its index.
This is very handy when wanting to remove the header row (first row) of a file.
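
In plain Python, enumerate plays a similar role. The rows below are hypothetical, and the sketch swaps the order to (element, index) to match the shape that zipWithIndex produces.

```python
# Hypothetical rows; the first one plays the role of a header
rows = [("City", "IATA"), ("Seattle", "SEA"), ("Vancouver", "YVR")]

# zipWithIndex-like: Spark emits (element, index), so swap enumerate's (index, element)
indexed = [(row, idx) for idx, row in enumerate(rows)]

# Drop index 0 (the header), then strip the index again
data_rows = [row for row, idx in indexed if idx > 0]

print(indexed[0])  # (('City', 'IATA'), 0)
print(data_rows)   # [('Seattle', 'SEA'), ('Vancouver', 'YVR')]
```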

In [None]:
# View each row within RDD + the index
# i.e. output is in form ([row], idx)
ac = airports.map(lambda c: (c[0], c[3]))
ac.zipWithIndex().take(5)

To remove the header from your data, you can use the following code:

In [None]:
# Using zipWithIndex to skip header row
# - filter out row 0
# - extract only row info
(
ac
.zipWithIndex()
.filter(lambda row: row[1] > 0)
.map(lambda row: row[0])
.take(5)
)

The <b>reduceByKey</b>(f) transformation reduces the elements of the RDD using f by the key.
The f function should be commutative and associative so that it can be computed correctly
in parallel.
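
Why f should be commutative and associative can be seen with a small plain-Python experiment. The helper reduce_in_partitions is a hypothetical stand-in for the way partial results are computed per partition and then combined; it is not how Spark is implemented.

```python
from functools import reduce

def reduce_in_partitions(values, f, partition_size):
    """Reduce each chunk of `values` separately, then reduce the partial results."""
    chunks = [values[i:i + partition_size] for i in range(0, len(values), partition_size)]
    partials = [reduce(f, chunk) for chunk in chunks]
    return reduce(f, partials)

delays = [5, 10, 20, 40]  # hypothetical delays for a single key

add = lambda x, y: x + y
subtract = lambda x, y: x - y

# Addition is associative and commutative: the partitioning does not matter
print(reduce_in_partitions(delays, add, 4), reduce_in_partitions(delays, add, 2))
# 75 75

# Subtraction is neither: the result depends on how the data was split
print(reduce_in_partitions(delays, subtract, 4), reduce_in_partitions(delays, subtract, 2))
# -65 15
```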

In [None]:
# Determine delays by originating city
# - remove header row via zipWithIndex()
# and map()
(
flights
.zipWithIndex()
.filter(lambda row: row[1] > 0)
.map(lambda row: row[0])
.map(lambda c: (c[3], int(c[1]))) #map to (origin, delay)
.reduceByKey(lambda x, y: x + y) #the first element is used as key
.take(5)
)

The <b>sortByKey</b>(ascending=True) transformation orders a (key, value) RDD by key and returns an RDD in
ascending or descending order.

In [None]:
# Take the origin code and delays, remove the header,
# aggregate by origin code via reduceByKey(),
# and sort by the key (origin code)
(
flights
.zipWithIndex()
.filter(lambda row: row[1] > 0)
.map(lambda row: row[0])
.map(lambda c: (c[3], int(c[1])))
.reduceByKey(lambda x, y: x + y)
.sortByKey()
.take(10)
)

The <b>sortBy</b>(f, ascending=True) transformation orders an RDD by the value that the function 'f' returns for each element and returns an RDD in
ascending or descending order.
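
The two orderings can be compared in plain Python with sorted(). The totals below are hypothetical (origin, total delay) pairs, not results from the flights data.

```python
# Hypothetical (origin, total_delay) pairs, as produced by reduceByKey
totals = [("SEA", 450), ("ATL", 120), ("JFK", 300)]

# sortByKey()-like: ascending by the first element of each pair
by_key = sorted(totals, key=lambda kv: kv[0])

# sortBy(lambda x: x[1], False)-like: descending by the second element
by_delay_desc = sorted(totals, key=lambda kv: kv[1], reverse=True)

print(by_key)         # [('ATL', 120), ('JFK', 300), ('SEA', 450)]
print(by_delay_desc)  # [('SEA', 450), ('JFK', 300), ('ATL', 120)]
```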

In [None]:
(
flights
.zipWithIndex()
.filter(lambda row: row[1] > 0)
.map(lambda row: row[0])
.map(lambda c: (c[3], int(c[1])))
.reduceByKey(lambda x, y: x + y)
.sortBy(lambda x: x[1], False)
.take(10)
)

The <b>union</b>(RDD) transformation returns a new RDD that is the union of the source and
argument RDDs. Duplicates are kept; apply distinct() afterwards if they should be removed.

In [None]:
# Create `a` RDD of Washington airports
a = (
airports
.zipWithIndex()
.filter(lambda row: row[1] > 0)
.map(lambda row: row[0])
.filter(lambda c: c[1] == "WA")
)
# Create `b` RDD of British Columbia airports
b = (
airports
.zipWithIndex()
.filter(lambda row: row[1] > 0)
.map(lambda row: row[0])
.filter(lambda c: c[1] == "BC")
)
# Union WA and BC airports
a.union(b).collect()

The <b>mapPartitionsWithIndex</b>(f) is similar to map but runs the f function once on
each partition, passing it the index of the partition and an iterator over its elements. It is useful for detecting data skew
across partitions.
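
A plain-Python sketch of a per-partition count and a simple skew measure. The partitions list and the helper partition_element_counts are hypothetical; the ratio of the largest partition to the average is just one possible way to express skew.

```python
def partition_element_counts(partitions):
    """Return (partition_index, element_count) for each partition."""
    return [(idx, sum(1 for _ in part)) for idx, part in enumerate(partitions)]

# Hypothetical, deliberately skewed partitions
partitions = [["a", "b", "c", "d", "e", "f"], ["g"], ["h", "i"]]

counts = partition_element_counts(partitions)
largest = max(count for _, count in counts)
average = sum(count for _, count in counts) / len(counts)

print(counts)             # [(0, 6), (1, 1), (2, 2)]
print(largest / average)  # 2.0
```

A ratio close to 1 means the partitions are balanced; a large ratio suggests that one task will take much longer than the others.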

In [None]:
# Source: https://stackoverflow.com/a/38957067/1100699
def partitionElementCount(idx, iterator):
    count = 0
    for _ in iterator:
        count += 1
    # yield one (partition index, element count) pair per partition
    yield idx, count
# Use mapPartitionsWithIndex to count the elements in each partition
flights.mapPartitionsWithIndex(partitionElementCount).collect()

A transformation takes an existing RDD and produces a new RDD from it. Transformations are lazy: nothing is computed until an action (e.g., take) is executed
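
Lazy evaluation can be illustrated with Python's own lazy map(). This is only an analogy, with hypothetical input lines and a hypothetical helper split_and_log; Spark builds a plan of transformations rather than a Python iterator.

```python
calls = []

def split_and_log(line):
    calls.append(line)  # record that the function actually ran
    return line.split(",")

lines = ["a,b", "c,d", "e,f"]  # hypothetical input

# Building the pipeline runs nothing yet, much like declaring a transformation
pipeline = map(split_and_log, lines)
print(len(calls))  # 0

# Pulling the first element triggers just enough work, much like take(1)
first = next(pipeline)
print(first, len(calls))  # ['a', 'b'] 1
```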

In [None]:
# Flights data
# e.g. (u'JFK', u'01010900')
flt = flights.map(lambda c: (c[3], c[0]))
# Airports data
# e.g. (u'JFK', u'NY')
air = airports.map(lambda c: (c[3], c[1]))
# Execute inner join between RDDs
output = flt.join(air)

In [None]:
output.take(5)

Now let's check this out in the Spark UI. You need an SSH connection to your EMR master node before you can connect to the Spark UI

In [None]:
# How to get a list of Python packages installed on the cluster
sc.list_packages()

In [None]:
# How to install a package on the cluster
sc.install_pypi_package('pyarrow')