In [13]:
# ==================================================================
# CELL 0 - Imports & Globals
# Purpose: centralize imports, constants, and small helper functions used across cells.
# ==================================================================
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, input_file_name, regexp_extract, current_timestamp, row_number,
    explode, posexplode, expr, split, element_at
)
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# ---- Global paths (adjust when running) ----
BRONZE_BASE = "Files/bronze/spotify"
SILVER_BASE = "Files/silver/spotify"

# ---- Helper: safe write as Delta table (upsert shortcut for simple inserts) ----
def upsert_into_delta(table_name: str, df: DataFrame, match_condition: str, update_on_match: bool = False):
    """
    Merge ``df`` into the Delta table registered as ``table_name``.

    The target table is aliased ``t`` and the incoming rows ``s``; ``match_condition``
    is a SQL predicate written against those two aliases.

    - Source rows with no match are always inserted.
    - Matched rows are only touched when ``update_on_match`` is True, in which case
      ``whenMatchedUpdateAll`` is applied.

    The target is resolved with ``DeltaTable.forName``, so the table has to exist
    before calling this helper - run the CREATE TABLE statements in the DDL cell first.
    """
    target = DeltaTable.forName(spark, table_name)
    merge_builder = target.alias('t').merge(df.alias('s'), match_condition)
    # The update clause is optional; the insert clause is always attached.
    merge_builder = merge_builder.whenMatchedUpdateAll() if update_on_match else merge_builder
    merge_builder.whenNotMatchedInsertAll().execute()

StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 15, Finished, Available, Finished)

In [14]:
# ==================================================================
# CELL 1 - SQL DDL: Create Silver Schemas (single place for table definitions)
# Purpose: define explicit schemas for main silver tables using SQL DDL.
# Run this once before merges. Keeps schema definitions readable and versionable.
# ==================================================================
# NOTE: We keep DDLs compact here. Modify types/props as needed.
# silver_followed_artists: one row per followed artist.
# CELL 4 merges into this table matching on (spotify_user_id, artist_id) and never updates
# matched rows. No TBLPROPERTIES are set for this table.
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_followed_artists (
    spotify_user_id STRING,
    snapshot_date DATE,
    artist_id STRING,
    artist_name STRING,
    artist_genres ARRAY<STRING>,
    artist_popularity INT,
    artist_followers INT,
    artist_uri STRING,
    artist_image_url STRING
)
USING delta
""")

# silver_saved_tracks: flattened saved-track rows (track, album and first-artist fields).
# CELL 5 merges into this table matching on (spotify_user_id, track_id).
# added_at and release_date are declared as STRING here (not TIMESTAMP/DATE).
# The table sets delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact to true.
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_saved_tracks (
    spotify_user_id STRING,
    snapshot_date DATE,
    added_at STRING,
    track_id STRING,
    track_name STRING,
    duration_ms INT,
    explicit BOOLEAN,
    popularity INT,

    spotify_url STRING,
    track_uri STRING,
    track_number INT,
    disc_number INT,

    album_id STRING,
    album_name STRING,
    album_type STRING,
    release_date STRING,
    total_tracks INT,
    album_image_url STRING,

    artist_id STRING,
    artist_name STRING,
    artist_type STRING,
    artist_url STRING
)
USING delta
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = true,
    delta.autoOptimize.autoCompact = true
)
""")

# silver_recently_played: one row per play event.
# CELL 6 merges into this table matching on (spotify_user_id, played_at); played_at is a TIMESTAMP.
# The table sets delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact to true.
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_recently_played (
    spotify_user_id STRING,
    snapshot_date DATE,
    played_at TIMESTAMP,
    track_id STRING,
    track_name STRING,
    popularity INT,
    duration_ms INT,
    explicit BOOLEAN,
    artist_id STRING,
    artist_name STRING,
    album_id STRING,
    album_name STRING,
    album_image_url STRING,
    context_type STRING,
    context_uri STRING
)
USING delta
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = true,
    delta.autoOptimize.autoCompact = true
)
""")

# silver_top_tracks: ranked top tracks.
# CELL 7 merges into this table matching on (spotify_user_id, snapshot_date, time_range, rank);
# rank is written there as the 1-based position of the item in the payload.
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_top_tracks (
    spotify_user_id STRING,
    snapshot_date DATE,
    time_range STRING,
    rank INT,
    track_id STRING,
    track_name STRING,
    popularity INT,
    duration_ms INT,
    explicit BOOLEAN,
    track_number INT,
    external_url STRING,
    artist_id STRING,
    artist_name STRING,
    album_id STRING,
    album_name STRING,
    release_date STRING,
    album_image_url STRING
)
USING delta
""")

