<a href="https://colab.research.google.com/github/CarlosAltamiranoR/recommender_system/blob/rs_2024/ModeloKeras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import random
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt

In [None]:
# Cargar el archivo datos_movies.csv
movies_df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/movies.csv')

# Cargar el archivo datos_movies_list.csv
data = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/resultados.csv')


In [None]:
movies_df


In [None]:
data

# Limpieza

In [None]:
# Calcular el número de películas en cada lista
list_sizes = data.groupby('ID_Lista').size()

# Mostrar estadísticas descriptivas
print("Estadísticas de tamaños de listas:")
print(list_sizes.describe())


## mostrar graficamente distribución

In [None]:
plt.figure(figsize=(10,6))
list_sizes.hist(bins=50)
plt.title('Distribución de Tamaños de Listas')
plt.xlabel('Número de películas en la lista')
plt.ylabel('Número de listas')
plt.show()

In [None]:
# Definir un umbral para listas muy cortas (por ejemplo, menos de 2 películas)
short_list_threshold = 3

# Filtrar listas muy cortas
short_lists = list_sizes[list_sizes < short_list_threshold]

print(f"Número de listas con menos de {short_list_threshold} películas: {len(short_lists)}")
print("IDs de listas muy cortas:")
print(short_lists.index.tolist())

In [None]:
# Filtrar las listas que cumplen con el tamaño mínimo
valid_list_ids = list_sizes[list_sizes >= short_list_threshold].index
data = data[data['ID_Lista'].isin(valid_list_ids)]

In [None]:
# Contar cuántas veces aparece cada película en las listas
movie_counts = data['ID_Pelicula'].value_counts()

# Mostrar estadísticas descriptivas
print("Estadísticas de frecuencias de películas:")
print(movie_counts.describe())

In [None]:
plt.figure(figsize=(10,6))
movie_counts.hist(bins=50)
plt.title('Distribución de Frecuencias de Películas en Listas')
plt.xlabel('Número de apariciones en listas')
plt.ylabel('Número de películas')
plt.show()

In [None]:
# Definir un umbral para películas muy frecuentes (por ejemplo, aparecen en más del 5% de las listas)
frequency_threshold = data['ID_Lista'].nunique() * 0.05

frequent_movies = movie_counts[movie_counts > frequency_threshold]

print(f"Número de películas que aparecen en más del 5% de las listas: {len(frequent_movies)}")
print("IDs de películas muy frecuentes:")
print(frequent_movies.index.tolist())

ninguna película aparece en más del 5% de las listas. Es decir, no hay películas que sean extremadamente frecuentes en el conjunto de datos.

**Implicaciones**: Dado que no hay películas que aparezcan con tanta frecuencia, el problema de que el modelo recomiende siempre las mismas películas no se debe a que ciertas películas estén sobre-representadas en los datos.

In [None]:
# Contar la frecuencia de cada película
movie_counts = data['ID_Pelicula'].value_counts()
print(movie_counts.head(1000))  # Ver las 10 películas más frecuentes


# Modelo IA

In [None]:
# Obtener el conjunto de IDs de películas presentes en datos_movies_list.csv
movie_ids_in_list = set(data['ID_Pelicula'].astype(str))

# Filtrar movies_df para que solo contenga películas en movie_ids_in_list
movies_df = movies_df[movies_df['id'].astype(str).isin(movie_ids_in_list)].reset_index(drop=True)


In [None]:
# Convertir IDs a string para consistencia
data['ID_Pelicula'] = data['ID_Pelicula'].astype(str)
data['ID_Lista'] = data['ID_Lista'].astype(str)

# Obtener el conjunto de todos los IDs de películas
all_movie_ids = set(data['ID_Pelicula'])

# Crear un diccionario que mapea ID_Lista a un conjunto de IDs de películas
list_to_movies = data.groupby('ID_Lista')['ID_Pelicula'].apply(set).to_dict()


In [None]:
# Crear un diccionario que mapea ID_Pelicula a un índice único
movie_id_to_index = {movie_id: idx for idx, movie_id in enumerate(sorted(all_movie_ids))}
index_to_movie_id = {idx: movie_id for movie_id, idx in movie_id_to_index.items()}

# Número total de películas
num_movies = len(movie_id_to_index)


In [None]:
# Obtener todos los IDs de listas
all_list_ids = list(list_to_movies.keys())

# Dividir en entrenamiento y prueba
train_list_ids, test_list_ids = train_test_split(all_list_ids, test_size=0.5, random_state=42)


