# Spark partitioning demo

## Initialize the Spark application

In [1]:
spark

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 254 | application_1583239045420_3907 | pyspark | idle | Link | Link | ✔ |


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7fce061f8110>

## Default parallelization

In [2]:
rdd = sc.parallelize(range(1,11))

In [3]:
sc.defaultParallelism

40

In [4]:
rdd.getNumPartitions()

40

In [5]:
rdd.glom().collect()

[[], [], [], [1], [], [], [], [2], [], [], [], [3], [], [], [], [4], [], [], [], [5], [], [], [], [6], [], [], [], [7], [], [], [], [8], [], [], [], [9], [], [], [], [10]]

Not ideal! With 10 elements spread over 40 partitions, 30 partitions are empty.
Can we do better?

We should reduce the number of partitions so that each partition holds at least one element.

In [6]:
rdd2 = sc.parallelize(range(1,11),10)

In [7]:
rdd2.getNumPartitions()

10

In [8]:
rdd2.glom().collect()

[[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]]

That's much better!
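
The two layouts above can be reproduced without a cluster. The sketch below is a simplified plain-Python model, not Spark's actual code: it assumes the elements are cut into contiguous slices with floor-based boundaries, which is consistent with the outputs shown. The helper name `slice_evenly` is hypothetical.

```python
def slice_evenly(items, num_slices):
    """Cut items into num_slices contiguous chunks using floor-based boundaries.

    Simplified model (an assumption) of how a small collection ends up
    spread over partitions; it is not taken from Spark's source.
    """
    items = list(items)
    n = len(items)
    return [
        items[(i * n) // num_slices:((i + 1) * n) // num_slices]
        for i in range(num_slices)
    ]


# 10 elements over 40 slices: most slices stay empty.
default_layout = slice_evenly(range(1, 11), 40)
non_empty = [i for i, part in enumerate(default_layout) if part]
print(non_empty)

# 10 elements over 10 slices: exactly one element per slice.
print(slice_evenly(range(1, 11), 10))
```

In this model only every fourth slice receives an element when 40 slices are requested, which matches the positions of the non-empty partitions in the `glom()` output.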

## Repartition and coalesce

Now let's create a DataFrame; don't forget to give it a schema (here, the column names).

In [9]:
df1 = sc.parallelize([[1,2,3], [4,5,6]]).toDF(("a", "b", "c"))

In [10]:
df1.rdd.getNumPartitions()

40

In [11]:
df1.rdd.glom().collect()

[[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [Row(a=1, b=2, c=3)], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [Row(a=4, b=5, c=6)]]

### Repartition

`repartition` uses a full shuffle to redistribute the rows; here they are hash-partitioned by the values of column `a`.

In [12]:
df2 = df1.repartition("a")

Now let's check how many partitions were created:

In [13]:
df2.rdd.getNumPartitions()

200

`200` is the default number of partitions for shuffle operations in Spark SQL. It is controlled by the `spark.sql.shuffle.partitions` configuration setting.
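
To see why 200 partitions is excessive for two rows, here is a plain-Python sketch of hash repartitioning by a column. It is an illustration under assumptions: Python's built-in `hash` stands in for Spark's internal hash function, so the partition indices will differ from a real run, and `hash_repartition` is a hypothetical helper.

```python
NUM_SHUFFLE_PARTITIONS = 200  # mirrors the default of spark.sql.shuffle.partitions

rows = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]


def hash_repartition(rows, column, num_partitions):
    """Assign each row to a partition based on the hash of one column."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        # Python's hash() is a stand-in for Spark's hash function (assumption).
        partitions[hash(row[column]) % num_partitions].append(row)
    return partitions


shuffled = hash_repartition(rows, "a", NUM_SHUFFLE_PARTITIONS)
non_empty = sum(1 for part in shuffled if part)
print(len(shuffled), non_empty)
```

With only two distinct values in column `a`, at most two of the 200 partitions can hold data, so for small DataFrames it is usually worth passing an explicit partition count or lowering the setting.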

### Coalesce

In [14]:
df3 = df1.coalesce(2)

In [15]:
df3.rdd.glom().collect()

[[Row(a=1, b=2, c=3)], [Row(a=4, b=5, c=6)]]
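
Unlike `repartition`, `coalesce` merges existing partitions without a full shuffle. The sketch below models that merge in plain Python, assuming neighbouring parent partitions are grouped into contiguous ranges; Spark's real coalescer may also take data locality into account, so treat this as an approximation. The name `coalesce_model` is hypothetical, and rows are represented as strings for brevity.

```python
def coalesce_model(partitions, num_partitions):
    """Merge contiguous ranges of parent partitions into fewer partitions."""
    n = len(partitions)
    merged = []
    for i in range(num_partitions):
        start = (i * n) // num_partitions
        end = ((i + 1) * n) // num_partitions
        # Concatenate the rows of every parent partition in this range.
        merged.append([row for part in partitions[start:end] for row in part])
    return merged


# Same layout as df1: 40 partitions, rows at indices 19 and 39.
parent = [[] for _ in range(40)]
parent[19] = ["Row(a=1, b=2, c=3)"]
parent[39] = ["Row(a=4, b=5, c=6)"]

print(coalesce_model(parent, 2))
```

Under this model the first 20 parent partitions collapse into one partition and the last 20 into another, which agrees with the two single-row partitions shown above.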

## Different types of partitioning

Different partitioning schemes can be applied with `partitionBy`. The RDD must consist of (key, value) pairs.

First we initialize our RDD.

In [16]:
new_data = range(100)

In [17]:
new_rdd = sc.parallelize(new_data).map(lambda x: (x, x))

We verify the partitions: there are 40 because that's the default parallelism of our Spark context.

In [18]:
new_rdd.glom().collect()

[[(0, 0), (1, 1)], [(2, 2), (3, 3)], [(4, 4), (5, 5)], [(6, 6), (7, 7), (8, 8), (9, 9)], [(10, 10), (11, 11)], [(12, 12), (13, 13)], [(14, 14), (15, 15)], [(16, 16), (17, 17), (18, 18), (19, 19)], [(20, 20), (21, 21)], [(22, 22), (23, 23)], [(24, 24), (25, 25)], [(26, 26), (27, 27), (28, 28), (29, 29)], [(30, 30), (31, 31)], [(32, 32), (33, 33)], [(34, 34), (35, 35)], [(36, 36), (37, 37), (38, 38), (39, 39)], [(40, 40), (41, 41)], [(42, 42), (43, 43)], [(44, 44), (45, 45)], [(46, 46), (47, 47), (48, 48), (49, 49)], [(50, 50), (51, 51)], [(52, 52), (53, 53)], [(54, 54), (55, 55)], [(56, 56), (57, 57), (58, 58), (59, 59)], [(60, 60), (61, 61)], [(62, 62), (63, 63)], [(64, 64), (65, 65)], [(66, 66), (67, 67), (68, 68), (69, 69)], [(70, 70), (71, 71)], [(72, 72), (73, 73)], [(74, 74), (75, 75)], [(76, 76), (77, 77), (78, 78), (79, 79)], [(80, 80), (81, 81)], [(82, 82), (83, 83)], [(84, 84), (85, 85)], [(86, 86), (87, 87), (88, 88), (89, 89)], [(90, 90), (91, 91)], [(92, 92), (93, 93)], [(9

### HashPartitioner

In [19]:
partitioned_rdd = new_rdd.partitionBy(2)

In [20]:
partitioned_rdd.getNumPartitions()

2

In [21]:
partitioned_rdd.glom().collect()

[[(12, 12), (36, 36), (38, 38), (62, 62), (86, 86), (88, 88), (4, 4), (30, 30), (54, 54), (80, 80), (22, 22), (46, 46), (48, 48), (72, 72), (96, 96), (98, 98), (2, 2), (6, 6), (8, 8), (20, 20), (0, 0), (26, 26), (28, 28), (32, 32), (52, 52), (56, 56), (58, 58), (16, 16), (18, 18), (76, 76), (78, 78), (44, 44), (24, 24), (70, 70), (50, 50), (94, 94), (82, 82), (74, 74), (42, 42), (66, 66), (68, 68), (92, 92), (10, 10), (34, 34), (60, 60), (84, 84), (14, 14), (40, 40), (64, 64), (90, 90)], [(7, 7), (9, 9), (33, 33), (57, 57), (59, 59), (83, 83), (5, 5), (31, 31), (55, 55), (81, 81), (23, 23), (47, 47), (49, 49), (73, 73), (97, 97), (99, 99), (13, 13), (37, 37), (39, 39), (63, 63), (87, 87), (89, 89), (3, 3), (21, 21), (1, 1), (27, 27), (29, 29), (53, 53), (17, 17), (19, 19), (77, 77), (79, 79), (25, 25), (45, 45), (71, 71), (51, 51), (95, 95), (75, 75), (43, 43), (67, 67), (69, 69), (93, 93), (11, 11), (35, 35), (61, 61), (85, 85), (15, 15), (41, 41), (65, 65), (91, 91)]]
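
The even/odd split above follows from how hash partitioning picks a target: the partition function is applied to the key and the result is taken modulo the number of partitions. A minimal plain-Python sketch, with `partition_by` as a hypothetical helper and Python's built-in `hash` standing in for PySpark's default partition function:

```python
def partition_by(pairs, num_partitions, partition_func=hash):
    """Place each (key, value) pair in partition partition_func(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions


pairs = [(x, x) for x in range(100)]
evens, odds = partition_by(pairs, 2)

print(len(evens), len(odds))
print(all(key % 2 == 0 for key, _ in evens))
```

For small non-negative integers the hash equals the integer itself, so with two partitions the keys separate by parity. The order of elements inside each partition differs from the Spark output because a real shuffle collects data from many upstream partitions.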

### Custom partitioner

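Sometimes we need control over which partition each key goes to. `partitionBy` accepts a partitioning function as its second argument; its return value, taken modulo the number of partitions, selects the partition.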

In [22]:
nobel_prizes = [
    {'name': 'Michel Mayor', 'field': 'Physics', 'year': 2019, 'country': 'Switzerland'},
    {'name': 'Tomas Lindahl', 'field': 'Chemistry', 'year': 2015, 'country': 'Sweden'},
    {'name': 'Didier Queloz', 'field': 'Physics', 'year': 2019, 'country': 'Switzerland'},
    {'name': 'Michael Levitt', 'field': 'Chemistry', 'year': 2013, 'country': 'South Africa'},
    {'name': 'Jacques Dubochet', 'field': 'Chemistry', 'year': 2017, 'country': 'Switzerland'},
    {'name': 'Tomas Tranströmer', 'field': 'Literature', 'year': 2011, 'country': 'Sweden'},
    {'name': 'Mario Vargas Llosa', 'field': 'Literature', 'year': 2010, 'country': 'Spain'},
]

In [28]:
def string_partitioner(string_value):
    # partitionBy takes this value modulo the number of partitions.
    return hash(string_value)

In [29]:
nobel_prizes_rdd = sc.parallelize(nobel_prizes).map(lambda x: (x['country'], x)).partitionBy(4, string_partitioner)

In [30]:
nobel_prizes_rdd.glom().collect()

[[('Switzerland', {'field': 'Chemistry', 'year': 2017, 'name': 'Jacques Dubochet', 'country': 'Switzerland'}), ('Switzerland', {'field': 'Physics', 'year': 2019, 'name': 'Michel Mayor', 'country': 'Switzerland'}), ('Switzerland', {'field': 'Physics', 'year': 2019, 'name': 'Didier Queloz', 'country': 'Switzerland'})], [], [('Sweden', {'field': 'Chemistry', 'year': 2015, 'name': 'Tomas Lindahl', 'country': 'Sweden'}), ('Sweden', {'field': 'Literature', 'year': 2011, 'name': 'Tomas Transtr\xc3\xb6mer', 'country': 'Sweden'}), ('Spain', {'field': 'Literature', 'year': 2010, 'name': 'Mario Vargas Llosa', 'country': 'Spain'})], [('South Africa', {'field': 'Chemistry', 'year': 2013, 'name': 'Michael Levitt', 'country': 'South Africa'})]]

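Pay attention to data skew! In the output above, the three `Switzerland` records share one partition while another partition is empty.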
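
A related caveat applies when a custom partitioner relies on the built-in `hash` for string keys: under Python 3 that hash is salted per interpreter process unless `PYTHONHASHSEED` is fixed, so the same key could map to different partitions on different workers. A minimal sketch of a deterministic alternative, assuming a CRC32 checksum is an acceptable hash for this purpose (`stable_string_partitioner` is a hypothetical name):

```python
import zlib


def stable_string_partitioner(string_value):
    """Return a hash of the key that is identical in every Python process."""
    # CRC32 of the UTF-8 bytes; non-negative in Python 3.
    return zlib.crc32(string_value.encode("utf-8"))


countries = ["Switzerland", "Sweden", "South Africa", "Spain"]
for country in countries:
    # The same key always maps to the same partition index.
    print(country, stable_string_partitioner(country) % 4)
```

It could be passed to `partitionBy(4, stable_string_partitioner)` in place of `string_partitioner`. A stable hash does not remove skew by itself; it only makes the assignment reproducible.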

In [33]:
test = sc.parallelize(nobel_prizes).map(lambda x: (x['country'], x)).partitionBy(4)

In [31]:
nobel_prizes_rdd2 = sc.parallelize(nobel_prizes).map(lambda x: (x['field'], x)).partitionBy(3, string_partitioner)

In [32]:
nobel_prizes_rdd2.glom().collect()

[[], [('Physics', {'field': 'Physics', 'year': 2019, 'name': 'Michel Mayor', 'country': 'Switzerland'}), ('Chemistry', {'field': 'Chemistry', 'year': 2017, 'name': 'Jacques Dubochet', 'country': 'Switzerland'}), ('Physics', {'field': 'Physics', 'year': 2019, 'name': 'Didier Queloz', 'country': 'Switzerland'}), ('Chemistry', {'field': 'Chemistry', 'year': 2015, 'name': 'Tomas Lindahl', 'country': 'Sweden'}), ('Chemistry', {'field': 'Chemistry', 'year': 2013, 'name': 'Michael Levitt', 'country': 'South Africa'})], [('Literature', {'field': 'Literature', 'year': 2010, 'name': 'Mario Vargas Llosa', 'country': 'Spain'}), ('Literature', {'field': 'Literature', 'year': 2011, 'name': 'Tomas Transtr\xc3\xb6mer', 'country': 'Sweden'})]]
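
Skew like this can be estimated before choosing a partitioning key by counting records per key. The sketch below does so in plain Python on the `(country, field)` values of the records above; `key_counts` and `skew_ratio` are hypothetical helpers, and the ratio is just one possible measure.

```python
from collections import Counter

# (country, field) of each record in nobel_prizes, in the same order.
records = [
    ("Switzerland", "Physics"),
    ("Sweden", "Chemistry"),
    ("Switzerland", "Physics"),
    ("South Africa", "Chemistry"),
    ("Switzerland", "Chemistry"),
    ("Sweden", "Literature"),
    ("Spain", "Literature"),
]


def key_counts(records, index):
    """Count how many records carry each value of the chosen key."""
    return Counter(record[index] for record in records)


def skew_ratio(counts):
    """Largest key count divided by the mean key count (1.0 means balanced)."""
    return max(counts.values()) / (sum(counts.values()) / len(counts))


by_country = key_counts(records, 0)
by_field = key_counts(records, 1)
print(by_country.most_common(1), round(skew_ratio(by_country), 2))
print(by_field.most_common(1), round(skew_ratio(by_field), 2))
```

Key counts are only a lower bound on partition imbalance: several keys can still hash to the same partition, as `Physics` and `Chemistry` do in the last output.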