#### Carga librerias

In [151]:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math
from datetime import datetime
import scipy.sparse as sp
from sklearn.metrics.pairwise import cosine_similarity
import random
from scipy.stats import pearsonr
from collections import Counter
import collections

#### Carga archivo

In [152]:
df = pd.read_csv("./Video_Games.csv", header=None)

df.columns = ['item_id', 'user_id', 'rating', 'timestamp']

df = df.drop('timestamp', axis=1)

print(f"Tamaño total del dataset: {df.shape}")

Tamaño total del dataset: (2565349, 3)


In [3]:
top_n_users = 100000 # Maximo de usuarios con los que queremos trabajar
min_ratings = 10   # Usuarios con al menos este número de ratings
user_counts = df['user_id'].value_counts()

dense_users = user_counts[user_counts >= min_ratings].index.tolist()

if len(dense_users) > top_n_users:
    dense_users = dense_users[:top_n_users]

print(f"Seleccionados {len(dense_users)} usuarios con al menos {min_ratings} ratings cada uno")

df_dense = df[df['user_id'].isin(dense_users)]

print(f"Tamaño del dataset filtrado: {df_dense.shape}")
print(f"Reducción del dataset: {100 * (1 - len(df_dense) / len(df)):.2f}%")

Seleccionados 15517 usuarios con al menos 10 ratings cada uno
Tamaño del dataset filtrado: (284867, 3)
Reducción del dataset: 88.90%


In [177]:
def random_split(data, test_ratio=0.2, random_state=42):
    data_shuffled = data.sample(frac=1, random_state=random_state)
    
    split = int(len(data) * (1 - test_ratio))
    
    train = data_shuffled.iloc[:split]
    test = data_shuffled.iloc[split:]
    
    train_users = set(train['user_id'].unique())
    train_items = set(train['item_id'].unique())
    test = test[test['user_id'].isin(train_users) & test['item_id'].isin(train_items)]
    
    return train, test

train_data, test_data = random_split(df_dense)
print(f"Tamaño del conjunto de entrenamiento: {train_data.shape}")
print(f"Tamaño del conjunto de prueba: {test_data.shape}")

train_users = set(train_data['user_id'].unique())
train_items = set(train_data['item_id'].unique())
common_users = len(train_users.intersection(set(test_data['user_id'].unique())))
common_items = len(train_items.intersection(set(test_data['item_id'].unique())))

print(f"Usuarios comunes en train y test: {common_users}/{len(train_users)}")
print(f"Items comunes en train y test: {common_items}/{len(train_items)}")

Tamaño del conjunto de entrenamiento: (227893, 3)
Tamaño del conjunto de prueba: (53735, 3)
Usuarios comunes en train y test: 14541/15517
Items comunes en train y test: 15474/34132


#### Creación de matrices

Mapeos para los datos de entrenamiento

In [178]:
train_users_list = train_data['user_id'].unique()
train_items_list = train_data['item_id'].unique()

user2idx = {user_id: idx for idx, user_id in enumerate(train_users_list)}
item2idx = {item_id: idx for idx, item_id in enumerate(train_items_list)}

idx2user = {idx: user for user, idx in user2idx.items()}
idx2item = {idx: item for item, idx in item2idx.items()}

NUM_USERS = len(user2idx)
NUM_ITEMS = len(item2idx)
MIN_RATING = df['rating'].min()
MAX_RATING = df['rating'].max()

print(f"Usuarios: {NUM_USERS}, Items: {NUM_ITEMS}")

Usuarios: 15517, Items: 34132


Matriz de train sparse

In [179]:
# Inicializar matriz vacía
train_matrix = sp.lil_matrix((NUM_USERS, NUM_ITEMS))

# Llenar con valores sin acumular
for _, row in train_data.iterrows():
    if row['user_id'] in user2idx and row['item_id'] in item2idx:
        u_idx = user2idx[row['user_id']]
        i_idx = item2idx[row['item_id']]
        # Asignar directamente, sin acumular
        train_matrix[u_idx, i_idx] = row['rating']

