In [70]:
import os
import io
import json
from datetime import date, datetime, timedelta

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


In [71]:
BUKECT_NAME = "dai03rt-proyecto"
ACCESS_KEY_ID = "AKIAWZEDMKF3SHFDRH3B"
SECRET_KEY = "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"
REGION = "eu-west-1"

## TMDB API

Para el recopilado de datos usaremos las siguientes enpoints de TMDB:

- #### DISCOVER: https://api.themoviedb.org/3/discover/movie
    * Esta endpoints nos da una lista de peliculas aleatorias determinando una variedad de filtros
    * Nuestro modelo ha sido entrenado con peliculas de hace 20 años, por lo que nuestro fitro debe ser ese rango de fechas
    * De esta lista solo nos interesa el **ID** de cada pelicula ya que necesitamos usar otro endpoint para conseguir toda la info necesaria de cada peli
    * Toda la información es paginada asi que por ese rango de fechas la request no da la info, solo de la primera pagina, teniendo que hacer una request por cada pagina restante hasta alcanzar el nº max_pages de esa request especifica

>**PROBLEMA:**La propia API nos trunquea todas aquellas paginas por encima del numero 500, por lo que nos obliga a que nuestro rango de fechas nunca produzca una request con un numero de paginas superior. Debido a esto hemos tenido que crear unos rangos de fechas especificos, que partimos en request con un step determinado, uno para cada rango. Esto se debe a que la producción de las peliculas es una distribución compleja

- #### DETAILS_MOVIES: https://api.themoviedb.org/3/movie/{movie_id}
    * Una vez tenemos todos los ids de nuestras peliculas, haremos una peticion a este endpoint por cada id
    * Ahora si tenemos toda la info encesaria como, los actores qeu ahn trabajado en la peli, los generos, el presupuesto, la recaudación, etc

- #### DETAILS_ACTORS https://api.themoviedb.org/3/person/{person_id}
    * Con los ids de los actores recopilados gracias a la anterior url, podemos generar una lista, sin actores repetidos, de los ids de cada actor para buscar sus detalles
    * Debido a esto nuestro flujo simpre necesita primero conocer los detalles de las paliculas antes de conocer los detalles de cada actor

- #### GENRE_LIST https://api.themoviedb.org/3/genre/movie/list
    * En esta enpoint recogemos todos los generos posibles que pueden tener las peliculas
    * Cabe recordar que una pelicula puede tener varios generos, y que un genero puede encontrarse en varios peliculas por supuesto

## LAMBDAS

![Alt](./img/lambdas.png "Propiedades del BUCKET")

- init_tmdb: funcion que crea en nuestra instancia RDS la base de datos y el usuario que la gestiona
- create_tables_tmdb: funcion que crea la estructura de la base de datos
- funcion_diaria: funcion de recogida y guardado de nuevos datos cada dia
- send_query: funcion para ejecutar queries de tipo DML, en concreto generadoras de vistas
- funcion_inicial: funcion de carga incial de peliculas
- insert_data_tmdb | Runtime: python3.11 | Última modif: 2025-09-21T15:30:09.000+0000

In [65]:
lambda_ = boto3.client("lambda",
                  aws_access_key_id = ACCESS_KEY_ID,
                  aws_secret_access_key = SECRET_KEY,
                  region_name=REGION) 

In [66]:
## Listar funciones Lambda existentes

def invoke_lambda_inicial(payload: dict, async_: bool = False):
    """Invoca la lambda con el payload dado (sync por defecto)."""
    resp = lambda_.invoke(
        FunctionName="funcion_inicial",
        InvocationType="Event" if async_ else "RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )
    if async_:
        print("✔ Invocación async enviada.")
        return None
    print("HTTP Status:", resp.get("StatusCode"))
    body = resp.get("Payload").read().decode("utf-8")
    try:
        print("Respuesta Lambda:", json.loads(body))
    except Exception:
        print("Respuesta Lambda (raw):", body)


def listar_funciones():
    response = lambda_.list_functions()
    for f in response['Functions']:
        print(f"{f['FunctionName']} | Runtime: {f['Runtime']} | Última modif: {f['LastModified']}")
        
def invocar_lambda(nombre_funcion, payload={}):
    try:
        response = lambda_.invoke(
            FunctionName=nombre_funcion,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload),
        )
        print("Respuesta:")
        result_raw = (response['Payload']).read().decode('utf-8')
        result = json.loads(result_raw)
        print(result)
        return result
    except ClientError as e:
        print(f"Error: {e}")

