In [1]:
%%writefile example2.txt
first
second line
the third line
then a fourth line

Writing example2.txt


In [2]:
from pyspark import SparkContext

In [9]:
sc = SparkContext()

Calling textFile returns the RDD itself; Spark describes it as MapPartitionsRDD[1].

In [10]:
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

We can keep a reference to the RDD by assigning it to a variable.

In [11]:
text_rdd = sc.textFile('example2.txt')

map is a transformation. Here it applies a lambda function to every line, splitting each line into a list of words.

In [12]:
words = text_rdd.map(lambda line: line.split()) 

words will not compute anything until you call an action on it. In this case we call collect to perform that action.

In [13]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]
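
The lazy behavior of transformations can be imitated in plain Python, without Spark: the built-in map also returns a lazy iterator, and list() plays the role of the action. This is only an analogy, not how Spark is implemented, and the names split_line, calls, and lazy_words are made up for illustration.

```python
# Plain-Python analogy for lazy transformations (no Spark involved).
lines = ['first', 'second line', 'the third line', 'then a fourth line']
calls = []  # records every line the function is actually applied to

def split_line(line):
    calls.append(line)
    return line.split()

lazy_words = map(split_line, lines)  # "transformation": nothing runs yet
calls_before = len(calls)            # no calls have happened so far

result = list(lazy_words)            # "action": the function runs now
calls_after = len(calls)             # one call per line
```

Comparing calls_before and calls_after shows that the work happens only when the result is requested.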

If you call collect on the original text RDD, you simply get a list with one string per line.

In [14]:
text_rdd.collect() # you get each string line

['first', 'second line', 'the third line', 'then a fourth line']

If you want everything collected into a single flat list, use the flatMap transformation. We pass the same lambda expression (split every line), and here we combine the transformation step and the action step in one statement. You can separate them into two statements at no additional memory cost, because the transformation is lazy.

Where map returns one list of words per line, flatMap flattens those lists into a single list containing all the words in the text file.

In [15]:
text_rdd.flatMap(lambda line: line.split()).collect() # every word ends up in one flat list

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']
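
The difference between map and flatMap can be sketched in plain Python (no Spark required). This is an illustration of the semantics only: the names mapped and flat_mapped are hypothetical, and itertools.chain stands in for the flattening that flatMap performs.

```python
from itertools import chain

lines = ['first', 'second line', 'the third line', 'then a fourth line']

# Like map: one output element (a list of words) per input line.
mapped = [line.split() for line in lines]

# Like flatMap: the per-line lists are concatenated into one flat list.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))
```

mapped has four elements (one per line), while flat_mapped has ten (one per word).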

# Key Value pair RDDs
We create some data as a new text file. It looks like a typical tabular data file, with columns separated by whitespace rather than commas.

In [16]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


We will assign our RDD to services. 

In [17]:
services = sc.textFile('services.txt')

This data will need cleaning. We grab the top two lines to inspect it: each line is just a single string.

In [18]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

We split each single-string line into individual fields with map and a lambda function. Here we take the top 3 lines after splitting.

In [20]:
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

This map checks whether a line starts with a '#' character and, if so, removes it. Only the header line is affected.

In [32]:
services.map(lambda line:  line[1:] if line[0]=='#' else line).collect() 

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

We chain both transformations (strip the leading '#', then split), assign the result to cleanServ, and collect it afterward.

In [34]:
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())

In [35]:
cleanServ.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]
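
One caveat: the lambda indexes x[0], which fails on an empty line. A minimal sketch of a more defensive version, assuming a hypothetical helper named clean_line, uses str.startswith instead. It is shown in plain Python so it runs without Spark; a function like this could be passed to map in place of the two lambdas.

```python
def clean_line(line):
    """Drop a single leading '#' if present, then split on whitespace."""
    if line.startswith('#'):
        line = line[1:]
    return line.split()

header = clean_line('#EventId    Timestamp    Customer   State    ServiceID    Amount')
row = clean_line('201       10/13/2017      100       NY       131          100.00')
empty = clean_line('')  # no IndexError: startswith is safe on an empty string
```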

# Methods that combine lambda expressions with a ByKey argument
These ByKey methods assume that your data is in the form of key-value pairs.

Our goal is the total sales per state. As a first step we grab the State and Amount fields from each row and return them as a (key, value) tuple, ready for reduceByKey.

In [37]:

cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

Now that we have these tuples we can use reduceByKey. It takes a lambda expression and assumes that your data is already in tuple form. It treats the first item of each tuple as the key and uses the lambda function to reduce the second items (the values) that share a key.

In [38]:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
# Problem: the amounts are strings, so + concatenates them instead of adding them
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

We ran into an issue: the amounts being combined were strings, so instead of summing them we concatenated them. We fix that by converting each string to a float before adding. This kind of cleanup is a lot of the work you will be doing with Spark.

In [40]:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
         .collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
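
Note that ('State', 'Amount') still holds a string: a key with only one value is never passed through the reducing function. The plain-Python sketch below imitates this behavior on a single machine. reduce_by_key is a hypothetical helper written for illustration, not a Spark API, and unlike Spark it happens to preserve the order in which keys first appear.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with func, pairwise (single-machine sketch)."""
    merged = {}
    for key, value in pairs:
        if key in merged:
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value  # first value is stored as-is; func is not called
    return list(merged.items())

pairs = [('State', 'Amount'), ('NY', '100.00'), ('TX', '450.00'),
         ('CA', '200.00'), ('CA', '500.00'), ('NY', '750.00'), ('TX', '200.00')]

# With string values, + concatenates.
concatenated = reduce_by_key(pairs, lambda a, b: a + b)

# Converting inside the function sums the amounts, but only for keys
# that have at least two values.
summed = reduce_by_key(pairs, lambda a, b: float(a) + float(b))
```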

In [41]:
cleanServ.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

We now work through the data set with the following steps:
1. Map over cleanServ with a lambda that grabs the State (index 3, the fourth field) and the Amount (the last field) and returns them as a tuple.
2. Use reduceByKey to sum the amounts per state, converting them from string to float.
3. Remove the column titles ('State', 'Amount') with the filter method.
4. Sort the results by amount, telling sortBy to use the second item of each tuple.
5. Perform the action (collect).

In [49]:
# Grab state and amounts
# Add them
# Get rid of ('State','Amount')
# Sort them by the amount value
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.filter(lambda x: x[0] != 'State')\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

A lot of the time you will want to replace the indexing we did above with tuple unpacking. To demonstrate, we make a list.

In [50]:
x = ['ID','State','Amount']

We make a function that takes in a list and returns the last item of that list. With plain indexing it is hard to tell what the function returns when you come back to it later.

In [52]:
def func1(lst):
    return lst[-1]

Instead, you will want to use tuple unpacking. Unpack the argument into (Id,st,amt) and return the single value you need by name. This is much easier to understand when you come back to it later.

In [54]:
def func2(id_st_amt):
    #unpack values
    (Id,st,amt) = id_st_amt
    return amt

In [55]:
func1(x)

'Amount'

In [56]:
func2(x)

'Amount'
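
The same idea applies to the sort key used earlier. As a minimal sketch, the hypothetical function amount_of below unpacks a (state, amount) pair instead of indexing it with [1]. It is shown with Python's built-in sorted so it runs without Spark; a function like this could be passed to sortBy in place of the lambda.

```python
def amount_of(state_amount):
    # unpack the (state, amount) pair instead of indexing with [1]
    state, amount = state_amount
    return amount

totals = [('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

# Sort by amount, largest first.
ranked = sorted(totals, key=amount_of, reverse=True)
```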