# Phase 3 – Business Insights Gold Layer

This notebook builds business-ready Gold insight tables for dashboarding on top of the Phase 2 Silver and Gold tables.

In [0]:
from pyspark.sql import functions as F

## Load Phase 2 Tables

In [0]:

# Phase 2 Silver inputs
silver_playlists, silver_playlist_tracks = (
    spark.table("silver_playlists"),
    spark.table("silver_playlist_tracks"),
)

# Phase 2 Gold aggregates reused by the summaries below
gold_track_popularity, gold_artist_popularity, gold_engagement_bins = (
    spark.table("gold_track_popularity"),
    spark.table("gold_artist_popularity"),
    spark.table("gold_engagement_bins"),
)


## Executive KPIs

In [0]:

# Executive KPI snapshot: a single-row table of headline counts.
# Inner join: only track rows whose playlist_id exists in silver_playlists are counted.
playlist_track_rows = silver_playlist_tracks.join(silver_playlists, "playlist_id")

# Distinct counts are unaffected by any row duplication the join introduces.
exec_counts = playlist_track_rows.agg(
    F.countDistinct("track_uri").alias("total_tracks"),
    F.countDistinct("playlist_id").alias("total_playlists"),
    F.countDistinct("artist_uri").alias("total_artists"),
    F.max("playlist_modified_at").alias("last_data_date"),
)

# Average followers is taken over playlists, not over track rows. Averaging the
# joined rows directly would weight every playlist by how many rows it contributes.
# Collapse to one value per playlist first (max = highest follower count observed).
exec_followers = (
    playlist_track_rows
    .groupBy("playlist_id")
    .agg(F.max("playlist_followers").alias("playlist_followers"))
    .agg(F.avg("playlist_followers").alias("avg_playlist_followers"))
)

# Both sides are single-row frames; keep the original column order.
gold_exec_kpis = exec_counts.crossJoin(exec_followers).select(
    "total_tracks",
    "total_playlists",
    "total_artists",
    "avg_playlist_followers",
    "last_data_date",
)

gold_exec_kpis.write.mode("overwrite").saveAsTable("gold_exec_kpis")


## KPI Trend Over Time

In [0]:

# Daily activity KPIs: one row per calendar day.
# playlist_modified_at is normalised to a DATE first; grouping on the raw value would
# create one bucket per exact modification instant instead of one per day.
# Accepted inputs: epoch milliseconds (13 digits), epoch seconds (10 digits), or any
# value to_timestamp can parse. Rows whose date cannot be derived are excluded.
# tracks_added = distinct tracks on rows whose playlist was modified that day
# (not strictly tracks *added*; name kept for dashboard compatibility).
modified_raw = F.col("playlist_modified_at")
modified_str = modified_raw.cast("string")

gold_kpi_daily = (
    silver_playlist_tracks
    .withColumn(
        "event_date",
        F.to_date(
            F.when(modified_str.rlike("^[0-9]{13}$"), F.to_timestamp(modified_raw.cast("double") / 1000))
            .when(modified_str.rlike("^[0-9]{10}$"), F.to_timestamp(modified_raw.cast("long")))
            .otherwise(F.to_timestamp(modified_raw))
        ),
    )
    .where(F.col("event_date").isNotNull())
    .groupBy("event_date")
    .agg(
        F.countDistinct("track_uri").alias("tracks_added"),
        F.countDistinct("playlist_id").alias("active_playlists"),
    )
    .orderBy("event_date")
)

# overwriteSchema: event_date may now have a different type than in a previously written table.
(
    gold_kpi_daily
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_kpi_daily")
)


## Track Performance

In [0]:
spt = silver_playlist_tracks.alias("spt")
gtp = gold_track_popularity.alias("gtp")

# Per-track summary: playlist reach from Silver plus Phase 2 popularity metrics.
# Left join: tracks without a popularity row keep null popularity metrics.
track_with_popularity = spt.join(gtp, "track_uri", "left")