## ESTRUCTURA S3

![Alt](./img/bucket_info.png "Lista de lambdas")

- initial_load: carpeta donde guardaremos todos los datos de peliculas, actores y generos
    - **__/movies**: a su vez dividida por carpetas de ventanas

        - /date_range_X: donde x es un numero no una fecha
            - /ventana_x/: guarda uno o varios ficheros de los detalles de las peliculas en un  rango de fechas
                - X_movie.json: el fichero en si (a veces son varios pues se ha tenido que partir) 
    
    - **__/actors__/**: lo ficheros de todos los actores que participan en las peliculas
        - X_actors.json no suele tener mas de 500 actores por fichero
    
    - **__/genres/**:
        - **genres_list.json: unico fichero donde se guarda la lista de géneros disponible


- dayly_update: carpeta donde se guardan temporalmente los datos a actualizar, se llama cada día por la mañana  
    - movies:
        X_new_movies.json
    - actors:
        X_new_actors.json
    - genres:
        new_genres.json

In [68]:
s3 = boto3.client("s3",
                  aws_access_key_id = ACCESS_KEY_ID,
                  aws_secret_access_key = SECRET_KEY,
                  region_name=REGION) 

In [None]:
# funciones auxiliares para cargar los datos de s3

def load_all_movies_records(prefix_root: str = "initial_load/movies/") -> list[dict]:
    """
    Lee TODOS los Parquet bajo:
      {prefix_root}/date_range_*/ventana_*/{n}_movies.parquet
    y devuelve una lista de diccionarios (records).
    """
    prefix = prefix_root.rstrip("/") + "/"

    # Listar todas las claves .parquet bajo el prefijo
    keys = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=BUKECT_NAME, Prefix=prefix):
        for obj in page.get("Contents", []):
            k = obj["Key"]
            if k.endswith(".parquet"):
                keys.append(k)

    keys.sort()  # lectura determinista (opcional)

    # Cargar y acumular
    records: list[dict] = []
    for key in keys:
        body = s3.get_object(Bucket=BUKECT_NAME, Key=key)["Body"].read()
        table = pq.read_table(io.BytesIO(body))
        records.extend(table.to_pylist())  # lista de dicts por fila

    return records


## CARGA DE DATOS

La carga de datos se realiza a traves de varias funciones lambdas de aws:

- "fucion_incial
- funcion_diaria

#### FUNCION INCIAL

Se encarga de conseguir todos los ids de las peliculas en un rango de fechas
- Por ejemplo las fechas 2005, 1, 1 hasta 2010, 12, 31
- Este rango se partirá a su vez en en rangos de fechas mas pequeños con la longitud marcada por el step, en este caso 182 días.
- Estos nuevos rangos los lllamamos ventantas y represenan la agrupacion de fechas entre 2005, 1, 1 hasta 2010, 12, 31, pero con una longitud de 182 (a excepcion de la última que no suele coincidir un step perfecto)
- En cada ventana tenemos un nuemor de paginas totales
- En cada pagina tenemos 20 peliculas
- Cuando tenemos todos los ids de todas las paginas de una ventana, se hara un request a details de movies y cuando hayamos conseguido los detalles de todas las peliculas de todas las paginas de esa ventana en concreto se guardara un archvio en s3 con los detalles de las (20xN_paginas_totales) peliculas


In [None]:

rangos_step = [
    (date(2005, 1, 1),  date(2010, 12, 31), 182),
    (date(2011, 1, 1),  date(2013, 9, 16),  120),
    (date(2013, 9, 17), date(2014, 1, 6),   100),
    (date(2014, 1, 7),  date(2015, 12, 31), 110),
    (date(2016, 1, 1),  date(2021, 12, 31), 70),
    (date(2022, 1, 1),  date(2024, 10, 16), 60),
    (date(2024, 10, 17),date(2024, 12, 16), 30),
    (date(2024, 12, 17),date.today(),       60),
]

## RDS: Preparación de la instancia RDS

![Alt](./img/rds.png "Propiedades del RDS")

Antes de empezar a usar nuestra instancia de RDS necesitamos:
- Crear la base de datos y un usuario que la vaya gestionar
- Crear la estructura de las tablas y sus relaciones
- Insertar todos los datos inciales (Guardados en s3 __initial_load/*/__) 
- Insertar o modificar los nuevos datos (Guardados en dayly/movies)

### RDS.1 Primero inicializamos nuestra base de datos

DOCUMENTACION: **./lambdas/init_tmdb**

Para ello llamamos a la LAMBDA __"init_tmdb"__ que hace lo siguinte:

- Creamos una base de datos "tmdb" 
- Creamos un usuario "user" que será el que maneje las tablas
    * Este usuario **NO** podrá eliminar la base de datos por seguridad
    * Podrá crear nuevas tablas y borrarlas pero nunca la base de datos entera

In [60]:
rangos_step = [
    (date(2005, 1, 1),  date(2010, 12, 31), 182),
    (date(2011, 1, 1),  date(2013, 9, 16),  120),
    (date(2013, 9, 17), date(2014, 1, 6),   100),
    (date(2014, 1, 7),  date(2015, 12, 31), 110),
    (date(2016, 1, 1),  date(2021, 12, 31), 70),
    (date(2022, 1, 1),  date(2024, 10, 16), 60),
    (date(2024, 10, 17),date(2024, 12, 16), 30),
    (date(2024, 12, 17),date.today(),       60),
]

In [None]:

date_range_n = 1
for f_incial, f_final, step in rangos_step:
    date_range_n += 1
    f_from = f_incial.strftime("%Y-%m-%d")
    f_to = f_final.strftime("%Y-%m-%d")
    payload = {
        "from": f_from,
        "to":   f_to,
        "step_days": step,
        "date_range_idx": date_range_n
    }
    invoke_lambda_inicial(payload, async_=True)
    

✔ Invocación async enviada.
✔ Invocación async enviada.
✔ Invocación async enviada.
✔ Invocación async enviada.
✔ Invocación async enviada.
✔ Invocación async enviada.
✔ Invocación async enviada.
✔ Invocación async enviada.


In [None]:
payload = {
    "from": "2005-01-01",
    "to":   "2010-12-31",
    "step_days": 182,
    "date_range_idx": 1 
}


In [57]:
all_movies = load_all_movies_records()

In [59]:
pd.DataFrame(all_movies)

Unnamed: 0,adult,backdrop_path,belongs_to_collection,budget,genres,homepage,id,imdb_id,origin_country,original_language,...,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count
0,False,/26OvB15pqk3eiKJG8LrXDVzO7Mw.jpg,{'backdrop_path': '/44TlYuWbWBpRY3i0Ch7MJtLv4Y...,100000000,"[{'id': 14, 'name': 'Fantasy'}, {'id': 28, 'na...",https://www.warnerbros.com/movies/constantine,561,tt0360486,[US],en,...,2005-02-08,230900000,121,"[{'english_name': 'Spanish', 'iso_639_1': 'es'...",Released,Hell wants him. Heaven won't take him. Earth n...,Constantine,False,7.100,7613
1,False,/2YJOXPl4QtCskixYA58ToyCXEW0.jpg,,110000000,"[{'id': 28, 'name': 'Action'}, {'id': 35, 'nam...",,787,tt0356910,[US],en,...,2005-06-07,487287646,119,"[{'english_name': 'English', 'iso_639_1': 'en'...",Released,'Til death do us part.,Mr. & Mrs. Smith,False,6.688,10791
2,False,/vzeyPbl4rWMVgdkAj7thmDuQZmE.jpg,,0,"[{'id': 18, 'name': 'Drama'}, {'id': 10749, 'n...",http://www.cbs.com/specials/magic_of_ordinary_...,47525,tt0406046,[US],en,...,2005-01-30,0,120,"[{'english_name': 'English', 'iso_639_1': 'en'...",Released,,The Magic of Ordinary Days,False,7.400,84
3,False,/bInicBgSHfQiWLbnRT5jxw9Grvm.jpg,,132000000,"[{'id': 12, 'name': 'Adventure'}, {'id': 53, '...",https://amblin.com/movie/war-of-the-worlds/,74,tt0407304,[US],en,...,2005-06-28,603873119,117,"[{'english_name': 'English', 'iso_639_1': 'en'...",Released,They're already here.,War of the Worlds,False,6.522,8742
4,False,/1WRwy3EDc2LVtwauzIMMGVJLEI5.jpg,,25000000,"[{'id': 18, 'name': 'Drama'}]",http://www.sonypictures.com/movies/lordsofdogtown,9787,tt0355702,[US],en,...,2005-06-03,13411957,107,"[{'english_name': 'English', 'iso_639_1': 'en'...",Released,They came from nothing to change everything.,Lords of Dogtown,False,7.102,773
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
35474,False,,,0,"[{'id': 99, 'name': 'Documentary'}]",,41329,tt1339141,[US],en,...,2007-01-01,0,74,[],Released,,Overdrawn!,False,0.000,0
35475,False,,,0,[],,30705,,[US],en,...,2007-03-07,0,92,"[{'english_name': 'Norwegian', 'iso_639_1': 'n...",Released,,Ka e vitsen?,False,7.500,2
35476,False,,,0,[],,52478,tt0768192,[US],en,...,2007-01-01,0,38,"[{'english_name': 'German', 'iso_639_1': 'de',...",Released,,Die Handwerker Gottes,False,0.000,0
35477,False,,,0,[],,23181,,[US],en,...,2007-01-01,0,0,"[{'english_name': 'German', 'iso_639_1': 'de',...",Released,,prediculous,False,0.000,0


In [None]:
invocar_lambda(nombre_funcion="init_tmdb")

### RDS.2 Creamos la estrucutura de nuestra base de datos:

DOCUMENTACION: **./lambdas/create_tables_tmdb**

Llmamaos a la LAMBDA __"create_tables_tmdb"__. Creación de las tablas y de sus relaciones, la lambda ejecuta un archivo .sql guardado en el propio codigo de la lambda.

Este es es archivo: 

```sql
-- SCHEMA: tmdb (tablas en 'public')

-- TABLE: movies
CREATE TABLE IF NOT EXISTS public.movies (
    id              INTEGER PRIMARY KEY,                      -- TMDB movie id (manual)
    title           TEXT NOT NULL,
    popularity      REAL,                                     -- float4
    vote_average    NUMERIC(3,1) 
    CHECK (vote_average >= 0 AND vote_average <= 10),     -- 0.0..10.0
    runtime         SMALLINT CHECK (runtime >= 0),            -- minutos
    budget          BIGINT CHECK (budget  >= 0),              -- entero (unidades monetarias)
    revenue         BIGINT CHECK (revenue >= 0),
    overview        TEXT,
    release_date    DATE,
    success         BOOLEAN
);

CREATE INDEX IF NOT EXISTS idx_movies_release_date ON public.movies (release_date);
CREATE INDEX IF NOT EXISTS idx_movies_popularity  ON public.movies (popularity);
CREATE INDEX IF NOT EXISTS idx_movies_title_lower ON public.movies (lower(title));

-- TABLE: actors (cast)
CREATE TABLE IF NOT EXISTS public.actors (
    id          INTEGER PRIMARY KEY,                          
    name        TEXT NOT NULL,
    age         SMALLINT CHECK (age IS NULL OR (age BETWEEN 0 AND 150)),
    gender      TEXT CHECK (gender IN ('Not set', 'Female', 'Male', 'Non-binary')),
    popularity  REAL
);

CREATE INDEX IF NOT EXISTS idx_actors_name_lower ON public.actors (lower(name));
CREATE INDEX IF NOT EXISTS idx_actors_popularity ON public.actors (popularity);


-- TABLE: genres
CREATE TABLE IF NOT EXISTS public.genres (
    id      INTEGER PRIMARY KEY,          -- TMDB genre id (manual)
    name    TEXT NOT NULL UNIQUE
);

-- TABLE: movie_actors  (credits → cast, relación N:M)
CREATE TABLE IF NOT EXISTS public.movie_actors (
    movie_id INTEGER NOT NULL REFERENCES public.movies(id) ON DELETE CASCADE,
    actor_id INTEGER NOT NULL REFERENCES public.actors(id) ON DELETE CASCADE,
    PRIMARY KEY (movie_id, actor_id)
);

-- TABLE: movie_genres  (genres, relación N:M)
CREATE TABLE IF NOT EXISTS public.movie_genres (
    movie_id INTEGER NOT NULL REFERENCES public.movies(id) ON DELETE CASCADE,
    genre_id INTEGER NOT NULL REFERENCES public.genres(id) ON DELETE CASCADE,
    PRIMARY KEY (movie_id, genre_id)
);

CREATE INDEX IF NOT EXISTS idx_movie_actors_actor ON public.movie_actors (actor_id);
CREATE INDEX IF NOT EXISTS idx_movie_genres_genre ON public.movie_genres (genre_id);
```

>**Nota:** Importante el orden de creación de las tablas, debido a la definción de sus relaciones

In [None]:
invocar_lambda(nombre_funcion="create_tables_tmdb")

### RDS.3 Insertamos los datos guardados en S3

DOCUMENTACION: **./lambdas/insert_data_tmdb**

En S3 tenemos guardados:
- **initial_load/movies/** Detalle de las peliculas
- **initial_load/actors/** Detalle de los actores que trabajaron en esas peliculas (MUCHOS A MUCHOS)
- **initial_load/genres/** Generos posibles de las peliculas (MUCHOS A MUCHOS)

Por lo tanto debemos hacer una limpieza de datos guardados en S3, para insertarlos en nuestras tablas SQL

>##### Funciones de limpieza S3 -> limpeza -> RDS (sql)

In [None]:
def impute_with_median(df):
    median = df[df["budget"] != 0]["budget"].median()

    df["budget"] = [median if b == 0 else b for b in df["budget"]]
    
    return df

def add_success_column(df):
    df_original = df.copy()
    if 'revenue' in df.columns and 'budget' in df.columns:
        df["success"] = (df_original["revenue"] > df_original["budget"]) & (df_original["vote_average"] > 7)
    else:
        raise ValueError("El DataFrame debe contener las columnas 'revenue' y 'budget'")
    return df

def prepare_movies_for_sql(df):
    df_movies_original = df.copy()
    df = df_movies_original[["id", "title","popularity", "vote_average", "runtime", "budget", "revenue", "overview", "genres", "credits", "release_date"]]
    df["credits"] = [get_values_for_row(credits["cast"], "id") for credits in df_movies_original["credits"]]
    df["genres"] = [get_values_for_row(genres, "id") for genres in df_movies_original["genres"]]
    df = add_success_column(df)
    #df = impute_with_median(df)
    return df

def prepare_table_for_ia(df):
    return  df[["popularity","vote_average","runtime","budget", "succes"]]

# Para las columnas de CREDITS y GENRES donde nos vienen la información que relaciona la
# tabla películas con actores y generos
def get_values_for_row(items, column_name):
    return [item.get(column_name) for item in items if isinstance(item, dict) and column_name in item]


def get_actors_id_from_movies(df: pd.DataFrame, credits_col: str = "credits") -> list[int]:
    # Concatenate all sub-lists, ignoring NaN
    all_ids = []
    for entry in df[credits_col]:
        if isinstance(entry, list):
            all_ids.extend(entry)

    # Remove duplicates preserving order
    seen = set()
    unique_ids = []
    for actor_id in all_ids:
        if actor_id not in seen:
            seen.add(actor_id)
            unique_ids.append(actor_id)

    return unique_ids


def movie_genres_sql(df: pd.DataFrame) -> pd.DataFrame:
    """
    Construye la tabla relacional (movie_id, genre_id) 
    a partir de un DataFrame de películas con columna 'genres'.
    """
    df_exploded = df[["id", "genres"]].copy()
    return (
        df_exploded
        .explode("genres")
        .dropna(subset=["genres"])
        .rename(columns={"id": "movie_id", "genres": "genre_id"})
        .drop_duplicates(ignore_index=True)
        .reset_index(drop=True)
        .astype({"movie_id": "int64", "genre_id": "int64"})
    )
def movie_actors_sql(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build the relational table (movie_id, actor_id) from the movies DataFrame
    where 'credits' is a list of actor IDs.

    Returns
    -------
    pd.DataFrame with columns ['movie_id', 'actor_id']
    """
    df_exploded = df[["id", "credits"]].copy()
    return (
        df_exploded
        .explode("credits")                          # cada actor en fila propia
        .dropna(subset=["credits"])                  # elimina None/NaN
        .rename(columns={"id": "movie_id", "credits": "actor_id"})
        .astype({"movie_id": "int64", "actor_id": "int64"})
        .drop_duplicates(ignore_index=True)          # clave primaria (movie_id, actor_id)
        .reset_index(drop=True)
    )


