# Sistema de recomendación de filtro colaborativo basado en modelos

Se busca realizar cinco recomendaciones de productos a usuarios basandose en la similitud con otros usuarios y los productos comprados. A través de un sistema de recomendación de filtro colaborativo basado en modelos implementando la estrategia de descomposición de valor único (SVD) de la matriz usuario-item que muestra la interacción de cada usuario con los productos de tecnología de una tienda global. Se utilizo inicialmente un dataset con un tamaño de 51290 filas y 24 columnas. La matriz usuario-item, se construyo filtrando el dataset para solo productos de tecnología, donde las filas son el id del usuario, las columnas el id del producto y los valores índica si el usuario compro el producto(1) o no compro el producto(0).

In [3]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [4]:
# Ruta del archivo
base_dir = os.path.dirname(os.getcwd())
url = os.path.join(base_dir, "data", "processed", "users_matriz_items.csv")

# Cargando el archivo
try: 
    data = pd.read_csv(url, sep=',', encoding='utf-8', index_col=0)
    print("Carga exitosa: ", data.shape)
    
except FileNotFoundError:
    print("Error: Archivo no encontrado. Verifica la ruta.")
    
except PermissionError:
    print("Error: No tienes permiso para acceder al archivo.")
    
except pd.errors.EmptyDataError:
    print("Error: El archivo esta vacío")
    
except pd.errors.ParserError:
    print("Error: Fallo al analizar el archivo. Verifica el formato")
    
except Exception as e:
    print(f"Error inesperado: {e}")

Carga exitosa:  (1301, 2375)


## Separar datos de entrenamiento y prueba

In [5]:
from sklearn.model_selection import train_test_split


# División del conjunto de datos en entrenamiento y prueba
train, test = train_test_split(
    data, test_size=0.2, random_state=42
    )

print(f"Tamaño conjunto de datos de entrenamiento: {train.shape}")
print(f"Tamaño conjunto de datos de prueba: {test.shape}")

Tamaño conjunto de datos de entrenamiento: (1040, 2375)
Tamaño conjunto de datos de prueba: (261, 2375)


### Convertir a matriz dispersa

In [6]:
from scipy.sparse import csr_matrix

train_matrix = csr_matrix(train)
test_matrix = csr_matrix(test)

print(train_matrix.shape)
print(test_matrix.shape)

(1040, 2375)
(261, 2375)


## Aplicar TruncatedSVD

In [7]:
from sklearn.decomposition import TruncatedSVD


# Para tratar la indeterminación de signo primero se ajusta en los datos de entrenamiento y luego se usar ese mismo ajuste
# Para transformar los datos de entrenamiento, prueba y datos nuevos.
svd = TruncatedSVD(
    n_components=443, 
    n_iter=7, 
    random_state=42, 
    tol=0.0, 
    n_oversamples=13, 
    power_iteration_normalizer='LU'
    )

# Ajustar el modelo a los datos de entrenamiento
svd.fit(train_matrix)

# Transformar los datos de entrenamiento
U = svd.transform(train_matrix)
s = np.diag(svd.singular_values_)
Vt = svd.components_

In [8]:
# Por el momento se asume que los primeros valores singulares contendran la mayor información de la matriz usuario-items
# Verificando el la matriz de vectores singulares izquierdos
print(U.shape)
# Verificando el tamaño de la matriz de valores singulares
print(s.shape)
# Verificando el tamaño de la matriz de vectores singulares singulares derechos
print(Vt.shape)

(1040, 443)
(443, 443)
(443, 2375)


## Reconstruir matriz

Reconstruyendo la matriz latente apróximada con la matriz de usuario y la matriz de items

In [9]:
matriz_user = U
matriz_items = Vt

In [10]:
# Reconstruyendo la matriz apróxima a través del producto punto
matriz_prediction = np.dot(matriz_user, matriz_items)

# Convirtiendo a dataframe para un manejo más fácil
df_prediction = pd.DataFrame(
    matriz_prediction, 
    index=train.index, 
    columns=train.columns
    )