# first() presumes gold_track_popularity has one row per track_uri — TODO confirm.
track_summary_aggs = [
    F.countDistinct("spt.playlist_id").alias("playlist_count"),
    F.first("gtp.appearances").alias("total_appearances"),
    F.first("gtp.playlists_count").alias("playlists_count"),
    # duration_ms / 60000 -> minutes, rounded to 2 decimals
    F.round(F.avg("spt.duration_ms") / 60000, 2).alias("avg_duration_min"),
]

gold_track_summary = (
    track_with_popularity
    .groupBy(F.col("spt.track_uri"), F.col("spt.track_name"))
    .agg(*track_summary_aggs)
)

gold_track_summary.write.mode("overwrite").saveAsTable("gold_track_summary")


## Artist Performance

In [0]:
from pyspark.sql import functions as F

spt = silver_playlist_tracks.alias("spt")
gap = gold_artist_popularity.alias("gap")

# Per-artist summary keyed by artist_name.
# NOTE(review): artists sharing a display name are merged here; joining on artist_uri
# may be safer if gold_artist_popularity carries it — TODO confirm its schema.
same_artist = F.col("spt.artist_name") == F.col("gap.artist_name")

gold_artist_summary = (
    spt.join(gap, same_artist, "left")
    .groupBy(F.col("spt.artist_name"))
    .agg(
        F.countDistinct("spt.track_uri").alias("track_count"),
        # first() presumes one gold_artist_popularity row per artist_name — TODO confirm
        F.first("gap.appearances").alias("total_appearances"),
        F.first("gap.playlists_count").alias("playlists_count"),
    )
)

gold_artist_summary.write.mode("overwrite").saveAsTable("gold_artist_summary")


## Engagement Distribution

In [0]:
from pyspark.sql import functions as F

# Roll the Phase 2 engagement bins up to one row per tracks_bin.
# avg_* columns are unweighted means of the per-row median / p90 follower values.
engagement_metrics = [
    ("total_playlists", F.sum("n_playlists")),
    ("avg_median_followers", F.round(F.avg("median_followers"), 0)),
    ("avg_p90_followers", F.round(F.avg("p90_followers"), 0)),
]

gold_engagement_summary = (
    gold_engagement_bins
    .groupBy("tracks_bin")
    .agg(*(metric.alias(name) for name, metric in engagement_metrics))
    .orderBy("tracks_bin")
)

gold_engagement_summary.write.mode("overwrite").saveAsTable("gold_engagement_summary")


## Data Quality & Freshness

In [0]:

# Single-row freshness / volume check over the Silver track rows.
# latest_snapshot is the raw maximum of playlist_modified_at (no unit conversion).
gold_data_quality = silver_playlist_tracks.agg(
    F.max("playlist_modified_at").alias("latest_snapshot"),
    F.countDistinct("playlist_id").alias("total_playlists"),
    F.countDistinct("track_uri").alias("total_tracks"),
)

gold_data_quality.write.mode("overwrite").saveAsTable("gold_data_quality")


In [0]:
from pyspark.sql import functions as F

# Playlist lifecycle built from silver_playlists (one row per playlist).
# NOTE: a later cell in this notebook rebuilds gold_playlist_lifecycle from
# silver_playlist_tracks and overwrites this table with its own schema.
gold_playlist_lifecycle = (
    silver_playlists
    .groupBy("playlist_id")
    .agg(
        F.to_date(F.min("playlist_modified_ts")).alias("first_seen_date"),
        F.to_date(F.max("playlist_modified_ts")).alias("last_seen_date"),
        # Count calendar days, not distinct modification timestamps.
        F.countDistinct(F.to_date("playlist_modified_ts")).alias("active_days"),
        F.min("track_rows").alias("min_tracks"),
        F.max("track_rows").alias("max_tracks"),
        F.round(F.avg("track_rows"), 2).alias("avg_tracks"),
        # Followers from the most recent snapshot (first() would pick an arbitrary row).
        # Structs compare field by field, so max() selects the latest playlist_modified_ts.
        F.max(F.struct("playlist_modified_ts", "followers")).getField("followers").alias("followers"),
    )
    # Range of playlist sizes across snapshots (max - min).
    .withColumn(
        "track_growth",
        F.col("max_tracks") - F.col("min_tracks")
    )
)

