In [1]:
try:
    from google.colab import drive
    IN_COLAB = True
except ModuleNotFoundError:
    IN_COLAB = False
    
if IN_COLAB:
    drive.mount('/content/drive')
    !pip install dill
    !pip install neptune
    !pip install -U neptune-pytorch
    !mkdir -p /content/data_original && \
      cp /content/drive/MyDrive/Repo/RecoServiceTemplate/data/interactions.csv /content/data_original && \
      cp /content/drive/MyDrive/Repo/RecoServiceTemplate/data/users.csv /content/data_original && \
      cp /content/drive/MyDrive/Repo/RecoServiceTemplate/data/items.csv /content/data_original 

Mounted at /content/drive
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting dill
  Downloading dill-0.3.6-py3-none-any.whl (110 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m110.5/110.5 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dill
Successfully installed dill-0.3.6
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting neptune
  Downloading neptune-1.2.0-py3-none-any.whl (448 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m448.1/448.1 kB[0m [31m23.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting GitPython>=2.0.8 (from neptune)
  Downloading GitPython-3.1.31-py3-none-any.whl (184 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m184.3/184.3 kB[0m [31m23.2 MB/s[0m eta [36m0:00:00[0m
Collecting PyJWT (from neptune)
  Downloading PyJWT-2.7.0-py3-none-any.whl (22 kB)
Collecti

# Homework-3. AutoEncoder for hot users, experiment tracking

In [2]:
from math import ceil
import uuid
import time
from typing import Literal, TypeAlias
import os
import pickle
import sys

import torch
from torch import nn
import torch.nn.functional as F
import dill
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

from numpy.typing import NDArray
from scipy.sparse import coo_matrix, csr_matrix, vstack

import neptune
from neptune_pytorch import NeptuneLogger
from neptune.utils import stringify_unsupported

import warnings
warnings.filterwarnings("ignore")

In [3]:
# Set random seeds
manual_seed = 123
np.random.seed(manual_seed)
torch.manual_seed(manual_seed)

<torch._C.Generator at 0x7f75c1126bb0>

In [4]:
class Columns:
    """Aliases for column names."""

    Name = "item_name"
    User = "user_id"
    Item = "item_id"
    Weight = "weight"
    Date = "date"

In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Data loading

In [7]:
input_datapath = "data_original"

In [8]:
pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", 200)

interactions = pd.read_csv(os.path.join(input_datapath, "interactions.csv"), header=0)
users = pd.read_csv("data_original/users.csv")
items = pd.read_csv("data_original/items.csv")

# rename columns, convert timestamp
interactions.rename(columns={"last_watch_dt": "datetime"}, inplace=True)
interactions["datetime"] = pd.to_datetime(interactions["datetime"])

interactions["user_id"] = interactions["user_id"].astype(np.uint32)
interactions["item_id"] = interactions["item_id"].astype(np.uint16)
interactions["total_dur"] = interactions["total_dur"].astype(np.uint16)
interactions["watched_pct"] = interactions["watched_pct"].astype(np.float32)

interactions.rename(
    columns={
        "user_id": Columns.User,
        "item_id": Columns.Item,
        "datetime": Columns.Date,
        "watched_pct": Columns.Weight,
    },
    inplace=True,
)

In [9]:
interactions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5476251 entries, 0 to 5476250
Data columns (total 5 columns):
 #   Column     Dtype         
---  ------     -----         
 0   user_id    uint32        
 1   item_id    uint16        
 2   date       datetime64[ns]
 3   total_dur  uint16        
 4   weight     float32       
dtypes: datetime64[ns](1), float32(1), uint16(2), uint32(1)
memory usage: 104.5 MB


In [10]:
interactions.sample(5)

Unnamed: 0,user_id,item_id,date,total_dur,weight
5177745,766973,7308,2021-08-15,6594,82.0
4019669,719429,7107,2021-05-10,178,50.0
2248768,550244,7085,2021-07-27,202,4.0
3302005,526108,1554,2021-07-12,8918,100.0
3841491,209494,7943,2021-08-09,7454,100.0


In [16]:
# Check if implicit feedback contains NaNs and remove it
np.isnan(interactions.weight.values).nonzero()[0].size

828

In [11]:
interactions.fillna(0, inplace=True)

# Preparation

### Data generation code

In [12]:
def count_per_item(interactions: pd.DataFrame, column_name: str) -> pd.Series:
    """Get a dataframe containing size of groups formed by specific column."""
    item_groups = interactions.groupby([column_name], as_index=False).size()
    return item_groups

In [13]:
def filter_by_limits(
    interactions: pd.DataFrame, user_lim: int, item_lim: int
) -> tuple[pd.DataFrame, pd.Series, pd.Series]:
    """Filter over the dataframe iteratively until nothing could be removed.

    Args:
        interactions: The dataframe containing interactions.
        user_lim: The mininum number of rated items that one user must have.
        item_lim: The mininum number of ratings that one item must have.

    Returns:
        Returns a tuple (interactions, user_groups, item_groups), where interactions is
            filtered dataframe, user_groups is dataframe containing number of items per
                user, item_groups is dataframe containing number of ratings per item.

    """
    while True:
        end_cycle = False
        if user_lim > 0:
            user_groups = count_per_item(interactions, Columns.User)
            n_users = len(user_groups)
            mask = interactions[Columns.User].isin(
                user_groups[user_groups["size"] >= user_lim][Columns.User].values
            )
            interactions = interactions[mask]
            # Check if smth was removed in this step
            if interactions[Columns.User].nunique() == n_users:
                end_cycle = True
        if item_lim > 0:
            item_groups = count_per_item(interactions, Columns.Item)
            n_items = len(item_groups)
            mask = interactions[Columns.Item].isin(
                item_groups[item_groups["size"] >= item_lim][Columns.Item].values
            )
            interactions = interactions[mask]

            if end_cycle and interactions[Columns.Item].nunique() == n_items:
                break

    print(f"{interactions[Columns.User].nunique()} unique users left", end="\n")
    print(f"{interactions[Columns.Item].nunique()} unique items left")

    # Sort users by ids and items by time
    interactions = interactions.sort_values([Columns.User, Columns.Date])
    user_groups, item_groups = count_per_item(
        interactions, Columns.User
    ), count_per_item(interactions, Columns.Item)
    return interactions, user_groups, item_groups

In [14]:
def reindex_mappings(
    train_interactions: pd.DataFrame,
    test_interactions: pd.DataFrame,
    items: NDArray[np.float64],
) -> tuple[dict[int, int], dict[int, int]]:
    """Create the mappings for each dataframe.

    Args:
        train_interactions: The training dataframe containing interactions.
        test_interactions: The testing dataframe containing interactions.
        items: The unique item ids.

    Returns:
        Returns a tuple (user_to_ind, item_to_ind), where user_to_ind is dictionary
            containing mappings from external user ids to the internal, user_to_ind is
                dictionary containing mappings from external item ids to the internal.

    """
    users_id: NDArray[np.int64] = train_interactions[Columns.User].values
    user_to_ind: dict[int, int] = {}

    for user in users_id:
        if user not in user_to_ind:
            user_to_ind[user] = len(user_to_ind)

    users_id = test_interactions[Columns.User].values
    for user in users_id:
        if user not in user_to_ind:
            user_to_ind[user] = len(user_to_ind)

    item_to_ind: dict[int, int] = {}
    for item in items:
        item_to_ind[item] = len(item_to_ind)

    return user_to_ind, item_to_ind

In [15]:
def apply_reindex(
    interactions: pd.DataFrame, user_to_ind: dict[int, int], item_to_ind: dict[int, int]
) -> None:
    """
    Remap users and items ids using mappings.

    Args:
        interactions: The dataframe containing interactions.
        user_to_ind: The mapping from external user ids to the internal.
        item_to_ind: The mapping from external item ids to the internal.

    """
    interactions[Columns.User] = interactions[Columns.User].map(user_to_ind)
    interactions[Columns.Item] = interactions[Columns.Item].map(item_to_ind)

### Dataclass wrapping code

In [16]:
class TrainDataset(torch.utils.data.Dataset):
    """Convert train data to sparse format.

    In order to train autoencoder model, there should be a matrix, where
    rows represent users, columns - items and rating sits at intersection
    of specific user_id and item_id.

    Attributes:
        dir: The directory path where the data are allocated.
        n_items: The number of all items.
        matrix: The train interactions matrix as
            input for auetoencoders models.

    """

    def __init__(self, path: str):
        super().__init__()
        self.dir = path
        self.n_items = self._get_n_items()
        self.matrix = self._extract_train_data()
        self.shape: tuple[int, int] = self.matrix.get_shape()

    @property
    def sparsity(self) -> float:
        """Get the sparsity of train matrix."""
        return self.matrix.getnnz() / (self.shape[0] * self.shape[1])

    def _get_n_items(self) -> int:
        path = os.path.join(self.dir, "items_count.txt")
        with open(path, "r", encoding="utf8") as f:
            n_items = int(f.readline())
        return n_items

    def _extract_train_data(self) -> csr_matrix:
        path = os.path.join(self.dir, "train.csv")

        train_interactions = pd.read_csv(path)
        ratings = train_interactions[Columns.Weight].values

        n_users = len(train_interactions.groupby(Columns.User).size())

        rows, cols = (
            train_interactions[Columns.User].values,
            train_interactions[Columns.Item].values,
        )
        data = csr_matrix(
            (ratings, (rows, cols)),
            dtype="float64",
            shape=(n_users, self.n_items),
        )
        return data

    def __len__(self) -> int:
        """Return the length of dataset."""
        return self.shape[0]

    def __getitem__(self, idx) -> int:
        """Return one item from dataset by index."""
        return self.matrix[idx]

In [17]:
class TestDataset(torch.utils.data.Dataset):
    """Convert test data to sparse format.

    In order to train autoencoder model, there should be a matrix, where
    rows represent users, columns - items and rating sits at intersection
    of specific user_id and item_id.

    The only difference from train data is that we should store 2 matrixes:
    one as input for model to build reconstruction and other as validation
    part for metrics calculation.

    Attributes:
        dir: The directory path where the data are allocated.
        n_items: The number of all items.
        matrix_input: The test interactions matrix (sparse view) as input
            for auetoencoders models to build reconstruction.
        matrix_valid: The test interactions matrix (sparse view) as valididation
            to compare with reconstruction results.

    """

    def __init__(self, path: str):
        super().__init__()
        self.dir = path
        self.n_items = self._get_n_items()
        self.matrix_input, self.matrix_valid = self._extract_test_data()
        self.shape_input: tuple[int, int] = self.matrix_input.get_shape()
        self.shape_valid: tuple[int, int] = self.matrix_valid.get_shape()

    @property
    def sparsity(self) -> tuple[float, float]:
        """Get the sparsity of both test matrices."""
        return self.matrix_input.getnnz() / (
            self.shape_input[0] * self.shape_input[1]
        ), self.matrix_valid.getnnz() / (self.shape_valid[0] * self.shape_valid[1])

    def _get_n_items(self) -> int:
        path = os.path.join(self.dir, "items_count.txt")
        with open(path, "r", encoding="utf8") as f:
            n_items = int(f.readline())
        return n_items

    def _extract_test_data(self) -> tuple[csr_matrix, csr_matrix]:
        path_input = os.path.join(self.dir, "test_input.csv")
        path_valid = os.path.join(self.dir, "test_valid.csv")

        # Have equal number of users
        test_interactions_input = pd.read_csv(path_input)
        test_interactions_valid = pd.read_csv(path_valid)

        ratings_input = test_interactions_input[Columns.Weight].values
        ratings_valid = test_interactions_valid[Columns.Weight].values

        # Reindex to meet dimension boundaries
        start_idx = test_interactions_input[Columns.User].min()
        end_idx = test_interactions_input[Columns.User].max()
        n_users = end_idx - start_idx + 1

        rows_input, cols_input = (
            test_interactions_input[Columns.User].values - start_idx,
            test_interactions_input[Columns.Item].values,
        )
        rows_valid, cols_valid = (
            test_interactions_valid[Columns.User].values - start_idx,
            test_interactions_valid[Columns.Item].values,
        )

        data_input = csr_matrix(
            (ratings_input, (rows_input, cols_input)),
            dtype="float64",
            shape=(n_users, self.n_items),
        )

        data_valid = csr_matrix(
            (ratings_valid, (rows_valid, cols_valid)),
            dtype="float64",
            shape=(n_users, self.n_items),
        )

        return data_input, data_valid

    def __len__(self) -> int:
        """Return the length of dataset."""
        return self.shape_valid[0]  # Both matrices have the same len

    def __getitem__(self, idx) -> tuple[int, int]:
        """Return pair of items from dataset by index."""
        return (
            self.matrix_input[idx],
            self.matrix_valid[idx],
        )

In [18]:
def coo_matrix_to_sparse_tensor(coo: coo_matrix) -> torch.Tensor:
    """Transform scipy coo matrix to pytorch sparse tensor."""
    values = coo.data
    indices = np.vstack((coo.row, coo.col))
    shape = coo.shape

    ind = torch.LongTensor(indices)
    val = torch.FloatTensor(values)
    shp = torch.Size(shape)

    return torch.sparse_coo_tensor(ind, val, shp)

In [19]:
def train_sparse_collate_fn(batch: list[csr_matrix]) -> torch.Tensor:
    """Collate function which transform train scipy coo matrix to pytorch tensor."""
    batch_coo: coo_matrix = vstack(batch).tocoo()
    torch_sparse_batch = coo_matrix_to_sparse_tensor(batch_coo)
    return torch_sparse_batch.to_dense()

In [20]:
def test_sparse_collate_fn(
    batch: list[tuple[csr_matrix, csr_matrix]]
) -> tuple[torch.Tensor, torch.Tensor]:
    """Collate function which transform scipy test coo matrixes to pytorch tensor."""
    batch_input_coo: coo_matrix = vstack([item[0] for item in batch]).tocoo()
    batch_valid_coo: coo_matrix = vstack([item[1] for item in batch]).tocoo()
    torch_sparse_batch_input = coo_matrix_to_sparse_tensor(batch_input_coo)
    torch_sparse_batch_valid = coo_matrix_to_sparse_tensor(batch_valid_coo)
    return torch_sparse_batch_input.to_dense(), torch_sparse_batch_valid.to_dense()

### Models code

In [21]:
class LinearAE(nn.Module):
    """The realization of a simple linear autoencoder using PyTorch.

    Attributes:
        enc_dims: The list containing dimension sizes of encoder layers.
        dec_dims: The list containing dimension sizes of decoder layers.

    """

    def __init__(
        self,
        enc_dims: list[int],
        dec_dims: list[int] | None = None,
    ):
        super().__init__()
        self.enc_dims = enc_dims
        self.dec_dims = dec_dims
        if dec_dims:
            if enc_dims[0] != dec_dims[-1]:
                raise ValueError("Reconstruction dimension should be equal to input")
            if enc_dims[-1] != dec_dims[0]:
                raise ValueError(
                    "Latent dimension for encoder and decoder should be equal"
                )
            self.dec_dims = dec_dims
        else:
            # Decoder is symmetric by default
            self.dec_dims = enc_dims[::-1]

        self.dims = self.enc_dims + self.dec_dims[1:]
        self.layers = nn.ModuleList(
            [
                nn.Linear(d_in, d_out)
                for d_in, d_out in zip(self.dims[:-1], self.dims[1:])
            ]
        )

    @property
    def device(self):
        """Move all model parameters to the device."""
        return next(self.parameters()).device

    def forward(self, batch: torch.Tensor) -> torch.Tensor:
        """Pass data batch through all layers."""
        # Row-wise L2 norm is meant to take user's rating behaviour into account
        output = F.normalize(batch)

        for layer in self.layers:
            output = layer(output)
        return output

    def predict(self, batch: torch.Tensor) -> torch.Tensor:
        """Get prediction while inference."""
        return self.forward(batch)

In [22]:
class DenoisingAE(nn.Module):
    """The realization of a denoising autoencoder using PyTorch.

    Attributes:
        enc_dims: The list containing dimension sizes of encoder layers.
        dec_dims: The list containing dimension sizes of decoder layers.
        corrupt_ratio: The denoising parameter, i.e. the proportion of
            data that will be corrupted.

    """

    def __init__(
        self,
        enc_dims: list[int],
        dec_dims: list[int] | None = None,
        corrupt_ratio: float = 0.2,
    ):
        super().__init__()
        if not 0 < corrupt_ratio < 1:
            raise ValueError(
                f"corrupt_ratio={corrupt_ratio} should be a float in (0, 1) range."
            )

        self.enc_dims = enc_dims
        if dec_dims:
            if enc_dims[0] != dec_dims[-1]:
                raise ValueError("Reconstruction dimension should be equal to input")
            if enc_dims[-1] != dec_dims[0]:
                raise ValueError(
                    "Latent dimension for encoder and decoder should be equal"
                )
            self.dec_dims = dec_dims
        else:
            # Decoder is symmetric by default
            self.dec_dims = enc_dims[::-1]

        self.dims = self.enc_dims + self.dec_dims[1:]
        self.layers = nn.ModuleList(
            [
                nn.Linear(d_in, d_out)
                for d_in, d_out in zip(self.dims[:-1], self.dims[1:])
            ]
        )

        self.corrupt_ratio = corrupt_ratio
        self._init_weights()

    def _init_weights(self) -> None:
        for layer in self.layers:
            # Xavier Initialization
            size = layer.weight.size()
            fan_out = size[0]
            fan_in = size[1]
            scope = np.sqrt(6.0 / (fan_in + fan_out))
            layer.weight.data.uniform_(-scope, scope)
            layer.bias.data.zero_()

    @property
    def device(self):
        """Move all model parameters to the device."""
        return next(self.parameters()).device

    def forward(self, batch: torch.Tensor) -> torch.Tensor:
        """Pass data batch through all layers."""
        # Row-wise L2 norm is meant to take user's rating behaviour into account
        output = F.normalize(batch)
        # Corrupt the interactions matrix only during training
        output = F.dropout(batch, p=self.corrupt_ratio, training=self.training)

        for i, layer in enumerate(self.layers):
            output = layer(output)
            # Apply activation except the last decoding layer
            if i != len(self.layers) - 1:
                output = torch.tanh(output)
        return output

    def predict(self, batch: torch.Tensor) -> torch.Tensor:
        """Get prediction while inference."""
        return self.forward(batch)

In [23]:
class VariationalAE(nn.Module):
    """The realization of a variational autoencoder using PyTorch.

    Variational autoencoder based on multinomial likelihood with noising
    potentially making it robust.

    Attributes:
        enc_dims: The list containing dimension sizes of encoder layers.
        dec_dims: The list containing dimension sizes of decoder layers.
        corrupt_ratio: The denoising parameter, i.e. the proportion of
            data that will be corrupted.

    """

    def __init__(
        self,
        enc_dims: list[int],
        dec_dims: list[int] | None = None,
        corrupt_ratio: float = 0.2,
    ):
        super().__init__()
        if not 0 < corrupt_ratio < 1:
            raise ValueError(
                f"corrupt_ratio={corrupt_ratio} should be a float in (0, 1) range."
            )

        self.enc_dims = enc_dims
        if dec_dims:
            if enc_dims[0] != dec_dims[-1]:
                raise ValueError("Reconstruction dimension should be equal to input")
            if enc_dims[-1] != dec_dims[0]:
                raise ValueError(
                    "Latent dimension for encoder and decoder should be equal"
                )
            self.dec_dims = dec_dims
        else:
            # Decoder is symmetric by default
            self.dec_dims = enc_dims[::-1]

        # Create dimension to store the params of variational distribution
        enc_dims_ext = self.enc_dims[:-1] + [self.enc_dims[-1] * 2]
        self.enc_layers = nn.ModuleList(
            [
                nn.Linear(d_in, d_out)
                for d_in, d_out in zip(enc_dims_ext[:-1], enc_dims_ext[1:])
            ]
        )
        self.dec_layers = nn.ModuleList(
            [
                nn.Linear(d_in, d_out)
                for d_in, d_out in zip(self.dec_dims[:-1], self.dec_dims[1:])
            ]
        )

        self.corrupt_ratio = corrupt_ratio
        self._init_weights()

    def _init_weights(self) -> None:
        for layer in self.enc_layers + self.dec_layers:
            # Xavier Initialization
            size = layer.weight.size()
            fan_out = size[0]
            fan_in = size[1]
            scope = np.sqrt(6.0 / (fan_in + fan_out))
            layer.weight.data.uniform_(-scope, scope)
            layer.bias.data.zero_()

    @property
    def device(self):
        """Move all model parameters to the device."""
        return next(self.parameters()).device

    def forward(
        self, batch: torch.Tensor
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Build data reconstruction using latent representation."""
        mu, logvar = self.encode(batch)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

    def predict(self, batch: torch.Tensor) -> torch.Tensor:
        """Get prediction while inference."""
        mu, _ = self.encode(batch)
        return self.decode(mu)

    def encode(self, batch: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Obtain params of the variational distribution."""
        # Row-wise L2 norm is meant to take user's rating behaviour into account
        output = F.normalize(batch)
        # Corrupt the interactions matrix only during training
        output = F.dropout(batch, p=self.corrupt_ratio, training=self.training)

        for i, layer in enumerate(self.enc_layers):
            output = layer(output)
            # Apply activation except the last layer
            if i != len(self.enc_layers) - 1:
                output = torch.tanh(output)
            else:
                mu = output[:, : self.enc_dims[-1]]
                logvar = output[:, self.enc_dims[-1] :]
        return mu, logvar

    def reparameterize(self, mu: torch.Tensor, logvar: torch.Tensor) -> torch.Tensor:
        """Obtain a latent vector by applying a reparametrization trick."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Build reconstruction from the latent representation."""
        output = z
        for i, layer in enumerate(self.dec_layers):
            output = layer(output)
            if i != len(self.dec_layers) - 1:
                output = torch.tanh(output)
        return output

### Training Code

#### Logging

In [24]:
Model: TypeAlias = DenoisingAE | LinearAE | VariationalAE
Config: TypeAlias = dict[str, int | str | float]
Optimizer: TypeAlias = torch.optim.Adam | torch.optim.SGD | torch.optim.RMSprop
NeptuneRun: TypeAlias = neptune.metadata_containers.run.Run

In [149]:
class Callback:
    """Log train and test results with help of Neptune.

    Attributes:
        run: The Neptune run object for experiment tracking.
        npt_logger: The NeptuneLogger object for PyTorch integration.
        dataset: The test dataset.
        batch_size: The size of one batch.
        delimeter: The number of steps agter which _testing will be called.
        k: The number of recos which are considered optimal to give.
        loss_name: The name of loss function.

    """

    def __init__(
        self,
        run: NeptuneRun,
        npt_logger: NeptuneLogger,
        dataset: TestDataset,
        batch_size: int,
        k: int,
        loss_name: str,
    ):
        self.step = 0
        self.run = run
        self.logger = npt_logger
        self.batch_size = batch_size
        self.dataset = dataset
        self.k = k
        self.loss_name = loss_name

    def forward(self, model: Model, loss: float) -> None:
        """Save losses and metrics values to writer."""
        self.step += 1
        self.run[self.logger.base_namespace][f"Losses/{self.loss_name}_train"].append(loss)

        if self.step % self.logger.log_freq == 0:
            total_loss, recall, precision, mrr, map_ = self._testing(model)

            self.run[self.logger.base_namespace][f"Losses/{self.loss_name}_test"].append(total_loss)
            self.run[self.logger.base_namespace][f"Metrics/Recall@{self.k}"].append(recall)
            self.run[self.logger.base_namespace][f"Metrics/Precision@{self.k}"].append(precision)
            self.run[self.logger.base_namespace][f"Metrics/MRR@{self.k}"].append(mrr)
            self.run[self.logger.base_namespace][f"Metrics/MAP@{self.k}"].append(map_)

    def _testing(self, model: Model) -> tuple[float, float, float, float, float]:
        total_loss = 0.0
        recall_list: list[float] = []
        precision_list: list[float] = []
        mrr_list: list[float] = []
        map_list: list[float] = []

        for batch_input, batch_valid in torch.utils.data.DataLoader(
            dataset=self.dataset,
            batch_size=self.batch_size,
            shuffle=False,
            collate_fn=test_sparse_collate_fn,
        ):
            batch_input = batch_input.to(model.device)
            if self.loss_name == "MSE":
                batch_recon: torch.Tensor = model(batch_input)
                reduced_loss = calculate_mse_loss(batch_recon, batch_valid.to(model.device))
            else:
                batch_recon, mu, logvar = model(batch_input)
                reduced_loss = calculate_elbo_loss(batch_recon, batch_input, mu, logvar)
            total_loss += reduced_loss.to("cpu").item()
            # Convert to numpy
            batch_recon_np: NDArray[np.float32] = batch_recon.to("cpu").numpy()
            batch_valid_np: NDArray[np.float32] = batch_valid.numpy()
            batch_input_np: NDArray[np.float32] = batch_input.to("cpu").numpy()

            # Exclude known ratings that were used to build reconstruction
            batch_recon_np[batch_input_np.nonzero()] = float("-inf")

            # Compute metrics
            recall = mean_recall_at_k(batch_recon_np, batch_valid_np, self.k)
            precision = mean_precision_at_k(batch_recon_np, batch_valid_np, self.k)
            mrr = mrr_at_k(batch_recon_np, batch_valid_np, self.k)
            map_ = map_at_k(batch_recon_np, batch_valid_np, self.k)
            recall_list.append(recall)
            precision_list.append(precision)
            mrr_list.append(mrr)
            map_list.append(map_)

        return (
            total_loss,
            sum(recall_list) / len(recall_list),
            sum(precision_list) / len(precision_list),
            sum(mrr_list) / len(mrr_list),
            sum(map_list) / len(map_list),
        )

    def __call__(self, model: Model, loss: float):
        return self.forward(model, loss)

#### Losses

In [26]:
def calculate_elbo_loss(
    batch_recon: torch.Tensor,
    batch: torch.Tensor,
    mu: torch.Tensor,
    logvar: torch.Tensor,
) -> torch.Tensor:
    """Calculate the ELBO approximation of log-likelihood with negative sign."""
    cross_entropy = torch.sum(F.log_softmax(batch_recon, dim=1) * batch, dim=1)
    kl_divergence = 0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=1)
    return -1 * torch.mean(cross_entropy + kl_divergence)

In [148]:
def calculate_mse_loss(batch_recon: torch.Tensor, batch: torch.Tensor, consider_zeroes=True) -> torch.Tensor:
    """Calculate the MSE loss."""
    if consider_zeroes:
        return F.mse_loss(batch_recon, batch)
    else:
        # Exclude non-relevant items from calculations
        unreduced_loss = F.mse_loss(batch_recon, batch, reduction="none")
        return torch.masked_select(unreduced_loss, batch != 0).mean()


#### Learning

In [28]:
def train_on_batch(model: Model, batch, optimizer: Optimizer) -> float:
    """Train model on only one batch of data."""
    model.train()
    optimizer.zero_grad()
    batch = batch.to(model.device)

    if not isinstance(model, VariationalAE):
        batch_recon = model(batch)
        reduced_loss = calculate_mse_loss(batch_recon, batch)
    else:
        batch_recon, mu, logvar = model(batch)
        reduced_loss = calculate_elbo_loss(batch_recon, batch, mu, logvar)
    reduced_loss.backward()  # type: ignore
    optimizer.step()

    return reduced_loss.to("cpu").item()

In [29]:
def train_epoch(
    train_generator: tqdm,
    model: Model,
    optimizer: Optimizer,
    callback: None | Callback,
) -> float:
    """Train model on only one epoch."""
    total_len, epoch_loss = 0, 0.0
    for batch in train_generator:
        batch_loss = train_on_batch(model, batch, optimizer)

        if callback:
            model.eval()
            with torch.no_grad():
                callback(model, batch_loss)

        epoch_loss += batch_loss * len(batch)
        total_len += len(batch)
    return epoch_loss / total_len

In [30]:
def trainer(
    num_epoch: int,
    batch_size: int,
    dataset: TrainDataset,
    model: Model,
    optimizer_name: Literal["Adam", "SGD", "RMSprop"],
    lr: float,
    wd: float,
    callback: None | Callback,
) -> None:
    """Provide training for choosed number of epochs."""
    optimizer = getattr(torch.optim, optimizer_name)(
        model.parameters(), lr=lr, weight_decay=wd
    )

    iterations = tqdm(range(num_epoch), desc="epoch")
    iterations.set_postfix({"train epoch loss": np.nan})
    for _ in iterations:
        batch_generator = tqdm(
            torch.utils.data.DataLoader(
                dataset=dataset,
                batch_size=batch_size,
                shuffle=True,
                collate_fn=train_sparse_collate_fn,
            ),
            leave=False,
            total=len(dataset) // batch_size + (len(dataset) % batch_size > 0),
        )

        epoch_loss = train_epoch(
            batch_generator,
            model,
            optimizer,
            callback,
        )

        iterations.set_postfix({"train epoch loss": epoch_loss})

### Validation Code

#### Accuracy metrics

In [31]:
def mean_recall_at_k(
    recon_batch: NDArray[np.float32], true_batch: NDArray[np.float32], k: int
) -> float:
    """Wrapper for recall_at_k to calculate the mean recall@K metric for batch."""
    return np.mean(recall_at_k(recon_batch, true_batch, k)).item()

In [32]:
def recall_at_k(
    recon_batch: NDArray[np.float32], true_batch: NDArray[np.float32], k: int
) -> NDArray[np.float32]:
    """Calculate recall@K for users in batch.

    Recall at k is the proportion of relevant items found in the top-k recommendations.

    Args:
        recon_batch: The batch of recos that model built for each user.
        true_batch: The batch thath contains only relevant items for each user.
        k: The number of recos which are considered.

    Returns:
        recall_k: The batch of recall@K.

    """
    batch_users = recon_batch.shape[0]

    # Indexes of the first k recommendations
    idx = np.argpartition(-recon_batch, k - 1, axis=1)

    # Boolean mask where True means this item is in recos at k
    recon_batch_bool = np.zeros_like(recon_batch, dtype=bool)
    recon_batch_bool[np.arange(batch_users)[:, np.newaxis], idx[:, :k]] = True

    true_batch_bool = true_batch > 0

    # Number of the relevant items that model recommends at k for each user in batch
    intersection_cardinality: NDArray[np.float32] = (
        (recon_batch_bool & true_batch_bool).sum(axis=1).astype(np.float32)
    )

    # Calculate the batch of Recall@K by definition
    divider: NDArray[np.int64] = np.minimum(k, true_batch_bool.sum(axis=1))
    recall_k = (intersection_cardinality / divider).astype(np.float32)
    return recall_k

In [33]:
def mean_precision_at_k(
    recon_batch: NDArray[np.float32], true_batch: NDArray[np.float32], k: int
) -> float:
    """Wrapper for precision_at_k to calculate the mean precision@K metric for batch."""
    return np.mean(precision_at_k(recon_batch, true_batch, k)).item()

In [34]:
def precision_at_k(
    recon_batch: NDArray[np.float32], true_batch: NDArray[np.float32], k: int
) -> NDArray[np.float32]:
    """Calculate precision@K for users in batch.

    Precision at k is the proportion of recommended items in the top-k set
    that are relevant.

    Args:
        recon_batch: The batch of recos that model built for each user.
        true_batch: The batch thath contains only relevant items for each user.
        k: The number of recos which are considered.

    Returns:
        precision_k: The batch of precision@K.

    """
    batch_users = recon_batch.shape[0]

    # Indexes of the first k recommendations
    recos_ids: NDArray[np.intp] = np.argpartition(-recon_batch, k - 1, axis=1)

    # Boolean mask where True means this item is in recos at k
    is_item_in_recos = np.zeros_like(recon_batch, dtype=bool)
    is_item_in_recos[np.arange(batch_users)[:, np.newaxis], recos_ids[:, :k]] = True

    true_batch_bool = true_batch > 0

    # Number of the relevant items that model recommends at k for each user in batch
    intersection_cardinality: NDArray[np.float32] = (
        (is_item_in_recos & true_batch_bool).sum(axis=1).astype(np.float32)
    )

    # Calculate the batch of Precision@K by definition
    precision_k = (intersection_cardinality / k).astype(np.float32)
    return precision_k

#### Ranking metrics

In [35]:
def mrr_at_k(
    recon_batch: NDArray[np.float32], true_batch: NDArray[np.float32], k: int
) -> float:
    """Wrapper for rr_at_k to calculate the mean reciprocal rank metric for batch."""
    return np.mean(rr_at_k(recon_batch, true_batch, k)).item()

In [36]:
def rr_at_k(
    recon_batch: NDArray[np.float32], true_batch: NDArray[np.float32], k: int
) -> NDArray[np.float32]:
    """Calculate the reciprocal ranks for users in batch.

    The reciprocal rank of top-k recommendations is the multiplicative inverse
    of the rank of the first relevant item occurence in this list: 1 for first place,
    1/2 for second place, 1/3 for third place and so on.

    Args:
        recon_batch: The batch of recos that model built for each user.
        true_batch: The batch thath contains only relevant items for each user.
        k: The number of recos which are considered.

    Returns:
        rr: The batch of reciprocal ranks.

    """
    batch_users = recon_batch.shape[0]

    # Indexes of the first k recommendations
    recos_ids: NDArray[np.int64] = np.argpartition(-recon_batch, k - 1, axis=1)

    # Get recos ids in right sorted order (from high to low)
    top_k_recs = recon_batch[np.arange(batch_users)[:, np.newaxis], recos_ids[:, :k]]
    sorted_recos_ids: NDArray[np.int64] = recos_ids[
        np.arange(batch_users)[:, np.newaxis], np.argsort(-top_k_recs)
    ]

    # Boolean mask where relevant recos are True values
    true_batch_bool = true_batch > 0
    indicator_k = true_batch_bool[
        np.arange(batch_users)[:, np.newaxis], sorted_recos_ids
    ]

    # Index of the first relevant reco for each user
    relevant_rank: Generator[NDArray[np.int64], None, None] = (
        np.nonzero(row == 1)[0] for row in indicator_k
    )

    # Calculate the batch of RR@K by definition
    rr_list = [1.0 / (arr[0] + 1) if arr.size else 0.0 for arr in relevant_rank]
    rr = np.array(rr_list).astype(np.float32)
    return rr

In [37]:
def map_at_k(
    recon_batch: NDArray[np.float32], true_batch: NDArray[np.float32], k: int
) -> float:
    """Wrapper for ap_at_k to calculate the mean average precision metric for batch."""
    return np.mean(ap_at_k(recon_batch, true_batch, k)).item()

In [38]:
def ap_at_k(
    recon_batch: NDArray[np.float32], true_batch: NDArray[np.float32], k: int
) -> NDArray[np.float32]:
    """Calculate the average precision for each user in batch.

    The average precision of the top-k recommendations is the sum of precision@K
    where the item at the kth rank is relevant divided by the total number of
    relevant items (with adjustment to k) in the top-k recommendations.

    Args:
        recon_batch: The batch of recos that model built for each user.
        true_batch: The batch thath contains only relevant items for each user.
        k: The number of recos which are considered.

    Returns:
        ap: The batch of AP@K.

    """
    batch_users = recon_batch.shape[0]

    true_batch_bool = true_batch > 0
    # Indexes of the first k recommendations
    recos_ids: NDArray[np.int64] = np.argpartition(-recon_batch, k - 1, axis=1)
    # Get recos ids in right sorted order (from high to low)
    top_k_recs = recon_batch[np.arange(batch_users)[:, np.newaxis], recos_ids[:, :k]]
    sorted_recos_ids: NDArray[np.int64] = recos_ids[
        np.arange(batch_users)[:, np.newaxis], np.argsort(-top_k_recs)
    ]

    # Boolean mask where relevant recos are True values
    indicator_k = true_batch_bool[
        np.arange(batch_users)[:, np.newaxis], sorted_recos_ids
    ]

    # Accumulate precisions
    precisions = np.zeros_like(top_k_recs)
    for running_k in range(1, k + 1):
        # Calculate precision in a familiar way
        is_item_in_recos = np.zeros_like(recon_batch, dtype=bool)
        is_item_in_recos[
            np.arange(batch_users)[:, np.newaxis], sorted_recos_ids[:, :running_k]
        ] = True

        intersection_cardinality: NDArray[np.float32] = (
            (is_item_in_recos & true_batch_bool)
            .sum(axis=1, keepdims=True)
            .astype(np.float32)
        )

        precision_k = (intersection_cardinality / running_k).astype(np.float32)
        # Update values
        precisions[np.arange(batch_users)[:, np.newaxis], running_k - 1] = precision_k

    # Calculate the batch of AP@K by definition
    divider: NDArray[np.int64] = np.minimum(k, true_batch_bool.sum(axis=1))
    ap = ((precisions * indicator_k).sum(axis=1) / divider).astype(np.float32)
    return ap

### Inference code

In [39]:
def create_user_input(
    input_dim: int,
    interactions: pd.DataFrame,
    user_id: int,
    item_to_ind: dict[int, int],
) -> torch.Tensor:
    """Build proper user input for the autoencoder model.

    Extract interactions from data for the specific user id (the existence of
    the data is meant) and build input for the autoencoder model: tensor of
    shape (1, input_dim) filled with floats where 0.0 means absence of interaction.

    Args:
        input_dim: The input dimension of the model.
        interactions: The dataframe containing interactions.
        user_id: The external user id.
        item_to_ind: The mapping from external item ids to the internal.

    Returns:
        user_input: The user input for autoencoder model.

    """
    cond = interactions[Columns.User] == user_id
    items: NDArray[np.int64] = interactions[cond][Columns.Item].values
    ratings: NDArray[np.float64] = interactions[cond][Columns.Weight].values

    user_input = torch.zeros(size=(1, input_dim), dtype=torch.float32)
    items = np.vectorize(item_to_ind.get)(items)

    # Complete user input
    ratings_tsr = torch.tensor(ratings, dtype=torch.float32)
    user_input[:, items] = ratings_tsr
    return user_input

In [40]:
def predict_for_user_id(
    user_input: torch.Tensor,
    model: Model,
    k: int,
    ind_to_item: dict[int, int],
    filter_viewed: bool = True,
) -> NDArray[np.int64]:
    """Get top-k recommendations for one user.

    Args:
        user_input: The user input for autoencoder model.
        model: The allowed PyTorch model in evaluation regime.
        k: The number of recos which are considered.
        ind_to_item: The mapping from internal item ids to the external.
        filter_viewed: The flag to get rid of watched items in recos or not.

    Returns:
        recos_at_k: The top-k recommendations for user as external item indices.

    """
    # Get recos_at_k
    recon: NDArray[np.float32] = model.predict(user_input).detach().to("cpu").numpy()
    if filter_viewed:
        recon[user_input.numpy().nonzero()] = float("-inf")
    recos: NDArray[np.int64] = np.argpartition(-recon, k, axis=1)
    recos_top_k = recos[:, :k]
    # Internal indices to external
    recos_top_k = np.vectorize(ind_to_item.get)(recos_top_k)
    return recos_top_k.squeeze()

## Pipeline

### Data generation

In [41]:
# Consider ACTIVE users and items that have MANY interactions
user_lim, item_lim = 10, 10
# Represents a min proportion of items that will be used for metrics calculation
validation_size = 0.5

# Represents a proportion of test users
test_size = 0.2

In [42]:
interactions_cp = interactions.copy()

In [43]:
# Leave only long valuable interactions
interactions_cp = interactions_cp[interactions_cp[Columns.Weight] > 10]

In [44]:
# Filter and normalize data
interactions_cp, user_groups, _ = filter_by_limits(interactions_cp, user_lim, item_lim)

maxx, minn = (
    interactions_cp[Columns.Weight].max(),
    interactions_cp[Columns.Weight].min(),
)
interactions_cp[Columns.Weight] = (interactions_cp[Columns.Weight] - minn) / (
    maxx - minn
)

89882 unique users left
6180 unique items left


In [45]:
# Get users/items indices
unique_users_id: NDArray[np.int64] = user_groups[Columns.User].values
n_users = len(unique_users_id)

In [46]:
# Shuffle indices
np.random.shuffle(unique_users_id)

n_heldout_users = int(test_size * n_users)

In [47]:
# Split into train/test folds by indices
train_users = unique_users_id[: (n_users - n_heldout_users)]
print(f"Train fold size is: {len(train_users)}")
test_users = unique_users_id[(n_users - n_heldout_users) :]
print(f"Test fold size is: {len(test_users)}")

Train fold size is: 71906
Test fold size is: 17976


In [48]:
# Prepare train/test data for PyTorch dataset
train_df = interactions_cp.loc[interactions_cp[Columns.User].isin(train_users)]
# Save items presented in train_df only
unique_items_id = pd.unique(train_df[Columns.Item])
# Prepare test data for PyTorch dataset
test_df = interactions_cp.loc[interactions_cp[Columns.User].isin(test_users)]
test_df = test_df.loc[test_df[Columns.Item].isin(unique_items_id)]

For test df split items per each user into 2 groups.

One part goes into an autoencoder as the input for building reconstruction first,
then another part will be used for metrics calculation along with reconstruction
results. The latter consists of a certain amount of last watched items (int from rounding the minimum validation fraction = `validation_size`) 

In [49]:
test_df_valid = test_df.groupby(Columns.User, group_keys=False).apply(
    lambda x: x.tail(ceil(validation_size * x.shape[0]))
)
test_df_input = test_df.loc[~test_df.index.isin(test_df_valid.index)]

In [50]:
# Prepare inference data
inference_df = interactions_cp.loc[interactions_cp[Columns.Item].isin(unique_items_id)]

In [51]:
# Get mappings
user_to_ind, item_to_ind = reindex_mappings(train_df, test_df, unique_items_id)

output_datapath = "./data_original"
# Save the mapping for inference
with open(os.path.join(output_datapath, "item_to_ind"), "wb") as fp:
    pickle.dump(item_to_ind, fp)

In [52]:
# Apply inplace remapping
for el in [train_df, test_df_input, test_df_valid]:
    apply_reindex(el, user_to_ind, item_to_ind)

Save the processed data

In [53]:
items_count = len(unique_items_id)

# Save data
train_df.to_csv(os.path.join(output_datapath, "train.csv"), index=False)
test_df_input.to_csv(os.path.join(output_datapath, "test_input.csv"), index=False)
test_df_valid.to_csv(os.path.join(output_datapath, "test_valid.csv"), index=False)
inference_df.to_csv(os.path.join(output_datapath, "inference.csv"), index=False)
with open(os.path.join(output_datapath, "items_count.txt"), "w", encoding="utf8") as f:
    f.write(f"{items_count}")

### Training with Neptune logging

In [66]:
input_datapath = "./data_original"

In [67]:
train_dataset = TrainDataset(path=input_datapath)
test_dataset = TestDataset(path=input_datapath)

In [68]:
print(f"Train matrix sparsity: {train_dataset.sparsity}", end="\n")
print(f"Test matrix (as model input) sparsity: {test_dataset.sparsity[0]}", end="\n")
print(
    f"Test matrix (as holdout for metric evaluation) sparsity: "
    f"{test_dataset.sparsity[1]}"
)

Train matrix sparsity: 0.0033602324393848604
Test matrix (as model input) sparsity: 0.0016400688152344081
Test matrix (as holdout for metric evaluation) sparsity: 0.0017152949707844908


In [150]:
# Initialize model
input_dim = train_dataset.n_items

# Build proper model dims list
enc_dims = [input_dim, 600, 200]

model = DenoisingAE(enc_dims)
_ = model.to(device)
loss_name = "ELBO" if isinstance(model, VariationalAE) else "MSE"

In [151]:
# Create a Neptune run object
run = neptune.init_run(
    project="AEs/AEs",
    api_token="eyJhcGlfYWRkcmVzcyI6Imh0dHBzOi8vYXBwLm5lcHR1bmUuYWkiLCJhcGlfdXJsIjoiaHR0cHM6Ly9hcHAubmVwdHVuZS5haSIsImFwaV9rZXkiOiI1YjdkYjQ4OC03NDg2LTRjYjgtYWUwOS1hNWI3ODBkYTFmODgifQ==",
)

https://app.neptune.ai/AEs/AEs/e/AES-18


In [152]:
# Add NeptuneLogger - object from Neptune PyTorch integration
# Validation will be called after each epoch - should be enough
batch_size: int = 10000
delimeter = len(train_dataset) // batch_size + (len(train_dataset) % batch_size > 0)


npt_logger = NeptuneLogger(
    run=run,
    model=model,
    log_model_diagram=True,
    log_gradients=True,
    log_parameters=True,
    log_freq=delimeter,
)

In [153]:
# Set other params of training
num_epoch: int = 20
model_name: str = type(model).__name__
optimizer_name: Literal["Adam", "SGD", "RMSprop"] = "Adam"
lr: float = 0.001 
wd: float = 0.01 # Optimizer weight decay
k: int = 10 # Number of recs

In [154]:
# Collect all hyperparametres and send them to run
parameters = {
    "num_epoch": num_epoch,
    "batch_size": batch_size,
    "enc_dims": enc_dims,
    "model": model_name,
    "optimizer": optimizer_name,
    "loss": loss_name,
    "lr": lr,
    "wd": wd,
    "device": device,
    "k": k}


run[npt_logger.base_namespace]["hyperparams"] = stringify_unsupported(  
    parameters
)

In [155]:
# Make validation and save result to Neptune
callback = Callback(run, npt_logger, test_dataset, batch_size, k, loss_name)

In [156]:
print("Training starts.")
start = time.time()
train_time = trainer(
    num_epoch,
    batch_size,
    train_dataset,
    model,
    optimizer_name,
    lr,
    wd,
    callback,
)
end = time.time()
print("Training ends.")

Training starts.


epoch:   0%|          | 0/20 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

Training ends.


In [157]:
# Save technical metrics
# Model size
size_model = 0
for param in model.parameters():
    if param.data.is_floating_point():
        size_model += param.numel() * torch.finfo(param.data.dtype).bits
    else:
        size_model += param.numel() * torch.iinfo(param.data.dtype).bits
size_model_mb = round(size_model / 8e6, 2)
# Train time, in min
train_time = round((end - start) / 60, 2)
train_time_epoch = round((end - start) / (60 * num_epoch), 2)

In [158]:
run[npt_logger.base_namespace]["Technical/Model_size (Mb)"] = stringify_unsupported(
    size_model_mb
)
run[npt_logger.base_namespace]["Technical/General_train_time (min)"] = stringify_unsupported(
    train_time
)
run[npt_logger.base_namespace]["Technical/Epoch_train_time (min)"] = stringify_unsupported(
    train_time_epoch
)

In [159]:
run.stop()

Shutting down background jobs, please wait a moment...
Done!
Waiting for the remaining 8 operations to synchronize with Neptune. Do not kill this process.
All 8 operations synced, thanks for waiting!
Explore the metadata in the Neptune app:
https://app.neptune.ai/AEs/AEs/e/AES-18/metadata


### Predictions

Make and save predictions from ae model to all hot users

In [None]:
input_datapath = 'data_original'

In [None]:
# Upload interactions table suitable (proper item ids) for inference
interactions = pd.read_csv(os.path.join(input_datapath, "inference.csv"), header=0)

# Rename columns, convert timestamp
interactions.rename(columns={"last_watch_dt": Columns.Date}, inplace=True)
interactions[Columns.Date] = pd.to_datetime(interactions[Columns.Date])

interactions["user_id"] = interactions[Columns.User].astype(np.uint32)
interactions["item_id"] = interactions[Columns.Item].astype(np.uint16)
interactions["total_dur"] = interactions["total_dur"].astype(np.uint16)
interactions["watched_pct"] = interactions[Columns.Weight].astype(np.float32)

In [None]:
# Upload the mapper
with open(os.path.join(input_datapath, "item_to_ind"), "rb") as fp:
    item_to_ind = pickle.load(fp)
ind_to_item = {v: k for k, v in item_to_ind.items()}  # Reverse mapping

In [None]:
input_modelpath = 'model_weights/349ee5_VariationalAE_10_epochs'

In [None]:
# Get dimensions size of the given autoencoder model
input_dimspath = input_modelpath + ".pickle"
with open(input_dimspath, "rb") as fp:
    enc_dims: list[int] = pickle.load(fp)

In [None]:
 # Initialize model
model_name = input_modelpath.split("_")[2]
model: Model = locals()[model_name](enc_dims)

with open(input_modelpath, "rb") as fp:
    state_dict = torch.load(fp, map_location=torch.device("cpu"))
model.load_state_dict(state_dict, strict=False)
model.to(device)
model.eval()

VariationalAE(
  (enc_layers): ModuleList(
    (0): Linear(in_features=9661, out_features=600, bias=True)
    (1): Linear(in_features=600, out_features=400, bias=True)
  )
  (dec_layers): ModuleList(
    (0): Linear(in_features=200, out_features=600, bias=True)
    (1): Linear(in_features=600, out_features=9661, bias=True)
  )
)

In [None]:
user_id = interactions[Columns.User].loc[0]
input_dim = enc_dims[0]
k = 10

In [None]:
# Get recommendations
input_dim = enc_dims[0]
user_input = create_user_input(input_dim, interactions, user_id, item_to_ind)
recos = predict_for_user_id(user_input, model, k, ind_to_item)

In [None]:
def predict_online(input_dim=input_dim, user_id=user_id, k: int = 10):
    """Wrapper for onlinetime-measuring."""
    user_input = create_user_input(input_dim, interactions, user_id, item_to_ind)
    recos = predict_for_user_id(user_input, model, k, ind_to_item)
    return list(recos)

In [None]:
recos = predict_online()
recos

[8874, 3784, 7460, 15297, 3800, 3182, 16270, 4582, 10169, 8034]

In [None]:
# Check if online predictions are fast enough
%timeit predict_online()

8.15 ms ± 299 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


Скорость удовлетворительна, попробуем ускорить и сделать инференс c данными в другом виде

In [None]:
# Convert all hot users to dictionary with items
hot_users = interactions.groupby(Columns.User)[Columns.Item].apply(list).to_dict()
hot_users_weights = interactions.groupby(Columns.User)[Columns.Weight].apply(list).to_dict()

In [None]:
with open(
    "hot_users.dill",
    "wb",
) as f:
    dill.dump(hot_users, f)

In [None]:
with open(
    "hot_users_weights.dill",
    "wb",
) as f:
    dill.dump(hot_users_weights, f)

In [None]:
def create_user_input_2(
    input_dim: int,
    hot_users: dict[int, list[int]],
    hot_users_weights: dict[int, list[float]],
    user_id: int,
    item_to_ind: dict[int, int],
) -> torch.Tensor:
    """Build proper user input for the autoencoder model.

    Extract interactions from data for the specific user id (the existence of
    the data is meant) and build input for the autoencoder model: tensor of
    shape (1, input_dim) filled with floats where 0.0 means absence of interaction.

    Args:
        input_dim: The input dimension of the model.
        interactions: The dataframe containing interactions.
        user_id: The external user id.
        item_to_ind: The mapping from external item ids to the internal.

    Returns:
        user_input: The user input for autoencoder model.

    """
    items = hot_users[user_id]
    ratings = hot_users_weights[user_id]

    user_input = torch.zeros(size=(1, input_dim), dtype=torch.float32)
    items = [item_to_ind[item] for item in items]

    # Complete user input
    ratings_tsr = torch.tensor(ratings, dtype=torch.float32)
    user_input[:, items] = ratings_tsr
    return user_input

In [None]:
def predict_online(
    input_dim=input_dim,
    hot_users=hot_users,
    hot_users_weights=hot_users_weights,
    user_id=user_id,
    item_to_ind=item_to_ind,
    k: int = 10,
):
    """Wrapper for onlinetime-measuring."""
    user_input = create_user_input_2(
        input_dim, hot_users, hot_users_weights, user_id, item_to_ind
    )
    recos = predict_for_user_id(user_input, model, k, ind_to_item)
    return list(recos)

In [None]:
%timeit predict_online()

4.6 ms ± 32 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


Модель удалась еще шустрее