## This notebook contains the model architectures

In [None]:
from typing import Callable, Any

import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import os
import numpy as np
import scipy
from scipy.sparse import csr_matrix
from pathlib import Path
from torch.utils.data import DataLoader
export_dir = os.getcwd()
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.graph_objs as go
from plotly.offline import plot
import random
import math
import heapq
from scipy.special import expit  # Sigmoid function
import itertools
from IPython.display import Latex, display
import pickle
import warnings

# Silence FutureWarning, RuntimeWarning and UserWarning for the whole kernel session.
# NOTE(review): these filters are global, so warnings raised by any library
# (including numerical RuntimeWarnings) are hidden while they are active.
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=RuntimeWarning)
warnings.simplefilter(action='ignore', category=UserWarning)

# Display settings: lift the pandas row/column display limits and print
# tensors without scientific notation.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
torch.set_printoptions(sci_mode=False)


In [None]:
pip install ipynb

In [None]:
from ipynb.fs.defs.utils import *
from ipynb.fs.defs.data_processing import *


## SAE Architecture for NCF Latent Concepts

In [None]:
class TopKActivation(nn.Module):
    """Activation that keeps the ``k`` largest entries of each row and zeros the rest.

    Args:
        k (int): Number of entries to keep per sample (along dimension 1).
    """

    def __init__(self, k):
        super().__init__()
        self.k = k

    def forward(self, x):
        """Return ``x`` with everything except each row's top-k entries set to zero.

        Args:
            x (torch.Tensor): Input with the feature axis at dimension 1.

        Returns:
            torch.Tensor: Tensor of the same shape as ``x``.
        """
        # Never request more entries than the feature dimension holds;
        # in that case every entry is kept.
        k = min(self.k, x.size(1))
        _, top_idx = torch.topk(x, k, dim=1)
        # Build the mask from the selected indices rather than from a value
        # threshold: comparing against the k-th value would keep *more* than k
        # entries whenever several values tie with it.
        mask = torch.zeros_like(x, dtype=torch.float)
        mask.scatter_(1, top_idx, 1.0)
        return x * mask

# -----------------------------------------------------------------------------
# 2. Custom Tied Transpose Module for Decoder
class TiedTranspose(nn.Module):
    """Bias-free layer whose weight is the transpose of another linear layer's weight."""

    def __init__(self, tied_layer):
        """
        Args:
            tied_layer: Linear layer whose ``weight`` is reused (transposed) here.
        """
        super().__init__()
        # Hold a reference, not a copy, so both layers always see the same weight.
        self.tied_layer = tied_layer

    def forward(self, x):
        # The transposed weight is read at call time, so updates to the tied
        # layer are reflected immediately.
        shared_weight = self.tied_layer.weight
        return nn.functional.linear(x, shared_weight.t())



In [None]:
class SparseAutoencoderNCF(nn.Module):
    """Single-layer autoencoder with a ReLU latent code and an optionally tied decoder.

    Args:
        input_dim (int): Dimensionality of each input embedding.
        hidden_dim (int): Dimensionality of the latent space.
        topk (int): Accepted for interface compatibility. It is currently unused:
            the latent activation is a plain ReLU (stored as ``topk_activation``).
        tie_weights (bool): If True, the decoder reuses the encoder weight
            (transposed); otherwise it is an independent bias-free linear layer.
        test_subset_users: Optional value stored as ``test_subset_users_ind``.
            When omitted (None), a module-level variable named
            ``test_subset_users`` is used if one exists, else None.
        test_subset_items: Same as above for ``test_subset_items_ind``.
    """

    def __init__(self, input_dim=20, hidden_dim=22, topk=5, tie_weights=True,
                 test_subset_users=None, test_subset_items=None):
        super().__init__()
        self.encoder_linear = nn.Linear(input_dim, hidden_dim)
        self.topk_activation = nn.ReLU()

        self.tie_weights = tie_weights
        if tie_weights:
            # NOTE: `TiedTranspose` is looked up when the model is constructed.
            # This notebook defines that name twice (once next to
            # TopKActivation, once in the MF SAE cell), so the definition that
            # was executed last is the one used here.
            self.decoder = TiedTranspose(self.encoder_linear)
        else:
            self.decoder = nn.Linear(hidden_dim, input_dim, bias=False)

        # Bookkeeping lists; nothing in this class fills them.
        self.loss = []
        self.weights_loss = []

        # Explicit arguments win. Otherwise fall back to same-named notebook
        # variables so existing call sites keep working, but a missing global
        # no longer raises NameError on a fresh kernel.
        notebook_vars = globals()
        if test_subset_users is None:
            test_subset_users = notebook_vars.get("test_subset_users")
        if test_subset_items is None:
            test_subset_items = notebook_vars.get("test_subset_items")
        self.test_subset_users_ind = test_subset_users
        self.test_subset_items_ind = test_subset_items

    def forward(self, x):
        """Encode and reconstruct ``x``.

        Returns:
            tuple: ``(decoded, encoded)`` with shapes (batch, input_dim) and
            (batch, hidden_dim).
        """
        latent_pre_act = self.encoder_linear(x)           # (batch, hidden_dim)
        encoded = self.topk_activation(latent_pre_act)    # (batch, hidden_dim)
        decoded = self.decoder(encoded)                   # (batch, input_dim)
        return decoded, encoded


