
# RDD Transformations and Actions

In this lecture we take a closer look at using Spark with Python, focusing on RDD transformations and actions. Please view the video lecture for a full explanation.
## Important Terms

Let's quickly go over some important terms:

|Term |Definition |
|----|------|
|RDD |Resilient Distributed Dataset |
|Transformation |Spark operation that produces a new RDD (evaluated lazily) |
|Action |Spark operation that triggers the computation and returns a local object |
|Spark Job |Sequence of transformations on data with a final action |

## Creating an RDD

There are two common ways to create an RDD:

|Method   |Result|
|----|------|
|sc.parallelize(array) |Create RDD of elements of array (or list)|
|sc.textFile(path/to/file) |Create RDD of lines from file|

## RDD Transformations

We can use transformations to build up a set of instructions that we want to perform on the RDD. Transformations are lazy: nothing is executed until we call an action.
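
As a loose plain-Python analogy (this is not Spark code), generator expressions behave in a similar way: chaining them only describes the work, and nothing runs until something consumes the result. The `calls` list and the `double` helper are made up for this illustration.

```python
calls = []  # records every time the helper actually runs


def double(x):
    calls.append(x)
    return x * 2


numbers = [1, 2, 3, 4]

# "Transformations": build the recipe, nothing is computed yet
evens = (x for x in numbers if x % 2 == 0)
doubled = (double(x) for x in evens)
calls_before = list(calls)  # snapshot taken before any "action"

# "Action": consuming the generator triggers the work
result = list(doubled)

print(calls_before)  # []
print(result)        # [4, 8]
print(calls)         # [2, 4]
```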

|Transformation Example |Result |
|----|-------|
|filter(lambda x: x % 2 == 0) |Keep only the even elements |
|map(lambda x: x * 2) |Multiply each RDD element by 2 |
|map(lambda x: x.split()) |Split each string into a list of words |
|flatMap(lambda x: x.split()) |Split each string into words and flatten the results into one sequence |
|sample(withReplacement=True, fraction=0.25) |Create a sample of roughly 25% of the elements, with replacement |
|union(rdd) |Combine the elements of rdd with the existing RDD (duplicates are kept) |
|distinct() |Remove duplicates in RDD |
|sortBy(lambda x: x, ascending=False) |Sort elements in descending order |

## RDD Actions

Once your 'recipe' of transformations is ready, you execute it by calling an action. Here are some common actions:

|Action |Result |
|----|----|
|collect() |Convert RDD to in-memory list |
|take(3) |First 3 elements of RDD |
|top(3) |Largest 3 elements of RDD, in descending order |
|takeSample(withReplacement=True, num=3) |Create sample of 3 elements with replacement |
|sum() |Find element sum (assumes numeric elements) |
|mean() |Find element mean (assumes numeric elements) |
|stdev() |Find element standard deviation (assumes numeric elements) |
----
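For a small local list, the numeric actions in the table above correspond roughly to the standard-library calls below. This is a sketch with made-up values, not Spark code. It assumes that `stdev()` is the population standard deviation (dividing by N) and that `sampleStdev()` is the N - 1 variant, which matches the PySpark documentation as far as I know.

```python
import heapq
import statistics

data = [4, 1, 7, 3, 9]  # made-up values standing in for rdd.collect()

first_three = data[:3]                 # like take(3): first elements, in order
top_three = heapq.nlargest(3, data)    # like top(3): largest first
total = sum(data)                      # like sum()
mean = statistics.mean(data)           # like mean()
pop_stdev = statistics.pstdev(data)    # like stdev(): divides by N
sample_stdev = statistics.stdev(data)  # like sampleStdev(): divides by N - 1

print(first_three)  # [4, 1, 7]
print(top_three)    # [9, 7, 4]
print(total)        # 24
```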
## Examples

The best way to see all of this in action is through examples. We'll first review the basics by creating and working with a simple text file, then move on to more realistic data, such as customer and sales data.

### Creating an RDD from a text file


**Creating the text file**

In [1]:
%%writefile example2.txt
first
second line
the third line
then a fourth line

Writing example2.txt


In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [7]:
# Show RDD
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:-2

In [8]:
# Save a reference to this RDD
text_rdd = sc.textFile('example2.txt')

In [9]:
# Map a function (or lambda expression) over each line.
# This only defines the transformation; the results are collected in the next cell.
words = text_rdd.map(lambda line: line.split())

In [10]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [11]:
text_rdd.collect()

['first', 'second line', 'the third line', 'then a fourth line']

## Map vs flatMap

In [12]:
# flatMap splits each line and flattens the results into a single flat list
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']
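
The same difference can be reproduced without Spark. The sketch below uses plain Python lists as a stand-in for the RDD: a list comprehension plays the role of `map`, and `itertools.chain.from_iterable` plays the role of the flattening step in `flatMap`.

```python
from itertools import chain

lines = ['first', 'second line', 'the third line']

# map: exactly one output element per input element (here, one list per line)
mapped = [line.split() for line in lines]

# flatMap: each input may yield several outputs, and they are all flattened
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)       # [['first'], ['second', 'line'], ['the', 'third', 'line']]
print(flat_mapped)  # ['first', 'second', 'line', 'the', 'third', 'line']
print(len(mapped), len(flat_mapped))  # 3 6
```

Note that `map` preserves the number of elements (3 lines in, 3 lists out), while `flatMap` can change it (3 lines in, 6 words out).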


## RDDs and Key Value Pairs

Now that we've created RDDs and applied some basic transformations to them, we can start working with key-value pairs. To do this, let's create some fake data as a new text file.

This data represents services sold to customers of a SaaS business.


In [13]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [14]:
services = sc.textFile('services.txt')

In [15]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [17]:
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [18]:
# Strip the leading '#' from the header line (startswith is also safe on empty lines)
services.map(lambda line: line[1:] if line.startswith('#') else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

We can chain the two map calls above into a single expression:

In [21]:
clean = (services.map(lambda line: line[1:] if line.startswith('#') else line)
                 .map(lambda line: line.split())
                 .collect())
clean

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]
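
At this point every field is still a string. One possible next step, sketched here in plain Python, is to drop the header line entirely and convert each record into a typed tuple. The names `Sale` and `parse_line` are hypothetical, chosen only for this illustration; with an RDD, the same function could be passed to `map` after a `filter` that removes the header.

```python
from typing import NamedTuple


class Sale(NamedTuple):
    event_id: int
    timestamp: str
    customer: int
    state: str
    service_id: int
    amount: float


def parse_line(line):
    """Split one whitespace-separated record and convert the numeric fields."""
    event_id, timestamp, customer, state, service_id, amount = line.split()
    return Sale(int(event_id), timestamp, int(customer), state,
                int(service_id), float(amount))


raw = [
    '#EventId    Timestamp    Customer   State    ServiceID    Amount',
    '201       10/13/2017      100       NY       131          100.00',
    '204       10/18/2017      700       TX       129          450.00',
]

# Drop the header (and any blank lines) before parsing
records = [parse_line(line) for line in raw
           if line.strip() and not line.startswith('#')]

print(records[0].state, records[0].amount)  # NY 100.0
```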


## Using Key Value Pairs for Operations

Let's now use methods whose names end in ByKey, such as reduceByKey. These methods assume that each element of the RDD is a (key, value) tuple.

For example, let's find the total sales per state:


In [22]:
# Same cleaning steps as before, but without collect(), so cleanServ is still an RDD
cleanServ = services.map(lambda line: line[1:] if line.startswith('#') else line).map(lambda line: line.split())

In [23]:
cleanServ.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [27]:
# Let's start by practicing grabbing fields.
# For total sales per state we need the state (4th field) and the amount (last field)
pairs = cleanServ.map(lambda line: (line[3], line[-1]))
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [28]:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
# The amounts are still strings, so convert them to float before adding
reduce_by_key = pairs.reduceByKey(lambda amt1,amt2: float(amt1) + float(amt2))

In [29]:
reduce_by_key.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
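
The header pair survives with its string value because the reducing function is only called when a key has at least two values; a key that appears once is passed through untouched. The plain-Python sketch below imitates that behavior. The `reduce_by_key` helper is hypothetical and ignores partitioning, and the `'FL'` row is made up to show a state with a single sale.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with func (no Spark needed)."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    # reduce() returns a single-element group unchanged, without calling func
    return {key: reduce(func, values) for key, values in groups.items()}


pairs = [('State', 'Amount'), ('NY', '100.00'), ('TX', '450.00'),
         ('NY', '750.00'), ('FL', '300.00')]  # 'FL' is a made-up single sale

totals = reduce_by_key(pairs, lambda a, b: float(a) + float(b))

print(totals['NY'])     # 850.0
print(totals['FL'])     # 300.00  (still a string: func never ran for this key)
print(totals['State'])  # Amount
```

Converting the amounts to `float` in the `map` step, after removing the header, would avoid this mix of strings and floats.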

We can continue the analysis by dropping the header pair and sorting the totals:

In [38]:
# Grab state and amounts
step1 = cleanServ.map(lambda line: (line[3], line[-1]))

# Reduce by key
step2 = step1.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))

# Drop the ('State', 'Amount') header pair
step3 = step2.filter(lambda x: x[0] != 'State')

# Sort Results by Amount
step4 = step3.sortBy(lambda stateAmount: stateAmount[1], ascending=False)

step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
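
One variation worth considering is to drop the header row and convert the amounts to `float` before reducing, so that every value has the same type even for a state with a single sale. The plain-Python sketch below mirrors that ordering on the rows from `services.txt`. It is a local illustration of the logic, not Spark code; the comments name the RDD method each step corresponds to.

```python
from collections import defaultdict

rows = [
    ['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
    ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
    ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
    ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
    ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
    ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
    ['205', '10/19/2017', '202', 'TX', '121', '200.00'],
]

# 1. Drop the header row first (like filter)
data_rows = [row for row in rows if row[3] != 'State']

# 2. Build (state, amount) pairs with numeric amounts (like map)
pairs = [(row[3], float(row[-1])) for row in data_rows]

# 3. Sum the amounts per state (like reduceByKey with addition)
totals = defaultdict(float)
for state, amount in pairs:
    totals[state] += amount

# 4. Sort by total, largest first (like sortBy(..., ascending=False))
ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)

print(ranked)  # [('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
```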

**Remember to use unpacking where it improves readability. For example:**

In [40]:
x = 'Id State Amount'.split()
x

['Id', 'State', 'Amount']

In [41]:
def func1(lst):
    return lst[-1]

In [42]:
def func2(id_st_amt):
    # Unpack the values
    (Id, st, amt) = id_st_amt
    return amt

In [44]:
func1(x)

'Amount'

In [45]:
func2(x)

'Amount'
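
Python 3 lambdas cannot unpack a tuple in their parameter list (that syntax was removed by PEP 3113), so a small named function is a common way to get the same readability in a `map` call. The sketch below uses plain Python lists and a hypothetical helper `state_amount`; the same function could be passed to an RDD's `map`.

```python
rows = [
    ['201', 'NY', '100.00'],
    ['204', 'TX', '450.00'],
]


def state_amount(id_st_amt):
    # Unpack once, then refer to fields by name instead of by index
    _id, state, amount = id_st_amt
    return (state, float(amount))


# Index-based lambda: works, but the reader has to remember the positions
by_index = list(map(lambda row: (row[1], float(row[-1])), rows))

# Named function with unpacking: same result, clearer intent
by_name = list(map(state_amount, rows))

print(by_name)  # [('NY', 100.0), ('TX', 450.0)]
```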