## Setup
We need an available Java installation to run pyspark. The easiest way to do this is to install JDK and set the proper paths using conda

In [None]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.2.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)

In [None]:
# pip install wordcloud

In [None]:
# pip install plotly

In [None]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark import SparkContext 
from pyspark.sql.functions import col, lower, count, length, unix_timestamp, current_timestamp, to_date, desc
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import re
from datetime import datetime
import pandas as pd
from tabulate import tabulate
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import seaborn as sns
from pyspark.sql import functions as F
from scipy.stats import tstd
import nltk
from wordcloud import WordCloud,STOPWORDS
import matplotlib.pyplot as plt
# !pip install plotly
# !pip install wordcloud
import plotly.express as px

# download the nltk stopwords
nltk.download('stopwords')
from nltk.corpus import stopwords
nltk_stopwords = set(stopwords.words('english'))

## Process S3 data with SageMaker Processing Job `PySparkProcessor`

We are going to move the above processing code in a Python file and then submit that file to SageMaker Processing Job's [`PySparkProcessor`](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#pysparkprocessor).

In [None]:
!mkdir -p ./code

In [None]:
%%writefile ./code/process.py

import os
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")    
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_prefix", type=str, help="s3 output prefix")
    parser.add_argument("--col_name_for_filtering", type=str, help="Name of the column to filter")
    parser.add_argument("--values_to_keep", type=str, help="comma separated list of values to keep in the filtered set")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    logger.info(f"spark version = {spark.version}")
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

   
    # Downloading the data from S3 into a Dataframe
    logger.info(f"going to read {args.s3_dataset_path}")
    df = spark.read.parquet(args.s3_dataset_path, header=True)
    logger.info(f"finished reading files...")
    

    
    # filter the dataframe to only keep the values of interest
    vals = [s.strip() for s in args.values_to_keep.split(",")]
    df_filtered = df.where(col(args.col_name_for_filtering).isin(vals))
    
    # save the filtered dataframes so that these files can now be used for future analysis
    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}"
    logger.info(f"going to write data for {vals} in {s3_path}")
    logger.info(f"shape of the df_filtered dataframe is {df_filtered.count():,}x{len(df_filtered.columns)}")
    df_filtered.write.mode("overwrite").parquet(s3_path)
    
    logger.info(f"all done...")
    
if __name__ == "__main__":
    main()

Now submit this code to SageMaker Processing Job.

In [None]:
%%time
import time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    framework_version="3.3",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_logs = f"spark_logs"

# modify this comma separated list to choose the subreddits of interest
subreddits = "socialism, Economics"
configuration = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
    }
]

# the dataset contains data for these 3 years
year_list = [2021,2022,2023]

In [None]:
%%time
for yyyy in year_list:
    print(f"going to filter comments data for year={yyyy}")
    s3_dataset_path_commments = f"s3://bigdatateaching/reddit-parquet/comments/year={yyyy}/month=*/*.parquet" # "s3a://bigdatateaching/reddit/parquet/comments/yyyy=*/mm=*/*comments*.parquet"
    output_prefix_data_comments = f"project/comments/yyyy={yyyy}"
    col_name_for_filtering = "subreddit"
    subreddits = "socialism, Economics"

    # run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
    spark_processor.run(
        submit_app="./code/process.py",
        arguments=[
            "--s3_dataset_path",
            s3_dataset_path_commments,
            "--s3_output_bucket",
            bucket,
            "--s3_output_prefix",
            output_prefix_data_comments,
            "--col_name_for_filtering",
            col_name_for_filtering,
            "--values_to_keep",
            subreddits,
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
        logs=False,
        configuration=configuration
    )
    # give some time for resources from this iterations to get cleaned up
    # if we start the job immediately we could get insufficient resources error
    time.sleep(60)

In [None]:
%%time
for yyyy in year_list:
    print(f"going to filter submissions data for year={yyyy}")
    s3_dataset_path_submissions = f"s3://bigdatateaching/reddit-parquet/submissions/year={yyyy}/month=*/*.parquet" # "s3a://bigdatateaching/reddit/parquet/submissions/yyyy=*/mm=*/*submissions*.parquet"
    output_prefix_data_submissions = f"project/submissions/yyyy={yyyy}"

    # run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
    spark_processor.run(
        submit_app="./code/process.py",
        arguments=[
             "--s3_dataset_path",
            s3_dataset_path_submissions,
            "--s3_output_bucket",
            bucket,
            "--s3_output_prefix",
            output_prefix_data_submissions,
            "--col_name_for_filtering",
            col_name_for_filtering,
            "--values_to_keep",
            subreddits,
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
        logs=False,
        configuration=configuration
    )
    # give some time for resources from this iterations to get cleaned up
    # if we start the job immediately we could get insufficient resources error
    time.sleep(60)

## Read the filtered data

Now that we have filtered the data to only keep submissions and comments from subreddits of interest. Let us read data from the s3 path where we saved the filtered data.

In [None]:
!aws s3 ls s3://sagemaker-us-east-1-163140600147/project/

In [None]:
public_bucket = 'sagemaker-us-east-1-163140600147'
output_prefix = 'project/comments/'
s3_input_path_comments = f"s3a://{public_bucket}/{output_prefix}"
comments = spark.read.parquet(s3_input_path_comments)
print(f"shape of the comments dataframe is {comments.count():,}x{len(comments.columns)}")

In [None]:
comments.cache()

In [None]:
# check counts (ensuring all needed subreddits exist)
comments.groupBy('subreddit').count().show()

In [None]:
comments.cache()

In [None]:
comments.printSchema()

In [None]:
comments.cache()

In [None]:
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "link_id", "id", "created_utc", "score").show()

In [None]:
comments.cache()

In [None]:
public_bucket = 'sagemaker-us-east-1-163140600147'
output_prefix = 'project/submissions/'
s3_input_path_submissions = f"s3a://{public_bucket}/{output_prefix}"
submissions = spark.read.parquet(s3_input_path_submissions)
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}")

