## Configuraciones Iniciales


In [28]:
import sys
import os
import pandas as pd
import numpy as np

# Agregar la ruta del directorio scripts
sys.path.append(os.path.abspath(os.path.join('..', 'scripts')))

# Configuración de directorios
RAW_DIR = r'D:\Henry Data Science\Proyecto_individual_1\Datasets'
PROCESSED_DIR = r'D:\Henry Data Science\Proyecto_individual_1\Data'

# Crear directorio de destino si no existe
os.makedirs(PROCESSED_DIR, exist_ok=True)

### Conversión del archivo CSV a Parquet.
- Se convierte el CSV a Parquet optimizando el espacio y preparando el dataset para procesamiento.

In [29]:
import os
import pandas as pd
from convert_csv_to_parquet import convertir_csv_a_parquet

# Verificar que el archivo CSV existe
csv_path = os.path.join(RAW_DIR, 'movies_dataset.csv')
parquet_path = os.path.join(RAW_DIR, 'movies_dataset.parquet')
if not os.path.exists(csv_path):
    print("El archivo CSV no existe en la ruta especificada.")
else:
    try:
        # Convertir de CSV a Parquet con transformaciones
        convertir_csv_a_parquet(
            csv_path,
            parquet_path,
            columns_to_drop=['video', 'imdb_id', 'adult', 'original_title', 'poster_path', 'homepage'],
            fillna_values={'revenue': 0, 'budget': 0},
            dtype_conversion={'popularity': float, 'budget': float, 'id': int}
        )
        print("Conversión completada exitosamente.")
    except Exception as e:
        print(f"Ocurrió un error durante la conversión: {e}")

    # Leer el archivo convertido
    if os.path.exists(parquet_path):
        df = pd.read_parquet(parquet_path)
        print("Archivo Parquet cargado exitosamente.")
        print(df.head())
        print(f"Dimensiones del DataFrame: {df.shape}")

        # Validar columnas procesadas
        expected_columns = ['popularity', 'budget', 'id']
        missing_columns = [col for col in expected_columns if col not in df.columns]
        if missing_columns:
            print(f"Columnas faltantes en el DataFrame final: {missing_columns}")
    else:
        print("El archivo Parquet no se generó correctamente.")

  df = pd.read_csv(csv_path)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df[column].fillna(value=value, inplace=True)


Advertencia: No se pudo convertir la columna 'popularity' al tipo '<class 'float'>'.
Advertencia: No se pudo convertir la columna 'budget' al tipo '<class 'float'>'.
Advertencia: No se pudo convertir la columna 'id' al tipo '<class 'int'>'.
Archivo convertido correctamente y guardado en: D:\Henry Data Science\Proyecto_individual_1\Datasets\movies_dataset.parquet
Conversión completada exitosamente.
Archivo Parquet cargado exitosamente.
                               belongs_to_collection      budget  \
