In [1]:
import pandas as pd
from fastapi import FastAPI
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

app = FastAPI()

# URLs de los archivos en el repositorio local
url_cleaned = 'D:/SOYHENRY_PI_1/fastapi_project/movie_dataset_cleaned.csv'
url_unido = 'D:/SOYHENRY_PI_1/fastapi_project/df_unido.csv'

# Descargar y cargar los archivos
df_cleaned = pd.read_csv(url_cleaned)
df_unido = pd.read_csv(url_unido)

@app.get("/")
async def read_root():
    return {"message": "Welcome to the API!"}

# Ejemplo: limitar a las primeras 10,000 películas para reducir el tamaño
df_cleaned = df_cleaned.head(10000)

# Preprocesamiento de datos
df_cleaned['overview'] = df_cleaned['overview'].fillna('')
df_cleaned['name_genres'] = df_cleaned['name_genres'].fillna('')

# Crear una nueva columna combinando características
df_cleaned['features'] = df_cleaned['overview'] + ' ' + df_cleaned['name_genres']

# Vectorización de las características combinadas
tfidf = TfidfVectorizer(stop_words='english')
tfidf_matrix = tfidf.fit_transform(df_cleaned['features'])

# Calcular similitud de coseno entre todas las películas
cosine_sim = cosine_similarity(tfidf_matrix, tfidf_matrix)

# Crear una función para obtener el índice de una película a partir de su título
indices = pd.Series(df_cleaned.index, index=df_cleaned['title_film']).drop_duplicates()

# Función para obtener recomendaciones
def get_recommendations(title_film, cosine_sim=cosine_sim):
    # Obtener el índice de la película que coincide con el título
    idx = indices.get(title_film)
    if idx is None:
        return pd.DataFrame(columns=['title_film', 'overview'])  # Retorna un DataFrame vacío si no se encuentra

    # Obtener los puntajes de similitud de todas las películas con esa película
    sim_scores = list(enumerate(cosine_sim[idx]))

    # Ordenar las películas basadas en los puntajes de similitud
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)

    # Obtener los índices de las 10 películas más similares
    sim_scores = sim_scores[1:11]  # Excluye la película misma
    movie_indices = [i[0] for i in sim_scores]

    # Retornar los títulos de las 10 películas más similares
    return df_cleaned[['title_film', 'overview']].iloc[movie_indices]

# Rutas de la API

@app.get("/cantidad_filmaciones_mes/{mes}")
async def cantidad_filmaciones_mes(mes: str):
    df_cleaned['release_date'] = pd.to_datetime(df_cleaned['release_date'], errors='coerce')
    mes_num = pd.to_datetime(mes, format='%B').month
    cantidad = df_cleaned[df_cleaned['release_date'].dt.month == mes_num].shape[0]
    return f"{cantidad} cantidad de películas fueron estrenadas en el mes de {mes}"

@app.get("/cantidad_filmaciones_dia/{dia}")
async def cantidad_filmaciones_dia(dia: str):
    dias = {
        'lunes': 0, 'martes': 1, 'miércoles': 2, 'jueves': 3, 'viernes': 4, 'sábado': 5, 'domingo': 6
    }
    dia_num = dias.get(dia.lower())
    if dia_num is None:
        return "Día inválido"
    df_cleaned['release_date'] = pd.to_datetime(df_cleaned['release_date'], errors='coerce')
    cantidad = df_cleaned[df_cleaned['release_date'].dt.dayofweek == dia_num].shape[0]
    return f"{cantidad} cantidad de películas fueron estrenadas en los días {dia}"

@app.get("/score_titulo/{titulo_de_la_filmacion}")
async def score_titulo(titulo_de_la_filmacion: str):
    pelicula = df_cleaned[df_cleaned['title_film'].str.contains(titulo_de_la_filmacion, case=False, na=False)]
    if pelicula.empty:
        return "Título no encontrado"
    score = pelicula.iloc[0]['popularity']
    año = pelicula.iloc[0]['release_year']
    return f"La película {titulo_de_la_filmacion} fue estrenada en el año {año} con un score/popularidad de {score}"

