In [1]:
import pandas as pd

# ---------------------------------------------------------
# Function: download_csv_from_public_s3
# ---------------------------------------------------------
def download_csv_from_public_s3(bucket_name: str, file_key: str, sep: str = None) -> pd.DataFrame:
    """Read a CSV object from a public S3 bucket into a DataFrame.

    Args:
        bucket_name: Name of the bucket; used as the host prefix of the URL.
        file_key: Key (path) of the object inside the bucket.
        sep: Field delimiter handed to ``pd.read_csv``; ``None`` is passed
            through unchanged.

    Returns:
        The parsed file. Rows pandas flags as bad lines are skipped
        (``on_bad_lines="skip"``).
    """
    read_options = {
        "sep": sep,
        "engine": "python",
        "quotechar": '"',
        "escapechar": "\\",
        "on_bad_lines": "skip",
    }
    source_url = "https://{}.s3.amazonaws.com/{}".format(bucket_name, file_key)
    return pd.read_csv(source_url, **read_options)

In [2]:
# Load both datasets from the public bucket into memory
df_netflix = download_csv_from_public_s3(bucket_name="desafio-rkd", file_key="netflix_titles.csv", sep=";")
df_disney = download_csv_from_public_s3(bucket_name="desafio-rkd", file_key="disney_plus_titles.csv", sep=",")

# Tag every row with the platform it came from (column is added in place)
for frame, label in ((df_netflix, "Netflix"), (df_disney, "Disney+")):
    frame["platform"] = label

In [3]:
# ============================================
# 1. Configuración inicial
# ============================================
import os
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv

# Load connection settings from the environment (load_dotenv() is called first)
from urllib.parse import quote_plus

load_dotenv()
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT", "5432")
db = os.getenv("DB_NAME")

# Fail fast with a readable message instead of building a URL that contains
# the literal text "None" and only failing later, at first connection.
_missing_settings = [
    name
    for name, value in (
        ("DB_USER", user),
        ("DB_PASSWORD", password),
        ("DB_HOST", host),
        ("DB_NAME", db),
    )
    if value is None
]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(_missing_settings)
    )

# User name and password are URL-encoded so characters such as "@", ":" or "/"
# inside them cannot be mistaken for URL delimiters.
engine = create_engine(
    f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"
)


In [4]:
from datetime import datetime

# ============================================
# 2. Load the untouched data into the RAW layer
# ============================================

# One timestamp shared by every audit column written in this run
now = datetime.now()

# Copies of the source frames with the audit columns appended;
# df_netflix / df_disney themselves are left unchanged.
df_netflix_raw = df_netflix.assign(audit_created=now, audit_updated=now)
df_disney_raw = df_disney.assign(audit_created=now, audit_updated=now)

# Write into the "raw" schema; an existing table of the same name is replaced
raw_write_options = {"schema": "raw", "if_exists": "replace", "index": False}
df_netflix_raw.to_sql("raw_netflix", engine, **raw_write_options)
df_disney_raw.to_sql("raw_disney", engine, **raw_write_options)


450

In [6]:
from datetime import datetime

# ============================================
# 3. Cleaning and normalisation
# ============================================

