In [1]:
from pyspark.sql import SparkSession
import os
from pathlib import Path
from pyspark.sql.functions import (
    col, from_unixtime, to_date, hour, dayofmonth, month, year, weekofyear,lit,
    date_format, minute, second, dense_rank, monotonically_increasing_id, max as spark_max
)
from pyspark.sql.window import Window
import shutil

In [2]:
# Initialize Spark
spark = SparkSession.builder \
    .appName("Reddit_ETL") \
    .config("spark.hadoop.io.nativeio.enabled", "false") \
    .getOrCreate()

In [3]:
# Paths
input_folder = "D:/Portfolio/reddit-analytics-pipeline/data/partitions"
dw_folder = "D:/Portfolio/reddit-analytics-pipeline/data/DW_cache"

In [4]:
# Ensure DW cache directory exists
os.makedirs(dw_folder, exist_ok=True)

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

In [5]:
def delete_all_files_in_directory(folder_path):
    if os.path.exists(folder_path):
        for item in os.listdir(folder_path):
            item_path = os.path.join(folder_path, item)
            if os.path.isfile(item_path) or os.path.islink(item_path):
                os.remove(item_path)
            elif os.path.isdir(item_path):
                shutil.rmtree(item_path)

In [6]:
def is_folder_empty_of_parquet(folder_path):
    if not os.path.exists(folder_path):
        print("Folder does not exist.")
        return False  # or raise Exception if needed

    parquet_files = [f for f in os.listdir(folder_path) if f.endswith(".parquet")]
    if not parquet_files:
        print("Folder exists but contains no Parquet files.")
        return False
    else:
        print("Folder has Parquet files.")
        return True

In [7]:
def read_new_csvs(input_path):
    return spark.read.option("header", True).csv(input_path)

In [8]:
def clean_and_prepare(df):
    return df.select(
        col("id").alias("comment_id"),
        col("author"),
        col("subreddit"),
        col("subreddit_id"),
        col("created_utc"),
        col("body"),
        col("score"),
        col("controversiality"),
        col("gilded"),
        col("ups"),
        col("downs")
    ).dropna(subset=["author", "subreddit", "created_utc", "comment_id"])

In [9]:
def build_dim_user(df, dw_path):
    spark.catalog.clearCache()
    user_path = os.path.join(dw_path, "dim_user")

    # Clean input: remove nulls and drop duplicates
    new_users = df.select("author") \
                  .dropna(subset=["author"]) \
                  .dropDuplicates(["author"])

    if os.path.exists(user_path) and not is_folder_empty_of_parquet(user_path):
        existing = spark.read.parquet(user_path).dropDuplicates(["author"])

        # Get max user_id
        max_id = existing.agg(spark_max("user_id")).collect()[0][0] or 0

        # Exclude existing authors
        new_only = new_users.join(existing, on="author", how="left_anti")

        if new_only.count() == 0:
            return existing

        # Assign new user_ids sequentially
        window = Window.orderBy("author")
        new_only = new_only.withColumn("row_num", dense_rank().over(window))
        new_only = new_only.withColumn("user_id", col("row_num") + lit(max_id)).drop("row_num")

        # Combine and ensure uniqueness
        combined = existing.unionByName(new_only).dropDuplicates(["author"])

        # Safety check
        assert combined.select("user_id").distinct().count() == combined.count(), "❌ Duplicate user_id found!"

        # Overwrite clean version
        shutil.rmtree(user_path)
        combined.write.mode("overwrite").parquet(user_path)
        return combined

    else:
        # First-time creation
        window = Window.orderBy("author")
        new_users = new_users.withColumn("user_id", dense_rank().over(window))
                
        new_users.write.mode("overwrite").parquet(user_path)
        return new_users

