## ETL DIM_FILM

In [224]:
import pandas as pd
from config.db_config import conn, target, source

In [225]:
# Read the film, film_category and category tables in full through the
# `source` connection imported from config.db_config.
# NOTE(review): the original comment describes `source` as a MySQL database --
# confirm against config.db_config. The same three reads are repeated in the
# cell under the "Extracción" heading.
df_film = pd.read_sql(sql='select * from film', con=source)
df_film_category = pd.read_sql(sql='select * from film_category', con=source)
df_category = pd.read_sql(sql='select * from category', con=source)

## Extracción

In [226]:
# Extraction: one DataFrame per source table (film, film_category, category).
# Each query pulls every column of its table through the `source` connection.
film_query = 'select * from film'
film_category_query = 'select * from film_category'
category_query = 'select * from category'

df_film = pd.read_sql(film_query, source)
df_film_category = pd.read_sql(film_category_query, source)
df_category = pd.read_sql(category_query, source)

## Transformación

In [227]:
# Join film -> film_category on film_id, then the result -> category on
# category_id. Both joins are inner joins, so rows without a match on either
# key are dropped.
merged_df = (
    df_film
    .merge(df_film_category, on='film_id', how='inner')
    .merge(df_category, on='category_id', how='inner')
)

In [228]:
# Generate the surrogate key id_film as 1..N over the merged rows.
#
# The key is positional, so the rows are first put into a deterministic order
# (by the two join keys). The source queries carry no ORDER BY, and the load
# step upserts on id_film: without a fixed order, a rerun could hand the same
# id to a different film and overwrite it in the target table.
merged_df = (
    merged_df
    .sort_values(['film_id', 'category_id'])
    .reset_index(drop=True)
)
merged_df['id_film'] = range(1, len(merged_df) + 1)

In [229]:
# Build the dimension frame from the merged data:
#   id_film -> id_film   (surrogate key)
#   title   -> name      (film title)
#   name    -> category  (category name)
# .copy() makes df_dim_film independent of merged_df, so the in-place edits in
# the following cells do not raise SettingWithCopyWarning. Renaming through an
# explicit mapping (rather than assigning .columns positionally) keeps the
# result correct even if the selection order above is changed.
df_dim_film = (
    merged_df[['id_film', 'title', 'name']]
    .copy()
    .rename(columns={'title': 'name', 'name': 'category'})
)

In [230]:
# Replace missing values with the dimension's placeholder members:
# -1 for the key and 'NO IDENTIFICADO' for the two text attributes.
# fillna returns a new DataFrame, so nothing is written into a frame that may
# still be flagged as a slice of merged_df (the masked .loc assignments used
# previously could emit SettingWithCopyWarning).
df_dim_film = df_dim_film.fillna({
    'id_film': -1,
    'name': 'NO IDENTIFICADO',
    'category': 'NO IDENTIFICADO',
})

In [231]:
# Keep only the first row for each id_film value.
# NOTE(review): id_film is generated as a sequential range earlier in this
# notebook, so this looks like a safeguard rather than an active filter.
df_dim_film = df_dim_film.drop_duplicates(subset='id_film', keep='first')

In [232]:
# Cast each column to its target type in a single pass:
# integer key, string attributes.
column_types = {'id_film': int, 'name': str, 'category': str}
df_dim_film = df_dim_film.astype(column_types)

In [233]:
# Normalise the text attributes to Title Case.
for text_column in ('name', 'category'):
    df_dim_film[text_column] = df_dim_film[text_column].str.title()

In [234]:
# Simple validation (disabled): show only the rows whose id_film is positive
# validate_data = df_dim_film[df_dim_film['id_film'] > 0]
# validate_data

# Column renaming, if needed (disabled)
# df_dim_film.rename(columns={'name': 'movie_name'}, inplace=True)
# df_dim_film

## Carga

In [235]:
# Load: upsert every row of df_dim_film into the target table dim_film.
# Rows whose id_film already exists have their name and category overwritten
# (ON CONFLICT ... DO UPDATE); all other rows are inserted.
# NOTE(review): ON CONFLICT / EXCLUDED is PostgreSQL-style syntax, so `conn`
# is presumably a PostgreSQL DB-API connection -- confirm in config.db_config.
insert_query = """
        INSERT INTO dim_film (id_film, name, category) 
        VALUES (%s, %s, %s)
        ON CONFLICT (id_film) DO UPDATE
        SET name = EXCLUDED.name, category = EXCLUDED.category;
    """

# Select the columns explicitly so each tuple always lines up with the three
# placeholders, regardless of the DataFrame's column order. itertuples with
# name=None yields plain tuples, ready to be passed as query parameters.
rows = list(
    df_dim_film[['id_film', 'name', 'category']].itertuples(index=False, name=None)
)

cursor = conn.cursor()
try:
    # One executemany call replaces the per-row iterrows/execute loop.
    if rows:
        cursor.executemany(insert_query, rows)
    conn.commit()

    filas_insertadas = len(rows)
    print(f"Se afectaron {filas_insertadas} filas exitosamente.")
except Exception as e:
    # Undo the partial load, report it, then re-raise so that a failed load
    # stops a "Run All" instead of being silently skipped.
    conn.rollback()
    print("Error durante la inserción:", e)
    raise
finally:
    cursor.close()
    # The connection is left open here; closing it was disabled in the
    # original cell.
    # conn.close()

Se afectaron 1000 filas exitosamente.
