In [1]:
import pyspark

In [2]:
from pyspark import SparkContext
sc=SparkContext()

In [3]:
import IPython
print('pyspark version'+str(sc.version))
print('Ipython version'+str(IPython.__version__))

pyspark version2.4.5
Ipython version7.6.1


## map

In [6]:
#map
x=sc.parallelize([1,2,3])
y=x.map(lambda x:(x,x*x))
print(x.collect())
y.collect()
        

[1, 2, 3]


[(1, 1), (2, 4), (3, 9)]

## flatMap

In [9]:
#flatMap
x=sc.parallelize([1,2,3])
y=x.flatMap(lambda x:(x,x*x,x*x*x)) #maps all the values into single 
print(x.collect())
y.collect()

[1, 2, 3]


[1, 1, 1, 2, 4, 8, 3, 9, 27]

## mapPartitions

In [23]:
#mapPartitions
def f(iterator):
    yield sum(iterator)
    
x=sc.parallelize([1,2,3,4,5,6],2)
y=x.mapPartitions(f)
print(x.getNumPartitions())
print(x.collect())
print(x.glom().collect()) #glom flattens the values in each partition
print(y.glom().collect())

2
[1, 2, 3, 4, 5, 6]
[[1, 2, 3], [4, 5, 6]]
[[6], [15]]


## mapPartitionsWithIndex

In [25]:
#mapPartitionWithIndex
def f(partitionIndex,iterator):
    yield (partitionIndex,sum(iterator))
    
x=sc.parallelize([1,2,3,4,5,6],2)
y=x.mapPartitionsWithIndex(f)
print(x.collect())
print(x.glom().collect())
print(y.glom().collect())

[1, 2, 3, 4, 5, 6]
[[1, 2, 3], [4, 5, 6]]
[[(0, 6)], [(1, 15)]]


## getNumPartitions

In [29]:
#getNumPartitions
x=sc.parallelize([1,2,3,4,5,6],3)
print(x.getNumPartitions()) #object on driver
print(x.glom().collect()) #rdd with partitions

3
[[1, 2], [3, 4], [5, 6]]


## filter

In [40]:
#filter
x=sc.parallelize([1,2,3,4,5,6],2)
y=x.filter(lambda x:x%2==0) #filters the rdd values(even elements) which satisfies the condition
print(x.collect())
print(x.glom().collect())
print(y.collect())
print(y.glom().collect())

[1, 2, 3, 4, 5, 6]
[[1, 2, 3], [4, 5, 6]]
[2, 4, 6]
[[2], [4, 6]]


## distinct

In [41]:
#distinct
x=sc.parallelize([1,2,3,4,5,2,6,3],2)
y=x.distinct() #distinct values of rdd 
print(x.collect())
print(x.glom().collect())
print(y.collect())
print(y.glom().collect())


[1, 2, 3, 4, 5, 2, 6, 3]
[[1, 2, 3, 4], [5, 2, 6, 3]]
[2, 4, 6, 1, 3, 5]
[[2, 4, 6], [1, 3, 5]]


## sample

In [48]:
#sample on partitions
x=sc.parallelize([1,2,3,4,5,2,6,3],2)
y=x.sample(True,0.5) #withReplacement=True, fraction=0.5
print(x.collect())
print(x.glom().collect())
print(y.collect())
print(y.glom().collect())



[1, 2, 3, 4, 5, 2, 6, 3]
[[1, 2, 3, 4], [5, 2, 6, 3]]
[1, 4, 4, 5, 2]
[[1, 4, 4], [5, 2]]


## takeSample

In [65]:
#takeSample
x=sc.parallelize([1,2,3,4,5,2,6,3])
ylist=[x.takeSample(True,4) for i in range(5)] #call takeSample 5 times
print('x: '+str(x.collect()))
for cnt,lst in zip(range(len(ylist)),ylist):
    print('sample:'+str(cnt)+' y: '+str(lst))

x: [1, 2, 3, 4, 5, 2, 6, 3]
sample:0 y: [3, 4, 1, 2]
sample:1 y: [4, 4, 6, 2]
sample:2 y: [5, 2, 5, 2]
sample:3 y: [3, 2, 2, 2]
sample:4 y: [1, 4, 4, 2]


