![PySpark Logo](https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/pyspark_logo.jpeg)
This notebook introduces the Spark RDD API and shows how to use it to process data in Python.

In [1]:
import pyspark
# getOrCreate() reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-executed.
sc = pyspark.SparkContext.getOrCreate()

## (1) map
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.1.map.PNG" width="300" height="300" />

In [2]:
# parallelize distributes a local Python collection as an RDD
x = sc.parallelize([1, 2, 3])
# map applies the function to every element: one output per input element
y = x.map(lambda n: (n, n ** 2))

# collect() brings the RDD's elements back to the driver as a Python list
print(x.collect())
print(y.collect())

[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]


## (2) flatMap
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.2.flatMap.PNG" width="300" height="300" />

In [3]:
x = sc.parallelize([1, 2, 3])
# flatMap: each input may yield several outputs (here three), and all of
# them are flattened into a single RDD
y = x.flatMap(lambda n: (n, 100 * n, n ** 2))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 100, 1, 2, 200, 4, 3, 300, 9]


## (3) mapPartitions
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.3.mapPartitions.PNG" width="300" height="300" />

In [4]:
# split the data into 2 partitions so the per-partition behaviour is visible
x = sc.parallelize([1, 2, 3], 2)

def sum_partition(iterator):
    """Yield a single value: the sum of all elements in one partition."""
    yield sum(iterator)

# mapPartitions calls the function once per partition, not once per element
y = x.mapPartitions(sum_partition)
# glom() gathers the elements of each partition into a list, which makes the
# partition layout visible when printed
print(x.glom().collect())
print(y.glom().collect())

[[1], [2, 3]]
[[1], [5]]


## (4) mapPartitionsWithIndex
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.4.mapPartitionsWithIndex.PNG" width="300" height="300" />

In [5]:
x = sc.parallelize([1, 2, 3], 2)

def f(partitionIndex, iterator):
    """Yield one (partition index, partition sum) pair for the partition."""
    total = sum(iterator)
    yield (partitionIndex, total)

# like mapPartitions, but the function also receives the partition's index
y = x.mapPartitionsWithIndex(f)
print(x.glom().collect())
print(y.glom().collect())

[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]


## (5) getNumPartitions
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.5.getNumPartitions.PNG" width="300" height="300" />

In [6]:
# Re-create the RDD here so this cell does not silently depend on the `x`
# left over from the previous cell (the demo needs 2 partitions).
x = sc.parallelize([1, 2, 3], 2)
# getNumPartitions returns the partition count as a plain int, not an RDD
y = x.getNumPartitions()
print(x.glom().collect())
print(y)

[[1], [2, 3]]
2


## (6) filter
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.6.filter.PNG" width="300" height="300" />

In [7]:
x = sc.parallelize([1, 2, 3])
# filter KEEPS the elements for which the predicate is True (here: odd numbers)
y = x.filter(lambda n: n % 2 == 1)
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 3]


## (7) distinct
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.7.distinct.PNG" width="300" height="300" />

In [8]:
x = sc.parallelize(['A', 'A', 'B'])
# distinct drops duplicate elements; don't rely on the order of the result
y = x.distinct()
print(x.collect())
print(y.collect())

['A', 'A', 'B']
['A', 'B']


## (8) sample
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.8.sample.PNG" width="300" height="300" />


In [9]:
x = sc.parallelize(range(7))
# Each sample() call draws a new random subset. `fraction` is the *expected*
# share of elements kept, so the sample size varies between draws.
# No seed is passed, so output differs on every run (use seed=... to pin it).
ylist = [x.sample(withReplacement=False, fraction=0.5) for _ in range(5)]
print('x = ' + str(x.collect()))
for cnt, y in enumerate(ylist):
    print('sample: ' + str(cnt) + ' y = ' + str(y.collect()))

x = [0, 1, 2, 3, 4, 5, 6]
sample: 0 y = [4, 5]
sample: 1 y = [0, 1, 5, 6]
sample: 2 y = [2, 4, 6]
sample: 3 y = [4, 5, 6]
sample: 4 y = [1, 2, 6]


## (9) takeSample
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.9.takeSample.PNG" width="300" height="300" />


In [10]:
x = sc.parallelize(range(7))
# takeSample is an action: it returns a plain Python list of `num` elements
# (here 3) directly to the driver, so no .collect() is needed below.
# No seed is passed, so output differs on every run (use seed=... to pin it).
ylist = [x.takeSample(withReplacement=False, num=3) for _ in range(5)]
print('x = ' + str(x.collect()))
for cnt, y in enumerate(ylist):
    print('sample: ' + str(cnt) + ' y = ' + str(y))

x = [0, 1, 2, 3, 4, 5, 6]
sample: 0 y = [0, 4, 3]
sample: 1 y = [4, 5, 2]
sample: 2 y = [0, 5, 4]
sample: 3 y = [6, 0, 2]
sample: 4 y = [2, 5, 4]


## (10) union
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.10.union.PNG" width="300" height="300" />

In [11]:
x = sc.parallelize(['A', 'A', 'B'])
y = sc.parallelize(['D', 'C', 'A'])
# union concatenates both RDDs and keeps duplicates (no de-duplication)
z = x.union(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'A', 'B']
['D', 'C', 'A']
['A', 'A', 'B', 'D', 'C', 'A']


## (11) intersection
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.11.intersection.PNG" width="300" height="300" />

In [12]:
x = sc.parallelize(['A', 'A', 'B'])
y = sc.parallelize(['A', 'C', 'D'])
# intersection keeps elements present in both RDDs, without duplicates:
# 'A' occurs twice in x but only once in the result
z = x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'A', 'B']
['A', 'C', 'D']
['A']


## (12) sortByKey
<img src="https://raw.githubusercontent.com/WistariaDing/SparkWithPython/master/Picture/1.12.sortByKey.PNG" width="300" height="300" />

In [13]:
# an RDD of (key, value) pairs
x = sc.parallelize([('B', 1), ('A', 2), ('C', 3)])
# sortByKey orders the pairs by their key (ascending by default)
y = x.sortByKey()
print(x.collect())
print(y.collect())

[('B', 1), ('A', 2), ('C', 3)]
[('A', 2), ('B', 1), ('C', 3)]
