## 1. Tạo Class

In [103]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [104]:
class BaseTransformer:
    """Shared I/O helpers and column transforms for the Spark pipeline.

    Holds a Spark session and provides read/write helpers plus a few
    column-level transforms that the submission and comment transformers
    both use.
    """

    def __init__(self, sparkSession):
        """Store the Spark session used for all reads."""
        self.spark = sparkSession

    def readData(self, path, format="json", schema=None):
        """Load ``path`` with the given source ``format``.

        Args:
            path: Location passed to the reader's ``load``.
            format: Data source name (shadows the builtin, kept for
                backward compatibility with existing callers).
            schema: Optional explicit schema. When ``None`` the reader is
                left to infer one.

        Returns:
            Whatever the reader's ``load`` returns (a DataFrame in Spark).
        """
        reader = self.spark.read.format(format)
        # Compare against None: a schema object may define __len__ and be
        # falsy while still being an explicit schema.
        if schema is not None:
            # Keep the returned reader instead of relying on in-place
            # mutation of the builder.
            reader = reader.schema(schema)
        return reader.load(path)

    def writeData(self, df, path, format="json", dbPath=None, checkpointPath=None, mode="append", streaming=False):
        """Write ``df`` either as a batch save or as a streaming table sink.

        Args:
            df: DataFrame to write.
            path: Target path for the batch ``save`` (unused when streaming).
            format: Output data source name.
            dbPath: Target table name; required when ``streaming`` is true.
            checkpointPath: Streaming checkpoint location. When ``None`` the
                option is not set, so a session-level default (if configured)
                is used.
            mode: Save mode for batch writes / output mode for streaming.
            streaming: When true, start a streaming query and block until it
                terminates.

        Raises:
            ValueError: If ``streaming`` is true and ``dbPath`` is ``None``.
        """
        if not streaming:
            df.write.format(format).mode(mode).save(path)
            return

        # Fail fast with a clear message instead of an opaque JVM error.
        if dbPath is None:
            raise ValueError("dbPath is required when streaming=True")

        writer = df.writeStream.format(format).outputMode(mode)
        if checkpointPath is not None:
            writer = writer.option("checkpointLocation", checkpointPath)
        query = writer.toTable(dbPath)
        # Blocks the caller until the streaming query stops.
        query.awaitTermination()

    def showShape(self, df):
        """Return ``(number_of_columns, number_of_rows)``.

        Calls ``df.count()``, which triggers a Spark job.
        """
        return (len(df.columns), df.count())

    def convertTimestamp(self, df, colName="created_utc", newCol="createdDate"):
        """Add ``newCol`` as a timestamp derived from the epoch-seconds column ``colName``."""
        return df.withColumn(
            newCol,
            F.from_unixtime(F.col(colName)).cast("timestamp")
        )

    def markAuthorDeleted(self, df, authorCol="author", nameAuthor="author_fullname", newCol="accDeleted"):
        """Add boolean ``newCol``: author is ``"[deleted]"`` and ``nameAuthor`` is null."""
        return df.withColumn(
            newCol,
            (F.col(authorCol) == "[deleted]") & F.col(nameAuthor).isNull()
        )

    def markBodyRemoved(self, df, body="selftext", newCol="postDeleted"):
        """Add boolean ``newCol``: the ``body`` column equals ``"[removed]"``."""
        return df.withColumn(
            newCol,
            F.col(body) == "[removed]"
        )