class SAEInteractionDataset(Dataset):
    """Dataset of (user_id, item_id) pairs, served as a pair of long tensors."""

    def __init__(self, interactions):
        """
        Args:
            interactions: list of tuples (user_id, item_id)
        """
        self.interactions = interactions

    def __len__(self):
        return len(self.interactions)

    def __getitem__(self, idx):
        # Unpacking enforces that every record has exactly two fields.
        user_id, item_id = self.interactions[idx]
        return tuple(
            torch.tensor(value, dtype=torch.long) for value in (user_id, item_id)
        )


-

## SAE Architecture for MF Latent Concepts

In [None]:
class Autoencoder(nn.Module):

      def __init__(
          self, n_latents: int, n_inputs: int, activation: Callable = nn.ReLU(), tied: bool = True,
          normalize: bool = False
      ) -> None:
          super().__init__()

          self.pre_bias = nn.Parameter(torch.zeros(n_inputs))
          self.encoder: nn.Module = nn.Linear(n_inputs, n_latents, bias=False)
          self.latent_bias = nn.Parameter(torch.zeros(n_latents))
          self.activation = activation
          if tied:
              self.decoder: nn.Linear | TiedTranspose = TiedTranspose(self.encoder)
          else:
              self.decoder = nn.Linear(n_latents, n_inputs, bias=False)
          self.normalize = normalize
          self.loss = []
          self.test_subset_users_ind = test_subset_users ####
          self.test_subset_items_ind = test_subset_items ####
          self.weights_loss = [] ####
          self.test = test_flag


      def encode_pre_act(self, x: torch.Tensor, latent_slice: slice = slice(None)) -> torch.Tensor:
          if type(x) == np.ndarray:
            x= torch.from_numpy(x)
          x = x - self.pre_bias
          latents_pre_act = F.linear(
              x, self.encoder.weight[latent_slice], self.latent_bias[latent_slice]
          )
          return latents_pre_act

      def preprocess(self, x: torch.Tensor) -> tuple[torch.Tensor, dict[str, Any]]:
          if not self.normalize:
              return x, dict()
          x, mu, std = LN(x)
          return x, dict(mu=mu, std=std)

      def encode(self, x: torch.Tensor) -> tuple[torch.Tensor, dict[str, Any]]:
          x, info = self.preprocess(x)
          return self.activation(self.encode_pre_act(x)), info

      def decode(self, latents: torch.Tensor, info: dict[str, Any] | None = None) -> torch.Tensor:
          ret = self.decoder(latents) + self.pre_bias
          if self.normalize:
              assert info is not None
              ret = ret * info["std"] + info["mu"]
          return ret

      def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
          x, info = self.preprocess(x)
          latents_pre_act = self.encode_pre_act(x)
          latents = self.activation(latents_pre_act)
          recons = self.decode(latents, info)
          return latents_pre_act, latents, recons

      @classmethod
      def from_state_dict(
          cls, state_dict: dict[str, torch.Tensor], strict: bool = True
      ) -> "Autoencoder":
          n_latents, d_model = state_dict["encoder.weight"].shape

          # Retrieve activation
          activation_class_name = state_dict.pop("activation", "ReLU")
          activation_class = ACTIVATIONS_CLASSES.get(activation_class_name, nn.ReLU)
          normalize = activation_class_name == "TopK"
          activation_state_dict = state_dict.pop("activation_state_dict", {})
          if hasattr(activation_class, "from_state_dict"):
              activation = activation_class.from_state_dict(
                  activation_state_dict, strict=strict
              )
          else:
              activation = activation_class()
              if hasattr(activation, "load_state_dict"):
                  activation.load_state_dict(activation_state_dict, strict=strict)

          autoencoder = cls(n_latents, d_model, activation=activation, normalize=normalize)
          autoencoder.load_state_dict(state_dict, strict=strict)
          return autoencoder

      def state_dict(self, destination=None, prefix="", keep_vars=False):
          sd = super().state_dict(destination, prefix, keep_vars)
          sd[prefix + "activation"] = self.activation.__class__.__name__
          if hasattr(self.activation, "state_dict"):
              sd[prefix + "activation_state_dict"] = self.activation.state_dict()
          return sd