# Convertir a CSR para operaciones eficientes
train_matrix = train_matrix.tocsr()

Matriz de test sparse

In [180]:
test_matrix = sp.lil_matrix((NUM_USERS, NUM_ITEMS))

# Llenar con valores sin acumular
for _, row in test_data.iterrows():
    if row['user_id'] in user2idx and row['item_id'] in item2idx:
        u_idx = user2idx[row['user_id']]
        i_idx = item2idx[row['item_id']]
        # Asignar directamente, sin acumular
        test_matrix[u_idx, i_idx] = row['rating']

# Convertir a CSR para operaciones eficientes
test_matrix = test_matrix.tocsr()

Medidas

In [181]:
print(f"Forma de la matriz de train: {train_matrix.shape}")
print(f"Densidad de la matriz de train: {train_matrix.nnz / (NUM_USERS * NUM_ITEMS):.6f}")
print(f"Forma de la matriz de test: {test_matrix.shape}")
print(f"Densidad de la matriz de test: {test_matrix.nnz / (NUM_USERS * NUM_ITEMS):.6f}")

Forma de la matriz de train: (15517, 34132)
Densidad de la matriz de train: 0.000416
Forma de la matriz de test: (15517, 34132)
Densidad de la matriz de test: 0.000101


# Matrix Factorization

In [161]:
import random
import numpy as np

# Hiperparámetros para Factorización Matricial
NUM_FACTORS = 10  # Número de factores latentes
LEARNING_RATE = 0.001  # Tasa de aprendizaje (gamma)
REGULARIZATION = 0.1  # Parámetro de regularización (lambda)
NUM_ITERATIONS = 10  # Número de iteraciones de entrenamiento

Inicializamos las matrices de factores con valores uniformes aleatorios en el intervalo \[0, 1].

In [None]:
def init_matrix_factorization_model():
    """Inicializa los parámetros del modelo de factorización matricial"""
    # Inicializar matriz de factores de usuario aleatoriamente
    p = np.random.random((NUM_USERS, NUM_FACTORS))
    
    # Inicializar matriz de factores de ítem aleatoriamente
    q = np.random.random((NUM_ITEMS, NUM_FACTORS))
    
    return p, q


Como hemos comentado, calcular la predicción del voto del usuario *u* al item *i* implicar realizar el producto escalar de sus vectores de factores. La siguiente función realiza esta operación:

In [None]:
def compute_prediction(p_u, q_i):
    """Calcula la predicción de valoración para un par usuario-ítem usando producto escalar"""
    return np.dot(p_u, q_i)



Implementemos la función de entrenamiento para el modelo PMF básico:



In [163]:
def train_pmf(train_matrix, num_iterations=NUM_ITERATIONS, learning_rate=LEARNING_RATE, 
            reg_param=REGULARIZATION, num_factors=NUM_FACTORS):
    """Entrena el modelo PMF usando descenso de gradiente"""
    # Inicializar parámetros del modelo
    p, q = init_matrix_factorization_model()
    
    print("Entrenando modelo PMF...")
    for it in range(num_iterations):
        print(f"Iteración {it+1}/{num_iterations}")
        
        # Procesar todas las valoraciones conocidas
        for u, i in zip(*train_matrix.nonzero()):
            # Obtener la valoración real
            r_ui = train_matrix[u, i]
            
            # Calcular predicción
            pred = compute_prediction(p[u], q[i])
            
            # Calcular error
            e_ui = r_ui - pred
            
            # Actualizar factores de usuario e ítem
            p_u_old = p[u].copy()
            q_i_old = q[i].copy()
            
            # Actualización por descenso de gradiente
            p[u] += learning_rate * (e_ui * q_i_old - reg_param * p_u_old)
            q[i] += learning_rate * (e_ui * p_u_old - reg_param * q_i_old)
    
    return p, q



Ahora implementemos la versión mejorada con términos de sesgo:



In [None]:
def compute_biased_prediction(avg, bu, bi, p_u, q_i):
    """Calcula la predicción de valoración con términos de sesgo"""
    return avg + bu + bi + np.dot(p_u, q_i)