#### Funciones auxiliares

Estas funcionaes nos transforman los data frames que hemos construido, en un insert con sus propios datos para la tabla en la base de datos RDS

In [None]:
import pandas as pd
from datetime import date, datetime
from decimal import Decimal, ROUND_HALF_UP

def df_to_movies_insert(df: pd.DataFrame, table: str = "public.movies") -> str:
    required = [
        "id", "title", "popularity", "vote_average", "runtime",
        "budget", "revenue", "overview", "release_date", "success"
    ]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required column(s): {missing}")

    def esc(s: str) -> str:
        # Escape single quotes for SQL strings
        return s.replace("'", "''")

    def fmt_str(s) -> str:
        if pd.isna(s) or s is None:
            return "''"
        return f"'{esc(str(s))}'"

    def fmt_int(x) -> str:
        if pd.isna(x) or x is None:
            return "NULL"
        return str(int(x))

    def fmt_dec(x, decimals: int) -> str:
        # Use Decimal to avoid float artifacts and force fixed decimals
        if pd.isna(x) or x is None:
            return "NULL"
        q = Decimal("1").scaleb(-decimals)  # e.g., decimals=4 -> Decimal('0.0001')
        val = Decimal(str(x)).quantize(q, rounding=ROUND_HALF_UP)
        return f"{val:.{decimals}f}"

    def fmt_date(d) -> str:
        if pd.isna(d) or d is None:
            return "NULL"
        if isinstance(d, (datetime, date)):
            return f"DATE '{d.strftime('%Y-%m-%d')}'"
        s = str(d).strip()
        if not s:
            return "NULL"
        try:
            ymd = s[:10]
            datetime.strptime(ymd, "%Y-%m-%d")
            return f"DATE '{ymd}'"
        except Exception:
            return "NULL"

    def fmt_bool(b) -> str:
        if pd.isna(b) or b is None:
            return "FALSE"
        return "TRUE" if bool(b) else "FALSE"

    values = []
    for _, r in df.iterrows():
        row_sql = (
            fmt_int(r["id"]),                    # id
            fmt_str(r["title"]),                 # title
            fmt_dec(r["popularity"], 4),         # popularity
            fmt_dec(r["vote_average"], 1),       # vote_average
            fmt_int(r["runtime"]),               # runtime
            fmt_int(r["budget"]),                # budget
            fmt_int(r["revenue"]),               # revenue
            fmt_str(r["overview"] if "overview" in r else ""),  # overview
            fmt_date(r["release_date"]),         # release_date
            fmt_bool(r["success"])               # success
        )
        values.append(f"({', '.join(row_sql)})")

    if not values:
        raise ValueError("DataFrame is empty; no rows to insert.")

    head = (
        f"INSERT INTO {table} (\n"
        f"  id, title, popularity, vote_average, runtime, budget, revenue, overview, release_date, success\n"
        f") VALUES\n"
    )
    tail = "\nON CONFLICT (id) DO NOTHING;"
    return head + ",\n".join(values) + tail