## Pruebas

In [None]:
url_base = os.path.dirname(os.getcwd())
url = os.path.join(url_base, "data", "processed", "processed_data.csv")
data = pd.read_csv(url, encoding="utf-8", sep=",")

def recomendar_productos(user_id, top=5):
    """_summary_

    Args:
        user_id (pandas.core.series.Series): Series with a length of 2375
        top (int, optional): Number (n) of products to display. Defaults to 5.

    Returns:
        _type_: return the top n recommended products
    """
    # Se obtiene los productos comprados por el usuario para no incluirlos en las predicciones
    prod_comprados = train.loc[user_id]
    prod_comprados = list(prod_comprados[prod_comprados > 0].index)
    
    # Obtener las puntuaciones predichas para el usuario
    scores = df_prediction.loc[user_id]
    scores = scores.drop(prod_comprados)
    
    # Ordenar descendente las puntuacioens
    recommended_products = scores.sort_values(ascending=False).head(top)
    
    product_names = data.set_index("Product ID")["Product Name"].to_dict()
    
    recommended_products.index = recommended_products.index.map(lambda x: product_names.get(x, 'Desconocido'))
        
    return recommended_products

usuario_id = "BM-1140"
top_productos = recomendar_productos(usuario_id, top=10)

print(f"Productos recomendados para el usuario {usuario_id}:")
print(top_productos)

Productos recomendados para el usuario BM-1140:
Okidata Inkjet, Durable             0.021021
Belkin Memory Card, Programmable    0.010778
Okidata Calculator, White           0.009914
StarTech Calculator, Wireless       0.009021
Samsung Signal Booster, VoIP        0.008348
Enermax Router, Bluetooth           0.007877
Logitech Flash Drive, Bluetooth     0.007293
HP Copy Machine, Color              0.007235
Memorex Router, USB                 0.007159
Motorola Smart Phone, Cordless      0.006951
Name: BM-1140, dtype: float64


## Predicciones con los datos de prueba

In [None]:
matriz_test = svd.transform(test)

In [None]:
# Recomendar productos multiplicando usuarios (test) con productos (items)
predicciones = np.dot(matriz_test, matriz_items)
df_predicciones = pd.DataFrame(predicciones, index=test.index, columns=test.columns)

## Evaluación de rendimiento

### Proporción de variación explicada

Se verifica cuánta variación total explica el modelo con los componentes seleccionados. Si el valor es cercano a 1, significa que la mayoria de la  variación en los datos es explicada por los n_components seleccionados.

In [None]:
print(f"El porcentaje de varianza explícada con {svd.n_components} componentes es {round(svd.explained_variance_ratio_.sum(), 2)} %")

### Precisión

Precisión = Relevantes n recomendados @ k / cantidad de valores reales

relevantes: interacciones reales

n: intersección

In [None]:
def precision_k(data_real, data_pred, k=6):
    # Recorremos el número de indice de la longitud de los usuarios
    precisiones = []
    
    for user in range(data_real.shape[0]):
        
        # Buscamos las interaciones reales que tiene el usuario con los productos
        items_reales = set(np.asarray(data_real.iloc[user] > 0).nonzero()[0])
        # Buscamos las interaciones predichas que tiene el usuario con los productos
        items_pred = set(np.argsort(data_pred.iloc[user])[-k:][::-1])
        
        # Definimos para que la precisión se calcule en los usuarios que realizaron interaciones
        if len(items_reales) > 0:
            interaciones = len(items_reales & items_pred)
            precision = interaciones / min(k, len(items_reales))
            precisiones.append(precision)
    
    return np.mean(precisiones) if precisiones else 0

In [None]:
precision_k(test, df_predicciones, k=6)

Recall@k

recallk = Relevantes recomendados en el top k / total relevantes