def calculate_global_mean(train_matrix):
    """Calcula la media global de valoraciones"""
    return train_matrix.sum() / train_matrix.getnnz() if train_matrix.getnnz() > 0 else 0



Implementemos la función de entrenamiento para el PMF con sesgos:



In [None]:
def train_biased_pmf(train_matrix, num_iterations=NUM_ITERATIONS, learning_rate=LEARNING_RATE, 
                   reg_param=REGULARIZATION, num_factors=NUM_FACTORS):
    """Entrena el modelo PMF mejorado con sesgos usando descenso de gradiente"""
    # Inicializar parámetros del modelo
    p, q = init_matrix_factorization_model()
    
    # Inicializar términos de sesgo
    bu = np.zeros(NUM_USERS)
    bi = np.zeros(NUM_ITEMS)
    
    # Calcular media global
    avg = calculate_global_mean(train_matrix)
    print(f"Media global de valoraciones: {avg:.4f}")
    
    print("Entrenando modelo PMF con sesgos...")
    for it in range(num_iterations):
        print(f"Iteración {it+1}/{num_iterations}")
        
        # Procesar todas las valoraciones conocidas
        for u, i in zip(*train_matrix.nonzero()):
            # Obtener la valoración real
            r_ui = train_matrix[u, i]
            
            # Calcular predicción con términos de sesgo
            pred = compute_biased_prediction(avg, bu[u], bi[i], p[u], q[i])
            
            # Calcular error
            e_ui = r_ui - pred
            
            # Actualizar términos de sesgo
            bu_old = bu[u]
            bi_old = bi[i]
            bu[u] += learning_rate * (e_ui - reg_param * bu_old)
            bi[i] += learning_rate * (e_ui - reg_param * bi_old)
            
            # Actualizar factores de usuario e ítem
            p_u_old = p[u].copy()
            q_i_old = q[i].copy()
            
            # Actualización por descenso de gradiente
            p[u] += learning_rate * (e_ui * q_i_old - reg_param * p_u_old)
            q[i] += learning_rate * (e_ui * p_u_old - reg_param * q_i_old)
    
    return mu, bu, bi, p, q




Añadimos funciones para generar predicciones usando los modelos entrenados:



In [None]:
def generate_pmf_predictions(test_indices, p, q):
    """Genera predicciones para ítems de prueba usando el modelo PMF básico"""
    predictions = {}
    
    for u, i in test_indices:
        if u < len(p) and i < len(q):
            if u not in predictions:
                predictions[u] = {}
            predictions[u][i] = compute_prediction(p[u], q[i])
    
    return predictions

def generate_biased_pmf_predictions(test_indices, avg, bu, bi, p, q):
    """Genera predicciones para ítems de prueba usando el modelo PMF con sesgos"""
    predictions = {}
    
    for u, i in test_indices:
        if u < len(p) and i < len(q):
            if u not in predictions:
                predictions[u] = {}
            predictions[u][i] = compute_biased_prediction(avg, bu[u], bi[i], p[u], q[i])
    
    return predictions

Ahora definimos las funciones de evaluación del modelo

In [167]:
# Funciones auxiliares para evaluación de modelos de factorización matricial

def get_recommendations_from_dict(predictions_dict, user_id, n=10):
    """
    Obtiene las n mejores recomendaciones para un usuario dado un diccionario de predicciones
    
    Args:
        predictions_dict: Diccionario con formato {usuario: {item: predicción}}
        user_id: ID del usuario
        n: Número de recomendaciones (por defecto 10)
        
    Returns:
        Lista con los n ítems recomendados
    """
    if user_id not in predictions_dict:
        return None
        
    user_preds = predictions_dict[user_id]
    
    # Ordenar las predicciones por valor (de mayor a menor)
    sorted_items = sorted(user_preds.items(), key=lambda x: x[1], reverse=True)
    
    # Extraer solo los IDs de ítem
    recommendations = [item_id for item_id, _ in sorted_items[:n]]
    
    return recommendations

