# Load Reddit Submissions into Parquet

The raw data was pulled from [pushshift.io](https://files.pushshift.io/reddit/submissions/).

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark.sql.utils as U
from pyspark.sql.window import Window as W

import pandas as pd

from pyspark.sql import SparkSession

# Show full cell contents when DataFrames are rendered through pandas.
pd.set_option('display.max_colwidth', None)

# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("RedditPostsLoadToParquet")
    .getOrCreate()
)

In [None]:
# Years to process; the end of range() is exclusive, so this is 2022 only.
year_range = range(2022, 2023)

# Explicit schema for the raw submission JSON. Every field is nullable
# (the StructField default).
# NOTE(review): 'ups' is declared as a string while 'score' and the other
# counters are longs. Changing it would give newly written years a different
# Parquet column type than years already written, so it is left as-is.
# NOTE(review): 'edited' may not be a pure boolean in the raw data
# (possibly an edit timestamp) -- confirm how such records parse here.
json_schema = T.StructType([
    T.StructField(field_name, field_type)
    for field_name, field_type in (
        ("author", T.StringType()),
        ("created_utc", T.LongType()),
        ("domain", T.StringType()),
        ("edited", T.BooleanType()),
        ("id", T.StringType()),
        ("is_crosspostable", T.BooleanType()),
        ("is_self", T.BooleanType()),
        ("is_video", T.BooleanType()),
        ("num_comments", T.LongType()),
        ("num_crossposts", T.LongType()),
        ("over_18", T.BooleanType()),
        ("permalink", T.StringType()),
        ("promoted", T.BooleanType()),
        ("score", T.LongType()),
        ("selftext", T.StringType()),
        ("spam", T.BooleanType()),
        ("stickied", T.BooleanType()),
        ("subreddit", T.StringType()),
        ("subreddit_id", T.StringType()),
        ("thumbnail", T.StringType()),
        ("title", T.StringType()),
        ("ups", T.StringType()),
        ("url", T.StringType()),
    )
])

def has_column(df, col_name):
    """Return a literal boolean Column saying whether ``df`` has ``col_name``.

    The membership test runs eagerly against ``df.columns``. The result is
    wrapped in ``F.lit`` so it can be used inside Column expressions.
    """
    present = col_name in df.columns
    return F.lit(present)

# For each year: read that year's raw monthly dumps with the explicit schema,
# derive date/partition columns from created_utc, and write Parquet
# partitioned by month and day under a per-year directory.
for year in year_range:
    print('Processing submissions data for year {0}'.format(year))
    file_pattern = 'qfs:///data/reddit/submissions/raw/RS_*{0}-*.bz2'.format(year)
    submissions_raw = spark.read.json(
        file_pattern,
        encoding='utf-8',
        schema=json_schema,
    )
    # NOTE(review): from_unixtime renders in the Spark session time zone
    # (spark.sql.session.timeZone), not necessarily UTC. Confirm the session
    # is set to UTC if partitions should follow UTC calendar days.
    df = (
        submissions_raw
        .withColumn('created_date', F.from_unixtime(F.col('created_utc'), 'yyyy-MM-dd'))
        .withColumn('month', F.from_unixtime(F.col('created_utc'), 'MM'))
        .withColumn('day', F.from_unixtime(F.col('created_utc'), 'dd'))
        # created_date is intentionally not overwritten again: a later
        # 'dd'-formatted created_date would reduce it to the day of month.
    )
    # The target path is specific to this year, so overwrite replaces only
    # this year's output.
    df.write.partitionBy('month', 'day').parquet(
        'qfs:///data/reddit/submissions/processed/year={0}/'.format(year),
        mode='overwrite',
    )

 

In [None]:
# Read every processed year back as a single DataFrame.
submissions_df = spark.read.format('parquet').load('qfs:///data/reddit/submissions/processed/')

In [None]:
# Print the column names and types of the processed dataset.
submissions_df.printSchema()

In [None]:
# Total number of submission rows across all processed data (runs a Spark job).
submissions_df.count()

In [None]:
# NOTE(review): identical to the preceding count cell; presumably a re-run -- consider removing.
submissions_df.count()

In [None]:
# All submissions by a single author, collected to pandas for display.
submissions_df.where(F.col('author') == 'MichaelKamprath').toPandas()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas.plotting import register_matplotlib_converters

# Show full cell contents in pandas output.
pd.options.display.max_colwidth = None
# Let matplotlib plot pandas date/time values.
register_matplotlib_converters()

def _plot_series(ax, x_values, y_values, label, line_width, x_axis_is_dates):
    """Draw one labelled line on ``ax``.

    Date x axes get an explicit solid '-' format, mirroring the former
    ``Axes.plot_date(x, y, '-')`` call. Other axes use matplotlib's defaults.
    """
    if x_axis_is_dates:
        ax.plot(x_values, y_values, '-', linewidth=line_width, label=label)
    else:
        ax.plot(x_values, y_values, label=label, linewidth=line_width)


