In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, count, col
import pyspark.sql.functions as F

# Create (or reuse) the Spark session that every later cell reads data through.
spark = (
    SparkSession.builder
    .appName('Optimize I')
    .getOrCreate()
)

# Last expression of the cell: display the session's SparkContext.
spark.sparkContext

21/07/06 14:50:43 WARN Utils: Your hostname, pySpark-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
21/07/06 14:50:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/07/06 14:50:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [37]:
# Baseline query: answers per question per month, joined to the question rows.
# Use 4 partitions for the shuffles produced by the aggregation, join and sort.
spark.conf.set('spark.sql.shuffle.partitions', 4)

# Build and collect the same query 10 times; every pass re-reads both inputs.
for _ in range(10):
    # Load both datasets with all of their columns.
    answersDF_1 = spark.read.option('path', 'data/answers').load()
    questionsDF_1 = spark.read.option('path', 'data/questions').load()

    # Count answers per (question_id, month of the answer's creation_date).
    answers_month_1 = (
        answersDF_1
        .withColumn('month', month('creation_date'))
        .groupBy('question_id', 'month')
        .agg(count('*').alias('cnt'))
    )

    # Attach the counts to the questions; 'creation_date' and 'title' come from
    # the questions side because the aggregate only carries question_id/month/cnt.
    resultDF_1 = (
        questionsDF_1
        .join(answers_month_1, 'question_id')
        .select('question_id', 'creation_date', 'title', 'month', 'cnt')
        .orderBy('question_id', 'month')
    )

    # Materialise the result on the driver.
    out_1 = resultDF_1.collect()

In [39]:
# Variant 2: same query as the baseline, but only the needed columns are
# selected right after loading.
spark.conf.set('spark.sql.shuffle.partitions', 4)

# Build and collect the query 10 times; every pass re-reads both inputs.
for _ in range(10):
    # Answers only need the join key and the date the month is derived from.
    answersDF_2 = (
        spark.read.option('path', 'data/answers').load()
        .select(['question_id', 'creation_date'])
    )
    # Questions only need the join key plus the two columns that are reported.
    questionsDF_2 = (
        spark.read.option('path', 'data/questions').load()
        .select(['question_id', 'title', 'creation_date'])
    )

    # Count answers per (question_id, month of the answer's creation_date).
    # Parenthesised chain instead of backslashes: the original ended this
    # statement with a dangling `\` that only parsed because a whitespace-only
    # line happened to follow it.
    answers_month_2 = (
        answersDF_2
        .withColumn('month', month('creation_date'))
        .groupBy('question_id', 'month')
        .agg(count('*').alias('cnt'))
    )

    # Attach the counts to the questions and sort; 'creation_date' here is the
    # question's, since the aggregate carries only question_id/month/cnt.
    resultDF_2 = (
        questionsDF_2
        .join(answers_month_2, 'question_id')
        .select('question_id', 'creation_date', 'title', 'month', 'cnt')
        .orderBy('question_id', 'month')
    )

    # Materialise the result on the driver.
    out_2 = resultDF_2.collect()

In [34]:
# Variant 3: join questions to answers first, then aggregate once.
# reduce shuffling at expense of parallelism
spark.conf.set('spark.sql.shuffle.partitions', 4)


# drop unneeded columns on load
for _ in range(10):
    answersDF_3 = (
        spark.read.option('path', 'data/answers').load()
        .select(['question_id', 'creation_date'])
    )
    questionsDF_3 = (
        spark.read.option('path', 'data/questions').load()
        .select(['question_id', 'title', 'creation_date'])
    )

    # The answer's creation_date is needed only to derive the month. Reduce the
    # answers to (question_id, month) before joining so that the only
    # 'creation_date' left after the join is the question's. Previously the
    # answer's date was kept and reported, so the output column did not hold
    # the question's creation date.
    answers_month_only_3 = answersDF_3.select(
        'question_id', month('creation_date').alias('month'))

    # Columns are selected by name: after the reduction above no name is
    # ambiguous, so no DataFrame-attribute references are needed.
    q_and_a_joined_3 = (
        questionsDF_3
        .join(answers_month_only_3, 'question_id')
        .select('question_id', 'creation_date', 'title', 'month')
    )

    # One output row per (question_id, month). title/creation_date come from the
    # question row, so max() just picks that single value.
    # NOTE(review): assumes one row per question_id in data/questions — confirm.
    resultDF_3 = (
        q_and_a_joined_3
        .groupBy('question_id', 'month')
        .agg(count('*').alias('cnt'),
             F.max(col('title')).alias('title'),
             F.max(col('creation_date')).alias('creation_date'))
        .orderBy('question_id', 'month')
        .select('question_id', 'creation_date', 'title', 'month', 'cnt')
    )

    # Materialise the result on the driver.
    out_3 = resultDF_3.collect()

In [22]:
# Variant 4: pre-partition the answers by the grouping key before the join.
# reduce shuffling at expense of parallelism
spark.conf.set('spark.sql.shuffle.partitions', 4)


# drop unneeded columns on load
for _ in range(10):
    # Derive the month from the answer's date, then repartition by the columns
    # the later groupBy uses (4 partitions, same as the shuffle setting above).
    answersDF_4 = (
        spark.read.option('path', 'data/answers').load()
        .select(['question_id', 'creation_date'])
        .withColumn('month', month('creation_date'))
        .repartition(4, 'question_id', 'month')
    )
    # Load the question's creation_date as well: it is the date reported in the
    # output. Previously it was never loaded, so the output column was filled
    # with the latest *answer* date of the month instead.
    questionsDF_4 = (
        spark.read.option('path', 'data/questions').load()
        .select(['question_id', 'title', 'creation_date'])
    )

    # Drop the answer's creation_date (already folded into 'month') so that the
    # only 'creation_date' after the join is the question's, and columns can be
    # selected unambiguously by name.
    q_and_a_joined_4 = (
        answersDF_4.drop('creation_date')
        .join(questionsDF_4, 'question_id')
        .select('question_id', 'creation_date', 'title', 'month')
    )

    # One output row per (question_id, month). title/creation_date come from the
    # question row, so max() just picks that single value.
    # NOTE(review): assumes one row per question_id in data/questions — confirm.
    resultDF_4 = (
        q_and_a_joined_4
        .groupBy('question_id', 'month')
        .agg(count('*').alias('cnt'),
             F.max(col('title')).alias('title'),
             F.max(col('creation_date')).alias('creation_date'))
        .orderBy('question_id', 'month')
        .select('question_id', 'creation_date', 'title', 'month', 'cnt')
    )

    # Materialise the result on the driver.
    out_4 = resultDF_4.collect()