# Funciones para MAE y RMSE
def get_user_mae_from_dict(u, predictions_dict, test_matrix):
    """Calcula MAE para un usuario con predicciones en formato diccionario"""
    if u not in predictions_dict:
        return None
        
    mae = 0
    count = 0
    
    for i in predictions_dict[u]:
        if test_matrix[u, i] != 0:
            mae += abs(float(test_matrix[u, i]) - predictions_dict[u][i])
            count += 1
    
    if count > 0:
        return mae / count
    else:
        return None

def get_mae_from_dict(predictions_dict, test_matrix):
    """Calcula MAE global con predicciones en formato diccionario"""
    mae = 0
    count = 0
    
    for u in predictions_dict:
        user_mae = get_user_mae_from_dict(u, predictions_dict, test_matrix)
        if user_mae is not None:
            mae += user_mae
            count += 1
    
    if count > 0:
        return mae / count
    else:
        return None
        
def get_user_rmse_from_dict(u, predictions_dict, test_matrix):
    """Calcula RMSE para un usuario con predicciones en formato diccionario"""
    if u not in predictions_dict:
        return None
        
    mse = 0
    count = 0
    
    for i in predictions_dict[u]:
        if test_matrix[u, i] != 0:
            mse += (float(test_matrix[u, i]) - predictions_dict[u][i]) ** 2
            count += 1
    
    if count > 0:
        return math.sqrt(mse / count)
    else:
        return None

def get_rmse_from_dict(predictions_dict, test_matrix):
    """Calcula RMSE global con predicciones en formato diccionario"""
    rmse = 0
    count = 0
    
    for u in predictions_dict:
        user_rmse = get_user_rmse_from_dict(u, predictions_dict, test_matrix)
        if user_rmse is not None:
            rmse += user_rmse
            count += 1
    
    if count > 0:
        return rmse / count
    else:
        return None

# Funciones para Precision, Recall y F1
def get_user_precision_from_dict(u, predictions_dict, test_matrix, theta=4, n=10):
    """Calcula precisión para un usuario con predicciones en formato diccionario"""
    if u not in predictions_dict:
        return None
        
    recommendations = get_recommendations_from_dict(predictions_dict, u, n)
    
    if not recommendations:
        return None
    
    precision = 0
    count = 0
    
    for i in recommendations:
        if test_matrix[u, i] != 0:
            precision += 1 if float(test_matrix[u, i]) >= theta else 0
            count += 1
    
    if count > 0:
        return precision / count
    else:
        return None

def get_precision_from_dict(predictions_dict, test_matrix, theta=4, n=10):
    """Calcula precisión global con predicciones en formato diccionario"""
    precision = 0
    count = 0
    
    for u in predictions_dict:
        user_precision = get_user_precision_from_dict(u, predictions_dict, test_matrix, theta, n)
        if user_precision is not None:
            precision += user_precision
            count += 1
    
    if count > 0:
        return precision / count
    else:
        return None

def get_user_recall_from_dict(u, predictions_dict, test_matrix, theta=4, n=10):
    """Calcula recall para un usuario con predicciones en formato diccionario"""
    if u not in predictions_dict:
        return None
        
    recommendations = get_recommendations_from_dict(predictions_dict, u, n)
    
    if not recommendations:
        return None
    
    recall = 0
    count = 0
    
    test_items = test_matrix[u].nonzero()[1]
    
    for i in test_items:
        if float(test_matrix[u, i]) >= theta:
            recall += 1 if i in recommendations else 0
            count += 1
    
    if count > 0:
        return recall / count
    else:
        return None

def get_recall_from_dict(predictions_dict, test_matrix, theta=4, n=10):
    """Calcula recall global con predicciones en formato diccionario"""
    recall = 0
    count = 0
    
    for u in predictions_dict:
        user_recall = get_user_recall_from_dict(u, predictions_dict, test_matrix, theta, n)
        if user_recall is not None:
            recall += user_recall
            count += 1
    
    if count > 0:
        return recall / count
    else:
        return None

