In [5]:
import findspark

# findspark.init() has to run before the first pyspark import: it is the call
# that makes the local Spark installation's Python packages importable.
findspark.init()

from pyspark.sql import SparkSession


# Build (or reuse) a local-mode session with Hive support for the cells below.
spark = (SparkSession
         .builder
         .appName("pyspark_tutorial")
         .master("local")
         # Optional settings, left disabled for this tutorial:
         # .config("hive.merge.mapfiles", "false")
         # .config("hive.merge.tezfiles", "false")
         # .config("parquet.enable.summary-metadata", "false")
         # .config("spark.sql.parquet.mergeSchema","false")
         # .config("hive.merge.smallfiles.avgsize", "160000000")
         # .config("hive.exec.dynamic.partition", "true")
         # .config("hive.exec.dynamic.partition.mode", "nonstrict")
         # .config("spark.sql.orc.impl", "native")
         # .config("spark.sql.parquet.binaryAsString","true")
         # .config("spark.sql.parquet.writeLegacyFormat","true")
         # .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/dev01_landing_initial_area.db")
         .enableHiveSupport()
         .getOrCreate()
         )


In [6]:
# Show the session object's notebook representation.
spark


In [8]:
# SparkContext taken from the session; the RDD cells below call
# sc.parallelize and sc.textFile on it.
sc = spark.sparkContext


## Add-indices

In [9]:
# (group, number) pairs used by the sorting and indexing cells below.
a = [
    ('g1', 2),
    ('g2', 4),
    ('g3', 3),
    ('g4', 8),
]
rdd = sc.parallelize(a)
rdd.collect()