## union

In [66]:
#union
x=sc.parallelize([1,2,3,4])
y=sc.parallelize([1,5,2,6,3])
z=x.union(y)
print(x.collect())
print(y.collect())
print(z.collect())

[1, 2, 3, 4]
[1, 5, 2, 6, 3]
[1, 2, 3, 4, 1, 5, 2, 6, 3]


## intersection

In [67]:
#intersection
x=sc.parallelize([1,2,3,4])
y=sc.parallelize([1,5,2,6,3])
z=x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())

[1, 2, 3, 4]
[1, 5, 2, 6, 3]
[1, 2, 3]


## sortByKey

In [69]:
#sortByKey
x=sc.parallelize([(2,22),(1,11),(4,44),(8,88),(3,33)])
y=x.sortByKey(ascending=False)
print(x.collect())
print(y.collect())


[(2, 22), (1, 11), (4, 44), (8, 88), (3, 33)]
[(8, 88), (4, 44), (3, 33), (2, 22), (1, 11)]


## sortBy

In [74]:
#sortBy
def keyGen(val):
    return val[1] #sort using second letter in the words of rdd

x=sc.parallelize(['apple','dog','cat'])
y=x.sortBy(keyGen)
y.collect()


['cat', 'dog', 'apple']

## glom

In [77]:
#glom
x=sc.parallelize([1,2,3,4,8,5],2)
y=x.glom() #x is transformed into partitions
print(x.collect())
print(y.collect())
print(y)


[1, 2, 3, 4, 8, 5]
[[1, 2, 3], [4, 8, 5]]
PythonRDD[260] at collect at <ipython-input-77-a1952f3b2717>:5


## cartesian

In [79]:
#cartesian
x=sc.parallelize([1,2,3,4])
y=sc.parallelize([4,5,2,9])
z=x.cartesian(y)
print(x.collect())
print(y.collect())
z.collect()



[1, 2, 3, 4]
[4, 5, 2, 9]


[(1, 4),
 (1, 5),
 (1, 2),
 (1, 9),
 (2, 4),
 (2, 5),
 (2, 2),
 (2, 9),
 (3, 4),
 (3, 5),
 (3, 2),
 (3, 9),
 (4, 4),
 (4, 5),
 (4, 2),
 (4, 9)]

## groupBy

In [88]:
#groupBy
x=sc.parallelize([1,2,3,4])
y=x.groupBy(lambda x:'A' if (x%2==0) else 'B')
print(x.collect())
print([(j[0],[i for i in j[1]]) for j in y.collect()])

[1, 2, 3, 4]
[('B', [1, 3]), ('A', [2, 4])]


## pipe

In [92]:
#pipe
x=sc.parallelize(['A','aB','C','DA'])
y=x.pipe('grep -i "a"')
print(x.collect())
print(y.collect())

['A', 'aB', 'C', 'DA']
['A', 'aB', 'DA']


## collect

In [93]:
#collect
x=sc.parallelize([1,2,3,4])
y=x.collect()
print(x)
print(y)

ParallelCollectionRDD[323] at parallelize at PythonRDD.scala:195
[1, 2, 3, 4]


## reduce

In [96]:
#reduce
x=sc.parallelize([1,2,3,4])
y=x.reduce(lambda obj,accumulated:accumulated+obj) #computes cumulative sum
print(x.collect())
print(y)

[1, 2, 3, 4]
10


## fold

In [101]:
#fold
x=sc.parallelize([1,2,6,4])
neutral_zero_value=1 #0 for sum, 1 for multiplication
y=x.fold(neutral_zero_value,lambda obj,accumulated:accumulated+obj) #computes cumulative sum
print(x.collect())
print(y)

[1, 2, 6, 4]
18


## aggregate

