In [0]:
# Test only; run to clear Bronze
# dbutils.fs.rm("/mnt/billboard_mount/bronze_delta/hot100_raw", recurse=True)


In [0]:
%sql
-- Test only; run to clear Bronze
-- DROP TABLE IF EXISTS bronze.hot100_raw;


In [0]:
# Test only; run to clear Silver (drops the table and deletes its Delta files)
# spark.sql("DROP TABLE IF EXISTS spark_catalog.silver.hot100_clean")
# dbutils.fs.rm("/mnt/billboard_mount/silver/hot100_clean", recurse=True)


In [0]:
# Test only; run to clear Silver (needs silver_table / silver_delta_path, which are defined in the Silver cell)
# spark.sql(f"DROP TABLE IF EXISTS {silver_table}")
# dbutils.fs.rm(silver_delta_path, recurse=True)

In [0]:
# BRONZE

from delta.tables import DeltaTable
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
import urllib.parse

# ---- AWS / S3 MOUNT SETTINGS ----
# NOTE(review): these are placeholder credentials hard-coded in the notebook; real keys should come from a
# Databricks secret scope (dbutils.secrets.get) rather than being committed with the notebook.
aws_bucket_name = "billboard-hot100-project"
aws_access_key, aws_secret_key = "access", "secret"
mount_name = "billboard_mount"
mount_root = f"/mnt/{mount_name}"

# ---- BRONZE LOCATIONS ----
bronze_table = "bronze.hot100_raw"
source_csv_path = mount_root + "/bronze/hot100_delta_*.csv"   # glob over the weekly CSV drops
bronze_delta_path = mount_root + "/bronze_delta/hot100_raw"   # Delta storage that backs bronze_table

# ---- RUN CONTROL ----
MAX_ROWS = 2000  # testing cap on the total number of rows kept in Bronze
# No Delta table at the storage path yet -> first run, which loads all historical CSV data.
IS_FIRST_RUN = not DeltaTable.isDeltaTable(spark, bronze_delta_path)

# MOUNT AWS S3 (skipped when the mount point is already registered, so re-running the cell is safe)
# safe="" makes quote() percent-encode "/" as well, so the secret can be embedded in the s3a:// URL.
encoded_secret_key = urllib.parse.quote(aws_secret_key, "")
mount_point = f"/mnt/{mount_name}"
existing_mount_points = {m.mountPoint for m in dbutils.fs.mounts()}

if mount_point in existing_mount_points:
    print(f"Mount /mnt/{mount_name} already exists")
else:
    dbutils.fs.mount(f"s3a://{aws_access_key}:{encoded_secret_key}@{aws_bucket_name}", mount_point)
    print(f"Mounted S3 bucket at /mnt/{mount_name}")

# DEFINE SCHEMA OF INGEST
# One nullable field per CSV column, in file order: with a user-supplied schema the CSV reader applies the
# columns positionally by default, so this order must follow the CSV layout.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Date", DateType()),
        ("Song", StringType()),
        ("Artist", StringType()),
        ("Rank", IntegerType()),
        ("Last Week", IntegerType()),
        ("Peak Position", IntegerType()),
        ("Weeks in Charts", IntegerType()),
    )
])

# FIRST RUN: INITIALIZE BRONZE TABLE ON FIRST RUN
# The metastore schema must exist before bronze.hot100_raw can be registered (the Silver cell does the
# same for `silver`); this is a no-op once it exists.
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")


def _register_bronze_table():
    """Register bronze_table over the Delta files at bronze_delta_path (no-op if already registered)."""
    spark.sql("""
        CREATE TABLE IF NOT EXISTS {}
        USING DELTA LOCATION '{}'
        TBLPROPERTIES (
            delta.enableChangeDataFeed = true,
            delta.columnMapping.mode = 'name'
        )
    """.format(bronze_table, bronze_delta_path))