# overwriteSchema: the later lifecycle cell writes a different schema to the same table,
# so a plain overwrite here can be rejected with a schema mismatch on re-run.
gold_playlist_lifecycle.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_playlist_lifecycle")


In [0]:
from pyspark.sql import functions as F

# Per-track lifecycle: first/last calendar day a track is seen on any playlist,
# the number of distinct days it appears, and the number of playlists containing it.
# playlist_modified_at is normalised to a DATE: epoch milliseconds (13 digits),
# epoch seconds (10 digits), or any value to_timestamp can parse.
modified_raw = F.col("playlist_modified_at")
modified_str = modified_raw.cast("string")

track_events = silver_playlist_tracks.withColumn(
    "event_date",
    F.to_date(
        F.when(modified_str.rlike("^[0-9]{13}$"), F.to_timestamp(modified_raw.cast("double") / 1000))
        .when(modified_str.rlike("^[0-9]{10}$"), F.to_timestamp(modified_raw.cast("long")))
        .otherwise(F.to_timestamp(modified_raw))
    ),
)

gold_track_lifecycle = (
    track_events
    .groupBy("track_uri")
    .agg(
        # min/max/countDistinct ignore nulls, so rows without a derivable date only
        # contribute to playlist_count.
        F.min("event_date").alias("first_seen_date"),
        F.max("event_date").alias("last_seen_date"),
        F.countDistinct("event_date").alias("active_days"),
        F.countDistinct("playlist_id").alias("playlist_count"),
    )
)

# overwriteSchema: the *_date columns may now have a different type than in a
# previously written table.
(
    gold_track_lifecycle
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_track_lifecycle")
)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Day-over-day playlist reach per track.
# NOTE: the weekly trend-velocity cell further down writes to the same table
# (with mergeSchema) and overwrites these rows.
#
# playlist_modified_at is normalised to a calendar DATE: epoch milliseconds
# (13 digits), epoch seconds (10 digits), or any value to_timestamp can parse.
# Grouping on the raw value would bucket per modification instant, not per day.
modified_raw = F.col("playlist_modified_at")
modified_str = modified_raw.cast("string")

daily_track_popularity = (
    silver_playlist_tracks
    .withColumn(
        "event_date",
        F.to_date(
            F.when(modified_str.rlike("^[0-9]{13}$"), F.to_timestamp(modified_raw.cast("double") / 1000))
            .when(modified_str.rlike("^[0-9]{10}$"), F.to_timestamp(modified_raw.cast("long")))
            .otherwise(F.to_timestamp(modified_raw))
        ),
    )
    .where(F.col("event_date").isNotNull())
    .groupBy("event_date", "track_uri")
    .agg(F.countDistinct("playlist_id").alias("playlists_containing_track"))
)

# lag() compares with the previous day on which the track appeared; days without
# an appearance have no row, so gaps are skipped rather than counted as 0.
w = Window.partitionBy("track_uri").orderBy("event_date")

gold_track_trend_velocity = (
    daily_track_popularity
    .withColumn("prev_playlists_containing_track", F.lag("playlists_containing_track").over(w))
    .withColumn(
        "delta_playlists",
        F.col("playlists_containing_track") - F.col("prev_playlists_containing_track")
    )
    # A track's first observed day has no predecessor and is dropped.
    .where(F.col("prev_playlists_containing_track").isNotNull())
    .orderBy("event_date", "track_uri")
)

# overwriteSchema: event_date may now have a different type than in the
# existing table (which the weekly cell also writes to).
(
    gold_track_trend_velocity
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_track_trend_velocity")
)


In [0]:
from pyspark.sql import functions as F

# Per-playlist artist concentration keyed by artist_uri.
# NOTE: a later cell writes gold_playlist_artist_concentration again, keyed by artist_name.
artist_counts = (
    silver_playlist_tracks
    .groupBy("playlist_id", "artist_uri")
    .agg(F.count("*").alias("track_count"))
)

