In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Unity Catalog coordinates used throughout the notebook.
CATALOG = "music_demo"
SILVER = "silver"
GOLD = "gold"

# Make sure the catalog and the Gold schema exist, then select them as the
# session defaults. Order matters: the catalog has to be current before the
# schema is created and selected inside it.
for _setup_stmt in (
    f"CREATE CATALOG IF NOT EXISTS {CATALOG}",
    f"USE CATALOG {CATALOG}",
    f"CREATE SCHEMA IF NOT EXISTS {GOLD}",
    f"USE {GOLD}",
):
    spark.sql(_setup_stmt)


In [0]:
# Pull the clean Silver fact and derive one "activity" measure per row:
# streams + views, with a missing value on either side counted as 0.
df = (
    spark.table(f"{CATALOG}.{SILVER}.fact_metrics")
    .withColumn(
        "activity",
        F.coalesce(F.col("streams"), F.lit(0)) + F.coalesce(F.col("views"), F.lit(0)),
    )
)

# Integer day number used as the window ordering key. Ordering by whole days
# lets the frames below be expressed with rangeBetween (calendar days) instead
# of rowsBetween (physical rows), so a "7d" window always spans 7 calendar days
# even when a partition has missing dates or several rows on the same date.
day_index = F.datediff(F.col("date").cast("date"), F.lit("1970-01-01").cast("date"))


def _trailing_days(partition_cols, start_offset, end_offset=0):
    """Return a window over `partition_cols` covering day offsets [start_offset, end_offset].

    Offsets are in whole days relative to the current row's date (0 = same day,
    negative = earlier days). Every row whose date falls in the range is included.
    """
    return (
        Window.partitionBy(*partition_cols)
        .orderBy(day_index)
        .rangeBetween(start_offset, end_offset)
    )


# Window specs.
# NOTE(review): partitions do not include "region" although it is carried into
# the output; if the fact table has one row per region per day, these sums span
# all regions of a track/platform. Confirm the intended grain.
w7      = _trailing_days(["track_id", "platform"], -6)         # current day + 6 prior days
w28     = _trailing_days(["track_id", "platform"], -27)        # current day + 27 prior days
wfol7   = _trailing_days(["artist_id", "platform"], -6)        # artist-level, last 7 days
wprev21 = _trailing_days(["track_id", "platform"], -27, -7)    # the 21 days before the last 7

# Scoring weights (values unchanged from the original inline literals).
ACTIVITY_WEIGHT = 0.6
FOLLOWERS_WEIGHT = 10.0
RANK_CAP = 10000        # ranks at or beyond this, or missing, contribute 0
RANK_WEIGHT = 0.1

feat = (df
    .withColumn("activity_7d",  F.sum("activity").over(w7))
    .withColumn("activity_28d", F.sum("activity").over(w28))
    .withColumn("followers_7d", F.sum(F.coalesce(F.col("followers_gained"), F.lit(0))).over(wfol7))
    .withColumn("activity_prev_21d_avg", F.avg("activity").over(wprev21))
    .withColumn(
        "momentum_ratio",
        # Null when there is no usable baseline (no prior rows, or a zero average),
        # which also avoids dividing by zero.
        # NOTE(review): this divides a 7-day SUM by a per-row AVERAGE, so a flat
        # series with one row per day yields about 7 rather than 1. Confirm that
        # consumers expect this scale.
        F.when(F.col("activity_prev_21d_avg").isNull() | (F.col("activity_prev_21d_avg") == 0), F.lit(None))
         .otherwise(F.col("activity_7d") / F.col("activity_prev_21d_avg"))
    )
    .withColumn(
        "raw_score",
        F.coalesce(F.col("activity_7d"), F.lit(0)) * F.lit(ACTIVITY_WEIGHT) +
        F.coalesce(F.col("followers_7d"), F.lit(0)) * F.lit(FOLLOWERS_WEIGHT) +
        # Rank term: a lower rank_estimate scores higher; a missing rank is treated
        # as RANK_CAP and larger ranks are clamped to it, so the term is never negative
        # for ranks >= 0.
        (F.lit(RANK_CAP) - F.least(F.coalesce(F.col("rank_estimate"), F.lit(RANK_CAP)), F.lit(RANK_CAP))) * F.lit(RANK_WEIGHT)
    )
    .select(
        "date","platform","region","artist_id","track_id",
        "activity_7d","activity_28d","followers_7d","momentum_ratio","raw_score"
    )
)

# Materialize Gold table (full overwrite on every run).
feat.write.mode("overwrite").format("delta").saveAsTable(f"{CATALOG}.{GOLD}.track_momentum_daily")

# Sanity check: row count of the table just written.
display(spark.sql(f"SELECT COUNT(*) AS rows FROM {CATALOG}.{GOLD}.track_momentum_daily"))


In [0]:
# Define (or redefine) the Gold view that decorates the daily momentum table
# with the track title and artist name from the Silver dimensions. Both joins
# are inner joins, so momentum rows without a matching track or artist are
# not exposed by the view.
create_view_sql = f"""
CREATE OR REPLACE VIEW {CATALOG}.{GOLD}.v_rising_tracks AS
SELECT
  d.date,
  d.platform,
  d.region,
  t.track_id,
  t.track_title,
  a.artist_id,
  a.artist_name,
  d.activity_7d,
  d.activity_28d,
  d.followers_7d,
  d.momentum_ratio,
  d.raw_score
FROM {CATALOG}.{GOLD}.track_momentum_daily d
JOIN {CATALOG}.{SILVER}.dim_track  t ON d.track_id = t.track_id
JOIN {CATALOG}.{SILVER}.dim_artist a ON t.artist_id = a.artist_id
"""
spark.sql(create_view_sql)

# Preview: the 20 rows with the most recent dates, highest raw_score first
# within each date.
preview_sql = f"""
SELECT *
FROM {CATALOG}.{GOLD}.v_rising_tracks
ORDER BY date DESC, raw_score DESC
LIMIT 20
"""
top_rows = spark.sql(preview_sql)
display(top_rows)


In [0]:
# Compact the Gold table and co-locate rows on the columns BI queries filter
# and join on (track, platform, date). Intended as a read-performance aid for
# this modest dataset.
momentum_table = f"{CATALOG}.{GOLD}.track_momentum_daily"
spark.sql(f"OPTIMIZE {momentum_table} ZORDER BY (track_id, platform, date)")
