# Bronze Layer

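- Automated ingestion script for the **Bronze Layer**.
- Reads the streaming history JSON files from **Unity Catalog Volumes** and the recently played tracks from the Spotify API, and writes them to **Delta Tables** in the **Bronze Layer**, together with track, artist and album metadata.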

In [0]:
# Restart the notebook's Python process before anything else runs.
# NOTE(review): presumably here so freshly installed libraries are picked up — confirm;
# any Python state defined before this call is lost.
dbutils.library.restartPython()

## Initialization

In [0]:
from bronze_config import INGESTION_CONFIG, metadata_configs
import json
import pyspark.sql.functions as F
from pyspark.sql.types import *
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from dotenv import load_dotenv
from datetime import datetime

## Spotify API Authentication

In [0]:
# Load variables from a .env file into the environment. SpotifyOAuth is given no
# explicit client credentials below, so it presumably reads them from there — TODO confirm.
load_dotenv()
# Workspace file used by SpotifyOAuth to cache the token between runs.
cache_path = "/Workspace/Users/pg52694@alunos.uminho.pt/spotify-data-streaming-project/.spotify_token_cache"
# OAuth scopes requested for the token.
scope = "user-read-recently-played user-read-playback-state user-read-currently-playing"

# open_browser=False: the authorization link is printed below instead of being opened.
auth_manager = SpotifyOAuth(scope=scope, open_browser=False, cache_path=cache_path)
sp = spotipy.Spotify(auth_manager=auth_manager)

# --- Token check ---
token_info = auth_manager.get_cached_token()

if not token_info:
    # No cached token: run the manual authorization flow.
    auth_url = auth_manager.get_authorize_url()
    print(f"\n1. Open this link: {auth_url}")
    
    response_url = input("2. Paste the full URL here after the redirect: ")
    
    # Extract the authorization code from the pasted redirect URL
    code = auth_manager.parse_response_code(response_url)
    
    try:
        # Exchange the authorization code for an access token
        token_info = auth_manager.get_access_token(code, as_dict=False)
        print("‚úÖ Authentication successful and token cached!")
    except Exception as e:
        # NOTE(review): the failure is only printed; token_info stays falsy and
        # the following cells still run.
        print(f"‚ùå Error obtaining token: {e}")

## Read JSON files and Spotify API data to write the Bronze tables

In [0]:
# Explicit Spark schema for the flattened "recently played" rows.
# Column names and order mirror the keys produced by flatten_api; every column is nullable.
recently_played_schema = (
    StructType()
    # Track info
    .add("track_id", StringType(), True)
    .add("track_name", StringType(), True)
    .add("track_uri", StringType(), True)
    .add("track_duration_ms", LongType(), True)
    .add("track_popularity", IntegerType(), True)
    .add("track_is_explicit", BooleanType(), True)
    .add("track_number", IntegerType(), True)
    .add("track_type", StringType(), True)
    .add("track_disc_number", IntegerType(), True)
    .add("track_is_local", BooleanType(), True)
    .add("track_href", StringType(), True)
    .add("track_external_urls_spotify", StringType(), True)
    .add("track_external_ids_isrc", StringType(), True)
    .add("track_available_markets", ArrayType(StringType()), True)
    # Album info
    .add("album_id", StringType(), True)
    .add("album_name", StringType(), True)
    .add("album_type", StringType(), True)
    .add("album_uri", StringType(), True)
    .add("album_release_date", StringType(), True)
    .add("album_release_date_precision", StringType(), True)
    .add("album_total_tracks", IntegerType(), True)
    .add("album_available_markets", ArrayType(StringType()), True)
    .add("album_external_urls_spotify", StringType(), True)
    .add("album_href", StringType(), True)
    .add("album_images", ArrayType(StringType()), True)
    .add("album_restrictions", StringType(), True)
    # Album artists (one array element per artist)
    .add("album_artists_external_urls_spotify", ArrayType(StringType()), True)
    .add("album_artists_href", ArrayType(StringType()), True)
    .add("album_artists_type", ArrayType(StringType()), True)
    .add("album_artists_uri", ArrayType(StringType()), True)
    .add("album_artists_names", ArrayType(StringType()), True)
    .add("album_artists_ids", ArrayType(StringType()), True)
    # Track artists (one array element per artist)
    .add("track_artists_external_urls_spotify", ArrayType(StringType()), True)
    .add("track_artists_href", ArrayType(StringType()), True)
    .add("track_artists_type", ArrayType(StringType()), True)
    .add("track_artists_uri", ArrayType(StringType()), True)
    .add("track_artists_names", ArrayType(StringType()), True)
    .add("track_artists_ids", ArrayType(StringType()), True)
    # Metadata & context
    .add("played_at", StringType(), True)
    .add("context_type", StringType(), True)
    .add("context_href", StringType(), True)
    .add("context_uri", StringType(), True)
    .add("context_external_urls_spotify", StringType(), True)
    .add("processed_at", TimestampType(), True)
)

In [0]:
def flatten_api(item_raw):
    """Flatten one Spotify "recently played" item into a single-level dict.

    Nested objects (track, album, context) are projected onto prefixed keys, and
    the artist lists are turned into parallel lists (one entry per artist).
    Missing *or null* nested objects are tolerated and yield ``None`` / empty
    lists instead of raising.

    Args:
        item_raw: dict for a single play-history item, with optional ``track``,
            ``context`` and ``played_at`` keys.

    Returns:
        dict keyed by the flat column names (``track_*``, ``album_*``,
        ``album_artists_*``, ``track_artists_*``, ``context_*``, ``played_at``,
        ``processed_at``).
    """
    def _spotify_url(obj):
        # "external_urls" may be absent or explicitly null; both yield None.
        return (obj.get('external_urls') or {}).get('spotify')

    # Safe navigation through the JSON levels (absent and null are treated alike)
    track = item_raw.get('track') or {}
    album = track.get('album') or {}
    context = item_raw.get('context') or {}

    # Artist and image lists
    track_artists = track.get('artists') or []
    album_artists = album.get('artists') or []
    album_images = album.get('images') or []

    album_restrictions = album.get('restrictions')

    return {
        # --- Track info ---
        "track_id": track.get('id'),
        "track_name": track.get('name'),
        "track_uri": track.get('uri'),
        "track_is_local": track.get('is_local'),
        "track_duration_ms": track.get('duration_ms'),
        "track_popularity": track.get('popularity'),
        "track_is_explicit": track.get('explicit'),
        "track_number": track.get('track_number'),
        "track_type": track.get('type'),
        "track_disc_number": track.get('disc_number'),
        "track_href": track.get('href'),
        "track_external_urls_spotify": _spotify_url(track),
        "track_external_ids_isrc": (track.get('external_ids') or {}).get('isrc'),
        "track_available_markets": track.get('available_markets', []),

        # --- Album info ---
        "album_id": album.get('id'),
        "album_name": album.get('name'),
        "album_type": album.get('album_type'),
        "album_uri": album.get('uri'),
        "album_release_date": album.get('release_date'),
        "album_release_date_precision": album.get('release_date_precision'),
        "album_total_tracks": album.get('total_tracks'),
        # Default is a list (not a dict) so the value always fits an array column.
        "album_available_markets": album.get('available_markets', []),
        "album_external_urls_spotify": _spotify_url(album),
        "album_href": album.get('href'),
        # Only the image URLs are kept.
        "album_images": [img.get('url') for img in album_images],
        # Stored as the Python repr of the restrictions object, or None when empty.
        "album_restrictions": str(album_restrictions) if album_restrictions else None,

        # --- Album artists (taken from the album node) ---
        "album_artists_external_urls_spotify": [_spotify_url(a) for a in album_artists],
        "album_artists_href": [a.get('href') for a in album_artists],
        "album_artists_type": [a.get('type') for a in album_artists],
        "album_artists_uri": [a.get('uri') for a in album_artists],
        "album_artists_names": [a.get('name') for a in album_artists],
        "album_artists_ids": [a.get('id') for a in album_artists],

        # --- Track artists (taken from the track node) ---
        "track_artists_external_urls_spotify": [_spotify_url(a) for a in track_artists],
        "track_artists_href": [a.get('href') for a in track_artists],
        "track_artists_type": [a.get('type') for a in track_artists],
        "track_artists_uri": [a.get('uri') for a in track_artists],
        "track_artists_names": [a.get('name') for a in track_artists],
        "track_artists_ids": [a.get('id') for a in track_artists],

        # --- Metadata & context ---
        "played_at": item_raw.get('played_at'),
        "context_type": context.get('type'),
        "context_href": context.get('href'),
        "context_uri": context.get('uri'),
        "context_external_urls_spotify": _spotify_url(context),
        "processed_at": datetime.now()
    }

In [0]:
# Ingest every source listed in INGESTION_CONFIG into its Bronze Delta table.
# Each config entry provides "table", "format", "mode" and, for JSON sources, "path".
for item in INGESTION_CONFIG:
    print(f"Processing {item['table']}...")

    source_format = item["format"]

    if source_format == "json":
        df = spark.read.option("multiLine", "True").json(item["path"])

    elif source_format == "api_call":
        # Fetch the user's recently played items from the Spotify API
        raw_items = sp.current_user_recently_played(limit=50)["items"]

        if not raw_items:
            # Nothing returned by the API: leave this table untouched.
            continue

        # Flatten each item and build the DataFrame with the explicit schema
        flat_data = [flatten_api(i) for i in raw_items]
        df = spark.createDataFrame(flat_data, schema=recently_played_schema)

    else:
        # Without this guard `df` would be undefined on the first iteration, or
        # still hold the previous source's data, which would then be written
        # to the wrong table.
        raise ValueError(
            f"Unsupported ingestion format {source_format!r} for table {item['table']}"
        )

    # Stamp the load time and save
    df_final = df.withColumn("processed_at", F.current_timestamp())

    df_final.write.format("delta") \
            .mode(item["mode"]) \
            .option("mergeSchema", "true") \
            .saveAsTable(item["table"])

print("Bronze Tables saved with success!")

## Get Metadata

In [0]:
def _distinct_track_ids(df, uri_column):
    """Return a one-column DataFrame ("id") of distinct track IDs found in `uri_column`.

    Only URIs of the form ``spotify:track:<id>`` are kept. A prefix match is used
    instead of a substring match so that other URI kinds that merely contain the
    word "track" do not yield a bogus ID from the third ``:``-separated segment.
    Rows with a null URI are dropped by the filter.
    """
    uri = F.col(uri_column)
    return (
        df
        .filter(uri.startswith("spotify:track:"))
        .select(F.split(uri, ":")[2].alias("id"))
        .distinct()
    )


df_streaming_history = spark.read.table("workspace.bronze.spotify_streaming_history_raw")
df_recently_played = spark.read.table("workspace.bronze.spotify_recently_played_raw")

df_streaming_history_track_ids = _distinct_track_ids(df_streaming_history, "spotify_track_uri")
df_recently_played_track_ids = _distinct_track_ids(df_recently_played, "track_uri")

# Union of both sources, de-duplicated again because the same track can appear in both.
df_track_ids = df_streaming_history_track_ids.union(df_recently_played_track_ids).distinct()

# The IDs are collected to the driver because they are sent to the Spotify API from Python.
track_ids_list = [row.id for row in df_track_ids.collect()]

print(f"Tracks: {len(track_ids_list)}")

In [0]:
def save_metadata_to_bronze(data_list, table_name, schema):
    """Append a list of metadata dicts to a Bronze Delta table.

    Each truthy item is serialized to a JSON string, parsed back with
    ``F.from_json`` using the fixed ``schema`` (so only fields declared in the
    schema are kept), stamped with ``processed_at`` and appended to
    ``table_name`` with ``mergeSchema`` enabled.

    Args:
        data_list: list of JSON-serializable dicts; falsy entries are skipped.
        table_name: name of the target table.
        schema: Spark schema used to parse each JSON document.

    Returns:
        None. Nothing is written when there is no non-empty item to save.
    """
    if not data_list:
        return

    # 1. Build the DataFrame from JSON strings
    json_rows = [(json.dumps(item),) for item in data_list if item]
    if not json_rows:
        # Every entry was empty: creating a DataFrame from an empty list would
        # fail because there is nothing to infer the column type from.
        return

    df_raw = spark.createDataFrame(json_rows, ["json_string"])

    # 2. Parse with the fixed schema
    df_parsed = df_raw.select(
        F.from_json(F.col("json_string"), schema).alias("data")
    ).select("data.*")

    # 3. Add load metadata and write
    df_final = df_parsed.withColumn("processed_at", F.current_timestamp())

    df_final.write.format("delta") \
          .mode("append") \
          .option("mergeSchema", "true") \
          .saveAsTable(table_name)

    # Report the number of rows actually written (skipped entries excluded).
    print(f"‚úÖ {len(json_rows)} new records added to {table_name}")




# def save_metadata_to_bronze(data_list, table_name, schema):
#     if not data_list:
#         return
    
#     # 1. Criar DataFrame a partir de Strings JSON
#     json_data = [(json.dumps(item),) for item in data_list if item]
#     df_raw = spark.createDataFrame(json_data, ["json_string"])
    
#     # 2. Parse com o schema que reflete a API (Hier√°rquico)
#     df_parsed = df_raw.select(
#         F.from_json(F.col("json_string"), schema).alias("data")
#     ).select("data.*")
    
#     # 3. Tratamento din√¢mico baseado na tabela
#     # Se a coluna existir como Struct, extra√≠mos o valor interno para n√£o perder dados
    
#     # Extrair Spotify URL para qualquer entidade
#     if "external_urls" in df_parsed.columns:
#         df_parsed = df_parsed.withColumn("external_urls_spotify", F.col("external_urls.spotify"))
    
#     # Extrair ISRC para √Ålbuns ou Tracks
#     if "external_ids" in df_parsed.columns:
#         df_parsed = df_parsed.withColumn("external_ids_isrc", F.col("external_ids.isrc"))
        
#     # Extrair Seguidores para Artistas
#     if "followers" in df_parsed.columns:
#         # Verifica se followers √© um struct antes de extrair
#         df_parsed = df_parsed.withColumn("total_followers", F.col("followers.total"))

#     # Extrair IDs de listas aninhadas (Casos espec√≠ficos de √Ålbuns)
#     if "artists" in df_parsed.columns:
#         # Se artistas for uma lista de objetos, extrai apenas os IDs
#         df_parsed = df_parsed.withColumn("artists_ids", F.col("artists.id"))
        
#     if "tracks" in df_parsed.columns:
#         # No caso de √°lbuns, as tracks v√™m em tracks.items
#         df_parsed = df_parsed.withColumn("tracks_ids", F.col("tracks.items.id"))

#     # 4. Limpeza final e Timestamp
#     df_final = df_parsed.withColumn("processed_at", F.current_timestamp())
    
#     # Opcional: Remover as colunas originais de Struct para manter a tabela limpa
#     cols_to_drop = ["external_urls", "external_ids", "followers", "tracks", "artists"]
#     df_final = df_final.drop(*[c for c in cols_to_drop if c in df_final.columns])
    
#     # 5. Grava√ß√£o em Delta
#     df_final.write.format("delta") \
#           .mode("append") \
#           .option("mergeSchema", "true") \
#           .saveAsTable(table_name)
          
#     print(f"‚úÖ {len(data_list)} new records added to {table_name}")

In [0]:
# Schema used to parse the flattened track metadata rows; all fields are nullable.
track_schema = (
    StructType()
    .add("id", StringType(), True)
    .add("name", StringType(), True)
    .add("uri", StringType(), True)
    .add("duration_ms", LongType(), True)
    # Stored as a single string, not an array.
    .add("available_markets", StringType(), True)
    .add("external_urls_spotify", StringType(), True)
    .add("explicit", BooleanType(), True)
    .add("href", StringType(), True)
    .add("popularity", IntegerType(), True)
    .add("track_number", IntegerType(), True)
    .add("type", StringType(), True)
    .add("external_ids_isrc", StringType(), True)
    .add("disc_number", IntegerType(), True)
    .add("is_local", BooleanType(), True)
    .add("album_id", StringType(), True)
    .add("artist_ids", ArrayType(StringType()), True)
    .add("processed_at", TimestampType(), True)
)

In [0]:
# Schema used to parse artist metadata; all fields are nullable.
artist_schema = (
    StructType()
    .add("id", StringType(), True)
    .add("name", StringType(), True)
    .add("uri", StringType(), True)
    .add("href", StringType(), True)
    .add("type", StringType(), True)
    .add("genres", ArrayType(StringType()), True)
    .add("external_urls_spotify", StringType(), True)
    .add("popularity", IntegerType(), True)
    # Nested object: only the "total" counter is kept.
    .add("followers", StructType().add("total", LongType(), True), True)
    # Array of image objects (url + dimensions).
    .add(
        "images",
        ArrayType(
            StructType()
            .add("url", StringType(), True)
            .add("height", IntegerType(), True)
            .add("width", IntegerType(), True)
        ),
        True,
    )
    .add("processed_at", TimestampType(), True)
)

In [0]:
# Schema used to parse album metadata; all fields are nullable.
album_schema = (
    StructType()
    .add("id", StringType(), True)
    .add("uri", StringType(), True)
    .add("name", StringType(), True)
    .add("album_type", StringType(), True)
    .add("type", StringType(), True)
    .add("href", StringType(), True)
    .add("release_date", StringType(), True)
    .add("release_date_precision", StringType(), True)
    .add("total_tracks", IntegerType(), True)
    .add("label", StringType(), True)
    .add("popularity", IntegerType(), True)
    # Fields that arrive as nested objects in the JSON:
    .add("external_urls", StructType().add("spotify", StringType(), True), True)
    .add("external_ids", StructType().add("isrc", StringType(), True), True)
    .add("available_markets", ArrayType(StringType()), True)
    .add("artists", ArrayType(StructType().add("id", StringType(), True)), True)
    .add(
        "tracks",
        StructType().add(
            "items",
            ArrayType(StructType().add("id", StringType(), True)),
            True,
        ),
        True,
    )
    .add("copyrights", ArrayType(StructType().add("text", StringType(), True)), True)
    .add("images", ArrayType(StructType().add("url", StringType(), True)), True)
)

In [0]:
# 1. Check which tracks already exist so the API is not called for them again.
# Try to load the IDs already stored in the tracks table.
try:
    # Bound to a name first: reusing double quotes inside a double-quoted
    # f-string is a SyntaxError on Python versions older than 3.12.
    tracks_table = metadata_configs["track"]["table"]
    print(f"Verifying the existence of {tracks_table}...")
    df_existing = spark.read.table(tracks_table).select("id").distinct()
    existing_ids = [row.id for row in df_existing.collect()]
    # Set lookup keeps the filter linear in the number of candidate IDs.
    existing_ids_lookup = set(existing_ids)
    new_track_ids = [tid for tid in track_ids_list if tid not in existing_ids_lookup]
    print(f"‚úÖ Table  found. Filtered {len(existing_ids)} IDs that already exist.")
except Exception as e:
    # NOTE(review): any failure (not only a missing table) falls back to
    # treating every track as new.
    print(f"‚ö†Ô∏è Table not found or error reading (treating as new). Error: {str(e)[:100]}")
    new_track_ids = track_ids_list

In [0]:
# 2. PROCESS ONLY THE NEW IDS
def _track_to_row(track):
    """Project one Spotify track object onto the flat keys stored in the tracks table.

    Nested objects that are missing or null yield ``None`` / empty values
    instead of raising.
    """
    album = track.get("album") or {}
    artists = track.get("artists") or []
    return {
        "id": track.get("id"),
        "name": track.get("name"),
        "uri": track.get("uri"),
        "href": track.get("href"),
        "type": track.get("type"),
        "duration_ms": track.get("duration_ms"),
        "explicit": track.get("explicit"),
        "popularity": track.get("popularity"),
        "track_number": track.get("track_number"),
        "disc_number": track.get("disc_number"),
        "is_local": track.get("is_local"),
        "external_ids_isrc": (track.get("external_ids") or {}).get("isrc"),
        "external_urls_spotify": (track.get("external_urls") or {}).get("spotify"),
        "album_id": album.get("id"),
        "artist_ids": [artist.get("id") for artist in artists],
        # Markets are stored as one comma-separated string.
        "available_markets": ", ".join(track.get("available_markets") or []),
    }


all_entities_data = {"track": []}
artist_ids_set = set()
album_ids_set = set()

tracks_config = metadata_configs["track"]
target_table = tracks_config["table"]

if new_track_ids:
    chunk_size = tracks_config["chunk"]
    print(f"Starting API calls for {len(new_track_ids)} new tracks...")

    for i in range(0, len(new_track_ids), chunk_size):
        chunk = new_track_ids[i : i + chunk_size]
        try:
            res = sp.tracks(chunk)
            tracks_chunk = res[tracks_config["key"]]

            for track in tracks_chunk:
                if not track:
                    continue

                all_entities_data["track"].append(_track_to_row(track))

                # Remember the related album/artist IDs for the next phase.
                # Null IDs are skipped so they are never sent to the API later.
                album_id = (track.get("album") or {}).get("id")
                if album_id:
                    album_ids_set.add(album_id)
                artist_ids_set.update(
                    artist["id"]
                    for artist in (track.get("artists") or [])
                    if artist.get("id")
                )
        except Exception as e:
            # Best effort: a failing batch is reported and the loop moves on.
            print(f"Error on batch {i} of tracks: {e}")

    # 3. Write only if there is new data
    if all_entities_data["track"]:
        save_metadata_to_bronze(all_entities_data["track"], target_table, track_schema)
else:
    print("‚ú® All trails are already documented in the Bronze table. Nothing to do.")

In [0]:
# --- PHASE 2: PROCESS ARTISTS AND ALBUMS (DYNAMIC BATCHES) ---
# Map each entity name to the list of IDs discovered in phase 1
id_map = dict(
    artist=list(artist_ids_set),
    album=list(album_ids_set),
)

# for entity in ["artist", "album"]:
#     conf = metadata_configs[entity]
#     ids_to_process = id_map[entity]
#     chunk_size = conf["chunk"]
    
#     print(f"Processing {len(ids_to_process)} {entity}s...")
    
#     entity_results = []
#     for i in range(0, len(ids_to_process), chunk_size):
#         chunk = ids_to_process[i : i + chunk_size]
#         try:
#             api_function = sp.artists if entity == "artist" else sp.albums
#             res = api_function(chunk)
#             entity_results.extend([item for item in res[conf["key"]] if item])
#         except Exception as e:
#             print(f"Error on batch {i} from {entity}: {e}")
    
#     schema = artist_schema if entity == "artist" else album_schema

#     save_metadata_to_bronze(entity_results, conf["table"], schema)

# print("\nüöÄ Metadata pipeline finished with success!")



# Fetch artist and album metadata in batches and append it to the Bronze tables.
for entity in ["artist", "album"]:
    conf = metadata_configs[entity]
    ids_to_process = id_map[entity]
    chunk_size = conf["chunk"]

    print(f"Processing {len(ids_to_process)} {entity}s...")

    # Both are fixed for the whole entity, so they are resolved once here.
    api_function = sp.artists if entity == "artist" else sp.albums
    current_schema = artist_schema if entity == "artist" else album_schema

    entity_results = []
    for i in range(0, len(ids_to_process), chunk_size):
        chunk = ids_to_process[i : i + chunk_size]
        try:
            res = api_function(chunk)

            for item in res[conf["key"]]:
                if not item:
                    continue

                # Start from the raw API object. Fields the schemas declare as
                # nested (artist "followers"; album "available_markets",
                # "artists", "tracks", "external_urls", "external_ids") must keep
                # their original shape: overwriting them with a scalar or a
                # stringified list makes from_json return nulls for them.
                flat_item = item.copy()

                # 1. Spotify URL (common to both entities)
                flat_item["external_urls_spotify"] = (item.get("external_urls") or {}).get("spotify")

                if entity == "album":
                    # Convenience keys with the flattened IDs / ISRC. They are
                    # only persisted if the parsing schema declares fields with
                    # these names.
                    flat_item["artists_ids"] = [a.get("id") for a in (item.get("artists") or [])]
                    flat_item["tracks_ids"] = [
                        t.get("id") for t in ((item.get("tracks") or {}).get("items") or [])
                    ]
                    flat_item["external_ids_isrc"] = (item.get("external_ids") or {}).get("isrc")

                entity_results.append(flat_item)

        except Exception as e:
            # Best effort: a failing batch is reported and the loop moves on.
            print(f"Error on batch {i} from {entity}: {e}")

    # Hand the collected rows to the writer together with the matching schema
    save_metadata_to_bronze(entity_results, conf["table"], current_schema)


print("\nüöÄ Metadata pipeline finished with success!")

## Check Bronze Tables

In [0]:
%sql
-- Preview the raw streaming-history Bronze table
SELECT *
FROM workspace.bronze.spotify_streaming_history_raw

In [0]:
%sql
-- Preview the raw recently-played Bronze table
SELECT *
FROM workspace.bronze.spotify_recently_played_raw

In [0]:
%sql
-- Preview the raw tracks metadata Bronze table
SELECT *
FROM workspace.bronze.spotify_tracks_raw

In [0]:
%sql
-- Preview the raw artists metadata Bronze table
SELECT *
FROM workspace.bronze.spotify_artists_raw

In [0]:
%sql
-- Preview the raw albums metadata Bronze table
SELECT *
FROM workspace.bronze.spotify_albums_raw