In [105]:
class SubmissionTransformer(BaseTransformer):
    """Silver-layer transforms for submission (post) records."""

    def __init__(self, sparkSession):
        super().__init__(sparkSession)

    def markSpamPost(self, df, titleCol="title", urlCol="url", domainCol="domain", subredditCol="subreddit", newCol="isPostSpam", spamThreshold=55):
        """Flag posts whose title words largely reappear in the post URL.

        Adds two columns:
          * ``matchPercent`` - percentage of non-empty, lower-cased,
            space-separated title tokens that occur as a substring of the
            lower-cased URL (0.0 when the title has no tokens or is null).
          * ``newCol`` - true when ``matchPercent >= spamThreshold`` and the
            domain is not ``"self." + subreddit``.

        Args:
            df: Input DataFrame.
            titleCol, urlCol, domainCol, subredditCol: Source column names;
                ``titleCol`` and ``urlCol`` are interpolated into a SQL
                expression, so they must be valid SQL column references.
            newCol: Name of the boolean spam flag column.
            spamThreshold: Minimum match percentage to flag a post.
        """
        # Drop empty tokens produced by consecutive spaces: instr(x, '')
        # is always > 0, so they would otherwise count as matches.
        wordsSql = f"filter(split(lower({titleCol}), ' '), word -> word <> '')"
        matchedSql = f"filter({wordsSql}, word -> instr(lower({urlCol}), word) > 0)"

        # size() of a null array is -1 or NULL depending on Spark settings;
        # clamp both counts to 0 so a null title is not scored as 100%.
        totalWord = F.greatest(F.size(F.expr(wordsSql)), F.lit(0))
        numMatched = F.greatest(F.size(F.expr(matchedSql)), F.lit(0))

        # Guard the division so an empty title yields 0.0 instead of
        # NULL or a divide-by-zero error.
        matchPercent = F.when(
            totalWord > 0, (numMatched * 100) / totalWord
        ).otherwise(F.lit(0.0))

        isPostSpam = (matchPercent >= spamThreshold) & (
            F.col(domainCol) != F.concat(F.lit("self."), F.col(subredditCol))
        )

        return df.withColumn("matchPercent", matchPercent).withColumn(newCol, isPostSpam)

    def transform(self, df):
        """Apply timestamp conversion, deletion flags and the spam flag, in that order."""
        df = self.convertTimestamp(df)
        df = self.markAuthorDeleted(df)
        df = self.markBodyRemoved(df)
        df = self.markSpamPost(df)
        return df

In [106]:
class CommentTransformer(BaseTransformer):
    """Silver-layer transforms for comment records."""

    def __init__(self, sparkSession):
        super().__init__(sparkSession)

    def normalizeParentId(self, df, parentIdCol="parent_id", newCol="parent_clean"):
        """Add ``newCol``: ``parentIdCol`` with a leading 't1_' or 't3_' stripped."""
        strippedParent = F.regexp_replace(F.col(parentIdCol), r"^(t[13]_)", "")
        return df.withColumn(newCol, strippedParent)

    def normalizeLinkId(self, df, linkIdCol="link_id", newCol="link_clean"):
        """Add ``newCol``: ``linkIdCol`` with a leading 't3_' stripped."""
        strippedLink = F.regexp_replace(F.col(linkIdCol), r"^t3_", "")
        return df.withColumn(newCol, strippedLink)

    def markModComments(self, df, authorCol="author", newColMod="deleted_by_mod", newColAutoMod="deleted_by_auto"):
        """Add two boolean columns describing the comment author.

        ``newColMod`` is true when the lower-cased author ends with
        '-modteam'; ``newColAutoMod`` is true when the author is exactly
        'AutoModerator'.
        """
        author = F.col(authorCol)
        fromModTeam = F.lower(author).like("%-modteam")
        fromAutoMod = author == "AutoModerator"
        return (
            df.withColumn(newColMod, fromModTeam)
              .withColumn(newColAutoMod, fromAutoMod)
        )

    def transform(self, df):
        """Run every comment transform in a fixed order and return the result."""
        pipeline = (
            self.convertTimestamp,
            self.markAuthorDeleted,
            lambda frame: self.markBodyRemoved(frame, body="body", newCol="cmtDeleted"),
            self.normalizeParentId,
            self.normalizeLinkId,
            self.markModComments,
        )
        for step in pipeline:
            df = step(df)
        return df

