In [1]:
import math

import numpy as np
import torch
from catboost.datasets import msrank_10k
from sklearn.preprocessing import StandardScaler

from typing import List


class ListNet(torch.nn.Module):
    def __init__(self, num_input_features: int, hidden_dim: int):
        super().__init__()
        self.hidden_dim = hidden_dim
        # model architecture
        self.model = torch.nn.Sequential(
            torch.nn.Linear(num_input_features, self.hidden_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(self.hidden_dim, 1),
        )

    def forward(self, input_1: torch.Tensor) -> torch.Tensor:
        logits = self.model(input_1)
        return logits


class Solution:
    """
    Class for training and testing ListNet model.
    """
    def __init__(
        self, 
        n_epochs: int = 5, 
        listnet_hidden_dim: int = 30, 
        lr: float = 0.001, 
        ndcg_top_k: int = 10,
    ):
        """
        Args:
            n_epochs (int): Number of epochs.
            listnet_hidden_dim (int): ListNet model hidden dim size.
            lr (float): Learning rate.
            ndcg_top_k (int): NDCG metric score for top k.
        """
        self._prepare_data()
        self.num_input_features = self.X_train.shape[1]
        self.n_train = self.X_train.shape[0]
        self.ndcg_top_k = ndcg_top_k
        self.n_epochs = n_epochs

        self.model = self._create_model(
            self.num_input_features, listnet_hidden_dim
        )
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)

    def _get_data(self) -> List[np.ndarray]:
        """
        Method loads data for training and testing.
        """
        train_df, test_df = msrank_10k()

        X_train = train_df.drop([0, 1], axis=1).values
        y_train = train_df[0].values
        query_ids_train = train_df[1].values.astype(int)

        X_test = test_df.drop([0, 1], axis=1).values
        y_test = test_df[0].values
        query_ids_test = test_df[1].values.astype(int)

        return [X_train, y_train, query_ids_train, X_test, y_test, query_ids_test]

    def _prepare_data(self) -> None:
        """
        Method prepares data for training and test.
        """
        (X_train, y_train, self.query_ids_train,
            X_test, y_test, self.query_ids_test) = self._get_data()
        X_train = self._scale_features_in_query_groups(X_train, self.query_ids_train)
        X_test = self._scale_features_in_query_groups(X_test, self.query_ids_test)
        self.X_train, self.ys_train, self.X_test, self.ys_test = map(torch.FloatTensor,
                                                                    [X_train, y_train, X_test, y_test])
        self.query_ids_train_u = np.unique(self.query_ids_train)
        self.query_ids_test_u = np.unique(self.query_ids_test)

    def _scale_features_in_query_groups(
        self, 
        inp_feat_array: np.ndarray, 
        inp_query_ids: np.ndarray
    ) -> np.ndarray:
        """
        Method normalizes the input data.
        """
        result = []
        for q_id in np.unique(inp_query_ids):
            result.append(StandardScaler().fit_transform(inp_feat_array[inp_query_ids == q_id]))
        return np.vstack(result)  

    def _create_model(
        self, 
        listnet_num_input_features: int, 
        listnet_hidden_dim: int
    ) -> torch.nn.Module:
        """
        Method normalizes the input data.
        """
        torch.manual_seed(0)
        net = ListNet(listnet_num_input_features, listnet_hidden_dim)
        return net
    
    def fit(self) -> List[float]:
        """
        Method train and evaluates ListNet model on N epochs.
        
        Returns:
            List[float]: NCDG metric score on epochs.
        """
        val_metrics = []

        for epoch in range(self.n_epochs):
            self._train_one_epoch(epoch)
            val_metrics.append(self._eval_test_set())

        return val_metrics

    def _calc_loss(
        self, 
        batch_ys: torch.FloatTensor,
        batch_pred: torch.FloatTensor
    ) -> torch.FloatTensor:
        """
        Method calculate Kullback-Leibler divergence loss.
        https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence
        
        Args:
            batch_ys (torch.FloatTensor): (n_i, 1) GT
            batch_pred (torch.FloatTensor): (n_i, 1) preds
            
        Returns:
            torch.FloatTensor: KL loss.
        """
        P_y_i = torch.softmax(batch_ys, dim=0)
        P_z_i = torch.softmax(batch_pred, dim=0)
        return -torch.sum(P_y_i * torch.log(P_z_i/P_y_i))


    def _train_one_epoch(self, epoch) -> None:
        """
        Method train ListNet model on test train set.
        """
        self.model.train()
        idx = torch.randperm(self.n_train)

        X_train = self.X_train[idx]
        y_train = self.ys_train[idx]
        query_ids_train = self.query_ids_train[idx]

        for query_id in self.query_ids_train_u:
            idx_q = query_ids_train == query_id
            X_q = X_train[idx_q]
            y_q = y_train[idx_q]
            self.optimizer.zero_grad()

            if len(y_q) > 0:
                pred = self.model(X_q).flatten()
                loss = self._calc_loss(y_q, pred)
                loss.backward(retain_graph=True)
                self.optimizer.step()

    def _eval_test_set(self) -> float:
        """
        Method evaluates ListNet model on test set.
        
        Returns:
            float: NCDG at k on test set.
        """
        with torch.no_grad():
            self.model.eval()
            ndcgs = []
            for query_id in self.query_ids_test_u:
                idx_q = self.query_ids_test == query_id
                X_q = self.X_test[idx_q]
                y_q = self.ys_test[idx_q]
                valid_pred_q = self.model(X_q).flatten()
                ndcg = self._ndcg_k(y_q, valid_pred_q, "exp2", self.ndcg_top_k)
                if ndcg > 1 or math.isnan(ndcg) or ndcg < 0:
                    ndcg = 0.0
                ndcgs.append(ndcg)
                
        return np.mean(ndcgs)

    def _compute_gain(self, y_value: float, gain_scheme: str) -> float:
        """
        Method for calculating DCG and NDCG, which calculates Gain.
        """
        if gain_scheme == "const":
            return y_value
        elif gain_scheme == "exp2":
            return 2**y_value - 1
        else:
            raise ValueError(f"{gain_scheme} gain method not supported")
                          
    def _dcg_k(
        self, 
        ys_true: torch.Tensor, 
        ys_pred: torch.Tensor, 
        gain_scheme: str, 
        k: int = None
    ) -> float:
        """
        Method to calculate the DCG at k. 
        https://en.wikipedia.org/wiki/Discounted_cumulative_gain
        
        Args:
            ys_true (torch.Tensor): Tensor of actual relevancy values.
            ys_pred (torch.Tensor): Tensor of predicted relevancy values.
            gain_scheme (str): Gain scheme.
            k (int) : Choose highest k scores in the ranking.

        Returns:
            torch.Tensor: DCG at k metric.
        """
        order = torch.argsort(ys_pred, descending=True, dim=-1)
        true_sorted_by_preds = torch.gather(ys_true, dim=-1, index=order)

        gain_function = lambda x: self._compute_gain(x, gain_scheme)
        gains = gain_function(true_sorted_by_preds)

        discounts = torch.tensor(1) /  torch.log2(torch.arange(true_sorted_by_preds.shape[0], dtype=torch.double) + 2.0)
        if k is not None:
            discounts[k:] = 0
        discounted_gains = gains * discounts

        sum_dcg = torch.sum(discounted_gains, dim=-1)
        return float(sum_dcg)
                          
    def _ndcg_k(
        self, 
        ys_true: torch.Tensor, 
        ys_pred: torch.Tensor, 
        gain_scheme: str = "exp2",
        ndcg_top_k: int = None,
    ) -> float:
        """
        Method to calculate the NDCG at k. 
        https://en.wikipedia.org/wiki/Discounted_cumulative_gain
        
        Args:
            ys_true (torch.Tensor): Tensor of actual relevancy values.
            ys_pred (torch.Tensor): Tensor of predicted relevancy values.
            gain_scheme (str): Gain scheme.
            k (int) : Choose highest k scores in the ranking.

        Returns:
            torch.Tensor: NDCG at k metric.
        """
        ideal_dcgs = self._dcg_k(ys_true, ys_true, gain_scheme, ndcg_top_k)
        predicted_dcgs = self._dcg_k(ys_true, ys_pred, gain_scheme, ndcg_top_k)
        ndcg_score = predicted_dcgs / ideal_dcgs
        return ndcg_score

In [2]:
model = Solution(n_epochs=5)

In [3]:
model = Solution(n_epochs=5)
model.fit()

[0.42946389355986125,
 0.4369915362647493,
 0.4388079099292035,
 0.4426234048287631,
 0.43943256481926923]