def plot_line_graph(
    df,
    y_axis_column,
    x_axis_column,
    segment_column=None,
    segment_values=None, # a list of values from segment_column to be graphed
    segment_labels=None, # a dictionary with segment_values as keys and names as values
    xlabel=None,
    ylabel=None,
    line_width=2,
    xlabel_rotation=None,
    x_axis_is_dates=True,
    y_axis_log_scale=False,
    title=None,
    legend_location='lower left',
):
    """Collect ``df`` to pandas and draw one or more line series.

    Args:
        df: Spark DataFrame to plot; only ``toPandas()`` is called on it.
        y_axis_column: column plotted on the y axis.
        x_axis_column: column plotted on the x axis.
        segment_column: optional column whose distinct values each get their
            own line. When ``None``, a single line is drawn.
        segment_values: values of ``segment_column`` to draw. Defaults to
            every distinct value in the collected data.
        segment_labels: optional mapping from segment value to legend label.
            Values missing from the mapping are labelled with their string form.
        xlabel: x axis label.
        ylabel: y axis label. Also the legend label of a single-series plot,
            which otherwise defaults to ``y_axis_column``.
        line_width: line width passed to matplotlib.
        xlabel_rotation: rotation in degrees for the x tick labels.
        x_axis_is_dates: treat x values as dates (date ticks, auto-formatted
            tick labels).
        y_axis_log_scale: use a log-scaled y axis and show a grid.
        title: figure title.
        legend_location: matplotlib legend ``loc`` value.
    """
    df_pd = df.toPandas()
    fig, ax = plt.subplots()
    if x_axis_is_dates:
        # This is what the deprecated Axes.plot_date did before drawing.
        ax.xaxis_date()

    if segment_column is None:
        item_label = y_axis_column if ylabel is None else ylabel
        _plot_series(
            ax,
            df_pd[x_axis_column],
            df_pd[y_axis_column],
            item_label,
            line_width,
            x_axis_is_dates,
        )
    else:
        if segment_values is None:
            # Use the already-collected pandas data instead of running a
            # second Spark job over ``df``.
            segment_value_list = df_pd[segment_column].drop_duplicates().tolist()
        else:
            segment_value_list = segment_values
        for value in segment_value_list:
            data = df_pd[df_pd[segment_column] == value]
            default_label = "{0}".format(value)
            if segment_labels is None:
                item_label = default_label
            else:
                # Unlabelled values fall back to their string form instead of
                # raising KeyError.
                item_label = segment_labels.get(value, default_label)
            _plot_series(
                ax,
                data[x_axis_column],
                data[y_axis_column],
                item_label,
                line_width,
                x_axis_is_dates,
            )

    fig.set_size_inches(20, 12)
    if xlabel is not None:
        ax.set_xlabel(xlabel)
    if ylabel is not None:
        ax.set_ylabel(ylabel)
    if x_axis_is_dates:
        fig.autofmt_xdate()
    if xlabel_rotation is not None:
        # Applied after autofmt_xdate, which otherwise resets date tick label
        # rotation to its own default.
        ax.tick_params(axis='x', labelrotation=xlabel_rotation)
    if y_axis_log_scale:
        # Explicit True: a bare grid() call toggles visibility instead.
        ax.grid(True)
        ax.set_yscale('log')
    if title is not None:
        fig.suptitle(title, fontsize=18)
    ax.legend(loc=legend_location)
    plt.show()

In [None]:
# Submission counts and distinct-author counts per month, sorted by month
# and cached because the following cells reuse it. ``year`` and ``month`` are
# presumably the integer partition columns discovered from the
# year=/month= directories written by the load loop -- verify.
monthly_submissions = (
        submissions_df
        .withColumn(
            'year_month',
            # First day of the month, built directly from the integer columns.
            # Formatting a null month into a string yields something like
            # '2022-null', which to_date rejects with an error when
            # spark.sql.ansi.enabled is true. make_date just returns null.
            F.expr('make_date(year, month, 1)'),
        )
        .groupBy('year_month')
        .agg(
            F.count('*').alias('count'),
            F.countDistinct('author').alias('authors')
        )
        .orderBy(F.col('year_month'))
    ).cache()

In [None]:
# Sample up to 20 rows whose month partition is null (presumably rows whose
# created_utc was null at load time), showing the identifying columns.
(
    submissions_df
    .where(F.isnull('month'))
    .select(
        'author',
        'subreddit_id',
        'permalink',
        'selftext',
        'created_utc',
        'created_date',
        'year',
        'month',
        'day',
    )
    .limit(20)
    .toPandas()
)

In [None]:
# Number of rows with a null month partition, per year.
(
    submissions_df
    .where(F.col('month').isNull())
    .groupBy('year')
    .count()
    .toPandas()
)

In [None]:
# First 20 months of the monthly aggregate, in chronological order.
monthly_submissions.sort('year_month').limit(20).toPandas()

In [None]:
# Plot distinct authors per month.
plot_line_graph(
    monthly_submissions,
    y_axis_column='authors',
    x_axis_column='year_month',
    xlabel='Date',
    ylabel='Authors',
)