In [None]:
import random
import numpy as np
import pandas as pd
import tensorflow as tf

import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from utils.transformation import *
from utils.models import *

# Notebook-wide configuration.
VAR_SEED = 42  # single seed shared by every random number generator seeded below
VAR_TESTSET_SIZE = 0.20  # NOTE(review): presumably the held-out fraction for a train/test split - confirm at the call site
VAR_DIR_DATA_CLEAN = '../data/cleaning'  # directory the cleaned CSV files are read from

# Seed every RNG this notebook draws from. The cells below generate data and
# initialise weights with torch, so torch must be seeded as well; seeding only
# `random` and numpy leaves those results different on every run.
random.seed(VAR_SEED)
np.random.seed(VAR_SEED)
torch.manual_seed(VAR_SEED)

# Load the cleaned CSV exports (latin1-encoded, comma-separated).
# `catalogo` and `diagnostico` are narrowed to the listed columns right after
# reading; `mf_dataset` is kept whole.
catalogo = pd.read_csv(
    f"{VAR_DIR_DATA_CLEAN}/catalogo.csv",
    sep=",",
    encoding="latin1",
)[
    [
        'id_ejercicio',
        'h1', 'h2', 'h3', 'h4',
        's1', 's2', 's3', 's4',
        'k1', 'k2', 'k3', 'k4',
    ]
]
mf_dataset = pd.read_csv(
    f"{VAR_DIR_DATA_CLEAN}/mf_dataset.csv",
    sep=",",
    encoding="latin1",
)
diagnostico = pd.read_csv(
    f"{VAR_DIR_DATA_CLEAN}/prueba_diagnostico.csv",
    sep=",",
    encoding="latin1",
)[
    ['id_estudiante', 'score_a', 'score_p', 'score_d', 'score_s']
]

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# UserTowerRecommender model definition
class UserTowerRecommender(nn.Module):
    """Feed-forward "user tower" mapping a user feature vector to per-category scores.

    The tower is a stack of ``Linear -> ReLU -> Dropout`` groups, one group per
    entry of ``hidden_dims``, followed by a final linear layer with one output
    per category. No activation is applied to the output, so the returned
    values are raw scores (logits).

    Args:
        user_feature_dim: Size of the last dimension of the input tensor.
        hidden_dims: Iterable with the width of each hidden layer, in order.
            It may be empty, in which case the input feeds the output layer
            directly.
        num_categories: Number of scores produced per input row.
        dropout: Dropout probability used after every hidden activation.
            Defaults to 0.5, the value that used to be hard-coded.
    """

    def __init__(self, user_feature_dim, hidden_dims, num_categories, dropout=0.5):
        super().__init__()

        # Build the user tower as a series of dense layers.
        layers = []
        input_dim = user_feature_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(input_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))  # regularisation
            input_dim = hidden_dim  # the next layer consumes this layer's output
        self.user_tower = nn.Sequential(*layers)

        # Final layer: one score per category.
        self.output_layer = nn.Linear(input_dim, num_categories)

    def forward(self, user_features):
        """Return the ``num_categories`` raw scores for each row of ``user_features``."""
        user_out = self.user_tower(user_features)
        return self.output_layer(user_out)

# Assumptions:
#   - every user has m general features
#   - every item contributes 3 features
#   - every user has interacted with n items

item_features_per_item = 3  # each item has 3 features
n = 10  # the user has interacted with 10 items
m = 5  # e.g. 5 general user features

# Total input width per user: the m general features plus 3 features for each
# of the n items.
user_feature_dim = n * item_features_per_item + m

num_categories = 5  # number of categories to predict (e.g. 5 product types)
hidden_dims = [128, 64]  # widths of the hidden layers

# Build the model
model = UserTowerRecommender(
    user_feature_dim=user_feature_dim,
    hidden_dims=hidden_dims,
    num_categories=num_categories,
)

# Simulated training data
num_samples = 1000  # number of users in the training set
# One row of standard-normal features per user
train_features = torch.randn((num_samples, user_feature_dim))
# Binary interest labels (0 or 1) per user and category, stored as floats
train_labels = torch.randint(low=0, high=2, size=(num_samples, num_categories)).float()

