In [None]:
# NOTE: importing `sum` shadows Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import (
    array_contains, ceil, col, count, desc, explode, lit, split,
    sum, unix_timestamp, when, window
)

In [None]:
# Install a pinned pandas into the session (presumably EMR notebook-scoped
# libraries). pandas is needed by toPandas(), and the later .plot() calls pass
# xlabel/ylabel, which DataFrame.plot only accepts from pandas 1.1 onward.
PANDAS_SPEC = "pandas==1.3.4"
sc.install_pypi_package(PANDAS_SPEC)

In [None]:
# Pinned matplotlib: the plotting backend used by the pandas .plot() cells below.
MATPLOTLIB_SPEC = "matplotlib==3.0.0"
sc.install_pypi_package(MATPLOTLIB_SPEC)

In [None]:
import matplotlib.pyplot as plt

In [None]:
# S3 prefix of the posts dataset (read as Parquet in the next cell).
# TODO: replace the placeholder with the real location before running.
posts_path = "s3://..."

In [None]:
# Load the raw posts table; format('parquet').load(...) is the generic form of read.parquet(...).
posts_all = spark.read.format('parquet').load(posts_path)

In [None]:
# Print the full schema of the raw dataset to see which columns are available
# before narrowing it down in the next cell.
posts_all.printSchema()

In [None]:
# Narrow the raw table to the columns used in this analysis and cache it,
# because `posts` is the starting point of several later queries.
POST_COLUMNS = [
    'id',
    'post_type_id',
    'accepted_answer_id',
    'user_id',
    'creation_date',
    'tags',
]
posts = posts_all.select(*POST_COLUMNS).cache()

# Compute the counts

In [None]:
# Total number of posts. As the first action on the cached `posts`, this also
# materializes the cache.
n_posts = posts.count()
n_posts

In [None]:
# Split posts by type: post_type_id 1 marks questions, 2 marks answers.
questions = posts.filter(col('post_type_id') == 1)
answers = posts.filter(col('post_type_id') == 2)
# Backward-compatible alias for the original misspelled name, still used by
# other cells; `answers` is the name the response-time join expects.
answeres = answers

In [None]:
# Number of question posts.
n_questions = questions.count()
n_questions

In [None]:
# Number of answer posts.
n_answers = answeres.count()
n_answers

In [None]:
# Number of questions that have an accepted answer. Without .count() this cell
# only built a lazy DataFrame and displayed its schema, not a number.
questions.filter(col('accepted_answer_id').isNotNull()).count()

In [None]:
# Distinct number of users (posts with a null user_id are ignored).
# isNotNull must be *called*; passing the bound method to filter() raises a TypeError.
posts.filter(col('user_id').isNotNull()).select('user_id').distinct().count()

# Compute the response time of accepted answers

In [None]:
# Seconds between each question and its accepted answer, fastest first.
# The answer posts are derived here from `posts` so this cell does not depend
# on how the answers DataFrame was named in an earlier cell.
answer_posts = posts.filter(col('post_type_id') == 2)

response_time = (
    questions.alias('questions')
    # A question's accepted_answer_id refers to the answer post's `id`.
    # Alias-qualified column references keep this self-join (both sides come
    # from `posts`) unambiguous.
    .join(
        answer_posts.alias('answers'),
        col('questions.accepted_answer_id') == col('answers.id'),
    )
    .select(
        col('questions.id'),
        col('questions.creation_date').alias('question_time'),
        col('answers.creation_date').alias('answer_time'),
    )
    .withColumn('response_time', unix_timestamp('answer_time') - unix_timestamp('question_time'))
    # Keep strictly positive delays only; answers timestamped at or before
    # their question are presumably data glitches.
    .filter(col('response_time') > 0)
    .orderBy('response_time')
)

In [None]:
# Show the five quickest accepted answers (response_time is sorted ascending).
response_time.show(5, truncate=True)

In [None]:
# Bucket response times into whole hours (rounded up, so 1 s -> hour 1),
# count questions per bucket, and bring the first 24 buckets to pandas.
hourly_data = (
    response_time
    .select(ceil(col('response_time') / 3600).alias('hours'))
    .groupBy('hours')
    .count()
    .withColumnRenamed('count', 'cnt')
    .orderBy('hours')
    .limit(24)
    .toPandas()
)

## See how many questions get their accepted answer within each hour

In [None]:
# Bar chart: number of questions per response-time hour bucket.
hourly_data.plot.bar(
    x='hours',
    y='cnt',
    figsize=(12, 6),
    title='Response time of questions',
    xlabel='Hour',
    ylabel='Number of answered questions',
    legend=False,
)