def _clean_catalog(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned, de-duplicated copy of one platform's catalogue.

    Adds ``duration_minutes``, parses ``date_added`` and normalises ``cast``.
    The input frame is not modified.
    """
    cleaned = frame.copy()

    # Only take the number when it is followed by "min". The previous pattern
    # grabbed the first number of any duration text, so a non-minute value
    # would have been stored as minutes.
    # NOTE(review): assumes durations look like "90 min" / "2 Seasons" — confirm.
    cleaned["duration_minutes"] = (
        cleaned["duration"].str.extract(r"(?i)(\d+)\s*min", expand=False).astype(float)
    )

    # Unparseable dates become NaT instead of raising
    cleaned["date_added"] = pd.to_datetime(cleaned["date_added"], errors="coerce")

    # Missing cast -> empty string, surrounding whitespace removed
    cleaned["cast"] = cleaned["cast"].fillna("").str.strip()

    # Drop fully identical rows
    return cleaned.drop_duplicates()


df_netflix = _clean_catalog(df_netflix)
df_disney = _clean_catalog(df_disney)

# Single processed frame; ignore_index gives every row a unique label so later
# label-based operations cannot hit duplicate index values.
df_processed = pd.concat([df_netflix, df_disney], ignore_index=True)


In [7]:
# ============================================
# 4. Load into the PROCESSED layer
# ============================================

# ---- 4.1 Platforms ----
# Distinct, non-null platform names found in the processed data
platform_names = df_processed["platform"].dropna().unique()
df_platforms = pd.DataFrame({"name": platform_names}).drop_duplicates(subset=["name"])

# Names already stored in processed.platforms
existing_platforms = pd.read_sql("SELECT name FROM processed.platforms", engine)["name"].tolist()

# Keep only the names that are not in the table yet ...
is_new_platform = ~df_platforms["name"].isin(existing_platforms)
df_platforms = df_platforms[is_new_platform]

# ... and append them when there is at least one
if len(df_platforms) > 0:
    df_platforms.to_sql("platforms", engine, schema="processed", if_exists="append", index=False)

# name -> platform_id lookup, read back after the insert so new rows are included
stored_platforms = pd.read_sql("SELECT * FROM processed.platforms", engine)
platforms_map = dict(zip(stored_platforms["name"], stored_platforms["platform_id"]))

# ---- 4.2 Titles ----
# Note: this import rebinds the name `datetime` to the module; earlier cells
# imported the class under the same name.
import datetime
current_year = datetime.datetime.now().year

# release_year: first 4-digit run as a nullable integer, then blank out
# anything outside 1900..current year
release_year_text = df_processed["release_year"].astype(str)
df_processed["release_year"] = release_year_text.str.extract(r"(\d{4})").astype("Int64")
year_in_range = df_processed["release_year"].between(1900, current_year)
df_processed.loc[~year_in_range, "release_year"] = pd.NA

# duration_minutes: first run of digits as a nullable integer
duration_text = df_processed["duration_minutes"].astype(str)
df_processed["duration_minutes"] = duration_text.str.extract(r"(\d+)").astype("Int64")

# Blank out implausible durations (10 minutes or less, or more than 1000)
too_short = df_processed["duration_minutes"] <= 10
too_long = df_processed["duration_minutes"] > 1000
df_processed.loc[too_short | too_long, "duration_minutes"] = pd.NA

# Final frame: the title columns as they are, plus the platform's database id
title_columns = [
    "show_id",
    "title",
    "type",
    "release_year",
    "rating",
    "duration_minutes",
    "date_added",
    "description",
]
df_titles = df_processed[title_columns].assign(
    platform_id=df_processed["platform"].map(platforms_map)
)

# NOTE(review): only the first row per show_id is kept. If the two platforms
# reuse the same show_id values, the later platform's titles are dropped here
# — confirm that show_id is unique across platforms.
df_titles = df_titles.drop_duplicates(subset=["show_id"])

# Replace missing dates with None
# NOTE(review): on a datetime64 column this appears to leave NaT in place;
# the per-row conversion done at insert time is what actually yields None.
has_date = df_titles["date_added"].notna()
df_titles["date_added"] = df_titles["date_added"].where(has_date, None)

# ---- UPSERT into titles ----
from sqlalchemy import text
import pandas as pd

# Every missing marker (NaN, pd.NA, NaT) in ANY column is turned into None so
# the database receives NULL. Previously only the date and numeric columns
# were converted, so a missing rating/description/title was sent as float NaN.
title_params = [
    {column: (None if pd.isna(value) else value) for column, value in record.items()}
    for record in df_titles.to_dict("records")
]

upsert_title_sql = text("""
            INSERT INTO processed.titles (
                show_id, title, type, release_year, rating, duration_minutes,
                date_added, description, platform_id
            )
            VALUES (
                :show_id, :title, :type, :release_year, :rating, :duration_minutes,
                :date_added, :description, :platform_id
            )
            ON CONFLICT (show_id) DO UPDATE SET
                title = EXCLUDED.title,
                type = EXCLUDED.type,
                release_year = EXCLUDED.release_year,
                rating = EXCLUDED.rating,
                duration_minutes = EXCLUDED.duration_minutes,
                date_added = EXCLUDED.date_added,
                description = EXCLUDED.description,
                platform_id = EXCLUDED.platform_id,
                updated_at = CURRENT_TIMESTAMP;
        """)

# One transaction, one executemany-style call for the whole batch.
# Skipped when there is nothing to write: an empty parameter list would run
# the statement once with no bound values.
if title_params:
    with engine.begin() as conn:
        conn.execute(upsert_title_sql, title_params)


In [8]:
# ============================================
# 5. Directors and title-director relations
# ============================================
from sqlalchemy import text

# ---- One row per (title, director) credit ----
# NOTE(review): assumes several directors on one title are comma-separated in
# the "director" column, the same layout the cast column is parsed with —
# confirm against the source CSVs.
director_credits = (
    df_processed[["show_id", "platform", "director"]]
    .dropna(subset=["director"])
    .reset_index(drop=True)  # fresh 0..n-1 labels before exploding
)
director_credits["director"] = director_credits["director"].astype(str).str.split(",")
director_credits = director_credits.explode("director").reset_index(drop=True)
director_credits["director"] = director_credits["director"].str.strip()
director_credits = director_credits[director_credits["director"] != ""]

# ---- Unique directors ----
df_directors = pd.DataFrame({"name": director_credits["director"].unique()})

# Directors already stored in the database
existing_directors = pd.read_sql("SELECT name FROM processed.directors", engine)["name"].tolist()

# Keep only the new ones and append them when there is at least one
df_directors = df_directors[~df_directors["name"].isin(existing_directors)]
if not df_directors.empty:
    df_directors.to_sql("directors", engine, schema="processed", if_exists="append", index=False)

# name -> director_id lookup, read back after the insert
directors_map = pd.read_sql("SELECT * FROM processed.directors", engine).set_index("name")["director_id"].to_dict()

# ---- Title-director relations ----
# Titles are matched on (show_id, platform_id) rather than show_id alone, so a
# credit is never attached to another platform's title that happens to share
# the same show_id.
titles_db = pd.read_sql("SELECT show_id, title_id, platform_id FROM processed.titles", engine)

director_keys = director_credits.assign(
    platform_id=director_credits["platform"].map(platforms_map)
).dropna(subset=["show_id", "platform_id"])

df_title_directors = director_keys.merge(titles_db, on=["show_id", "platform_id"], how="inner")
df_title_directors["director_id"] = df_title_directors["director"].map(directors_map)

# Keep only the key pair, without unmapped rows or repeats
df_title_directors = df_title_directors[["title_id", "director_id"]].dropna().drop_duplicates()

# ---- Insert into title_directors (existing pairs are left untouched) ----
title_director_params = df_title_directors.to_dict("records")

# One transaction, one executemany-style call; skipped when there is nothing
# to insert because an empty parameter list would run the statement unbound.
if title_director_params:
    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO processed.title_directors (title_id, director_id)
            VALUES (:title_id, :director_id)
            ON CONFLICT (title_id, director_id) DO NOTHING;
        """), title_director_params)


