In [1]:
import pyspark
import pyspark.sql.functions as F

In [2]:
sc = pyspark.SparkContext()
sc

In [3]:
sc.pythonVer

'3.6'

# Load data

In [4]:
rdd = sc.parallelize([1,2,3,4,5])
rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [5]:
rdd2 = sc.textFile('auto-mpg.csv')
rdd2.collect()[0:5]

['mpg,cyl,displ,hp,weight,accel,yr,origin,name,color,size,marker',
 '18.0,6,250.0,88,3139,14.5,71,US,ford mustang,red,27.370336111111108,o',
 '9.0,8,304.0,193,4732,18.5,70,US,hi 1200d,green,62.199511111111114,o',
 '36.1,4,91.0,60,1800,16.4,78,Asia,honda civic cvcc,blue,9.0,x',
 '18.5,6,250.0,98,3525,19.0,77,US,ford granada,red,34.515625,o']

## Functions

In [6]:
double = lambda x: x*2

In [7]:
double(2)

4

In [8]:
items = [1,2,3,4]
items

[1, 2, 3, 4]

In [9]:
list(map(double, items))

[2, 4, 6, 8]

In [10]:
list(filter(lambda x: x%2==0, items))

[2, 4]

# 2. RDD

In [11]:
helloRdd = sc.parallelize('Hello world!')
helloRdd

ParallelCollectionRDD[3] at parallelize at PythonRDD.scala:195

In [12]:
type(helloRdd)

pyspark.rdd.RDD

In [13]:
fileRdd = sc.textFile('data/5000_points.txt')
type(fileRdd)

pyspark.rdd.RDD

In [14]:
fileRdd

data/5000_points.txt MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

In [15]:
numRdd = sc.parallelize(range(10), numSlices=6)
numRdd

PythonRDD[7] at RDD at PythonRDD.scala:53

In [16]:
fileRdd = sc.textFile('data/5000_points.txt', minPartitions=6)
fileRdd

data/5000_points.txt MapPartitionsRDD[9] at textFile at NativeMethodAccessorImpl.java:0

# 2.1. RDD operations

In [17]:
rdd = sc.parallelize([1,2,3,4])
rdd_map = rdd.map(lambda x: x*x)
rdd_map.collect()

[1, 4, 9, 16]

In [18]:
rdd_filter = rdd.filter(lambda x: x>2)
rdd_filter.collect()

[3, 4]

In [19]:
rdd = sc.parallelize(['Hello world', 'How are you'])
rdd_map = rdd.map(lambda x: x.split(" "))
rdd_map.collect()

[['Hello', 'world'], ['How', 'are', 'you']]

In [20]:
rdd = sc.parallelize(['Hello world', 'How are you'])
rdd_flatmap = rdd.flatMap(lambda x: x.split(" "))
rdd_flatmap.collect()

['Hello', 'world', 'How', 'are', 'you']

In [21]:
inputRdd = sc.textFile('data/spam.txt')
inputRdd_month = inputRdd.filter(lambda x: 'month' in x.split())
inputRdd_month.collect()

['From next month get upto 50% More Calls 4 Ur standard network charge 2 activate Call 9061100010 C Wire3.net 1st4Terms PoBox84 M26 3UZ Cost Â£1.50 min MobcudB more']

In [22]:
inputRdd_rental = inputRdd.filter(lambda x: 'rental' in x.split())
inputRdd_rental.collect()

['4mths half price Orange line rental & latest camera phones 4 FREE. Had your phone 11mths+? Call MobilesDirect free on 08000938767 to update now! or2stoptxt T&Cs',
 'Free video camera phones with Half Price line rental for 12 mths and 500 cross ntwk mins 100 txts. Call MobileUpd8 08001950382 or Call2OptOut/674',
 '4mths half price Orange line rental & latest camera phones 4 FREE. Had your phone 11mths ? Call MobilesDirect free on 08000938767 to update now! or2stoptxt',
 'Want a new Video Phone? 750 anytime any network mins? Half price line rental free text for 3 months? Reply or call 08000930705 for free delivery',
 'Free video camera phones with Half Price line rental for 12 mths and 500 cross ntwk mins 100 txts. Call MobileUpd8 08001950382 or Call2OptOut/674&',
 '4mths half price Orange line rental & latest camera phones 4 FREE. Had your phone 11mths ? Call MobilesDirect free on 08000938767 to update now! or2stoptxt']

In [23]:
inputRdd_combined = inputRdd_month.union(inputRdd_rental)
inputRdd_combined.collect()