In [None]:
# Función para calcular el RecallK
def recall_k(data_real, data_pred, k=5):
    
    # Comprobar si los datos son un dataframe o un array
    if not isinstance(data_real, (pd.DataFrame, np.ndarray)) or not isinstance(data_pred, (pd.DataFrame, np.ndarray)):
        raise TypeError("Los datos deben ser dataframe o ndarray")
    
    if not isinstance(k, int) or k <= 0:
        raise ValueError("El parámetro k debe ser un entero positivo.")
    
    recalls = []

    for user in range(len(data_real)):
        items_real = set(np.asarray(data_real.iloc[user] > 0).nonzero()[0])
        items_pred = set(np.argsort(data_pred.iloc[user])[-k:][::-1])
        
        if len(items_real) > 0:
            interseccion = len(items_real & items_pred)
            recall = interseccion / len(items_real)
            recalls.append(recall)
    
    return np.mean(recalls) if recalls else 0.0

In [None]:
recall_k(test, df_predicciones, k=6)

### MAP(Mean Average Precision)

In [None]:
def mean_average_precision(data_real, data_pred, k=5):
    
    # Verificando que los datos sean un dataframe o un array
    if not isinstance(data_real, (pd.DataFrame, np.ndarray)) or not isinstance(data_pred, (pd.DataFrame, np.ndarray)):
        raise TypeError("Los datos deben ser un dataframe o un array")
    
    # Verificando que el valor k sea entero y mayor a 0
    if not isinstance(k, int) or k <= 0:
        raise ValueError("k debe ser un entero mayor a 0")
    
    ap_list = []
    
    for user in range(len(data_real)):
        items_reales = set(np.asarray(data_real.iloc[user] > 0).nonzero()[0])
        items_pred = set(np.argsort(data_pred.iloc[user])[-k:][::-1])
        
        if len(items_reales) > 0:
            hits = 0
            sum_precisions = 0
            for i, item in enumerate(items_pred):
                if item in items_reales:
                    hits += 1
                    sum_precisions += hits / (i + 1)
            
            ap = sum_precisions / min(k, len(items_reales))
            ap_list.append(ap)
        
    return np.mean(ap_list)

In [None]:
mean_average_precision(test, df_predicciones, k=6)

### F1 Score

In [None]:
def f1_score(data_real, data_pred, k=5):
    precision = precision_k(data_real, data_pred, k)
    recall = recall_k(data_real, data_pred, k)
    
    if precision + recall == 0:
        return 0
    return 2 * (precision * recall) / (precision + recall)

In [None]:
f1_score(test, df_predicciones, k=6)

## Optimización de hiperparámetros

In [None]:
def precision_k_scorer(estimator, X, y=None):
    X_transformed = estimator.fit_transform(X)
    score = precision_k(X, X_transformed, k=6)
    print("Score obtenido:", score)  # 🔍 Verifica si es NaN
    return score if not np.isnan(score) else 0

In [None]:
from sklearn.metrics import make_scorer

# Convertir la métrica en un scorer de scikit-learn
scoring = make_scorer(precision_k_scorer, greater_is_better=True)

In [None]:
svd.get_params(deep=True)

In [None]:
from sklearn.model_selection import RandomizedSearchCV

param_grid = {
    'n_components': np.random.randint(100, 600, 30),  # Número de componentes
    'n_iter': [5, 7, 10],  # Iteraciones de optimización
    'random_state': [42],
    'n_oversamples': np.random.randint(10, 100, 10),
    'power_iteration_normalizer': ['auto', 'QR', 'LU', 'none']
}

In [None]:
from sklearn.decomposition import TruncatedSVD

svd = TruncatedSVD()

random_search = RandomizedSearchCV(
    estimator=svd,
    param_distributions=param_grid,
    n_iter=5,  # Número de combinaciones a probar
    cv=3,
    scoring=scoring,# Validación cruzada con 3 folds
    random_state=42,
    n_jobs=-1  # Usar todos los núcleos disponibles
)

In [None]:
random_search.fit(train_disperses)

In [None]:
print("Mejores parámetros:", random_search.best_params_)