In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


# Bronze -> Silver

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
from pyspark.sql.types import TimestampType

In [None]:
import os
import re
from pyspark.sql import SparkSession, functions as F

class DataCleaning:
    """Bronze -> Silver helper built on PySpark.

    Extracts PostgreSQL tables over JDBC, runs basic data-quality checks
    (missing values, duplicates, malformed emails) and writes the result as
    Parquet. Creating an instance resolves the JDBC settings from the
    ``POSTGRES_*`` environment variables and builds (or reuses) a local
    SparkSession.
    """

    # Environment variables that together describe the PostgreSQL connection.
    _REQUIRED_ENV = (
        "POSTGRES_HOST",
        "POSTGRES_PORT",
        "POSTGRES_DB",
        "POSTGRES_USER",
        "POSTGRES_PASSWORD",
    )

    def __init__(self):
        # Resolve the connection settings first so a missing variable fails
        # fast, before a Spark session is started.
        self.jdbc_url, self.db_properties = self.get_db()
        self.spark = self.get_spark_session()

    def get_spark_session(self):
        """Return a local SparkSession configured with the PostgreSQL JDBC driver."""
        return (
            SparkSession.builder
            .appName("colab_test")
            .master("local[*]")
            .config("spark.driver.memory", "8g")
            .config("spark.driver.maxResultSize", "2g")
            .config("spark.sql.shuffle.partitions", "8")
            .config("spark.jars.packages", "org.postgresql:postgresql:42.7.4")
            .getOrCreate()
        )

    def get_db(self):
        """Build the JDBC URL and connection properties from the environment.

        Returns:
            tuple: ``(jdbc_url, db_properties)`` where ``db_properties`` holds
            the ``user``, ``password`` and ``driver`` entries passed to Spark.

        Raises:
            RuntimeError: if any ``POSTGRES_*`` variable is unset. Without this
                check the URL would silently become
                ``jdbc:postgresql://None:None/None``.
        """
        env = {name: os.getenv(name) for name in self._REQUIRED_ENV}
        missing = [name for name, value in env.items() if value is None]
        if missing:
            raise RuntimeError(
                "Missing required environment variable(s): " + ", ".join(missing)
            )

        db_properties = {
            "user": env["POSTGRES_USER"],
            "password": env["POSTGRES_PASSWORD"],
            "driver": "org.postgresql.Driver"
        }
        jdbc_url = f"jdbc:postgresql://{env['POSTGRES_HOST']}:{env['POSTGRES_PORT']}/{env['POSTGRES_DB']}"
        return jdbc_url, db_properties

    # ===================== EXTRACT FUNCTIONS =====================

    def table(self, table_name, column, start_day=None, end_day=None, lowerBound=1, upperBound=100000, numPartitions=8):
        """Read a table over JDBC, partitioned on ``column``.

        Args:
            table_name: Source table name. It is interpolated into SQL as-is,
                so it must come from trusted code, never from user input.
            column: Numeric column used to split the read into partitions.
            start_day, end_day: Optional bounds applied to ``created_at``.
                Either bound may be given on its own. The values are compared
                as written, so a date-only ``end_day`` compared against a
                timestamp column stops at midnight of that day.
            lowerBound, upperBound, numPartitions: Forwarded to
                ``spark.read.jdbc`` to shape the partitioned read.

        Returns:
            DataFrame over the (optionally date-filtered) table.

        The date filter is skipped, with a printed warning, when the table has
        no ``created_at`` column or when the schema lookup fails.
        """
        query = table_name
        if start_day or end_day:
            try:
                # Unfiltered JDBC read of the table, used here only to look at its column names.
                schema_df = self.spark.read.jdbc(url=self.jdbc_url, table=table_name, properties=self.db_properties)
                if 'created_at' in schema_df.columns:
                    if start_day and end_day:
                        predicate = f"created_at BETWEEN '{start_day}' AND '{end_day}'"
                    elif start_day:
                        predicate = f"created_at >= '{start_day}'"
                    else:
                        predicate = f"created_at <= '{end_day}'"
                    query = f"(SELECT * FROM {table_name} WHERE {predicate}) AS t"
                else:
                    print(f"Warning: 'created_at' column not found in table '{table_name}'. Skipping date filter.")
            except Exception as e:
                # Best effort: fall back to the unfiltered table. A genuine
                # connection problem will still surface in the read below.
                print(f"Error checking table schema for '{table_name}': {e}")
                print("Skipping date filter.")

        return self.spark.read.jdbc(
            url=self.jdbc_url,
            table=query,
            column=column,
            lowerBound=lowerBound,
            upperBound=upperBound,
            numPartitions=numPartitions,
            properties=self.db_properties
        )

    # ===================== TRANSFORM FUNCTIONS =====================

    def check_missing(self, df, fraction=0.01, seed=42):
        """Report NULL counts per column, measured on a random sample of ``df``.

        Args:
            df: DataFrame to inspect.
            fraction: Sampling fraction passed to ``DataFrame.sample``
                (default 0.01, i.e. roughly 1% of the rows).
            seed: Sampling seed, fixed by default so reruns give the same sample.

        Returns:
            list[dict]: one entry per column with ``column``, ``missing_count``
            and ``missing_ratio``. Counts refer to the sample, not the full
            table; the ratio is 0 when the sample is empty.
        """
        df_sample = df.sample(withReplacement=False, fraction=fraction, seed=seed).cache()
        try:
            total_count = df_sample.count()
            # F.count skips NULLs, so counting a value emitted only for NULL
            # cells yields the number of missing entries per column.
            agg_exprs = [
                F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_sample.columns
            ]
            row = df_sample.agg(*agg_exprs).first().asDict()
            return [
                {
                    "column": name,
                    "missing_count": miss,
                    "missing_ratio": miss / total_count if total_count > 0 else 0
                }
                for name, miss in row.items()
            ]
        finally:
            df_sample.unpersist()

    def check_duplicate(self, df, subset_cols, drop=False):
        """Count rows that are duplicated on ``subset_cols`` and optionally drop them.

        Args:
            df: DataFrame to inspect.
            subset_cols: Columns that define a duplicate.
            drop: When True and duplicates exist, return the de-duplicated frame.

        Returns:
            tuple: ``(stats, df_out)`` where ``stats`` has ``duplicate_count``
            and ``duplicate_ratio``, and ``df_out`` is the de-duplicated frame
            (``drop=True`` with duplicates present) or ``df`` unchanged. This
            method does not control which row of a duplicate group is kept.
        """
        # Cache only if the caller has not already done so, and release only
        # what was cached here, so a caller's own cache is never evicted.
        owns_cache = not getattr(df, "is_cached", False)
        if owns_cache:
            df.cache()
        try:
            deduped = df.dropDuplicates(subset=subset_cols)
            total_count = df.count()
            distinct_count = deduped.count()
            dup_count = total_count - distinct_count
            dup_ratio = dup_count / total_count if total_count > 0 else 0

            stats = {
                "duplicate_count": dup_count,
                "duplicate_ratio": dup_ratio
            }

            if drop and dup_count > 0:
                print(f"Found {dup_count} duplicate rows. Dropped duplicates. New total: {distinct_count}")
                return stats, deduped

            print(f"Duplicate rows: {dup_count} ({dup_ratio:.4%})")
            return stats, df
        finally:
            if owns_cache:
                df.unpersist()


    def check_invalid_email(self, df, email_col="email", drop=False):
        """Count emails that fail a basic ``local@domain.tld`` regex and optionally drop them.

        NULL emails are counted as invalid, so ``valid + invalid`` always
        equals the row count and the reported numbers match what ``drop=True``
        removes.

        Args:
            df: DataFrame to inspect.
            email_col: Name of the email column.
            drop: When True and invalid emails exist, return only valid rows.

        Returns:
            tuple: ``(stats, df_out)`` where ``stats`` has ``valid_email_count``,
            ``invalid_email_count`` and ``invalid_email_ratio``.
        """
        # Same cache ownership rule as check_duplicate.
        owns_cache = not getattr(df, "is_cached", False)
        if owns_cache:
            df.cache()
        try:
            regex_pattern = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"

            # rlike yields NULL for a NULL email; coalesce so such rows are
            # treated as invalid instead of falling out of both counters.
            is_valid = F.coalesce(F.col(email_col).rlike(regex_pattern), F.lit(False))

            counts = df.agg(
                F.sum(F.when(is_valid, 1).otherwise(0)).alias("valid"),
                F.sum(F.when(~is_valid, 1).otherwise(0)).alias("invalid"),
                F.count("*").alias("total")
            ).collect()[0]

            # SUM over zero rows is NULL; normalise to 0 so an empty frame
            # does not break the comparisons below.
            valid = counts["valid"] or 0
            invalid = counts["invalid"] or 0
            total = counts["total"]
            invalid_ratio = invalid / total if total > 0 else 0

            stats = {
                "valid_email_count": valid,
                "invalid_email_count": invalid,
                "invalid_email_ratio": invalid_ratio
            }

            if drop and invalid > 0:
                cleaned_df = df.filter(is_valid)
                print(f"Found {invalid} invalid emails ({invalid_ratio:.4%}). Dropped invalid emails. New total: {valid}")
                return stats, cleaned_df

            print(f"Valid emails: {valid}, Invalid emails: {invalid} ({invalid_ratio:.4%})")
            return stats, df
        finally:
            if owns_cache:
                df.unpersist()

    # ===================== LOAD FUNCTIONS =====================

    def save_to_parquet(self, df, path: str, mode: str = "overwrite"):
        """Write ``df`` to ``path`` as Parquet using the given save ``mode``."""
        df.write.mode(mode).parquet(path)
        print(f"Data saved to Parquet at: {path}")

