In [0]:
import base64
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
from pyspark.sql import SparkSession

# Unity Catalog destination (catalog + schema) for the bronze-layer tables.
BRONZE_SCHEMA = "bronze"
CATALOG_NAME = "spotify_etl"

# Reuse the active Spark session if there is one; otherwise create it.
spark = SparkSession.builder.getOrCreate()

# Ingestion window: everything from the last 24 hours up to now.
# Use a timezone-aware UTC "now". Calling .timestamp() on a naive datetime
# (which is what datetime.utcnow() returns) interprets it as *local* time,
# shifting the cutoff by the host's UTC offset on any non-UTC machine.
today = datetime.now(timezone.utc)
print(today)
yesterday = today - timedelta(days=1)
print(yesterday)

# Cutoff expressed as a Unix timestamp in milliseconds.
after_ts = int(yesterday.timestamp() * 1000)
print(after_ts)

In [0]:
# --- 1. UNITY CATALOG CONFIGURATION ---
# Both statements use IF NOT EXISTS, so re-running this cell is harmless.
for _ddl in (
    f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME}",
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG_NAME}.{BRONZE_SCHEMA}",
):
    spark.sql(_ddl)

print(f"Using Catalog: {CATALOG_NAME}, Schema: {BRONZE_SCHEMA}")

# --- 2. CONFIGURE SECRETS AND API ---
# Credentials for the token-refresh flow are read from the 'spotify_secrets' scope.
try:
    # Scope and key names must match the secret store exactly.
    # NOTE: the client id is stored under the key 'user' (not 'client-id').
    CLIENT_ID = dbutils.secrets.get(key="user", scope="spotify_secrets")
    CLIENT_SECRET = dbutils.secrets.get(key="client_secret", scope="spotify_secrets")
    REFRESH_TOKEN = dbutils.secrets.get(key="refresh-token", scope="spotify_secrets")
except Exception as exc:
    # Print setup guidance, then re-raise so the cell still fails loudly.
    print("ERROR: Could not read secrets (user, client_secret, refresh-token) from 'spotify_secrets'.")
    print("Make sure you followed the setup steps to create the secrets.")
    raise exc

# Spotify endpoints: token exchange (accounts service) and Web API root.
TOKEN_URL = "https://accounts.spotify.com/api/token"
BASE_URL = "https://api.spotify.com/v1"

# Module-level holder for the current access token; None until one is obtained.
CURRENT_ACCESS_TOKEN = None

print("Configuration and secrets have been uploaded.")

In [0]:
def refresh_access_token(timeout=30):
    """
    Use REFRESH_TOKEN (from secrets) to get a new access_token.

    On success the new token is stored in the global CURRENT_ACCESS_TOKEN.
    On any failure the global is reset to None and the error is printed
    rather than raised, so callers must check the return value.

    Args:
        timeout: Seconds to wait for the token endpoint. Without a timeout
            `requests` may block indefinitely on a stalled connection.

    Returns:
        True if a new access token was obtained, False otherwise.
    """
    global CURRENT_ACCESS_TOKEN
    print("Refreshing access token...")

    # HTTP Basic credentials: base64("<client_id>:<client_secret>").
    auth_str = f"{CLIENT_ID}:{CLIENT_SECRET}"
    auth_b64 = base64.b64encode(auth_str.encode()).decode()

    auth_data = {
        "grant_type": "refresh_token",
        "refresh_token": REFRESH_TOKEN
    }

    auth_headers = {"Authorization": f"Basic {auth_b64}"}

    try:
        response = requests.post(
            TOKEN_URL, data=auth_data, headers=auth_headers, timeout=timeout
        )
        response.raise_for_status()
        token_data = response.json()
        new_token = token_data.get('access_token')

        if not new_token:
            raise Exception("Response did not contain an 'access_token'")

        CURRENT_ACCESS_TOKEN = new_token
        print("Access token successfully refreshed.")
        return True

    except Exception as e:
        # Deliberately broad: network errors, HTTP errors, timeouts and
        # malformed responses all mean "no usable token".
        print(f"CRITICAL ERROR refreshing token: {e}")
        CURRENT_ACCESS_TOKEN = None
        return False

