In [0]:
# =========================
# GOLD LAYER — FULL BUILD
# =========================

from pyspark.sql.functions import (
    col, sum as _sum, avg, desc, to_date, row_number, expr, lit
)
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, IntegerType

# ---------- CONFIG (edit if needed) ----------
# Table names are three-part identifiers: <catalog>.<schema>.<table>.
CATALOG = "yt_trending_catalog"
SILVER_SCHEMA = "slv_sch_yt"
GOLD_SCHEMA = "gld_sch_yt"

# Shared "<catalog>.<schema>" prefixes so each table name below states only its own suffix.
_silver_ns = f"{CATALOG}.{SILVER_SCHEMA}"
_gold_ns = f"{CATALOG}.{GOLD_SCHEMA}"

# Input table (read-only in this notebook).
SILVER_TABLE = f"{_silver_ns}.youtube_trending_table_silver"

# Output tables, one per gold build step.
GOLD_TRENDING_VIDEOS = f"{_gold_ns}.gold_trending_videos"
GOLD_CHANNEL_STATS = f"{_gold_ns}.gold_channel_stats"
GOLD_CATEGORY_STATS = f"{_gold_ns}.gold_category_stats"
GOLD_TRENDS_DAILY = f"{_gold_ns}.gold_trends_daily"
GOLD_TOP10_PER_CATEGORY = f"{_gold_ns}.gold_top10_videos_per_category"
GOLD_ENGAGEMENT_LEADERS = f"{_gold_ns}.gold_engagement_leaders"
DIM_CATEGORY = f"{_gold_ns}.dim_category"

# Ensure target schema exists before any saveAsTable call targets it.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{GOLD_SCHEMA}")

# Handle on the silver input; every gold table below derives from it.
df_silver_raw = spark.table(SILVER_TABLE)


In [0]:
# Project the silver table down to the columns the gold layer uses.
# Selecting by name is equivalent to wrapping each name in col(...).
silver_columns_for_gold = [
    "video_id",
    "title",
    "description",
    "channel_id",
    "channel_title",
    "category_id",
    "published_at",
    "views",
    "likes",
    "comments",
    "favorite_count",
]
df_gold = df_silver_raw.select(*silver_columns_for_gold)

In [0]:

# ---------- 1) CATEGORY LOOKUP (ID -> Name) ----------
# Static id -> name mapping, persisted as the dim_category table.
# Ids are kept as strings; the join cell casts the fact-side category_id to string to match.
# Any id absent from this list is labelled 'Unknown' by that join, so the list should
# cover every id that can appear in the data.
# Ids 30-44 follow the YouTube Data API videoCategories listing (US region);
# NOTE(review): confirm against the region(s) actually ingested.
category_pairs = [
    ("1",  "Film & Animation"),
    ("2",  "Autos & Vehicles"),
    ("10", "Music"),
    ("15", "Pets & Animals"),
    ("17", "Sports"),
    ("18", "Short Movies"),
    ("19", "Travel & Events"),
    ("20", "Gaming"),
    ("21", "Videoblogging"),
    ("22", "People & Blogs"),
    ("23", "Comedy"),
    ("24", "Entertainment"),
    ("25", "News & Politics"),
    ("26", "Howto & Style"),
    ("27", "Education"),
    ("28", "Science & Technology"),
    ("29", "Nonprofits & Activism"),
    ("30", "Movies"),
    ("31", "Anime/Animation"),
    ("32", "Action/Adventure"),
    ("33", "Classics"),
    ("34", "Comedy"),
    ("35", "Documentary"),
    ("36", "Drama"),
    ("37", "Family"),
    ("38", "Foreign"),
    ("39", "Horror"),
    ("40", "Sci-Fi/Fantasy"),
    ("41", "Thriller"),
    ("42", "Shorts"),
    ("43", "Shows"),
    ("44", "Trailers"),
]
dim_category_df = spark.createDataFrame(category_pairs, ["category_id", "category_name"])

# Full refresh: the dimension is rebuilt from the list above on every run.
dim_category_df.write.mode("overwrite").saveAsTable(DIM_CATEGORY)



In [0]:

# Join category name into the gold working set.
# - category_id is cast to string so it matches the string keys stored in dim_category.
# - Left join keeps every video; ids missing from the lookup get 'Unknown' instead of null.
# - This cell rebinds df_gold from itself, so on a re-run it sees its own previous output.
#   Dropping any existing category_name first keeps the cell idempotent: without it the
#   second join would produce two category_name columns and the coalesce below would
#   fail as an ambiguous reference. drop() is a no-op when the column is absent.
df_gold = (
    df_gold
    .drop("category_name")
    .withColumn("category_id", col("category_id").cast(StringType()))
    .join(spark.table(DIM_CATEGORY), on="category_id", how="left")
    .withColumn("category_name", expr("coalesce(category_name, 'Unknown')"))
)


In [0]:

# ---------- 2) GOLD: TRENDING VIDEOS (snapshot ranked by views) ----------
# NOTE: from here on `round` is the Spark column function, not Python's builtin.
# Later cells rely on `when` and `round` from this import.
from pyspark.sql.functions import col, when, round

# Per-video columns kept in the gold table (description is intentionally left out).
video_columns = [
    "video_id",
    "title",
    "channel_id",
    "channel_title",
    "category_id",
    "category_name",
    "published_at",
    "views",
    "likes",
    "comments",
    "favorite_count",
]

# Ratios are guarded against divide by zero: rows where views > 0 does not hold get 0.
views_col = col("views")
like_ratio_col = round(when(views_col > 0, col("likes") / views_col).otherwise(0), 4)
comment_ratio_col = round(when(views_col > 0, col("comments") / views_col).otherwise(0), 4)

df_gold_videos = (
    df_gold
    .select(*video_columns)
    .withColumn("like_ratio", like_ratio_col)
    .withColumn("comment_ratio", comment_ratio_col)
    .orderBy(desc("views"))
)

# Save Gold table (full refresh).
# NOTE(review): readers should not rely on stored row order; ORDER BY views at query time.
df_gold_videos.write.mode("overwrite").saveAsTable(GOLD_TRENDING_VIDEOS)


In [0]:

# ---------- 3) GOLD: CHANNEL-LEVEL STATS ----------
# Sum views, likes and comments per channel.
# This cell only builds the aggregate. GOLD_CHANNEL_STATS is written once, by the next
# cell, after the ratio columns are added. Writing it here as well overwrote the table
# twice per run and briefly published a version without like_ratio / comment_ratio.
df_gold_channels = (
    df_gold
    .groupBy("channel_id", "channel_title")
    .agg(
        _sum("views").alias("total_views"),
        _sum("likes").alias("total_likes"),
        _sum("comments").alias("total_comments"),
    )
)

In [0]:
# Add engagement ratios to the channel totals and persist the result.
# Channels where total_views > 0 does not hold get a ratio of 0 instead of a division error.
channel_views_col = col("total_views")
channel_like_ratio = round(
    when(channel_views_col > 0, col("total_likes") / channel_views_col).otherwise(0), 4
)
channel_comment_ratio = round(
    when(channel_views_col > 0, col("total_comments") / channel_views_col).otherwise(0), 4
)

df_gold_channels = (
    df_gold_channels
    .withColumn("like_ratio", channel_like_ratio)
    .withColumn("comment_ratio", channel_comment_ratio)
    .orderBy(channel_views_col.desc())
)

# mergeSchema lets the overwrite add the ratio columns to an existing table definition.
channel_writer = df_gold_channels.write.mode("overwrite").option("mergeSchema", "true")
channel_writer.saveAsTable(GOLD_CHANNEL_STATS)

In [0]:
# ---------- 4) GOLD: CATEGORY-LEVEL STATS ----------
# Sum views, likes and comments per category.
# This cell only builds the aggregate. The next cell adds the ratio columns, sorts by
# total_views and writes GOLD_CATEGORY_STATS. The sort and write that used to live here
# were repeated there, so the table was overwritten twice per run and briefly
# published without like_ratio / comment_ratio.
df_gold_category = (
    df_gold
    .groupBy("category_id", "category_name")
    .agg(
        _sum("views").alias("total_views"),
        _sum("likes").alias("total_likes"),
        _sum("comments").alias("total_comments"),
    )
)

