# HDDA F24 Recommendation systems via approximate matrix factorization

by Dmitry Beresnev (<d.beresnev@innopolis.university>)
and Vsevolod Klyushev (<v.klyushev@innopolis.university>)

Group ID = 1


In [96]:
from typing import Callable, Literal, Optional

import numpy as np
import pandas as pd
import scipy
import sklearn
import tqdm
from scipy.sparse import csr_matrix
from sklearn.impute import SimpleImputer

## Utilities

In [97]:
def get_params_combinations(params: list[list]):
    if len(params) == 0:
        return [[]]
    return [(x, *k) for k in get_params_combinations(params[1:]) for x in params[0]]


def get_params_combinations_dict(possible_params_dict: dict[str, list]):
    if len(possible_params_dict) == 0:
        return [{}]
    dict_items = list(possible_params_dict.items())

    first_item = dict_items[0]

    if not isinstance(first_item[1], list):
        return [
            {first_item[0]: first_item[1], **k}
            for k in get_params_combinations_dict(dict(dict_items[1:]))
        ]

    return [
        {first_item[0]: x, **k}
        for k in get_params_combinations_dict(dict(dict_items[1:]))
        for x in first_item[1]
    ]

In [98]:
def rmse_score(
    initial_matrix: np.ndarray, predicted_matrix: np.ndarray, test_mask: np.ndarray
) -> float:
    differences = (initial_matrix[test_mask] - predicted_matrix[test_mask]) ** 2
    return float(np.sqrt(differences.mean()))

In [99]:
def choose_best(
    matrix: np.ndarray,
    mask: np.ndarray,
    solver: Callable,
    loss_fn: Callable[[np.ndarray], float],
    possible_params: dict,
) -> tuple[np.ndarray, float, dict]:
    best_loss = 1e3
    best_solution = np.zeros(1)
    best_params = {}

    for params in get_params_combinations_dict(possible_params):
        solution = solver(matrix, mask, **params)

        loss = loss_fn(solution)

        if loss < best_loss:
            best_loss = loss
            best_solution = solution.copy()
            best_params = params.copy()

    return best_solution, best_loss, best_params

