In [63]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [64]:
class BaseTransformer:
    """Spark read/write helpers plus generic column transforms.

    Every transform takes a DataFrame and returns a new DataFrame with one
    extra (or replaced) column; the input DataFrame is never modified.
    """

    def __init__(self, sparkSession):
        """Keep a reference to the Spark session used by ``readData``."""
        self.spark = sparkSession

    def readData(self, path, format="json", schema=None):
        """Load ``path`` with the given source ``format``.

        Args:
            path: Location handed to ``DataFrameReader.load``.
            format: Data source name (default ``"json"``).
            schema: Optional schema; applied only when truthy, otherwise the
                reader is left to determine the schema itself.

        Returns:
            The loaded DataFrame.
        """
        reader = self.spark.read.format(format)
        if schema:
            # Keep the reader returned by ``schema`` rather than relying on
            # the call mutating ``reader`` in place.
            reader = reader.schema(schema)
        return reader.load(path)

    def writeData(self, df, path, format="json", dbPath=None, checkpointPath=None, mode="append", streaming=False):
        """Write ``df`` either as a batch save or as a streaming table sink.

        Args:
            df: DataFrame (batch) or streaming DataFrame to write.
            path: Target passed to ``save`` for batch writes; unused when
                ``streaming`` is true.
            format: Output format name (default ``"json"``).
            dbPath: Target table name for streaming writes (required then).
            checkpointPath: Streaming checkpoint location. When ``None`` the
                option is not set at all instead of being set to ``None``.
            mode: Save mode (batch) or output mode (streaming).
            streaming: Selects the streaming branch when true.

        Raises:
            ValueError: If ``streaming`` is true and ``dbPath`` is ``None``.

        Note:
            The streaming branch blocks until the query terminates.
        """
        if streaming:
            if dbPath is None:
                # Fail early with a readable message instead of an obscure
                # error from ``toTable(None)``.
                raise ValueError("dbPath is required when streaming=True")
            writer = df.writeStream.format(format)
            if checkpointPath is not None:
                writer = writer.option("checkpointLocation", checkpointPath)
            query = writer.outputMode(mode).toTable(dbPath)
            query.awaitTermination()
        else:
            df.write.format(format).mode(mode).save(path)

    def showShape(self, df):
        """Return ``(number_of_columns, number_of_rows)`` for ``df``.

        Note the order: columns first, rows second. ``df.count()`` triggers a
        Spark job.
        """
        return (len(df.columns), df.count())

    def convertTimestamp(self, df, colName="created_utc", newCol="createdDate"):
        """Add ``newCol`` as a timestamp built from epoch seconds in ``colName``.

        The value is converted directly with ``timestamp_seconds`` instead of
        formatting it to a string and parsing it back: the string round trip
        goes through the session time zone and is ambiguous for wall-clock
        times that occur twice at a daylight-saving transition. The cast to
        ``long`` keeps whole-second resolution, as ``from_unixtime`` did.
        """
        return df.withColumn(
            newCol,
            F.timestamp_seconds(F.col(colName).cast("long"))
        )

    def markAuthorDeleted(self, df, authorCol="author", nameAuthor="author_fullname", newCol="accDeleted"):
        """Add ``newCol``: ``authorCol`` equals ``"[deleted]"`` AND ``nameAuthor`` is null."""
        return df.withColumn(
            newCol,
            (F.col(authorCol) == "[deleted]") & F.col(nameAuthor).isNull()
        )

    def markBodyRemoved(self, df, body="selftext", newCol="postDeleted"):
        """Add ``newCol``: whether the ``body`` column equals ``"[removed]"``."""
        return df.withColumn(
            newCol,
            F.col(body) == "[removed]"
        )