In [None]:
submissions.cache()

In [None]:
# check counts (ensuring all needed subreddits exist)
submissions.groupBy('subreddit').count().show()

In [None]:
submissions.cache()

In [None]:
submissions.printSchema()

In [None]:
submissions.cache()

In [None]:
# display a subset of columns
submissions.select("subreddit", "author", "title", "selftext", "created_utc", "num_comments", "score").show()

In [None]:
submissions.cache()

# Filter "AskWomen", "AskFeminists", "Feminism" by STEM Keywords

In [None]:
from pyspark.sql.functions import col

# Subreddits to filter by keywords
keyword_subreddits = ["AskWomen", "AskFeminists", "Feminism"]
# Subreddits to include all comments from
include_all_subreddits = ["xxstem", "LadiesofScience", "womenEngineers"]

# Define keywords for case-insensitive search
keywords = ["STEM", "Science", "Technology", "Engineering", "Mathematics", "Process", "Design", "Model", "Plan", "Project"]
keywords_lower = [kw.lower() for kw in keywords]

# Filter the DataFrame
comments = comments.filter(
    (col("subreddit").isin(keyword_subreddits) & col("body").rlike('|'.join(keywords_lower))) |
    (col("subreddit").isin(include_all_subreddits))
)

# Show the filtered data
comments.select("subreddit", "author", "body", "parent_id", "link_id", "id", "created_utc", "score").show()

In [None]:
comments.cache()

In [None]:
from pyspark.sql.functions import col

# Subreddits to filter by keywords
keyword_subreddits = ["AskWomen", "AskFeminists", "Feminism"]
# Subreddits to include all submissions from
include_all_subreddits = ["xxstem", "LadiesofScience", "womenEngineers"]

# Define keywords for case-insensitive search
keywords = ["STEM", "science", "technology", "engineering", "mathematics", "process", "design", "model", "plan", "project"]
# Create a regex pattern to match any keyword (case-insensitive)
pattern = '|'.join([f"(?i){kw}" for kw in keywords])

# Filter the DataFrame
# Include all submissions from certain subreddits or those that match the keyword pattern in their title or selftext
submissions = submissions.filter(
    (col("subreddit").isin(keyword_subreddits) & (col("title").rlike(pattern) | col("selftext").rlike(pattern))) |
    col("subreddit").isin(include_all_subreddits)
)

# Show the filtered data
submissions.select("subreddit", "author", "title", "selftext", "created_utc", "num_comments", "score").show()


In [None]:
submissions.cache()

# Data Cleaning

In [None]:
# Remove rows with missing values in 'author' and 'body'
comments = comments.filter(col("author").isNotNull() & col("body").isNotNull())

# Assume that 'created_utc' should be a timestamp within the last 3 years
three_years_ago = unix_timestamp(current_timestamp()) - (3 * 365 * 24 * 60 * 60)
comments = comments.filter(
    unix_timestamp(col("created_utc")) > three_years_ago
)

# Show the filtered data
comments.select("subreddit", "author", "body", "parent_id", "link_id", "id", "created_utc", "score").show()

In [None]:
comments.cache()

In [None]:
# Remove rows with missing values in 'author' or 'title'
submissions = submissions.filter(col("author").isNotNull() & col("title").isNotNull())

# Assume that 'created_utc' should be a timestamp within the last 3 years
three_years_ago = unix_timestamp(current_timestamp()) - (3 * 365 * 24 * 60 * 60)
submissions = submissions.filter(
    unix_timestamp(col("created_utc")) > three_years_ago
)

# Show the filtered data
submissions.select("subreddit", "author", "title", "selftext", "created_utc", "num_comments",  "score").show()

In [None]:
submissions.cache()

# Table: Subreddit Statistics

In [None]:
from pyspark.sql.functions import col, count
import pandas as pd
import matplotlib.pyplot as plt
from tabulate import tabulate