['From next month get upto 50% More Calls 4 Ur standard network charge 2 activate Call 9061100010 C Wire3.net 1st4Terms PoBox84 M26 3UZ Cost Â£1.50 min MobcudB more',
 '4mths half price Orange line rental & latest camera phones 4 FREE. Had your phone 11mths+? Call MobilesDirect free on 08000938767 to update now! or2stoptxt T&Cs',
 'Free video camera phones with Half Price line rental for 12 mths and 500 cross ntwk mins 100 txts. Call MobileUpd8 08001950382 or Call2OptOut/674',
 '4mths half price Orange line rental & latest camera phones 4 FREE. Had your phone 11mths ? Call MobilesDirect free on 08000938767 to update now! or2stoptxt',
 'Want a new Video Phone? 750 anytime any network mins? Half price line rental free text for 3 months? Reply or call 08000930705 for free delivery',
 'Free video camera phones with Half Price line rental for 12 mths and 500 cross ntwk mins 100 txts. Call MobileUpd8 08001950382 or Call2OptOut/674&',
 '4mths half price Orange line rental & latest camera phon

In [24]:
inputRdd_combined.take(1)

['From next month get upto 50% More Calls 4 Ur standard network charge 2 activate Call 9061100010 C Wire3.net 1st4Terms PoBox84 M26 3UZ Cost Â£1.50 min MobcudB more']

In [25]:
inputRdd_combined.first()

'From next month get upto 50% More Calls 4 Ur standard network charge 2 activate Call 9061100010 C Wire3.net 1st4Terms PoBox84 M26 3UZ Cost Â£1.50 min MobcudB more'

In [26]:
inputRdd_combined.count()

7

# 2.2. Pair RDDs

In [27]:
myTuple = sc.parallelize([('sam', 23), ('mary', 34), ('peter', 25)])
myTuple.collect()

[('sam', 23), ('mary', 34), ('peter', 25)]

#### map

In [28]:
myList = sc.parallelize(['sam 23', 'mary 34', 'peter 25'])
pairedRdd = myList.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))
pairedRdd.collect()

[('sam', '23'), ('mary', '34'), ('peter', '25')]

#### reduceByKey

In [29]:
rdd = sc.parallelize([('a', 23), ('b', 2), ('a', 40)])
rddReducedKey = rdd.reduceByKey(lambda x,y: x+y)
rddReducedKey.collect()

[('a', 63), ('b', 2)]

#### sortByKey

In [30]:
rddReducedKey.sortByKey(ascending=False).collect()

[('b', 2), ('a', 63)]

#### groupByKey

In [31]:
airportsRdd = sc.parallelize([('US','JFK'), ('UK','LHR'), ('FR','CDG'), ('US', 'SFO')])
airportsRddGrouped = airportsRdd.groupByKey().collect()
airportsRddGrouped

[('FR', <pyspark.resultiterable.ResultIterable at 0x7f48900f1358>),
 ('UK', <pyspark.resultiterable.ResultIterable at 0x7f48900f1390>),
 ('US', <pyspark.resultiterable.ResultIterable at 0x7f48900f1438>)]

In [32]:
for cont, air in airportsRddGrouped:
    print(cont, list(air))

FR ['CDG']
UK ['LHR']
US ['JFK', 'SFO']


#### join

In [33]:
rdd1 = sc.parallelize([('messi', 34), ('ronaldo', 32), ('neymar', 24)])
rdd2 = sc.parallelize([('ronaldo', 80), ('neymar', 120), ('messi', 100)])
rdd1.join(rdd2).collect()

[('ronaldo', (32, 80)), ('neymar', (24, 120)), ('messi', (34, 100))]

#### reduce

In [34]:
rdd = sc.parallelize([1,3,4,6])
rdd.reduce(lambda x,y: x+y)

14

#### saveAsTextFile

In [35]:
rdd.saveAsTextFile('data/tempFile')

In [36]:
rdd.coalesce(1).saveAsTextFile('data/tempFile2')

#### countByKey

In [37]:
rdd = sc.parallelize([('a',1), ('b',1), ('a',1)])

for key, val in rdd.countByKey().items():
    print(key, val)

a 2
b 1


#### collectAsMap

In [38]:
sc.parallelize([(1,2), (3,4)]).collectAsMap()

{1: 2, 3: 4}

# 3. Dataframe