# silver_top_artists: ranked top artists.
# CELL 8 merges into this table matching on (spotify_user_id, snapshot_date, time_range, rank).
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_top_artists (
    spotify_user_id STRING,
    snapshot_date DATE,
    time_range STRING,
    rank INT,
    artist_id STRING,
    artist_name STRING,
    popularity INT,
    followers INT,
    artist_genres ARRAY<STRING>,
    external_url STRING,
    artist_image_url STRING
)
USING delta
""")

# silver_playlist_tracks: playlist contents with their position in the playlist.
# CELL 9 merges into this table matching on (playlist_id, track_id) and, unlike the other
# merges in this notebook, also updates matched rows. added_at is a TIMESTAMP here.
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_playlist_tracks (
    spotify_user_id STRING,
    playlist_id STRING,
    snapshot_date DATE,
    position INT,
    added_at TIMESTAMP,
    added_by_id STRING,
    track_id STRING,
    track_name STRING,
    popularity INT,
    duration_ms INT,
    explicit BOOLEAN,
    external_url STRING,
    artist_id STRING,
    artist_name STRING,
    album_id STRING,
    album_name STRING,
    album_image_url STRING
)
USING delta
""")

StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 16, Finished, Available, Finished)

DataFrame[]

In [15]:
# ==================================================================
# CELL 2 - Standardized: Load raw JSON files (pattern-based loader)
# Purpose: single reusable approach to read JSON with source path column
# Usage: pass source_path (glob); callers then apply their own regex path_pattern to the source_path column to extract metadata
# ==================================================================

def load_json_with_path(source_path: str) -> DataFrame:
    """Load multiline JSON from ``source_path`` and tag every row with its originating file.

    The value of ``input_file_name()`` is stored in a ``source_path`` column so that
    callers can pull metadata out of the file path.
    """
    multiline_reader = spark.read.option("multiline", True)
    json_df = multiline_reader.json(source_path)
    return json_df.withColumn("source_path", input_file_name())


StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 17, Finished, Available, Finished)

In [16]:
# ==================================================================
# CELL 3 - USER PROFILE: Latest snapshot per user (Dimension)
# Purpose: keep one latest profile record per spotify_user_id
# Steps grouped: load -> extract snapshot_date -> window latest -> flatten -> write
# ==================================================================
# Bronze layout read here: <user folder>/user_profile/<snapshot folder>/data.json
source_path = f"{BRONZE_BASE}/*/user_profile/*/data.json"
path_pattern = r".*/spotify/[^/]+/user_profile/([^/]+)/data\.json$"

df_raw = load_json_with_path(source_path)

# Group 1 of path_pattern is the snapshot folder name; it is cast to a date.
snapshot_date_col = regexp_extract("source_path", path_pattern, 1).cast("date")
df = df_raw.withColumn("snapshot_date", snapshot_date_col)

# Number each user's snapshots newest-first, then keep only the first one.
w = Window.partitionBy("ingestion_metadata.spotify_user_id").orderBy(col("snapshot_date").desc())
df_ranked = df.withColumn("rn", row_number().over(w))
df_latest = df_ranked.filter(col("rn") == 1).drop("rn")

# (source field, output column) pairs for the payload/metadata fields that are renamed.
profile_fields = [
    ("ingestion_metadata.run_utc", "run_utc"),
    ("payload.display_name", "display_name"),
    ("payload.email", "email"),
    ("payload.country", "country"),
    ("payload.product", "subscription_type"),
    ("payload.followers.total", "follower_count"),
    ("payload.explicit_content.filter_enabled", "explicit_filter_enabled"),
    ("payload.explicit_content.filter_locked", "explicit_filter_locked"),
    ("payload.uri", "spotify_uri"),
    ("payload.images", "images"),
]