In [None]:
def create_dataset(list_ids):
    X = []
    Y = []
    for list_id in list_ids:
        movies_in_list = list(list_to_movies[list_id])
        num_movies_in_list = len(movies_in_list)

        if num_movies_in_list < 2:
            continue  # Saltar listas con menos de 2 películas

        # Seleccionar aleatoriamente entre 2 y 5 películas para x
        num_movies_in_x = min(random.randint(1, 5), num_movies_in_list)
        movies_in_x = random.sample(movies_in_list, num_movies_in_x)

        # Crear vector x
        x = np.zeros(num_movies)
        for movie_id in movies_in_x:
            idx = movie_id_to_index[str(movie_id)]
            x[idx] = 1

        # Crear vector y
        y = np.zeros(num_movies)
        for movie_id in movies_in_list:
            idx = movie_id_to_index[str(movie_id)]
            y[idx] = 1

        # Agregar a los conjuntos de datos
        X.append(x)
        Y.append(y)
    return np.array(X), np.array(Y)


In [None]:
X_train, Y_train = create_dataset(train_list_ids)
X_test, Y_test = create_dataset(test_list_ids)


In [None]:
import tensorflow as tf
from tensorflow.keras import backend as K

def binary_focal_loss(gamma=2., alpha=.25):
    def focal_loss_fixed(y_true, y_pred):
        epsilon = K.epsilon()
        y_pred = K.clip(y_pred, epsilon, 1. - epsilon)
        pt = tf.where(K.equal(y_true, 1), y_pred, 1 - y_pred)
        loss = -K.mean(alpha * K.pow(1. - pt, gamma) * K.log(pt))
        return loss
    return focal_loss_fixed




Definimos métricas personalizadas

In [None]:
def custom_precision(y_true, y_pred):
    y_pred_binary = tf.cast(tf.greater(y_pred, 0.1), tf.float32)
    true_positives = tf.reduce_sum(y_true * y_pred_binary)
    predicted_positives = tf.reduce_sum(y_pred_binary)
    precision = true_positives / (predicted_positives + tf.keras.backend.epsilon())
    return precision

def custom_recall(y_true, y_pred):
    y_pred_binary = tf.cast(tf.greater(y_pred, 0.1), tf.float32)
    true_positives = tf.reduce_sum(y_true * y_pred_binary)
    possible_positives = tf.reduce_sum(y_true)
    recall = true_positives / (possible_positives + tf.keras.backend.epsilon())
    return recall

In [None]:
# @title
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras.callbacks import EarlyStopping

# Definir el modelo
model = Sequential()
model.add(Input(shape=(num_movies,)))  # Añadimos una capa de entrada explícita
model.add(Dense(128, activation='relu'))
model.add(Dense(256, activation='relu'))
model.add(Dense(num_movies, activation='sigmoid'))  # Usamos sigmoid para salida multi-etiqueta

# Compilar el modelo con Binary Focal Loss
#model.compile(optimizer='adam', loss=binary_focal_loss(gamma=2., alpha=.25), metrics=['accuracy'])

# Compilar el modelo con Binary Focal Loss con métricas personalizadas
model.compile(optimizer='adam',loss=binary_focal_loss(gamma=2., alpha=.25), metrics=['accuracy', custom_precision, custom_recall])

# Entrenar el modelo
early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)

model.fit(X_train, Y_train, epochs=20, batch_size=32, validation_split=0.1, callbacks=[early_stopping])


# Generar predicciones en el conjunto de prueba
Y_pred_prob = model.predict(X_test)
threshold = 0.1  # Ajustar este valor según sea necesario
Y_pred = (Y_pred_prob > threshold).astype(int)



Parámetros:
gamma: Controla el enfoque en las muestras difíciles. Un valor más alto incrementa el enfoque.
alpha: Equilibra la importancia entre clases positivas y negativas.

In [None]:
import sys
import os
import contextlib

@contextlib.contextmanager
def suppress_stdout():
    with open(os.devnull, "w") as devnull:
        old_stdout = sys.stdout
        sys.stdout = devnull
        try:
            yield
        finally:
            sys.stdout = old_stdout