In [102]:
#aggregate
x=sc.parallelize([1,2,6,4])
neutral_zero_value = (0,1) # sum: x+0 = x, product: 1*x = x
seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el)) 
combOp = (lambda aggregated, el: (aggregated[0] + el[0], aggregated[1] * el[1]))
y = x.aggregate(neutral_zero_value,seqOp,combOp)  # computes (cumulative sum, cumulative product)
print(x.collect())
print(y)

[1, 2, 6, 4]
(13, 48)


## max, min

In [105]:
#max, min
x=sc.parallelize([1,2,6,4])
y=x.max()
z=x.min()
print(x.collect())
print(y)
print(z)


[1, 2, 6, 4]
6
1


## sum

In [106]:
#sum
x=sc.parallelize([1,2,6,4])
y=x.sum()
print(x.collect())
print(y)

[1, 2, 6, 4]
13


## count

In [108]:
#count
x=sc.parallelize([1,2,6,4])
y=x.count()
print(x.collect())
print(y)

[1, 2, 6, 4]
4


## histogram

In [110]:
#histogram
x=sc.parallelize([1,2,6,4,2])
y=x.histogram(buckets=2)
print(x.collect())
print(y)


[1, 2, 6, 4, 2]
([1.0, 3.5, 6], [3, 2])


## mean

In [112]:
#mean
x=sc.parallelize([12,2,6,4])
y=x.mean()
print(x.collect())
print(y)

[12, 2, 6, 4]
6.0


## variance

In [113]:
#variance
x=sc.parallelize([1,2,6,4])
y=x.variance()
print(x.collect())
print(y)

[1, 2, 6, 4]
3.6875


## stdev

In [114]:
#stdev
x=sc.parallelize([1,2,6,4])
y=x.stdev() #divides by N
print(x.collect())
print(y)

[1, 2, 6, 4]
1.920286436967152


## sampleStdev

In [115]:
#sampleStdev
x=sc.parallelize([1,2,6,4])
y=x.sampleStdev() #divides by N-1
print(x.collect())
print(y)

[1, 2, 6, 4]
2.217355782608345


## sampleVariance

In [117]:
#sampleVariance
x=sc.parallelize([3,2,6,4])
y=x.sampleVariance()#divides by N-1
print(x.collect())
print(y)


[3, 2, 6, 4]
2.9166666666666665


## countByValue

In [120]:
#countByValue
x=sc.parallelize([1,2,6,5,6,2,4])
y=x.countByValue()
print(x.collect())
print(y)

[1, 2, 6, 5, 6, 2, 4]
defaultdict(<class 'int'>, {1: 1, 2: 2, 6: 2, 5: 1, 4: 1})


## top

In [126]:
#top
x=sc.parallelize([2,3,6,8,4])
y=x.top(4) #list of values sorted in desc order
print(x.collect())
print(y)
print(type(y))


[2, 3, 6, 8, 4]
[8, 6, 4, 3]
<class 'list'>


## takeOrdered

In [125]:
#takeOrdered
x=sc.parallelize([5,2,3,6,4])
y=x.takeOrdered(3) #list of values sorted in ascend order
print(x.collect())
print(y)


[5, 2, 3, 6, 4]
[2, 3, 4]


## take

In [127]:
#take
x=sc.parallelize([5,2,3,6,4])
y=x.take(3) #list of first 3 values
print(x.collect())
print(y)

[5, 2, 3, 6, 4]
[5, 2, 3]


## first

In [129]:
#first
x=sc.parallelize([12,2,3,6,4])
y=x.first()
print(x.collect())
print(y)

[12, 2, 3, 6, 4]
12


## collectAsMap

In [132]:
#collectAsMap
x=sc.parallelize([('C',3),('A',1),('B',2)])
y=x.collectAsMap()
print(x.collect())
print(y)


[('C', 3), ('A', 1), ('B', 2)]
{'C': 3, 'A': 1, 'B': 2}


## keys

In [134]:
#keys
x=sc.parallelize([('C',3),('A',1),('B',2)])
y=x.keys()
print(x.collect())
print(y.collect())

[('C', 3), ('A', 1), ('B', 2)]
['C', 'A', 'B']


## values

In [135]:
#values
x=sc.parallelize([('C',3),('A',1),('B',2)])
y=x.values()
print(x.collect())
print(y.collect())