# Count the number of submissions per subreddit
submissions_count = submissions.groupBy("subreddit").count().withColumnRenamed("count", "Submissions")

# Count the number of comments per subreddit
comments_count = comments.groupBy("subreddit").count().withColumnRenamed("count", "Comments")

# Join the counts on subreddit
subreddit_data = submissions_count.join(comments_count, "subreddit")

# Convert to Pandas DataFrame
subreddit_data_pd = subreddit_data.toPandas()

# Function to create and save a table image using tabulate
def save_table_image(df, title, filename):
    table = tabulate(df, headers='keys', tablefmt='psql', showindex=False)
    lines = table.split('\n')
    fig_width = min(max([len(line) for line in lines]) / 2, 10)
    fig_height = min(len(lines) / 2, 10)
    plt.figure(figsize=(fig_width, fig_height))
    plt.text(0.5, 0.5, table, horizontalalignment='center', verticalalignment='center', fontsize=12, family='monospace')
    plt.axis('off')
    plt.title(title, fontsize=16, weight='bold')
    plt.savefig(filename, bbox_inches='tight', pad_inches=0.05)
    plt.close()

# Define the title for the image and the file path
title = "Subreddit Submission and Comment Counts"
file_path = 'subreddit_counts_table.png'

# Create and save the table image
save_table_image(subreddit_data_pd, title, file_path)

# Show the table
print(subreddit_data_pd)


# Table: Top 10 Authors in Submissions and Comments Dataset


In [None]:
# Filter out "[deleted]" authors from comments and submissions before aggregation
filtered_comments = comments.filter(col("author") != "[deleted]")
filtered_submissions = submissions.filter(col("author") != "[deleted]")

# Find the most active users in filtered comments
most_active_users_comments = (
    filtered_comments.groupBy("author")
    .agg(count("*").alias("comments_count"))
    .orderBy(col("comments_count").desc())
    .limit(10)
)

# Find the most active users in filtered submissions
most_active_users_submissions = (
    filtered_submissions.groupBy("author")
    .agg(count("*").alias("submissions_count"))
    .orderBy(col("submissions_count").desc())
    .limit(10)
)

# # find the most active users in comments
# most_active_users_comments = comments.groupBy("author").agg(count("*").alias("comments_count")).orderBy("comments_count", ascending=False).limit(10)

# # find the most active users in submissions
# most_active_users_submissions = submissions.groupBy("author").agg(count("*").alias("submissions_count")).orderBy("submissions_count", ascending=False).limit(10)

# Converting to Pandas
top10_author_submissions = most_active_users_submissions.toPandas()
top10_author_comments = most_active_users_comments.toPandas()

# Change colulmn names
top10_author_submissions.columns = ['Author', 'Submissions']
top10_author_comments.columns = ['Author', 'Comments']

# Using tabulate to print out the tables in a nicely formatted way
table_submissions = tabulate(top10_author_submissions, headers='keys', tablefmt='psql', showindex=False)
table_comments = tabulate(top10_author_comments, headers='keys', tablefmt='psql', showindex=False)

def save_table_image(table, title, filename):
    # Split the table into lines
    lines = table.split('\n')
    # Estimate the size of the image
    fig_width = max([len(line) for line in lines]) / 2  # adjust the division factor as needed
    fig_height = len(lines) / 2  # adjust the division factor as needed
    plt.figure(figsize=(fig_width, fig_height))
    plt.text(0.5, 0.5, table, horizontalalignment='center', verticalalignment='center', fontsize=12, family='monospace')
    plt.axis('off')
    plt.title(title, fontsize=16, weight='bold')
    plt.savefig(filename, bbox_inches='tight', pad_inches=0.05)
    plt.close()

# Save the tables as images
save_table_image(table_submissions, 'Top 10 Authors by Submissions', 'top10_author_submissions.png')
save_table_image(table_comments, 'Top 10 Authors by Comments', 'top10_author_comments.png')

# Display the tables as tabulate tables
print(table_submissions)
print(table_comments)

# Table: Common authors in Comments and Submissions Datasets

In [None]:
from pyspark.sql.functions import col, count, sum as sum_agg

# Filter out "[deleted]" authors
filtered_comments = comments.filter(col("author") != "[deleted]")
filtered_submissions = submissions.filter(col("author") != "[deleted]")

# Aggregate count and score for comments
comment_agg = filtered_comments.groupBy("author").agg(
    count("id").alias("num_comments"),
    sum_agg("score").alias("total_comment_score")
)

# Aggregate count and score for submissions
submission_agg = filtered_submissions.groupBy("author").agg(
    count("*").alias("num_submissions"),
    sum_agg("score").alias("total_submission_score")
)

# Perform inner join and limit to top 20 common authors
common_authors = (
    comment_agg.join(
        submission_agg,
        "author"
    )
    .orderBy(col("num_comments").desc(), col("num_submissions").desc())
    .limit(20)
)

