Importaciones y definiciones

In [2]:
import pandas as pd
import numpy as np
import json
from fastapi import FastAPI

Carga de movies_dataset

In [3]:
movies_df = pd.read_csv('data/movies_dataset.csv', sep=',', encoding='utf-8', low_memory=False)
credits_df = pd.read_csv('data/credits.csv', sep=',', encoding='utf-8', low_memory=False)

Nos asegúramos de que ambas columnas 'id' sean del mismo tipo ver [DD]('doc/Diccionario de Datos - PIMLOps.xlsx')

In [4]:
movies_df['id'] = movies_df['id'].astype(str)
credits_df['id'] = credits_df['id'].astype(str)

Unir los datasets usando la columna 'id' como clave

In [5]:
df = pd.merge(movies_df, credits_df, on='id', how='inner')

Transformaciones

Algunos campos, como belongs_to_collection, production_companies y otros [DD]('doc/Diccionario de Datos - PIMLOps.xlsx') están anidados, esto es o bien tienen un diccionario o una lista como valores en cada fila, ¡deberán desanidarlos para poder y unirlos al dataset de nuevo hacer alguna de las consultas de la API! O bien buscar la manera de acceder a esos datos sin desanidarlos.

In [6]:
# Función para desanidar un campo JSON
# Pendiente de aplicación
def desanidar_json(valor):
    try:
        return json.loads(valor)
    except (TypeError, ValueError):
        return {} # O [] si esperas listas

# Desanidar los campos relevantes (reemplaza 'campo1', 'campo2', etc.)
campos_anidados = ['belongs_to_collection', 'production_companies', 'genres', 'production_countries', 'spoken_languages']
for campo in campos_anidados:
    df[campo] = df[campo].apply(desanidar_json)

# Ejemplo de cómo acceder a datos desanidados:
#df['production_companies'][0][0]['name']  # Accede al nombre de la primera compañía de producción de la primera película

Los valores nulos de los campos revenue, budget deben ser rellenados por el número 0.

In [7]:
# Rellenar los valores nulos de las columnas 'revenue' y 'budget' con 0
df['revenue'] = df['revenue'].fillna(0)
df['budget'] = df['budget'].fillna(0)

Los valores nulos del campo release date deben eliminarse.

In [8]:
# Eliminar las filas donde 'release_date' tenga valores nulos
df = df.dropna(subset=['release_date'])
print(df['release_date'].isnull().sum())

0


De haber fechas, deberán tener el formato AAAA-mm-dd, además deberán crear la columna release_year donde extraerán el año de la fecha de estreno.

In [9]:
#Convertir la columna 'release_date' al formato datetime:
df['release_date'] = pd.to_datetime(df['release_date'], errors='coerce')

#Formatear la columna 'release_date' a 'AAAA-mm-dd':
df['release_date'] = df['release_date'].dt.strftime('%Y-%m-%d')

#Crear la columna 'release_year' extrayendo el año:
df['release_date'] = pd.to_datetime(df['release_date'], format='%Y-%m-%d', errors='coerce')
df['release_year'] = df['release_date'].dt.year

Crear la columna con el retorno de inversión, llamada return con los campos revenue y budget, dividiendo estas dos últimas revenue / budget, cuando no hay datos disponibles para calcularlo, deberá tomar el valor 0.

In [10]:
# Convertir las columnas 'revenue' y 'budget' a tipo numérico, manejando los valores que no se pueden convertir
df['revenue'] = pd.to_numeric(df['revenue'], errors='coerce')  # Convertir a float, poner NaN si no es posible
df['budget'] = pd.to_numeric(df['budget'], errors='coerce')  # Convertir a float, poner NaN si no es posible

# Reemplazar valores NaN con 0 si es necesario
df['revenue'] = df['revenue'].fillna(0)
df['budget'] = df['budget'].fillna(0)

# Calcular el retorno de inversión
# Usamos np.where para manejar las divisiones por cero y los casos donde revenue es cero
df['return'] = np.where(df['budget'] != 0, df['revenue'] / df['budget'], 0)

# Asegurarse de que si tanto 'revenue' como 'budget' son 0, el retorno también sea 0
df['return'] = np.where((df['revenue'] == 0) & (df['budget'] == 0), 0, df['return'])