0  {'id': 10194, 'name': 'Toy Story Collection', ...  30000000.0   
1                                               None  65000000.0   
2  {'id': 119050, 'name': 'Grumpy Old Men Collect...         0.0   
3                                               None  16000000.0   
4  {'id': 96871, 'name': 'Father of the Bride Col...         0.0   

                                              genres       id  \
0  [{'id': 16, 'name': 'Animation'}, {'id': 35, '...    862.0   
1  [{'id': 12, 'name':

In [26]:
from convert_csv_to_parquet import convertir_csv_a_parquet

convertir_csv_a_parquet(
    os.path.join(RAW_DIR, 'movies_dataset.csv'),
    os.path.join(RAW_DIR, 'movies_dataset.parquet'),
    columns_to_drop=['video', 'imdb_id', 'adult', 'original_title', 'poster_path', 'homepage'],
    fillna_values={'revenue': 0, 'budget': 0},
    dtype_conversion={'popularity': float, 'budget': float, 'id': int}
)

# Leer el archivo convertido
df = pd.read_parquet(os.path.join(RAW_DIR, 'movies_dataset.parquet'))

  df = pd.read_csv(csv_path)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df[column].fillna(value=value, inplace=True)


Advertencia: No se pudo convertir la columna 'popularity' al tipo '<class 'float'>'.
Advertencia: No se pudo convertir la columna 'budget' al tipo '<class 'float'>'.
Advertencia: No se pudo convertir la columna 'id' al tipo '<class 'int'>'.
Archivo convertido correctamente y guardado en: D:\Henry Data Science\Proyecto_individual_1\Datasets\movies_dataset.parquet


## Limpieza y estandarización de datos.
1. Fechas y años de estreno.
- Eliminación de valores nulos y ajuste de formatos.
2. Cálculo de "Return":
- Generación de una columna que calcula la relación entre ingresos y presupuesto.
3. Filtro de películas no publicadas.
- Eliminación de filas cuyo status no sea "Released".

In [30]:
# Eliminar valores nulos en 'release_date' y corregir formato
df['release_date'] = pd.to_datetime(df['release_date'], format='%Y-%m-%d', errors='coerce')
df = df.dropna(subset=['release_date'])  # Eliminar filas con fechas no válidas
df['release_year'] = df['release_date'].dt.year

# Crear columna 'return' (revenue / budget) utilizando operaciones vectorizadas
df['return'] = np.where(df['budget'] != 0, df['revenue'] / df['budget'], 0)

# Filtrar películas no publicadas y eliminar columna 'status'
df = df.loc[df['status'] == 'Released'].drop('status', axis=1)

# Verificar datos procesados
print(df.head())
print(df['release_year'].describe())
print(df['return'].describe())

                               belongs_to_collection      budget  \
0  {'id': 10194, 'name': 'Toy Story Collection', ...  30000000.0   
1                                               None  65000000.0   
2  {'id': 119050, 'name': 'Grumpy Old Men Collect...         0.0   
3                                               None  16000000.0   
4  {'id': 96871, 'name': 'Father of the Bride Col...         0.0   

                                              genres       id  \
0  [{'id': 16, 'name': 'Animation'}, {'id': 35, '...    862.0   
1  [{'id': 12, 'name': 'Adventure'}, {'id': 14, '...   8844.0   
2  [{'id': 10749, 'name': 'Romance'}, {'id': 35, ...  15602.0   
3  [{'id': 35, 'name': 'Comedy'}, {'id': 18, 'nam...  31357.0   
4                     [{'id': 35, 'name': 'Comedy'}]  11862.0   

  original_language                                           overview  \
0                en  Led by Woody, Andy's toys live happily in his ...   
1                en  When siblings Judy and Peter di

### Estandarización de formatos.
- Conversión a tipos de datos consistentes.
- Redondeo de valores numéricos para una mayor claridad.

In [31]:
df['title'] = df['title'].str.lower()
df['release_year'] = df['release_year'].astype(int)
df['vote_count'] = df['vote_count'].astype(int)
df['id'] = df['id'].astype(int)

df['return'] = df['return'].round(2)
df['popularity'] = df['popularity'].round(2)

## Desanidado de columnas.
- Para datos complejos almacenados como listas o diccionarios, las columnas se transforman en tablas auxiliares.

belongs_to_collection

In [32]:
from convert_str_to_list_dic import convert_list_dict

df['belongs_to_collection'] = df['belongs_to_collection'].apply(convert_list_dict)

# Desanidar y combinar
belong_desanidado = pd.json_normalize(df['belongs_to_collection'])
belong_desanidado.rename(columns={
    'id': 'btc_id', 
    'name': 'btc_name', 
    'poster_path': 'btc_poster_path', 
    'backdrop_path': 'btc_backdrop_path'
}, inplace=True)

df = pd.concat([df, belong_desanidado], axis=1).drop(columns=['belongs_to_collection'])

genres

In [33]:
df['genres'] = df['genres'].apply(convert_list_dict)
from dislodge_columns import desanidar_columna
genres_desanidado = desanidar_columna(df, 'genres', 'genres_desanidado')

production_companies

In [34]:
df['production_companies'] = df['production_companies'].apply(convert_list_dict)
pc_desanidado = desanidar_columna(df, 'production_companies', 'pc_desanidado', 1)

production_countries

In [35]:
df['production_countries'] = df['production_countries'].apply(convert_list_dict)
pctry_desanidado = desanidar_columna(df, 'production_countries', 'pctry_desanidado', 2)

spoken_languages

In [36]:
df['spoken_languages'] = df['spoken_languages'].apply(convert_list_dict)
slan_desanidado = desanidar_columna(df, 'spoken_languages', 'slan_desanidado', 3)

### Optimización final.
- Se eliminan las columnas innecesarias y se ajusta el dataset principal.

In [37]:
df = df.drop(columns=[
    'btc_poster_path', 'btc_backdrop_path', 'tagline',
    'genres', 'production_companies', 'production_countries', 'spoken_languages'
])

# Resetear el índice
df = df.reset_index(drop=True)

### Exportación de dataset y tablas auxiliares.

In [38]:
# Exportar el DataFrame principal
df_path = os.path.join(PROCESSED_DIR, 'movies_dataset_etl.parquet')
df.to_parquet(df_path, engine='pyarrow', compression='snappy', index=False)
print("Dataset principal exportado correctamente.")

# Exportar tablas auxiliares
genres_desanidado.to_parquet(os.path.join(PROCESSED_DIR, 'genres_desanidado.parquet'), engine='pyarrow', compression='snappy', index=False)
pc_desanidado.to_parquet(os.path.join(PROCESSED_DIR, 'pc_desanidado.parquet'), engine='pyarrow', compression='snappy', index=False)
pctry_desanidado.to_parquet(os.path.join(PROCESSED_DIR, 'pctry_desanidado.parquet'), engine='pyarrow', compression='snappy', index=False)

# Nota: `slan_desanidado` no será exportada porque es irrelevante para el MVP.
print("Tablas auxiliares exportadas correctamente.")

Dataset principal exportado correctamente.
Tablas auxiliares exportadas correctamente.
