In [None]:
# Spark and Python with PySpark

# RDD - Resilient Distributed Dataset
# Action - Spark operation that produces a local object
# Transformation - Spark operation that produces a new RDD
# Spark job - a sequence of transformations on data, ending with an action


# Common ways to create an RDD: sc.textFile, sc.parallelize



In [1]:
# Spark and python with pyspark 
from pyspark import SparkContext

In [2]:
# getOrCreate() reuses an already-active SparkContext, so re-running this cell
# does not fail the way a second SparkContext() constructor call would.
sc = SparkContext.getOrCreate()

In [3]:
# Lambda revision: an anonymous two-argument function bound to a name
addlamb = lambda a, b: a + b

In [4]:
# Call the lambda like any other function
addlamb(2,3)

5

In [5]:
%%writefile example.txt
firstline 
secondline 
thirdline 
fourthline
# interactively creating a example file in jupyter notebook 


Overwriting example.txt


In [6]:
# Create an RDD from a text file (one element per line); the following cells
# apply actions and transformations to it
textfile = sc.textFile('example.txt')

In [7]:
# Actions produce a local object, e.g.
# collect, count, first, take, top, sum, mean, stdev, takeSample
# count() returns the number of elements (lines) in the RDD
textfile.count()

5

In [8]:
# Action: first() returns the first element (line) of the RDD
textfile.first()

'firstline '

In [9]:
# Transformations (filter, map, flatMap, ...) return a new RDD.
# Keep only the lines that contain the substring 'second'.
secondfind = textfile.filter(
    lambda line: 'second' in line
)

In [10]:
# Displaying an RDD shows its description rather than its contents;
# use an action such as collect() to see the data
secondfind

PythonRDD[4] at RDD at PythonRDD.scala:48

In [11]:
# Action: collect() returns all elements of the RDD as a local Python list
secondfind.collect()

['secondline ']

In [12]:
# Action: count() returns the number of lines that passed the filter
secondfind.count()

1

In [13]:
# Transformations
# filter, map, flatMap, sample, union, distinct, sortBy



# RDD transformations and actions 

In [14]:
%%writefile example2.txt
first 
second line 
the third line 
then the fourth line 


Overwriting example2.txt


In [15]:
# Create an RDD from a text file (displaying it shows only the RDD description)
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:-2

In [16]:
# Keep a reference to the RDD so later cells can transform it
text_rdd = sc.textFile('example2.txt')

In [17]:
# Transformation (map): split every line on whitespace,
# producing one list of words per line
words = text_rdd.map(
    lambda line: line.split()
)

In [18]:
# Action: collect the per-line word lists to the local session
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'the', 'fourth', 'line']]

In [19]:
# For comparison: the raw lines, including their trailing spaces
text_rdd.collect()

['first ', 'second line ', 'the third line ', 'then the fourth line ']

# map vs flatMap

In [20]:
# map: exactly one output element per input line -> an RDD of word lists
words_map = text_rdd.map(lambda text_line: text_line.split())
words_map.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'the', 'fourth', 'line']]

In [21]:
# flatMap: each line may yield several elements; the per-line lists are
# flattened into a single RDD of words
words_flatmap = text_rdd.flatMap(lambda text_line: text_line.split())
words_flatmap.collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'the',
 'fourth',
 'line']

# RDD key-value pairs

In [22]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [23]:
# Create an RDD with one element per line of services.txt (header line included)
services = sc.textFile('services.txt')

In [24]:
# Action: take(2) returns the first 2 elements - the header line and the first record
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [25]:
# Transformation: split each line on whitespace. Without an action, only the
# resulting RDD's description is displayed.

services.map(lambda line: line.split())


PythonRDD[16] at RDD at PythonRDD.scala:48

In [26]:
# A transformation only builds a new RDD - chain an action (take) to get local data
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [40]:
# Drop the leading '#' from the header line so the column names are clean.
# startswith() is used instead of line[0] because indexing an empty line
# would raise IndexError inside the Spark job.
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [41]:
# Split each cleaned line on whitespace into a list of fields.
# NOTE: this rebinds `clean`; running this cell a second time would try to
# call split() on lists - re-run the previous cell first.
clean = clean.map(lambda row: row.split())


In [42]:
# Action: every record is now a list of fields, with the header row first
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [43]:
# RDDs and key-value pairs
# reduceByKey is similar to a group-by followed by an aggregation;
# it requires an RDD of (key, value) tuples.
# Build (State, Amount) pairs: field 3 is State, the last field is Amount.
pairs = clean.map(lambda fields: (fields[3], fields[-1]))

In [45]:
# The header row becomes a ('State', 'Amount') pair; amounts are still strings
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [47]:
# Group by the State key and sum its amounts; the string amounts are cast to
# float inside the reducer.
# NOTE: the reducer is never called for a key that has a single value, so such
# a key keeps its original string (as the ('State', 'Amount') header pair shows).
redkey = pairs.reduceByKey(
    lambda first_amt, second_amt: float(first_amt) + float(second_amt)
)

In [52]:
# Action: per-state totals (the header pair is still present)
redkey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [53]:
# Inspect the first element: it is the header pair, which must be filtered out.
# first() fetches a single element instead of collecting the whole RDD.
redkey.first()

('State', 'Amount')

In [54]:
# Remove the header pair, then ensure every total is a float: reduceByKey leaves
# a state with a single record holding its original string amount, and sortBy
# cannot compare strings with floats.
statefilt = redkey.filter(lambda kv: kv[0] != 'State').mapValues(float)

In [56]:
# Sort states by total amount, largest first
sortamt = statefilt.sortBy(
    lambda state_total: state_total[1],
    ascending=False,
)

In [57]:
# Action: (state, total) pairs ordered by total, descending
sortamt.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

# Tuple unpacking 

In [58]:
# Sample three-field record used to demonstrate tuple unpacking
x = 'ID State Amount'.split()

In [60]:
def tupunpack(id_st_amt):
    """Unpack a three-item (id, state, amount) sequence and return the amount.

    Raises ValueError when ``id_st_amt`` does not contain exactly three items.
    """
    _event_id, _state, amount = id_st_amt
    return amount


In [61]:
# Returns the third item of x
tupunpack(x)

'Amount'