# Convert to Pandas DataFrame
common_authors_pd = common_authors.toPandas()

# Updating the DataFrame column names
common_authors_pd.columns = ["Author", "Number of Comments", "Total Comment Score", "Number of Submissions", "Total Submission Score"]




# # Filter out "[deleted]" authors from comments and submissions before aggregation
# filtered_comments = comments.filter(col("author") != "[deleted]")
# filtered_submissions = submissions.filter(col("author") != "[deleted]")

# # Perform inner join and limit to top 20 common authors
# common_authors = (
#     filtered_comments.groupBy("author").agg(count("id").alias("num_comments"))
#     .join(
#         filtered_submissions.groupBy("author").agg(count("*").alias("num_submissions")),
#         "author"
#     )
#     .orderBy(col("num_comments").desc(), col("num_submissions").desc())
#     .limit(20)
# )

# # Convert to Pandas DataFrame
# common_authors_pd = common_authors.toPandas()

# # Updating the column names for the DataFrame
# common_authors_pd.columns = ["Author", "Number of Comments", "Number of Submissions"]

# Function to save the DataFrame as an image using tabulate
def save_table_image(df, title, filename):
    table = tabulate(df, headers='keys', tablefmt='psql', showindex=False)
    lines = table.split('\n')
    fig_width = min(max([len(line) for line in lines]) / 2, 10)
    fig_height = min(len(lines) / 2, 10)
    plt.figure(figsize=(fig_width, fig_height))
    plt.text(0.5, 0.5, table, horizontalalignment='center', verticalalignment='center', fontsize=12, family='monospace')
    plt.axis('off')
    plt.title(title, fontsize=16, weight='bold')
    plt.savefig(filename, bbox_inches='tight', pad_inches=0.05)
    plt.close()

# Define the title for the image and the file path
title = "Top 20 Common Authors by Comments and Submissions"
file_path = 'top20_common_authors_table.png'

# Create and save the table image
save_table_image(common_authors_pd, title, file_path)

# Display the table as output
print(tabulate(common_authors_pd, headers='keys', tablefmt='psql', showindex=False))

# Table: Length of the comments and submissions (Minimum, Maximum, Mean, Median, Standard Deviation)

In [None]:
# Calculate length statistics for comments
comments = comments.withColumn('body_length', length(col('body')))
comments_stats = comments.select(
    F.min(col('body_length')).alias('Minimum Length'),
    F.max(col('body_length')).alias('Maximum Length'),
    F.avg(col('body_length')).alias('Average Length'),
    F.expr('percentile_approx(body_length, 0.5)').alias('Median Length'),
    F.stddev(col('body_length')).alias('Standard Deviation')
).collect()[0].asDict()

# Calculate length statistics for submissions
submissions = submissions.withColumn('selftext_length', length(col('selftext')))
submissions_stats = submissions.select(
    F.min(col('selftext_length')).alias('Minimum Length'),
    F.max(col('selftext_length')).alias('Maximum Length'),
    F.avg(col('selftext_length')).alias('Average Length'),
    F.expr('percentile_approx(selftext_length, 0.5)').alias('Median Length'),
    F.stddev(col('selftext_length')).alias('Standard Deviation')
).collect()[0].asDict()

# Convert stats to pandas dataframes
comments_stats_df = pd.DataFrame([comments_stats])
submissions_stats_df = pd.DataFrame([submissions_stats])

def save_table_as_image(df, title, file_name):
    # Convert the DataFrame to a table using tabulate
    table = tabulate(df, headers='keys', tablefmt='psql', showindex=False)

    # Split the table into lines to estimate its size
    lines = table.split('\n')
    fig_width = max([len(line) for line in lines]) / 2  # adjust the division factor as needed
    fig_height = len(lines) / 2  # adjust the division factor as needed

    # Create figure with dynamic size
    fig, ax = plt.subplots(figsize=(fig_width, fig_height))  # Size may need to be adjusted
    ax.text(0.5, 0.5, table, horizontalalignment='center', verticalalignment='center', fontsize=12, family='monospace')
    ax.axis('off')
    plt.title(title, fontsize=16, weight='bold')

    # Save the figure to a file
    plt.savefig(f"{file_name}.png", bbox_inches='tight', pad_inches=0.05)
    plt.close()

# Convert stats to pandas dataframes if not already done
comments_stats_df = pd.DataFrame([comments_stats])
submissions_stats_df = pd.DataFrame([submissions_stats])

# Save the tables as images with tabulate formatting
save_table_as_image(comments_stats_df, "Comments Length Statistics", "comments_length_stats")
save_table_as_image(submissions_stats_df, "Submissions Length Statistics", "submissions_length_stats")

# Paths to the saved images
# comments_length_stats_path = "comments_length_stats.png"
# submissions_length_stats_path = "submissions_length_stats.png"