# Training function
def train_model(model, train_features, train_labels, num_epochs=50, learning_rate=0.001, log_every=10):
    """Train ``model`` with full-batch Adam on a binary (multi-label) objective.

    Each epoch is a single optimisation step over the whole ``train_features``
    tensor; there is no mini-batching. The loss is ``BCEWithLogitsLoss``, so
    the model is expected to output raw logits.

    Args:
        model: ``nn.Module`` whose output has the same shape as ``train_labels``.
        train_features: Input tensor passed to ``model`` as-is.
        train_labels: Float tensor of targets in [0, 1].
        num_epochs: Number of optimisation steps to run.
        learning_rate: Learning rate given to Adam.
        log_every: Print the loss every ``log_every`` epochs. ``0`` or ``None``
            disables printing. The default of 10 matches the interval that
            used to be hard-coded.

    Returns:
        list[float]: The training loss of every epoch, in order. It is empty
        when ``num_epochs`` is 0.

    Side effects:
        Updates the parameters of ``model`` in place and leaves it in training
        mode; call ``model.eval()`` before running inference.
    """
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCEWithLogitsLoss()  # binary classification on raw logits

    # Training mode never changes inside the loop, so it is set once here.
    model.train()

    loss_history = []
    for epoch in range(num_epochs):
        optimizer.zero_grad()  # clear gradients from the previous step

        # Forward pass and loss
        outputs = model(train_features)
        loss = criterion(outputs, train_labels)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        loss_value = loss.item()
        loss_history.append(loss_value)

        # Periodic progress report; the truthiness check guards against log_every=0.
        if log_every and (epoch + 1) % log_every == 0:
            print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss_value:.4f}")

    return loss_history

# Train the model
train_model(model, train_features, train_labels, num_epochs=50, learning_rate=0.001)

# Run a prediction for one simulated test user
user_features_test = torch.randn(1, user_feature_dim)  # features of the test user

# train_model leaves the model in training mode, where dropout randomly zeroes
# activations. Switch to eval mode so the prediction is deterministic, and
# skip autograd bookkeeping because no gradient is needed here.
model.eval()
with torch.no_grad():
    category_scores = model(user_features_test)

# The printed values are raw logits (no sigmoid has been applied).
print("Predicciones para el usuario de prueba:", category_scores)


# Traditional two-tower recommender systems

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class UserTowerRecommender(nn.Module):
    """Feed-forward "user tower" mapping a user feature vector to per-category scores.

    NOTE(review): a class with this name is also defined in an earlier cell of
    this notebook; whichever definition runs last is the one in effect.

    The tower is a stack of ``Linear -> ReLU -> Dropout`` groups, one group per
    entry of ``hidden_dims``, followed by a final linear layer with one output
    per category. No activation is applied to the output, so the returned
    values are raw scores (logits).

    Args:
        user_feature_dim: Size of the last dimension of the input tensor.
        hidden_dims: Iterable with the width of each hidden layer, in order.
            It may be empty, in which case the input feeds the output layer
            directly.
        num_categories: Number of scores produced per input row.
        dropout: Dropout probability used after every hidden activation.
            Defaults to 0.5, the value that used to be hard-coded.
    """

    def __init__(self, user_feature_dim, hidden_dims, num_categories, dropout=0.5):
        super().__init__()

        # Define the user tower as a series of dense layers
        layers = []
        input_dim = user_feature_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(input_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))  # Add dropout for regularization
            input_dim = hidden_dim  # the next layer consumes this layer's output
        self.user_tower = nn.Sequential(*layers)

        # Final layer to produce predictions for each category
        self.output_layer = nn.Linear(input_dim, num_categories)

    def forward(self, user_features):
        """Return the ``num_categories`` raw scores for each row of ``user_features``."""
        user_out = self.user_tower(user_features)
        return self.output_layer(user_out)

# Example usage
user_feature_dim = 10  # Suppose each user is represented by 10 features
hidden_dims = [128, 64]  # Two hidden layers
num_categories = 5  # Predict interest in 5 categories