# Flatten to the profile columns and stamp the load time.
df_user_profile = df_latest.select(
    col("ingestion_metadata.spotify_user_id").alias("spotify_user_id"),
    col("snapshot_date"),
    *[col(source_field).alias(output_name) for source_field, output_name in profile_fields],
    current_timestamp().alias("ingested_at"),
)

# The dimension only holds the latest snapshot, so the table is replaced on every run.
profile_writer = df_user_profile.write.format("delta").mode("overwrite")
profile_writer.saveAsTable("silver_user_profile")

StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 18, Finished, Available, Finished)

In [17]:
# ==================================================================
# CELL 4 - FOLLOWED ARTISTS: flatten and insert new follows
# Purpose: flatten followed artists snapshots and insert only new (per user+artist)
# Steps grouped: load -> extract snapshot_date -> explode items -> clean -> dedupe -> merge
# ==================================================================
# Bronze layout read here: <user folder>/followed_artists/<snapshot folder>/data.json
source_path = f"{BRONZE_BASE}/*/followed_artists/*/data.json"
path_pattern = r".*/spotify/[^/]+/followed_artists/([^/]+)/data\.json$"

raw = load_json_with_path(source_path)
# Group 1 of path_pattern is the snapshot folder name; it is cast to a date.
raw = raw.withColumn("snapshot_date", regexp_extract("source_path", path_pattern, 1).cast("date"))

# One row per followed artist per snapshot file.
exploded = raw.select(
    col("ingestion_metadata.spotify_user_id").alias("spotify_user_id"),
    col("snapshot_date"),
    explode(col("payload.artists.items")).alias("artist")
)

flattened = exploded.select(
    col("spotify_user_id"),
    col("snapshot_date"),
    col("artist.id").alias("artist_id"),
    col("artist.name").alias("artist_name"),
    col("artist.genres").alias("artist_genres"),
    col("artist.popularity").alias("artist_popularity"),
    col("artist.followers.total").alias("artist_followers"),
    col("artist.uri").alias("artist_uri"),
    expr("artist.images[0].url").alias("artist_image_url")
)

# The glob above re-reads every snapshot on every run, so the same (user, artist) pair
# normally appears once per snapshot. dropDuplicates() keeps an arbitrary one of those rows,
# which made the inserted snapshot_date and artist metadata differ from run to run.
# Rank explicitly and keep the earliest snapshot, i.e. the first time the follow was observed.
# Rows that tie on snapshot_date are still picked arbitrarily among themselves.
first_seen = Window.partitionBy("spotify_user_id", "artist_id").orderBy(col("snapshot_date").asc_nulls_last())

clean = (
    flattened
    # A NULL key never satisfies the '=' merge condition, so such rows would be inserted
    # again on every run; drop them before merging.
    .filter(col("spotify_user_id").isNotNull() & col("artist_id").isNotNull())
    .withColumn("rn", row_number().over(first_seen))
    .filter(col("rn") == 1)
    .drop("rn")
)

# Merge inserts only new artists per user; existing rows are left untouched.
upsert_into_delta(
    table_name="silver_followed_artists",
    df=clean,
    match_condition="t.spotify_user_id = s.spotify_user_id AND t.artist_id = s.artist_id",
    update_on_match=False
)

StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 19, Finished, Available, Finished)

In [19]:
# ==================================================================
# CELL 5 - SAVED TRACKS: flatten multi-page payloads and merge
# Purpose: transform user's saved tracks and ensure one row per (user, track)
# Steps grouped: load -> extract snapshot_date -> explode pages -> flatten -> dedupe -> merge
# ==================================================================
# Bronze layout read here: <user folder>/saved_tracks/<snapshot folder>/data.json
source_path = f"{BRONZE_BASE}/*/saved_tracks/*/data.json"
path_pattern = r".*/spotify/[^/]+/saved_tracks/([^/]+)/data\.json$"