class TiedTranspose(nn.Module):
    """Decoder layer that applies the transposed weight of another linear layer.

    The layer never applies a bias of its own. This notebook defines
    ``TiedTranspose`` in an earlier cell as well, and that version is used with
    an encoder that has a bias; because this later definition replaces the
    earlier one once the cell runs, it must accept such layers too instead of
    asserting that the tied layer is bias-free.

    Args:
        linear: Layer whose ``weight`` is shared (transposed) with this module.
    """

    def __init__(self, linear: nn.Linear):
        super().__init__()
        self.linear = linear

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Only the weight is tied. Any bias on `linear` belongs to the encoding
        # direction (its size matches the encoder output), so it is ignored here.
        return F.linear(x, self.linear.weight.t(), None)

    @property
    def weight(self) -> torch.Tensor:
        """Transposed view of the tied layer's weight."""
        return self.linear.weight.t()

    @property
    def bias(self) -> "torch.Tensor | None":
        """Bias of the tied layer (None for bias-free layers); never applied by ``forward``."""
        return self.linear.bias


class TopK(nn.Module):
    """Keep the ``k`` largest entries along the last dimension and zero the rest.

    Args:
        k: Number of entries to keep per vector.
        postact_fn: Callable applied to the kept values before they are written
            back (ReLU by default, so kept negative values become zero).
    """

    def __init__(self, k: int, postact_fn: Callable = nn.ReLU()) -> None:
        super().__init__()
        self.k = k
        self.postact_fn = postact_fn

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # torch.topk raises when k exceeds the size of the last dimension;
        # in that case simply keep every entry.
        k = min(self.k, x.shape[-1]) if x.dim() > 0 else self.k
        topk = torch.topk(x, k=k, dim=-1)
        values = self.postact_fn(topk.values)
        # Write the kept values back at their original positions.
        result = torch.zeros_like(x)
        result.scatter_(-1, topk.indices, values)
        return result

    def state_dict(self, destination=None, prefix="", keep_vars=False):
        """Standard state dict extended with ``k`` and the post-activation class name."""
        # Keyword arguments: newer PyTorch deprecates passing these positionally.
        state_dict = super().state_dict(destination=destination, prefix=prefix, keep_vars=keep_vars)
        state_dict.update({prefix + "k": self.k, prefix + "postact_fn": self.postact_fn.__class__.__name__})
        return state_dict

    @classmethod
    def from_state_dict(cls, state_dict: dict[str, torch.Tensor], strict: bool = True) -> "TopK":
        """Rebuild a TopK from the un-prefixed entries written by ``state_dict``.

        ``strict`` is accepted for signature parity with other loaders and is unused.
        """
        k = state_dict["k"]
        postact_fn = ACTIVATIONS_CLASSES[state_dict["postact_fn"]]()
        return cls(k=k, postact_fn=postact_fn)


# Registry used to turn the activation class names stored in state dicts
# back into the classes themselves.
ACTIVATIONS_CLASSES = dict(
    ReLU=nn.ReLU,
    Identity=nn.Identity,
    TopK=TopK,
)


-

## MF recommender Architecture