model = UserTowerRecommender(user_feature_dim, hidden_dims, num_categories)
user_features = torch.rand((1, user_feature_dim))

# A freshly constructed nn.Module starts in training mode, where dropout
# randomly zeroes activations. Switch to eval mode for a deterministic forward
# pass and disable autograd, which is not needed for a prediction.
model.eval()
with torch.no_grad():
    category_scores = model(user_features)

print(category_scores)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class ItemTowerRecommender(nn.Module):
    """Feed-forward "item tower" mapping an item feature vector to a single score.

    The tower is a stack of ``Linear -> ReLU -> Dropout`` groups, one group per
    entry of ``hidden_dims``, followed by a final linear layer with a single
    output. No activation is applied to that output. According to the comment
    on the output layer, the score is meant to express the item's predicted
    popularity.

    Args:
        item_feature_dim: Size of the last dimension of the input tensor.
        hidden_dims: Iterable with the width of each hidden layer, in order.
            It may be empty, in which case the input feeds the output layer
            directly.
        dropout: Dropout probability used after every hidden activation.
            Defaults to 0.5, the value that used to be hard-coded.
    """

    def __init__(self, item_feature_dim, hidden_dims, dropout=0.5):
        super().__init__()

        # Define the item tower as a series of dense layers
        layers = []
        input_dim = item_feature_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(input_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))  # Add dropout for regularization
            input_dim = hidden_dim  # the next layer consumes this layer's output
        self.item_tower = nn.Sequential(*layers)

        # Final layer to produce a score for the item's predicted popularity
        self.output_layer = nn.Linear(input_dim, 1)

    def forward(self, item_features):
        """Return one raw score per row of ``item_features`` (last dimension of size 1)."""
        item_out = self.item_tower(item_features)
        return self.output_layer(item_out)

# Example usage
item_feature_dim = 8  # Suppose each item is represented by 8 features
hidden_dims = [64, 32]  # Two hidden layers

model = ItemTowerRecommender(item_feature_dim, hidden_dims)
item_features = torch.rand((1, item_feature_dim))

# A freshly constructed nn.Module starts in training mode, where dropout
# randomly zeroes activations. Switch to eval mode for a deterministic forward
# pass and disable autograd, which is not needed for a prediction.
model.eval()
with torch.no_grad():
    popularity_score = model(item_features)

print(popularity_score)

# Dense and Sparse Vectors

In [None]:
import numpy as np

# Toy user-item rating matrix (one row per user, one column per item)
R = np.array(
    [
        [5, 3, 0, 1],
        [4, 0, 0, 1],
        [1, 1, 0, 5],
        [1, 0, 0, 4],
        [0, 1, 5, 4],
    ]
)

# Reduced singular value decomposition, so that R = U @ diag(sigma) @ Vt
U, sigma, Vt = np.linalg.svd(R, full_matrices=False)

# Rows of U are shown as the user vectors, rows of Vt transposed as the item vectors.
print("User embeddings (dense vectors):", U, sep="\n")
print("\nItem embeddings (dense vectors):", Vt.T, sep="\n")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split

# Network that projects an input vector to a dense representation
class ComplexDenseRepresentationNetwork(nn.Module):
    """Three-layer MLP: two ReLU + dropout hidden layers and a linear output.

    Args:
        input_dim: Size of the last dimension of the input.
        hidden_dim: Width shared by both hidden layers.
        dense_dim: Size of the output vector.
    """

    def __init__(self, input_dim, hidden_dim, dense_dim):
        super().__init__()

        # Layers in the order they are applied: input -> hidden -> hidden -> dense
        self.fc1 = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=hidden_dim)
        self.fc3 = nn.Linear(in_features=hidden_dim, out_features=dense_dim)

        # Shared, parameter-free modules reused after each hidden layer
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """Apply both hidden layers (ReLU then dropout) and the final linear layer."""
        hidden = x
        for hidden_layer in (self.fc1, self.fc2):
            hidden = self.dropout(self.relu(hidden_layer(hidden)))
        # The output layer has no activation and no dropout.
        return self.fc3(hidden)