In [65]:
class CommentTransformer(BaseTransformer):
    """Column transforms specific to comment records, on top of ``BaseTransformer``."""

    def __init__(self, sparkSession):
        """Forward the Spark session to the base-class constructor."""
        super().__init__(sparkSession)

    def normalizeParentId(self, df, parentIdCol="parent_id", newCol="parent_clean"):
        """Add ``newCol``: ``parentIdCol`` with a leading ``t1_`` or ``t3_`` stripped."""
        bare_parent = F.regexp_replace(F.col(parentIdCol), r"^(t[13]_)", "")
        return df.withColumn(newCol, bare_parent)

    def normalizeLinkId(self, df, linkIdCol="link_id", newCol="link_clean"):
        """Add ``newCol``: ``linkIdCol`` with a leading ``t3_`` stripped."""
        bare_link = F.regexp_replace(F.col(linkIdCol), r"^t3_", "")
        return df.withColumn(newCol, bare_link)

    def markBodyRemoved(self, df, bodyCol="body", newCol="is_removed_body"):
        """Add ``newCol``: whether ``bodyCol`` equals ``"[removed]"``.

        Overrides the base-class method with comment-oriented defaults; note
        that the column parameter is named ``bodyCol`` here.
        """
        body_is_removed = F.col(bodyCol) == "[removed]"
        return df.withColumn(newCol, body_is_removed)

    def markModComments(self, df, authorCol="author", newColMod="is_mod_comment", newColAutoMod="is_automod_comment"):
        """Add two boolean flags derived from ``authorCol``.

        ``newColMod`` is set when the lower-cased author ends with
        ``-modteam``; ``newColAutoMod`` is set when the author is exactly
        ``AutoModerator`` (case-sensitive).
        """
        from_mod_team = F.lower(F.col(authorCol)).like("%-modteam")
        from_automod = F.col(authorCol) == "AutoModerator"
        return (
            df
            .withColumn(newColMod, from_mod_team)
            .withColumn(newColAutoMod, from_automod)
        )

In [66]:
# Obtain the Spark session for this notebook under the app name "CommentTransformer".
spark = (
    SparkSession.builder
    .appName("CommentTransformer")
    .getOrCreate()
)

In [67]:
# Transformer instance used by every following cell.
comment = CommentTransformer(sparkSession=spark)

In [68]:
# Input file, relative to the notebook's working directory.
path = "../data/RC_reddit.jsonl"

# Explicit schema applied when reading the comment file; every field is nullable.
rCommentBronzeSchema = StructType([
    StructField(fieldName, fieldType, True)
    for fieldName, fieldType in (
        ("id", StringType()),
        ("body", StringType()),
        ("created_utc", LongType()),
        ("edited", BooleanType()),
        ("score", IntegerType()),
        ("author", StringType()),
        ("author_fullname", StringType()),
        ("author_created_utc", LongType()),
        ("parent_id", StringType()),
        ("link_id", StringType()),
        ("is_submitter", BooleanType()),
        ("permalink", StringType()),
        ("subreddit", StringType()),
        ("subreddit_id", StringType()),
        ("subreddit_name_prefixed", StringType()),
        ("subreddit_type", StringType()),
        ("total_awards_received", IntegerType()),
        ("controversiality", IntegerType()),
        ("retrieved_on", LongType()),
        ("stickied", BooleanType()),
    )
])


In [69]:
# Read the raw comments as JSON using the explicit schema defined above.
df = comment.readData(path=path, format="json", schema=rCommentBronzeSchema)

In [70]:
# Displays (number of columns, number of rows) for the raw frame.
comment.showShape(df=df)

(20, 65661)

In [71]:
# Preview the first 20 raw rows with long cell values truncated.
df.show(n=20, truncate=True)

+-------+--------------------+-----------+------+-----+-----------------+---------------+------------------+----------+----------+------------+--------------------+-----------+------------+-----------------------+--------------+---------------------+----------------+------------+--------+
|     id|                body|created_utc|edited|score|           author|author_fullname|author_created_utc| parent_id|   link_id|is_submitter|           permalink|  subreddit|subreddit_id|subreddit_name_prefixed|subreddit_type|total_awards_received|controversiality|retrieved_on|stickied|
+-------+--------------------+-----------+------+-----+-----------------+---------------+------------------+----------+----------+------------+--------------------+-----------+------------+-----------------------+--------------+---------------------+----------------+------------+--------+
|mvc2u2o|From the article\...| 1748737579| false|    0|         Gari_305|    t2_65fa26pr|              NULL|t3_1l0buai|t3_1l0buai|