In [107]:
class GoldTransformer(BaseTransformer):
    """Build gold-layer dimension and fact DataFrames from silver data.

    ``dfSubmission`` and ``dfComment`` are expected to be the outputs of the
    submission and comment silver transforms. The topic and sentiment
    methods additionally require ``topic_label`` / ``sentiment_label``
    columns.
    NOTE(review): no step that adds those label columns is visible in this
    file - confirm they are produced upstream before calling those methods.

    Dimensions used by the fact builders are cached on the instance
    (``self.dimTime`` etc.) and built on first use, so both facts join
    against the same dimension DataFrame objects. A caller may also assign a
    pre-built dimension to the attribute before building facts.
    """

    def __init__(self, sparkSession, dfSubmission, dfComment):
        super().__init__(sparkSession)
        self.dfSubmission = dfSubmission
        self.dfComment = dfComment
        # Lazily-populated dimension caches (see _dimension).
        self.dimTime = None
        self.dimAuthor = None
        self.dimSubreddit = None
        self.dimPostType = None
        self.dimPost = None
        self.dimComment = None
        self.dimTopic = None
        self.dimSentiment = None

    def _dimension(self, attrName, builder):
        """Return the dimension cached in ``attrName``, building it once via ``builder``."""
        cached = getattr(self, attrName)
        if cached is None:
            cached = builder()
            setattr(self, attrName, cached)
        return cached

    @staticmethod
    def _resolveName(df, preferred, fallback):
        """Return ``preferred`` if ``df`` has that column, otherwise ``fallback``."""
        return preferred if preferred in df.columns else fallback

    def createDimTime(self, timestampCol="createdDate"):
        """Return one row per distinct timestamp across posts and comments.

        Adds calendar parts (year ... second, day_of_week) and a surrogate
        ``time_key``.
        """
        # distinct() runs after the union so a timestamp present in both
        # inputs gets exactly one row and one key.
        dfTimestamp = (
            self.dfSubmission.select(timestampCol)
            .union(self.dfComment.select(timestampCol))
            .distinct()
        )
        ts = F.col(timestampCol)
        return (
            dfTimestamp
            .withColumn("year", F.year(ts))
            .withColumn("month", F.month(ts))
            .withColumn("day", F.dayofmonth(ts))
            .withColumn("hour", F.hour(ts))
            .withColumn("minute", F.minute(ts))
            .withColumn("second", F.second(ts))
            .withColumn("day_of_week", F.dayofweek(ts))
            .withColumn("time_key", F.monotonically_increasing_id())
        )

    def createDimAuthor(self, authorName="author", fullName="author_fullname"):
        """Return distinct (author, author_fullname) pairs with a surrogate ``author_key``."""
        return (
            self.dfSubmission.select(authorName, fullName)
            .union(self.dfComment.select(authorName, fullName))
            .distinct()
            .withColumn("author_key", F.monotonically_increasing_id())
        )

    def createDimSubreddit(self, subredditId="subreddit_id", subredditName="subreddit", subredditNamePrefixed="subreddit_name_prefixed", subredditType="subreddit_type",subredditSubscribers="subreddit_subscribers"):
        """Return subreddit attributes keyed by ``subreddit_key`` (the subreddit id).

        The subscriber count is recorded per post, so a plain ``distinct()``
        would emit one row per observed count; the maximum observed count is
        kept instead so each subreddit collapses to a single row.
        """
        return (
            self.dfSubmission
            .groupBy(
                F.col(subredditId).alias("subreddit_key"),
                F.col(subredditName).alias("subreddit_name"),
                F.col(subredditNamePrefixed),
                F.col(subredditType),
            )
            .agg(F.max(F.col(subredditSubscribers)).alias(subredditSubscribers))
        )

    def createDimPostType(self, postType="post_hint"):
        """Return the distinct post types with a surrogate ``post_type_key``."""
        # Deduplicate BEFORE assigning keys; a unique id per input row would
        # make every row distinct.
        return (
            self.dfSubmission.select(F.col(postType))
            .distinct()
            .withColumn("post_type_key", F.monotonically_increasing_id())
        )

    def createDimPost (self):
        """Return one row per submission with descriptive flags and ``postStatus``.

        Comment-derived flags (``isModDeleted`` / ``isAutoDeleted``) are true
        when any comment on the post carries the corresponding flag. Posts
        without comments are kept, with both flags false.
        """
        dfSub = self.dfSubmission
        dfCmt = self.dfComment

        # Aggregate comment flags per post so the join cannot duplicate posts.
        # link_id carries a 't3_' prefix, so strip it to match the post id.
        modCol = self._resolveName(dfCmt, "isModDeleted", "deleted_by_mod")
        autoCol = self._resolveName(dfCmt, "isAutoDeleted", "deleted_by_auto")
        commentFlags = (
            dfCmt
            .groupBy(F.regexp_replace(F.col("link_id"), r"^t3_", "").alias("_post_id"))
            .agg(
                F.max(F.col(modCol)).alias("isModDeleted"),
                F.max(F.col(autoCol)).alias("isAutoDeleted"),
            )
        )

        postCols = [F.col(name) for name in ("id", "title", "permalink", "url", "domain")]
        # Accept either the 'is_'-prefixed names or the raw source names.
        for rawName in ("edited", "locked", "spoiler", "over_18", "stickied"):
            target = "is_" + rawName
            postCols.append(F.col(self._resolveName(dfSub, target, rawName)).alias(target))
        postCols += [
            F.col(name)
            for name in ("is_original_content", "link_flair_text", "accDeleted", "postDeleted")
        ]
        posts = dfSub.select(*postCols)

        joined = (
            posts.join(commentFlags, posts["id"] == commentFlags["_post_id"], "left")
            .drop("_post_id")
            .withColumn("isModDeleted", F.coalesce(F.col("isModDeleted"), F.lit(False)))
            .withColumn("isAutoDeleted", F.coalesce(F.col("isAutoDeleted"), F.lit(False)))
        )

        # First matching branch wins, so the order encodes precedence.
        postStatus = (
            F.when(F.col("accDeleted"), F.lit("AuthorDeleted"))
            .when(F.col("postDeleted") & ~F.col("accDeleted"), F.lit("SelfDeleted"))
            .when(F.col("isModDeleted"), F.lit("ModDeleted"))
            .when(F.col("isAutoDeleted"), F.lit("AutoDeleted"))
            .otherwise(F.lit("active"))
        )
        return joined.withColumn("postStatus", postStatus).withColumnRenamed("id", "post_key")

    def createDimComment(self):
        """Return descriptive comment attributes (id, body, permalink, flags)."""
        dfCmt = self.dfComment
        editedCol = self._resolveName(dfCmt, "is_edited", "edited")
        return dfCmt.select(
            F.col("id"),
            F.col("body"),
            F.col("permalink"),
            F.col(editedCol).alias("is_edited"),
            F.col("is_submitter"),
            F.col("controversiality"),
        )

    def createDimTopic(self):
        """Return the distinct submission topic labels with a surrogate ``topic_key``."""
        return (
            self.dfSubmission.select("topic_label")
            .distinct()
            .withColumn("topic_key", F.monotonically_increasing_id())
        )

    def createDimSentiment(self):
        """Return the distinct sentiment labels with a surrogate ``sentiment_key``.

        Labels from comments are included when the comment DataFrame has a
        ``sentiment_label`` column, so the comment fact can resolve them too.
        """
        labels = self.dfSubmission.select("sentiment_label")
        if "sentiment_label" in self.dfComment.columns:
            labels = labels.union(self.dfComment.select("sentiment_label"))
        return labels.distinct().withColumn("sentiment_key", F.monotonically_increasing_id())

    def createFactPostActivity(self):
        """Return the post fact: dimension keys plus score/comment/award measures.

        Each dimension is left-joined on its natural column; rows with no
        match (including null natural values) get a null key.
        """
        dimTime = self._dimension("dimTime", self.createDimTime)
        dimAuthor = self._dimension("dimAuthor", self.createDimAuthor)
        dimSubreddit = self._dimension("dimSubreddit", self.createDimSubreddit)
        dimPostType = self._dimension("dimPostType", self.createDimPostType)
        dimTopic = self._dimension("dimTopic", self.createDimTopic)
        dimSentiment = self._dimension("dimSentiment", self.createDimSentiment)

        posts = self.dfSubmission.select(
            "id", "author_fullname", "createdDate", "subreddit", "post_hint",
            "topic_label", "sentiment_label",
            "score", "num_comments", "total_awards_received",
        )

        # Joining on a column name (not an expression) keeps a single copy
        # of the join column and avoids ambiguous references later.
        fact = (
            posts
            .join(dimTime.select("createdDate", "time_key"), on="createdDate", how="left")
            .join(dimAuthor.select("author_fullname", "author_key"), on="author_fullname", how="left")
            .join(
                dimSubreddit.select(F.col("subreddit_name").alias("subreddit"), F.col("subreddit_key")),
                on="subreddit", how="left",
            )
            .join(dimPostType.select("post_hint", "post_type_key"), on="post_hint", how="left")
            .join(dimTopic.select("topic_label", "topic_key"), on="topic_label", how="left")
            .join(dimSentiment.select("sentiment_label", "sentiment_key"), on="sentiment_label", how="left")
        )

        return (
            fact.select(
                F.col("id").alias("post_key"),
                F.col("time_key"), F.col("author_key"), F.col("subreddit_key"),
                F.col("post_type_key"), F.col("topic_key"), F.col("sentiment_key"),
                F.col("score"), F.col("num_comments"), F.col("total_awards_received"),
            )
            .withColumn("id", F.monotonically_increasing_id())
        )

    def createFactCommentActivity(self):
        """Return the comment fact: dimension keys plus score/controversiality/award measures.

        ``post_key`` is the comment's ``link_id`` with the 't3_' prefix
        removed, matching the post id used as ``post_key`` elsewhere.
        """
        dimTime = self._dimension("dimTime", self.createDimTime)
        dimAuthor = self._dimension("dimAuthor", self.createDimAuthor)
        dimSubreddit = self._dimension("dimSubreddit", self.createDimSubreddit)
        dimSentiment = self._dimension("dimSentiment", self.createDimSentiment)

        comments = self.dfComment.select(
            F.col("author_fullname"), F.col("createdDate"), F.col("subreddit"),
            F.regexp_replace(F.col("link_id"), r"^t3_", "").alias("post_key"),
            F.col("sentiment_label"),
            F.col("score"), F.col("controversiality"), F.col("total_awards_received"),
        )

        fact = (
            comments
            .join(dimTime.select("createdDate", "time_key"), on="createdDate", how="left")
            .join(dimAuthor.select("author_fullname", "author_key"), on="author_fullname", how="left")
            .join(
                dimSubreddit.select(F.col("subreddit_name").alias("subreddit"), F.col("subreddit_key")),
                on="subreddit", how="left",
            )
            .join(dimSentiment.select("sentiment_label", "sentiment_key"), on="sentiment_label", how="left")
        )

        return fact.select(
            F.col("time_key"), F.col("author_key"), F.col("subreddit_key"),
            F.col("post_key"), F.col("sentiment_key"),
            F.col("score"), F.col("controversiality"), F.col("total_awards_received"),
        )

