In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext("local",'app')
spark = SparkSession.builder.appName('name').config('spark.sql.shuffle.partitions',10).getOrCreate()

### Finding Prime Numbers

In [4]:
n = 5000
allnumbers = sc.parallelize(xrange(2, n), 8).cache()
# the key is to repartition the result of flatMap to balance the partition
composite = allnumbers.flatMap(lambda x: xrange(x*2, n, x)).repartition(8) #
# subtract is set operation
prime = allnumbers.subtract(composite)
print prime.take(10)

[17, 401537, 462641, 97, 47137, 113, 43649, 467009, 193, 488833]


In [2]:
# Find the number of elements in each parttion
def partitionsize(it): 
    s = 0
    for i in it:
        s += 1
    yield s
# mapPartitions(f)  function usage
print allnumbers.mapPartitions(partitionsize).collect()
print composite.mapPartitions(partitionsize).collect()
print prime.mapPartitions(partitionsize).collect()
print prime.glom().collect()[1][0:4]

[62499, 62500, 62500, 62500, 62499, 62500, 62500, 62500]
[5216986, 254759, 104166, 62499, 0, 0, 0, 0]
[0, 5169, 1, 5219, 0, 5206, 0, 5189, 0, 5165, 0, 5199, 0, 5191, 0, 5199]
[17, 401537, 462641, 97]


### Data Partitioning

In [6]:
data = [8, 96, 240, 400, 401, 800]
rdd = sc.parallelize(zip(data, data),4)
print rdd.partitioner
print "rdd original glom:", rdd.glom().collect() #have not defined partitioning method
rdd = rdd.reduceByKey(lambda x,y: x+y)
print rdd.glom().collect()  # hash method
print rdd.partitioner.partitionFunc
rdd = rdd.sortByKey()  # range method
print rdd.glom().collect()
print rdd.partitioner.partitionFunc

None
rdd original glom: [[(8, 8)], [(96, 96), (240, 240)], [(400, 400)], [(401, 401), (800, 800)]]
[[(8, 8), (96, 96), (400, 400), (240, 240), (800, 800)], [(401, 401)], [], []]
<function portable_hash at 0x1095cd9b0>
[[(8, 8), (96, 96)], [(240, 240), (400, 400)], [(401, 401)], [(800, 800)]]
<function rangePartitioner at 0x109831848>


In [11]:
# reduceByKey will use hash partitioner
# join result will propagate the partitioner
a = sc.parallelize(zip(range(10000), range(10000)), 8)
b = sc.parallelize(zip(range(10000), range(10000)), 8)
print a.partitioner
a = a.reduceByKey(lambda x,y: x+y)
print a.partitioner.partitionFunc
b = b.reduceByKey(lambda x,y: x+y)
c = a.join(b)
print c.getNumPartitions()
print c.partitioner.partitionFunc
print c.glom().first()[0:4]

None
<function portable_hash at 0x7f5634d03f50>
8
<function portable_hash at 0x7f5634d03f50>
[(0, (0, 0)), (2048, (2048, 2048)), (1432, (1432, 1432)), (2056, (2056, 2056))]


In [13]:
# A 'real' example from SF Express
# Prepare three relational tables
#generate the tables
from pyspark.sql.functions import *
num_waybills = 1000000
num_customers = 1000

rdd = sc.parallelize((i, ) for i in xrange(num_waybills))
waybills = spark.createDataFrame(rdd).select(floor(rand()*num_waybills).alias('waybill'), 
                                             floor(rand()*num_customers).alias('customer')) \
                .groupBy('waybill').max('customer').withColumnRenamed('max(customer)','customer')\
                .cache()
waybills.show()
print waybills.count()

rdd = sc.parallelize((i, i) for i in xrange(num_customers))
customers = spark.createDataFrame(rdd, ['customer', 'phone']).cache()
customers.show()
print customers.count()

rdd = sc.parallelize((i, ) for i in xrange(num_waybills))
waybill_status = spark.createDataFrame(rdd).select(floor(rand()*num_waybills).alias('waybill'), 
                                                   floor(rand()*10).alias('version')) \
                      .groupBy('waybill').max('version').cache()
waybill_status.show()
print waybill_status.count()

+-------+--------+
|waybill|customer|
+-------+--------+
| 615970|     283|
| 590938|     887|
| 113890|     428|
| 754476|     467|
| 333326|     702|
| 617912|     491|
| 629735|     103|
| 865840|     166|
| 550506|     673|
| 623712|     951|
| 276337|     847|
| 648179|     952|
| 747203|     931|
| 578562|     279|
|  42006|     919|
| 869259|     648|
| 338595|     274|
| 155656|     523|
| 107835|     889|
| 934391|     804|
+-------+--------+
only showing top 20 rows