In [72]:
# Start the derived frame from the raw one: add timestamp versions of both epoch columns.
df_silver = comment.convertTimestamp(df, colName="created_utc", newCol="created_ts")
df_silver = comment.convertTimestamp(df_silver, colName="retrieved_on", newCol="retrieved_ts")


In [73]:
# Add parent_clean: parent_id without its leading type prefix.
df_silver = comment.normalizeParentId(df_silver, parentIdCol="parent_id", newCol="parent_clean")


In [74]:
# Add link_clean: link_id without its leading "t3_" prefix.
df_silver = comment.normalizeLinkId(df_silver, linkIdCol="link_id", newCol="link_clean")


In [75]:
# Add accDeleted: author is "[deleted]" and author_fullname is null.
df_silver = comment.markAuthorDeleted(
    df_silver,
    authorCol="author",
    nameAuthor="author_fullname",
    newCol="accDeleted",
)


In [76]:
# Add is_removed_body: body equals "[removed]".
df_silver = comment.markBodyRemoved(df_silver, bodyCol="body", newCol="is_removed_body")


In [77]:
# Add is_mod_comment and is_automod_comment, both derived from the author column.
df_silver = comment.markModComments(
    df_silver,
    authorCol="author",
    newColMod="is_mod_comment",
    newColAutoMod="is_automod_comment",
)


In [78]:
# Inspect the derived frame: schema first, then 20 rows without truncating cell values.
df_silver.printSchema()
df_silver.show(n=20, truncate=False)

root
 |-- id: string (nullable = true)
 |-- body: string (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- edited: boolean (nullable = true)
 |-- score: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- author_created_utc: long (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- permalink: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- subreddit_name_prefixed: string (nullable = true)
 |-- subreddit_type: string (nullable = true)
 |-- total_awards_received: integer (nullable = true)
 |-- controversiality: integer (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- created_ts: timestamp (nullable = true)
 |-- retrieved_ts: timestamp (nullable = true)
 |-- parent_clean: string (nullable = true)


In [79]:
# Compact preview of the derived frame (20 rows, truncated cells).
df_silver.show(n=20, truncate=True)

+-------+--------------------+-----------+------+-----+-----------------+---------------+------------------+----------+----------+------------+--------------------+-----------+------------+-----------------------+--------------+---------------------+----------------+------------+--------+-------------------+-------------------+------------+----------+----------+---------------+--------------+------------------+
|     id|                body|created_utc|edited|score|           author|author_fullname|author_created_utc| parent_id|   link_id|is_submitter|           permalink|  subreddit|subreddit_id|subreddit_name_prefixed|subreddit_type|total_awards_received|controversiality|retrieved_on|stickied|         created_ts|       retrieved_ts|parent_clean|link_clean|accDeleted|is_removed_body|is_mod_comment|is_automod_comment|
+-------+--------------------+-----------+------+-----+-----------------+---------------+------------------+----------+----------+------------+--------------------+------

In [80]:
# Show only the rows flagged as AutoModerator comments.
df_silver.where(F.col("is_automod_comment")).show()

+-------+--------------------+-----------+------+-----+-------------+---------------+------------------+----------+----------+------------+--------------------+-----------+------------+-----------------------+--------------+---------------------+----------------+------------+--------+-------------------+-------------------+------------+----------+----------+---------------+--------------+------------------+
|     id|                body|created_utc|edited|score|       author|author_fullname|author_created_utc| parent_id|   link_id|is_submitter|           permalink|  subreddit|subreddit_id|subreddit_name_prefixed|subreddit_type|total_awards_received|controversiality|retrieved_on|stickied|         created_ts|       retrieved_ts|parent_clean|link_clean|accDeleted|is_removed_body|is_mod_comment|is_automod_comment|
+-------+--------------------+-----------+------+-----+-------------+---------------+------------------+----------+----------+------------+--------------------+-----------+------

In [81]:
# Number of rows flagged as mod-team comments.
df_silver.where(F.col("is_mod_comment")).count()

106

In [82]:
# Count AutoModerator rows directly on the raw frame.
df.where(F.col("author") == "AutoModerator").count()

152