Eliminar las columnas que no serán utilizadas, video,imdb_id,adult,original_title,poster_path y homepage.

In [11]:
columnas_a_remover = ['video', 'imdb_id', 'adult', 'original_title', 'poster_path', 'homepage']
df = df.drop(columns=columnas_a_remover, errors='ignore')

In [12]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 45451 entries, 0 to 45537
Data columns (total 22 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   belongs_to_collection  45451 non-null  object        
 1   budget                 45451 non-null  int64         
 2   genres                 45451 non-null  object        
 3   id                     45451 non-null  object        
 4   original_language      45440 non-null  object        
 5   overview               44510 non-null  object        
 6   popularity             45451 non-null  object        
 7   production_companies   45451 non-null  object        
 8   production_countries   45451 non-null  object        
 9   release_date           45451 non-null  datetime64[ns]
 10  revenue                45451 non-null  float64       
 11  runtime                45205 non-null  float64       
 12  spoken_languages       45451 non-null  object        
 13  status

In [13]:
# Asegurarse de que la columna 'release_date' existe en el DataFrame
if 'release_date' in df.columns:
    # Convertir la columna 'release_date' a tipo datetime
    df['release_date'] = pd.to_datetime(df['release_date'], errors='coerce')

    # Extraer el mes (en formato numérico)
    df['mes'] = df['release_date'].dt.month

    # Extraer el día de la semana (en formato numérico, lunes=0, domingo=6)
    df['dia_semana_num'] = df['release_date'].dt.dayofweek

    # Extraer el año
    df['anio'] = df['release_date'].dt.year

    # Para obtener el nombre del mes en español, se puede usar la configuración regional o un mapeo
    df['mes_nombre_esp'] = df['mes'].apply(lambda x: pd.to_datetime(x, format='%m').strftime('%B') if pd.notna(x) else '')

    # Para obtener el nombre del día de la semana en español, se puede usar un mapeo
    dias_semana_esp = {0: 'lunes', 1: 'martes', 2: 'miércoles', 3: 'jueves', 4: 'viernes', 5: 'sábado', 6: 'domingo'}
    df['dia_semana_nombre_esp'] = df['dia_semana_num'].map(dias_semana_esp)

    print(df[['release_date', 'mes', 'dia_semana_num', 'anio', 'mes_nombre_esp', 'dia_semana_nombre_esp']].head())
else:
    print("La columna 'release_date' no se encontró en el DataFrame.")

  release_date  mes  dia_semana_num  anio mes_nombre_esp dia_semana_nombre_esp
0   1995-10-30   10               0  1995        October                 lunes
1   1995-12-15   12               4  1995       December               viernes
2   1995-12-22   12               4  1995       December               viernes
3   1995-12-22   12               4  1995       December               viernes
4   1995-02-10    2               4  1995       February               viernes


Implementación del Endpoint /cantidad_filmaciones_mes/{Mes}

In [14]:
from fastapi import FastAPI

app = FastAPI()

@app.get("/cantidad_filmaciones_mes/{Mes}")
async def cantidad_filmaciones_mes(Mes: str):
    # Mapeo de nombres de meses en español a formato de Pandas
    meses_esp_a_num = {'enero': 1, 'febrero': 2, 'marzo': 3, 'abril': 4, 'mayo': 5, 'junio': 6,
                        'julio': 7, 'agosto': 8, 'septiembre': 9, 'octubre': 10, 'noviembre': 11, 'diciembre': 12}
    mes_num = meses_esp_a_num.get(Mes.lower())
    if mes_num:
        cantidad = len(df[df['mes'] == mes_num])
        return {"Mes": Mes, "cantidad": cantidad}
    else:
        return {"error": f"El mes '{Mes}' no es válido."}



Implementación del Endpoint /cantidad_filmaciones_dia/{Dia}

In [15]:
@app.get("/cantidad_filmaciones_dia/{Dia}")
async def cantidad_filmaciones_dia(Dia: str):
    # Mapeo de nombres de días de la semana en español a números de Pandas
    dias_esp_a_num = {'lunes': 0, 'martes': 1, 'miércoles': 2, 'jueves': 3, 'viernes': 4, 'sábado': 5, 'domingo': 6}
    dia_num = dias_esp_a_num.get(Dia.lower())
    if dia_num is not None:
        cantidad = len(df[df['dia_semana_num'] == dia_num])
        return {"Dia": Dia, "cantidad": cantidad}
    else:
        return {"error": f"El día '{Dia}' no es válido."}


Implementación del Endpoint /score_titulo/{titulo_de_la_filmacion}

In [16]:
@app.get("/score_titulo/{titulo_de_la_filmacion}")
async def score_titulo(titulo_de_la_filmacion: str):
    pelicula = df[df['title'].str.lower() == titulo_de_la_filmacion.lower()]
    if not pelicula.empty:
        # Considerar devolver la primera coincidencia si hay múltiples
        pelicula = pelicula.iloc[0]
        return {
            "titulo": pelicula['title'],
            "anio_estreno": int(pelicula['anio']) if pd.notna(pelicula['anio']) else None,
            "score": pelicula['popularity']
        }
    else:
        return {"mensaje": f"No se encontró la película con el título '{titulo_de_la_filmacion}'."}


Implementación del Endpoint /votos_titulo/{titulo_de_la_filmacion}

In [17]:
@app.get("/votos_titulo/{titulo_de_la_filmacion}")
async def votos_titulo(titulo_de_la_filmacion: str):
    pelicula = df[df['title'].str.lower() == titulo_de_la_filmacion.lower()]
    if not pelicula.empty:
        pelicula = pelicula.iloc[0]
        cantidad_votos = pelicula['vote_count']
        if cantidad_votos >= 2000:
            return {
                "titulo": pelicula['title'],
                "cantidad_votos": int(cantidad_votos) if pd.notna(cantidad_votos) else None,
                "promedio_votos": pelicula['vote_average']
            }
        else:
            return {"mensaje": f"La película '{titulo_de_la_filmacion}' no cumple con las valoraciones mínimas (>= 2000 votos)."}
    else:
        return {"mensaje": f"No se encontró la película con el título '{titulo_de_la_filmacion}'."}

Implementación del Endpoint /get_actor/{nombre_actor}

In [18]:
@app.get("/get_actor/{nombre_actor}")
async def get_actor(nombre_actor: str):
    peliculas_actor = df[df['cast'].apply(lambda x: nombre_actor.lower() in str(x).lower())]
    if not peliculas_actor.empty:
        retornos = peliculas_actor.apply(lambda row: row['revenue'] - row['budget'] if pd.notna(row['revenue']) and pd.notna(row['budget']) else np.nan, axis=1)
        retornos_validos = retornos.dropna()
        cantidad_peliculas = len(peliculas_actor)
        retorno_total = retornos_validos.sum()
        promedio_retorno = retornos_validos.mean() if not retornos_validos.empty else 0
        return {
            "nombre_actor": nombre_actor,
            "cantidad_filmaciones": cantidad_peliculas,
            "retorno_total": retorno_total,
            "promedio_retorno": promedio_retorno
        }
    else:
        return {"mensaje": f"No se encontró al actor con el nombre '{nombre_actor}' en las filmaciones."}

Implementación del Endpoint /get_director/{nombre_director}

In [19]:
@app.get("/get_director/{nombre_director}")
async def get_director(nombre_director: str):
    peliculas_director = df[df['crew'].apply(lambda x: nombre_director.lower() in str(x).lower() and 'director' in str(x).lower())]
    if not peliculas_director.empty:
        retorno_total = peliculas_director.apply(lambda row: row['revenue'] - row['budget'] if pd.notna(row['revenue']) and pd.notna(row['budget']) else np.nan, axis=1).dropna().sum()
        peliculas_info = []
        for index, pelicula in peliculas_director.iterrows():
            retorno_individual = pelicula['revenue'] - pelicula['budget'] if pd.notna(pelicula['revenue']) and pd.notna(pelicula['budget']) else None
            ganancia = pelicula['revenue'] if pd.notna(pelicula['revenue']) else None
            costo = pelicula['budget'] if pd.notna(pelicula['budget']) else None
            fecha_lanzamiento = pelicula['release_date'].isoformat() if pd.notna(pelicula['release_date']) else None
            peliculas_info.append({
                "titulo": pelicula['title'],
                "fecha_lanzamiento": fecha_lanzamiento,
                "retorno_individual": retorno_individual,
                "costo": costo,
                "ganancia": ganancia
            })
        return {
            "nombre_director": nombre_director,
            "retorno_total": retorno_total,
            "peliculas": peliculas_info
        }
    else:
        return {"mensaje": f"No se encontró al director con el nombre '{nombre_director}'."}

Sistema de recomendación:

In [20]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Preparar los datos: vectorización de los títulos de las películas
tfidf = TfidfVectorizer(stop_words='english')
tfidf_matrix = tfidf.fit_transform(df['title'].fillna(''))

# Calcular similitud de coseno entre las películas
cosine_sim = cosine_similarity(tfidf_matrix, tfidf_matrix)

# Función de recomendación
def recomendacion(titulo):
    try:
        # Encontrar el índice de la película ingresada
        idx = df[df['title'].str.lower() == titulo.lower()].index[0]
        # Obtener similitudes de esa película con todas las demás
        sim_scores = list(enumerate(cosine_sim[idx]))
        # Ordenar películas por puntaje de similitud
        sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
        # Obtener los índices de las 5 películas más similares
        top_indices = [i[0] for i in sim_scores[1:6]]
        # Retornar los títulos de las películas más similares
        return df.iloc[top_indices]['title'].tolist()
    except IndexError:
        return "Título no encontrado en el dataset. Por favor, verifica el nombre e intenta nuevamente."

# Ejemplo de uso
#titulo_busqueda = "Titanic"
#print(recomendacion(titulo_busqueda))

Obtener todos los idiomas
#python -m nltk.downloader popular

In [21]:
def obtener_idiomas(df):
    idiomas = set()  # Usamos un conjunto para evitar duplicados
    
    for entrada in df['spoken_languages'].dropna():
        try:
            # Convertir la cadena de texto en una lista de diccionarios
            lista_idiomas = json.loads(entrada.replace("'", "\""))  # Reemplazar comillas simples por dobles
            # Extraer el campo 'name' de cada diccionario
            idiomas.update(lang['name'] for lang in lista_idiomas if 'name' in lang)
        except json.JSONDecodeError:
            # Manejar errores de formato en el JSON
            continue
    
    return list(idiomas)  # Convertir a lista antes de retornar

# Llamar a la función
idiomas_unicos = obtener_idiomas(movies_df)
print("Idiomas únicos:", idiomas_unicos)

Idiomas únicos: ['', 'ქართული', 'Malti', 'پښتو', 'Somali', 'Català', 'Український', '普通话', 'اردو', 'Nederlands', 'العربية', 'ελληνικά', 'ਪੰਜਾਬੀ', 'Bahasa indonesia', 'shqip', 'Deutsch', '日本語', 'Dansk', 'Hausa', 'svenska', 'No Language', 'తెలుగు', 'עִבְרִית', 'Hrvatski', 'Latviešu', 'Bamanankan', 'Gaeilge', 'Norsk', 'Azərbaycan', 'Polski', 'Esperanto', 'Eesti', 'বাংলা', 'Fulfulde', 'Magyar', 'ภาษาไทย', 'isiZulu', 'Wolof', 'Latin', 'Kiswahili', 'Tiếng Việt', 'беларуская мова', 'Bahasa melayu', 'euskera', 'Pусский', 'suomi', 'English', 'Italiano', 'فارسی', '한국어/조선말', 'Română', 'Bokmål', 'Galego', 'български език', 'Español', 'Slovenščina', '??????', 'हिन्दी', 'Slovenčina', 'Afrikaans', 'қазақ', 'Português', 'Srpski', 'Cymraeg', 'Türkçe', '?????', '广州话 / 廣州話', 'ozbek', 'Český', 'Kinyarwanda', 'Français', 'Íslenska', 'Bosanski', 'தமிழ்']


Ejecución y Prueba de la API
Ejecución de la Aplicación con Uvicorn

In [23]:
if __name__ == "__main__":
    import uvicorn
    #uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
    uvicorn.run(app, host="0.0.0.0", port=8000)

RuntimeError: asyncio.run() cannot be called from a running event loop