[('C', 3), ('A', 1), ('B', 2)]
[3, 1, 2]


## reduceByKey

In [137]:
#reduceByKey
x=sc.parallelize([('C',3),('A',1),('B',2),('C',4)])
y=x.reduceByKey(lambda x,y:x+y)
print(x.collect())
print(y.collect())


[('C', 3), ('A', 1), ('B', 2), ('C', 4)]
[('A', 1), ('B', 2), ('C', 7)]


## reduceByKeyLocally

In [140]:
#reduceByKeyLocally
x=sc.parallelize([('C',3),('A',1),('B',2),('C',4)])
y=x.reduceByKeyLocally(lambda x,y:x+y)
print(x.collect())
print(y)
print(type(y))

[('C', 3), ('A', 1), ('B', 2), ('C', 4)]
{'C': 7, 'A': 1, 'B': 2}
<class 'dict'>


## countByKey

In [141]:
#countByKey
x=sc.parallelize([('C',3),('A',1),('B',2),('C',4)])
y=x.countByKey()
print(x.collect())
print(y)

[('C', 3), ('A', 1), ('B', 2), ('C', 4)]
defaultdict(<class 'int'>, {'C': 2, 'A': 1, 'B': 1})


## join

In [142]:
#join
x=sc.parallelize([('C',3),('A',1),('B',2),('C',4)])
y=sc.parallelize([('C',2),('A',1),('B',2),('D',4)])
z=x.join(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 3), ('A', 1), ('B', 2), ('C', 4)]
[('C', 2), ('A', 1), ('B', 2), ('D', 4)]
[('B', (2, 2)), ('C', (3, 2)), ('C', (4, 2)), ('A', (1, 1))]


## leftOuterJoin

In [143]:
#leftOuterJoin
x=sc.parallelize([('C',3),('A',1),('B',2),('C',4)])
y=sc.parallelize([('C',2),('A',1),('B',2),('D',4)])
z=x.leftOuterJoin(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 3), ('A', 1), ('B', 2), ('C', 4)]
[('C', 2), ('A', 1), ('B', 2), ('D', 4)]
[('B', (2, 2)), ('C', (3, 2)), ('C', (4, 2)), ('A', (1, 1))]


## rightOuterJoin

In [144]:
#rightOuterJoin
x=sc.parallelize([('C',3),('A',1),('B',2),('C',4)])
y=sc.parallelize([('C',2),('A',1),('B',2),('D',4)])
z=x.rightOuterJoin(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 3), ('A', 1), ('B', 2), ('C', 4)]
[('C', 2), ('A', 1), ('B', 2), ('D', 4)]
[('B', (2, 2)), ('C', (3, 2)), ('C', (4, 2)), ('A', (1, 1)), ('D', (None, 4))]


## partitionBy

In [148]:
#partitionBy
x = sc.parallelize([(0,1),(1,2),(2,3)],2)
y=x.partitionBy(numPartitions=3,partitionFunc=lambda x:x)
print(x.glom().collect())
print(y.glom().collect())

[[(0, 1)], [(1, 2), (2, 3)]]
[[(0, 1)], [(1, 2)], [(2, 3)]]


## combineByKey

In [150]:
#combineByKey
x=sc.parallelize([('C',3),('A',1),('B',2),('C',4)])
createCombiner = (lambda el: [(el,el**2)]) 
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated
mergeComb = (lambda agg1,agg2: agg1 + agg2 )  # append agg1 with agg2
y = x.combineByKey(createCombiner,mergeVal,mergeComb)
print(x.collect())
print(y.collect())

[('C', 3), ('A', 1), ('B', 2), ('C', 4)]
[('A', [(1, 1)]), ('B', [(2, 4)]), ('C', [(3, 9), (4, 16)])]


## aggregateByKey

In [154]:
#aggregateByKey
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue=[]#empty list to append values 
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated
mergeComb = (lambda agg1,agg2: agg1 + agg2 )  # append agg1 with agg2
y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)
print(x.collect())
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('B', [(1, 1), (2, 4)]), ('A', [(3, 9), (4, 16), (5, 25)])]