def make_api_call(url, params=None, retries=1, timeout=30):
    """
    Wrapper function that handles API calls, token refresh, and retries.

    Args:
        url: Full URL to GET.
        params: Optional query parameters passed to `requests.get`.
        retries: How many times a 401/403 response may trigger a token
            refresh followed by a retry of the same call.
        timeout: Seconds to wait for the server. Without a timeout
            `requests` may block indefinitely on a stalled connection.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If no access token is held and one cannot be obtained.
        requests.exceptions.HTTPError: If the final response has an error
            status (printed, then re-raised).
    """
    global CURRENT_ACCESS_TOKEN

    if not CURRENT_ACCESS_TOKEN:
        if not refresh_access_token():
            raise Exception("Could not obtain initial token.")

    headers = {"Authorization": f"Bearer {CURRENT_ACCESS_TOKEN}"}

    try:
        response = requests.get(url, headers=headers, params=params, timeout=timeout)

        # Check if the token has expired
        if response.status_code in [401, 403] and retries > 0:
            print(f"Token expired ({response.status_code}). Refreshing...")
            if refresh_access_token():
                print("Retrying API call with new token...")
                # Consume one retry; the original hard-coded 0 here, which
                # silently ignored any caller-supplied value above 1.
                return make_api_call(
                    url, params=params, retries=retries - 1, timeout=timeout
                )
            # Refresh failed: fall through so raise_for_status() below
            # surfaces the original 401/403.

        response.raise_for_status()
        return response.json()

    except requests.exceptions.HTTPError as e:
        print(f"API error calling {url}: {e}")
        # Bare raise re-raises the active exception unchanged.
        raise

In [0]:
def chunk_list(data, size):
    """Yield consecutive slices of `data`, each holding at most `size` items."""
    total = len(data)
    yield from (data[start:start + size] for start in range(0, total, size))

In [0]:
# ============================================================
# SPOTIFY BRONZE INGESTION - FETCH FUNCTIONS (OPTIMIZED)
# ============================================================

def get_play_history(unique_track_ids, unique_artist_ids):
    """Source A: Get recent listening history.

    Requests one page (limit=50) of recently played items newer than the
    module-level `after_ts` cutoff and flattens each item into a row dict.

    Args:
        unique_track_ids: Set updated in place with every track id seen.
        unique_artist_ids: Set updated in place with the first artist id of
            every track seen.

    Returns:
        A list of row dicts. Errors are printed rather than raised, so the
        list may be empty or partial if the request or parsing fails.
    """
    print("Fetching play history...")
    play_history_data = []
    url = f"{BASE_URL}/me/player/recently-played?limit=50&after={after_ts}"

    try:
        results = make_api_call(url)
        for item in results.get('items') or []:
            if not item:
                continue

            # Use `or` fallbacks rather than .get(key, default): a key that is
            # present with a null value would otherwise yield None, and the
            # resulting AttributeError would abort the remaining items.
            track = item.get('track') or {}
            artist = (track.get('artists') or [{}])[0] or {}
            album = track.get('album') or {}

            play_history_data.append({
                'played_at': item.get('played_at'),
                'track_id': track.get('id'),
                'track_name': track.get('name'),
                'artist_id': artist.get('id'),
                'artist_name': artist.get('name'),
                'album_id': album.get('id'),
                'album_name': album.get('name'),
                'context_type': (item.get('context') or {}).get('type'),
                'duration_ms': track.get('duration_ms')
            })

            if track.get('id'):
                unique_track_ids.add(track.get('id'))
            if artist.get('id'):
                unique_artist_ids.add(artist.get('id'))

    except Exception as e:
        # Best effort: report and return whatever was collected so far.
        print(f"⚠️ Error fetching play history: {e}")

    print(f"✅ Retrieved {len(play_history_data)} play history records.")
    return play_history_data


