In [143]:
import pandas as pd
import os
import sys

from pyspark.sql import SparkSession, types
from pyspark.sql.types import TimestampType

In [144]:
# Fail fast on an unsupported interpreter before paying for Spark start-up.
assert sys.version_info >= (3, 8) # make sure we have Python 3.8+

spark = SparkSession.builder.appName('reddit-submissions-getter').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Compare (major, minor) as integers: as plain strings '3.10' sorts before '3.2',
# so a lexicographic check would wrongly reject newer releases.
assert tuple(map(int, spark.version.split('.')[:2])) >= (3, 2) # make sure we have Spark 3.2+


In [145]:
# Exception used to stop execution of the notebook with a one-line message.
class StopExecution(Exception):
    """Signal that the notebook should stop running.

    ``_render_traceback_`` returns the lines to show in place of a stack
    trace (presumably picked up by IPython's traceback rendering -- confirm
    against the IPython version in use).
    """

    _MESSAGE = "ERROR: submissions already linked to comments"

    def _render_traceback_(self):
        # A list with a single entry: the whole "traceback" is this one line.
        return [self._MESSAGE]

In [146]:
def add_comment_sentiment_counts(comments_df, submissions_df):
    """Attach per-submission comment sentiment counts to ``submissions_df``.

    Comments are grouped by ``link_id`` (the id of the submission they belong
    to) and, for each group, the number of positive (1), negative (-1) and
    neutral (0) ``sentiment_rounded`` values is counted.

    Args:
        comments_df: DataFrame with ``link_id`` and ``sentiment_rounded`` columns.
        submissions_df: DataFrame with an ``id`` column.

    Returns:
        A new DataFrame: ``submissions_df`` left-joined with the columns
        ``positive_count``, ``negative_count`` and ``neutral_count``.
        Submissions without any comment get 0 in all three.

    Side effects:
        ``comments_df['link_id']`` is rewritten in place with every "t3_"
        removed, so that it is directly comparable with ``submissions_df['id']``.
    """
    # Remove the "t3_" from link_id BEFORE grouping. Doing it afterwards leaves
    # the prefix on the aggregated keys, so the merge below matches nothing.
    comments_df['link_id'] = comments_df['link_id'].str.replace('t3_', '', regex=False)

    # Count the positive/neutral/negative comments for each unique link_id and
    # rename "link_id" to "id" so the frames can be merged on "id".
    comment_counts_df = (
        comments_df.groupby("link_id")
        .agg(
            positive_count=('sentiment_rounded', lambda s: (s == 1).sum()),
            negative_count=('sentiment_rounded', lambda s: (s == -1).sum()),
            neutral_count=('sentiment_rounded', lambda s: (s == 0).sum()),
        )
        .reset_index()
        .rename(columns={"link_id": "id"})
    )

    # Left merge keeps every submission; those without comments come back as NaN.
    result_df = submissions_df.merge(comment_counts_df, on="id", how="left")

    # Zero-fill only the count columns (keys absent from the frame are ignored).
    return result_df.fillna({'positive_count': 0, 'negative_count': 0, 'neutral_count': 0})


In [147]:

# Pivots the sentiment counts into separate columns for each sentiment, filling missing values with 0.
def aggregate_sentiment_counts(comments_df):
    """Count comments per ``link_id`` for each rounded sentiment class.

    Args:
        comments_df: DataFrame with ``link_id`` and ``sentiment_rounded``
            columns, where ``sentiment_rounded`` is -1, 0 or 1.

    Returns:
        DataFrame with one row per ``link_id`` and the columns ``link_id``,
        ``negative_count``, ``neutral_count`` and ``positive_count``.
        A sentiment class that never occurs in the input is reported as 0;
        values other than -1/0/1 are not counted.
    """
    sentiment_counts = (
        comments_df.groupby(['link_id', 'sentiment_rounded'])
        .size()
        .unstack(fill_value=0)
        # Always produce exactly the -1/0/1 columns, in that order. Without
        # this, data lacking one class yields fewer columns and the label
        # assignment below fails with a length mismatch.
        .reindex(columns=[-1, 0, 1], fill_value=0)
    )
    sentiment_counts.columns = ['negative_count', 'neutral_count', 'positive_count']
    return sentiment_counts.reset_index()


In [148]:
# Adjust the boundary through ``threshold`` as needed.

def round_sentiment(score, threshold=0.25):
    """Bucket a continuous sentiment score into -1, 0 or 1.

    Args:
        score: Numeric sentiment score.
        threshold: Non-negative cut-off. Scores strictly above ``threshold``
            are positive, scores strictly below ``-threshold`` are negative.

    Returns:
        1 for positive, -1 for negative, 0 otherwise. A NaN score compares
        False against both bounds and is therefore returned as 0 (neutral).
    """
    if score > threshold:
        return 1
    if score < -threshold:
        return -1
    return 0

In [149]:
# Function to read multiple CSV files in a directory and load them into one single dataframe.
def load_csv_files(directory):
    """Concatenate every ``*.csv`` file found directly inside ``directory``.

    Files are read in sorted filename order so the resulting row order is
    reproducible. Malformed rows are skipped by pandas
    (``on_bad_lines='skip'``); a file that cannot be parsed at all, or that
    is empty, is reported on stdout and left out.

    Args:
        directory: Path of the directory to scan (not recursive).

    Returns:
        A single DataFrame with a fresh 0..n-1 index.

    Raises:
        ValueError: If no CSV file in ``directory`` could be read.
    """
    data_frames = []
    # os.listdir() returns entries in arbitrary order; sort for determinism.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".csv"):
            continue
        file_path = os.path.join(directory, filename)
        try:
            # on_bad_lines='skip' avoids aborting the whole load on a bad row
            data_frames.append(pd.read_csv(file_path, on_bad_lines='skip'))
        except (pd.errors.ParserError, pd.errors.EmptyDataError) as e:
            print(f"Error reading {file_path}: {e}")
    if not data_frames:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        raise ValueError(f"No readable .csv files found in {directory!r}")
    return pd.concat(data_frames, ignore_index=True)