# top_artist_share = rows of the playlist's most frequent artist / all rows in the playlist.
playlist_totals = artist_counts.groupBy("playlist_id").agg(
    F.countDistinct("artist_uri").alias("unique_artists"),
    F.sum("track_count").alias("total_tracks"),
    (F.max("track_count") / F.sum("track_count")).alias("top_artist_share"),
)

gold_playlist_artist_concentration = playlist_totals

gold_playlist_artist_concentration.write.mode("overwrite").saveAsTable("gold_playlist_artist_concentration")


In [0]:
from pyspark.sql import functions as F

# Track pairs that share a playlist snapshot (same playlist_id and playlist_modified_at).
# Both sides are separate projections of the Silver table, each with its own track column name.
a, b = (
    silver_playlist_tracks.select(
        F.col("playlist_id").alias("playlist_id"),
        F.col("playlist_modified_at").alias("event_date"),
        F.col("track_uri").alias(track_col),
    )
    for track_col in ("track_uri_1", "track_uri_2")
)

same_snapshot = (a["playlist_id"] == b["playlist_id"]) & (a["event_date"] == b["event_date"])

gold_track_pairs = (
    a.join(b, same_snapshot, "inner")
    # Keep each unordered pair once and drop self-pairs.
    .where(F.col("track_uri_1") < F.col("track_uri_2"))
    .groupBy("track_uri_1", "track_uri_2")
    # Counts matching row pairs: a pair seen in several snapshots counts once per snapshot.
    .agg(F.count("*").alias("co_occurrence_count"))
    .orderBy(F.desc("co_occurrence_count"))
)

gold_track_pairs.write.mode("overwrite").saveAsTable("gold_track_pairs")


In [0]:
from pyspark.sql import functions as F

# Load silver playlist-track data
silver_df = spark.table("silver_playlist_tracks")

# 1. Tracks per artist per playlist
artist_counts = (
    silver_df
    .groupBy("playlist_id", "artist_name")
    .agg(F.count("*").alias("n_tracks"))
)

# 2. Total tracks per playlist
playlist_totals = (
    artist_counts
    .groupBy("playlist_id")
    .agg(F.sum("n_tracks").alias("total_tracks"))
)

# 3. Artist share inside playlist
shares = (
    artist_counts
    .join(playlist_totals, on="playlist_id")
    .withColumn("artist_share", F.col("n_tracks") / F.col("total_tracks"))
)

# 4. Artist concentration metrics
#    - total_artists: distinct non-null artist names in the playlist
#    - top_artist_share: largest single-artist share
#    - top_artist_name: the artist holding that share. max(struct(share, name))
#      compares share first (ties -> larger name), so the name always matches
#      top_artist_share; first(artist_name) would return an arbitrary artist.
#      Null if the top group has no artist_name.
#    - gini_artist_index: 1 - sum(share^2), the Gini-Simpson diversity index
#      (0 = a single artist; closer to 1 = more evenly spread). Not a Gini coefficient.
gold_df = (
    shares
    .groupBy("playlist_id")
    .agg(
        F.countDistinct("artist_name").alias("total_artists"),
        F.max("artist_share").alias("top_artist_share"),
        F.max(F.struct("artist_share", "artist_name")).getField("artist_name").alias("top_artist_name"),
        (F.lit(1) - F.sum(F.col("artist_share") * F.col("artist_share")))
            .alias("gini_artist_index")
    )
)

# 5. Write with schema merge: the earlier concentration cell writes a different
#    column set to this table, and mergeSchema keeps both writes compatible on re-runs.
#    NOTE(review): on overwrite, columns merged in from the other schema presumably stay
#    in the table as nulls — confirm whether that is acceptable for dashboards.
(
    gold_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("gold_playlist_artist_concentration")
)

print("gold_playlist_artist_concentration updated with merged schema")


gold_playlist_artist_concentration updated with merged schema


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ---------------------------------------
# Load silver playlist-track data
# ---------------------------------------
silver_df = spark.table("silver_playlist_tracks")