def _append_playlist_tracks(playlist_id, playlist_tracks_data, unique_track_ids, unique_artist_ids):
    """Page through one playlist's tracks, appending a row per track to `playlist_tracks_data`."""
    tracks_url = f"{BASE_URL}/playlists/{playlist_id}/tracks"
    while tracks_url:
        tracks = make_api_call(tracks_url)
        for item in tracks.get('items') or []:
            # Skip null entries and entries without a usable track id.
            track = (item or {}).get('track')
            if not track or not track.get('id'):
                continue

            artist_ids = [
                a.get('id') for a in (track.get('artists') or []) if a and a.get('id')
            ]

            playlist_tracks_data.append({
                'playlist_id': playlist_id,
                'track_id': track.get('id'),
                'track_name': track.get('name'),
                'artist_ids': artist_ids,
                'album_id': (track.get('album') or {}).get('id'),
                'added_at': item.get('added_at'),
                'added_by': (item.get('added_by') or {}).get('id'),
                'duration_ms': track.get('duration_ms'),
                'popularity': track.get('popularity')
            })

            unique_track_ids.add(track.get('id'))
            unique_artist_ids.update(artist_ids)

        tracks_url = tracks.get('next')


def get_playlists_and_tracks(unique_track_ids, unique_artist_ids):
    """Source B: Get playlists and their tracks.

    Follows the `next` links of /me/playlists and, for every playlist, the
    `next` links of its tracks listing. Ensures schema stability by emitting
    `followers` even if Spotify API doesn't return it on /me/playlists.

    Args:
        unique_track_ids: Set updated in place with every track id seen.
        unique_artist_ids: Set updated in place with every artist id seen.

    Returns:
        A tuple `(playlists_data, playlist_tracks_data)` of row-dict lists.
        Errors are printed rather than raised: a failure while reading one
        playlist's tracks skips the rest of that playlist only, while a
        failure listing playlists stops the paging and returns what was
        collected so far.
    """
    print("Fetching playlists and their tracks...")
    playlists_data = []
    playlist_tracks_data = []

    next_url = f"{BASE_URL}/me/playlists?limit=50"

    try:
        while next_url:
            playlists = make_api_call(next_url)
            for playlist in playlists.get('items') or []:
                # Skip null entries, as the other fetch functions do.
                if not playlist:
                    continue
                playlist_id = playlist.get('id')
                if not playlist_id:
                    continue

                # Followers are NOT present on /me/playlists; keep column with None for schema stability
                followers_total = None
                followers_obj = playlist.get('followers')
                if isinstance(followers_obj, dict):
                    followers_total = followers_obj.get('total')

                playlists_data.append({
                    'playlist_id': playlist_id,
                    'playlist_name': playlist.get('name'),
                    'owner_name': (playlist.get('owner') or {}).get('display_name'),
                    'followers': followers_total,  # stays in schema even if None
                    'total_tracks': (playlist.get('tracks') or {}).get('total'),
                    'description': playlist.get('description'),
                    'snapshot_id': playlist.get('snapshot_id')
                })

                # Isolate failures per playlist so one unreadable playlist
                # does not discard every playlist that follows it.
                try:
                    _append_playlist_tracks(
                        playlist_id, playlist_tracks_data, unique_track_ids, unique_artist_ids
                    )
                except Exception as e:
                    print(f"⚠️ Error fetching tracks for playlist {playlist_id}: {e}")

            next_url = playlists.get('next')

    except Exception as e:
        print(f"⚠️ Error fetching playlists: {e}")

    print(f"✅ Retrieved {len(playlists_data)} playlists and {len(playlist_tracks_data)} playlist-track mappings.")
    return playlists_data, playlist_tracks_data