def df_to_movie_genres_insert(df: pd.DataFrame, table: str = "public.movie_genres") -> str:
    # Validate columns
    required = {"movie_id", "genre_id"}
    if not required.issubset(df.columns):
        raise ValueError(f"DataFrame must contain columns: {sorted(required)}")

    # Remove rows with nulls and exact duplicates (sane pre-checks for PK (movie_id, genre_id))
    df_clean = (
        df.loc[:, ["movie_id", "genre_id"]]
          .dropna(subset=["movie_id", "genre_id"])
          .drop_duplicates(ignore_index=True)
    )

    if df_clean.empty:
        raise ValueError("DataFrame is empty after cleaning; no rows to insert.")

    # Build VALUES tuples
    values = []
    for _, r in df_clean.iterrows():
        try:
            movie_id = int(r["movie_id"])
            genre_id = int(r["genre_id"])
        except Exception:
            raise ValueError(f"Non-integer IDs found in row: {r.to_dict()}")
        values.append(f"({movie_id}, {genre_id})")

    head = (
        f"INSERT INTO {table} (\n"
        f"  movie_id, genre_id\n"
        f") VALUES\n"
    )
    tail = "\nON CONFLICT DO NOTHING;"

    return head + ",\n".join(values) + tail

def df_to_actors_insert(
    df: pd.DataFrame,
    table: str = "public.actors",
    validate_gender: bool = True,
) -> str:
    required = {"id", "name", "age", "gender", "popularity"}
    if not required.issubset(df.columns):
        raise ValueError(f"DataFrame must contain columns: {sorted(required)}")

    allowed_genders = {"Not set", "Female", "Male", "Non-binary"}

    def esc(s: str) -> str:
        return s.replace("'", "''")

    def fmt_str(s) -> str:
        if pd.isna(s) or s is None:
            return "''"
        return f"'{esc(str(s))}'"

    def fmt_int(x) -> str:
        if pd.isna(x) or x is None:
            return "NULL"
        return str(int(x))

    def fmt_float(x, decimals: int = 4) -> str:
        if pd.isna(x) or x is None:
            return "NULL"
        return f"{float(x):.{decimals}f}"

    def fmt_gender(g) -> str:
        if pd.isna(g) or g is None:
            # si falta, lo tratamos como 'Not set'
            g = "Not set"
        g_str = str(g)
        if validate_gender and g_str not in allowed_genders:
            raise ValueError(
                f"Invalid gender value '{g_str}'. Expected one of {sorted(allowed_genders)}."
            )
        return f"'{esc(g_str)}'"

    values = []
    for _, r in df.iterrows():
        tup = (
            fmt_int(r["id"]),            # id
            fmt_str(r["name"]),          # name
            fmt_int(r["age"]),           # age -> NULL si None/NaN
            fmt_gender(r["gender"]),     # gender como TEXT validado
            fmt_float(r["popularity"], 4)# popularity
        )
        values.append(f"({', '.join(tup)})")

    if not values:
        raise ValueError("DataFrame is empty; no rows to insert.")

    head = (
        f"INSERT INTO {table} (\n"
        f"  id, name, age, gender, popularity\n"
        f") VALUES\n"
    )
    tail = "\nON CONFLICT (id) DO NOTHING;"

    return head + ",\n".join(values) + tail