# Minimal dataset pairing each sample with its target
class CustomDataset(Dataset):
    """Index-aligned view over ``data`` and ``targets``.

    Both arguments are stored as given; item ``i`` is the pair
    ``(data[i], targets[i])`` and the length is that of ``data``.
    """

    def __init__(self, data, targets):
        self.data, self.targets = data, targets

    def __len__(self):
        # The number of samples is taken from `data`; `targets` is not checked.
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        target = self.targets[idx]
        return sample, target

# Hyperparameters
input_dim = 10  # width of each input vector
hidden_dim = 20
dense_dim = 5  # width of each target vector
batch_size = 5
learning_rate = 0.01
epochs = 2000

# Sample data: one one-hot encoded vector per item. The shapes are derived from
# the hyperparameters above instead of repeating the literals 10 and 5, so the
# data stays consistent if a hyperparameter is changed.
data = torch.eye(input_dim)  # identity matrix: row i is the one-hot vector of item i
targets = torch.rand(input_dim, dense_dim)  # one random target vector per item

# Wrap the tensors in a dataset and hold out 20% of it for validation
dataset = CustomDataset(data, targets)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size  # whatever is left after the training share
train_dataset, val_dataset = random_split(dataset, lengths=[train_size, val_size])

# Only the training batches are reshuffled every epoch
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)

# Network, loss (mean squared error) and optimizer (Adam)
model = ComplexDenseRepresentationNetwork(
    input_dim=input_dim,
    hidden_dim=hidden_dim,
    dense_dim=dense_dim,
)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train for `epochs` epochs, measuring the validation loss after each one
for epoch in range(epochs):
    # Training pass: one optimizer step per mini-batch
    model.train()
    train_loss = 0.0
    for batch_data, batch_targets in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_data)
        loss = criterion(outputs, batch_targets)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # Mean of the per-batch losses
    train_loss = train_loss / len(train_loader)

    # Validation pass: eval mode and no gradient tracking
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_data, batch_targets in val_loader:
            outputs = model(batch_data)
            loss = criterion(outputs, batch_targets)
            val_loss += loss.item()
    val_loss = val_loss / len(val_loader)

    # Report both losses every 100 epochs
    if epoch % 100 == 99:
        print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

# Extract and print dense vectors.
# Set eval mode explicitly so dropout is disabled here regardless of which
# mode the preceding cell code left the model in (e.g. if it ran zero epochs).
model.eval()
with torch.no_grad():
    dense_vectors = model(data)
    print(dense_vectors)