## 2. Spark Session

In [108]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Transformer")
    .getOrCreate()
)

## 3. Transform Submission Silver

In [109]:
# Location of the raw submission data.
pathSub = "../data/RS_reddit.jsonl"

# Explicit schema for the raw submission records; every field is nullable.
rSubmissionBronzeSchema = StructType([
    StructField(fieldName, fieldType, True)
    for fieldName, fieldType in [
        ("id", StringType()),
        ("title", StringType()),
        ("selftext", StringType()),
        ("url", StringType()),
        ("permalink", StringType()),
        ("domain", StringType()),
        ("post_hint", StringType()),
        ("author", StringType()),
        ("author_fullname", StringType()),
        ("created_utc", LongType()),
        ("subreddit", StringType()),
        ("subreddit_id", StringType()),
        ("subreddit_name_prefixed", StringType()),
        ("subreddit_type", StringType()),
        ("subreddit_subscribers", IntegerType()),
        ("score", IntegerType()),
        ("num_comments", IntegerType()),
        ("total_awards_received", IntegerType()),
        ("edited", BooleanType()),
        ("locked", BooleanType()),
        ("spoiler", BooleanType()),
        ("over_18", BooleanType()),
        ("stickied", BooleanType()),
        ("retrieved_on", LongType()),
        ("is_original_content", BooleanType()),
        ("link_flair_text", StringType()),
    ]
])


