In [1]:
import time

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create (or reuse, if one is already running) the SparkSession shared by every cell below
spark = (
    SparkSession.builder
    .appName('Spark optimization')
    .getOrCreate()
)

In [2]:
# Read the answers and questions datasets from their Parquet directories
answersDF, questionsDF = (
    spark.read.parquet("data/answers"),
    spark.read.parquet("data/questions"),
)

In [3]:
# One row per (question_id, month) pair, with the number of answers it received
aggDF_1 = (
    answersDF
    .withColumn('month', F.month('creation_date'))
    .groupBy('question_id', 'month')
    .agg(F.count('*').alias('answer_count'))
)

# How many aggregated rows landed in each partition (show() prints the first 20)
(
    aggDF_1
    .withColumn("partitionId", F.spark_partition_id())
    .groupBy("partitionId")
    .count()
    .show()
)

# Compare the partitioning of the aggregated DF with that of its inputs
aggNumPartitions = aggDF_1.rdd.getNumPartitions()
answersNumPartitions = answersDF.rdd.getNumPartitions()
questionsNumPartitions = questionsDF.rdd.getNumPartitions()

print(f"""
answersDF has {answersNumPartitions} partitions
questionsDF has {questionsNumPartitions} partitions
aggDF has {aggNumPartitions} partitions
""")

+-----------+-----+
|partitionId|count|
+-----------+-----+
|        148|  380|
|         31|  385|
|         85|  379|
|        137|  371|
|         65|  365|
|         53|  376|
|        133|  364|
|         78|  327|
|        108|  355|
|        155|  319|
|         34|  393|
|        193|  352|
|        101|  377|
|        115|  384|
|        126|  372|
|         81|  382|
|         28|  377|
|        183|  360|
|         76|  374|
|         26|  339|
+-----------+-----+
only showing top 20 rows


answersDF has 4 partitions
questionsDF has 4 partitions
aggDF has 200 partitions



### Choosing number of partitions

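As we can see from the above cell, the aggregated DF (aggDF_1) has 200 partitions. This is the default value of `spark.sql.shuffle.partitions`, which the groupBy shuffle uses. Each partition holds only a few hundred records. Only 20 of the 200 partitions are displayed, but even among those the sizes are uneven: partition 155 has 319 records while partition 34 has 393. Let's see if the partitions are more even if we coalesce to 4 partitions, matching questionsDF and answersDF: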

In [4]:
# Coalesce the aggregated DF down to 4 partitions, then print per-partition
# record counts for the original DF followed by the coalesced one.
aggDF_test = aggDF_1.coalesce(4)
for partitionedDF in (aggDF_1, aggDF_test):
    (
        partitionedDF
        .withColumn("partitionId", F.spark_partition_id())
        .groupBy("partitionId")
        .count()
        .show()
    )

+-----------+-----+
|partitionId|count|
+-----------+-----+
|        148|  380|
|         31|  385|
|         85|  379|
|        137|  371|
|         65|  365|
|         53|  376|
|        133|  364|
|         78|  327|
|        108|  355|
|        155|  319|
|         34|  393|
|        193|  352|
|        101|  377|
|        115|  384|
|        126|  372|
|         81|  382|
|         28|  377|
|        183|  360|
|         76|  374|
|         26|  339|
+-----------+-----+
only showing top 20 rows

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          1|18346|
|          3|18165|
|          2|18269|
|          0|18245|
+-----------+-----+



### Testing number of partitions and caching

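With 4 partitions, the number of records per partition is much more even (roughly 18,200 records in each). Having 4 partitions also matches questionsDF, which aggDF is joined against in the following steps. Caching aggDF may help too, because we use this DF again in the join against questionsDF. Note that `cache()` is lazy: the data is only stored the first time an action runs on the DataFrame, so that first action still pays the full cost of computing it.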

In the steps below, the results are computed 4 different ways:
1. No caching, no coalescing
2. Caching, no coalescing
3. Caching and coalescing
4. No caching, coalescing

We can compare the execution times of each to find the optimal choice.

In [5]:
# 1. No caching, no coalescing
result_1_DF = questionsDF.join(aggDF_1, 'question_id') \
    .select('question_id', 'creation_date', 'title', 'month', 'answer_count')
result_1_DF.show()

# show() only computes enough partitions to print 20 rows, so it does not
# measure the whole join. Time a full action (count) instead, so the
# execution-time comparison below is backed by an actual measurement.
start = time.perf_counter()
result_1_rows = result_1_DF.count()
print(f"1. No caching, no coalescing: {result_1_rows} rows in {time.perf_counter() - start:.2f} s")

+-----------+--------------------+--------------------+-----+------------+
|question_id|       creation_date|               title|month|answer_count|
+-----------+--------------------+--------------------+-----+------------+
|     382738|2018-01-28 02:22:...|What is the pseud...|    1|           1|
|     370717|2017-11-25 04:09:...|What is the defin...|   11|           1|
|     339944|2017-06-17 16:32:...|Could gravitation...|    6|           3|
|     233852|2016-02-04 16:19:...|When does travell...|    2|           2|
|     173819|2015-04-02 11:56:...|Finding Magnetic ...|    4|           1|
|     265198|2016-06-28 10:56:...|Physical meaning ...|    6|           2|
|     175015|2015-04-08 21:24:...|Understanding a m...|    4|           1|
|     413973|2018-06-27 09:29:...|Incorporate spino...|    6|           1|
|     303670|2017-01-08 01:05:...|A Wilson line pro...|    1|           1|
|     317368|2017-03-08 14:53:...|Shouldn't Torsion...|    3|           1|
|     369982|2017-11-20 2

In [6]:
# 2. Caching, no coalescing
# cache() persists aggDF_1 itself and returns it, so aggDF_2 is aggDF_1.
aggDF_2 = aggDF_1.cache()
# cache() is lazy: run an action first so the cache is fully populated and the
# timed run below reads from it, instead of also paying to build it.
aggDF_2.count()
result_2_DF = questionsDF.join(aggDF_2, 'question_id') \
    .select('question_id', 'creation_date', 'title', 'month', 'answer_count')
result_2_DF.show()

# show() only computes enough partitions to print 20 rows; time a full action
# (count) so the whole join is measured for the comparison table below.
start = time.perf_counter()
result_2_rows = result_2_DF.count()
print(f"2. Caching, no coalescing: {result_2_rows} rows in {time.perf_counter() - start:.2f} s")

+-----------+--------------------+--------------------+-----+------------+
|question_id|       creation_date|               title|month|answer_count|
+-----------+--------------------+--------------------+-----+------------+
|     382738|2018-01-28 02:22:...|What is the pseud...|    1|           1|
|     370717|2017-11-25 04:09:...|What is the defin...|   11|           1|
|     339944|2017-06-17 16:32:...|Could gravitation...|    6|           3|
|     233852|2016-02-04 16:19:...|When does travell...|    2|           2|
|     173819|2015-04-02 11:56:...|Finding Magnetic ...|    4|           1|
|     265198|2016-06-28 10:56:...|Physical meaning ...|    6|           2|
|     175015|2015-04-08 21:24:...|Understanding a m...|    4|           1|
|     413973|2018-06-27 09:29:...|Incorporate spino...|    6|           1|
|     303670|2017-01-08 01:05:...|A Wilson line pro...|    1|           1|
|     317368|2017-03-08 14:53:...|Shouldn't Torsion...|    3|           1|
|     369982|2017-11-20 2

In [9]:
# 3. Caching and coalescing
# Drop the cache from experiment 2 so it cannot influence this run.
aggDF_2.unpersist()
aggDF_3 = aggDF_1.coalesce(4).cache()
# cache() is lazy: run an action first so the cache is fully populated and the
# timed run below reads from it, instead of also paying to build it.
aggDF_3.count()
result_3_DF = questionsDF.join(aggDF_3, 'question_id') \
    .select('question_id', 'creation_date', 'title', 'month', 'answer_count')
result_3_DF.show()

# show() only computes enough partitions to print 20 rows; time a full action
# (count) so the whole join is measured for the comparison table below.
start = time.perf_counter()
result_3_rows = result_3_DF.count()
print(f"3. Caching and coalescing: {result_3_rows} rows in {time.perf_counter() - start:.2f} s")

+-----------+--------------------+--------------------+-----+------------+
|question_id|       creation_date|               title|month|answer_count|
+-----------+--------------------+--------------------+-----+------------+
|     382738|2018-01-28 02:22:...|What is the pseud...|    1|           1|
|     370717|2017-11-25 04:09:...|What is the defin...|   11|           1|
|     339944|2017-06-17 16:32:...|Could gravitation...|    6|           3|
|     233852|2016-02-04 16:19:...|When does travell...|    2|           2|
|     173819|2015-04-02 11:56:...|Finding Magnetic ...|    4|           1|
|     265198|2016-06-28 10:56:...|Physical meaning ...|    6|           2|
|     175015|2015-04-08 21:24:...|Understanding a m...|    4|           1|
|     413973|2018-06-27 09:29:...|Incorporate spino...|    6|           1|
|     303670|2017-01-08 01:05:...|A Wilson line pro...|    1|           1|
|     317368|2017-03-08 14:53:...|Shouldn't Torsion...|    3|           1|
|     369982|2017-11-20 2

In [10]:
# 4. No caching, coalescing
# Drop the cache from experiment 3 so this run recomputes the aggregation.
aggDF_3.unpersist()
aggDF_4 = aggDF_1.coalesce(4)
result_4_DF = questionsDF.join(aggDF_4, 'question_id') \
    .select('question_id', 'creation_date', 'title', 'month', 'answer_count')
result_4_DF.show()

# show() only computes enough partitions to print 20 rows; time a full action
# (count) so the whole join is measured for the comparison table below.
start = time.perf_counter()
result_4_rows = result_4_DF.count()
print(f"4. No caching, coalescing: {result_4_rows} rows in {time.perf_counter() - start:.2f} s")

+-----------+--------------------+--------------------+-----+------------+
|question_id|       creation_date|               title|month|answer_count|
+-----------+--------------------+--------------------+-----+------------+
|     382738|2018-01-28 02:22:...|What is the pseud...|    1|           1|
|     370717|2017-11-25 04:09:...|What is the defin...|   11|           1|
|     339944|2017-06-17 16:32:...|Could gravitation...|    6|           3|
|     233852|2016-02-04 16:19:...|When does travell...|    2|           2|
|     173819|2015-04-02 11:56:...|Finding Magnetic ...|    4|           1|
|     265198|2016-06-28 10:56:...|Physical meaning ...|    6|           2|
|     175015|2015-04-08 21:24:...|Understanding a m...|    4|           1|
|     413973|2018-06-27 09:29:...|Incorporate spino...|    6|           1|
|     303670|2017-01-08 01:05:...|A Wilson line pro...|    1|           1|
|     317368|2017-03-08 14:53:...|Shouldn't Torsion...|    3|           1|
|     369982|2017-11-20 2

### Results

| Method                    | Execution time (s)|
|---------------------------|-------------------|
| No caching, no coalescing | 1                 |
| Caching, no coalescing    | 1                 |
| Caching and coalescing    | 0.5               |
| No caching, coalescing    | 0.6               |

### Summary

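As the results show, coalescing makes the biggest difference in execution time: the coalesced runs were about twice as fast as the non-coalesced ones, regardless of caching. With such a small dataset, caching brings little benefit (0.5 s vs 0.6 s when coalescing, and no difference without coalescing), so it appears not to be worth it here. Timings this short are noisy, so the runs should be repeated before drawing firm conclusions. With larger datasets and more complicated queries, the outcome may be different.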

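Shuffling is already kept low, but it is not a single shuffle. The groupBy that builds the aggregated DF needs one shuffle, and the join against questionsDF typically adds another, unless Spark chooses a broadcast join for the smaller side (check with `result_4_DF.explain()`). A further optimization is to set `spark.sql.shuffle.partitions` to a small value such as 4. The aggregation would then produce 4 partitions directly, instead of producing 200 partitions that then have to be coalesced.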