def get_recommendations(input_movie_id, top_n=5):
    # Crear vector x con una sola película
    x = np.zeros(num_movies)
    idx = movie_id_to_index[str(input_movie_id)]
    x[idx] = 1

    # Suprimir la salida estándar durante la predicción
    with suppress_stdout():
        y_pred_prob = model.predict(np.array([x]))[0]

    # Obtener los índices de las top_n predicciones
    recommended_indices = np.argsort(y_pred_prob)[::-1]

    # Filtrar películas que no sean la película de entrada
    recommended_indices = [i for i in recommended_indices if i != idx]

    # Obtener los IDs de las películas recomendadas
    recommended_movie_ids = [index_to_movie_id[i] for i in recommended_indices[:top_n]]

    # Obtener los nombres de las películas recomendadas
    recommended_movies = movies_df[movies_df['id'].astype(str).isin(recommended_movie_ids)]

    return recommended_movies[['id', 'title']]




In [None]:
def get_movie_names(movie_ids):
    return movies_df[movies_df['id'].astype(str).isin(movie_ids)]['title'].tolist()

In [None]:
# Evaluar las recomendaciones
results = []

# Limitar el número de listas de prueba
max_test_lists = 100
test_list_ids_subset = random.sample(test_list_ids, min(max_test_lists, len(test_list_ids)))

for list_id in test_list_ids_subset:
    movies_in_list = list(list_to_movies[list_id])
    num_movies_in_list = len(movies_in_list)

    if num_movies_in_list < 3:
        continue  # Saltar listas con menos de 3 películas

    # Seleccionar una película al azar de la lista
    input_movie_id = random.choice(movies_in_list)

    # Obtener recomendaciones
    recommended_movies = get_recommendations(input_movie_id, top_n=5)

    # Obtener los IDs y nombres de las películas recomendadas
    recommended_movie_ids = set(recommended_movies['id'].astype(str))
    recommended_movie_names = recommended_movies['title'].tolist()

    # Películas correctas (que están en la misma lista)
    actual_movie_ids = set(movies_in_list)
    #actual_movie_ids.discard(input_movie_id)  # Excluir la película de entrada

    # Contar cuántas recomendaciones están en la misma lista
    correct_recommendations = recommended_movie_ids.intersection(actual_movie_ids)
    num_correct = len(correct_recommendations)
    correct_recommendation_names = get_movie_names(correct_recommendations)

    # Porcentaje de acierto
    possible_correct = min(5, len(actual_movie_ids))
    hit_percentage = num_correct / possible_correct if possible_correct > 0 else 0

    # Guardar resultados
    results.append({
        'ID_Lista': list_id,
        'Película_Entrada_ID': input_movie_id,
        'Película_Entrada_Nombre': movies_df[movies_df['id'].astype(str) == input_movie_id]['title'].values[0],
        'Recomendaciones_IDs': list(recommended_movie_ids),
        'Recomendaciones_Nombres': recommended_movie_names,
        'Películas_Correctas_IDs': list(actual_movie_ids),
        'Películas_Correctas_Nombres': get_movie_names(actual_movie_ids),
        'Aciertos_Nombres': correct_recommendation_names,
        'Aciertos': num_correct,
        'Posibles_Aciertos': possible_correct,
        'Porcentaje_Acierto': hit_percentage
    })


# Analizar resultados

In [None]:
results_df = pd.DataFrame(results)

# Calcular el porcentaje medio de aciertos
mean_hit_percentage = results_df['Porcentaje_Acierto'].mean()
print(f"Porcentaje medio de aciertos: {mean_hit_percentage:.2%}")

# Mostrar algunos resultados
results_df[['Película_Entrada_Nombre', 'Recomendaciones_Nombres', 'Aciertos_Nombres', 'Porcentaje_Acierto']].head()



# analisis detallado

In [None]:
# Proporción de etiquetas positivas en Y_train
positive_ratio = np.sum(Y_train) / (Y_train.shape[0] * Y_train.shape[1])
print(f"Proporción de etiquetas positivas en Y_train: {positive_ratio:.6f}")

# Proporción de etiquetas positivas en Y_test
positive_ratio_test = np.sum(Y_test) / (Y_test.shape[0] * Y_test.shape[1])
print(f"Proporción de etiquetas positivas en Y_test: {positive_ratio_test:.6f}")


Interpretación:

Si la proporción es muy baja (por ejemplo, menos del 0.1%), indica que las etiquetas son extremadamente esparsas.
Esto dificulta que el modelo aprenda, ya que hay muy pocas muestras positivas.
Solución:

Cambiar la función de pérdida a Kullback-Leibler Divergence ('kullback_leibler_divergence') o Binary Focal Loss que manejan mejor la esparsidad.
Ajustar los pesos de clase para penalizar más los errores en clases minoritarias.