# Contextual Recommendations

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class ComplexContextualRecommender(nn.Module):
    """Score a (user, item, context) triple from learned embeddings.

    The three ids are looked up in separate embedding tables, the embeddings
    are concatenated, and the result goes through three dense layers
    (256 -> 128 -> 64), each followed by a per-sample normalisation, ReLU and
    dropout, and finally a linear layer producing one score.

    Normalisation note: the original code pushed each ``(batch, features)``
    activation through ``InstanceNorm1d`` after inserting a dummy channel
    axis. That normalises every sample over its feature axis, but the module's
    ``num_features`` never matched the single dummy channel, which recent
    PyTorch versions report with a warning on every forward pass.
    ``LayerNorm(features, elementwise_affine=False)`` computes the same
    normalisation directly on the 2-D activation. It has no parameters or
    buffers, so the ``state_dict`` layout is unchanged. The attribute names
    ``in1``/``in2``/``in3`` are kept for compatibility.

    Args:
        num_users: Number of rows in the user embedding table.
        num_items: Number of rows in the item embedding table.
        num_contexts: Number of rows in the context embedding table.
        user_embed_dim: Width of a user embedding.
        item_embed_dim: Width of an item embedding.
        context_embed_dim: Width of a context embedding.
    """

    def __init__(self, num_users, num_items, num_contexts, user_embed_dim, item_embed_dim, context_embed_dim):
        super().__init__()

        # Embedding layers with different dimensions
        self.user_embedding = nn.Embedding(num_users, user_embed_dim)
        self.item_embedding = nn.Embedding(num_items, item_embed_dim)
        self.context_embedding = nn.Embedding(num_contexts, context_embed_dim)

        # Width of the concatenated embedding vector
        total_embed_dim = user_embed_dim + item_embed_dim + context_embed_dim

        # Dense layers, each paired with a parameter-free per-sample normalisation
        self.fc1 = nn.Linear(total_embed_dim, 256)
        self.in1 = nn.LayerNorm(256, elementwise_affine=False)
        self.fc2 = nn.Linear(256, 128)
        self.in2 = nn.LayerNorm(128, elementwise_affine=False)
        self.fc3 = nn.Linear(128, 64)
        self.in3 = nn.LayerNorm(64, elementwise_affine=False)
        self.fc_out = nn.Linear(64, 1)  # Output a score
        self.dropout = nn.Dropout(0.5)

    def forward(self, user, item, context):
        """Return a ``(batch, 1)`` tensor of scores.

        ``user``, ``item`` and ``context`` are 1-D integer id tensors of equal
        length; each is used as an index into its embedding table.
        """
        user_embed = self.user_embedding(user)
        item_embed = self.item_embedding(item)
        context_embed = self.context_embedding(context)

        # Concatenate the embeddings along the feature axis
        x = torch.cat([user_embed, item_embed, context_embed], dim=1)

        # Dense layer -> normalisation -> ReLU -> dropout, three times
        x = self.dropout(F.relu(self.in1(self.fc1(x))))
        x = self.dropout(F.relu(self.in2(self.fc2(x))))
        x = self.dropout(F.relu(self.in3(self.fc3(x))))
        return self.fc_out(x)

# Hyperparameters
# Sizes of the three id vocabularies (rows of each embedding table)
num_users, num_items, num_contexts = 1000, 5000, 3
# Embedding width used for each kind of id
user_embed_dim, item_embed_dim, context_embed_dim = 50, 100, 10
# Number of samples per mini-batch
batch_size = 32

# Build the model, its loss and its optimizer
model = ComplexContextualRecommender(
    num_users,
    num_items,
    num_contexts,
    user_embed_dim,
    item_embed_dim,
    context_embed_dim,
)
criterion = nn.MSELoss()  # treated as a regression problem
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Random sample data: 1000 (user, item, context) id triples with a rating each
users = torch.randint(low=0, high=num_users, size=(1000,))
items = torch.randint(low=0, high=num_items, size=(1000,))
contexts = torch.randint(low=0, high=num_contexts, size=(1000,))
ratings = torch.rand(1000, 1)  # uniform random ratings in [0, 1)

# Sample training loop for 100 epochs
for epoch in range(100):
    model.train()
    total_loss = 0.0
    num_batches = 0  # batches that actually contributed to total_loss

    # Create mini-batches by slicing the sample tensors in order
    for i in range(0, len(users), batch_size):
        batch_users = users[i:i+batch_size]
        batch_items = items[i:i+batch_size]
        batch_contexts = contexts[i:i+batch_size]
        batch_ratings = ratings[i:i+batch_size]

        # Skip the batch if size is 1 (skipped batches are not counted below)
        if len(batch_users) == 1:
            continue

        optimizer.zero_grad()
        outputs = model(batch_users, batch_items, batch_contexts)
        loss = criterion(outputs, batch_ratings)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    if (epoch+1) % 10 == 0:
        # Divide by the number of batches that were summed. The previous
        # divisor, len(users) // batch_size, ignores the final partial batch
        # (1000 samples / 32 gives 31 full batches plus one of 8), which
        # inflated the reported average.
        avg_loss = total_loss / max(num_batches, 1)
        print(f'Epoch [{epoch+1}/100], Loss: {avg_loss:.4f}')


# Example prediction
user = torch.tensor([5])
item = torch.tensor([100])
context = torch.tensor([2])

# The training loop leaves the model in training mode, where dropout randomly
# zeroes activations. Switch to eval mode so the score is deterministic, and
# disable autograd because no gradient is needed for a prediction.
model.eval()
with torch.no_grad():
    score = model(user, item, context)
print(f"Predicted Score: {score.item():.4f}")