632470
+--------+-----+
|customer|phone|
+--------+-----+
|       0|    0|
|       1|    1|
|       2|    2|
|       3|    3|
|       4|    4|
|       5|    5|
|       6|    6|
|       7|    7|
|       8|    8|
|       9|    9|
|      10|   10|
|      11|   11|
|      12|   12|
|      13|   13|
|      14|   14|
|      15|   15|
|      16|   16|
|      17|   17|
|      18|   18|
|      19|   19|
+--------+-----+
only showing top 20 rows

1000
+-------+------------+
|waybill|max(version)|
+-------+------------+
| 4

In [9]:
waybills.join(waybill_status, 'waybill').count()

41

In [14]:
# We want to join 3 tables together.
# Knowing how each table is partitioned helps optimize the join order.
import time

start = time.clock()
code1=waybills.join(customers, 'customer').join(waybill_status, 'waybill')
elapsed = (time.clock() - start)
print("Time used:",elapsed)

start = time.clock()
code2=waybills.join(waybill_status, 'waybill').join(customers, 'customer')
elapsed = (time.clock() - start)
print("Time used:",elapsed)

('Time used:', 0.004304000000000308)
('Time used:', 0.0027500000000006963)


In [9]:
def partitionsize(it): yield len(list(it))
    
n = 40000

def f(x):
    return x / (n/8)

data1 = range(0, n, 16) + range(0, n, 16)
data2 = range(0, n, 8)
rdd1 = sc.parallelize(zip(data1, data2), 8)   
rdd1 = rdd1.reduceByKey(lambda x,y: x+y)  #hash partitioner
print rdd1.mapPartitions(partitionsize).collect()
rdd1 = rdd1.partitionBy(8, f)  # partitionBy function, will balance the data
rdd2 = rdd1.reduceByKey(lambda x,y: x+y, partitionFunc=f)
#rdd2 = rdd1.reduceByKey(lambda x,y: x+y)
rdd2.mapPartitions(partitionsize).collect()

[2500, 0, 0, 0, 0, 0, 0, 0]


[313, 312, 313, 312, 313, 312, 313, 312]

### Partitioning in DataFrames

In [5]:
data1 = [1, 1, 1, 2, 2, 2, 3, 3, 3, 4]
data2 = [2, 2, 3, 4, 5, 3, 1, 1, 2, 3]
df = spark.createDataFrame(zip(data1, data2))
print df.rdd.getNumPartitions()
print df.rdd.glom().collect()

8
[[Row(_1=1, _2=2)], [Row(_1=1, _2=2)], [Row(_1=1, _2=3)], [Row(_1=2, _2=4), Row(_1=2, _2=5)], [Row(_1=2, _2=3)], [Row(_1=3, _2=1)], [Row(_1=3, _2=1)], [Row(_1=3, _2=2), Row(_1=4, _2=3)]]


In [6]:
df1 = df.repartition(6, df._2)
print df1.rdd.glom().collect()
df1.show()

[[], [], [Row(_1=1, _2=2), Row(_1=1, _2=2), Row(_1=2, _2=4), Row(_1=2, _2=5), Row(_1=3, _2=2)], [Row(_1=1, _2=3), Row(_1=2, _2=3), Row(_1=4, _2=3)], [], [Row(_1=3, _2=1), Row(_1=3, _2=1)]]
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  1|  2|
|  2|  4|
|  2|  5|
|  3|  2|
|  1|  3|
|  2|  3|
|  4|  3|
|  3|  1|
|  3|  1|
+---+---+



### Threading

In [3]:
import threading
import random

partitions = 8
n = 500000 * partitions

# use different seeds in different threads and different partitions
# a bit ugly, since mapPartitionsWithIndex takes a function with only index
# and it as parameters
def f1(index, it):
    random.seed(index + 987231)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f2(index, it):
    random.seed(index + 987232)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f3(index, it):
    random.seed(index + 987233)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0
    
def f4(index, it):
    random.seed(index + 987234)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0
    
def f5(index, it):
    random.seed(index + 987245)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

f = [f1, f2, f3, f4, f5]
    
# the function executed in each thread/job
def dojob(i):
    count = sc.parallelize(xrange(1, n + 1), partitions) \
              .mapPartitionsWithIndex(f[i]).reduce(lambda a,b: a+b)
    print "Worker", i, "reports: Pi is roughly", 4.0 * count / n

# create and execute the threads
threads = []
for i in range(5):
    t = threading.Thread(target=dojob, args=(i,))
    threads += [t]
    t.start()

# wait for all threads to complete
for t in threads:
    t.join()    
'''
for i in range(5):
    dojob(i)
'''

Worker 3 reports: Pi is roughly 3.142212
Worker 4 reports: Pi is roughly 3.142006
Worker 2 reports: Pi is roughly 3.142872
Worker 0 reports: Pi is roughly 3.141997
Worker 1 reports: Pi is roughly 3.142233



for i in range(5):
    dojob(i)