# Display the table as output
print("Comments Length Statistics:")
print(tabulate(comments_stats_df, headers='keys', tablefmt='psql', showindex=False))

print("\nSubmissions Length Statistics:")
print(tabulate(submissions_stats_df, headers='keys', tablefmt='psql', showindex=False))


# Chart: Time analysis Charts

In [None]:
import plotly.graph_objs as go

# Process submissions DataFrame
submissions_df = submissions.withColumn("date", to_date(col("created_utc")))
submissions_count_by_date = submissions_df.groupBy("date").count().orderBy("date")

# Process comments DataFrame
comments_df = comments.withColumn("date", to_date(col("created_utc")))
comments_count_by_date = comments_df.groupBy("date").count().orderBy("date")

# Convert to Pandas DataFrames for plotting
submissions_pd = submissions_count_by_date.toPandas()
comments_pd = comments_count_by_date.toPandas()# Create traces for the chart
trace_submissions = go.Scatter(x=submissions_pd['date'], y=submissions_pd['count'], name='Submissions')
trace_comments = go.Scatter(x=comments_pd['date'], y=comments_pd['count'], name='Comments', visible=False)

# Create the figure with both traces
fig = go.Figure(data=[trace_submissions, trace_comments])

# Define the buttons for the dropdown menu
button_submissions = dict(label='Submissions', method='update',
                          args=[{'visible': [True, False]}, {'title': 'Number of Submissions Over Time'}])
button_comments = dict(label='Comments', method='update',
                       args=[{'visible': [False, True]}, {'title': 'Number of Comments Over Time'}])

# Add dropdown menus to the figure
fig.update_layout(
    updatemenus=[dict(active=0, buttons=[button_submissions, button_comments])]
)

# Add a range slider for the time dimension
fig.update_layout(
    xaxis=dict(
        rangeselector=dict(
            buttons=list([
                dict(count=1, label='1m', step='month', stepmode='backward'),
                dict(count=6, label='6m', step='month', stepmode='backward'),
                dict(count=1, label='1y', step='year', stepmode='backward'),
                dict(count=1, label='YTD', step='year', stepmode='todate'),
                dict(step='all')
            ])
        ),
        rangeslider=dict(visible=True),
        type='date'
    )
)

# Set the layout for the figure
fig.update_layout(title='Interactive Time Series Chart', xaxis_title='Date', yaxis_title='Count')

# # Save the interactive plot to an HTML file
fig.write_html("timeseries.html")

# Show the figure
fig.show()

In [None]:
import plotly.graph_objs as go
from pyspark.sql.functions import to_date, col

# Define the subreddits
subreddits = ["xxstem", "Feminism", "LadiesofScience", "womenEngineers", "AskFeminists", "AskWomen"]

# Process submissions DataFrame
submissions_df = submissions.withColumn("date", to_date(col("created_utc")))
submissions_count_by_date_subreddit = submissions_df.groupBy("date", "subreddit").count().orderBy("date")

# Process comments DataFrame
comments_df = comments.withColumn("date", to_date(col("created_utc")))
comments_count_by_date_subreddit = comments_df.groupBy("date", "subreddit").count().orderBy("date")

# Convert to Pandas DataFrames
submissions_pd = submissions_count_by_date_subreddit.toPandas()
comments_pd = comments_count_by_date_subreddit.toPandas()

# Create traces for Submissions and Comments
traces_submissions = []
traces_comments = []
for subreddit in subreddits:
    # Traces for Submissions
    trace_submissions = go.Scatter(
        x=submissions_pd[submissions_pd['subreddit'] == subreddit]['date'],
        y=submissions_pd[submissions_pd['subreddit'] == subreddit]['count'],
        name=f'{subreddit}'
    )
    traces_submissions.append(trace_submissions)

    # Traces for Comments
    trace_comments = go.Scatter(
        x=comments_pd[comments_pd['subreddit'] == subreddit]['date'],
        y=comments_pd[comments_pd['subreddit'] == subreddit]['count'],
        name=f'{subreddit}',
        visible=False  # Initially hidden
    )
    traces_comments.append(trace_comments)

# Create the figure with all traces
fig = go.Figure(data=traces_submissions + traces_comments)

# Define the buttons for the dropdown menu
buttons = [
    dict(
        label='Submissions',
        method='update',
        args=[{'visible': [True]*len(traces_submissions) + [False]*len(traces_comments)},
              {'title': 'Number of Submissions Over Time'}]
    ),
    dict(
        label='Comments',
        method='update',
        args=[{'visible': [False]*len(traces_submissions) + [True]*len(traces_comments)},
              {'title': 'Number of Comments Over Time'}]
    )
]

# Add dropdown menus to the figure
fig.update_layout(updatemenus=[dict(active=0, buttons=buttons)])