raw = load_json_with_path(source_path)
# Group 1 of path_pattern is the snapshot folder name; it is cast to a date.
raw = raw.withColumn("snapshot_date", regexp_extract("source_path", path_pattern, 1).cast("date"))

# payload is an array of pages -> explode pages, then the items of each page
df_items = raw.select(
    col("ingestion_metadata.spotify_user_id").alias("spotify_user_id"),
    col("snapshot_date"),
    explode(col("payload")).alias("entry")
).select(
    "spotify_user_id",
    "snapshot_date",
    explode(col("entry.items")).alias("item")
)

# flatten track, album and first-artist fields
clean = df_items.select(
    col("spotify_user_id"),
    col("snapshot_date"),
    col("item.added_at").alias("added_at"),
    col("item.track.id").alias("track_id"),
    col("item.track.name").alias("track_name"),
    col("item.track.duration_ms"),
    col("item.track.explicit"),
    col("item.track.popularity"),
    col("item.track.external_urls.spotify").alias("spotify_url"),
    col("item.track.uri").alias("track_uri"),
    col("item.track.track_number"),
    col("item.track.disc_number"),
    col("item.track.album.id").alias("album_id"),
    col("item.track.album.name").alias("album_name"),
    col("item.track.album.album_type"),
    col("item.track.album.release_date").alias("release_date"),
    col("item.track.album.total_tracks"),
    expr("item.track.album.images[0].url").alias("album_image_url"),
    expr("item.track.artists[0].id").alias("artist_id"),
    expr("item.track.artists[0].name").alias("artist_name"),
    expr("item.track.artists[0].type").alias("artist_type"),
    expr("item.track.artists[0].external_urls.spotify").alias("artist_url")
)

# The glob above re-reads every snapshot on every run, so a saved track normally appears
# once per snapshot. dropDuplicates() keeps an arbitrary one of those rows, which made the
# inserted snapshot_date/added_at/popularity differ from run to run. Rank explicitly and keep
# the earliest snapshot (earliest added_at as tie-breaker) so the result is reproducible.
first_seen = Window.partitionBy("spotify_user_id", "track_id").orderBy(
    col("snapshot_date").asc_nulls_last(),
    col("added_at").asc_nulls_last()
)

df_final_source = (
    clean
    # A NULL key never satisfies the '=' merge condition, so such rows would be inserted
    # again on every run; drop them before merging.
    .filter(col("spotify_user_id").isNotNull() & col("track_id").isNotNull())
    .withColumn("rn", row_number().over(first_seen))
    .filter(col("rn") == 1)
    .drop("rn")
)

# Insert tracks not yet stored for the user; existing rows are left untouched.
upsert_into_delta(
    table_name="silver_saved_tracks",
    df=df_final_source,
    match_condition="t.spotify_user_id = s.spotify_user_id AND t.track_id = s.track_id",
    update_on_match=False
)


StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 21, Finished, Available, Finished)

In [20]:
# ==================================================================
# CELL 6 - RECENTLY PLAYED: flatten and merge play events (append-heavy)
# Purpose: ingest play events with played_at as event timestamp (PK per user + played_at)
# Steps grouped: load -> extract snapshot_date -> explode payload[0].items -> flatten -> dedupe -> merge
# ==================================================================
# Bronze layout read here: <user folder>/recently_played/<snapshot folder>/data.json
source_path = f"{BRONZE_BASE}/*/recently_played/*/data.json"
path_pattern = r".*/spotify/[^/]+/recently_played/([^/]+)/data\.json$"

raw = load_json_with_path(source_path)
# Group 1 of path_pattern is the snapshot folder name; it is cast to a date.
raw = raw.withColumn("snapshot_date", regexp_extract("source_path", path_pattern, 1).cast("date"))

# NOTE(review): only the first element of payload is read here, whereas the saved-tracks
# cell explodes every page - confirm recently_played payloads never hold more than one page.
exploded = raw.select(
    col("ingestion_metadata.spotify_user_id").alias("spotify_user_id"),
    col("snapshot_date"),
    explode(col("payload")[0].items).alias("item")
)