In [110]:
# Transformer instance used for the submission (post) data.
submission = SubmissionTransformer(spark)

In [111]:
# Read the raw submissions (default format "json") using the explicit schema.
dfSub = submission.readData(pathSub, schema=rSubmissionBronzeSchema)

In [112]:
# (column count, row count) of the raw submissions; the count runs a Spark job.
submission.showShape(dfSub)

(26, 7306)

In [120]:
# Apply the submission transforms (timestamp, deletion flags, spam flag).
dfSub=submission.transform(dfSub)

In [121]:
# Shape after the transform: more columns, same number of rows.
submission.showShape(dfSub)

(31, 7306)

In [130]:
# List the submission column names after the transform.
dfSub.columns

['id',
 'title',
 'selftext',
 'url',
 'permalink',
 'domain',
 'post_hint',
 'author',
 'author_fullname',
 'created_utc',
 'subreddit',
 'subreddit_id',
 'subreddit_name_prefixed',
 'subreddit_type',
 'subreddit_subscribers',
 'score',
 'num_comments',
 'total_awards_received',
 'edited',
 'locked',
 'spoiler',
 'over_18',
 'stickied',
 'retrieved_on',
 'is_original_content',
 'link_flair_text',
 'createdDate',
 'accDeleted',
 'postDeleted',
 'matchPercent',
 'isPostSpam']