# Add a range slider and other layout configurations
fig.update_layout(
    xaxis=dict(
        rangeselector=dict(
            buttons=list([
                dict(count=1, label='1m', step='month', stepmode='backward'),
                dict(count=6, label='6m', step='month', stepmode='backward'),
                dict(count=1, label='1y', step='year', stepmode='backward'),
                dict(count=1, label='YTD', step='year', stepmode='todate'),
                dict(step='all')
            ])
        ),
        rangeslider=dict(visible=True),
        type='date'
    ),
    title='Interactive Time Series Chart by Subreddit and Content Type',
    xaxis_title='Date',
    yaxis_title='Count'
)

# Save the interactive plot to an HTML file
fig.write_html("timeseries_comments_submissions.html")

# Show the figure
fig.show()


In [None]:
from pyspark.sql.functions import hour
import plotly.graph_objs as go
import pandas as pd

# Extract hour from timestamp
submissions_df = submissions.withColumn("hour", hour("created_utc"))
comments_df = comments.withColumn("hour", hour("created_utc"))

# Group by hour and count
submissions_hourly = submissions_df.groupBy("hour").count().orderBy("hour")
comments_hourly = comments_df.groupBy("hour").count().orderBy("hour")

# Convert to Pandas DataFrame
submissions_hourly_pd = submissions_hourly.toPandas()
comments_hourly_pd = comments_hourly.toPandas()

# Convert hour to string and format
submissions_hourly_pd['hour'] = submissions_hourly_pd['hour'].apply(lambda x: f'{x:02d}:00')
comments_hourly_pd['hour'] = comments_hourly_pd['hour'].apply(lambda x: f'{x:02d}:00')

# Create traces for the chart
trace_submissions = go.Scatter(x=submissions_hourly_pd['hour'], y=submissions_hourly_pd['count'], name='Submissions')
trace_comments = go.Scatter(x=comments_hourly_pd['hour'], y=comments_hourly_pd['count'], name='Comments')

# Create the figure with both traces
fig = go.Figure(data=[trace_submissions, trace_comments])

# Define the buttons for the dropdown menu
button_submissions = dict(label='Submissions', method='update', args=[{'visible': [True, False]}, {'title': 'Hourly Submissions'}])
button_comments = dict(label='Comments', method='update', args=[{'visible': [False, True]}, {'title': 'Hourly Comments'}])

# Add dropdown menus to the figure
fig.update_layout(updatemenus=[dict(active=0, buttons=[button_submissions, button_comments])])

# Set the layout for the figure
fig.update_layout(title='Interactive Hourly Analysis', xaxis_title='Hour of the Day', yaxis_title='Count')

fig.write_html("timeseries_hourly_analysis.html")

# Show the figure
fig.show()


In [None]:
from pyspark.sql.functions import dayofweek

# Add 'day_of_week' column (1 = Sunday, 2 = Monday, ..., 7 = Saturday)
submissions = submissions.withColumn("day_of_week", dayofweek("created_utc"))
comments = comments.withColumn("day_of_week", dayofweek("created_utc"))

# Group by 'day_of_week' and count
submissions_day_of_week = submissions.groupBy("day_of_week").count().orderBy("day_of_week")
comments_day_of_week = comments.groupBy("day_of_week").count().orderBy("day_of_week")

# Convert to Pandas DataFrame
submissions_day_of_week_pd = submissions_day_of_week.toPandas()
comments_day_of_week_pd = comments_day_of_week.toPandas()

import plotly.graph_objects as go

# Create traces for submissions and comments
trace_submissions = go.Bar(
    x=submissions_day_of_week_pd['day_of_week'],
    y=submissions_day_of_week_pd['count'],
    name='Submissions'
)

trace_comments = go.Bar(
    x=comments_day_of_week_pd['day_of_week'],
    y=comments_day_of_week_pd['count'],
    name='Comments'
)

# Create the figure with both traces
fig = go.Figure(data=[trace_submissions, trace_comments])

# Define the buttons for the dropdown menu
button_submissions = dict(
    label='Submissions',
    method='update',
    args=[{'visible': [True, False]},
          {'title': 'Number of Submissions by Day of the Week'}]
)

button_comments = dict(
    label='Comments',
    method='update',
    args=[{'visible': [False, True]},
          {'title': 'Number of Comments by Day of the Week'}]
)

# Add dropdown menus to the figure
fig.update_layout(
    updatemenus=[dict(active=0, buttons=[button_submissions, button_comments])]
)

# Set the layout for the figure
fig.update_layout(
    title='Day Wise Analysis of Submissions and Comments',
    xaxis_title='Day of the Week',
    yaxis_title='Count',
    xaxis=dict(tickmode='array', tickvals=list(range(1, 8)), ticktext=['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'])
)

# Show the figure
fig.show()

fig.write_html("timeseries_day_analysis.html")


# Chart: Content Analysis


In [None]:
# extract comments text data
text_data_comments = comments.select("body").rdd.flatMap(lambda x: x).filter(lambda x: x is not None).collect()
text_comments = " ".join(text_data_comments)