clean = exploded.select(
    col("spotify_user_id"),
    col("snapshot_date"),
    col("item.played_at").cast("timestamp").alias("played_at"),
    col("item.track.id").alias("track_id"),
    col("item.track.name").alias("track_name"),
    col("item.track.popularity"),
    col("item.track.duration_ms"),
    col("item.track.explicit"),
    expr("item.track.artists[0].id").alias("artist_id"),
    expr("item.track.artists[0].name").alias("artist_name"),
    col("item.track.album.id").alias("album_id"),
    col("item.track.album.name").alias("album_name"),
    expr("item.track.album.images[0].url").alias("album_image_url"),
    col("item.context.type").alias("context_type"),
    col("item.context.uri").alias("context_uri")
)

# The same play event can be present in several snapshots. dropDuplicates() keeps an
# arbitrary copy, so the inserted snapshot_date/popularity was not reproducible. Rank
# explicitly and keep the copy from the earliest snapshot that contains the event.
first_seen = Window.partitionBy("spotify_user_id", "played_at").orderBy(col("snapshot_date").asc_nulls_last())

df_final_source = (
    clean
    # A NULL key never satisfies the '=' merge condition, so such rows would be inserted
    # again on every run; drop them before merging.
    .filter(col("spotify_user_id").isNotNull() & col("played_at").isNotNull())
    .withColumn("rn", row_number().over(first_seen))
    .filter(col("rn") == 1)
    .drop("rn")
)

# Insert play events not yet stored; existing events are left untouched.
upsert_into_delta(
    table_name="silver_recently_played",
    df=df_final_source,
    match_condition="t.spotify_user_id = s.spotify_user_id AND t.played_at = s.played_at",
    update_on_match=False
)


StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 22, Finished, Available, Finished)

In [21]:
# ==================================================================
# CELL 7 - TOP TRACKS: parse time range and rank using posexplode
# Purpose: capture ordered top tracks per user/time_range/snapshot
# Steps grouped: load -> extract user/time_range/snapshot -> posexplode -> flatten -> dedupe -> merge
# ==================================================================
# Bronze layout read here: <user folder>/top_tracks_<range>_term/<snapshot folder>/data.json
source_path = f"{BRONZE_BASE}/*/top_tracks_*_term/*/data.json"
path_pattern = r".*/spotify/([^/]+)/top_tracks_([^_]+)_term/([^/]+)/data\.json$"

raw = load_json_with_path(source_path)

# path_pattern groups: 1 = user folder, 2 = time range, 3 = snapshot folder.
# The user id is taken as the last "_"-separated token of the user folder name.
user_folder = regexp_extract("source_path", path_pattern, 1)
meta = (
    raw
    .withColumn("spotify_user_id", element_at(split(user_folder, "_"), -1))
    .withColumn("time_range", regexp_extract("source_path", path_pattern, 2))
    .withColumn("snapshot_date", regexp_extract("source_path", path_pattern, 3).cast("date"))
)

# posexplode keeps the array position, which becomes the rank below.
# NOTE(review): only payload[0] is read - confirm top-track payloads are single-page.
indexed_items = posexplode(col("payload")[0].items).alias("index", "item")
exploded = meta.select("spotify_user_id", "snapshot_date", "time_range", indexed_items)

clean = exploded.select(
    "spotify_user_id",
    "snapshot_date",
    "time_range",
    # posexplode positions are 0-based; ranks are stored 1-based
    (col("index") + 1).alias("rank"),
    # track fields
    col("item.id").alias("track_id"),
    col("item.name").alias("track_name"),
    col("item.popularity").cast("int").alias("popularity"),
    col("item.duration_ms"),
    col("item.explicit"),
    col("item.track_number"),
    col("item.external_urls.spotify").alias("external_url"),
    # first listed artist only
    expr("item.artists[0].id").alias("artist_id"),
    expr("item.artists[0].name").alias("artist_name"),
    # album fields
    col("item.album.id").alias("album_id"),
    col("item.album.name").alias("album_name"),
    col("item.album.release_date").alias("release_date"),
    expr("item.album.images[0].url").alias("album_image_url"),
)

