# Fase 3: Preparación de los datos

## Setup Kedro | Importación de librerías y carga de los datasets

In [6]:
from pathlib import Path
import sys, tomllib
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession

# Detecta raíz del proyecto y package_name
project_path = Path.cwd() if (Path.cwd() / "pyproject.toml").exists() else Path.cwd().parent
with open(project_path / "pyproject.toml", "rb") as f:
    package_name = tomllib.load(f)["tool"]["kedro"]["package_name"]

# src importable + sesión
sys.path.insert(0, str(project_path / "src"))
configure_project(package_name)
session = KedroSession.create(project_path=project_path)
context = session.load_context()
catalog = context.catalog

In [7]:
names = ["releases", "countries", "genres"]
dfs = {name: catalog.load(name) for name in names}
for name in names:
    print(f"dataset {name} cargado")

dataset releases cargado
dataset countries cargado
dataset genres cargado


## Seleccionar los datos relevantes

### Datasets y columnas permitidas
------------------------------

*   **releases**: usar **id**, **date**_(excluir rating y type — no son necesarios y rating tiene alta ausencia)_
    
*   **countries**: usar **id**, **country**
    
*   **genres**: usar **id**, **genre**

### Reglas de selección y filtros
-------------------------------------------

1.  **Normalizar país** en countries.country y **quedarse solo con películas asociadas a EE. UU.**Alias aceptados: _USA, US, United States, United States of America_ (minúsculas, sin puntos/comedas).
    
2.  **Parsear releases.date** y calcular la **primera fecha** por película (min(date) por id) para no contar re-estrenos múltiples.
    
3.  **Filtrar periodo** a **\[2000-01-01, 2019-12-31\]** usando la **primera fecha**.
    
4.  **Intersección de llaves**: conservar solo id que aparezcan en **los tres** datasets (garantiza integridad al unir).
    
5.  **Uniones necesarias**:
    
    *   us\_ids (del paso 1) **∩** min\_release\_2000\_2019 (pasos 2–3) → base de películas **EE. UU. y década válida**.
        
    *   Unir esa base con genres por id para obtener **(id, decade, genre)**.
        
6.  **Evitar sobreconteo por multi-etiqueta**: **deduplicar** por **(id, genre, decade)** (una película aporta 1 por género en su década).
    
7.  **Columnas finales (dataset analítico mínimo)**:**id**, **decade** ∈ {**2000s**, **2010s**}, **genre**._(Opcional, para auditoría: first\_date)_

## Limpiar datos

In [9]:
# Bloque 2 — Limpiar los datos (autosuficiente: crea df_* si no existen)
import pandas as pd
import numpy as np

# Asegura dataframes base (desde dfs o desde catalog)
if 'df_releases' not in globals() or 'df_countries' not in globals() or 'df_genres' not in globals():
    if 'dfs' in globals():
        _releases  = dfs["releases"]
        _countries = dfs["countries"]
        _genres    = dfs["genres"]
    else:
        # Si no tienes `dfs`, intenta cargar desde Kedro catalog (asegúrate de haber hecho el setup)
        _releases  = catalog.load("releases")
        _countries = catalog.load("countries")
        _genres    = catalog.load("genres")

    df_releases  = _releases[["id","date"]].copy()
    df_countries = _countries[["id","country"]].copy()
    df_genres    = _genres[["id","genre"]].copy()

# --- Limpieza solicitada (sin transformar más de lo necesario) ---
def _norm_txt(s):
    if pd.isna(s): 
        return ""
    return str(s).lower().replace(".","").replace(",","").strip()

# Normalizar país (texto) – no filtra aún
df_countries["_country_norm"] = df_countries["country"].map(_norm_txt)

# Parseo robusto de fecha (naive)
df_releases["_date_parsed"] = pd.to_datetime(
    df_releases["date"], errors="coerce", utc=True
).dt.tz_localize(None)

# Drop de fechas no parseables
_before = len(df_releases)
df_releases = df_releases.dropna(subset=["_date_parsed"])
_after = len(df_releases)

# Remueve outliers temporales extremos (muy fuera de rango razonable)
df_releases = df_releases[df_releases["_date_parsed"].dt.year.between(1900, 2025)]

# Deduplicación exacta por claves lógicas
df_countries = df_countries.drop_duplicates(subset=["id","country"])
df_genres    = df_genres.drop_duplicates(subset=["id","genre"])
df_releases  = df_releases.drop_duplicates(subset=["id","date"])