# extract submissions text data
text_data_submissions = submissions.select("title","selftext").rdd.flatMap(lambda x: x).filter(lambda x: x is not None).collect()
text_submissions = " ".join(text_data_submissions)

stopwords = set(STOPWORDS)
stopwords.update(nltk_stopwords)
stopwords.update(["removed", "deleted","one", "will", "using", "used", "also"])


# build wordcloud for comments
wordcloud_comments = WordCloud(width=800, height=400, background_color ='white', stopwords=stopwords).generate(text_comments)
# build wordcloud for submissions
wordcloud_submissions = WordCloud(width=800, height=400, background_color ='white', stopwords=stopwords).generate(text_submissions)

# plot wordcloud
plt.figure(figsize = (8, 8), facecolor = None) 
plt.imshow(wordcloud_comments)
plt.axis("off")
plt.tight_layout(pad=0)
plt.savefig("wordcloud_comments.png", bbox_inches='tight', dpi=300)  
print("Wordcloud for comments:")
plt.show()

plt.figure(figsize=(8, 8), facecolor=None)
plt.imshow(wordcloud_submissions)
plt.axis("off")
plt.tight_layout(pad=0)
plt.savefig("wordcloud_submissions.png", bbox_inches='tight', dpi=300)  
print("Wordcloud for submissions:")
plt.show()

# Chart: Correlation Chart: Comments dataset Number of Comments v/s Submissions dataset Length of Self Text 


In [None]:
# Convert the Spark DataFrame to a Pandas DataFrame for visualization purposes
submissions_pd = submissions.select("num_comments", "selftext_length").toPandas()

correlation = submissions.stat.corr("num_comments", "selftext_length")

# Create an interactive scatter plot
fig = px.scatter(submissions_pd, x="selftext_length", y="num_comments",
                 title=f"Correlation between Number of Comments and Length of Selftext: {correlation:.2f}",
                 labels={"selftext_length": "Length of Self-text", "num_comments": "Number of Comments"})

# Show the figure
fig.show()

# Save the interactive plot to an HTML file (optional)
fig.write_html("correlation_analysis_interactive.html")


# Chart: Top Authors with Most Comments and Most Submissions

In [None]:
# chart2_submissions = submissions.filter(col("author") != "[deleted]")
# chart2_comments = comments.filter(col("author") != "[deleted]")

# # Count the number of submissions per author
# author_submission_counts = chart2_submissions.groupBy('author').count().withColumnRenamed('count', 'submission_count')

# # Count the number of comments per author
# author_comment_counts = chart2_comments.groupBy('author').count().withColumnRenamed('count', 'comment_count')

# # Get the top authors based on submission count
# top_authors_submissions_pd = author_submission_counts.orderBy(desc('submission_count')).limit(10).toPandas()

# # Get the top authors based on comment count
# top_authors_comments_pd = author_comment_counts.orderBy(desc('comment_count')).limit(10).toPandas()

# # Create an interactive bar chart for top authors by submissions
# fig_submissions = px.bar(
#     top_authors_submissions_pd,
#     x='author',
#     y='submission_count',
#     title='Top Authors by Submissions',
#     labels={'submission_count': 'Number of Submissions', 'author': 'Author'}
# )
# fig_submissions.update_layout(xaxis_tickangle=-45)
# fig_submissions.write_html("top_authors_submissions_interactive.html")

# # Create an interactive bar chart for top authors by comments
# fig_comments = px.bar(
#     top_authors_comments_pd,
#     x='author',
#     y='comment_count',
#     title='Top Authors by Comments',
#     labels={'comment_count': 'Number of Comments', 'author': 'Author'}
# )
# fig_comments.update_layout(xaxis_tickangle=-45)
# fig_comments.write_html("top_authors_comments_interactive.html")

# # Paths to the saved interactive charts
# submissions_chart_path = "top_authors_submissions_interactive.html"
# comments_chart_path = "top_authors_comments_interactive.html"

# # Display the interactive charts
# fig_submissions.show()
# fig_comments.show()


In [None]:
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# Create a figure with a dropdown menu
fig = make_subplots()

# Trace for top authors by submissions
trace_submissions = go.Bar(
    x=top_authors_submissions_pd['author'],
    y=top_authors_submissions_pd['submission_count'],
    name='Submissions'
)

# Trace for top authors by comments
trace_comments = go.Bar(
    x=top_authors_comments_pd['author'],
    y=top_authors_comments_pd['comment_count'],
    name='Comments'
)

# Add traces to the figure
fig.add_trace(trace_submissions, 1, 1)
fig.add_trace(trace_comments, 1, 1)

# Make the comments trace invisible initially
fig.data[1].visible = False