def get_user_f1_from_dict(u, predictions_dict, test_matrix, theta=4, n=10):
    """Calcula F1 para un usuario con predicciones en formato diccionario"""
    precision = get_user_precision_from_dict(u, predictions_dict, test_matrix, theta, n)
    recall = get_user_recall_from_dict(u, predictions_dict, test_matrix, theta, n)
    
    if precision is None or recall is None:
        return None
    elif precision == 0 and recall == 0:
        return 0
    else:
        return 2 * precision * recall / (precision + recall)

def get_f1_from_dict(predictions_dict, test_matrix, theta=4, n=10):
    """Calcula F1 global con predicciones en formato diccionario"""
    f1 = 0
    count = 0
    
    for u in predictions_dict:
        user_f1 = get_user_f1_from_dict(u, predictions_dict, test_matrix, theta, n)
        if user_f1 is not None:
            f1 += user_f1
            count += 1
    
    if count > 0:
        return f1 / count
    else:
        return None

# Funciones para nDCG
def get_sorted_test_items_from_dict(u, test_matrix):
    """Retorna los ítems del test para un usuario ordenados por valoración (descendente)"""
    test_items = test_matrix[u].nonzero()[1]
    items = []
    
    if len(test_items) == 0:
        return items
    
    for i in test_items:
        items.append(i)
    
    # Ordenamos por valoración (descendente)
    items.sort(key=lambda x: float(test_matrix[u, x]), reverse=True)
    
    return items

def get_user_dcg_from_dict(u, recommendations, test_matrix):
    """Calcula DCG para un usuario con recomendaciones"""
    dcg = 0
    
    if not recommendations:
        return 0
    
    for pos, i in enumerate(recommendations):
        # Convertir a valor escalar en lugar de array
        rating = float(test_matrix[u, i]) if test_matrix[u, i] != 0 else 0
        
        if rating != 0:
            dcg += (2 ** rating - 1) / math.log2(pos + 2)
    
    return dcg

def get_user_idcg_from_dict(u, test_matrix, theta=4):
    """Calcula el IDCG para un usuario usando la matriz de test"""
    items = get_sorted_test_items_from_dict(u, test_matrix)
    idcg = 0.0  # Inicializar como flotante
    
    for pos, i in enumerate(items):
        # Asegurarse de que la comparación sea con un valor escalar
        rating = float(test_matrix[u, i])
        if rating >= theta:
            idcg += (2 ** rating - 1) / math.log2(pos + 2)
    
    return idcg

def get_user_ndcg_from_dict(u, predictions_dict, test_matrix, n=10, theta=4):
    """Calcula nDCG para un usuario con predicciones en formato diccionario"""
    if u not in predictions_dict:
        return None
        
    recommendations = get_recommendations_from_dict(predictions_dict, u, n)
    
    if not recommendations:
        return None
    
    dcg = get_user_dcg_from_dict(u, recommendations, test_matrix)
    idcg = get_user_idcg_from_dict(u, test_matrix, theta)
    
    if idcg == 0:
        return None
    else:
        return dcg / idcg

def get_ndcg_from_dict(predictions_dict, test_matrix, n=10, theta=4):
    """Calcula nDCG global con predicciones en formato diccionario"""
    ndcg = 0
    count = 0
    
    for u in predictions_dict:
        user_ndcg = get_user_ndcg_from_dict(u, predictions_dict, test_matrix, n, theta)
        if user_ndcg is not None:
            ndcg += user_ndcg
            count += 1
    
    if count > 0:
        return ndcg / count
    else:
        return None

# Función principal de evaluación
def evaluate_matrix_factorization(predictions_dict, test_matrix, theta=4, n=10):
    """
    Evalúa las predicciones de un modelo de factorización matricial con todas las métricas
    
    Args:
        predictions_dict: Diccionario de predicciones {usuario: {item: predicción}}
        test_matrix: Matriz de test sparse
        theta: Umbral para precisión/recall (por defecto 4)
        n: Número de recomendaciones (por defecto 10)
        
    Returns:
        Diccionario con todas las métricas calculadas
    """
    metrics = {}
    
    # Calcular métricas de error
    metrics['mae'] = get_mae_from_dict(predictions_dict, test_matrix)
    metrics['rmse'] = get_rmse_from_dict(predictions_dict, test_matrix)
    
    # Calcular métricas de ranking
    metrics['precision'] = get_precision_from_dict(predictions_dict, test_matrix, theta, n)
    metrics['recall'] = get_recall_from_dict(predictions_dict, test_matrix, theta, n)
    metrics['f1'] = get_f1_from_dict(predictions_dict, test_matrix, theta, n)
    metrics['ndcg'] = get_ndcg_from_dict(predictions_dict, test_matrix, n, theta)
    
    return metrics

