In [5]:
import os
import sys

PARENT_PATH = os.path.abspath(os.path.dirname(os.getcwd()))
sys.path.append(PARENT_PATH)

from utils import flat_subreddits, subreddits

from tqdm import tqdm
import polars as pl
import re



In [6]:
# --- Pipeline configuration -------------------------------------------------

# Minimum `num_comments` a post must report, and also the minimum number of
# comments that must survive filtering for the post to be kept.
MIN_COMMENTS = 3
# Posts with fewer surviving comments than this keep all of them; posts at or
# above it are reduced to a subset (top 2, bottom 2 and one in between).
MAX_COMMENTS = 5

# Comments scoring below this are discarded.
MIN_COMMENT_SCORE = 2

# Both are inclusive
# NOTE(review): pl.datetime(y, m, d) is a midnight timestamp, so the `<=`
# comparison in prep_posts excludes posts created later on 2024-11-30 —
# confirm this is the intended end of the window.
DATE_START = pl.datetime(2020, 11, 30)
DATE_END = pl.datetime(2024, 11, 30)

# Minimum number of whitespace-separated tokens (r'\S+' matches) required in a
# post's title+selftext and in a comment body.
MIN_WORDS = 10

# Regex matching the literal placeholders "[removed]" / "[deleted]" and the
# phrase "Your submission has been removed".
REMOVAL_PATTERN = r'\[removed\]|\[deleted\]|Your submission has been removed'

# Columns (and dtypes) read from each submissions file; `created_utc` is added
# separately by read_with_utc_handling.
posts_schema = {
    "id": pl.String,
    "title": pl.String, 
    "selftext": pl.String,
    "author": pl.String,
    "score": pl.Int32,
    "num_comments": pl.Int32,
    "stickied": pl.Boolean
}

# Columns (and dtypes) read from each comments file. `edited` is read as an
# integer; prep_comments treats 0 as "no edit" and other values as epoch times.
comments_schema = {
    "id": pl.String,
    "parent_id": pl.String,
    "body": pl.String, 
    "score": pl.Int32,
    "author": pl.String,
    "stickied": pl.Boolean,
    "edited": pl.Int32
}

In [7]:
def read_with_utc_handling(filepath, schema):
    """Load an NDJSON file whose `created_utc` may be stored as text or as a number.

    The file is parsed twice: once with the caller's schema and `created_utc`
    typed as a string (then cast to Int32), and once with only `id` and an
    Int32 `created_utc`. The two readings are matched on `id` and the first
    non-null value becomes the final `created_utc` column.

    Args:
        filepath: Path of the NDJSON file to read.
        schema: Mapping of column name to polars dtype, without `created_utc`.

    Returns:
        A polars DataFrame with the schema's columns plus an Int32 `created_utc`.
    """
    # Pass 1: full schema, timestamp parsed as text and cast to an integer
    text_utc_schema = {**schema, "created_utc": pl.String}
    full_frame = pl.read_ndjson(filepath, schema=text_utc_schema).with_columns(
        pl.col("created_utc").cast(pl.Int32).alias("created_utc_str")
    )

    # Pass 2: only the id and the timestamp parsed directly as an integer
    int_utc_lookup = (
        pl.read_ndjson(filepath, schema={"id": pl.String, "created_utc": pl.Int32})
        .select(["id", "created_utc"])
        .rename({"created_utc": "created_utc_int"})
    )

    # Prefer the text-derived value, fall back to the integer-derived one
    merged = full_frame.join(int_utc_lookup, on="id", how="left")
    resolved = merged.with_columns(
        pl.coalesce([pl.col("created_utc_str"), pl.col("created_utc_int")]).alias("created_utc")
    )
    return resolved.drop(["created_utc_str", "created_utc_int"])

In [8]:
# Raw per-subreddit frames, keyed by subreddit name
posts = {}
comments = {}

extracted_dir = f"{PARENT_PATH}/data/extracted"

for subreddit in flat_subreddits:
    submissions_path = f"{extracted_dir}/{subreddit}_submissions"
    comments_path = f"{extracted_dir}/{subreddit}_comments"

    posts[subreddit] = read_with_utc_handling(submissions_path, posts_schema)
    comments[subreddit] = read_with_utc_handling(comments_path, comments_schema)