# Create a dropdown menu for switching between traces
fig.update_layout(
    updatemenus=[
        dict(
            buttons=list([
                dict(
                    args=[{"visible": [True, False]}],
                    label="Submissions",
                    method="update"
                ),
                dict(
                    args=[{"visible": [False, True]}],
                    label="Comments",
                    method="update"
                )
            ]),
            direction="down",
            pad={"r": 10, "t": 10},
            showactive=True,
            x=1,
            xanchor="left",
            y=1.3,
            yanchor="top"
        ),
    ]
)

# Set the layout for the figure
fig.update_layout(
    title_text="Top Authors by Submissions and Comments",
    xaxis_title="Author",
    yaxis_title="Count",
    xaxis_tickangle=-45
)

# Save the interactive plot to an HTML file
fig.write_html("top_authors_interactive.html")

# Display the figure
fig.show()


# Chart: Engagement Metrics of Number of Comments per Submissions (For each subrredit)

In [None]:
import plotly.graph_objects as go
from pyspark.sql.functions import collect_list

# Group by 'subreddit' and collect the 'num_comments' into a list
subreddit_comments = submissions.groupBy('subreddit').agg(collect_list('num_comments').alias('comments'))

# Convert to Pandas DataFrame for plotting
subreddit_comments_pd = subreddit_comments.toPandas().set_index('subreddit')

# Create a Plotly figure
fig = go.Figure()

# Add traces (one for each subreddit), but set them to be not visible initially
for subreddit, data in subreddit_comments_pd.iterrows():
    fig.add_trace(
        go.Histogram(
            x=data['comments'],
            name=subreddit,
            opacity=0.75,
            visible=False
        )
    )

# Create a dropdown menu with options for each subreddit
subreddit_options = [{"label": subreddit, "method": "update", "args": [{"visible": [subreddit == r for r in subreddit_comments_pd.index]}]} for subreddit in subreddit_comments_pd.index]

# Update the layout to add the dropdown
fig.update_layout(
    title='Number of Comments Distribution per Subreddit',
    xaxis_title='Number of Comments',
    yaxis_title='Frequency',
    updatemenus=[{
        "buttons": subreddit_options,
        "direction": "down",
        "showactive": True,
    }]
)

# Set the first trace to be visible initially
fig.data[0].visible = True

# Show the interactive figure
fig.show()

# Save the interactive plot to an HTML file
fig.write_html("subreddit_comments_distribution_interactive.html")

# Chart: Issues faced by Women


In [None]:
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from pyspark.sql.functions import col

# Define the issues and corresponding regex patterns
issues = {
    "Workplace Discrimination": "workplace|discrimination",
    "Gender Equality": "gender equality|feminism",
    "Healthcare": "healthcare|health care|reproductive rights",
    "Color": "hispanic|black|asian|ethnicity",
    "Domestic Violence": "domestic violence|abuse",
    "Parental Leave": "maternity leave|paternity leave|parental leave",
    "Body Image": "body image|body positivity",
    "Education": "education|school|university",
    "Career Advancement": "promotion|career advancement",
    "Harassment": "harassment|sexual harassment",
    "Mental Health": "mental health|depression|anxiety",
    "Equal Pay": "equal pay|wage gap",
    "Political Representation": "politics|representation",
    "Child Care": "child care|childcare|babysitting",
    "Menstruation": "menstruation|period|menstrual",
    "LGBTQ+ Rights": "LGBTQ|lesbian|gay|bisexual|transgender",
    "Marriage and Divorce": "marriage|divorce|wedding",
    "Aging": "aging|elderly|senior",
    "Fashion and Beauty Standards": "fashion|beauty standards|makeup",
    "Sexual and Reproductive Health": "contraception|birth control|abortion"
}

# Initialize dictionaries for counting occurrences
issue_counts_submissions = {issue: 0 for issue in issues.keys()}
issue_counts_comments = {issue: 0 for issue in issues.keys()}

# Function to count occurrences of each issue
def count_issues(df, column, issue_counts):
    for issue, pattern in issues.items():
        count = df.filter(col(column).rlike(pattern)).count()
        issue_counts[issue] += count

# Count issues in comments and submissions
count_issues(comments, "body", issue_counts_comments)
count_issues(submissions, "title", issue_counts_submissions)
count_issues(submissions, "selftext", issue_counts_submissions)

labels = list(issues.keys())
values_submissions = list(issue_counts_submissions.values())
values_comments = list(issue_counts_comments.values())

color_palette = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
                 '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf',
                 ]

fig = make_subplots(rows=1, cols=2, subplot_titles=("Submissions", "Comments"))

fig.add_trace(go.Pie(labels=labels, values=values_submissions, name="Submissions",
                     marker=dict(colors=color_palette)), 1, 1)

fig.add_trace(go.Pie(labels=labels, values=values_comments, name="Comments",
                     marker=dict(colors=color_palette)), 1, 2)

fig.update_layout(title_text="Issues Faced by Women in Reddit")
fig.show()
# Save the interactive plot to an HTML file
fig.write_html("piechart.html")

Disclaimer: The work has been by us. However, we have taken assissstance from ChatGPT for code commenting and cleaning.