def df_to_movie_actors_insert(df: pd.DataFrame, table: str = "public.movie_actors") -> str:
    """
    Build a single SQL INSERT statement for the `public.movie_actors` table
    from a pandas DataFrame that already mirrors the table schema.

    Expected DataFrame columns (exact):
      - movie_id (int, NOT NULL)
      - actor_id (int, NOT NULL)

    Behavior:
      - Generates: INSERT INTO <table> (movie_id, actor_id) VALUES (...), (...), ... ON CONFLICT DO NOTHING;
      - Drops duplicate pairs within the DataFrame to avoid redundant values.
      - Raises if DataFrame is empty or columns are missing.
    """
    required = {"movie_id", "actor_id"}
    if not required.issubset(df.columns):
        raise ValueError(f"DataFrame must contain columns: {sorted(required)}")

    # Clean duplicates and NaNs
    df_clean = (
        df.loc[:, ["movie_id", "actor_id"]]
          .dropna(subset=["movie_id", "actor_id"])
          .drop_duplicates(ignore_index=True)
    )

    if df_clean.empty:
        raise ValueError("DataFrame is empty after cleaning; no rows to insert.")

    values = []
    for _, r in df_clean.iterrows():
        movie_id = int(r["movie_id"])
        actor_id = int(r["actor_id"])
        values.append(f"({movie_id}, {actor_id})")

    head = (
        f"INSERT INTO {table} (\n"
        f"  movie_id, actor_id\n"
        f") VALUES\n"
    )
    tail = "\nON CONFLICT DO NOTHING;"

    return head + ",\n".join(values) + tail