## 3. Transform Comment Silver

In [113]:
# Transformer instance used for the comment data.
comment=CommentTransformer(spark)

In [114]:
# Location of the raw comment data.
pathCmt = "../data/RC_reddit.jsonl"

# Explicit schema for the raw comment records; every field is nullable.
rCommentBronzeSchema = StructType([
    StructField(fieldName, fieldType, True)
    for fieldName, fieldType in [
        ("id", StringType()),
        ("body", StringType()),
        ("created_utc", LongType()),
        ("edited", BooleanType()),
        ("score", IntegerType()),
        ("author", StringType()),
        ("author_fullname", StringType()),
        ("author_created_utc", LongType()),
        ("parent_id", StringType()),
        ("link_id", StringType()),
        ("is_submitter", BooleanType()),
        ("permalink", StringType()),
        ("subreddit", StringType()),
        ("subreddit_id", StringType()),
        ("subreddit_name_prefixed", StringType()),
        ("subreddit_type", StringType()),
        ("total_awards_received", IntegerType()),
        ("controversiality", IntegerType()),
        ("retrieved_on", LongType()),
        ("stickied", BooleanType()),
    ]
])



In [115]:
# Read the raw comments (default format "json") using the explicit schema.
dfCmt=comment.readData(pathCmt, schema=rCommentBronzeSchema)

In [116]:
# (column count, row count) of the raw comments; the count runs a Spark job.
comment.showShape(dfCmt)

(20, 65661)