In [None]:
class MatrixFactorization:
    """Biased matrix factorisation with a sigmoid output, updated by mini-batch SGD.

    A score is ``expit(b + b_u[u] + b_i[i] + P[u] . Q[i])``.

    Args:
        R: User x item rating DataFrame (its index/columns label users/items).
        pos_idx_ex_use, neg_idx_ex_use, pos_ex_num, pop_flag: Stored unchanged on
            the instance; not used by the methods of this class.
        neg_ex_hidden: Per-user collections of negatives to exclude.
        neg_ex: Per-user lists of negatives.
        K: Number of latent dimensions.
        alpha: Step size applied to every update.
        beta: Weight of the shrinkage (regularisation) term.
        iterations: Stored unchanged; the training loop lives outside this class.
    """

    def __init__(self, R, pos_idx_ex_use, neg_idx_ex_use, neg_ex_hidden, neg_ex, pos_ex_num, K, alpha, beta, iterations, pop_flag):
        self.R = R
        self.ratings = R
        self.num_users = R.shape[0]
        self.num_items = R.shape[1]
        self.K = K  # latent dimensions (e.g., 20)
        self.alpha = alpha
        self.beta = beta
        self.iterations = iterations

        self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
        self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))

        # Initialize biases
        self.b_u = np.zeros(self.num_users)
        self.b_i = np.zeros(self.num_items)
        # Global bias = mean of the positive ratings. Computed on the raw array
        # so it is always a scalar: np.mean on a masked DataFrame returns a
        # per-column Series on pandas versions where mean(axis=None) reduces
        # over rows only.
        rating_values = np.asarray(self.R, dtype=float)
        self.b = rating_values[rating_values > 0].mean()

        self.pos_idx_ex_use = pos_idx_ex_use
        self.neg_idx_ex_use = neg_idx_ex_use
        self.neg_ex = neg_ex
        self.neg_ex_hidden = neg_ex_hidden
        # Negatives that may be used: all negatives minus the hidden ones.
        self.neg_ex_use = {
            row: [item for item in self.neg_ex[row] if item not in self.neg_ex_hidden[row]]
            for row in self.R.index
        }
        self.pos_ex_num = pos_ex_num
        self.pop_flag = pop_flag

        self.columns = list(self.ratings.columns)
        self.batch_history = []
        # Read by rmse(); expected to be filled by the caller with 4-field
        # records whose first three fields are (user_pos, item_pos, rating).
        # Initialised here so rmse() works before any samples are attached.
        self.samples = []

    def sgd_batch(self, batch_samples):
        """Apply one SGD step for a mini-batch.

        Args:
            batch_samples: Sequence of records whose first three fields are
                (user position, item position, rating).
        """
        # Convert mini-batch samples to arrays
        users = np.array([s[0] for s in batch_samples], dtype=int)
        items = np.array([s[1] for s in batch_samples], dtype=int)
        ratings = np.array([s[2] for s in batch_samples], dtype=float)

        # Compute predictions vectorized
        predictions = expit(self.b + self.b_u[users] + self.b_i[items] + np.sum(self.P[users] * self.Q[items], axis=1))
        errors = ratings - predictions  # error vector

        # All steps are computed from the pre-update parameters before any of
        # them is applied.
        grad_b_u = self.alpha * (errors - self.beta * self.b_u[users])
        grad_b_i = self.alpha * (errors - self.beta * self.b_i[items])
        grad_P = self.alpha * (errors[:, np.newaxis] * self.Q[items] - self.beta * self.P[users])
        grad_Q = self.alpha * (errors[:, np.newaxis] * self.P[users] - self.beta * self.Q[items])

        # Use np.add.at to accumulate updates correctly for duplicate indices
        np.add.at(self.b_u, users, grad_b_u)
        np.add.at(self.b_i, items, grad_b_i)
        np.add.at(self.P, users, grad_P)
        np.add.at(self.Q, items, grad_Q)

    def rmse(self):
        """RMSE over the positive (rating == 1) records in ``self.samples``.

        Returns:
            The error, or None when there is no positive sample.
        """
        positives = [(i, k) for i, k, r, _ in self.samples if r == 1]
        if not positives:
            return None
        predicted = self.full_matrix()
        error = 0
        for i, k in positives:
            error += (1 - predicted[i, k]) ** 2
        return np.sqrt(error / len(positives))

    def get_rating(self, i, j):
        """Predicted score for user position ``i`` and item position ``j``."""
        return expit(self.b + self.b_u[i] + self.b_i[j] + self.P[i, :].dot(self.Q[j, :].T))

    def full_matrix(self):
        """Predicted scores for every (user, item) pair as a 2-D array."""
        return expit(self.b + self.b_u[:, np.newaxis] + self.b_i[np.newaxis, :] + self.P.dot(self.Q.T))

    def recommend(self, user_id, top_n):
        """Return the ``top_n`` highest-scoring ``(item_label, score)`` pairs for a user.

        Args:
            user_id: Label of the user in ``R.index``.
            top_n: Number of recommendations to return.
        """
        # Score only the requested user's row instead of materialising the full
        # user x item matrix for a single lookup.
        row = self.R.index.get_loc(user_id)
        scores = expit(self.b + self.b_u[row] + self.b_i + self.Q.dot(self.P[row]))
        # Stable descending order: ties keep their column order.
        order = np.argsort(-scores, kind="stable")[:top_n]
        item_labels = self.R.columns
        return [(item_labels[j], scores[j]) for j in order]

    def recommend_norm(self, user_id, top_n):
        """Recommendations passed through ``normalize_val`` (defined outside this class)."""
        return normalize_val(self.recommend(user_id, top_n))


