In [1]:
sc

In [2]:
'''
Optimize the query plan

Suppose we want to compose query in which we get for each question also the number of answers to this question for each month. See the query below which does that in a suboptimal way and try to rewrite it to achieve a more optimal plan.
'''


import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, month

import os


spark = SparkSession.builder.appName('Optimize I').getOrCreate()

sc.setSystemProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

spark.conf.set("spark.sql.adaptive.enabled",True)

base_path = "/Users/christopher/Downloads/Optimization/"

# project_path = ('/').join(base_path.split('/')[0:-3]) 

answers_input_path = os.path.join(base_path, 'data/answers')

questions_input_path = os.path.join(base_path, 'data/questions')

answersDF = spark.read.option('path', answers_input_path).load()

questionsDF = spark.read.option('path', questions_input_path).load()



In [3]:
answersDF.printSchema()

root
 |-- question_id: long (nullable = true)
 |-- answer_id: long (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- comments: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- score: long (nullable = true)



In [4]:
answersDF.count(),len(answersDF.columns), answersDF.rdd.getNumPartitions()

(110714, 6, 4)

In [5]:
questionsDF.count(),len(questionsDF.columns), questionsDF.rdd.getNumPartitions()

(86936, 8, 4)

In [6]:
questionsDF.printSchema()

root
 |-- question_id: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- creation_date: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- accepted_answer_id: long (nullable = true)
 |-- comments: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- views: long (nullable = true)



In [7]:
answersDF.show(5)

+-----------+---------+--------------------+--------+-------+-----+
|question_id|answer_id|       creation_date|comments|user_id|score|
+-----------+---------+--------------------+--------+-------+-----+
|     226592|   226595|2015-12-29 15:46:...|       3|  82798|    2|
|     388057|   388062|2018-02-22 10:52:...|       8|    520|   21|
|     293286|   293305|2016-11-17 13:35:...|       0|  47472|    2|
|     442499|   442503|2018-11-21 22:34:...|       0| 137289|    0|
|     293009|   293031|2016-11-16 05:36:...|       0|  83721|    0|
+-----------+---------+--------------------+--------+-------+-----+
only showing top 5 rows



In [8]:
questionsDF.show(5)

+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+
|question_id|                tags|       creation_date|               title|accepted_answer_id|comments|user_id|views|
+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+
|     382738|[optics, waves, f...|2018-01-27 23:22:...|What is the pseud...|            382772|       0|  76347|   32|
|     370717|[field-theory, de...|2017-11-25 01:09:...|What is the defin...|              null|       1|  75085|   82|
|     339944|[general-relativi...|2017-06-17 13:32:...|Could gravitation...|              null|      13| 116137|  333|
|     233852|[homework-and-exe...|2016-02-04 13:19:...|When does travell...|              null|       9|  95831|  185|
|     294165|[quantum-mechanic...|2016-11-22 03:39:...|Time-dependent qu...|              null|       1| 118807|   56|
+-----------+--------------------+--------------

### Issues
After aggregation partition size increased to 200 (default value) --> fixed by turning on spark.sql.adaptive.enabled, reduced down to 9 partitions.
Adaptive Query Execution: helps with heavily skewed data, expensive joins, large partitions.
Currently using *broadcast hash join*, best join in terms of performance.

Other methods for optimization...
remove unessessary columns( drop title, creation_date), need more context (business use case)?


In [9]:
'''
Answers aggregation

Here we : get number of answers per question per month
'''

answers_month = answersDF.withColumn('month', month('creation_date')).groupBy('question_id', 'month').agg(count('*').alias('cnt'))

resultDF = questionsDF.join(answers_month, 'question_id').select('question_id', 'creation_date', 'title', 'month', 'cnt')

resultDF.orderBy('question_id', 'month').show()

'''
Task:

see the query plan of the previous result and rewrite the query to optimize it
'''

+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2014-12-31 17:59:...|Frost bubble form...|    2|  1|
|     155989|2014-12-31 17:59:...|Frost bubble form...|   12|  1|
|     155990|2014-12-31 18:51:...|The abstract spac...|    1|  1|
|     155990|2014-12-31 18:51:...|The abstract spac...|   12|  1|
|     155992|2014-12-31 19:44:...|centrifugal force...|   12|  1|
|     155993|2014-12-31 19:56:...|How can I estimat...|    1|  1|
|     155995|2014-12-31 21:16:...|Why should a solu...|    1|  3|
|     155996|2014-12-31 22:06:...|Why do we assume ...|    1|  2|
|     155996|2014-12-31 22:06:...|Why do we assume ...|    2|  1|
|     155996|2014-12-31 22:06:...|Why do we assume ...|   11|  1|
|     155997|2014-12-31 22:26:...|Why do square sha...|    1|  3|
|     155999|2014-12-31 23:01:...|Diagonalizability...|    1|  1|
|     1560

'\nTask:\n\nsee the query plan of the previous result and rewrite the query to optimize it\n'

In [10]:
print(sc.getConf().get("spark.serializer"))

org.apache.spark.serializer.KryoSerializer