# The same four columns act as both the in-batch dedupe key and the merge key.
rank_key = ["spotify_user_id", "snapshot_date", "time_range", "rank"]
df_final = clean.dropDuplicates(rank_key)

# Insert-only merge: rows already stored for a key are left untouched.
upsert_into_delta(
    table_name="silver_top_tracks",
    df=df_final,
    match_condition=" AND ".join(f"t.{key} = s.{key}" for key in rank_key),
    update_on_match=False,
)


StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 23, Finished, Available, Finished)

In [22]:

# ==================================================================
# CELL 8 - TOP ARTISTS: same pattern as top_tracks
# Purpose: capture ordered top artists per user/time_range/snapshot
# ==================================================================
# Bronze layout read here: <user folder>/top_artists_<range>_term/<snapshot folder>/data.json
source_path = f"{BRONZE_BASE}/*/top_artists_*_term/*/data.json"
path_pattern = r".*/spotify/([^/]+)/top_artists_([^_]+)_term/([^/]+)/data\.json$"

raw = load_json_with_path(source_path)

# path_pattern groups: 1 = user folder, 2 = time range, 3 = snapshot folder.
# The user id is taken as the last "_"-separated token of the user folder name.
user_folder = regexp_extract("source_path", path_pattern, 1)
meta = (
    raw
    .withColumn("spotify_user_id", element_at(split(user_folder, "_"), -1))
    .withColumn("time_range", regexp_extract("source_path", path_pattern, 2))
    .withColumn("snapshot_date", regexp_extract("source_path", path_pattern, 3).cast("date"))
)

# posexplode keeps the array position, which becomes the rank below.
# NOTE(review): only payload[0] is read - confirm top-artist payloads are single-page.
indexed_items = posexplode(col("payload")[0].items).alias("index", "item")
exploded = meta.select("spotify_user_id", "snapshot_date", "time_range", indexed_items)

clean = exploded.select(
    "spotify_user_id",
    "snapshot_date",
    "time_range",
    # posexplode positions are 0-based; ranks are stored 1-based
    (col("index") + 1).alias("rank"),
    col("item.id").alias("artist_id"),
    col("item.name").alias("artist_name"),
    col("item.popularity").cast("int").alias("popularity"),
    col("item.followers.total").cast("int").alias("followers"),
    col("item.genres").alias("artist_genres"),
    col("item.external_urls.spotify").alias("external_url"),
    expr("item.images[0].url").alias("artist_image_url"),
)

# The same four columns act as both the in-batch dedupe key and the merge key.
rank_key = ["spotify_user_id", "snapshot_date", "time_range", "rank"]
df_final = clean.dropDuplicates(rank_key)

# Insert-only merge: rows already stored for a key are left untouched.
upsert_into_delta(
    table_name="silver_top_artists",
    df=df_final,
    match_condition=" AND ".join(f"t.{key} = s.{key}" for key in rank_key),
    update_on_match=False,
)

StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 24, Finished, Available, Finished)

In [23]:
# ==================================================================
# CELL 9 - PLAYLIST TRACKS: capture playlist order and allow updates
# Purpose: store playlist track order and metadata; update if track metadata changes
# ==================================================================
# Bronze layout read here: <user folder>/playlist_<playlist id>/<snapshot folder>/data.json
source_path = f"{BRONZE_BASE}/*/playlist_*/*/data.json"
path_pattern = r".*/spotify/([^/]+)/playlist_([^/]+)/([^/]+)/data\.json$"

raw = load_json_with_path(source_path)
# path_pattern groups: 1 = user folder, 2 = playlist id, 3 = snapshot folder.
# The user id is taken as the last "_"-separated token of the user folder name.
meta = raw.withColumn("spotify_user_id", element_at(split(regexp_extract("source_path", path_pattern, 1), "_"), -1)) \
          .withColumn("playlist_id", regexp_extract("source_path", path_pattern, 2)) \
          .withColumn("snapshot_date", regexp_extract("source_path", path_pattern, 3).cast("date"))