def get_full_metadata(unique_track_ids, unique_artist_ids):
    """Step 3: enrich the collected IDs with full track and artist metadata.

    IDs are requested in batches of 50. `preview_url` is always emitted on
    track rows (possibly None) for schema stability. Each of the two phases
    prints and swallows its own errors, so either list may be partial.

    Returns:
        A tuple `(tracks_data, artists_data)` of row-dict lists.
    """
    print(f"Fetching metadata for {len(unique_track_ids)} tracks and {len(unique_artist_ids)} artists...")

    tracks_data = []
    artists_data = []

    # Drop falsy IDs (None, empty string) before batching.
    track_ids_list = [track_id for track_id in unique_track_ids if track_id]
    artist_ids_list = [artist_id for artist_id in unique_artist_ids if artist_id]

    # 1️⃣ Fetch track metadata
    try:
        tracks_endpoint = f"{BASE_URL}/tracks"
        for id_batch in chunk_list(track_ids_list, 50):
            payload = make_api_call(tracks_endpoint, params={'ids': ','.join(id_batch)})
            for track in payload.get('tracks', []):
                if track:
                    album = track.get('album') or {}
                    lead_artist = (track.get('artists') or [{}])[0] or {}
                    tracks_data.append({
                        'track_id': track.get('id'),
                        'track_name': track.get('name'),
                        'album_id': album.get('id'),
                        'album_name': album.get('name'),
                        'artist_id': lead_artist.get('id'),
                        'artist_name': lead_artist.get('name'),
                        'duration_ms': track.get('duration_ms'),
                        'popularity': track.get('popularity'),
                        'explicit': track.get('explicit'),
                        'release_date': album.get('release_date'),
                        'preview_url': track.get('preview_url')  # kept even when None
                    })
    except Exception as err:
        print(f"⚠️ Error fetching tracks metadata: {err}")

    # 2️⃣ Fetch artist metadata
    try:
        artists_endpoint = f"{BASE_URL}/artists"
        for id_batch in chunk_list(artist_ids_list, 50):
            payload = make_api_call(artists_endpoint, params={'ids': ','.join(id_batch)})
            for artist in payload.get('artists', []):
                if artist:
                    followers_obj = artist.get('followers')
                    # Only a dict-shaped `followers` carries a usable total.
                    followers_total = (
                        followers_obj.get('total') if isinstance(followers_obj, dict) else None
                    )
                    artists_data.append({
                        'artist_id': artist.get('id'),
                        'artist_name': artist.get('name'),
                        'genres': artist.get('genres'),
                        'followers': followers_total,
                        'popularity': artist.get('popularity'),
                        'uri': artist.get('uri')
                    })
    except Exception as err:
        print(f"⚠️ Error fetching artists metadata: {err}")

    print(f"✅ Retrieved: {len(tracks_data)} tracks, {len(artists_data)} artists.")
    return tracks_data, artists_data


In [0]:
def save_as_delta_table(all_data, schemas=None):
    """
    Saves each list of dictionaries as a Delta table
    in the 'bronze' schema of Unity Catalog.

    Each table is fully overwritten (including its schema). Tables with no
    rows are skipped, so an empty extract never replaces existing data.
    A failure on one table is printed and does not stop the others.

    Args:
        all_data: Mapping of table name -> list of row dicts.
        schemas: Optional mapping of table name -> Spark StructType. When a
            schema is given for a table, the rows are converted with that
            explicit schema instead of going through pandas type inference.
            Inference cannot type a column whose values are all None, and
            pandas turns integer columns containing None into floats.
    """
    print(f"\n--- STARTING DATA LOAD (Saving to Delta Lake in {CATALOG_NAME}.{BRONZE_SCHEMA}) ---")
    schemas = schemas or {}

    for table_name, data_list in all_data.items():
        if not data_list:
            print(f"Skipping '{table_name}', no data found.")
            continue

        try:
            explicit_schema = schemas.get(table_name)
            if explicit_schema is not None:
                spark_df = spark.createDataFrame(data_list, explicit_schema)
            else:
                pd_df = pd.DataFrame(data_list)
                spark_df = spark.createDataFrame(pd_df)
            full_table_name = f"{CATALOG_NAME}.{BRONZE_SCHEMA}.{table_name}"

            # One row per input dict, so len() gives the row count without
            # launching a separate Spark job just to print it.
            print(f"Writing {len(data_list)} rows to {full_table_name}...")
            spark_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(full_table_name)

            print(f"Successfully saved: {full_table_name}")

        except Exception as e:
            # Best effort: report the failure and continue with the next table.
            print(f"Error saving {table_name}: {e}")

In [0]:
print("--- STARTING SPOTIFY BRONZE PIPELINE ---")

# Shared ID accumulators, passed to both extract functions and then to the
# enrichment step.
unique_track_ids, unique_artist_ids = set(), set()

# --- 1. EXTRACT ---
print("\n--- STEP 1: EXTRACTION (API -> Python) ---")
play_history_table = get_play_history(unique_track_ids, unique_artist_ids)
playlists_table, playlist_tracks_table = get_playlists_and_tracks(
    unique_track_ids, unique_artist_ids
)

# --- 2. ENRICH (API -> Python) ---
print("\n--- STEP 2: ENRICHMENT (API -> Python) ---")
tracks_table, artists_table = get_full_metadata(unique_track_ids, unique_artist_ids)