if IS_FIRST_RUN:
    print("First run: initializing Bronze delta table")

    # Sort before limiting so the MAX_ROWS test cap is deterministic and keeps the earliest chart weeks;
    # a bare limit() keeps whichever rows Spark happens to read first.
    df = spark.read.format("csv") \
        .schema(schema) \
        .option("header", "true") \
        .load(source_csv_path) \
        .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd")) \
        .orderBy("Date", "Rank") \
        .limit(MAX_ROWS)

    df.write.format("delta") \
        .mode("overwrite") \
        .option("delta.enableChangeDataFeed", "true") \
        .option("delta.columnMapping.mode", "name") \
        .save(bronze_delta_path)

    _register_bronze_table()

    _ = spark.sql("OPTIMIZE {} ZORDER BY (Date, Rank)".format(bronze_table))
    # Count the written table instead of df: re-evaluating df would re-scan and re-sort every source CSV.
    print(f"Initialized Bronze table with {spark.table(bronze_table).count()} rows (MAX_ROWS = {MAX_ROWS})")
else:
    # The Delta files already exist, but the metastore entry may have been dropped (e.g. by the
    # DROP TABLE test cell at the top); re-register so the spark.table() calls below still resolve.
    _register_bronze_table()

# LOAD CSV FILES FROM S3
all_csv_files = [
    f.path for f in dbutils.fs.ls(f"/mnt/{mount_name}/bronze/")
    if f.name.startswith("hot100_delta_") and f.name.endswith(".csv")
]

# COLLECT FILES ALREADY IN BRONZE
# NOTE(review): input_file_name() over a Delta table yields the paths of the Delta data files, not the
# source CSV paths listed in all_csv_files, so this list is not expected to match anything and every CSV
# is treated as new. The Date anti-join further down is what actually keeps loaded weeks out of Bronze;
# real file tracking would need e.g. a source-file column or a separate ingest log.
try:
    processed_files = [
        row[0]
        for row in spark.sql("SELECT DISTINCT input_file_name() FROM {}".format(bronze_table)).collect()
    ]
except Exception as e:
    # Best effort only (for example if input_file_name() is not available): fall back to "nothing processed".
    print(f"Could not read processed file list from {bronze_table}: {e}")
    processed_files = []

# LOCATE FILES NOT YET IN BRONZE
new_files = [f for f in all_csv_files if f not in processed_files]

# ENSURE THERE ARE NEW FILES IN INCREMENTAL RUNS
if not new_files:
    df_new = spark.createDataFrame([], schema)
    print("No new files found")
else:
    print(f"Found {len(new_files)} new file(s)")
    df_new = spark.read.format("csv") \
        .schema(schema) \
        .option("header", "true") \
        .load(new_files) \
        .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))

    # FILTER ONLY NEW RECORDS USING LEFT ANTI JOIN TO EXCLUDE EXISTING RECORDS
    # Granularity is the chart week: any Date already in Bronze is skipped entirely, including a week that
    # was only partly loaded because of the MAX_ROWS cap.
    existing_dates = spark.table(bronze_table).select("Date").distinct()
    df_new = df_new.join(existing_dates, "Date", "left_anti")

    # head(1) stops at the first row; a full count() is not needed just to test for emptiness.
    if not df_new.head(1):
        print("No new dates found in new files. Skipping append.")
        df_new = spark.createDataFrame([], schema)
    else:
        new_date_count = df_new.select("Date").distinct().count()
        print(f"Found {new_date_count} new dates in new files")

        # FOR TESTING: CAP TO MAX_ROWS TO REDUCE UNNECESSARY PROCESSING
        spark.sql("REFRESH TABLE {}".format(bronze_table))
        current_count = spark.table(bronze_table).count()
        print(f"Current rows: {current_count}, MAX_ROWS: {MAX_ROWS}")

        # CALCULATE NUMBER OF ROWS TO ADD BASED ON MAX_ROWS VALUE
        if current_count >= MAX_ROWS:
            df_new = spark.createDataFrame([], schema)
            print("MAX_ROWS reached. Skipping append.")
        else:
            to_take = MAX_ROWS - current_count
            # Sort before limiting: a bare limit() keeps whichever rows Spark reads first, so the rows kept
            # (and which week is left partial) could change from run to run.
            df_new = df_new.orderBy("Date", "Rank").limit(to_take)
            print(f"Taking {to_take} rows to reach MAX_ROWS={MAX_ROWS}")

# WITH NEW ROWS ISOLATED, APPEND TO BRONZE
# Count exactly once, before writing. df_new is lazy, and when it comes from the Date anti-join it reads
# bronze_table; re-evaluating it after the append would re-scan the sources and could treat the rows just
# written as "already present", logging the wrong number.
new_row_count = df_new.count()
if new_row_count > 0:
    df_new.write.format("delta").mode("append").saveAsTable(bronze_table)
    print(f"Appended {new_row_count} new rows")
