In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext()

In [4]:
txt_rdd = sc.textFile('example2.txt')

In [5]:
txt_rdd.collect()

['first ', 'second line', 'the third line', 'then a fourth line']

In [6]:
words = txt_rdd.map(lambda x:x.split())

In [7]:
words

PythonRDD[4] at RDD at PythonRDD.scala:53

In [8]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [9]:
txt_rdd.flatMap(lambda line:line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

In [10]:
services = sc.textFile('services.txt')

In [11]:
services.collect()

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [12]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [14]:
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [15]:
services.map(lambda line : line[1:] if line[0]=='#' else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [22]:
clean = services.map(lambda line : line[1:] if line[0]=='#' else line)

In [23]:
clean = clean.map(lambda line : line.split())

In [24]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [26]:
clean.map(lambda line : (line[3],line[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [27]:
pairs = clean.map(lambda line : (line[3],line[-1]))

In [28]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [31]:
rekey = pairs.reduceByKey(lambda amt1, amt2 : float(amt1) + float(amt2))

In [32]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [33]:
rekey_n = rekey.filter(lambda line : not line[0]=='State')

In [37]:
rekey_sort = rekey_n.sortBy(lambda stAmount : stAmount[1], ascending=True)

In [36]:
rekey_sort.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [38]:
X = ['Id', 'State', 'Amount']

In [40]:
[a,b, c] = X

In [41]:
b

'State'