# ---------------------------------------
# 1. Convert playlist_modified_at to timestamp
#    Accepts epoch milliseconds (13 digits), epoch seconds (10 digits),
#    or any value to_timestamp can parse. Dividing every value by 1000
#    would collapse epoch-seconds data into January 1970.
# ---------------------------------------
modified_raw = F.col("playlist_modified_at")
modified_str = modified_raw.cast("string")

silver_df = silver_df.withColumn(
    "modified_ts",
    F.when(modified_str.rlike("^[0-9]{13}$"), F.to_timestamp(modified_raw.cast("double") / 1000))
    .when(modified_str.rlike("^[0-9]{10}$"), F.to_timestamp(modified_raw.cast("long")))
    .otherwise(F.to_timestamp(modified_raw))
)

# ---------------------------------------
# 2. Weekly playlist coverage per track
# ---------------------------------------
weekly_coverage = (
    silver_df
    .withColumn("week_start", F.date_trunc("week", F.col("modified_ts")))
    .groupBy(
        "track_uri",
        "track_name",
        "artist_name",
        "week_start"
    )
    .agg(
        F.countDistinct("playlist_id").alias("playlist_count")
    )
)

# ---------------------------------------
# 3. Week-over-week net change
#    lag() compares with the previous week in which the track appeared
#    (weeks without playlists have no row). A track's first week is
#    compared against 0, so its net_change equals its playlist_count.
# ---------------------------------------
w = Window.partitionBy("track_uri").orderBy("week_start")

trend_velocity = (
    weekly_coverage
    .withColumn(
        "prev_playlist_count",
        F.lag("playlist_count").over(w)
    )
    .withColumn(
        "net_change",
        F.col("playlist_count") - F.coalesce(F.col("prev_playlist_count"), F.lit(0))
    )
    .drop("prev_playlist_count")
)

# ---------------------------------------
# 4. Write Gold table
#    NOTE(review): mergeSchema on an overwrite presumably keeps columns written by
#    the earlier daily trend cell (as nulls) — confirm whether that is intended.
# ---------------------------------------
(
    trend_velocity
    .write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("gold_track_trend_velocity")
)

print("gold_track_trend_velocity created successfully")


gold_track_trend_velocity created successfully


In [0]:
from pyspark.sql import functions as F

# ---------------------------------------
# Load silver playlist-track data
# ---------------------------------------
silver_df = spark.table("silver_playlist_tracks")

# Only the columns the pairing needs, so the self-join moves as little data as possible.
pair_source = silver_df.select(
    "playlist_id",
    "playlist_modified_at",
    "track_uri",
    "track_name",
    "artist_name",
)

# ---------------------------------------
# 1. Self-join tracks within the same playlist snapshot
#    (same playlist_id AND same playlist_modified_at; null-safe so rows with
#    a missing timestamp still pair with each other). Joining on playlist_id
#    alone would pair tracks from different snapshots of the same playlist.
#    (track A < track B to avoid duplicates and self-pairs)
# ---------------------------------------
pairs = (
    pair_source.alias("a")
    .join(
        pair_source.alias("b"),
        (F.col("a.playlist_id") == F.col("b.playlist_id"))
        & F.col("a.playlist_modified_at").eqNullSafe(F.col("b.playlist_modified_at"))
    )
    .where(F.col("a.track_uri") < F.col("b.track_uri"))
)

# ---------------------------------------
# 2. Count co-occurrences (number of distinct playlists containing both tracks)
# ---------------------------------------
co_occurrence = (
    pairs
    .groupBy(
        F.col("a.track_uri").alias("track_uri_1"),
        F.col("a.track_name").alias("track_name_1"),
        F.col("a.artist_name").alias("artist_name_1"),
        F.col("b.track_uri").alias("track_uri_2"),
        F.col("b.track_name").alias("track_name_2"),
        F.col("b.artist_name").alias("artist_name_2"),
    )
    .agg(F.countDistinct("a.playlist_id").alias("co_occurrence_count"))
)

# ---------------------------------------
# 3. Write Gold table (mergeSchema keeps it compatible with the earlier pairs cell)
# ---------------------------------------
(
    co_occurrence
    .write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("gold_track_pairs")
)

print("gold_track_pairs created successfully")


old_track_pairs created successfully


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