else:
    print("No new rows")

# OPTIMIZE DELTA TABLE: compact files and Z-order them by chart date and rank.
bronze_optimize_sql = f"OPTIMIZE {bronze_table} ZORDER BY (Date, Rank)"
_ = spark.sql(bronze_optimize_sql)
print("Bronze ingestion complete.")

In [0]:
# SILVER

from pyspark.sql.functions import col, current_timestamp, lit, max, row_number
from pyspark.sql.types import StringType, IntegerType, BooleanType, FloatType, DateType, TimestampType, StructType, StructField
from pyspark.sql.window import Window
import requests
import time
import re

# TABLE NAMES AND STORAGE LOCATION
# bronze_table is declared again here with the same value the Bronze cell uses.
bronze_table = "bronze.hot100_raw"
silver_table = "silver.hot100_clean"
silver_delta_path = "/mnt/billboard_mount/silver/hot100_clean"

# SILVER SCHEMA: chart columns from Bronze, a processing timestamp, then Spotify / ReccoBeats enrichment
# columns and the "In Spotify API" match flag. Every field is nullable.
silver_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Song", StringType()),
        ("Date", DateType()),
        ("Artist", StringType()),
        ("Rank", IntegerType()),
        ("processed_date", TimestampType()),
        ("Image_URL", StringType()),
        ("Duration", IntegerType()),
        ("Explicit", BooleanType()),
        ("Song_Release_Date", StringType()),
        ("Track_Number", IntegerType()),
        ("Danceability", FloatType()),
        ("Energy", FloatType()),
        ("Key", IntegerType()),
        ("Loudness", FloatType()),
        ("Mode", IntegerType()),
        ("Speechiness", FloatType()),
        ("Acousticness", FloatType()),
        ("Instrumentalness", FloatType()),
        ("Liveness", FloatType()),
        ("Valence", FloatType()),
        ("Tempo", FloatType()),
        ("Track_ID", StringType()),
        ("Artist_ID", StringType()),
        ("In Spotify API", BooleanType()),
    )
])

# CREATE THE `silver` SCHEMA IF IT IS MISSING (ONLY HAS AN EFFECT ON THE FIRST RUN)
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

# CREATE THE SILVER TABLE
# Bronze's "Peak Position", "Last Week" and "Weeks in Charts" are intentionally not carried over: in the
# source data they are based on the full Hot 100, whereas Silver keeps only top-10 placements (to limit API
# processing). They will be recalculated in the Gold layer.
silver_create_sql = f"""
        CREATE TABLE IF NOT EXISTS {silver_table} (
            Song STRING,
            Date DATE,
            Artist STRING,
            Rank INT,
            processed_date TIMESTAMP,
            Image_URL STRING,
            Duration INT,
            Explicit BOOLEAN,
            Song_Release_Date STRING,
            Track_Number INT,
            Danceability FLOAT,
            Energy FLOAT,
            `Key` INT,
            Loudness FLOAT,
            Mode INT,
            Speechiness FLOAT,
            Acousticness FLOAT,
            Instrumentalness FLOAT,
            Liveness FLOAT,
            Valence FLOAT,
            Tempo FLOAT,
            Track_ID STRING,
            Artist_ID STRING,
            `In Spotify API` BOOLEAN
        )
        USING DELTA
        LOCATION '{silver_delta_path}'
        TBLPROPERTIES (
            delta.enableChangeDataFeed = true,
            delta.columnMapping.mode = 'name'
        )
    """

try:
    spark.sql(silver_create_sql)
except Exception as e:
    # Log for the notebook output, then fail the cell: nothing below can run without the table.
    print(f"Error creating silver table: {e}")
    raise
else:
    print(f"Silver table {silver_table} checked/created.")