In [9]:
def prep_posts(posts):
    """Normalise a raw submissions frame.

    Renames `id`/`author` to `post_id`/`post_author`, converts the epoch
    `created_utc` to a datetime, keeps only rows with
    DATE_START <= created_utc <= DATE_END, and adds an `all_text` column
    holding title and selftext joined by a space (nulls treated as "").

    Args:
        posts: polars DataFrame as returned by read_with_utc_handling.

    Returns:
        The transformed polars DataFrame.
    """
    created_at = pl.col("created_utc")
    within_window = (created_at >= DATE_START) & (created_at <= DATE_END)

    # Helper column used later for word counting
    title_and_body = pl.concat_str(
        [pl.col("title").fill_null(""), pl.col("selftext").fill_null("")],
        separator=" ",
    )

    return (
        posts.rename({"id": "post_id", "author": "post_author"})
        .with_columns(pl.from_epoch(created_at).alias("created_utc"))
        .filter(within_window)
        .with_columns(title_and_body.alias("all_text"))
    )

def prep_comments(comments):
    """Normalise a raw comments frame and keep only first-level comments.

    Steps:
      * rename `id`/`author` to `comment_id`/`comment_author`;
      * derive `post_id` from `parent_id` by removing a leading "t3_" and drop
        `parent_id`;
      * discard rows whose parent reference starts with "t1_" (presumably
        replies to other comments — confirm against the data dump format);
      * turn `edited == 0` into null, then convert `created_utc` and `edited`
        from epoch values to datetimes;
      * replace null `stickied` values with False.

    Args:
        comments: polars DataFrame as returned by read_with_utc_handling.

    Returns:
        The transformed polars DataFrame.
    """
    # Rename columns
    comments = comments.rename({"id": "comment_id", "author": "comment_author"})

    # Derive the post identifier by removing the leading "t3_" from parent_id.
    # strip_prefix only touches the start of the string, whereas a global
    # replace would also delete "t3_" occurring anywhere inside the identifier.
    comments = comments.with_columns(
        pl.col("parent_id").str.strip_prefix("t3_").alias("post_id")
    ).drop("parent_id")

    # Only keep first-level comments: a parent reference still starting with
    # "t1_" is not a post reference, so those rows are dropped
    comments = comments.filter(~pl.col("post_id").str.starts_with("t1_"))

    # Handle edited timestamps: 0 means "no edit timestamp", store it as null
    comments = comments.with_columns(
        pl.when(pl.col("edited") == 0).then(None).otherwise(pl.col("edited")).alias("edited")
    )

    # Convert UTC fields to datetime
    comments = comments.with_columns(
        pl.from_epoch(pl.col("created_utc")).alias("created_utc"),
        pl.from_epoch(pl.col("edited")).alias("edited")
    )

    # Convert null "stickied" values to false
    comments = comments.with_columns(
        pl.col("stickied").fill_null(False)
    )

    return comments

In [10]:
# Normalise the raw posts and comments of every subreddit in place
preprocessing_progress = tqdm(flat_subreddits, desc="Pre-processing posts")
for subreddit in preprocessing_progress:
    posts[subreddit] = prep_posts(posts[subreddit])
    comments[subreddit] = prep_comments(comments[subreddit])

Pre-processing posts:   0%|          | 0/5 [00:00<?, ?it/s]

Pre-processing posts: 100%|██████████| 5/5 [00:02<00:00,  1.88it/s]


In [11]:
def filter_posts(posts):
    """Keep only posts that are usable as discussion starters.

    A post is kept when it is not stickied, reports at least MIN_COMMENTS
    comments, contains no removal/deletion marker (REMOVAL_PATTERN) in its
    title or selftext, and its combined text has at least MIN_WORDS
    whitespace-separated tokens.

    The removal check runs on `all_text` (title + selftext with nulls as "").
    Previously a post was dropped only if BOTH title and selftext matched the
    pattern, so a post whose body was "[removed]" under a normal title passed.

    Args:
        posts: polars DataFrame produced by prep_posts (needs `stickied`,
            `num_comments` and `all_text`).

    Returns:
        The filtered polars DataFrame.
    """
    not_stickied = ~pl.col("stickied")
    enough_comments = pl.col("num_comments") >= MIN_COMMENTS
    # A marker in either the title or the selftext disqualifies the post
    not_removed = ~pl.col("all_text").str.contains(REMOVAL_PATTERN)
    enough_words = pl.col("all_text").str.count_matches(r'\S+') >= MIN_WORDS

    return posts.filter(not_stickied & enough_comments & not_removed & enough_words)

