# RDD Transformations and Actions

RDD -> Resilient Distributed Dataset

Transformation -> Spark operation that produces an RDD

Action -> Spark operation that produces a local object

Spark Job -> Sequence of transformations on data with a final action
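
Transformations are lazy: Spark only records them, and nothing runs until an action asks for a result. The sketch below is a plain-Python analogy built on a generator, not Spark code; the names `log`, `double`, and `pipeline` are made up for illustration.

```python
# Plain-Python analogy (not Spark): a generator pipeline does no work
# until something consumes it, much like transformations wait for an action.
log = []  # records when elements are actually processed

def double(x):
    log.append(x)
    return x * 2

data = [1, 2, 3, 4]

# "Transformations": build the pipeline; double() has not been called yet.
pipeline = (double(x) for x in data if x % 2 == 0)
calls_before_action = len(log)

# "Action": materialize the result as a local list.
result = list(pipeline)
calls_after_action = len(log)

print(calls_before_action, result, calls_after_action)
```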

**Two common ways to create an RDD:**
    
sc.parallelize(array) -> Create RDD of elements of array (or list)

sc.textFile('path/to/file') -> Create RDD of lines from file (the path is passed as a string)

**RDD Transformations**

filter(lambda x: x % 2 == 0) -> Keep only the even elements

map(lambda x: x * 2) -> Multiply each RDD element by 2

flatMap(lambda x: x.split()) -> Split each string into words and flatten the result into one sequence

sample(withReplacement=True, fraction=0.25) -> Sample roughly 25% of the elements, with replacement (the sample size is approximate)

union(rdd) -> Return a new RDD with the elements of both RDDs (duplicates are kept)

distinct() -> Remove duplicates in RDD

sortBy(lambda x: x, ascending=False) -> Sort elements in descending order
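
As a rough guide to what these transformations compute, here is a plain-Python sketch on a small list. It runs without Spark and only mirrors the results of `filter`, `map`, `union`, `distinct`, and `sortBy`; on a real RDD the same lambdas would be passed to those methods, and the element order after `distinct()` is not guaranteed.

```python
nums = [1, 2, 3, 4, 4, 6]

# filter: keep only the elements for which the predicate is True
evens = [x for x in nums if x % 2 == 0]

# map: exactly one output element per input element
doubled = [x * 2 for x in nums]

# union: elements of both collections, duplicates kept
combined = nums + [6, 7]

# distinct: duplicates removed (sorted here only to make the result stable)
unique = sorted(set(combined))

# sortBy(lambda x: x, ascending=False): descending order
descending = sorted(nums, key=lambda x: x, reverse=True)

print(evens, doubled, combined, unique, descending, sep="\n")
```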

**RDD Actions**

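collect() -> Return all elements to the driver as a list

take(3) -> Return the first 3 elements

top(3) -> Return the 3 largest elements, in descending order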

takeSample(withReplacement=True, num=3) -> Return a random sample of 3 elements, with replacement

sum() -> Sum of the elements

mean() -> Mean of the elements

stdev() -> Standard deviation of the elements
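
The actions above can be mimicked on a local list to see what each one returns. This is a plain-Python sketch, not Spark code. It assumes `stdev()` means the population standard deviation (PySpark has a separate `sampleStdev()` for the sample version), which is why `statistics.pstdev` is used here.

```python
import heapq
import random
import statistics

nums = [5, 1, 4, 2, 3]

collected = list(nums)                    # collect(): every element, as a local list
first_three = nums[:3]                    # take(3): the first 3 elements
largest_three = heapq.nlargest(3, nums)   # top(3): the 3 largest, descending
sampled = random.choices(nums, k=3)       # takeSample(True, 3): 3 random picks, repeats allowed
total = sum(nums)                         # sum()
average = statistics.mean(nums)           # mean()
spread = statistics.pstdev(nums)          # stdev(): population standard deviation

print(first_three, largest_three, total, average, round(spread, 4))
```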

**Examples**

In [2]:
%%writefile example2.txt
first line
second line
the third line
then a fourth line

Writing example2.txt


In [3]:
from pyspark import SparkContext

In [4]:
sc = SparkContext()

In [5]:
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [6]:
text_rdd = sc.textFile('example2.txt')

In [7]:
words = text_rdd.map(lambda line: line.split())

In [8]:
words

PythonRDD[4] at RDD at PythonRDD.scala:48

In [9]:
words.collect()

[['first', 'line'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [10]:
text_rdd.collect()

['first line', 'second line', 'the third line', 'then a fourth line']

**Map vs FlatMap**

In [11]:
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'line',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']
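
The difference is in the shape of the result: `map` keeps one output per input line (a list of word lists), while `flatMap` concatenates the per-line lists into a single list. A plain-Python sketch of the same two results, using the lines of example2.txt and no Spark:

```python
lines = ["first line", "second line", "the third line", "then a fourth line"]

# map: one result per line, so the output is a list of word lists
mapped = [line.split() for line in lines]

# flatMap: the per-line word lists are concatenated into a single list
flat_mapped = [word for line in lines for word in line.split()]

print(len(mapped), len(flat_mapped))
```

The first count equals the number of lines, the second the total number of words.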

**RDDs and Key Value Pairs**

In [13]:
%%writefile services.txt
#EventId    Timestamp     Customer      State     ServiceID    Amount
201         10/13/2017      100           NY        131         100.00
204         10/18/2017      700           TX        129         450.00
202         10/15/2017      203           CA        121         200.00
206         10/19/2017      202           CA        131         500.00
203         10/17/2017      101           NY        173         750.00
205         10/19/2017      202           TX        121         200.00

Overwriting services.txt


In [14]:
services = sc.textFile('services.txt')

In [15]:
services.take(2)

['#EventId    Timestamp     Customer      State     ServiceID    Amount',
 '201         10/13/2017      100           NY        131         100.00']

In [16]:
services.top(2)

['206         10/19/2017      202           CA        131         500.00',
 '205         10/19/2017      202           TX        121         200.00']
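
`top(2)` compares whole lines as strings here, so the result reflects the character-by-character order of the text, not a numeric EventId or Amount. A plain-Python sketch of that comparison on four of the lines (whitespace shortened for readability):

```python
import heapq

lines = [
    "#EventId Timestamp Customer State ServiceID Amount",
    "201 10/13/2017 100 NY 131 100.00",
    "206 10/19/2017 202 CA 131 500.00",
    "205 10/19/2017 202 TX 121 200.00",
]

# Strings compare character by character, so "206..." > "205..." > "201...".
top_two = heapq.nlargest(2, lines)

# "#" sorts before every digit, so the header line is the smallest.
smallest = min(lines)

# Caveat: text order is not numeric order once the digit counts differ.
text_order = "9" > "10"      # True: "9" beats "1" on the first character
numeric_order = 9 > 10       # False

print(top_two, smallest, text_order, numeric_order, sep="\n")
```

To rank by a numeric field instead, the field would have to be parsed first; PySpark's `top` also accepts a `key` function for that purpose.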

In [17]:
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [18]:
# Remove the leading hash (#) from the EventId column name
# startswith() also copes with empty lines, where line[0] would raise IndexError
services.map(lambda line: line[1:] if line.startswith('#') else line).collect()

['EventId    Timestamp     Customer      State     ServiceID    Amount',
 '201         10/13/2017      100           NY        131         100.00',
 '204         10/18/2017      700           TX        129         450.00',
 '202         10/15/2017      203           CA        121         200.00',
 '206         10/19/2017      202           CA        131         500.00',
 '203         10/17/2017      101           NY        173         750.00',
 '205         10/19/2017      202           TX        121         200.00']

In [19]:
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [20]:
clean = clean.map(lambda line: line.split())

In [21]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [22]:
# Grab the State and Amount columns
clean.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [23]:
pairs = clean.map(lambda lst: (lst[3],lst[-1]))

In [24]:
# IMPORTANT: reduceByKey treats the first element of each tuple as the key
# The amounts are still strings at this point, so + concatenates them
rekey = pairs.reduceByKey(lambda amt1, amt2: amt1 + amt2)

In [25]:
rekey.collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In [26]:
# Convert to float inside the reducer so the amounts are added numerically
rekey = pairs.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))

In [27]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
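
Two things in these outputs are worth spelling out: `+` on strings concatenates, and the reducing function is only called for keys that have at least two values, which is why `('State', 'Amount')` passes through untouched. The sketch below imitates that behaviour in plain Python; `reduce_by_key` is a hypothetical helper written for illustration, not a Spark API.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with func (local stand-in)."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    # reduce() on a single-element group returns that element unchanged,
    # without ever calling func.
    return {key: reduce(func, values) for key, values in groups.items()}

pairs = [("State", "Amount"), ("NY", "100.00"), ("TX", "450.00"),
         ("CA", "200.00"), ("CA", "500.00"), ("NY", "750.00"), ("TX", "200.00")]

# String values: "+" concatenates instead of adding.
as_text = reduce_by_key(pairs, lambda a, b: a + b)

# Converting inside the reducer gives numeric totals; calling float() on an
# already-float partial result is harmless.
as_numbers = reduce_by_key(pairs, lambda a, b: float(a) + float(b))

print(as_text)
print(as_numbers)
```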

In [28]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [29]:
# Grab (State, Amount)
step1 = clean.map(lambda lst: (lst[3], lst[-1]))
# Reduce by key, converting the amounts to float
step2 = step1.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))
# Drop the ('State', 'Amount') header pair
step3 = step2.filter(lambda x: x[0] != 'State')
# Sort results by amount, largest first
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending=False)
# Perform the action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
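
An alternative ordering, sketched below, drops the header and converts Amount to `float` before reducing, so the reducer is plain addition on numbers and a state with a single row still ends up with a numeric total. `is_header` and `to_state_amount` are hypothetical helper names; the block exercises them on local lists so it runs without Spark.

```python
from collections import defaultdict

def is_header(fields):
    """True for the title row (its first field is 'EventId' or '#EventId')."""
    return fields[0].lstrip("#") == "EventId"

def to_state_amount(fields):
    """Turn a split row into a (State, Amount) pair with a numeric amount."""
    return fields[3], float(fields[-1])

rows = [
    ["EventId", "Timestamp", "Customer", "State", "ServiceID", "Amount"],
    ["201", "10/13/2017", "100", "NY", "131", "100.00"],
    ["204", "10/18/2017", "700", "TX", "129", "450.00"],
    ["202", "10/15/2017", "203", "CA", "121", "200.00"],
    ["206", "10/19/2017", "202", "CA", "131", "500.00"],
    ["203", "10/17/2017", "101", "NY", "173", "750.00"],
    ["205", "10/19/2017", "202", "TX", "121", "200.00"],
]

# Filter the header first, then convert, so only numbers reach the sum.
pairs = [to_state_amount(r) for r in rows if not is_header(r)]

# Local stand-in for reduceByKey with plain addition.
totals = defaultdict(float)
for state, amount in pairs:
    totals[state] += amount

ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)
```

With the `clean` RDD from the cells above, the same helpers would slot in as `clean.filter(lambda f: not is_header(f)).map(to_state_amount).reduceByKey(lambda a, b: a + b)`.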

In [30]:
x = ['ID','State','Amount']

In [31]:
def func1(lst):
    return lst[-1]

In [32]:
def func2(id_st_amt):
    # Unpack the three values
    (Id,st,amt) = id_st_amt
    return amt

In [33]:
func1(x)

'Amount'

In [34]:
func2(x)

'Amount'
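
Both functions return the same value for a three-element list, but they differ once the length changes: indexing with `-1` works for any non-empty list, while tuple unpacking requires exactly three elements. A short sketch of that difference, using a six-field row like the ones in services.txt:

```python
def func1(lst):
    return lst[-1]

def func2(id_st_amt):
    # Unpacking needs exactly three values
    (Id, st, amt) = id_st_amt
    return amt

row = ["201", "10/13/2017", "100", "NY", "131", "100.00"]

last = func1(row)          # indexing works for any non-empty list

try:
    func2(row)             # six values cannot be unpacked into three names
    unpack_error = None
except ValueError as err:
    unpack_error = str(err)

print(last, "|", unpack_error)
```

The unpacking style documents the expected shape of each record, at the cost of failing when a record has a different number of fields.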