In [10]:
def build_dim_subreddit(df, dw_path):
    from pyspark.sql.functions import col, lit, dense_rank, max as spark_max
    from pyspark.sql.window import Window

    spark.catalog.clearCache()
    sub_path = os.path.join(dw_path, "dim_subreddit")

    # Clean new input: keep only unique subreddit_id
    new_subs = df.select("subreddit", "subreddit_id") \
                 .dropna(subset=["subreddit_id"]) \
                 .dropDuplicates(["subreddit_id"])

    if os.path.exists(sub_path) and not is_folder_empty_of_parquet(sub_path):
        existing = spark.read.parquet(sub_path).dropDuplicates(["subreddit_id"])

        # Get max existing subreddit_key
        max_key = existing.agg(spark_max("subreddit_key")).collect()[0][0] or 0

        # Exclude already seen subreddit_ids
        new_only = new_subs.join(existing, on="subreddit_id", how="left_anti")

        if new_only.count() == 0:
            return existing

        # Assign subreddit_keys sequentially from max_key + 1
        window = Window.orderBy("subreddit_id")
        new_only = new_only.withColumn("row_num", dense_rank().over(window))
        new_only = new_only.withColumn("subreddit_key", col("row_num") + lit(max_key)).drop("row_num")

        # Combine and drop any accidental duplicates
        combined = existing.unionByName(new_only).dropDuplicates(["subreddit_id"])

        # Final safety: ensure subreddit_key is unique
        assert combined.select("subreddit_key").distinct().count() == combined.count(), "❌ Duplicate subreddit_key found!"

        # Overwrite clean version
        shutil.rmtree(sub_path)
        
        # 🚨 Deduplicate by subreddit_key
        combined = combined.dropDuplicates(["subreddit_key"])
        
        combined.write.mode("overwrite").parquet(sub_path)
        return combined

    else:
        # Initial load: assign keys from 1
        window = Window.orderBy("subreddit_id")
        new_subs = new_subs.withColumn("subreddit_key", dense_rank().over(window))
        
        # 🚨 Deduplicate by subreddit_key
        new_subs = new_subs.dropDuplicates(["subreddit_key"])
        
        new_subs.write.mode("overwrite").parquet(sub_path)
        return new_subs

In [11]:
def build_dim_date(df, dw_path):
    # Extract and cast timestamp
    dim_date_df = df.withColumn("created_datetime", from_unixtime(col("created_utc")).cast("timestamp")) \
                    .select("created_datetime") \
                    .dropDuplicates()

    # Breakdown into date parts
    dim_date_df = dim_date_df \
        .withColumn("date", to_date(col("created_datetime"))) \
        .withColumn("year", year(col("created_datetime"))) \
        .withColumn("month", month(col("created_datetime"))) \
        .withColumn("day", dayofmonth(col("created_datetime"))) \
        .withColumn("hour", hour(col("created_datetime"))) \
        .withColumn("minute", minute(col("created_datetime"))) \
        .withColumn("second", second(col("created_datetime"))) \
        .withColumn("day_of_week", date_format(col("created_datetime"), "E")) \
        .withColumn("week_of_year", weekofyear(col("created_datetime")))

    # Path to save
    date_path = os.path.join(dw_path, "dim_date")

    # Append new entries only
    if os.path.exists(date_path):
        existing = spark.read.parquet(date_path)
        dim_date_df = dim_date_df.join(existing, on="created_datetime", how="left_anti")
    
    # 🚨 Deduplicate by created_datetime
    dim_date_df = dim_date_df.dropDuplicates(["created_datetime"])
    
    dim_date_df.write.mode("append").parquet(date_path)
    return spark.read.parquet(date_path)

In [12]:
def build_fact_comment(df, dim_user, dim_sub, dim_date, dw_path):
    spark.catalog.clearCache()
    fact_path = os.path.join(dw_path, "fact_comment")

    # Step 1: Add timestamp for join
    df = df.withColumn("created_datetime", from_unixtime(col("created_utc")).cast("timestamp"))

    # Step 2: Join with dimension tables
    new_fact = df \
        .join(dim_user.select("author", "user_id"), on="author", how="left") \
        .join(dim_sub.select("subreddit_id", "subreddit_key"), on="subreddit_id", how="left") \
        .join(dim_date.select("created_datetime"), on="created_datetime", how="left") \
        .select(
            col("comment_id"),
            col("user_id"),
            col("subreddit_key"),
            col("created_datetime"),
            col("body"),
            col("score"),
            col("controversiality"),
            col("gilded"),
            col("ups"),
            col("downs")
        ).dropna(subset=["comment_id"])  # Ensure we don't propagate null comment_ids

    # Step 3: Load existing fact_comment if exists
    if os.path.exists(fact_path):
        existing_fact = spark.read.parquet(fact_path).dropDuplicates(["comment_id"])
        # Keep only new records that don't exist
        new_only = new_fact.join(existing_fact, on="comment_id", how="left_anti")
        combined = existing_fact.unionByName(new_only).dropDuplicates(["comment_id"])
        
        # Filter invalid dates
        combined = combined.filter(
            (col("created_datetime").isNotNull()) &
            (to_date("created_datetime") != lit("1969-12-31"))
        )
        
        # Print distinct dates
        print("🗓️ Distinct created dates being written:")
        combined.select(to_date("created_datetime").alias("date")).distinct().orderBy("date").show(truncate=False)

        # Remove old and overwrite
        #shutil.rmtree(fact_path)
        combined.write.mode("append").parquet(fact_path)
        return combined
    else:
        
        # Filter invalid dates
        new_fact = new_fact.filter(
            (col("created_datetime").isNotNull()) &
            (to_date("created_datetime") != lit("1969-12-31"))
        )
        
        # First time creation
        new_fact.write.mode("overwrite").parquet(fact_path)
        return new_fact

