In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext() # initialize a SparkContext that connects to Spark cluster
# You can have only one running SparkContext at a time here

In [16]:
#first = lambda s: s[0]
#first('dasdaj')

In [None]:
# The following section is a Jupyter command - Writes the following text into example.txt file

In [10]:
%%writefile example.txt
first line
second line
third line
fourth line

Overwriting example.txt


In [3]:
textFile = sc.textFile('example.txt') # Creating a Resilient Distributed Dataset (RDD) object using the textFile method of the SparkContext

In [4]:
type(textFile)

pyspark.rdd.RDD

In [6]:
textFile.count() # count the number of elements in that RDD

4

In [7]:
textFile.first() # Returns the first elemet of the RDD

'first line'

In [13]:
secfind = textFile.filter(lambda line: 'second' in line) # apply a filter on all elemenst
# This is a lazily-executed command, meaning it doesn't actually read all the data until we perform calculations on it
# It performs a Transformation, not an Action yet - so it doesn't display an output

In [9]:
secfind

PythonRDD[5] at RDD at PythonRDD.scala:48

In [15]:
secfind.collect() # Now it actually performs the action on the transformatio (execution of the filter) and returns the results

['second line']

In [16]:
secfind.count()

1

In [17]:
%%writefile example2.txt
first
second line
the third line
then a forth line

Writing example2.txt


In [19]:
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:-2

In [18]:
textfile_rdd = sc.textFile('example2.txt')

In [20]:
words = textfile_rdd.map(lambda line: line.split()) # Create a transformation

In [21]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'forth', 'line']]

In [22]:
textfile_rdd.collect()

['first', 'second line', 'the third line', 'then a forth line']

In [23]:
textfile_rdd.flatMap(lambda line: line.split()).collect() # flatMap is similar to map, but at the end it converts the output to a list

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'forth',
 'line']

In [48]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [49]:
services = sc.textFile('services.txt')

In [50]:
services.take(2) # takes frist 2 elements 

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [51]:
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [52]:
services.map(lambda line: line[1:] if line[0]=='#' else line).collect() # Remove the # from the begenning of the first line

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [53]:
clean = services.map(lambda line: line[1:] if line[0]=='#' else line)

In [54]:
clean = clean.map(lambda line: line.split())

In [55]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [56]:
clean.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [57]:
pairs = clean.map(lambda lst: (lst[3],lst[-1]))

In [58]:
pairs.reduceByKey(lambda amt1, amt2: amt1 + amt2).collect() # assumes the input is a tuple and the first column is the Key
# Then applies the lambda on the values of the same keys together and returns the result
# However here it reads the values as strings, so it needs to be converted to numbers

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In [59]:
pairs.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2)).collect() # gives the total sales by state

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [60]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [61]:
step1 = clean.map(lambda lst: (lst[3],lst[-1]))

In [62]:
step2 = step1.reduceByKey(lambda amt1,amt2: float(amt1)+float(amt2))

In [67]:
step3 = step2.filter(lambda x: not x[0]=='State') # Getting rid of the titles element

In [68]:
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending=False) # sort by the second elemnt (which are the numbers) in descending order

In [69]:
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [70]:
x = ['ID', 'State', 'Amount']

In [71]:
def func1(lst):
    return lst[-1]

In [74]:
func1(x)

'Amount'

In [72]:
def func2(id_st_amt):
    # Tuple unpacking
    (Id, st, amt) = id_st_amt
    return amt

In [73]:
func2(x)

'Amount'