In [2]:
%%writefile example2.txt
first
second line
the third line
then the forth line

Overwriting example2.txt


In [4]:
from pyspark import SparkContext

In [5]:
# Reuse the already-running SparkContext if there is one. Calling SparkContext()
# again (e.g. when this cell is re-run) fails because only one context may be
# active at a time; getOrCreate() keeps the cell safe to re-execute.
sc = SparkContext.getOrCreate()

In [7]:
# Load example2.txt as an RDD with one string element per line of the file
text_rdd = sc.textFile('example2.txt')

In [21]:
# Transformation: map() turns each line into a list of its words.
# The result is itself an RDD, not a Python list.
rdd_transform = text_rdd.map(lambda line: line.split())

In [18]:
# Action on the RDD: collect() returns every element as a Python list
# (one list of words per line; fine here because the file is tiny)
rdd_action = rdd_transform.collect()

In [19]:
rdd_action

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'the', 'forth', 'line']]

In [20]:
text_rdd.collect()

['first', 'second line', 'the third line', 'then the forth line']

In [25]:
# Another example of a transformation: flatMap() splits each line and flattens
# the per-line word lists into a single RDD of words (compare with map() above)
rdd_flat_trans = text_rdd.flatMap(lambda x:x.split())

In [37]:
# Action: collect the flattened words into a single Python list
rdd_flat_action = rdd_flat_trans.collect()

In [38]:
rdd_flat_action

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'the',
 'forth',
 'line']

# RDDs and Key Value Pairs

In [39]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [40]:
# Load services.txt as an RDD of raw text lines (the '#' header line is included)
services_rdd = sc.textFile('services.txt')

In [44]:
services_rdd.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [58]:
# Split every raw line on whitespace into a list of fields (header row included)
services_transform = services_rdd.map(lambda line: line.split())

In [59]:
services_action = services_transform.take(50)

In [60]:
services_action

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [64]:
# Remove the leading '#' from the header line ('#EventId' -> 'EventId').
# startswith() is used instead of x[0] so an empty line doesn't raise IndexError.
clean = services_rdd.map(lambda x: x[1:] if x.startswith('#') else x)
clean

PythonRDD[29] at RDD at PythonRDD.scala:49

In [69]:
# Strip the header's leading '#' and split each line into whitespace-separated fields.
# Built from services_rdd rather than rebinding `clean` onto itself, so re-running
# this cell doesn't try to split rows that are already lists.
clean = (
    services_rdd
    .map(lambda x: x[1:] if x.startswith('#') else x)
    .map(lambda x: x.split())
)

In [70]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [74]:
# Key-value pairs: key = State (field 3), value = Amount (field 5); both are still strings
pairs = clean.map(lambda row: (row[3], row[5]))

In [75]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [78]:
# Amounts are still strings at this point, so `+` concatenates them instead of
# adding them (e.g. '100.00' + '750.00' -> '100.00750.00' in the output below)
reduced_key = pairs.reduceByKey(lambda am1,am2 : am1+am2)

In [79]:
reduced_key.collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In [80]:
# Convert to float inside the reducer so amounts are summed numerically.
# NOTE: the reducer is never applied to a key that has only one value, so that
# value keeps its original string form (e.g. ('State', 'Amount') in the output
# below) -- the resulting values are not guaranteed to all be floats.
reduced_key = pairs.reduceByKey(lambda am1,am2 : float(am1) + float(am2))

In [85]:
reduced_key.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [87]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [128]:
# Grab (state, amount) pairs from the split rows
step1 = clean.map(lambda x: (x[3], x[5]))
# Drop the header row and convert amounts to float *before* reducing. Doing the
# float() inside the reducer would leave a state that appears only once with a
# str amount, and the numeric sort below would then fail comparing str to float.
step2 = step1.filter(lambda x: x[0] != 'State').mapValues(float)
# Total amount per state
step3 = step2.reduceByKey(lambda x, y: x + y)
# Sort by total amount, largest first
step4 = step3.sortBy(lambda x: x[1], ascending=False)

In [129]:
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [130]:
# Sample 3-field record used to demonstrate indexing vs. tuple unpacking below
x = 'ID State Amount'.split()

In [133]:
def func1(param):
    """Return the third item (index 2) of ``param``.

    Works on any indexable sequence; raises IndexError if it has fewer than three items.
    """
    third_item = param[2]
    return third_item

In [134]:
func1(x)

'Amount'

In [150]:
def func2(Id_st_amt):
    """Return the amount from an (id, state, amount) record.

    The record is unpacked into exactly three values, so anything that does not
    have exactly three elements raises ValueError.
    """
    _record_id, _state, amount = Id_st_amt
    return amount

In [151]:
func2(x)

'Amount'