In [13]:
# 🚀 Main
if __name__ == "__main__":
    all_csvs = [os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.endswith(".csv")]
    for csv_file in all_csvs:
        print(f"🚚 Processing file: {csv_file}")
        df_raw = read_new_csvs(csv_file)
        df_clean = clean_and_prepare(df_raw)


        dim_user = build_dim_user(df_clean, dw_folder)
        dim_sub = build_dim_subreddit(df_clean, dw_folder)
        dim_date = build_dim_date(df_clean, dw_folder)
        #dim_user = spark.read.parquet(os.path.join(dw_folder, "dim_user"))
        #dim_sub = spark.read.parquet(os.path.join(dw_folder, "dim_subreddit"))
        #dim_date = spark.read.parquet(os.path.join(dw_folder, "dim_date"))

        fact_comment = build_fact_comment(df_clean, dim_user, dim_sub, dim_date, dw_folder)

    print("✅ ETL completed. All dimension and fact tables updated.")

🚚 Processing file: D:/Portfolio/reddit-analytics-pipeline/data/partitions\reddit_data_2015-05-01.csv
🗓️ Distinct created dates being written:
+----------+
|date      |
+----------+
|2015-05-01|
|2015-05-02|
|2015-05-03|
|2015-05-04|
|2015-05-05|
+----------+

🚚 Processing file: D:/Portfolio/reddit-analytics-pipeline/data/partitions\reddit_data_2015-05-02.csv
🗓️ Distinct created dates being written:
+----------+
|date      |
+----------+
|2015-05-01|
|2015-05-02|
|2015-05-03|
|2015-05-04|
|2015-05-05|
+----------+

🚚 Processing file: D:/Portfolio/reddit-analytics-pipeline/data/partitions\reddit_data_2015-05-03.csv
🗓️ Distinct created dates being written:
+----------+
|date      |
+----------+
|2015-05-01|
|2015-05-02|
|2015-05-03|
|2015-05-04|
|2015-05-05|
+----------+

🚚 Processing file: D:/Portfolio/reddit-analytics-pipeline/data/partitions\reddit_data_2015-05-04.csv
🗓️ Distinct created dates being written:
+----------+
|date      |
+----------+
|2015-05-01|
|2015-05-02|
|2015-05-03|


🚚 Processing file: D:/Portfolio/reddit-analytics-pipeline/data/partitions\reddit_data_2015-05-24.csv
🗓️ Distinct created dates being written:
+----------+
|date      |
+----------+
|2015-05-01|
|2015-05-02|
|2015-05-03|
|2015-05-04|
|2015-05-05|
|2015-05-06|
|2015-05-07|
|2015-05-08|
|2015-05-09|
|2015-05-10|
|2015-05-11|
|2015-05-12|
|2015-05-13|
|2015-05-14|
|2015-05-15|
|2015-05-16|
|2015-05-17|
|2015-05-18|
|2015-05-19|
|2015-05-20|
+----------+
only showing top 20 rows

🚚 Processing file: D:/Portfolio/reddit-analytics-pipeline/data/partitions\reddit_data_2015-05-25.csv
🗓️ Distinct created dates being written:
+----------+
|date      |
+----------+
|2015-05-01|
|2015-05-02|
|2015-05-03|
|2015-05-04|
|2015-05-05|
|2015-05-06|
|2015-05-07|
|2015-05-08|
|2015-05-09|
|2015-05-10|
|2015-05-11|
|2015-05-12|
|2015-05-13|
|2015-05-14|
|2015-05-15|
|2015-05-16|
|2015-05-17|
|2015-05-18|
|2015-05-19|
|2015-05-20|
+----------+
only showing top 20 rows

🚚 Processing file: D:/Portfolio/reddit-a

In [14]:
spark.catalog.clearCache()

In [15]:
spark.stop()