# payload is an array of pages. Reading only payload[0] silently dropped every track beyond
# the first page, so concatenate the items of all pages in page order before posexplode.
# For a single-page payload this yields exactly the same rows and positions as payload[0].items.
# Pages whose items array is NULL are skipped because flatten() returns NULL if any element is NULL.
all_page_items = expr("flatten(filter(payload.items, page_items -> page_items IS NOT NULL))")

exploded = meta.select(
    col("spotify_user_id"),
    col("playlist_id"),
    col("snapshot_date"),
    posexplode(all_page_items).alias("index", "item")
)

clean = exploded.select(
    col("spotify_user_id"),
    col("playlist_id"),
    col("snapshot_date"),
    # posexplode positions are 0-based; playlist positions are stored 1-based
    (col("index") + 1).alias("position"),
    col("item.added_at").cast("timestamp").alias("added_at"),
    col("item.added_by.id").alias("added_by_id"),
    col("item.track.id").alias("track_id"),
    col("item.track.name").alias("track_name"),
    col("item.track.popularity").cast("int").alias("popularity"),
    col("item.track.duration_ms"),
    col("item.track.explicit"),
    col("item.track.external_urls.spotify").alias("external_url"),
    expr("item.track.artists[0].id").alias("artist_id"),
    expr("item.track.artists[0].name").alias("artist_name"),
    col("item.track.album.id").alias("album_id"),
    col("item.track.album.name").alias("album_name"),
    expr("item.track.album.images[0].url").alias("album_image_url")
)

# This merge updates matched rows, so the source row chosen per (playlist, track) decides
# what ends up in the table. dropDuplicates() keeps an arbitrary snapshot, which could
# overwrite fresh data with a stale snapshot. Keep the newest snapshot instead; if a track
# occurs more than once in that snapshot, keep its first position.
latest_first = Window.partitionBy("playlist_id", "track_id").orderBy(
    col("snapshot_date").desc_nulls_last(),
    col("position").asc()
)

df_final = (
    clean
    # A NULL key never satisfies the '=' merge condition, so items without a track id would
    # be inserted again on every run; drop them. Stored positions keep their original
    # values, so gaps are possible where such items were skipped.
    .filter(col("playlist_id").isNotNull() & col("track_id").isNotNull())
    .withColumn("rn", row_number().over(latest_first))
    .filter(col("rn") == 1)
    .drop("rn")
)

# update_on_match=True refreshes position/metadata of tracks that are already stored.
upsert_into_delta(
    table_name="silver_playlist_tracks",
    df=df_final,
    match_condition="t.playlist_id = s.playlist_id AND t.track_id = s.track_id",
    update_on_match=True
)

StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 25, Finished, Available, Finished)

In [24]:
# ==================================================================
# CELL 10 - MASTER ARTISTS: build a master artists dimension (robust JSON handling)
# Purpose: create/overwrite a master artist table from Files/silver/spotify/*/master_artists.json
# Steps grouped: load -> normalize wrapper -> flatten -> dedupe -> overwrite table
# ==================================================================
source_path = f"{SILVER_BASE}/*/master_artists.json"
try:
    multiline_reader = spark.read.option("multiline", True)
    df_raw = multiline_reader.json(source_path)
except Exception as e:
    print(f"No master_artists.json found or empty. Error: {e}")
    # NOTE(review): dbutils is not defined anywhere in this notebook - confirm it exists in
    # this runtime, otherwise this handler raises NameError instead of exiting.
    dbutils.notebook.exit("No Data")

# The file may wrap the artist list in an "artists" or an "items" array ("artists" wins if
# both exist); without either wrapper each row is treated as one artist record.
wrapper_column = next((name for name in ("artists", "items") if name in df_raw.columns), None)
if wrapper_column is None:
    df_flat = df_raw.select(expr("struct(*)").alias("item"))
else:
    df_flat = df_raw.select(explode(col(wrapper_column)).alias("item"))

# Flatten to the artist columns.
df_selected = df_flat.select(
    col("item.id").alias("artist_id"),
    col("item.name").alias("artist_name"),
    col("item.popularity").cast("int").alias("popularity"),
    col("item.followers.total").cast("int").alias("followers"),
    col("item.genres").alias("artist_genres"),
    col("item.external_urls.spotify").alias("external_url"),
    expr("item.images[0].url").alias("artist_image_url"),
)