In [39]:
iphonesRdd = sc.parallelize([
    ('xs', 2018, 5.65, 2.79, 6.24),
    ('xr', 2018, 5.94, 2.98, 6.84),
    ('x10', 2017, 5.65, 2.78, 6.13),
    ('8plus', 2017, 6.23, 3.07, 7.12),
])

In [40]:
names = ['model','year','height','width','weight']

In [41]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [42]:
iphonesDf = spark.createDataFrame(iphonesRdd, schema=names)
type(iphonesDf)

pyspark.sql.dataframe.DataFrame

In [43]:
iphonesDf.toPandas()

Unnamed: 0,model,year,height,width,weight
0,xs,2018,5.65,2.79,6.24
1,xr,2018,5.94,2.98,6.84
2,x10,2017,5.65,2.78,6.13
3,8plus,2017,6.23,3.07,7.12


In [44]:
df = spark.read.csv('data/people.csv', header=True, inferSchema=True)
df.toPandas().head()

Unnamed: 0,_c0,person_id,name,sex,date of birth
0,0,100,Penelope Lewis,female,1990-08-31
1,1,101,David Anthony,male,1971-10-14
2,2,102,Ida Shipp,female,1962-05-24
3,3,103,Joanna Moore,female,2017-03-10
4,4,104,Lisandra Ortiz,female,2020-08-05


In [45]:
df2 = spark.read.text('data/5000_points.txt')
type(df2)

pyspark.sql.dataframe.DataFrame

In [46]:
df2.toPandas().head()

Unnamed: 0,value
0,664159\t550946
1,665845\t557965
2,597173\t575538
3,618600\t551446
4,635690\t608046


# 3.1. Interacting with Pyspark df

In [47]:
df2.show(3)

+-------------+
|        value|
+-------------+
|664159	550946|
|665845	557965|
|597173	575538|
+-------------+
only showing top 3 rows



In [48]:
df.filter(df.sex == 'male').show(3)

+---+---------+-------------+----+-------------+
|_c0|person_id|         name| sex|date of birth|
+---+---------+-------------+----+-------------+
|  1|      101|David Anthony|male|   1971-10-14|
|  5|      105|David Simmons|male|   1999-12-30|
|  6|      106|Edward Hudson|male|   1983-05-09|
+---+---------+-------------+----+-------------+
only showing top 3 rows



In [49]:
df.groupBy('sex').count().orderBy('sex').show()

+------+-----+
|   sex|count|
+------+-----+
|  null| 1920|
|female|49014|
|  male|49066|
+------+-----+



In [50]:
df.count()

100000

In [51]:
df.dropDuplicates().count()

100000

In [52]:
dfT = df.withColumnRenamed('date of birth', 'date_of_birth')
dfT.toPandas().head()

Unnamed: 0,_c0,person_id,name,sex,date_of_birth
0,0,100,Penelope Lewis,female,1990-08-31
1,1,101,David Anthony,male,1971-10-14
2,2,102,Ida Shipp,female,1962-05-24
3,3,103,Joanna Moore,female,2017-03-10
4,4,104,Lisandra Ortiz,female,2020-08-05


In [53]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- date of birth: string (nullable = true)



In [54]:
df.columns

['_c0', 'person_id', 'name', 'sex', 'date of birth']

In [55]:
df.describe().show()

+-------+-----------------+-----------------+-------------+------+-------------+
|summary|              _c0|        person_id|         name|   sex|date of birth|
+-------+-----------------+-----------------+-------------+------+-------------+
|  count|           100000|           100000|       100000| 98080|       100000|
|   mean|          49999.5|          50099.5|         null|  null|         null|
| stddev|28867.65779668774|28867.65779668774|         null|  null|         null|
|    min|                0|              100|Aaron Addesso|female|   1899-08-28|
|    max|            99999|           100099|  Zulma Biggs|  male|   2084-11-17|
+-------+-----------------+-----------------+-------------+------+-------------+



# 3.2. PySpark SQL

In [56]:
from pyspark import sql
df.createOrReplaceTempView('table1')

In [57]:
spark.sql("select name from table1 limit 5").collect()

[Row(name='Penelope Lewis'),
 Row(name='David Anthony'),
 Row(name='Ida Shipp'),
 Row(name='Joanna Moore'),
 Row(name='Lisandra Ortiz')]

In [58]:
query = '''select person_id from table1 limit 5'''
spark.sql(query).show()

+---------+
|person_id|
+---------+
|      100|
|      101|
|      102|
|      103|
|      104|
+---------+



In [59]:
query = '''select sex, count(*) from table1 group by sex'''
spark.sql(query).show()