silver_playlists = spark.table("silver_playlists")
silver_playlist_tracks = spark.table("silver_playlist_tracks")

# =========================
# gold_playlist_lifecycle (history-aware)
# =========================
# Rebuilds the lifecycle table from daily playlist sizes in silver_playlist_tracks
# plus the latest follower count from silver_playlists, replacing the table schema.

# 1) Daily playlist sizes (1 row per playlist per day)
#    playlist_modified_at may be epoch milliseconds (13 digits), epoch seconds
#    (10 digits), or any value to_timestamp can parse. Rows whose date cannot be
#    derived are dropped: a null event_date would otherwise count as an active day
#    and, since ascending order puts nulls first, be taken as the "first day".
modified_str = F.col("playlist_modified_at").cast("string")

daily_sizes = (
    silver_playlist_tracks
    .where(
        F.col("playlist_id").isNotNull()
        & F.col("track_uri").isNotNull()
        & F.col("playlist_modified_at").isNotNull()
    )
    .withColumn(
        "event_ts",
        F.when(
            modified_str.rlike("^[0-9]{13}$"),
            F.to_timestamp(F.col("playlist_modified_at").cast("double") / 1000)
        ).when(
            modified_str.rlike("^[0-9]{10}$"),
            F.to_timestamp(F.col("playlist_modified_at").cast("long"))
        ).otherwise(
            F.to_timestamp("playlist_modified_at")
        )
    )
    .withColumn("event_date", F.to_date("event_ts"))
    .where(F.col("event_date").isNotNull())
    .groupBy("playlist_id", "event_date")
    .agg(F.countDistinct("track_uri").alias("track_count"))
)

# 2) Compute first/last day track_count per playlist, then net growth.
#    event_date is unique per playlist after step 1, so row_number has no ties.
w_first = Window.partitionBy("playlist_id").orderBy(F.col("event_date").asc())
w_last  = Window.partitionBy("playlist_id").orderBy(F.col("event_date").desc())

daily_with_flags = (
    daily_sizes
    .withColumn("rn_first", F.row_number().over(w_first))
    .withColumn("rn_last",  F.row_number().over(w_last))
    # Non-null only on the first / last day; max() below picks that single value.
    .withColumn("first_day_tracks", F.when(F.col("rn_first") == 1, F.col("track_count")))
    .withColumn("last_day_tracks",  F.when(F.col("rn_last")  == 1, F.col("track_count")))
)

lifecycle = (
    daily_with_flags
    .groupBy("playlist_id")
    .agg(
        F.min("event_date").alias("first_seen_date"),
        F.max("event_date").alias("last_seen_date"),
        F.count("*").alias("active_days"),
        F.min("track_count").alias("min_tracks"),
        F.max("track_count").alias("max_tracks"),
        F.avg("track_count").alias("avg_tracks"),
        F.max("first_day_tracks").alias("first_day_tracks"),
        F.max("last_day_tracks").alias("last_day_tracks"),
    )
    # Net change between the first and last observed day (can be negative).
    .withColumn("track_growth", F.col("last_day_tracks") - F.col("first_day_tracks"))
    .drop("first_day_tracks", "last_day_tracks")
)

# 3) Latest followers per playlist (silver_playlists uses playlist_modified_ts + followers)
w_followers = Window.partitionBy("playlist_id").orderBy(F.col("playlist_modified_ts").desc())

followers_latest = (
    silver_playlists
    .where(
        F.col("playlist_id").isNotNull()
        & F.col("playlist_modified_ts").isNotNull()
        & F.col("followers").isNotNull()
    )
    .withColumn("rn", F.row_number().over(w_followers))
    .where(F.col("rn") == 1)
    .select(
        "playlist_id",
        F.col("followers").cast("bigint").alias("followers")
    )
)

# Playlists without a follower record get 0.
gold_playlist_lifecycle = (
    lifecycle
    .join(followers_latest, on="playlist_id", how="left")
    .withColumn("followers", F.coalesce(F.col("followers"), F.lit(0)))
)

(
  gold_playlist_lifecycle
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("gold_playlist_lifecycle")
)