___
___

**Run through Jupyter Notebooks on AWS EC2 VM instance**
___
___

# RDD Transformations and Actions

In this lecture we will begin to delve deeper into using Spark and Python. Please view the video lecture for a full explanation.

## Important Terms

Let's quickly go over some important terms:

Term                   |Definition
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformation         |Spark operation that produces an RDD
Action                 |Spark operation that produces a local object
Spark Job              |Sequence of transformations on data with a final action

## Creating an RDD

There are two common ways to create an RDD:

Method                      |Result
----------                               |-------
`sc.parallelize(array)`                  |Create RDD of elements of array (or list)
`sc.textFile('path/to/file')`                    |Create RDD of lines from file

## RDD Transformations

We can use transformations to build up a set of instructions we want to perform on the RDD. Nothing is executed until we call an action.

Transformation Example                          |Result
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` |Sample roughly 25% of elements, with replacement
`union(rdd)`                             |Append `rdd` to existing RDD
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order
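
To make the "set of instructions" idea concrete, here is a minimal pure-Python sketch (not Spark itself) that uses generator expressions to mimic lazy transformations. The names `calls`, `is_even`, `evens` and `doubled` are illustrative assumptions. On an RDD the same chain would be written with `filter` and `map`, and nothing would run until an action is called.

```python
# Pure-Python analogy for lazy transformations; this is not Spark code.
calls = []  # records every element the "transformations" actually touch

def is_even(x):
    calls.append(x)
    return x % 2 == 0

numbers = range(1, 7)

# Building the recipe: generator expressions do no work yet.
evens = (x for x in numbers if is_even(x))   # like rdd.filter(lambda x: x % 2 == 0)
doubled = (x * 2 for x in evens)             # like .map(lambda x: x * 2)

work_before_action = len(calls)              # nothing has executed so far

# The "action": materializing the result forces the whole chain to run.
result = list(doubled)                       # like .collect()
work_after_action = len(calls)

print(work_before_action, work_after_action, result)
```

The predicate is only invoked once the final list is requested, which is the same division of labor as transformations (describe the work) and actions (trigger it).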

## RDD Actions

Once you have your 'recipe' of transformations ready, what you will do next is execute them by calling an action. Here are some common actions:

Action                             |Result
----------                             |-------
`collect()`                            |Convert RDD to in-memory list 
`take(3)`                              |First 3 elements of RDD 
`top(3)`                               |Largest 3 elements of RDD, in descending order
`takeSample(withReplacement=True, num=3)` |Create sample of 3 elements with replacement
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element standard deviation (assumes numeric elements)
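
As a rough guide to what these actions return, the sketch below computes local, pure-Python analogues on a small list. The list `data` is an assumed stand-in for the contents of a numeric RDD, and this is not Spark code. Note that `stdev()` on an RDD is the population standard deviation, so `statistics.pstdev` is the matching local function.

```python
import heapq
import statistics

data = [5, 3, 8, 1, 9, 2]   # assumed contents of a small numeric RDD

first_three = data[:3]                     # like rdd.take(3): the first 3 elements
largest_three = heapq.nlargest(3, data)    # like rdd.top(3): the 3 largest, descending
total = sum(data)                          # like rdd.sum()
average = statistics.mean(data)            # like rdd.mean()
spread = statistics.pstdev(data)           # like rdd.stdev(): population standard deviation

print(first_three, largest_three, total)
```

The difference between `take(3)` and `top(3)` is the one most worth remembering: the first preserves the RDD's order, the second sorts by value.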

____
# Examples

Now the best way to show all of this is by going through examples! We'll first review a bit by creating and working with a simple text file, then we will move on to more realistic data, such as customers and sales data.

### Creating an RDD from a text file:

**Creating the textfile**

In [2]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Writing example2.txt


Now let's perform some transformations and actions on this text file:

In [3]:
from pyspark import SparkContext

In [4]:
sc = SparkContext()

In [5]:
# Show RDD
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [6]:
# Save a reference to this RDD
text_rdd = sc.textFile('example2.txt')

In [7]:
# Map a function (or lambda expression) to each line
# Then collect the results.
words = text_rdd.map(lambda line: line.split())

In [8]:
# perform action on transformation
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [10]:
text_rdd.collect()

['first ', 'second line', 'the third line', 'then a fourth line']

In [9]:
text_rdd.map(lambda line: line.split()).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

## Map vs flatMap

In [12]:
# Collect everything as a single flat list
text_rdd.flatMap(lambda line: line.split()).collect()
# outputs a single list, while map outputs a list of lists

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']
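
The difference between the two can also be seen without Spark. The following is a pure-Python sketch, using a list comprehension for the `map` case and `itertools.chain.from_iterable` for the `flatMap` case. The variable names are illustrative assumptions, and `lines` copies the contents of `text_rdd` shown above.

```python
from itertools import chain

lines = ['first ', 'second line', 'the third line', 'then a fourth line']

# map-style: one output element per input line, so the result is a list of lists
mapped = [line.split() for line in lines]

# flatMap-style: split each line, then flatten the results by one level
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(len(mapped), len(flat_mapped))
```

`map` always returns as many elements as it was given (4 lines in, 4 lists out), whereas `flatMap` can return more or fewer (4 lines in, 10 words out).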

# RDDs and Key Value Pairs

Now that we've worked with RDDs and seen how to apply transformations and actions to them, we can begin working with key-value pairs. To do this, let's create some fake data as a new text file.

This data represents some services sold to customers for some SaaS business.

In [13]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [14]:
services = sc.textFile('services.txt')
# usually we'd have to do some formatting of the imported data

In [15]:
# fetch top 2 lines of the RDD
services.take(2)
# outputs header and row 1
# each line is a single string, so needs to be split

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [16]:
# split the elements in the lines
services.map(lambda line: line.split())

PythonRDD[10] at RDD at PythonRDD.scala:48

In [17]:
# output top 3 rows
services.map(lambda line: line.split()).take(3)
# now the data is an array of arrays, rows and cols

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [18]:
# output entire dataset
services.map(lambda line: line.split()).collect()

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [19]:
# Let's remove that first hash-tag!
services.map(lambda line: line[1:] if line[0]=='#' else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [20]:
# need to split into cols again
services.map(lambda line: line[1:] if line[0]=='#' else line).map(lambda line: line.split()).collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [21]:
clean = services.map(lambda line: line[1:] if line[0]=='#' else line)

In [22]:
clean = clean.map(lambda line: line.split())

In [23]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]
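
The two cleaning steps can also be written as one named function. The sketch below is one possible way to do it. `parse_line` is a hypothetical helper, not part of the original notebook, and it uses `str.startswith` so that an empty line would not raise the `IndexError` that `line[0]` would.

```python
def parse_line(line):
    """Strip a leading '#' (if any) and split the line on whitespace."""
    # str.startswith is safe on empty strings, unlike indexing with line[0]
    if line.startswith('#'):
        line = line[1:]
    return line.split()

header = '#EventId    Timestamp    Customer   State    ServiceID    Amount'
row = '201       10/13/2017      100       NY       131          100.00'

print(parse_line(header))
print(parse_line(row))
print(parse_line(''))   # an empty line yields [] instead of raising
```

With a helper like this, the two chained `map` calls could be collapsed into a single `services.map(parse_line)`.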

## Using Key Value Pairs for Operations

Let us now begin to use the ByKey methods, such as `reduceByKey`, which take a lambda expression and apply it to the values that share a key. These methods assume that your data is in (key, value) form.


For example let's find out the total sales per state: 

In [24]:
# Let's start by practicing grabbing fields
# state (index 3) and sale amount (index 5)
clean.map(lambda row: (row[3],row[5])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [25]:
pairs = clean.map(lambda row: (row[3],row[5]))

In [26]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [28]:
# Continue with reduceByKey
# The data needs to be (key, value) pairs stored as tuples
# Notice how it assumes that the first item in the tuple is the key!
# The lambda function operates on the values only,
# so amt1 and amt2 are two values that share the same key
pairs.reduceByKey(lambda amt1, amt2: amt1 + amt2).collect()
# Problem: the amounts are strings, so + concatenates them instead of summing

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In [29]:
rekey = pairs.reduceByKey(lambda amt1, amt2: amt1 + amt2)

In [30]:
rekey.collect()
# Problem: the amounts are strings, so + concatenates them instead of summing

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

Uh oh! Looks like we forgot that the amounts are still strings! Let's fix that:

In [31]:
# change amount to floats
rekey = pairs.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))

In [32]:
rekey.collect()
# amounts are now summing

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
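
One subtlety is visible in this output: `('State', 'Amount')` is still a string pair, because the reducing function is never called for a key that has only one value. The sketch below illustrates this with a hypothetical local stand-in, `reduce_by_key`, written in pure Python (it is an assumption for illustration, not Spark's implementation). It also shows the alternative of converting to `float` before reducing.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for reduceByKey: group values by key, then fold them with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # functools.reduce never calls func for a single-element list
    return [(key, reduce(func, values)) for key, values in grouped.items()]

pairs = [('NY', '100.00'), ('TX', '450.00'), ('NY', '750.00')]

# Converting inside the reducing function: TX has one sale here, so it stays a string.
late = reduce_by_key(pairs, lambda a, b: float(a) + float(b))

# Converting first, then reducing with plain addition: every value is a float.
numeric_pairs = [(state, float(amount)) for state, amount in pairs]
early = reduce_by_key(numeric_pairs, lambda a, b: a + b)

print(late)
print(early)
```

In the services data every state happens to have two sales, so the problem does not surface there. Converting in a `map` step before `reduceByKey` would avoid it in general, provided the header row is filtered out first.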

We can continue our analysis by sorting this output:

In [33]:
# revert to using clean RDD
clean.collect()
# Grab state and amounts
# Add them
# Get rid of ('State','Amount')
# Sort them by the amount value

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [45]:
# Grab state and amounts, i.e. the item at index 3 and the last item, in the form of a tuple
step1 = clean.map(lambda lst: (lst[3], lst[-1]))
# Add them using reduceByKey
step2 = step1.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))
# Get rid of ('State','Amount') using filter
step3 = step2.filter(lambda row: not row[0]=='State' )
# Sort them by the amount value using sortBy
step4 = step3.sortBy(lambda state: state[1], ascending=False)

In [46]:
step1.collect()
# tuples of (State, Amount)

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [47]:
step2.collect()
# summed by state

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [48]:
step3.collect()
# removed headers

[('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [49]:
step4.collect()
# sorted desc

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [54]:
# bring it all together
allSteps = clean.map(lambda lst: (lst[3], lst[-1]))\
    .reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))\
    .filter(lambda row: not row[0]=='State' )\
    .sortBy(lambda state: state[1], ascending=False)

In [55]:
allSteps.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [56]:
# bring it all together
clean.map(lambda lst: (lst[3], lst[-1]))\
    .reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))\
    .filter(lambda row: not row[0]=='State' )\
    .sortBy(lambda state: state[1], ascending=False)\
    .collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
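
As a sanity check on the result, the same totals can be reproduced locally. This is a pure-Python sketch, not Spark code. `rows` copies the output of `clean.collect()` shown earlier, and the other names are illustrative assumptions.

```python
from collections import defaultdict

rows = [
    ['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
    ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
    ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
    ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
    ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
    ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
    ['205', '10/19/2017', '202', 'TX', '121', '200.00'],
]

totals = defaultdict(float)
for row in rows[1:]:                  # skip the header row up front
    state, amount = row[3], row[-1]   # same fields as the map step
    totals[state] += float(amount)    # same grouping and sum as reduceByKey

# same ordering as sortBy(..., ascending=False)
ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
print(ranked)
```

A local check like this is only practical because the data is tiny; the point of the RDD version is that the same logic scales to data that does not fit on one machine.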

**Remember to try to use unpacking for readability. For example:**

In [57]:
x = ['ID','State','Amount']

In [58]:
def func1(lst):
    return lst[-1]

In [63]:
func1(x)

'Amount'

In [62]:
# function 2 is more readable than function 1
def func2(lst):
    # tuple unpacking
    (id, st, amt) = lst
    return amt

In [64]:
func2(x)

'Amount'
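
Applied to the services data, unpacking might look like the sketch below. `state_and_amount` is a hypothetical helper, not part of the original notebook. A named function is used because Python 3 lambdas cannot unpack a tuple in their parameter list.

```python
def state_and_amount(row):
    """Return a (state, amount) pair from one parsed row of services.txt."""
    # Tuple unpacking names every column; unused fields get a leading underscore.
    _event_id, _timestamp, _customer, state, _service_id, amount = row
    return (state, float(amount))

row = ['201', '10/13/2017', '100', 'NY', '131', '100.00']
print(state_and_amount(row))
```

A function like this could replace `lambda lst: (lst[3], lst[-1])` in the `map` step. Because it converts the amount to `float`, the header row would need to be filtered out before it is applied.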