for subreddit in tqdm(flat_subreddits, desc="Filtering posts"):
    kept_posts = filter_posts(posts[subreddit])
    posts[subreddit] = kept_posts

    # The inner join both restricts comments to surviving posts and attaches
    # each post's author to its comments
    post_authors = kept_posts.select(["post_id", "post_author"])
    comments[subreddit] = comments[subreddit].join(
        post_authors, on="post_id", how="inner"
    )

Filtering posts: 100%|██████████| 5/5 [00:00<00:00,  7.36it/s]


In [12]:
import datetime


def filter_comments(comments):
    """Keep only comments that pass every quality criterion.

    A comment is kept when it is not stickied, scores at least
    MIN_COMMENT_SCORE, was not written by the post's author, was either never
    edited or edited within 24 hours of creation, does not match
    REMOVAL_PATTERN, and has at least MIN_WORDS whitespace-separated tokens.

    Args:
        comments: polars DataFrame with `stickied`, `score`, `comment_author`,
            `post_author`, `edited`, `created_utc` and `body` columns.

    Returns:
        The filtered polars DataFrame.
    """
    not_stickied = ~pl.col("stickied")
    scored_enough = pl.col("score") >= MIN_COMMENT_SCORE
    not_by_post_author = pl.col("comment_author") != pl.col("post_author")

    # Either no edit at all, or an edit made at most a day after posting
    edit_delay = pl.col("edited") - pl.col("created_utc")
    unedited_or_quick_edit = (
        pl.col("edited").is_null() | (edit_delay <= datetime.timedelta(hours=24))
    )

    not_removed = ~pl.col("body").str.contains(REMOVAL_PATTERN)
    long_enough = pl.col("body").str.count_matches(r'\S+') >= MIN_WORDS

    return comments.filter(
        not_stickied
        & scored_enough
        & not_by_post_author
        & unedited_or_quick_edit
        & not_removed
        & long_enough
    )

# Base seed for the random "middle" comment so re-running the notebook selects
# the same comments; each post gets its own seed (base + position).
SAMPLE_SEED = 42

for subreddit in tqdm(flat_subreddits, desc="Filtering comments"):
    comments[subreddit] = filter_comments(comments[subreddit])

    # Keep only posts that still have at least MIN_COMMENTS usable comments
    comment_counts = comments[subreddit].group_by("post_id").agg(
        pl.col("comment_id").count().alias("n_comments")
    )
    valid_posts = comment_counts.filter(pl.col("n_comments") >= MIN_COMMENTS).select("post_id")

    # Sort by score AFTER the join: head()/tail() below rely on each post's
    # comments being ordered best-to-worst, and a join is not guaranteed to
    # preserve the row order of its input.
    ranked_comments = (
        comments[subreddit]
        .join(valid_posts, on="post_id")
        .sort("score", descending=True)
    )

    # For each post, select comments based on count
    final_comments = []
    for group_index, post_comments in enumerate(ranked_comments.partition_by("post_id")):
        n_comments = len(post_comments)

        if n_comments < MAX_COMMENTS:
            # Fewer than MAX_COMMENTS comments (3 or 4): take all of them
            final_comments.append(post_comments)
        else:
            # MAX_COMMENTS or more: 2 highest, 2 lowest and 1 random in between.
            # The 2 + 1 + 2 split assumes MAX_COMMENTS == 5.
            top_2 = post_comments.head(2)
            bottom_2 = post_comments.tail(2)

            # slice() takes (offset, length). The middle region excludes the
            # first two and the last two rows, so its length is n_comments - 4;
            # a longer slice would overlap bottom_2 and could yield a duplicate.
            middle_slice = post_comments.slice(2, n_comments - 4)
            middle = middle_slice.sample(n=1, seed=SAMPLE_SEED + group_index)

            final_comments.extend([top_2, middle, bottom_2])

    # Combine all comments and remove duplicates; pl.concat rejects an empty
    # list, so fall back to an empty frame with the same columns
    if final_comments:
        final_comments = pl.concat(final_comments).unique(subset=["post_id", "comment_id"])
    else:
        final_comments = ranked_comments.head(0)

    # Update comments and posts with valid posts (MIN_COMMENTS+ selected comments)
    valid_post_ids = final_comments.group_by("post_id").agg(
        pl.col("comment_id").count().alias("n_comments")
    ).filter(
        pl.col("n_comments") >= MIN_COMMENTS
    ).select("post_id")

    comments[subreddit] = final_comments.join(valid_post_ids, on="post_id")
    posts[subreddit] = posts[subreddit].join(valid_post_ids, on="post_id", how="inner")