# READ ONLY RECENT BRONZE CHANGES VIA CHANGE DATA FEED (CDF) FOR INCREMENTAL PROCESSING
# The window covers the two newest Bronze versions. NOTE(review): the Bronze cell ends with an OPTIMIZE,
# so the version before the latest is presumably its append; appends older than that are not picked up if
# Bronze ran more than once since the last Silver run.
try:
    spark.sql(f"REFRESH TABLE {bronze_table}")
    bronze_history = spark.sql(f"DESCRIBE HISTORY {bronze_table}")
    latest_version = bronze_history.select(max("version")).collect()[0][0]

    if latest_version > 0:
        starting_version = latest_version - 1
    else:
        starting_version = 0

    cdf_read_options = {"readChangeFeed": "true", "startingVersion": starting_version}
    df_bronze_changes = spark.read.format("delta").options(**cdf_read_options).table(bronze_table)
except Exception as e:
    print(f"Failed to read bronze table with CDF: {e}")
    raise

# EXISTING SILVER ROWS (USED BELOW TO SKIP CHART ENTRIES ALREADY IN SILVER)
silver_existing_df = spark.read.format("delta").table(silver_table)

# FILTER FOR INSERTS WITH EXPLICIT COL SELECTION
# (select() already leaves out the CDF metadata columns such as _change_type / _commit_version.)
df_silver_input = df_bronze_changes.where("_change_type = 'insert'") \
    .select("Song", "Date", "Artist", "Rank")

# FILTER ONLY TOP 10
df_silver = df_silver_input.filter("Rank BETWEEN 1 AND 10")

# SKIP ENTRIES ALREADY IN SILVER
# The CDF window above returns the same Bronze inserts every time Silver runs against an unchanged Bronze
# (e.g. when this cell is re-run), and nothing else stopped those rows from being appended a second time.
# A null-safe left anti join on the full chart entry makes re-runs idempotent; it keeps only left columns.
entry_keys = ["Date", "Rank", "Song", "Artist"]
silver_entry_keys = silver_existing_df.select(*entry_keys).distinct()
df_silver = df_silver.join(
    silver_entry_keys,
    [df_silver[k].eqNullSafe(silver_entry_keys[k]) for k in entry_keys],
    "left_anti",
)

# ADD ROW NUMBERS TO DF SILVER (NEW RECORDS ONLY) FOR BATCHING
window = Window.orderBy("Date", "Rank", "Song", "Artist")
df_silver = df_silver.withColumn("rn", row_number().over(window))

# ADD NEW COLUMNS FOR API DATA ENRICHMENT (typed nulls until the enrichment loop fills them in)
df_silver = df_silver \
    .withColumn("processed_date", current_timestamp()) \
    .withColumn("Image_URL", lit(None).cast(StringType())) \
    .withColumn("Duration", lit(None).cast(IntegerType())) \
    .withColumn("Explicit", lit(None).cast(BooleanType())) \
    .withColumn("Song_Release_Date", lit(None).cast(StringType())) \
    .withColumn("Track_Number", lit(None).cast(IntegerType())) \
    .withColumn("Danceability", lit(None).cast(FloatType())) \
    .withColumn("Energy", lit(None).cast(FloatType())) \
    .withColumn("Key", lit(None).cast(IntegerType())) \
    .withColumn("Loudness", lit(None).cast(FloatType())) \
    .withColumn("Mode", lit(None).cast(IntegerType())) \
    .withColumn("Speechiness", lit(None).cast(FloatType())) \
    .withColumn("Acousticness", lit(None).cast(FloatType())) \
    .withColumn("Instrumentalness", lit(None).cast(FloatType())) \
    .withColumn("Liveness", lit(None).cast(FloatType())) \
    .withColumn("Valence", lit(None).cast(FloatType())) \
    .withColumn("Tempo", lit(None).cast(FloatType())) \
    .withColumn("Track_ID", lit(None).cast(StringType())) \
    .withColumn("Artist_ID", lit(None).cast(StringType())) \
    .withColumn("In Spotify API", lit(False))