Ejemplo de uso

In [168]:
# Obtener índices de prueba
test_indices = list(zip(*test_matrix.nonzero()))

# Entrenar modelo PMF básico
p, q = train_pmf(train_matrix)

# Generar predicciones para conjunto de prueba
pmf_predictions = generate_pmf_predictions(test_indices, p, q)

# Entrenar modelo PMF con sesgos
mu, bu, bi, p_biased, q_biased = train_biased_pmf(train_matrix)

# Generar predicciones para conjunto de prueba usando modelo con sesgos
biased_pmf_predictions = generate_biased_pmf_predictions(test_indices, mu, bu, bi, p_biased, q_biased)



Entrenando modelo PMF...
Iteración 1/10
Iteración 2/10
Iteración 3/10
Iteración 4/10
Iteración 5/10
Iteración 6/10
Iteración 7/10
Iteración 8/10
Iteración 9/10
Iteración 10/10
Media global de valoraciones: 4.0239
Entrenando modelo PMF con sesgos...
Iteración 1/10
Iteración 2/10
Iteración 3/10
Iteración 4/10
Iteración 5/10
Iteración 6/10
Iteración 7/10
Iteración 8/10
Iteración 9/10
Iteración 10/10


In [169]:
# Evaluar los modelos
print("\nResultados de Factorización Matricial:")
pmf_metrics = evaluate_matrix_factorization(pmf_predictions, test_matrix)
for metric, value in pmf_metrics.items():
    print(f"{metric.upper()}: {value:.4f}")

print("\nResultados de Factorización Matricial con Sesgos:")
biased_pmf_metrics = evaluate_matrix_factorization(biased_pmf_predictions, test_matrix)
for metric, value in biased_pmf_metrics.items():
    print(f"{metric.upper()}: {value:.4f}")


Resultados de Factorización Matricial:
MAE: 1.2183
RMSE: 1.2426
PRECISION: 0.7644
RECALL: 0.9990
F1: 0.9705
NDCG: 0.9964

Resultados de Factorización Matricial con Sesgos:
MAE: 1.0732
RMSE: 1.1027
PRECISION: 0.7643
RECALL: 0.9989
F1: 0.9705
NDCG: 0.9933


# Bernoulli Matrix Factorization

In [170]:
# Hiperparámetros para BeMF
NUM_FACTORS_BEMF = 10  # Número de factores latentes
LEARNING_RATE_BEMF = 0.001  # Tasa de aprendizaje
REGULARIZATION_BEMF = 0.1  # Regularización
NUM_ITERATIONS_BEMF = 10  # Iteraciones de entrenamiento

# Puntuaciones posibles (scores)
SCORES = list(range(int(MIN_RATING), int(MAX_RATING) + 1))

def logit(x):
    """Función logística para BeMF"""
    return 1 / (1 + np.exp(-x))

Inicialización del modelo BeMF

In [171]:
def init_bemf_model():
    """Inicializa las matrices de factores para cada puntuación posible"""
    # Una matriz de factores de usuario para cada puntuación
    u_factors = [np.random.random((NUM_USERS, NUM_FACTORS_BEMF)) for _ in range(len(SCORES))]
    
    # Una matriz de factores de ítem para cada puntuación
    v_factors = [np.random.random((NUM_ITEMS, NUM_FACTORS_BEMF)) for _ in range(len(SCORES))]
    
    return u_factors, v_factors

 Entrenamiento del modelo BeMF