@app.get("/votos_titulo/{titulo_de_la_filmacion}")
async def votos_titulo(titulo_de_la_filmacion: str):
    pelicula = df_cleaned[df_cleaned['title_film'].str.contains(titulo_de_la_filmacion, case=False, na=False)]
    if pelicula.empty:
        return "Título no encontrado"
    votos = pelicula.iloc[0]['vote_count']
    promedio_votos = pelicula.iloc[0]['vote_average']
    if votos < 2000:
        return "La película no cumple con la condición de al menos 2000 valoraciones"
    return f"La película {titulo_de_la_filmacion} fue estrenada en el año {pelicula.iloc[0]['release_year']} con un promedio de votos de {promedio_votos}"

@app.get("/get_actor/{nombre_actor}")
async def get_actor(nombre_actor: str):
    actor = df_unido[df_unido['name_actor'].str.contains(nombre_actor, case=False, na=False)]
    if actor.empty:
        return "Actor no encontrado"
    cantidad_peliculas = actor.shape[0]
    retorno_promedio = actor['return'].mean()
    retorno_total = actor['return'].sum()
    return f"El actor {nombre_actor} ha participado en {cantidad_peliculas} películas, con un retorno promedio de {retorno_promedio} y un retorno total de {retorno_total}"

@app.get("/get_director/{nombre_director}")
async def get_director(nombre_director: str):
    director = df_unido[df_unido['name_director'].str.contains(nombre_director, case=False, na=False)]
    if director.empty:
        return "Director no encontrado"
    resultados = []
    for _, row in director.iterrows():
        resultados.append({
            'titulo': row['title'],
            'fecha_lanzamiento': row['release_date'],
            'retorno': row['return'],
            'costo': row['budget'],
            'ganancia': row['revenue']
        })
    return resultados

@app.get("/recomendar/{titulo}")
async def recomendar(titulo: str):
    recomendaciones = get_recommendations(titulo)
    if recomendaciones.empty:
        return {"mensaje": "Película no encontrada"}
    
    # Convertir el DataFrame a una lista de diccionarios
    return recomendaciones.to_dict(orient='records')


  df_cleaned = pd.read_csv(url_cleaned)
  df_unido = pd.read_csv(url_unido)


In [2]:
try:
    df_cleaned = pd.read_csv(url_cleaned)
    df_unido = pd.read_csv(url_unido)
except FileNotFoundError:
    raise Exception("Archivo no encontrado en la ruta especificada.")

  df_cleaned = pd.read_csv(url_cleaned)
  df_unido = pd.read_csv(url_unido)


In [3]:
from fastapi import HTTPException

@app.get("/score_titulo/{titulo_de_la_filmacion}")
async def score_titulo(titulo_de_la_filmacion: str):
    pelicula = df_cleaned[df_cleaned['title_film'].str.contains(titulo_de_la_filmacion, case=False, na=False)]
    if pelicula.empty:
        raise HTTPException(status_code=404, detail="Título no encontrado")
    score = pelicula.iloc[0]['popularity']
    año = pelicula.iloc[0]['release_year']
    return f"La película {titulo_de_la_filmacion} fue estrenada en el año {año} con un score/popularidad de {score}"

In [5]:
from fastapi import HTTPException

@app.get("/cantidad_filmaciones_mes/{mes}")
async def cantidad_filmaciones_mes(mes: str):
    try:
        df_cleaned['release_date'] = pd.to_datetime(df_cleaned['release_date'], errors='coerce')
        mes_num = pd.to_datetime(mes, format='%B').month
        cantidad = df_cleaned[df_cleaned['release_date'].dt.month == mes_num].shape[0]
        return f"{cantidad} cantidad de películas fueron estrenadas en el mes de {mes}"
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


In [2]:
main = 'SOYHENRY_PI_1/fastapi_project/main.py'

In [3]:
from fastapi.testclient import TestClient
from main import app  # Asegúrate de importar tu aplicación FastAPI correctamente

client = TestClient(app)

def test_cantidad_filmaciones_mes():
    response = client.get("/cantidad_filmaciones_mes/janvier")
    assert response.status_code == 200
    assert "cantidad de películas fueron estrenadas en el mes de janvier" in response.text


In [4]:
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_cantidad_filmaciones_mes():
    response = client.get("/cantidad_filmaciones_mes/janvier")  # Asegúrate de que la ruta sea correcta
    assert response.status_code == 200
    assert "cantidad de películas fueron estrenadas en el mes de janvier" in response.text