# SPOTIFY TOKEN FUNCTION
def get_spotify_token(client_id, client_secret):
    """Request a Spotify access token using the client-credentials flow.

    Args:
        client_id: Spotify application client id.
        client_secret: Spotify application client secret.

    Returns:
        (access_token, expires_in): expires_in falls back to 3600 when the response omits it.

    Raises:
        requests.exceptions.RequestException: on connection failure, timeout, or a non-2xx response
        (the error, and the response body when there is one, are printed first).
    """
    url = "https://accounts.spotify.com/api/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    # Stays None when the POST itself fails (e.g. connection error), so the handler can tell
    # "no response" apart from "error response" instead of hitting an unbound name.
    response = None
    try:
        # Bounded wait, matching the timeout used for the other API calls in this cell.
        response = requests.post(url, headers=headers, data=payload, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        return token_data.get("access_token"), token_data.get("expires_in", 3600)
    except requests.exceptions.RequestException as e:
        print(f"Error getting Spotify token: {e}")
        if response is not None:
            print(f"Response: {response.text}")
        raise

# OBTAIN INITIAL SPOTIFY TOKEN
print("Getting Spotify Creds")
# NOTE(review): placeholder credentials hard-coded in the notebook; real ones should come from a
# Databricks secret scope (dbutils.secrets.get).
client_id = "id"
client_secret = "secret"
access_token, token_expiry = get_spotify_token(client_id, client_secret)
token_timestamp = time.time()
headers = {"Authorization": f"Bearer {access_token}"}
# Never echo the bearer token itself: notebook output is saved and shared along with the notebook.
print(f"token acquired, expires in {token_expiry} seconds")

# DEFINE VARIABLES FOR DATA ENRICHMENT LOOP
BATCH_SIZE = 10
# Collect the (top-10-only) input once and derive the total from those same rows, instead of evaluating
# df_silver a second time with count().
all_rows = df_silver.orderBy("rn").collect()
total = len(all_rows)


def _fresh_spotify_auth():
    """Fetch a new Spotify token.

    Returns:
        (access_token, token_expiry, headers, token_timestamp): the four values the loop keeps in sync, with
        headers already holding the correct "Authorization: Bearer ..." entry.
    """
    new_token, new_expiry = get_spotify_token(client_id, client_secret)
    return new_token, new_expiry, {"Authorization": f"Bearer {new_token}"}, time.time()


def _as_float(value):
    """Return value as a float (None stays None).

    JSON numbers such as 0 or 120 arrive as Python ints, which PySpark's schema verification rejects for
    FloatType columns when a batch is turned into a DataFrame.
    """
    return None if value is None else float(value)


def _as_int(value):
    """Return value as an int for IntegerType columns (None stays None)."""
    return None if value is None else int(value)


if total > 0:
    print(f"Enriching {total} new top-10 rows in batches of {BATCH_SIZE}...")

    # CACHE EXISTING SILVER DATA
    # (Song, Artist) -> previously stored enrichment values, so repeat chart entries can skip the APIs.
    silver_existing = spark.table(silver_table) \
        .filter(col("In Spotify API").isNotNull()) \
        .select("Song", "Artist", "Track_ID", "Artist_ID", "Duration", "Explicit",
                "Song_Release_Date", "Track_Number", "Danceability", "Energy",
                "Key", "Loudness", "Mode", "Speechiness", "Acousticness", "Instrumentalness",
                "Liveness", "Valence", "Tempo", "Image_URL", "In Spotify API") \
        .collect()

    silver_dict = { (r.Song, r.Artist): r.asDict() for r in silver_existing }

    search_url = "https://api.spotify.com/v1/search"

    # USED IN THE LOOP TO REMOVE PARENTHESIZED TEXT FROM TRACK NAMES, WHICH CAN STOP THE SPOTIFY SEARCH API FROM LOCATING A TRACK
    pattern = r'\((.*?)\)'

    for i in range(0, total, BATCH_SIZE):
        # REFRESH SPOTIFY TOKEN IF APPROACHING EXPIRATION
        if time.time() - token_timestamp > token_expiry - 10:
            print("Refreshing Spotify token")
            access_token, token_expiry, headers, token_timestamp = _fresh_spotify_auth()

        rows = all_rows[i:i + BATCH_SIZE]
        print(f"Batch {i//BATCH_SIZE + 1}: {len(rows)} rows")

        # SET UP LOOP VARIABLES
        rows = [row.asDict() for row in rows]
        enriched_rows = []

        # LOOP THROUGH EACH TRACK IN THE BATCH
        for row in rows:
            song = row["Song"]
            artist = row["Artist"]
            key = (song, artist)

            # REFRESH TOKEN IF APPROACHING EXPIRATION
            if time.time() - token_timestamp > token_expiry - 60:
                print(f"Token near expiry. Refreshing before calling API for: {song} - {artist}")
                access_token, token_expiry, headers, token_timestamp = _fresh_spotify_auth()
                print(f"New token acquired, expires in {token_expiry} seconds.")

            # REUSE ENRICHMENT FROM AN EARLIER WEEK WHEN THIS (Song, Artist) WAS ALREADY MATCHED ON SPOTIFY;
            # ONLY THE CHART-SPECIFIC FIELDS OF THE CURRENT ROW ARE OVERWRITTEN.
            if key in silver_dict and silver_dict[key]["In Spotify API"]:
                print(f"Reusing cached data for: {song} - {artist}")
                cached = silver_dict[key].copy()
                cached.update({
                    "Song": row["Song"], "Date": row["Date"], "Artist": row["Artist"],
                    "Rank": row["Rank"],
                    "processed_date": row["processed_date"], "In Spotify API": True
                })
                enriched_rows.append(cached)
                continue

            # IF TRACK IS NOT IN SILVER YET, PREPARE TO CALL APIs TO PROVIDE INFORMATION ON THE TRACK
            print(f"Calling Spotify search for: {song} - {artist}")

            # SONG TITLES MAY CONTAIN PARENTHESIZED WORDS THAT STOP THE SPOTIFY SEARCH FROM IDENTIFYING THE SONG, SO REMOVE THEM.
            match = re.search(pattern, song)

            # song_query IS A SEPARATE VARIABLE SO THAT song ITSELF IS NOT MODIFIED
            song_query = song
            if match:
                print(f"removing parenteses from song: {song_query}")
                trimmed_song_query = re.sub(r'\([^)]*\)', '', song_query).rstrip()
                print(f"removed parentheses from song. Now searching for {trimmed_song_query}")

                # IF THE WHOLE TITLE IS IN PARENTHESES, TRIMMING LEAVES NOTHING; KEEP THE ORIGINAL TITLE IN THAT CASE
                if len(trimmed_song_query) > 0:
                    song_query = trimmed_song_query

            query = f"track:{song_query} artist:{artist}"
            params = {"q": query, "type": "track", "limit": 1}

            # INITIALIZE VALUES FOR ENRICHED ROW
            enriched_row = {
                "Song": song, "Date": row["Date"], "Artist": artist, "Rank": row["Rank"],
                "processed_date": row["processed_date"],
                "Image_URL": None, "Duration": None, "Explicit": None, "Song_Release_Date": None,
                "Track_Number": None, "Danceability": None, "Energy": None,
                "Key": None, "Loudness": None, "Mode": None, "Speechiness": None,
                "Acousticness": None, "Instrumentalness": None, "Liveness": None,
                "Valence": None, "Tempo": None, "Track_ID": None, "Artist_ID": None,
                "In Spotify API": False
            }

            for attempt in range(2):  # TWO TRIES, SO A 401 (TOKEN ISSUE) OR A TRANSIENT ERROR GETS ONE RETRY
                # Reset every attempt so the error handler never reports a previous attempt's response
                # (or hits an unbound name when the very first request fails).
                response = None
                try:
                    response = requests.get(search_url, headers=headers, params=params, timeout=10)

                    if response.status_code == 401:
                        print(f"401 Unauthorized on attempt {attempt + 1}. Forcing token refresh...")
                        access_token, token_expiry, headers, token_timestamp = _fresh_spotify_auth()
                        print(f"Refreshed token. Retrying...")
                        continue  # AFTER GETTING FRESH TOKEN, TRY AGAIN

                    response.raise_for_status()
                    data = response.json()

                    # IF THERE IS DATA FOR THE TRACK FROM THE SPOTIFY API..
                    if data["tracks"]["items"]:
                        track = data["tracks"]["items"][0]
                        track_id = track["id"]
                        print(f"Spotify API provided track id: {track_id}")

                        # .. INCLUDE THE SPOTIFY API VALUES IN THE ENRICHED ROW, TO BE APPENDED TO SILVER LATER..
                        enriched_row.update({
                            "Track_ID": track_id,
                            "Artist_ID": track["artists"][0]["id"] if track["artists"] else None,
                            "Image_URL": track["album"]["images"][0]["url"] if track["album"]["images"] else None,
                            "Duration": track["duration_ms"],
                            "Explicit": track["explicit"],
                            "Song_Release_Date": track["album"]["release_date"],
                            "Track_Number": track["track_number"],
                            "In Spotify API": True
                        })

                        # .. AND TRY THE RECCOBEATS API FOR AUDIO FEATURES OF THE TRACK (BEST EFFORT)
                        try:
                            print(f"Calling Reccobeats API for: {song} - {artist}")
                            url = f"https://api.reccobeats.com/v1/track?ids={track_id}"
                            recco_response = requests.get(url, headers={'Accept': 'application/json'}, timeout=10)
                            recco_response.raise_for_status()
                            recco_id = recco_response.json()["content"][0]["id"] # RECCOBEATS ID IS DIFFERENT THAN SPOTIFY ID
                            print("Reccobeats API success")

                            # ON SUCCESS, ADD THE RECCOBEATS VALUES TO THE ENRICHED ROW, CAST TO THE SILVER COLUMN TYPES.
                            feat_url = f"https://api.reccobeats.com/v1/track/{recco_id}/audio-features"
                            feat_response = requests.get(feat_url, headers={'Accept': 'application/json'}, timeout=10)
                            feat_response.raise_for_status()
                            feat = feat_response.json()
                            enriched_row.update({
                                "Danceability":      _as_float(feat.get("danceability")),
                                "Energy":            _as_float(feat.get("energy")),
                                "Key":               _as_int(feat.get("key")),
                                "Loudness":          _as_float(feat.get("loudness")),
                                "Mode":              _as_int(feat.get("mode")),
                                "Speechiness":       _as_float(feat.get("speechiness")),
                                "Acousticness":      _as_float(feat.get("acousticness")),
                                "Instrumentalness":  _as_float(feat.get("instrumentalness")),
                                "Liveness":          _as_float(feat.get("liveness")),
                                "Valence":           _as_float(feat.get("valence")),
                                "Tempo":             _as_float(feat.get("tempo"))
                            })
                        except Exception as e:
                            print(f"Reccobeats failed for {track_id}: {e}")
                            # IF RECCOBEATS FAILED, PROCEED WITHOUT THE AUDIO FEATURES (THEY STAY NULL)

                    else:
                        print(f"No tracks found for {song} - {artist}")
                        enriched_row["In Spotify API"] = False

                    break

                except requests.exceptions.RequestException as e:
                    print(f"API error (attempt {attempt + 1}) for {song} - {artist}: {e}")
                    # Compare with None: a Response object is falsy for 4xx/5xx, exactly the cases worth logging.
                    if response is not None:
                        print(f"Response: {response.text}")
                    if attempt == 1:
                        enriched_row["In Spotify API"] = False
                    time.sleep(1)

            # INCLUDE ROW FOR ENRICHED TRACK DATA IN CURRENT BATCH OF ENRICHED ROWS
            enriched_rows.append(enriched_row)

            # REMEMBER THIS RESULT SO A LATER WEEK'S ENTRY FOR THE SAME (Song, Artist) CAN REUSE IT
            silver_dict[key] = enriched_row

            time.sleep(1)  # DELAY TO STAY UNDER THE SPOTIFY API RATE LIMIT

        print(f"Enriched {len(enriched_rows)} rows for batch {i//BATCH_SIZE + 1}")

        # APPEND THIS BATCH (UP TO BATCH_SIZE ROWS)
        if enriched_rows:
            spark.createDataFrame(enriched_rows, schema=silver_schema).write \
                .format("delta").mode("append").saveAsTable(silver_table)
            print(f"  Appended batch {i//BATCH_SIZE + 1}")
        else:
            print(f"  No rows to append for batch {i//BATCH_SIZE + 1}")

else:
    print("No new songs to append.")

# SUMMARY: FINAL ROW COUNT, FILE COMPACTION, ENRICHMENT COVERAGE
silver_final_count = spark.table(silver_table).count()
final_count = silver_final_count
print(f"Silver table final row count: {final_count}")

# Compact files and Z-order them by chart date and rank, as the Bronze cell does for Bronze.
silver_optimize_sql = f"OPTIMIZE {silver_table} ZORDER BY (Date, Rank)"
_ = spark.sql(silver_optimize_sql)
print("Silver optimized with ZORDER by (Date, Rank). Ingestion complete.")

# Row counts per "In Spotify API" value, i.e. how many Silver rows the Spotify search matched.
print("Spotify enrichment status:")
enrichment_status = spark.table(silver_table).groupBy("In Spotify API").count()
enrichment_status.show()