# One row per artist id in the master table.
df_clean = df_selected.dropDuplicates(["artist_id"])

# Replace the master table with the freshly built frame.
_df = df_clean
artists_writer = _df.write.format("delta").mode("overwrite")
artists_writer.saveAsTable("silver_artists_raw")


StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 26, Finished, Available, Finished)

In [25]:
# ==================================================================
# CELL 11 - MASTER TRACKS: build master tracks (recco->spotify mapping)
# Purpose: create/overwrite silver_tracks_raw from master_reccobeats_tracks.json
# ==================================================================
source_path = f"{SILVER_BASE}/*/master_reccobeats_tracks.json"
try:
    df_raw = spark.read.json(source_path)
except Exception as e:
    print(f"No master_reccobeats_tracks.json found. Error: {e}")
    # NOTE(review): dbutils is not defined anywhere in this notebook - confirm it exists in
    # this runtime, otherwise this handler raises NameError instead of exiting.
    dbutils.notebook.exit("No Data")

# Columns kept for the master table; camelCase source fields are renamed to snake_case
# and only the first entry of the artists array is used.
track_columns = [
    col("spotify_id"),
    col("recco_id"),
    col("trackTitle").alias("track_name"),
    col("durationMs").alias("duration_ms"),
    col("isrc"),
    col("popularity"),
    expr("artists[0].id").alias("artist_recco_id"),
    expr("artists[0].name").alias("artist_name"),
    col("href").alias("api_link"),
]

# One row per spotify_id.
df_selected = df_raw.select(*track_columns)
df_clean = df_selected.dropDuplicates(["spotify_id"])

# Replace the master table with the freshly built frame.
tracks_writer = df_clean.write.format("delta").mode("overwrite")
tracks_writer.saveAsTable("silver_tracks_raw")

StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 27, Finished, Available, Finished)

In [26]:
# ==================================================================
# CELL 12 - AUDIO FEATURES: join features -> spotify id and save
# Purpose: combine audio feature records (recco_id) with spotify_id using master tracks file
# ==================================================================
source_path = f"{SILVER_BASE}/*/master_reccobeats_features.json"
try:
    df_features_raw = spark.read.json(source_path)
except Exception as e:
    # Include the underlying error (as the master artists/tracks cells do): the handler
    # catches every Exception, so without it any read failure is reported as "no data".
    print(f"No audio features found. Error: {e}")
    # NOTE(review): dbutils is not defined anywhere in this notebook - confirm it exists in
    # this runtime, otherwise this handler raises NameError instead of exiting.
    dbutils.notebook.exit("No audio features found")

# Audio feature columns carried through unchanged; listed once so the feature select and
# the final select cannot drift apart.
feature_columns = [
    "danceability",
    "energy",
    "key",
    "loudness",
    "mode",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "valence",
    "tempo",
]

# prepare features: the feature file's "id" is used as recco_id; one row per recco_id
df_features = df_features_raw.select(
    col("id").alias("recco_id"),
    *[col(name) for name in feature_columns]
).dropDuplicates(["recco_id"])

# load the recco_id -> spotify_id mapping from the master tracks file
# (not wrapped in try/except: a failure to read the mapping propagates)
path_tracks = f"{SILVER_BASE}/*/master_reccobeats_tracks.json"
df_tracks_raw = spark.read.json(path_tracks)
df_lookup = df_tracks_raw.select(col("recco_id"), col("spotify_id")).dropDuplicates(["recco_id"])

# inner join: features without a mapping row are dropped
df_joined = df_features.join(df_lookup, on="recco_id", how="inner")

# put the ids first, followed by the feature columns
df_final = df_joined.select("spotify_id", "recco_id", *feature_columns)

# replace the table with the freshly built frame
df_final.write.format("delta").mode("overwrite").saveAsTable("silver_audio_features_raw")


StatementMeta(, 82393ace-b2d5-47b8-864b-78b20106132e, 28, Finished, Available, Finished)

Refactored silver layer notebook cells completed.