+------+--------+
|   sex|count(1)|
+------+--------+
|  null|    1920|
|female|   49014|
|  male|   49066|
+------+--------+



# 3.3. Data visualization

In [60]:
df.limit(10).toPandas().hist('_c0')

array([[<matplotlib.axes._subplots.AxesSubplot object at 0x7f4870cf6d68>]],
      dtype=object)

# 4. PySpark MLlib

In [61]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.clustering import KMeans

# 4.1. Collaborative filtering

In [62]:
from pyspark.mllib.recommendation import Rating

In [63]:
r = Rating(user=1, product=2, rating=5.0)
r

Rating(user=1, product=2, rating=5.0)

#### randomSplit()

In [64]:
data = sc.parallelize(list(range(1,11)))
training, testing = data.randomSplit([0.6, 0.4])
training.collect()

[2, 4, 5, 9]

In [65]:
testing.collect()

[1, 3, 6, 7, 8, 10]

#### Ranking & ALS

In [66]:
r1 = Rating(1, 1, 1.0)
r2 = Rating(1, 2, 2.0)
r3 = Rating(2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect()

[Rating(user=1, product=1, rating=1.0),
 Rating(user=1, product=2, rating=2.0),
 Rating(user=2, product=1, rating=2.0)]

In [67]:
model = ALS.train(ratings, rank=10, iterations=10)

In [68]:
unratedRdd = sc.parallelize([(1,2), (1,1)])
predictions = model.predictAll(unratedRdd)
predictions.collect()

[Rating(user=1, product=1, rating=1.0000152952292287),
 Rating(user=1, product=2, rating=1.9890391497393216)]

#### MSE

In [69]:
rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()

[((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 2.0)]

In [70]:
preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
preds.collect()

[((1, 1), 1.0000152952292287), ((1, 2), 1.9890391497393216)]

In [71]:
rates_preds = rates.join(preds)
rates_preds.collect()

[((1, 2), (2.0, 1.9890391497393216)), ((1, 1), (1.0, 1.0000152952292287))]

In [72]:
MSE = rates_preds.map(lambda r: (r[1][0]-r[1][1])**2).mean()
MSE

6.007023619052534e-05

# 4.2. Classification

In [73]:
from pyspark.mllib.linalg import Vectors

In [74]:
denseVec = Vectors.dense([1,2,3])
denseVec

DenseVector([1.0, 2.0, 3.0])

In [75]:
sparseVec = Vectors.sparse(4, {1:1, 3:5.5})
sparseVec

SparseVector(4, {1: 1.0, 3: 5.5})

In [76]:
from pyspark.mllib.regression import LabeledPoint

In [77]:
positive = LabeledPoint(1, [1,0,3])
positive

LabeledPoint(1.0, [1.0,0.0,3.0])

In [78]:
negative = LabeledPoint(0, [2,1,1])
negative

LabeledPoint(0.0, [2.0,1.0,1.0])

In [79]:
from pyspark.mllib.feature import HashingTF

In [80]:
sentence = 'hello hello world'
words = sentence.split()
tf = HashingTF(10000)
tf.transform(words)

SparseVector(10000, {1658: 2.0, 9282: 1.0})

In [81]:
data = [
    LabeledPoint(0, (0,1)),
    LabeledPoint(1, (1,0)),
]

rdd = sc.parallelize(data)
model = LogisticRegressionWithLBFGS.train(rdd)

In [82]:
model.predict([1,0])

1

In [83]:
model.predict([0,1])

0

# 4.3. Clustering

In [84]:
rdd = sc.textFile('data/ratings.csv')\
    .map(lambda x: x.split(','))\
    .map(lambda x: [float(x[0]), float(x[1])])

In [85]:
rdd.take(5)

[[1.0, 31.0], [1.0, 1029.0], [1.0, 1061.0], [1.0, 1129.0], [1.0, 1172.0]]

In [86]:
from pyspark.mllib.clustering import KMeans

In [87]:
model = KMeans.train(rdd, k=2, maxIterations=10)
model.clusterCenters

[array([ 346.72793537, 3360.20190866]),
 array([  348.97470863, 76212.25925632])]

In [88]:
from math import sqrt

In [89]:
def error(point):
    center = model.centers[model.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

In [90]:
WSSE = rdd.map(lambda point: error(point)).reduce(lambda x,y: x+y)
print('Within set sum of square error = ' + str(WSSE))

Within set sum of square error = 518638526.0983816