In [100]:
def train_test_split_matrix(
    matrix: np.ndarray, mask: np.ndarray, test_percent: float, seed=420
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    np.random.seed(seed)

    train_matrix = matrix.copy().flatten()
    train_mask = mask.copy().flatten()
    test_mask = np.zeros(mask.shape).astype(bool).flatten()

    existing_indices = np.argsort(~train_mask)[: np.sum(train_mask)]
    test_indices = np.random.choice(
        existing_indices, int(existing_indices.shape[0] * test_percent), replace=False
    )

    train_matrix[test_indices] = 0.0
    train_mask[test_indices] = False
    test_mask[test_indices] = True

    matrix_shape = matrix.shape

    return train_matrix.reshape(matrix_shape), train_mask.reshape(
        matrix_shape
    ), test_mask.reshape(matrix_shape)

## Data loading

### Toy data

In [101]:
toy_matrix_full = np.array(
    [
        [2, 3, 2, 0, 0],
        [0, 2, 0, 4, 3],
        [3, 0, 3, 0, 4],
        [0, 3, 0, 4, 3],
        [0, 0, 0, 0, 2],
        [1, 4, 3, 4, 0],
    ]
)  # 0  for unknown

toy_mask_full = toy_matrix_full != 0

In [102]:
toy_matrix, toy_mask, toy_test_mask = train_test_split_matrix(
    toy_matrix_full, toy_mask_full, test_percent=0.2
)

### Real data

In [103]:
def load_real_data() -> tuple[np.ndarray, np.ndarray]:
    data = scipy.io.loadmat("./data/Data/matlab/inputX.mat")
    x = csr_matrix(data["X"]).toarray()
    mask = x != 0
    return x, mask


real_matrix_full, real_mask_full = load_real_data()

In [104]:
real_matrix, real_mask, real_test_mask = train_test_split_matrix(
    real_matrix_full, real_mask_full, test_percent=0.01
)

print(f"Test size: {int(real_test_mask.sum())}")

Test size: 8001


## Test real data

In [105]:
def load_real_test_flatten_indices(real_matrix_shape: tuple) -> np.ndarray:
    data = scipy.io.loadmat("./data/Data/matlab/inputEval.mat")
    x = data["Eval"]
    rows_indices = x[:, 0] - 1
    columns_indices = x[:, 1] - 1
    return rows_indices * real_matrix_shape[0] + columns_indices


real_test_flatten_indices = load_real_test_flatten_indices(real_matrix.shape)
real_test_flatten_indices

array([  593,  3104,  1544, ..., 40305, 39553, 39565], dtype=uint16)

In [106]:
def save_solution(
    matrix: np.ndarray,
    test_flatten_indices: np.ndarray = real_test_flatten_indices,
    prefix: str = "",
    decimals: int = 4,
):
    predictions = np.round(matrix.flatten()[test_flatten_indices], decimals=decimals)
    results = pd.DataFrame(
        predictions, columns=["Rating"], index=np.arange(1, predictions.shape[0] + 1)
    )
    results.to_csv(f"./solutions/{prefix}submission.csv", index_label="ID")

## A1. SVD

In [107]:
def impute(
    matrix: np.ndarray,
    mask: np.ndarray,
    strategy: Literal["mean", "most_frequent", "median", "random"],
) -> np.ndarray:
    if strategy in ["mean", "most_frequent", "median"]:
        return SimpleImputer(
            strategy=strategy, missing_values=0, keep_empty_features=True
        ).fit_transform(matrix)

    new_matrix = matrix.copy()
    new_matrix[~mask] = np.random.uniform(low=1, high=5, size=np.sum(~mask))
    return new_matrix


def svd_based(
    initial_matrix: np.ndarray,
    mask: np.ndarray,
    rank: int | float,
    iterations: int,
    imputing_strategy: Literal["mean", "most_frequent", "median", "random"] = "mean",
    randomized_svd: bool = False,
    verbose: bool = True,
    diff_norm_stopping_criterion: Optional[float] = None,
    seed: int = 420,
) -> np.ndarray:
    np.random.seed(seed)

    # Initial imputing
    matrix = impute(initial_matrix, mask, strategy=imputing_strategy)

    r = rank
    if isinstance(rank, float):
        r = int(min(matrix.shape) * rank)

    with tqdm.tqdm(range(iterations), disable=not verbose) as loop:
        for _ in loop:
            old_matrix = matrix[~mask].copy()

            if randomized_svd:
                u, s, vt = sklearn.utils.extmath.randomized_svd(
                    matrix, n_components=r, random_state=seed
                )
                s = np.diag(s)

            else:
                u, s, vt = np.linalg.svd(matrix, full_matrices=False)
                s = np.diag(s[:r])
                u = u[:, :r]
                vt = vt[:r, :]

            matrix = u @ s @ vt

            # Keep known values
            matrix[mask] = initial_matrix[mask]

            diff_norm = np.linalg.norm(matrix[~mask] - old_matrix)
            if verbose:
                loop.set_postfix({"Difference norm": diff_norm})

            if (
                diff_norm_stopping_criterion is not None
                and diff_norm <= diff_norm_stopping_criterion
            ):
                break

        return matrix

In [108]:
choose_best(
    toy_matrix,
    toy_mask,
    svd_based,
    lambda sol: rmse_score(toy_mask_full, sol, toy_test_mask),
    {
        "rank": [0.2, 0.3, 0.5],
        "iterations": 25,
        "imputing_strategy": ["mean", "most_frequent", "median", "random"],
        "randomized_svd": [True, False],
        "verbose": False,
    },
)

(array([[2.        , 3.        , 2.        , 2.92431594, 2.68232072],
        [0.45829577, 2.        , 3.07384344, 4.        , 1.34197273],
        [3.        , 4.46710247, 3.        , 4.37030923, 4.        ],
        [1.55571574, 3.        , 2.91388923, 4.        , 2.45737367],
        [1.07604102, 2.61790234, 3.09133014, 4.13588594, 2.        ],
        [1.        , 2.49527507, 3.        , 4.        , 1.89301043]]),
 1.22157424565747,
 {'rank': 0.5,
  'iterations': 25,
  'imputing_strategy': 'most_frequent',
  'randomized_svd': False,
  'verbose': False})

In [113]:
svd_sol, svd_score, svd_params = choose_best(
    real_matrix,
    real_mask,
    svd_based,
    lambda sol: rmse_score(real_mask_full, sol, real_test_mask),
    {
        "rank": [1],
        "iterations": 200,
        "imputing_strategy": "random",
        "randomized_svd": True,
        "verbose": True,
    },
)

svd_sol, svd_score, svd_params

100%|██████████| 200/200 [03:45<00:00,  1.13s/it, Difference norm=3.65] 


(array([[5.        , 3.38652131, 3.24441599, ..., 3.69182739, 3.84242649,
         4.04154983],
        [4.03643502, 3.1162232 , 2.98546013, ..., 3.39716101, 3.53573991,
         3.71897005],
        [4.15777447, 3.20990011, 3.07520617, ..., 3.49928321, 3.64202794,
         3.83076616],
        ...,
        [3.86696185, 2.98538591, 2.86011304, ..., 3.2545283 , 3.38728886,
         3.56282592],
        [4.00175327, 3.08944806, 2.95980853, ..., 3.36797201, 3.50536022,
         3.68701601],
        [3.        , 2.9757912 , 2.85092094, ..., 3.2440686 , 3.37640248,
         3.55137538]]),
 2.646701411137743,
 {'rank': 1,
  'iterations': 200,
  'imputing_strategy': 'random',
  'randomized_svd': True,
  'verbose': True})

In [114]:
save_solution(svd_sol, decimals=5)