# ETL

### Carga de Datos - movies_dataset.csv

In [None]:
import pandas as pd
import ast
import numpy as np

# Función para desanidar listas y diccionarios dentro de una columna
def desanidar_columna(df, columna, key=None):
    def desanidar_elemento(elemento, key):
        if isinstance(elemento, str):
            try:
                elemento = ast.literal_eval(elemento)
            except (ValueError, SyntaxError):
                return None
        if isinstance(elemento, list):
            if key:
                return ', '.join([str(d.get(key, '')) for d in elemento if isinstance(d, dict)])
            return ', '.join([str(d) for d in elemento])
        if isinstance(elemento, dict):
            if key:
                return elemento.get(key, None)
            return str(elemento)
        return None

    return df[columna].apply(lambda x: desanidar_elemento(x, key))

# Se carga el dataset de peliculas
movies_df = pd.read_csv('../../movies_dataset.csv')


### Transformaciones

In [None]:
# Corrección para 'belongs_to_collection': Extraer 'id' del diccionario, manejando casos vacíos y asegurando que sean enteros
movies_df['belongs_to_collection'] = movies_df['belongs_to_collection'].apply(
    lambda x: int(ast.literal_eval(x)['id']) if isinstance(x, str) and ast.literal_eval(x) and isinstance(ast.literal_eval(x), dict) else None
)
movies_df['genres'] = desanidar_columna(movies_df, 'genres', 'name')
movies_df['production_companies'] = desanidar_columna(movies_df, 'production_companies', 'name')
movies_df['production_countries'] = desanidar_columna(movies_df, 'production_countries', 'iso_3166_1')
movies_df['spoken_languages'] = desanidar_columna(movies_df, 'spoken_languages', 'iso_639_1')

# Convertimos la columna 'id' en movies_df a numérica y asegurar que sea entera para luego facilitar el mergeo de datasets
movies_df['id'] = pd.to_numeric(movies_df['id'], errors='coerce').astype('Int64')

### Carga de Datos - credits.csv

In [None]:
# Cargar los datos de los créditos
credits_df = pd.read_csv('../../credits.csv')

### Transformaciones

In [None]:
# Transformamos las columnas en credits_df
credits_df['cast'] = desanidar_columna(credits_df, 'cast', 'name')

# Separamos actores y directores
credits_df['actors'] = credits_df['cast'].apply(lambda x: ', '.join([actor for actor in x.split(', ')[:5]]))

# Corrección para 'directors': Buscar "Director" en lugar de "Directing"
def extract_directors(crew_str):
    try:
        crew_list = ast.literal_eval(crew_str)
        directors = [member['name'] for member in crew_list if member.get('job') == 'Director']
        return ', '.join(directors)
    except (ValueError, SyntaxError):
        return None

credits_df['directors'] = credits_df['crew'].apply(extract_directors)

# Mantenemos solo las columnas relevantes en credits_df
credits_df = credits_df[['id', 'actors', 'directors']]

# Convertimos la columna 'id' en credits_df a numérica y asegurar que sea entera para luego facilitar el mergeo de datasets
credits_df['id'] = pd.to_numeric(credits_df['id'], errors='coerce').astype('Int64')

### Mergeamos los datasets

In [None]:
# Unimos los datasets
merged_df = pd.merge(movies_df, credits_df, on='id', how='left')

# Hacemos unas transformaciones adicionales en merged_df
merged_df['revenue'] = pd.to_numeric(merged_df['revenue'], errors='coerce').fillna(0)
merged_df['budget'] = pd.to_numeric(merged_df['budget'], errors='coerce').fillna(0)
merged_df = merged_df.dropna(subset=['release_date'])
merged_df['release_date'] = pd.to_datetime(merged_df['release_date'], format='%Y-%m-%d', errors='coerce')
merged_df['release_year'] = merged_df['release_date'].dt.year.astype('Int64')

### Calculamos el retorno de inversion (revenue / budget), luego eliminamos las columnas que no utilizaremos segun la consigna del proyecto, y finalizaremos guardando un nuevo csv con todos los datos nuevos.

In [None]:
# Calculamos el retorno de inversión
merged_df['return'] = merged_df.apply(lambda row: row['revenue'] / row['budget'] if row['budget'] > 0 else 0, axis=1)

# Eliminamos las columnas no utilizadas segun los requisitos del trabajo
columns_to_drop = ['video', 'imdb_id', 'adult', 'original_title', 'poster_path', 'homepage']
merged_df = merged_df.drop(columns=columns_to_drop)

# Convertimos 'belongs_to_collection' a entero, para manejar valores nulos
merged_df['belongs_to_collection'] = merged_df['belongs_to_collection'].astype('Int64')

# Validamos y eliminamos filas que no cumplen con los tipos de datos especificados
def validate_and_remove_invalid_rows(df, target_dtypes):
    for column, dtype in target_dtypes.items():
        try:
            df[column] = df[column].astype(dtype)
        except ValueError:
            # Mantener filas con datos válidos y eliminar las que no cumplen con el tipo de dato
            if dtype in ['Int64', 'float64']:
                df = df[pd.to_numeric(df[column], errors='coerce').notnull()]
            else:
                df = df[df[column].apply(lambda x: isinstance(x, dtype))]
    return df

# Aplicamos tipos de datos y mantenemos datos desanidados
target_dtypes = {
    'belongs_to_collection': 'Int64',
    'budget': 'float64',
    'genres': 'object',
    'id': 'Int64',
    'original_language': 'object',
    'overview': 'object',
    'popularity': 'float64',
    'production_companies': 'object',
    'production_countries': 'object',
    'release_date': 'datetime64[ns]',
    'revenue': 'float64',
    'runtime': 'float64',
    'spoken_languages': 'object',
    'status': 'object',
    'tagline': 'object',
    'title': 'object',
    'vote_average': 'float64',
    'vote_count': 'float64',
    'actors': 'object',
    'directors': 'object',
    'release_year': 'Int64',
    'return': 'float64'
}

merged_df = validate_and_remove_invalid_rows(merged_df, target_dtypes)

# Guardarmos entonces el dataset limpio
merged_df.to_csv('../../dataset_limpio.csv', index=False)

print(merged_df.head())

#### Este ETL fue realizado en Colab de Google