In [0]:
from pyspark.sql.functions import col, count, month, broadcast
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Optimize').getOrCreate()
# Adaptive Query Execution: Spark may re-plan joins and shuffle partitions at
# runtime (e.g. switch a sort-merge join to a broadcast join), which is why
# manual broadcast hints below are optional.
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Input locations (DBFS). Both datasets are Parquet, which carries its own
# schema, so the CSV-only reader options (inferSchema / header / sep) that used
# to be passed here were ignored and have been dropped.
answers_file_location = "/FileStore/tables/answers"
questions_file_location = "/FileStore/tables/questions3"
file_type = "parquet"

# .cache() is lazy: each DataFrame is materialised by the first action that
# scans it and reused by later actions/cells (e.g. the printSchema cells).
answersDF = spark.read.format(file_type).load(answers_file_location).cache()
questionsDF = spark.read.format(file_type).load(questions_file_location).cache()
# display(answersDF)
# display(questionsDF)


def monthly_answer_counts(answers_df):
    """Count answers per (question_id, month).

    Args:
        answers_df: DataFrame with at least `question_id` and `creation_date`.

    Returns:
        DataFrame with columns `question_id`, `month`, `cnt`.

    NOTE(review): `month()` is month-of-year only, so answers posted in the
    same calendar month of different years share one bucket — confirm this
    is the intended grouping.
    """
    return (
        answers_df
        .withColumn('month', month('creation_date'))
        .groupBy('question_id', 'month')
        .agg(count('*').alias('cnt'))
    )


def questions_with_monthly_counts(questions_df, answer_counts, broadcast_side=None):
    """Inner-join questions with their per-month answer counts.

    Args:
        questions_df: DataFrame with `question_id`, `creation_date`, `title`.
        answer_counts: output of `monthly_answer_counts`.
        broadcast_side: None lets Spark (with AQE) choose the join strategy;
            'answers' or 'questions' adds a broadcast hint to that side.

    Returns:
        DataFrame with columns question_id, creation_date, title, month, cnt.

    Raises:
        ValueError: if `broadcast_side` is not None, 'answers' or 'questions'.
    """
    if broadcast_side == 'answers':
        answer_counts = broadcast(answer_counts)
    elif broadcast_side == 'questions':
        questions_df = broadcast(questions_df)
    elif broadcast_side is not None:
        raise ValueError(
            f"broadcast_side must be None, 'answers' or 'questions', got {broadcast_side!r}"
        )
    return (
        questions_df
        .join(answer_counts, 'question_id')
        .select('question_id', 'creation_date', 'title', 'month', 'cnt')
    )


# Timings recorded in earlier runs (single measurements, cluster-dependent;
# the broadcast variants were measured without the `title` column):
#   broadcast_side=None        -> ~4s (original note: "without cache")
#   broadcast_side='answers'   -> ~5s
#   broadcast_side='questions' -> ~4s
answers_month = monthly_answer_counts(answersDF)
resultDF = questions_with_monthly_counts(questionsDF, answers_month)
resultDF.orderBy('question_id', 'month').show()

+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2015-01-01 01:59:...|Frost bubble form...|    1|  1|
|     155989|2015-01-01 01:59:...|Frost bubble form...|    2|  1|
|     155990|2015-01-01 02:51:...|The abstract spac...|    1|  2|
|     155992|2015-01-01 03:44:...|centrifugal force...|    1|  1|
|     155993|2015-01-01 03:56:...|How can I estimat...|    1|  1|
|     155995|2015-01-01 05:16:...|Why should a solu...|    1|  3|
|     155996|2015-01-01 06:06:...|Why do we assume ...|    1|  2|
|     155996|2015-01-01 06:06:...|Why do we assume ...|    2|  1|
|     155996|2015-01-01 06:06:...|Why do we assume ...|   11|  1|
|     155997|2015-01-01 06:26:...|Why do square sha...|    1|  3|
|     155999|2015-01-01 07:01:...|Diagonalizability...|    1|  1|
|     156008|2015-01-01 08:48:...|Capturing a light...|    1|  2|
|     1560

In [0]:
# Print the schema tree of questionsDF.
# NOTE(review): the saved output below lists answer-style columns (answer_id,
# comments, user_id, score) and no `title`, although the first cell selects
# `title` from questionsDF — the output looks stale or the questions3 files
# changed between runs; Restart & Run All to confirm.
questionsDF.printSchema()

root
 |-- question_id: long (nullable = true)
 |-- answer_id: long (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- comments: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- score: long (nullable = true)



In [0]:
# Print the schema tree of answersDF; the first cell relies on its
# `question_id` and `creation_date` columns.
answersDF.printSchema()

root
 |-- question_id: long (nullable = true)
 |-- answer_id: long (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- comments: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- score: long (nullable = true)

