In [2]:
#from extract import extract, load_bronze, transform_silver, aggregate_gold, main

In [10]:
import os
import shutil
import polars as pl
import kagglehub

In [11]:
# ------------------------------
# 1️⃣ Extract: descargar CSV a bronze
# ------------------------------
def extract(target_path):
    """
    Descarga el dataset de Kaggle 'Ultimate Spotify Tracks DB' y copia
    todos los archivos CSV a la carpeta 'bronze' dentro de target_path.
    
    Args:
        target_path (str): Ruta donde se encuentra la carpeta 'data'.
    
    Returns:
        List[str]: Rutas completas de los CSV copiados en bronze.
    """
    bronze_path = os.path.join(target_path, "bronze")
    os.makedirs(bronze_path, exist_ok=True)
    print(f"[extract] Carpeta bronze lista en: {bronze_path}")

    cache_path = kagglehub.dataset_download("zaheenhamidani/ultimate-spotify-tracks-db")
    print(f"[extract] Dataset descargado en cache: {cache_path}")

    csv_files = []
    for file_name in os.listdir(cache_path):
        if file_name.endswith(".csv"):
            src = os.path.join(cache_path, file_name)
            shutil.copy(src, bronze_path)
            csv_files.append(os.path.join(bronze_path, file_name))
            print(f"[extract] Copiado a bronze: {file_name}")

    print(f"[extract] CSV en bronze: {[os.path.basename(f) for f in csv_files]}")
    return csv_files

In [12]:
# ------------------------------
# 2️⃣ Load Bronze: leer CSV con Polars + PyArrow
# ------------------------------
def load_bronze(csv_path):
    """
    Lee un CSV desde bronze usando Polars con backend PyArrow.
    """
    df_bronze = pl.read_csv(csv_path, use_pyarrow=True)
    print(f"[load_bronze] {len(df_bronze)} filas cargadas de {os.path.basename(csv_path)}")
    return df_bronze

In [13]:
# ------------------------------
# 3️⃣ Transform Silver: limpieza y guardado incremental
# ------------------------------
def transform_silver(df_bronze, silver_path, csv_name):
    """
    Limpia nulos y guarda el DataFrame en Silver como Parquet.
    Si ya existe, lo carga directamente (incremental).
    """
    os.makedirs(silver_path, exist_ok=True)
    silver_file = os.path.join(silver_path, f"{csv_name}_silver.parquet")

    if os.path.exists(silver_file):
        print(f"[transform_silver] Silver ya existe, cargando: {silver_file}")
        df_silver = pl.read_parquet(silver_file, engine="pyarrow")
    else:
        # Rellenar nulos de forma vectorizada
        fill_dict = {}
        for col, dtype in zip(df_bronze.columns, df_bronze.dtypes):
            if dtype == pl.Utf8:
                fill_dict[col] = "N/A"
            elif dtype in [pl.Int64, pl.Float64]:
                fill_dict[col] = 0
        df_silver = df_bronze.fill_null(fill_dict)

        df_silver.write_parquet(silver_file, engine="pyarrow")
        print(f"[transform_silver] Silver guardado en: {silver_file}")

    return df_silver


In [14]:
# ------------------------------
# 4️⃣ Aggregate Gold: agregación incremental
# ------------------------------
def aggregate_gold(df_silver, gold_path, csv_name):
    """
    Agrega datos para la capa Gold y guarda como Parquet.
    Solo procesa si no existe ya el Gold.
    """
    os.makedirs(gold_path, exist_ok=True)
    gold_file = os.path.join(gold_path, f"{csv_name}_gold.parquet")

    if os.path.exists(gold_file):
        print(f"[aggregate_gold] Gold ya existe, cargando: {gold_file}")
        df_gold = pl.read_parquet(gold_file, engine="pyarrow")
    else:
        if "artist_name" in df_silver.columns and "track_name" in df_silver.columns:
            df_gold = df_silver.groupby("artist_name").agg(
                pl.count("track_name").alias("track_count")
            ).sort("track_count", reverse=True)
        else:
            df_gold = df_silver
        df_gold.write_parquet(gold_file, engine="pyarrow")
        print(f"[aggregate_gold] Gold guardado en: {gold_file}")

    return df_gold

In [15]:
# ------------------------------
# 5️⃣ Main pipeline: orquestador incremental
# ------------------------------
def main():
    base_path = os.path.join(os.getcwd(), "data")
    silver_path = os.path.join(base_path, "silver")
    gold_path = os.path.join(base_path, "gold")

    # 1️⃣ Extract
    csv_files = extract(base_path)

    gold_results = {}

    for csv_path in csv_files:
        csv_name = os.path.splitext(os.path.basename(csv_path))[0]

        # 2️⃣ Load Bronze
        df_bronze = load_bronze(csv_path)

        # 3️⃣ Transform Silver (incremental)
        df_silver = transform_silver(df_bronze, silver_path, csv_name)

        # 4️⃣ Aggregate Gold (incremental)
        df_gold = aggregate_gold(df_silver, gold_path, csv_name)

        gold_results[csv_name] = df_gold

    print("\n[main] Pipeline completado. Resultados Gold listos en memoria.")
    return gold_results

In [16]:
# ------------------------------
# Ejecutar
# ------------------------------
if __name__ == "__main__":
    gold_dict = main()

[extract] Carpeta bronze lista en: C:\Users\germa\Desktop\Carpetas\Data_Engineer_Specialist\Spotify_Medallon\notebooks\data\bronze
[extract] Dataset descargado en cache: C:\Users\germa\.cache\kagglehub\datasets\zaheenhamidani\ultimate-spotify-tracks-db\versions\3
[extract] Copiado a bronze: SpotifyFeatures.csv
[extract] CSV en bronze: ['SpotifyFeatures.csv']
[load_bronze] 232725 filas cargadas de SpotifyFeatures.csv


InvalidOperationError: must specify one field in the struct