def df_to_genres_insert(df: pd.DataFrame, table: str = "public.genres") -> str:
    """
    Build a single SQL INSERT statement for the `public.genres` table
    from a pandas DataFrame that already mirrors the table schema.

    Expected DataFrame columns:
      - id (int, PK)
      - name (str, NOT NULL)

    Behavior:
      - Escapa comillas simples en name.
      - NaN/None en name -> '' (vacío).
      - Devuelve un INSERT con ON CONFLICT (id) DO NOTHING.
    """
    required = {"id", "name"}
    if not required.issubset(df.columns):
        raise ValueError(f"DataFrame must contain columns: {sorted(required)}")

    def esc(s: str) -> str:
        return s.replace("'", "''")

    def fmt_str(s) -> str:
        if pd.isna(s) or s is None:
            return "''"
        return f"'{esc(str(s))}'"

    def fmt_int(x) -> str:
        if pd.isna(x) or x is None:
            return "NULL"
        return str(int(x))

    values = []
    for _, r in df.iterrows():
        tup = (
            fmt_int(r["id"]),       # id
            fmt_str(r["name"]),     # name
        )
        values.append(f"({', '.join(tup)})")

    if not values:
        raise ValueError("DataFrame is empty; no rows to insert.")

    head = f"INSERT INTO {table} (\n  id, name\n) VALUES\n"
    tail = "\nON CONFLICT (id) DO NOTHING;"

    return head + ",\n".join(values) + tail