## foldByKey

In [155]:
#foldByKey
x=sc.parallelize([('C',3),('A',1),('B',2),('B',8),('C',4)])
y=x.foldByKey(1,lambda x,y:x*y) #1 is for multiplication
print(x.collect())
print(y.collect())


[('C', 3), ('A', 1), ('B', 2), ('B', 8), ('C', 4)]
[('A', 1), ('B', 16), ('C', 12)]


## groupByKey

In [159]:
#groupByKey
x=sc.parallelize([('C',3),('A',1),('B',2),('B',8),('C',4)])
y=x.groupByKey()
print(x.collect())
print([(j[0],[i for i in j[1]]) for j in y.collect()])

[('C', 3), ('A', 1), ('B', 2), ('B', 8), ('C', 4)]
[('A', [1]), ('B', [2, 8]), ('C', [3, 4])]


## flatMapValues

In [161]:
#flatMapValues
x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
y=x.flatMapValues(lambda x:[(i**2) for i in x])
print(x.collect())
print(y.collect())

[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]


## groupWith

In [162]:
#groupWith
# groupWith
x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])
z = sc.parallelize([('D',9),('B',(8,8))])
a = x.groupWith(y,z)
print(x.collect())
print(y.collect())
print(z.collect())
print("Result:")
for key,val in list(a.collect()): 
    print(key, [list(i) for i in val])

[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
[('B', (7, 7)), ('A', 6), ('D', (5, 5))]
[('D', 9), ('B', (8, 8))]
Result:
A [[2, (1, 1)], [6], []]
D [[], [(5, 5)], [9]]
C [[4], [], []]
B [[(3, 3)], [(7, 7)], [(8, 8)]]


## cogroup

In [163]:
# cogroup
x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',(5,5))])
z = x.cogroup(y)
print(x.collect())
print(y.collect())
for key,val in list(z.collect()):
    print(key, [list(i) for i in val])

[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
[('A', 8), ('B', 7), ('A', 6), ('D', (5, 5))]
B [[(3, 3)], [7]]
C [[4], []]
A [[2, (1, 1)], [8, 6]]
D [[], [(5, 5)]]


## sampleByKey

In [164]:
#sampleByKey
x = sc.parallelize([('A',1),('B',2),('C',3),('B',4),('A',5)])
y = x.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2})
print(x.collect())
print(y.collect())

[('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]
[('A', 1), ('B', 2), ('C', 3), ('B', 4)]


## keyBy

In [165]:
#keyBy
x=sc.parallelize([5,2,3,6,4])
y=x.keyBy(lambda x:x**2)
print(x.collect())
print(y.collect())

[5, 2, 3, 6, 4]
[(25, 5), (4, 2), (9, 3), (36, 6), (16, 4)]


## coalesce

In [166]:

# coalesce
x = sc.parallelize([1,2,3,4,5],2)
y = x.coalesce(numPartitions=1)
print(x.glom().collect())
print(y.glom().collect())

[[1, 2], [3, 4, 5]]
[[1, 2, 3, 4, 5]]


## zip

In [167]:
#zip
x = sc.parallelize(['B','A','A'])
y = sc.parallelize([1,2,3])
z=x.zip(y)
print(x.collect())
print(y.collect())
print(z.collect())

['B', 'A', 'A']
[1, 2, 3]
[('B', 1), ('A', 2), ('A', 3)]


## zipWithIndex

In [168]:
# zipWithIndex
x = sc.parallelize(['B','A','A'],2)
y = x.zipWithIndex()
print(x.glom().collect())
print(y.collect())

[['B'], ['A', 'A']]
[('B', 0), ('A', 1), ('A', 2)]


## zipWithUniqueId

In [169]:
# zipWithUniqueId
x = sc.parallelize(['B','A','A'],2)
y = x.zipWithUniqueId() #uniqueId=element index * #partitions +partition Index
print(x.glom().collect())
print(y.collect())


[['B'], ['A', 'A']]
[('B', 0), ('A', 1), ('A', 3)]