print("Limpieza OK.")
print(f"Fechas no parseables removidas: {_before - _after}")
print(f"Shapes ⇒ releases: {df_releases.shape}, countries: {df_countries.shape}, genres: {df_genres.shape}")

Limpieza OK.
Fechas no parseables removidas: 0
Shapes ⇒ releases: (1201917, 3), countries: (693476, 3), genres: (1046849, 2)


*   **Asegura los DataFrames base** (df\_releases, df\_countries, df\_genres): si no existen, los crea desde dfs (o desde catalog) y **selecciona solo las columnas relevantes**.
    
*   **Normaliza texto de países** (\_country\_norm): pasa a minúsculas y quita puntos/comas para manejar variantes como _USA/US/United States_ de forma consistente.
    
*   **Parsea fechas** en releases.date a datetime (naive) y **elimina filas no parseables**.
    
*   **Acota outliers temporales groseros** a un rango amplio razonable **\[1900, 2025\]** (sin aplicar aún el recorte analítico 2000–2019).
    
*   **Deduplica** por **claves lógicas** en cada tabla:
    
    *   releases: (id, date)
        
    *   countries: (id, country)
        
    *   genres: (id, genre)
        
*   **Informa** un resumen: cuántas fechas inválidas se removieron y los **shapes** resultantes.

## Construir nuevas variables

In [10]:
min_release = (df_releases.groupby("id", as_index=False)["_date_parsed"].min()
                           .rename(columns={"_date_parsed":"first_date"}))
min_release["first_year"] = min_release["first_date"].dt.year
min_release["decade"] = np.where(min_release["first_year"].between(2000, 2009), "2000s",
                         np.where(min_release["first_year"].between(2010, 2019), "2010s", "other"))

print("Features creadas: first_date, first_year, decade (2000s/2010s/other)")
min_release.head()

Features creadas: first_date, first_year, decade (2000s/2010s/other)


Unnamed: 0,id,first_date,first_year,decade
0,1000001,2023-07-06,2023,other
1,1000002,2019-05-21,2019,2010s
2,1000003,2022-03-11,2022,other
3,1000004,1999-09-10,1999,other
4,1000005,2016-08-31,2016,2010s


El bloque calcula, para cada película, su **primera fecha de estreno** disponible y a partir de ella asigna una **década analítica**:
    
*   Obtiene la **fecha mínima** por id (la más antigua entre todos sus estrenos) y la llama first\_date. Esto evita contar **reestrenos** múltiples.
    
*   Extrae el **año** de first\_date como first\_year.
    
*   Clasifica en **“2000s”** si el año está entre 2000–2009, en **“2010s”** si está entre 2010–2019 y en **“other”** si cae **antes de 2000** o **después de 2019**.

## Integración de datos de múltiples fuentes

In [11]:
# Identificar EE. UU. tras normalización
US_ALIASES = {"usa","us","u s","u s a","united states","united states of america"}
df_countries["is_us"] = df_countries["_country_norm"].isin(US_ALIASES)
us_ids = set(df_countries.loc[df_countries["is_us"], "id"].unique())

# Integridad básica de llaves
ids_rel = set(df_releases["id"].unique())
ids_gen = set(df_genres["id"].unique())
ids_cty = set(df_countries["id"].unique())
ids_all = ids_rel & ids_gen & ids_cty

# Base de películas: EE. UU. + décadas de interés (2000s/2010s)
base = (min_release[min_release["decade"].isin(["2000s","2010s"])]
        .loc[lambda d: d["id"].isin(us_ids & ids_all), ["id","first_date","decade"]]
        .drop_duplicates())

# Unir con géneros (multi-etiqueta) y deduplicar
final_df = (base.merge(df_genres[["id","genre"]], on="id", how="inner")
                 .drop_duplicates(subset=["id","genre","decade"])
                 [["id","decade","genre"]]
                 .reset_index(drop=True))

print("=== Integración completada ===")
print("Filas (id, decade, genre):", final_df.shape[0])
print("Décadas:", sorted(final_df["decade"].unique().tolist()))
print("Géneros distintos:", final_df["genre"].nunique())
final_df.head()

=== Integración completada ===
Filas (id, decade, genre): 102060
Décadas: ['2000s', '2010s']
Géneros distintos: 19


Unnamed: 0,id,decade,genre
0,1000005,2010s,Drama
1,1000005,2010s,Comedy
2,1000005,2010s,Music
3,1000005,2010s,Romance
4,1000007,2010s,Science Fiction


