In [1]:
from datetime import date

hoy = date.today()
print(hoy)  # Ejemplo: 2025-05-07


2025-05-19


In [2]:
import pandas as pd

In [3]:
df_limpio = pd.read_pickle("../data/data_limpio/itunes.pkl")
print(len(df_limpio))

117668


In [4]:
def procesar_dataframe_maestro(ruta_pickle):
    """
    Carga un DataFrame maestro desde un archivo pickle, lo limpia y separa en tablas normalizadas:
    Artist, Album, Track, Genre, Track_prices, Album_prices.
    Garantiza que no haya duplicados por ID en tablas principales y limpia registros duplicados
    por ID + fecha en las tablas históricas.

    Parámetros:
    -----------
    ruta_pickle : str
        Ruta del archivo pickle con el DataFrame completo.

    Retorna:
    --------
    dict
        Diccionario con las tablas limpias separadas por nombre.
    """
    # Renombrar columnas para que coincidan con el esquema SQL
    df_limpio = pd.read_pickle(ruta_pickle)
    df_limpio = df_limpio.rename(columns={
        "artistId": "artist_id",
        "artistName": "artistname",
        "artistViewUrl": "artistviewurl",
        "collectionId": "collection_id",
        "collectionName": "collectionname",
        "collectionCensoredName": "collectioncensoredname",
        "releaseDate": "release_date",
        "collectionExplicitness": "collectionexplicitness",
        "contentAdvisoryRating": "contentadvisoryrating",
        "collectionPrice": "collectionprice",
        "currency": "currency",
        "trackCount": "trackcount",
        "discCount": "disccount",
        "collectionViewUrl": "collectionviewurl",
        "collectionArtistName": "collectionartistname",
        "collectionArtistViewUrl": "collectionartistviewurl",
        "trackId": "track_id",
        "trackName": "trackname",
        "trackNumber": "tracknumber",
        "trackPrice": "trackprice",
        "discNumber": "discnumber",
        "trackTimeMillis": "tracktimemillis",
        "trackExplicitness": "trackexplicitness",
        "trackViewUrl": "trackviewurl",
        "isStreamable": "is_streamable",
        "kind": "kind",
        "primaryGenreName": "primarygenrename"
    })

    # Tablas principales
    artist_df = (
        df_limpio.groupby("artist_id")[["artistname", "artistviewurl"]]
        .agg(lambda x: x.dropna().value_counts().idxmax() if not x.dropna().empty else pd.NA)
        .reset_index()
    )

    album_cols = [
        "collection_id", "collectionname", "collectioncensoredname", "release_date",
        "collectionexplicitness", "contentadvisoryrating", "collectionprice", "currency",
        "trackcount", "disccount", "collectionviewurl",
        "collectionartistname", "collectionartistviewurl", "artist_id"
    ]
    album_df = df_limpio[album_cols].drop_duplicates(subset=["collection_id"]).copy()

    track_cols = [
        "track_id", "trackname", "tracknumber", "trackprice", "discnumber", "tracktimemillis",
        "trackexplicitness", "release_date", "trackviewurl", "is_streamable", "kind",
        "artist_id", "collection_id", "primarygenrename"
    ]
    track_df = df_limpio[track_cols].drop_duplicates(subset=["track_id"]).copy()

    genre_df = (
    df_limpio[["primarygenrename"]]
    .drop_duplicates()
    .rename(columns={"primaryGenreName": "primarygenrename"})
    .reset_index(drop=True) 
    )
    
    track_df = track_df.merge(genre_df, on="primarygenrename", how="left")

    track_prices_df = (
        df_limpio[["track_id", "trackprice", "checked_at"]]
        .dropna(subset=["track_id", "trackprice", "checked_at"])
        .drop_duplicates(subset=["track_id", "checked_at"])
    )

    album_prices_df = (
        df_limpio[["collection_id", "collectionprice", "checked_at"]]
        .dropna(subset=["collection_id", "collectionprice", "checked_at"])
        .drop_duplicates(subset=["collection_id", "checked_at"])
    )

    return {
        "artist": artist_df,
        "album": album_df,
        "track": track_df,
        "genre": genre_df,
        "track_prices": track_prices_df,
        "album_prices": album_prices_df
    }
 
# Ejecutar función actualizada y guardar como pickle
tablas_normalizadas = procesar_dataframe_maestro("../data/data_limpio/itunes.pkl")

# Guardar los archivos actualizados
for nombre, tabla in tablas_normalizadas.items():
    tabla.to_pickle(f"../data/data_prueba/{nombre.lower()}.pkl")

# Confirmar archivos exportados
[f"../data/data_prueba/{nombre.lower()}.pkl" for nombre in tablas_normalizadas.keys()]

['../data/data_prueba/artist.pkl',
 '../data/data_prueba/album.pkl',
 '../data/data_prueba/track.pkl',
 '../data/data_prueba/genre.pkl',
 '../data/data_prueba/track_prices.pkl',
 '../data/data_prueba/album_prices.pkl']

In [5]:
tablas = procesar_dataframe_maestro("../data/data_limpio/itunes.pkl")

for nombre, df in tablas.items():
    print(f"{nombre}: {len(df)} registros")


artist: 22460 registros
album: 51056 registros
track: 110567 registros
genre: 290 registros
track_prices: 116217 registros
album_prices: 54219 registros


In [6]:
from pathlib import Path

archivos = sorted(Path("../data/data_raw").glob("itunes_*.csv"))
for f in archivos:
    print(f.name)


itunes_2025-04-20.csv
itunes_2025-04-21.csv
itunes_2025-04-22.csv
itunes_2025-04-23.csv
itunes_2025-04-24.csv
itunes_2025-04-25.csv
itunes_2025-04-26.csv
itunes_2025-04-27.csv


In [7]:
import sys
import os

# Añade el path al directorio raíz del proyecto (ajústalo si es necesario)
sys.path.append(os.path.abspath(".."))  # Si estás dentro de notebooks/

# Ahora importa el módulo
from src.ETL.file_utils import cargar_datos_itunes



In [8]:
df = cargar_datos_itunes()
print("Registros cargados:", len(df))


Total de registros cargados: 118808
Registros cargados: 118808


In [9]:
from src.ETL.file_utils import cargar_datos_itunes
df = cargar_datos_itunes()
print("Total registros encontrados:", len(df))


Total de registros cargados: 118808
Total registros encontrados: 118808


In [10]:
df = pd.read_pickle("../data/data_limpio/itunes.pkl")
df.to_csv("../data/data_raw/itunes.csv", index=False)