Filtering comments: 100%|██████████| 5/5 [00:19<00:00,  3.87s/it]


In [13]:
def crop_text(text: str, max_words: int = 1000) -> str:
    """Limit `text` to its first `max_words` whitespace-separated words.

    Text within the limit is returned untouched. Longer text is rebuilt from
    its first `max_words` words joined by single spaces, followed by "...".

    Args:
        text: The string to crop.
        max_words: Maximum number of words to keep.

    Returns:
        The original or the cropped string.
    """
    tokens = text.split()
    if len(tokens) > max_words:
        return " ".join(tokens[:max_words]) + "..."
    return text

def remove_edit_sections(text: str) -> str:
    """Cut `text` at the first trailing "edit"/"update" marker.

    A marker is whitespace followed by "edit" or "update" (case-insensitive),
    up to five further word characters, and a colon — e.g. " Edit:",
    "\\nEDIT2:", " updated:". Everything from that whitespace onwards is
    removed. Text without a marker is returned unchanged.

    Args:
        text: The post body to clean.

    Returns:
        The text preceding the first marker, or `text` itself if none is found.
    """
    pattern = r"^(.*?)(?=[\s]+(?:edit[\w]{0,5}|update[\w]{0,5}):)"
    # re.DOTALL lets "." cross line breaks; without it the prefix could never
    # span a newline, so markers in multi-line posts were never found.
    match = re.match(pattern, text, re.IGNORECASE | re.DOTALL)
    return match.group(1) if match else text

for subreddit in tqdm(flat_subreddits, desc="Making final changes"):

    # Post bodies: crop to the word limit first, then cut trailing edit sections
    cleaned_selftext = (
        pl.col("selftext")
        .map_elements(crop_text, return_dtype=pl.String)
        .map_elements(remove_edit_sections, return_dtype=pl.String)
    )
    posts[subreddit] = posts[subreddit].with_columns(
        cleaned_selftext.alias("selftext")
    )

    # Comment bodies: only cropped
    cropped_body = pl.col("body").map_elements(crop_text, return_dtype=pl.String)
    comments[subreddit] = comments[subreddit].with_columns(
        cropped_body.alias("body")
    )

Making final changes: 100%|██████████| 5/5 [00:04<00:00,  1.21it/s]


In [21]:
# Merge all posts into one dataframe, tagging every row with its subreddit.
# The list must iterate over every subreddit explicitly; without the `for`
# clause only the subreddit left over from the previous loop would be included.
all_posts = pl.concat([
    posts[subreddit].with_columns(pl.lit(subreddit).alias("subreddit"))
    for subreddit in flat_subreddits
])

# Merge all comments into one dataframe, tagged the same way
all_comments = pl.concat([
    comments[subreddit].with_columns(pl.lit(subreddit).alias("subreddit"))
    for subreddit in flat_subreddits
])

# Map subreddits to political leanings (`subreddits` maps leaning -> subreddit list)
subreddit_to_political = {
    subreddit: political
    for political, subreddit_list in subreddits.items()
    for subreddit in subreddit_list
}

# Add political leaning column
all_posts = all_posts.with_columns(
    pl.col("subreddit").replace(subreddit_to_political).alias("political")
)
all_comments = all_comments.with_columns(
    pl.col("subreddit").replace(subreddit_to_political).alias("political")
)


In [22]:
# Drop columns that were only needed for filtering:
# posts lose `stickied` and the combined `all_text`; comments lose `stickied` and `edited`
all_posts = all_posts.drop(["stickied", "all_text"])
all_comments = all_comments.drop(["stickied", "edited"])

In [23]:
# Save merged dataframes
processed_dir = f"{PARENT_PATH}/data/processed"
# write_csv does not create missing directories, so make sure the target exists
os.makedirs(processed_dir, exist_ok=True)

all_posts.write_csv(f"{processed_dir}/all_posts.csv")
all_comments.write_csv(f"{processed_dir}/all_comments.csv")

In [17]:
# # Convert polars dataframes to pandas for snorkel compatibility
# for subreddit in tqdm(flat_subreddits, desc="Converting to pandas"):
#     posts[subreddit] = posts[subreddit].to_pandas()
#     comments[subreddit] = comments[subreddit].to_pandas()

In [18]:
# for subreddit in flat_subreddits:
#     posts[subreddit].to_csv(f"{PARENT_PATH}/data/processed/{subreddit}_posts.csv", index=False)
#     comments[subreddit].to_csv(f"{PARENT_PATH}/data/processed/{subreddit}_comments.csv", index=False)