# Target table name -> extracted rows.
all_bronze_data = {
    "bronze_play_history": play_history_table,
    "bronze_playlists": playlists_table,
    "bronze_playlist_tracks": playlist_tracks_table,
    "bronze_tracks": tracks_table,
    "bronze_artists": artists_table,
}

# Row counts per table, reported before anything is written.
print("\n--- EXTRACTION SUMMARY ---")
print(f"bronze_play_history data:   {len(play_history_table)} rows")
print("-----------------------------")
print(f"bronze_playlists data:      {len(playlists_table)} rows")
print(f"bronze_playlist_tracks data:{len(playlist_tracks_table)} rows")
print("-----------------------------")
print(f"bronze_tracks data:         {len(tracks_table)} rows")
print(f"bronze_artists data:        {len(artists_table)} rows")


# --- 3. LOAD (Python -> Delta) ---
print("\n--- STEP 3: LOAD (Python -> Delta) ---")
save_as_delta_table(all_bronze_data)

print("\n--- BRONZE PIPELINE FINISHED ---")

In [0]:
# Show the count and a bounded, sorted sample rather than dumping the whole set
# into the cell output.
print(f"{len(unique_track_ids)} unique track ids; first 10: {sorted(unique_track_ids)[:10]}")


In [0]:
# Show the count and a bounded, sorted sample rather than dumping the whole set
# into the cell output.
print(f"{len(unique_artist_ids)} unique artist ids; first 10: {sorted(unique_artist_ids)[:10]}")

In [0]:

# ============================================================
# SCHEMA STABILIZATION + DELTA WRITE HELPERS (for Power BI)
# Ensures columns like `followers` and `preview_url` always exist.
# ============================================================

from pyspark.sql.types import (StructType, StructField, StringType, LongType, ArrayType, BooleanType)

# Explicit schema for playlist rows; every field uses StructField's default nullability.
bronze_playlists_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("playlist_id", StringType()),
        ("playlist_name", StringType()),
        ("owner_name", StringType()),
        ("followers", LongType()),        # kept for compatibility (may be NULL)
        ("total_tracks", LongType()),
        ("description", StringType()),
        ("snapshot_id", StringType()),
    )
])

# Explicit schema for track rows; every field uses StructField's default nullability.
bronze_tracks_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("track_id", StringType()),
        ("track_name", StringType()),
        ("album_id", StringType()),
        ("album_name", StringType()),
        ("artist_id", StringType()),
        ("artist_name", StringType()),
        ("duration_ms", LongType()),
        ("popularity", LongType()),
        ("explicit", BooleanType()),
        ("release_date", StringType()),
        ("preview_url", StringType()),    # kept for compatibility (may be NULL)
    )
])

def write_delta_with_schema(df_list_or_dict, table_full_name, schema):
    """
    Write rows to a Delta table with overwriteSchema=true, applying an explicit schema.

    Args:
        df_list_or_dict: One of
            - list[dict]: rows, converted using `schema`;
            - dict: a single row, treated as a one-element list;
            - anything else: assumed to already be a Spark DataFrame and
              written as-is (`schema` is NOT applied in this case).
        table_full_name: Fully qualified name of the target table.
        schema: Spark StructType applied when building the DataFrame from
            row dicts.
    """
    # A single row dict previously fell into the DataFrame branch and
    # crashed on `.write`; wrap it so it is handled like a list.
    if isinstance(df_list_or_dict, dict):
        df_list_or_dict = [df_list_or_dict]

    built_from_rows = isinstance(df_list_or_dict, list)
    if built_from_rows:
        df = spark.createDataFrame(df_list_or_dict, schema)
    else:
        df = df_list_or_dict  # assume already a DataFrame

    (df.write
       .format("delta")
       .mode("overwrite")
       .option("overwriteSchema", "true")
       .saveAsTable(table_full_name))

    if built_from_rows:
        # One row per input dict: no need to run a Spark job to count them.
        print(f"✅ Wrote {table_full_name} rows={len(df_list_or_dict)} (schema enforced).")
    else:
        # Do not claim enforcement: `schema` was not applied to this DataFrame.
        print(f"✅ Wrote {table_full_name} rows={df.count()} (schema taken from DataFrame).")