In [None]:
%matplot plt
# Presumably the sparkmagic/EMR magic that renders the current `plt` figure in
# the notebook; this comment stays after it so the cell still starts with `%`.

# See the time evolution of the number of questions and answers

In [None]:
# Weekly number of questions and answers, as a Spark DataFrame ordered by week.
# Posts with a null user_id are excluded. One-week windows are aligned to the
# Unix epoch (1970-01-01 00:00:00 UTC, a Thursday), so each bucket starts on Thursday.
is_question = col('post_type_id') == 1
is_answer = col('post_type_id') == 2

posts_grouped = (
    posts
    .where(col('user_id').isNotNull())
    .groupBy(window(col('creation_date'), '1 week'))
    .agg(
        # when() wraps plain Python literals itself, so lit() is unnecessary.
        sum(when(is_question, 1).otherwise(0)).alias('questions'),
        sum(when(is_answer, 1).otherwise(0)).alias('answers'),
    )
    .withColumn('date', col('window.start').cast('date'))
    .orderBy('date')
)

In [None]:
# `posts_grouped` is a Spark DataFrame, which (in PySpark 3.x) has no .plot().
# Collect the small weekly aggregate to pandas first, keeping only the plotted
# columns so the `window` struct column is not shipped to the driver.
weekly_posts = posts_grouped.select('date', 'questions', 'answers').toPandas()

# Two series are drawn, so the title, y-label and legend must cover both.
weekly_posts.plot(
    x='date',
    y=['questions', 'answers'],
    figsize=(12, 6),
    title='Number of questions and answers per week',
    legend=True,
    xlabel='Date',
    ylabel='Number of posts',
    kind='line'
)
plt.show()

In [None]:
%matplot plt
# Presumably the sparkmagic/EMR magic that renders the current `plt` figure in
# the notebook; this comment stays after it so the cell still starts with `%`.

# Compute the number of distinct tags

In [None]:
# Turn each question's '<a><b>' tag string into an array of bare tag names,
# then count how many distinct tags appear across all questions.
question_tags = (
    questions
    .withColumn('tags', split(col('tags'), '><'))
    .selectExpr(
        '*',
        "TRANSFORM(tags, value -> regexp_replace(value, '(>|<)', '')) AS tags_arr"
    )
)
question_tags.select(explode(col('tags_arr')).alias('tag')).distinct().count()

## See the most popular tags

In [None]:
# Tag popularity: one row per (question, tag), counted per tag, most frequent first.
top_tags = (
    questions
    .withColumn('tags', split(col('tags'), '><'))
    .selectExpr(
        '*',
        "TRANSFORM(tags, value -> regexp_replace(value, '(>|<)', '')) AS tags_arr"
    )
    .select(explode(col('tags_arr')).alias('tag'))
    .groupBy('tag')
    .agg(count('*').alias('tag_frequency'))
    .orderBy(col('tag_frequency').desc())
)
top_tags.show(10)

## See the popularity of the spark and spark-sql tags over time

In [None]:
# Weekly number of questions tagged `spark` or `spark-sql`, collected to pandas
# for plotting. One-week windows are aligned to the Unix epoch
# (1970-01-01 00:00:00 UTC, a Thursday), so each bucket starts on a Thursday.
spark_tag = (
    questions
    .withColumn('tags', split('tags', '><'))
    .selectExpr(
        '*',
        "TRANSFORM(tags, value -> regexp_replace(value, '(>|<)', '')) AS tags_arr"
    )
    # Questions are keyed by `id`; there is no `question_id` column in `posts`,
    # so selecting it failed with an AnalysisException.
    .select('id', 'creation_date', 'tags_arr')
    .filter(array_contains(col('tags_arr'), 'spark') | array_contains(col('tags_arr'), 'spark-sql'))
    .groupBy(
        window('creation_date', "1 week")
    )
    .agg(
        count('*').alias('tag_frequency')
    )
    .withColumn('date', col('window.start').cast('date'))
    .orderBy('date')
).toPandas()

In [None]:
# Line chart of weekly spark/spark-sql question counts. `tag_frequency` is the
# only numeric column besides the x axis, so it is named explicitly here.
# NOTE(review): on the sparkmagic/EMR kernel the earlier plots are rendered by a
# separate `%matplot plt` cell; this one may need the same to be displayed.
spark_tag.plot.line(
    x='date',
    y='tag_frequency',
    figsize=(12, 6),
    title='spark/spark-sql tag frequency per week',
    xlabel='Date',
    ylabel='Number of questions with spark tag',
    legend=False,
)
plt.show()

In [None]:
# Release the cached `posts` data now that the analysis is finished
# (non-blocking, which is PySpark's default).
posts.unpersist(blocking=False)