-

## NCF recommender Architecture

In [None]:
# Dataset for user-item interactions.

class InteractionDataset(Dataset):
    """Dataset of 4-field interaction records served as tensors."""

    def __init__(self, interactions):
        """
        Args:
            interactions: list of tuples (user_id, item_id, rating, item_real_num)
        """
        self.interactions = interactions

    def __len__(self):
        return len(self.interactions)

    def __getitem__(self, index):
        # Unpacking enforces that every record has exactly four fields.
        user, item, rating, item_real_num = self.interactions[index]
        # Ids become long tensors; the rating becomes a float tensor.
        user_t = torch.tensor(user, dtype=torch.long)
        item_t = torch.tensor(item, dtype=torch.long)
        rating_t = torch.tensor(rating, dtype=torch.float)
        item_real_num_t = torch.tensor(item_real_num, dtype=torch.long)
        return user_t, item_t, rating_t, item_real_num_t

class NeuralCollaborativeFiltering(nn.Module):
    """MLP-based collaborative filtering model over concatenated user/item embeddings.

    Args:
        num_users: total number of users (rows of the user embedding table).
        num_items: total number of items (rows of the item embedding table).
        embedding_dim: dimensionality of the latent representation.
        hidden_layers: sizes of the hidden layers of the MLP.
        pos_idx_ex_use, neg_idx_ex_use, neg_ex, neg_ex_hidden, pos_ex_num:
            Optional values stored unchanged on the instance. When omitted
            (None), same-named module-level variables are used if they exist,
            else None.
    """

    def __init__(self, num_users, num_items, embedding_dim=20, hidden_layers=(64, 32, 16),
                 pos_idx_ex_use=None, neg_idx_ex_use=None, neg_ex=None,
                 neg_ex_hidden=None, pos_ex_num=None):
        super().__init__()

        # Each model owns its embedding tables. Previously these were read from
        # notebook-level variables that this notebook never defines, which
        # raised NameError on a fresh kernel and would have shared (and
        # re-initialised) one table between every model instance.
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)

        nn.init.normal_(self.user_embedding.weight, mean=0.0, std=0.1)
        nn.init.normal_(self.item_embedding.weight, mean=0.0, std=0.1)

        # Explicit arguments win; otherwise fall back to same-named notebook
        # variables so existing call sites keep working.
        notebook_vars = globals()
        if pos_idx_ex_use is None:
            pos_idx_ex_use = notebook_vars.get("pos_idx_ex_use")
        if neg_idx_ex_use is None:
            neg_idx_ex_use = notebook_vars.get("neg_idx_ex_use")
        if neg_ex is None:
            neg_ex = notebook_vars.get("neg_ex")
        if neg_ex_hidden is None:
            neg_ex_hidden = notebook_vars.get("neg_ex_hidden")
        if pos_ex_num is None:
            pos_ex_num = notebook_vars.get("pos_ex_num")
        self.pos_idx_ex_use = pos_idx_ex_use
        self.neg_idx_ex_use = neg_idx_ex_use
        self.neg_ex = neg_ex
        self.neg_ex_hidden = neg_ex_hidden
        self.pos_ex_num = pos_ex_num

        # MLP:
        layers = []
        input_dim = embedding_dim * 2  # Concatenation of user and item embeddings
        for hidden_dim in hidden_layers:
            layers.append(nn.Linear(input_dim, hidden_dim))
            layers.append(nn.ReLU())
            input_dim = hidden_dim
        # Final output layer squashed to (0, 1) by the sigmoid.
        layers.append(nn.Linear(input_dim, 1))
        layers.append(nn.Sigmoid())
        self.fc_layers = nn.Sequential(*layers)

    def forward(self, user_indices, item_indices):
        """Score each (user, item) pair.

        Returns:
            torch.Tensor: One score per pair, with the trailing size-1 output
            dimension removed.
        """
        # Look up embeddings
        user_emb = self.user_embedding(user_indices)
        item_emb = self.item_embedding(item_indices)
        # Concatenate embeddings instead of taking inner product.
        x = torch.cat([user_emb, item_emb], dim=-1)
        # Pass through the MLP to obtain prediction.
        output = self.fc_layers(x)
        # Drop only the output dimension: a bare squeeze() would also drop the
        # batch dimension for a batch of one, breaking shape agreement with
        # the targets.
        return output.squeeze(-1)