### Explicación del bloque (integración y filtrado final)

1.  Identificar películas de EE. UU.
    
    *   Se parte de df\_countries\["\_country\_norm"\] (país normalizado) y se define un conjunto de alias de EE. UU. (USA, US, United States, etc.).
        
    *   Se marca cada fila con is\_us y se extraen los **us\_ids** (todos los id de películas asociadas a EE. UU.).
        
      Esto fija el universo de películas “de EE. UU.” para el análisis.
        
2.  Integridad referencial (llaves comunes).
    
    *   Se obtienen los conjuntos de id presentes en cada tabla: releases, genres, countries.
        
    *   Se calcula **ids\_all** como la intersección de los tres (solo id que existen en **todas** las fuentes).
        
      Evita perder filas al unir después (joins más estables).
        
3.  Base temporal filtrada a las décadas de interés.
    
    *   Desde min\_release (que ya tiene la **primera fecha** por película y la **década**), se filtra a **2000s** y **2010s**.
        
    *   Además, se restringe a id ∈ (us\_ids ∩ ids\_all).
        
    *   Se conserva \["id", "first\_date", "decade"\] y se eliminan duplicados.
        
      Queda una “base” con películas **de EE. UU.**, con **primera fecha** en 2000s/2010s y con presencia en las tres tablas.
        
4.  Unión con géneros y deduplicación analítica.
    
    *   Se hace un merge con df\_genres\[\["id","genre"\]\] para obtener los **géneros** de cada película.
        
    *   Se **deduplica** por **(id, genre, decade)**: una película cuenta **una vez** por género en su década (evita sobreconteo por multi-etiqueta y reestrenos).
        
    *   Se dejan solo las columnas analíticas mínimas: **id, decade, genre**.
        
      El resultado es final\_df, listo para conteos, participaciones y Top-3 por década.

### Formateo y transformación de datos

In [12]:
# Tipos y orden
final_df["id"] = final_df["id"].astype("int64")
final_df["decade"] = pd.Categorical(final_df["decade"], categories=["2000s","2010s"], ordered=True)
final_df["genre"] = final_df["genre"].astype("category")

final_df = final_df.sort_values(["decade","genre","id"]).reset_index(drop=True)

# QA rápido
assert final_df["id"].isna().sum()==0, "Hay id nulos"
assert final_df["genre"].isna().sum()==0, "Hay genre nulos"
assert set(final_df["decade"].unique()) <= {"2000s","2010s"}, "Décadas fuera de rango"
dup_ok = final_df.duplicated(subset=["id","genre","decade"]).sum()==0
print("Duplicados (id,genre,decade):", "OK (0)" if dup_ok else "Hay duplicados")

print("\nDataset final listo para análisis del Top-3 por década.")
print(final_df.dtypes)

# Guardar versión limpia
final_df.to_parquet("final_df_us_2000s_2010s.parquet", index=False)
print("Guardado: final_df_us_2000s_2010s.parquet")

Duplicados (id,genre,decade): OK (0)

Dataset final listo para análisis del Top-3 por década.
id           int64
decade    category
genre     category
dtype: object
Guardado: final_df_us_2000s_2010s.parquet


1.  **Tipado y orden canónico**
    
    *   id → int64: asegura identificadores numéricos consistentes.
        
    *   decade → **Categorical ordenada** con categorías \["2000s","2010s"\]: fija el orden lógico (útil para sort, groupby, gráficos).
        
    *   genre → **Categorical**: reduce memoria y acelera operaciones sobre texto repetido.
        
    *   Ordena el dataset por \["decade","genre","id"\] y reinicia el índice para dejarlo limpio.
        
2.  **QA rápido (sanidad del dataset)**
    
    *   assert sin nulos en id y genre.
        
    *   assert que decade solo tenga valores esperados (2000s, 2010s).
        
    *   Verifica duplicados en la clave analítica **(id, genre, decade)** y reporta si hay (debería ser 0 tras la deduplicación previa).
        
3.  **Inspección y persistencia**
    
    *   Imprime los dtypes finales (para auditar el esquema).
        
    *   **Guarda** una versión **limpia y columnar** en **Parquet** (final\_df\_us\_2000s\_2010s.parquet), formato eficiente (compresión, esquema, lectura rápida) para los análisis posteriores del **Top-3 por década** y pruebas estadísticas