In [None]:
df_movies = prepare_movies_for_sql(df_movies_sucia)
df_movies

In [None]:
actors_ids = get_actors_id_from_movies(df_movies)
actors_ids

In [None]:
actors_details = fetch_details_for_ids(actors_ids, "person")
actors_details

In [None]:
from datetime import datetime, date
import pandas as pd

def compute_actor_age(birthday: str | None, deathday: str | None) -> int | None:
    if not birthday or pd.isna(birthday) or birthday == "None":
        return None   # no se conoce la edad

    try:
        birth_date = datetime.strptime(str(birthday)[:10], "%Y-%m-%d").date()
    except Exception:
        return None

    if deathday and deathday != "None" and not pd.isna(deathday):
        try:
            end_date = datetime.strptime(str(deathday)[:10], "%Y-%m-%d").date()
        except Exception:
            end_date = date.today()
    else:
        end_date = date.today()

    age = end_date.year - birth_date.year - (
        (end_date.month, end_date.day) < (birth_date.month, birth_date.day)
    )

    return age if age >= 0 else None

import pandas as pd

def map_actor_gender(df: pd.DataFrame, gender_col: str = "gender") -> pd.DataFrame:
    
    mapping = {
        0: "Not set",
        1: "Female",
        2: "Male",
        3: "Non-binary"
    }
    df = df.copy()
    df[gender_col] = df[gender_col].replace(mapping)
    return df