In [9]:
# ============================================
# 6. Actors and title-actor relations
# ============================================
from sqlalchemy import text

# ---- One row per (title, actor) credit ----
# The cast text is split on commas and each name is stripped, so separators
# with or without a following space are both handled.
cast_credits = (
    df_processed[["show_id", "platform", "cast"]]
    .dropna(subset=["cast"])
    .reset_index(drop=True)  # fresh 0..n-1 labels before exploding
)
cast_credits["actor"] = cast_credits["cast"].astype(str).str.split(",")
cast_credits = cast_credits.explode("actor").reset_index(drop=True)
cast_credits["actor"] = cast_credits["actor"].str.strip()
cast_credits = cast_credits[cast_credits["actor"] != ""]

# ---- Unique actors ----
actors_unique = cast_credits["actor"].unique()
df_actors = pd.DataFrame({"name": actors_unique})

# Actors already stored in the database
existing_actors = pd.read_sql("SELECT name FROM processed.actors", engine)["name"].tolist()

# Keep only the new ones and append them when there is at least one
df_actors = df_actors[~df_actors["name"].isin(existing_actors)]
if not df_actors.empty:
    df_actors.to_sql("actors", engine, schema="processed", if_exists="append", index=False)

# name -> actor_id lookup, read back after the insert
actors_map = pd.read_sql("SELECT * FROM processed.actors", engine).set_index("name")["actor_id"].to_dict()

# ---- Title-actor relations ----
# A single merge replaces the per-actor scan of the titles table that ran
# inside a row loop. Titles are matched on (show_id, platform_id) rather than
# show_id alone, so a credit is never attached to another platform's title
# that happens to share the same show_id.
titles_db = pd.read_sql("SELECT show_id, title_id, platform_id FROM processed.titles", engine)

cast_keys = cast_credits.assign(
    platform_id=cast_credits["platform"].map(platforms_map)
).dropna(subset=["show_id", "platform_id"])

df_title_actors = cast_keys.merge(titles_db, on=["show_id", "platform_id"], how="inner")
df_title_actors["actor_id"] = df_title_actors["actor"].map(actors_map)

# Keep only the key pair, without unmapped rows or repeats
df_title_actors = df_title_actors[["title_id", "actor_id"]].dropna().drop_duplicates()

# ---- Insert into title_actors (existing pairs are left untouched) ----
title_actor_params = df_title_actors.to_dict("records")

# One transaction, one executemany-style call; skipped when there is nothing
# to insert because an empty parameter list would run the statement unbound.
if title_actor_params:
    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO processed.title_actors (title_id, actor_id)
            VALUES (:title_id, :actor_id)
            ON CONFLICT (title_id, actor_id) DO NOTHING;
        """), title_actor_params)
