In [27]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [28]:
from pyspark import SparkContext
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('darkgrid')
%matplotlib inline

In [31]:
%%writefile example.txt
first line
second line
third line
fourth line

Overwriting example.txt


In [None]:
sc = SparkContext()

In [32]:
textFile = sc.textFile('example.txt')

In [33]:
textFile.first()

'first line'

In [34]:
secFile = textFile.filter(lambda line: 'second' in line)
secFile

PythonRDD[17] at RDD at PythonRDD.scala:53

In [35]:
secFile.collect()

['second line']

In [36]:
secFile.count()

1

In [37]:
square = lambda x: x**2
square(2)

4

In [38]:
rev = lambda x: x[::-1]
rev('hello')

'olleh'

In [39]:
adder = lambda x, y: x+y
adder(1, 4)

5

In [40]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Overwriting example2.txt


In [41]:
text_rdd = sc.textFile('example2.txt')

In [42]:
text_rdd.map(lambda line: line.split()).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [43]:
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

In [44]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [45]:
services = sc.textFile('services.txt')

In [46]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [47]:
services.map(lambda x: x.split())

PythonRDD[26] at RDD at PythonRDD.scala:53

In [48]:
services.map(lambda x: x.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [50]:
services.map(lambda x: x[1:] if x[0] == '#' else x).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [52]:
cleanserv = services.map(lambda x: x[1:] if x[0] == '#' else x).map(lambda x: x.split())

In [53]:
cleanserv.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [56]:
cleanserv.map(lambda lst: (lst[3], lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [58]:
cleanserv.map(lambda lst: (lst[3], lst[-1])).reduceByKey(lambda amt1, amt2 : float(amt1)+float(amt2)).collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [63]:
cleanserv.map(lambda lst: (lst[3], lst[-1])).reduceByKey(lambda amt1, amt2: float(amt1)+float(amt2)).filter(lambda x: not x[0] == 'State').sortBy(lambda stateAmount: stateAmount[1], ascending=False).collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [64]:
x = ['Id', 'State', 'Amount']

In [65]:
x[-1]

'Amount'

In [66]:
id, st, amt = x
amt

'Amount'