In [None]:
def clean_actors(df: pd.DataFrame):
    toret = df[["id","name",  "birthday", "deathday", "gender", "popularity"]]
    toret["age"] = df.apply(
        lambda r: compute_actor_age(r["birthday"], r["deathday"]),
        axis=1
    )
    toret = toret.drop(columns=["birthday", "deathday"])
    toret = map_actor_gender(toret)
    return toret

In [None]:
df_actors = pd.DataFrame(actors_details)
df_actors = clean_actors(df_actors)
df_actors

In [None]:
actors_insert = df_to_actors_insert(df_actors)
actors_insert

In [None]:
invocar_lambda("insert_data_tmdb", {"QUERY": actors_insert})

In [None]:
movies_insert = df_to_movies_insert(df_movies)
movies_insert

In [None]:
invocar_lambda("insert_data_tmdb")

In [None]:
invocar_lambda("insert_data_tmdb", {
    "QUERY": movies_insert
})

In [None]:
df_movie_genres = movie_genres_sql(df_movies)
df_movie_genres

In [None]:
insert_movie_genres = df_to_movie_genres_insert(df_movie_genres)
insert_movie_genres

In [None]:
invocar_lambda("insert_data_tmdb", {
    "QUERY": insert_movie_genres
})

## EC2 - Con Fast api

Nos hemos construido una instancia ec2 que utilizaremos como servidor, que corra nuestro codigo python y escuche peticiones a los endpoints.

Para ver la documentación en profundidad de la API:
- **https://ec2-34-244-17-61.eu-west-1.compute.amazonaws.com:8000/docs**


![Alt](./img/ec2.png "Propiedades del EC2")

- ### /predict:
    - Con la IA clasificadora que hemos entrenado espera datos de pelicula para predecir si tendrán exito. Nos devuelve una lista que representa la columna a predecir de los datos enviados. Ec2 necesita el fichero del modelo compilado para esto

- ### /ask-text
    - Paso 1: Le pasamos una pregunta en lenguaje natural como parámetro, con la libreria __google-genai__ usamos gemini para generarnos una sentencia SQL a partir de la pregunta
    - Paso 2: Ejecutaremos con la lambda __send_quey__ que se conectará con __pycopg2__ a nuestra rds devolviendo la informacion de la sentencia.
    - Paso 3: Le pasamos a gemini otra vez la información de la ejecucion de la query, para que transforme los datos a una explicación en lenguaje natural.

- ### /ask-visual
    - Paso 1: Mismo que en **/ask_text** pero tiene un parametro extra "format" que indica como queremos ver el resultado:
        * **format = "code"**: el endooint entiende que estamos realizando la request desde código y nos devuelve codigo python para ejecutar y ver las graficas.
        * **format = html**: pensado para cunado hacemos la peticion a través del navegador, mostrando la grafica en el propio navegador en codigo html
    - Paso 2: Mismo que en **/ask_text**
    - Paso 3: Se le pide por ultimo a gemini otra vez que desde los datos mostrados por la query en nuestro RDS. Generara codigo python que guardará en un archivo, si format = code este codigo será el resultado del endpoint, pero si format = html, este codigo guardado en un archivo temporal en ec2 será ejecutado, y generará una imagen que con una funcion envoltorio prepararemos y mostraremos con html

>NOTA: Para ver en profundid el codigo de la nuestra api revisar la carpeta fast_api, esa carpeta se sube a ec2 con el comando 
```bash
# Subir carpeta local 'mi_carpeta' a /home/ec2-user/ en la instancia
scp -i /ruta/a/tu/key.pem -r ./mi_carpeta ec2-user@EC2_PUBLIC_IP:/home/ec2-user/

```