In [150]:
# Load submissions first, then comments; load_csv_files concatenates all the
# .csv files it finds in each directory.
submissions_df, comments_df = (
    load_csv_files(source_dir)
    for source_dir in ('comments_cleaned/subs', 'comments_cleaned/comms')
)


In [151]:
# The enriched submissions are saved back to 'comments_cleaned/subs' at the end
# of this notebook, so a 'positive_count' column in the freshly loaded data
# means it has already been processed: stop instead of linking it twice.
if 'positive_count' in submissions_df:
    raise StopExecution()

In [152]:

# Coerce the sentiment scores to numbers. Values that cannot be parsed become
# NaN, which round_sentiment() below maps to 0 (neutral).

comments_df['sentiment_score'] = pd.to_numeric(comments_df['sentiment_score'], errors='coerce')
submissions_df['sentiment_score'] = pd.to_numeric(submissions_df['sentiment_score'], errors='coerce')


# Round the sentiments to -1, 0 or 1.
comments_df['sentiment_rounded'] = comments_df['sentiment_score'].apply(round_sentiment)
submissions_df['sentiment_rounded'] = submissions_df['sentiment_score'].apply(round_sentiment)
# Add the comment sentiment counts to the submissions dataframe.
# NOTE: this call also strips "t3_" from comments_df['link_id'] IN PLACE. The
# aggregation and merge below rely on that having happened, so do not reorder.

submissions_df = add_comment_sentiment_counts(comments_df, submissions_df)

# Group comments by link_id and count the number of each sentiment.
submissions = aggregate_sentiment_counts(comments_df)

# Match the column names for merging with submissions_df.
submissions.rename(columns={'link_id': 'id'}, inplace=True)

# Join with submissions_df. Both sides carry positive_/neutral_/negative_count
# columns, so pandas suffixes them "_x" (submissions_df) and "_y" (aggregated).
submissions = submissions_df.merge(submissions, on = "id", how = "left")


In [153]:
# Drop the "_x" count columns (the copies that came from submissions_df) and
# keep the "_y" ones (from aggregate_sentiment_counts) under explicit names.
submissions = (
    submissions
    .drop(columns=['positive_count_x', 'neutral_count_x', 'negative_count_x'])
    .rename(columns={
        'positive_count_y': 'positive_comment_count',
        'neutral_count_y': 'neutral_comment_count',
        'negative_count_y': 'negative_comment_count'
    })
    # Zero-fill ONLY the count columns: a submission with no comments has no
    # match in the left merge. A blanket fillna(0) would also overwrite
    # missing values in unrelated columns with 0.
    .fillna({
        'positive_comment_count': 0,
        'neutral_comment_count': 0,
        'negative_comment_count': 0
    })
)

In [154]:
# Copy the per-submission comment counts back onto submissions_df. Assignment
# aligns on the index; `submissions` was built by a left merge from
# submissions_df, so the two frames are expected to share the same row index.
submissions_df['positive_count']=submissions['positive_comment_count']
submissions_df['negative_count']=submissions['negative_comment_count']
# Refresh neutral_count as well so all three counts come from the same source.
submissions_df['neutral_count']=submissions['neutral_comment_count']

# remove sentiment_rounded from submissions_df
# (errors='ignore' keeps this cell safe to re-run once the column is gone)
submissions_df = submissions_df.drop(columns=['sentiment_rounded'], errors='ignore')

In [155]:
# Plain-text dump of both frames, submissions first, one after the other.
print(submissions_df, comments_df, sep='\n')

          id    subreddit                                              title  \
0    1ea46c5  aznidentity        Recognize the Tactics of White Nationalists   
1    1e9zbx9  aznidentity  Hung Cao Speaks at The 2024 Republican Nationa...   
2    1e9eam1  aznidentity      Chinese language program for overseas Chinese   
3    1e95w39  aznidentity  The 2024 USA International Mathematical Olympi...   
4    1e95hy8  aznidentity  Re-Post of our Rules Regarding Trolling and Tr...   
..       ...          ...                                                ...   
237  1efztr6  programming    My mental model of setf was wrong [common lisp]   
238  1efztds  programming         Data.Maybe, and thoughts on library design   
239  1efzsyo  programming         Deno: What we got wrong about HTTP imports   
240  1efzs27  programming                The evolution of Ruby's Range class   
241  1efzrbv  programming         DARPA: Translating All C to Rust (Tractor)   

    time_created  num_comments  sentime

In [156]:
# save to csv 
# submissions_df.to_csv('submissions_cleaned_linked.csv', index=False)
# comments_df.to_csv('comments_cleaned_linked.csv', index = False)

In [157]:
# save to csv
# NOTE: this overwrites the directory the submissions were loaded from.

spark_subs = spark.createDataFrame(submissions_df)
# escape='"' makes Spark write an embedded quote as "" instead of its default \".
# pandas.read_csv (used by load_csv_files) only understands the doubled form by
# default; with backslash escapes, rows whose text contains quotes are misparsed
# and then silently dropped by on_bad_lines='skip' on the next load.
spark_subs.write.format("csv").save(
    "comments_cleaned/subs", mode="overwrite", header=True, escape='"'
)

# comments do not need saving because the only new column is 'sentiment_rounded', which is not very useful

                                                                                