In [172]:
def train_bemf(train_matrix, num_iterations=NUM_ITERATIONS_BEMF, 
               learning_rate=LEARNING_RATE_BEMF, reg_param=REGULARIZATION_BEMF):
    """Entrena el modelo BeMF usando descenso de gradiente"""
    # Inicializar parámetros del modelo
    u_factors, v_factors = init_bemf_model()
    
    print("Entrenando modelo BeMF...")
    for it in range(num_iterations):
        print(f"Iteración {it+1}/{num_iterations}")
        
        # Para cada posible puntuación
        for s_idx, s in enumerate(SCORES):
            
            # Actualizar factores de usuario
            for u, i in zip(*train_matrix.nonzero()):
                # Obtener la valoración real (convertir a entero)
                r_ui = int(train_matrix[u, i])
                
                # Calcular el producto escalar
                dot = np.dot(u_factors[s_idx][u], v_factors[s_idx][i])
                
                # Es puntuación s o no
                is_score_s = (r_ui == s)
                
                # Actualizar factor del usuario
                if is_score_s:
                    gradient = (1 - logit(dot)) * v_factors[s_idx][i] - reg_param * u_factors[s_idx][u]
                else:
                    gradient = -logit(dot) * v_factors[s_idx][i] - reg_param * u_factors[s_idx][u]
                
                u_factors[s_idx][u] += learning_rate * gradient
                
                # Actualizar factor del ítem
                if is_score_s:
                    gradient = (1 - logit(dot)) * u_factors[s_idx][u] - reg_param * v_factors[s_idx][i]
                else:
                    gradient = -logit(dot) * u_factors[s_idx][u] - reg_param * v_factors[s_idx][i]
                
                v_factors[s_idx][i] += learning_rate * gradient
    
    return u_factors, v_factors

Generación de predicciones

In [173]:
def compute_bemf_prediction(u, i, u_factors, v_factors):
    """Calcula la predicción para un par usuario-ítem usando BeMF"""
    prediction = None
    max_prob = 0
    
    for s_idx, s in enumerate(SCORES):
        # Producto punto de los factores latentes para la puntuación s
        dot = np.dot(u_factors[s_idx][u], v_factors[s_idx][i])
        
        # Probabilidad de la puntuación s
        prob = logit(dot)
        
        # Si la probabilidad es mayor que las anteriores, actualizar predicción
        if prob > max_prob:
            max_prob = prob
            prediction = s
    
    return prediction, max_prob

def generate_bemf_predictions(test_indices, u_factors, v_factors):
    """Genera predicciones para ítems de prueba usando el modelo BeMF"""
    predictions = {}
    
    for u, i in test_indices:
        if u < u_factors[0].shape[0] and i < v_factors[0].shape[1]:
            prediction, _ = compute_bemf_prediction(u, i, u_factors, v_factors)
            
            if u not in predictions:
                predictions[u] = {}
            predictions[u][i] = prediction
    
    return predictions

Ejecución del entrenamiento y evaluación

In [182]:
# Obtener índices de prueba
test_indices = list(zip(*test_matrix.nonzero()))

# Entrenar modelo BeMF
u_factors_bemf, v_factors_bemf = train_bemf(train_matrix)

# Generar predicciones para conjunto de prueba
bemf_predictions = generate_bemf_predictions(test_indices, u_factors_bemf, v_factors_bemf)

# Evaluar el modelo BeMF
print("\nResultados de Bernoulli Matrix Factorization:")
bemf_metrics = evaluate_matrix_factorization(bemf_predictions, test_matrix)
for metric, value in bemf_metrics.items():
    print(f"{metric.upper()}: {value:.4f}")

Entrenando modelo BeMF...
Iteración 1/10
Iteración 2/10
Iteración 3/10
Iteración 4/10
Iteración 5/10
Iteración 6/10
Iteración 7/10
Iteración 8/10
Iteración 9/10
Iteración 10/10

Resultados de Bernoulli Matrix Factorization:
MAE: 1.0865
RMSE: 1.0865
PRECISION: 0.7788
RECALL: 0.2872
F1: 0.3928
NDCG: 0.4049