In [122]:
# Apply the comment transforms (timestamp, deletion flags, id cleanup, mod flags).
dfCmt=comment.transform(dfCmt)

In [123]:
# Shape after the transform: more columns, same number of rows.
comment.showShape(dfCmt)

(27, 65661)

In [128]:
# List the comment column names after the transform.
dfCmt.columns

['id',
 'body',
 'created_utc',
 'edited',
 'score',
 'author',
 'author_fullname',
 'author_created_utc',
 'parent_id',
 'link_id',
 'is_submitter',
 'permalink',
 'subreddit',
 'subreddit_id',
 'subreddit_name_prefixed',
 'subreddit_type',
 'total_awards_received',
 'controversiality',
 'retrieved_on',
 'stickied',
 'createdDate',
 'accDeleted',
 'cmtDeleted',
 'parent_clean',
 'link_clean',
 'deleted_by_mod',
 'deleted_by_auto']

## 4. Gold Transform DimTime DimAuthor test 

In [131]:
# A session already exists, so getOrCreate() returns it (see the warning below);
# only runtime SQL configurations from this builder take effect.
spark = SparkSession.builder.appName("Exxx").getOrCreate()
   

25/08/14 00:27:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [132]:
 
    # Toy posts table used to experiment with join/drop behaviour.
    post_schema = (
        StructType()
        .add("post_id", IntegerType(), True)
        .add("content", StringType(), True)
        .add("type", StringType(), True)
    )
    post_data = [(1, "Post 1 content", "image"), (2, "Post 2 content", "text")]
    post_df = spark.createDataFrame(post_data, schema=post_schema)

    # Toy comments table; comment 2 points at post_id 3, which has no post.
    comment_schema = (
        StructType()
        .add("comment_id", IntegerType(), True)
        .add("post_id", IntegerType(), True)
        .add("content", StringType(), True)
    )
    comment_data = [
        (1, 1, "Comment 1 on post 1"),
        (2, 3, "Comment 2 on post 1"),
        (3, 2, "Comment 1 on post 2"),
    ]
    comment_df = spark.createDataFrame(comment_data, schema=comment_schema)
    
    

In [133]:
# Display the toy posts table.
post_df.show()

+-------+--------------+-----+
|post_id|       content| type|
+-------+--------------+-----+
|      1|Post 1 content|image|
|      2|Post 2 content| text|
+-------+--------------+-----+



In [134]:
# Display the toy comments table.
comment_df.show()

+----------+-------+-------------------+
|comment_id|post_id|            content|
+----------+-------+-------------------+
|         1|      1|Comment 1 on post 1|
|         2|      3|Comment 2 on post 1|
|         3|      2|Comment 1 on post 2|
+----------+-------+-------------------+



In [142]:
# Left join posts to comments on post_id; joining with an expression keeps
# the post_id column from both sides in the result.
joinn = post_df.join(comment_df, comment_df.post_id==post_df.post_id, "left")

In [143]:
# Display the join result; the comment on the missing post 3 does not appear.
joinn.show()

+-------+--------------+-----+----------+-------+-------------------+
|post_id|       content| type|comment_id|post_id|            content|
+-------+--------------+-----+----------+-------+-------------------+
|      1|Post 1 content|image|         1|      1|Comment 1 on post 1|
|      2|Post 2 content| text|         3|      2|Comment 1 on post 2|
+-------+--------------+-----+----------+-------+-------------------+



In [144]:
# Same join, then drop by name: this removes BOTH post_id columns
# (see the output below), not just one side's copy.
joinn = post_df.join(comment_df, comment_df.post_id==post_df.post_id, "left").drop("post_id")

In [145]:
# Display the result after dropping post_id.
joinn.show()

+--------------+-----+----------+-------------------+
|       content| type|comment_id|            content|
+--------------+-----+----------+-------------------+
|Post 1 content|image|         1|Comment 1 on post 1|
|Post 2 content| text|         3|Comment 1 on post 2|
+--------------+-----+----------+-------------------+