In [0]:
# Add engagement ratios to the category totals and persist the result.
# Each ratio divides a total by total_views, falls back to 0 when total_views > 0
# does not hold, and is rounded to 4 decimals.
category_ratio_sources = [
    ("like_ratio", "total_likes"),
    ("comment_ratio", "total_comments"),
]
for ratio_name, numerator_name in category_ratio_sources:
    df_gold_category = df_gold_category.withColumn(
        ratio_name,
        round(
            when(col("total_views") > 0, col(numerator_name) / col("total_views")).otherwise(0),
            4,
        ),
    )
df_gold_category = df_gold_category.orderBy(col("total_views").desc())

# mergeSchema lets the overwrite add the ratio columns to an existing table definition.
category_writer = df_gold_category.write.mode("overwrite").option("mergeSchema", "true")
category_writer.saveAsTable(GOLD_CATEGORY_STATS)

In [0]:
# ---------- 5) GOLD: DAILY TRENDS (time series per video) ----------
# Sums views/likes/comments per video and calendar date, with the metrics cast to long
# before summing.
# NOTE(review): `date` is derived from published_at. If that is the upload timestamp
# rather than a snapshot/trending date, each video maps to a single date and this is not
# a true daily series. Confirm whether silver exposes a trending-date column to use instead.
trend_keys = [
    "video_id",
    "title",
    "channel_id",
    "channel_title",
    "category_id",
    "category_name",
    "date",
]
trend_metrics = [
    _sum(col("views").cast("long")).alias("daily_views"),
    _sum(col("likes").cast("long")).alias("daily_likes"),
    _sum(col("comments").cast("long")).alias("daily_comments"),
]

df_with_date = df_gold.withColumn("date", to_date(col("published_at")))
df_trends = df_with_date.groupBy(*trend_keys).agg(*trend_metrics)

df_trends.write.mode("overwrite").saveAsTable(GOLD_TRENDS_DAILY)

In [0]:
# ---------- 6) GOLD: TOP 10 VIDEOS PER CATEGORY ----------
# Rank videos inside each category by views (highest first) and keep ranks 1..10.
# video_id is a secondary sort key: row_number() ordered by views alone assigns ranks
# arbitrarily among equal view counts, so the rows that make the cut-off (and their
# rank) could change between runs.
# NOTE(review): if the input holds several rows for the same video, that video can take
# more than one slot here; deduplicate upstream if that is not intended.
w = (
    Window
    .partitionBy("category_id", "category_name")
    .orderBy(desc("views"), col("video_id"))
)
df_top10_per_cat = (
    df_gold
    .select(
        "category_id", "category_name", "video_id", "title", "channel_title",
        "views", "likes", "comments", "published_at"
    )
    .withColumn("rank_in_category", row_number().over(w))
    .filter(col("rank_in_category") <= 10)
)
df_top10_per_cat.write.mode("overwrite").saveAsTable(GOLD_TOP10_PER_CATEGORY)

In [0]:


# ---------- 7) GOLD: ENGAGEMENT LEADERS (quality threshold) ----------
# Channels with a meaningful audience (summed views >= 50k), ranked by like rate
# (likes / views), with total views as the secondary sort key.

# Threshold: tweak for your data volume.
min_total_views = lit(50000)

# Per-channel totals and rates. Rates are total likes (or comments) over total views,
# so they are weighted by views rather than an average of per-video ratios.
# nullif(..., 0) makes a zero view total yield a null rate.
channel_engagement = (
    df_gold
    .groupBy("channel_id", "channel_title")
    .agg(
        _sum("views").alias("total_views"),
        _sum("likes").alias("total_likes"),
        _sum("comments").alias("total_comments"),
        expr("try_divide(sum(likes), nullif(sum(views),0))").alias("like_rate"),
        expr("try_divide(sum(comments), nullif(sum(views),0))").alias("comment_rate"),
    )
)

df_engagement_leaders = (
    channel_engagement
    .filter(col("total_views") >= min_total_views)
    .orderBy(col("like_rate").desc(), col("total_views").desc())
)
df_engagement_leaders.write.mode("overwrite").saveAsTable(GOLD_ENGAGEMENT_LEADERS)