In [None]:
# Instantiating the helper creates the Spark session and resolves the JDBC settings from POSTGRES_* env vars.
dc = DataCleaning()

# USERS


In [None]:
# Partitioned JDBC read of `users`, split on user_id.
# The date window only takes effect when the table has a `created_at` column.
df_user = dc.table(
    table_name="users",
    column="user_id",
    lowerBound=1,
    upperBound=100000,
    numPartitions=8,
    start_day="2023-01-01",
    end_day="2025-01-31",
)



In [None]:
# NULL counts per column, measured on a random sample of the users frame.
missing_report = dc.check_missing(df_user)

In [None]:
# Display the per-column missing-value report for users.
missing_report

[{'column': 'user_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'username', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'email', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'password_hash', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'full_name', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'registration_date', 'missing_count': 0, 'missing_ratio': 0.0}]

In [None]:
# Rows sharing a user_id count as duplicates; drop=True returns the de-duplicated frame when any are found.
stats, df_user = dc.check_duplicate(df_user, subset_cols=["user_id"], drop=True)
stats

Duplicate rows: 0 (0.0000%)


{'duplicate_count': 0, 'duplicate_ratio': 0.0}

In [None]:
# Validate `email` against the regex in check_invalid_email; drop=True keeps only the matching rows.
stats, df_user = dc.check_invalid_email(df_user, email_col="email", drop=True)
stats

Found 17513 invalid emails (17.5130%). Dropped invalid emails. New total: 82487


{'valid_email_count': 82487,
 'invalid_email_count': 17513,
 'invalid_email_ratio': 0.17513}

In [None]:
# Persist the cleaned users as Parquet (save mode defaults to "overwrite").
dc.save_to_parquet(df_user, "users.parquet")

Data saved to Parquet at: users.parquet


# COMMENTS

In [None]:
# Partitioned JDBC read of `comments`, split on comment_id and limited to the date window.
df_comment = dc.table(
    table_name="comments",
    column="comment_id",
    lowerBound=1,
    upperBound=1048700,
    numPartitions=8,
    start_day="2023-01-01",
    end_day="2025-01-31",
)

In [None]:
# NULL counts per column, measured on a random sample of the comments frame.
missing_report = dc.check_missing(df_comment)

In [None]:
# Display the per-column missing-value report for comments.
missing_report

[{'column': 'comment_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'post_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'user_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'content', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'created_at', 'missing_count': 0, 'missing_ratio': 0.0}]

In [None]:
# De-duplicate on the natural key rather than on `content` alone: different users
# (or the same user on different posts) may legitimately write identical text, and
# keying on content only would delete those distinct comments.
# NOTE(review): confirm (post_id, user_id, content) is the intended duplicate key.
stats, df_comment = dc.check_duplicate(
    df_comment,
    subset_cols=["post_id", "user_id", "content"],
    drop=True,
)
stats

Found 137989 duplicate rows. Dropped duplicates. New total: 701275


{'duplicate_count': 137989, 'duplicate_ratio': 0.16441667937621535}

In [None]:
# Persist the cleaned comments as Parquet (save mode defaults to "overwrite").
dc.save_to_parquet(df_comment, "comments.parquet")

Data saved to Parquet at: comments.parquet


# POSTS

In [None]:
# Partitioned JDBC read of `posts`, split on post_id and limited to the date window.
df_post = dc.table(
    table_name="posts",
    column="post_id",
    lowerBound=1,
    upperBound=1048750,
    numPartitions=8,
    start_day="2023-01-01",
    end_day="2025-01-31",
)

In [None]:
# NULL counts per column, measured on a random sample of the posts frame; displayed as the cell output.
missing_report = dc.check_missing(df_post)
missing_report

[{'column': 'post_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'user_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'content', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'created_at', 'missing_count': 0, 'missing_ratio': 0.0}]

In [None]:
# De-duplicate per author rather than on `content` alone: two different users posting
# the same text are distinct posts, and dropping them also orphans their likes/comments.
# NOTE(review): confirm (user_id, content) is the intended duplicate key.
stats, df_post = dc.check_duplicate(
    df_post,
    subset_cols=["user_id", "content"],
    drop=True,
)
stats

Found 329663 duplicate rows. Dropped duplicates. New total: 1629714


{'duplicate_count': 329663, 'duplicate_ratio': 0.16824888727386306}

In [None]:
# Persist the cleaned posts as Parquet (save mode defaults to "overwrite").
dc.save_to_parquet(df_post, "posts.parquet")

Data saved to Parquet at: posts.parquet


# LIKES

In [None]:
# Partitioned JDBC read of `likes`, split on like_id and limited to the date window.
df_like = dc.table(
    table_name="likes",
    column="like_id",
    lowerBound=1,
    upperBound=1048750,
    numPartitions=8,
    start_day="2023-01-01",
    end_day="2025-01-31",
)

In [None]:
# NULL counts per column, measured on a random sample of the likes frame; displayed as the cell output.
missing_report = dc.check_missing(df_like)
missing_report

[{'column': 'like_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'post_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'user_id', 'missing_count': 0, 'missing_ratio': 0.0},
 {'column': 'created_at', 'missing_count': 0, 'missing_ratio': 0.0}]

In [None]:
# A duplicate like is the same user liking the same post more than once.
# Keying on like_id would make the check vacuous if like_id is unique per row
# (presumably it is the primary key — TODO confirm).
stats, df_like = dc.check_duplicate(df_like, subset_cols=['user_id', 'post_id'], drop=True)
stats

Duplicate rows: 0 (0.0000%)


{'duplicate_count': 0, 'duplicate_ratio': 0.0}

In [None]:
# Persist the cleaned likes as Parquet (save mode defaults to "overwrite").
dc.save_to_parquet(df_like, "likes.parquet")

Data saved to Parquet at: likes.parquet


# Silver -> Gold

In [None]:
# -*- coding: utf-8 -*-
"""Add fake sentiment and keywords columns to silver parquet files"""

import random

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Fixed seed so the placeholder sentiment labels are reproducible across runs.
SEED = 42

# ============ INIT SPARK ============
spark = (
    SparkSession.builder
    .appName("add_fake_sentiment_keywords")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "16")
    .getOrCreate()
)

# ============ LOAD SILVER PARQUETS ============
posts = spark.read.parquet("posts.parquet")
comments = spark.read.parquet("comments.parquet")

# ============ ADD FAKE SENTIMENT ============
# random sentiment (replace later with actual model inference)
sentiments = ["positive", "neutral", "negative"]


def add_fake_sentiment(df, seed=SEED):
    """Attach a random ``sentiment`` label, split roughly into thirds.

    A single uniform draw per row is materialised in a temporary column and
    compared against both thresholds. Calling ``F.rand()`` once per branch
    would use two independent draws and skew the split to about 33/44/23.
    """
    draw = "_sentiment_draw"
    return (
        df.withColumn(draw, F.rand(seed))
        .withColumn(
            "sentiment",
            F.when(F.col(draw) < 0.33, F.lit(sentiments[0]))
             .when(F.col(draw) < 0.66, F.lit(sentiments[1]))
             .otherwise(F.lit(sentiments[2])),
        )
        .drop(draw)
    )


posts = add_fake_sentiment(posts, seed=SEED)
# Different seed so post and comment labels are not drawn from the same sequence.
comments = add_fake_sentiment(comments, seed=SEED + 1)

# ============ ADD FAKE KEYWORDS ============
keywords_list = [
    ["ai", "machine learning", "innovation"],
    ["sports", "football", "health"],
    ["travel", "food", "culture"],
    ["technology", "mobile", "data"]
]


# pick random keyword group for each record
def random_keywords():
    """Return one keyword group chosen at random from ``keywords_list``."""
    return random.choice(keywords_list)


# Register UDF. It is marked nondeterministic so Spark does not assume that
# repeated evaluations of it return the same value.
rand_keywords_udf = udf(random_keywords, ArrayType(StringType())).asNondeterministic()

posts = posts.withColumn("keywords", rand_keywords_udf())
comments = comments.withColumn("keywords", rand_keywords_udf())

# ============ SAVE BACK TO UPDATED PARQUETS ============
posts.write.mode("overwrite").parquet("silver_posts.parquet")
comments.write.mode("overwrite").parquet("silver_comments.parquet")

print("✅ Added fake sentiment and keywords columns to silver datasets.")

✅ Added fake sentiment and keywords columns to silver datasets.


In [None]:
# -*- coding: utf-8 -*-
"""Final Silver → Gold Transformation Script"""

from datetime import date

from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.functions import broadcast

# ===================== INIT SPARK =====================
# getOrCreate() hands back an already-running session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("silver_to_gold_final")
    .config("spark.sql.shuffle.partitions", "16")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# ===================== LOAD SILVER PARQUETS =====================
# Posts and comments must carry a `sentiment` column; default to "neutral" when absent.
posts, comments = (
    frame if "sentiment" in frame.columns else frame.withColumn("sentiment", F.lit("neutral"))
    for frame in (
        spark.read.parquet("silver_posts.parquet"),
        spark.read.parquet("silver_comments.parquet"),
    )
)
likes = spark.read.parquet("likes.parquet")
users = spark.read.parquet("users.parquet")

# These three frames feed every gold table below, so keep them cached.
posts.cache()
comments.cache()
likes.cache()

DataFrame[like_id: int, post_id: int, user_id: int, created_at: timestamp]

In [None]:
# ============================================================
# 1️⃣ GOLD: DAILY PLATFORM SUMMARY
# ============================================================
def _sentiment_total(label, alias):
    """Number of rows whose ``sentiment`` equals ``label``, exposed as ``alias``."""
    return F.sum(F.when(F.col("sentiment") == label, 1).otherwise(0)).alias(alias)


# Calendar day of each event; shared grouping key for all three sources.
_by_day = F.to_date("created_at").alias("report_date")

posts_daily = posts.groupBy(_by_day).agg(
    F.countDistinct("post_id").alias("total_posts"),
    F.countDistinct("user_id").alias("active_posters"),
    _sentiment_total("positive", "positive_posts_count"),
    _sentiment_total("negative", "negative_posts_count"),
)

comments_daily = comments.groupBy(_by_day).agg(
    F.countDistinct("comment_id").alias("total_comments"),
    F.countDistinct("user_id").alias("active_commenters"),
    _sentiment_total("positive", "positive_comments_count"),
    _sentiment_total("negative", "negative_comments_count"),
)

likes_daily = likes.groupBy(_by_day).agg(
    F.countDistinct("like_id").alias("total_likes"),
    F.countDistinct("user_id").alias("active_likers"),
)

# Outer joins keep days on which only some activity types occurred; the
# resulting gaps in the numeric columns are filled with 0.
_daily_counts = (
    posts_daily
    .join(comments_daily, on="report_date", how="outer")
    .join(likes_daily, on="report_date", how="outer")
    .fillna(0)
)

# Per-post averages are defined as 0 on days without posts.
_has_posts = F.col("total_posts") > 0
daily_summary = _daily_counts.withColumn(
    "avg_comments_per_post",
    F.when(_has_posts, F.col("total_comments") / F.col("total_posts")).otherwise(0),
).withColumn(
    "avg_likes_per_post",
    F.when(_has_posts, F.col("total_likes") / F.col("total_posts")).otherwise(0),
)

# total_active_users: distinct union of user_ids per day
_daily_users = [
    source.select(_by_day, F.col("user_id")) for source in (posts, comments, likes)
]
user_union = _daily_users[0].union(_daily_users[1]).union(_daily_users[2]).distinct()
active_user_counts = user_union.groupBy("report_date").agg(
    F.countDistinct("user_id").alias("total_active_users")
)

daily_summary = daily_summary.join(active_user_counts, on="report_date", how="left").fillna(0)

(
    daily_summary.repartition("report_date")
    .write.mode("overwrite")
    .partitionBy("report_date")
    .option("compression", "snappy")
    .parquet("gold_daily_platform_summary.parquet")
)

print("✅ gold_daily_platform_summary created")



✅ gold_daily_platform_summary created


In [None]:
# ============================================================
# 2️⃣ GOLD: USER SNAPSHOT
# ============================================================
# Activity each user produced (posts written, comments and likes given).
user_activity = (
    users
    .join(
        posts.groupBy("user_id").agg(
            F.count("*").alias("total_posts_per_user"),
            F.mean(F.when(F.col("sentiment") == "positive", 1).otherwise(0)).alias("positive_post_ratio"),
            F.sum(F.when(F.col("created_at") >= F.date_sub(F.current_date(), 30), 1).otherwise(0)).alias("posts_in_last_30_days")
        ), "user_id", "left"
    )
    .join(
        comments.groupBy("user_id").agg(
            F.count("*").alias("total_comments_given"),
            F.sum(F.when(F.col("created_at") >= F.date_sub(F.current_date(), 30), 1).otherwise(0)).alias("comments_in_last_30_days")
        ), "user_id", "left"
    )
    .join(
        likes.groupBy("user_id").agg(
            F.count("*").alias("total_likes_given")
        ), "user_id", "left"
    )
)

# Received feedback
comments_received = comments.groupBy("post_id").agg(
    F.count("*").alias("total_comments_received"),
    F.mean(F.when(F.col("sentiment") == "positive", 1).otherwise(0)).alias("positive_comment_received_ratio")
)
likes_received = likes.groupBy("post_id").agg(F.count("*").alias("total_likes_received"))

posts_feedback = (
    posts.join(broadcast(comments_received), "post_id", "left")
         .join(broadcast(likes_received), "post_id", "left")
         # Posts without any likes/comments come out of the left joins as NULL.
         # avg() skips NULLs, so without this fill the per-post averages would
         # only cover posts that received feedback and be overstated.
         .fillna(0, subset=["total_comments_received", "total_likes_received"])
         .groupBy("user_id")
         .agg(
             F.sum("total_comments_received").alias("total_comments_received"),
             F.sum("total_likes_received").alias("total_likes_received"),
             F.avg("total_likes_received").alias("avg_likes_per_post"),
             F.avg("total_comments_received").alias("avg_comments_per_post"),
             # Left as NULL-skipping on purpose: a post with no comments has no ratio.
             F.mean("positive_comment_received_ratio").alias("positive_comment_received_ratio")
         )
)

# 🟩 Compute user's most recent activity date
last_activity = (
    posts.select("user_id", "created_at")
    .union(comments.select("user_id", "created_at"))
    .union(likes.select("user_id", "created_at"))
    .groupBy("user_id")
    .agg(F.max("created_at").alias("last_active_date"))
)

# ✅ Build final user_snapshot (only once)
user_snapshot = (
    user_activity.join(posts_feedback, "user_id", "left")
    .join(last_activity, "user_id", "left")
    .withColumn("account_age_days", F.datediff(F.current_date(), F.col("registration_date")))
    # fillna(0) only touches numeric columns; last_active_date stays NULL
    # for users with no recorded activity.
    .fillna(0)
    # Same clock as the other date logic in this cell (F.current_date()),
    # instead of the driver's local date.today().
    .withColumn("snapshot_date", F.current_date())
)

# user_segment classification
_last_active = F.col("last_active_date")
user_snapshot = user_snapshot.withColumn(
    "user_segment",
    F.when(F.col("account_age_days") <= 30, "New User")
     # A NULL last_active_date (no activity at all) makes both date comparisons
     # NULL, which previously fell through to "Casual User"; treat it as churned.
     .when(_last_active.isNull() | (_last_active < F.date_sub(F.current_date(), 90)), "Churned")
     .when(_last_active < F.date_sub(F.current_date(), 30), "At Risk")
     .otherwise("Casual User")
)

user_snapshot.repartition("snapshot_date").write.mode("overwrite") \
    .partitionBy("snapshot_date") \
    .option("compression", "snappy") \
    .parquet("gold_user_snapshot.parquet")

print("✅ gold_user_snapshot created")



✅ gold_user_snapshot created


In [None]:
# ============================================================
# 3️⃣ GOLD: POST PERFORMANCE
# ============================================================
# Earliest like / comment timestamp per post.
first_like = likes.groupBy("post_id").agg(F.min("created_at").alias("first_like_time"))
first_comment = comments.groupBy("post_id").agg(F.min("created_at").alias("first_comment_time"))

post_perf = (
    posts
    .join(broadcast(users.select("user_id", "username")), "user_id", "left")
    .join(broadcast(first_like), "post_id", "left")
    .join(broadcast(first_comment), "post_id", "left")
    .join(broadcast(likes.groupBy("post_id").agg(F.count("*").alias("total_likes"))), "post_id", "left")
    .join(broadcast(comments.groupBy("post_id").agg(F.count("*").alias("total_comments"))), "post_id", "left")
    # Posts with no likes/comments have no row on the right side of the left
    # joins; report them as 0 rather than NULL, as the other gold tables do.
    .fillna(0, subset=["total_likes", "total_comments"])
    # The time-to-first-* columns stay NULL when a post never got a like/comment.
    .withColumn("time_to_first_like_minutes",
                F.round((F.unix_timestamp("first_like_time") - F.unix_timestamp("created_at")) / 60))
    .withColumn("time_to_first_comment_minutes",
                F.round((F.unix_timestamp("first_comment_time") - F.unix_timestamp("created_at")) / 60))
    .select(
        "post_id",
        F.to_date("created_at").alias("created_date"),
        F.col("content").alias("post_content"),
        F.col("sentiment").alias("post_sentiment"),
        "user_id", "username",
        "total_likes", "total_comments",
        "time_to_first_like_minutes", "time_to_first_comment_minutes"
    )
)

post_perf.repartition("created_date").write.mode("overwrite") \
    .partitionBy("created_date") \
    .option("compression", "snappy") \
    .parquet("gold_post_performance.parquet")

print("✅ gold_post_performance created")


✅ gold_post_performance created


In [None]:
# ============================================================
# 4️⃣ GOLD: CONTENT TRENDS (Optional)
# ============================================================
# Both frames are exploded on `keywords`, so both must carry the column.
if "keywords" in posts.columns and "keywords" in comments.columns:
    exploded_posts = posts.withColumn("topic_or_keyword", F.explode(F.col("keywords")))
    exploded_comments = comments.withColumn("topic_or_keyword", F.explode(F.col("keywords")))

    content_trends = (
        exploded_posts.groupBy(F.to_date("created_at").alias("report_date"), "topic_or_keyword")
        .agg(F.count("*").alias("mention_count_in_posts"))
        .join(
            exploded_comments.groupBy(F.to_date("created_at").alias("report_date"), "topic_or_keyword")
            .agg(F.count("*").alias("mention_count_in_comments")),
            ["report_date", "topic_or_keyword"],
            "outer"
        )
        .fillna(0)
        .withColumn("total_mentions", F.col("mention_count_in_posts") + F.col("mention_count_in_comments"))
        # Placeholder column: always NULL until a real sentiment score is available.
        .withColumn("avg_sentiment_when_mentioned", F.lit(None).cast("float"))
        # Keywords are assigned in groups, so equal mention counts are expected;
        # the keyword itself is the tie-breaker to keep ranks stable across runs.
        .withColumn("trending_rank",
                    F.row_number().over(
                        Window.partitionBy("report_date")
                        .orderBy(F.desc("total_mentions"), F.asc("topic_or_keyword"))
                    ))
    )
    content_trends.repartition("report_date").write.mode("overwrite") \
        .partitionBy("report_date") \
        .option("compression", "snappy") \
        .parquet("gold_daily_content_trends.parquet")

print("✅ Silver → Gold transformation complete.")


✅ Silver → Gold transformation complete.


In [None]:
# Load and verify outputs: read each gold dataset back and preview a few rows.
gold_daily, gold_user, gold_post, gold_trend = (
    spark.read.parquet(path)
    for path in (
        "gold_daily_platform_summary.parquet",
        "gold_user_snapshot.parquet",
        "gold_post_performance.parquet",
        "gold_daily_content_trends.parquet",
    )
)

# Columns previewed for the wider tables.
_user_cols = ["user_id", "username", "total_posts_per_user", "account_age_days", "user_segment"]
_post_cols = ["post_id", "user_id", "post_sentiment", "total_likes", "total_comments"]
_trend_cols = ["report_date", "topic_or_keyword", "total_mentions", "trending_rank"]

print("=== GOLD DAILY PLATFORM SUMMARY ===")
gold_daily.show(n=5, truncate=False)

print("\n=== GOLD USER SNAPSHOT ===")
gold_user.select(*_user_cols).show(n=5, truncate=False)

print("\n=== GOLD POST PERFORMANCE ===")
gold_post.select(*_post_cols).show(n=5, truncate=False)

print("\n=== GOLD DAILY CONTENT TRENDS ===")
gold_trend.select(*_trend_cols).show(n=10, truncate=False)

=== GOLD DAILY PLATFORM SUMMARY ===
+-----------+--------------+--------------------+--------------------+--------------+-----------------+-----------------------+-----------------------+-----------+-------------+---------------------+-------------------+------------------+-----------+
|total_posts|active_posters|positive_posts_count|negative_posts_count|total_comments|active_commenters|positive_comments_count|negative_comments_count|total_likes|active_likers|avg_comments_per_post|avg_likes_per_post |total_active_users|report_date|
+-----------+--------------+--------------------+--------------------+--------------+-----------------+-----------------------+-----------------------+-----------+-------------+---------------------+-------------------+------------------+-----------+
|2130       |2114          |682                 |483                 |933           |930              |291                    |232                    |862        |859          |0.43802816901408453  |0.4046948356