[('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)]

In [10]:
# Sort the pairs by their string key. The result is not bound to the name
# `sorted`, because that would shadow Python's built-in sorted().
sortedByKey = rdd.sortByKey()
sortedByKey.collect()


[('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)]

In [23]:
# Swap every (key, value) pair into (value, key) so the number becomes the key.
# Python 3 lambdas cannot unpack a tuple in the parameter list, so the pair is
# indexed instead.
rdd2 = rdd.map(lambda pair: (pair[1], pair[0]))

rdd2.collect()


[(2, 'g1'), (4, 'g2'), (3, 'g3'), (8, 'g4')]

In [24]:
# Ascending sort on the numeric key. A dedicated name is used so the built-in
# sorted() is not shadowed.
sortedAscending = rdd2.sortByKey()
sortedAscending.collect()

[(2, 'g1'), (3, 'g3'), (4, 'g2'), (8, 'g4')]

In [25]:
# decresing 
sorted = rdd2.sortByKey(False)
sorted.collect()

[(8, 'g4'), (4, 'g2'), (3, 'g3'), (2, 'g1')]

In [26]:
indices = sorted.zipWithIndex()
indices.collect()

[((8, 'g4'), 0), ((4, 'g2'), 1), ((3, 'g3'), 2), ((2, 'g1'), 3)]

In [27]:
# Integer sample used by the averaging and filtering cells below.
nums = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 20])
nums.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 20]

## Average

In [28]:
# Pair every number with a count of 1, then fold the pairs component-wise
# into a single (sum, count) tuple, starting from (0, 0).
sumAndCount = (
    nums
    .map(lambda n: (n, 1))
    .fold((0, 0), lambda acc, item: (acc[0] + item[0], acc[1] + item[1]))
)

sumAndCount

(56, 9)

In [29]:
# Mean = sum / count; converting the numerator to float makes the division
# a float division.
avg = float(sumAndCount[0]) / sumAndCount[1]
avg

6.222222222222222

## Filter

In [30]:
# Keep the odd numbers, i.e. those with a non-zero remainder modulo 2.
filtered1 = nums.filter(lambda n: n % 2 != 0)
filtered1.collect()

[1, 3, 5, 7]

In [31]:
# Keep the even numbers, i.e. those with a zero remainder modulo 2.
filtered2 = nums.filter(lambda n: not n % 2)
filtered2.collect()

[2, 4, 6, 8, 20]

##  Join

In [36]:
%%bash
# Write relation R with an explicit here-document rather than a bare `cat`
# that only works when the rest of the cell happens to arrive on stdin.
# The blank last record is kept on purpose: the cells below show the empty
# row it produces and how to filter it out.
cat > R.txt <<'EOF'
k1,v1
k1,v2
k2,v3
k2,v4
k3,v7
k3,v8
k3,v9

EOF



In [37]:
%%bash

# Write relation S with an explicit here-document rather than a bare `cat`
# that only works when the rest of the cell happens to arrive on stdin.
cat > S.txt <<'EOF'
k1,v11
k1,v22
k1,v33
k2,v55
k4,v77
k5,v88
EOF

In [39]:
# Load R.txt as an RDD of raw text lines (note the trailing empty line).
R = sc.textFile("R.txt")
R.collect()

['k1,v1', 'k1,v2', 'k2,v3', 'k2,v4', 'k3,v7', 'k3,v8', 'k3,v9', '']

In [40]:

# Load S.txt as an RDD of raw text lines.
S = sc.textFile("S.txt")
S.collect()

['k1,v11', 'k1,v22', 'k1,v33', 'k2,v55', 'k4,v77', 'k5,v88']

In [41]:
# Break every line of R on the comma into a list of fields.
r1 = R.map(lambda line: line.split(","))
r1.collect()

[['k1', 'v1'],
 ['k1', 'v2'],
 ['k2', 'v3'],
 ['k2', 'v4'],
 ['k3', 'v7'],
 ['k3', 'v8'],
 ['k3', 'v9'],
 ['']]

In [48]:
# The blank line in R.txt splits to [''], a one-element list, so indexing
# field [1] on it when building (key, value) tuples would raise IndexError.
# Keep only rows that have at least a key and a value; this also drops any
# other line without a comma, not just the exactly-empty one.
filtered_rdd = r1.filter(lambda fields: len(fields) >= 2)

In [49]:
# Convert each [key, value] list into a (key, value) tuple, the shape that
# the pair-RDD join below operates on.
r2 = filtered_rdd.map(lambda fields: (fields[0], fields[1]))
r2.collect()

[('k1', 'v1'),
 ('k1', 'v2'),
 ('k2', 'v3'),
 ('k2', 'v4'),
 ('k3', 'v7'),
 ('k3', 'v8'),
 ('k3', 'v9')]

In [50]:
# Break every line of S on the comma into a list of fields.
s1 = S.map(lambda line: line.split(","))
s1.collect()

[['k1', 'v11'],
 ['k1', 'v22'],
 ['k1', 'v33'],
 ['k2', 'v55'],
 ['k4', 'v77'],
 ['k5', 'v88']]

In [51]:
# Convert each [key, value] list into a (key, value) tuple.
s2 = s1.map(lambda fields: (fields[0], fields[1]))
s2.collect()

[('k1', 'v11'),
 ('k1', 'v22'),
 ('k1', 'v33'),
 ('k2', 'v55'),
 ('k4', 'v77'),
 ('k5', 'v88')]

In [52]:
# Join on key: only keys present in both RDDs (k1 and k2 here) appear, with
# one output row per combination of matching values.
RjoinedS = r2.join(s2)
RjoinedS.collect()

[('k2', ('v3', 'v55')),
 ('k2', ('v4', 'v55')),
 ('k1', ('v1', 'v11')),
 ('k1', ('v1', 'v22')),
 ('k1', ('v1', 'v33')),
 ('k1', ('v2', 'v11')),
 ('k1', ('v2', 'v22')),
 ('k1', ('v2', 'v33'))]

##  Map

In [53]:
# Rebind nums to a smaller sample for the map examples.
nums = sc.parallelize([1, 2, 3, 4, 5])
nums.collect()

[1, 2, 3, 4, 5]

In [54]:
# Shift every element up by two.
bytwo = nums.map(lambda n: n + 2)
bytwo.collect()

[3, 4, 5, 6, 7]

In [55]:
# Square every element.
squared = nums.map(lambda n: n ** 2)
squared.collect()

[1, 4, 9, 16, 25]

##  Multiply 

In [57]:
numbers = sc.parallelize([1, 2, 3, 4])
# Product of all elements; 1 is the neutral element for multiplication.
mult = numbers.fold(1, lambda acc, n: acc * n)
mult

24

##  Sort

In [59]:
%%bash

# Overwrite data.txt (the original appended with >>), so re-running this cell
# no longer duplicates the lines and inflates the word counts computed below.
# The here-document makes the file content explicit.
cat > data.txt <<'EOF'
crazy crazy fox jumped
crazy fox jumped
fox is fast
fox is smart
dog is smart
EOF

In [60]:
# Read data.txt line by line, asking for a minimum of one partition.
lines = sc.textFile('data.txt', minPartitions=1)
lines.collect()

['crazy crazy fox jumped',
 'crazy fox jumped',
 'fox is fast',
 'fox is smart',
 'dog is smart']

In [61]:
# Word count: split lines into words, emit (word, 1), then add up the ones
# per word.
frequencies = (
    lines
    .flatMap(lambda line: line.split(' '))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
)
frequencies.collect()

[('crazy', 3),
 ('fox', 4),
 ('jumped', 2),
 ('is', 3),
 ('fast', 1),
 ('smart', 2),
 ('dog', 1)]

In [62]:
# Number of (word, count) records, i.e. the number of distinct words.
frequencies.count()

7

In [63]:
# Alphabetical order by word. The result is not named `sorted`, because that
# would shadow the built-in sorted().
sortedByWord = frequencies.sortByKey()
sortedByWord.collect()

[('crazy', 3),
 ('dog', 1),
 ('fast', 1),
 ('fox', 4),
 ('is', 3),
 ('jumped', 2),
 ('smart', 2)]

In [64]:
# Reverse alphabetical order by word.
sortedDescending = frequencies.sortByKey(ascending=False)
sortedDescending.collect()

[('smart', 2),
 ('jumped', 2),
 ('is', 3),
 ('fox', 4),
 ('fast', 1),
 ('dog', 1),
 ('crazy', 3)]

##  Sum


In [65]:
numbers = sc.parallelize([1, 2, 3, 4])
# Sum of all elements; 0 is the neutral element for addition. The result is
# named `total` rather than `sum` so the built-in sum() stays usable.
total = numbers.fold(0, (lambda x, y: x + y))

total


10

## Union 

In [66]:
# First list of (key, number) pairs for the union example.
d1 = [
    ('k1', 1),
    ('k2', 2),
    ('k3', 5),
]
d1

[('k1', 1), ('k2', 2), ('k3', 5)]

In [67]:
# Second list of (key, number) pairs; it shares k1 and k2 with d1.
d2 = [
    ('k1', 3),
    ('k2', 4),
    ('k4', 8),
]
d2

[('k1', 3), ('k2', 4), ('k4', 8)]

In [69]:
# Distribute d1 as an RDD.
rdd1 = sc.parallelize(d1)
rdd1.collect()


[('k1', 1), ('k2', 2), ('k3', 5)]

In [70]:

# Distribute d2 as an RDD (this rebinds rdd2 from the sorting section).
rdd2 = sc.parallelize(d2)
rdd2.collect()

[('k1', 3), ('k2', 4), ('k4', 8)]

In [72]:
# Concatenate both RDDs; duplicate keys are kept as separate records.
rdd3 = rdd1.union(rdd2)
rdd3.collect()

[('k1', 1), ('k2', 2), ('k3', 5), ('k1', 3), ('k2', 4), ('k4', 8)]

In [73]:
# Collapse the union to one record per key by adding up its values.
rdd4 = rdd3.reduceByKey(lambda left, right: left + right)
rdd4.collect()

[('k2', 6), ('k4', 8), ('k1', 4), ('k3', 5)]

## Bigrams

In [76]:
# Re-read data.txt as an RDD of lines for the bigram example.
lines = sc.textFile("data.txt")
lines.collect()

['crazy crazy fox jumped',
 'crazy fox jumped',
 'fox is fast',
 'fox is smart',
 'dog is smart']

In [77]:
bigrams = lines.map(lambda s : s.split(" ")).flatMap(lambda s: [((s[i],s[i+1]),1) for i in range (0, len(s)-1)])

In [78]:
 bigrams.collect()

[(('crazy', 'crazy'), 1),
 (('crazy', 'fox'), 1),
 (('fox', 'jumped'), 1),
 (('crazy', 'fox'), 1),
 (('fox', 'jumped'), 1),
 (('fox', 'is'), 1),
 (('is', 'fast'), 1),
 (('fox', 'is'), 1),
 (('is', 'smart'), 1),
 (('dog', 'is'), 1),
 (('is', 'smart'), 1)]

In [79]:
counts = bigrams.reduceByKey(lambda x, y : x+y)
counts.collect()

[(('crazy', 'crazy'), 1),
 (('crazy', 'fox'), 2),
 (('fox', 'jumped'), 2),
 (('fox', 'is'), 2),
 (('is', 'fast'), 1),
 (('is', 'smart'), 2),
 (('dog', 'is'), 1)]

## Cartesian

In [80]:
# Left-hand pairs for the Cartesian product.
a = [
    ('k1', 'v1'),
    ('k2', 'v2'),
]
a

[('k1', 'v1'), ('k2', 'v2')]

In [81]:
# Right-hand pairs for the Cartesian product.
b = [
    ('k3', 'v3'),
    ('k4', 'v4'),
    ('k5', 'v5'),
]
b

[('k3', 'v3'), ('k4', 'v4'), ('k5', 'v5')]

In [82]:
# Distribute the left-hand pairs.
rdd1 = sc.parallelize(a)
rdd1.collect()

[('k1', 'v1'), ('k2', 'v2')]

In [83]:
# Distribute the right-hand pairs.
rdd2 = sc.parallelize(b)
rdd2.collect()

[('k3', 'v3'), ('k4', 'v4'), ('k5', 'v5')]

In [84]:
# Every element of rdd1 paired with every element of rdd2 (2 x 3 = 6 rows).
rdd3 = rdd1.cartesian(rdd2)
rdd3.collect()

[(('k1', 'v1'), ('k3', 'v3')),
 (('k1', 'v1'), ('k4', 'v4')),
 (('k1', 'v1'), ('k5', 'v5')),
 (('k2', 'v2'), ('k3', 'v3')),
 (('k2', 'v2'), ('k4', 'v4')),
 (('k2', 'v2'), ('k5', 'v5'))]

## Combine-by-key

In [85]:
# (key, number) pairs to average per key. The list is named `keyedValues`
# rather than `input` so the built-in input() is not shadowed.
keyedValues = [("k1", 1), ("k1", 2), ("k1", 3), ("k1", 4), ("k1", 5),
               ("k2", 6), ("k2", 7), ("k2", 8),
               ("k3", 10), ("k3", 12)]
rdd = sc.parallelize(keyedValues)

In [86]:
# Build a (sum, count) accumulator per key:
#   createCombiner  - first value seen for a key becomes (value, 1)
#   mergeValue      - add a further value to an existing accumulator
#   mergeCombiners  - add two accumulators for the same key together
sumCount = rdd.combineByKey(
    createCombiner=lambda value: (value, 1),
    mergeValue=lambda acc, value: (acc[0] + value, acc[1] + 1),
    mergeCombiners=lambda left, right: (left[0] + right[0], left[1] + right[1]),
)
sumCount.collect()

[('k1', (15, 5)), ('k2', (21, 3)), ('k3', (22, 2))]

In [87]:
# Per-key mean: divide each key's sum by its count.
avg = sumCount.mapValues(lambda sum_count: sum_count[0] / sum_count[1])
avg.collect()

[('k1', 3.0), ('k2', 7.0), ('k3', 11.0)]