# Домашнее задание №3. Вариант рекомендательной системы с knn

In [1]:
from collections import Counter
from typing import Union

import implicit
import rectools
import requests
import scipy as sp
import numpy as np
import pandas as pd
from rectools import Columns
from rectools.metrics import MAP, MeanInvUserFreq
from rectools.dataset import Dataset
from scipy.sparse import coo_matrix, spmatrix
from implicit.nearest_neighbours import ItemItemRecommender

import os
import os.path

  from .autonotebook import tqdm as notebook_tqdm


In [9]:
data_path = os.environ.get("DATA_PATH")

In [3]:
print("implicit:", implicit.__version__)
print("requests:", requests.__version__)
print("rectools:", rectools.__version__)
print("pandas:", pd.__version__)
print("numpy:", np.__version__)
print("scipy:", sp.__version__)

implicit: 0.7.2
requests: 2.32.3
rectools: 0.12.0
pandas: 2.2.3
numpy: 1.26.4
scipy: 1.12.0


In [90]:
%%time

!wget -q https://github.com/irsafilo/KION_DATASET/raw/f69775be31fa5779907cf0a92ddedb70037fb5ae/data_original.zip -O kion_train.zip
!unzip -o kion_train.zip -x '__MACOSX/*'
!rm kion_train.zip

Archive:  kion_train.zip
  inflating: data_original/interactions.csv  
  inflating: data_original/users.csv  
  inflating: data_original/items.csv  
CPU times: user 53.6 ms, sys: 5.14 ms, total: 58.8 ms
Wall time: 5.63 s


In [10]:
if data_path is None:
    data_path = "C:/Users/Nigamadyanov/Downloads/data_original/"

In [11]:
interactions = (
    pd.read_csv(os.path.join(data_path, "interactions.csv"), parse_dates=["last_watch_dt"])
    .rename(columns={'total_dur': Columns.Weight,
                     'last_watch_dt': Columns.Datetime})
)
users = pd.read_csv(os.path.join(data_path, "users.csv"))
items = pd.read_csv(os.path.join(data_path, "items.csv"))

print(interactions.shape)
interactions.head(5)

(5476251, 5)


Unnamed: 0,user_id,item_id,datetime,weight,watched_pct
0,176549,9506,2021-05-11,4250,72.0
1,699317,1659,2021-05-29,8317,100.0
2,656683,7107,2021-05-09,10,0.0
3,864613,7638,2021-07-05,14483,100.0
4,964868,9506,2021-04-30,6725,100.0


# Ситуация:

Ваш коллега ушел в отпуск, оставив вам недописанную модель в `userknn.py` (код из семинара №3 будет в ячейке этого ноутбука далее)

Через 3 дня ее нужно выкатить в А/B тест. Это значит, что модель должна уметь выдавать рекомендации размера `k` любым пользователям (холодным / горячим).

Требуется дописать методы класса UserKnn:

- метод, который выдает рекомендации холодным пользователям (сейчас модель этого не умеет)
- в методе, который выдает рекомендации горячим пользователям, дописать код так, чтобы у всех было одинаковое кол-во рекомендаций k, а не меньше k, как сейчас.

Также была договоренность, что перед выкаткой вы покажете продакт менеджеру финальные значения по оффлайн метрикам.

# Готовим тестовую выборку

Здесь и далее жестко фиксируем тестовую выборку **ровно последние 14 дней из `interactions`**  - по результатам на ней будут выставляться баллы

Сохраните тестовую выборку (тестовые interactions) в pandas DataFrame и назовите его `test`.

Оставьте в нем все поля из `interactions`. Удостоверьтесь, что формат полей следующий:

    Column   Dtype         
 - user_id -    int64         
 - item_id -    int64         
 - datetime -   datetime64
 - weight -      int64         
 - watched_pct - float64    


In [92]:
max_date = interactions['datetime'].max()
cutoff_date = max_date - pd.Timedelta(days=14)

test = interactions[interactions['datetime'] > cutoff_date].copy()
test = test[['user_id', 'item_id', 'datetime', 'weight', 'watched_pct']]

train = interactions[interactions['datetime'] <= cutoff_date].copy()
train = train[['user_id', 'item_id', 'datetime', 'weight', 'watched_pct']]


## Проверка:  тестовая выборка - 3 балла

Внимание! Есть скрытые тесты

In [93]:
assert isinstance(test, pd.DataFrame)
assert test.datetime.nunique() == 14

expected_mean = 6887.2
actual_mean = round(test['weight'].mean(), 1)
assert abs(actual_mean - expected_mean) < 0.001


# Код модели

Допишите методы `fit_cold_model` и  `recommend_cold`, которые делают рекомендации холодным пользователям (сейчас модель этого не умеет)

In [94]:
class UserKnn:
    """
    A user-based KNN model wrapper around `implicit.nearest_neighbours.ItemItemRecommender`
    """

    SIMILAR_USER_COLUMN = "similar_user_id"
    SIMILARITY_COLUMN = "similarity"
    IDF_COLUMN = "idf"

    def __init__(self, model: ItemItemRecommender, N_similar_users: int):
        self.model = model
        self.N_similar_users = N_similar_users

        self.users_inv_mapping = None
        self.users_mapping = None
        self.items_inv_mapping = None
        self.items_mapping = None

        self.watched_items_dataframe = None
        self.item_idf = None
        self.cold_model_fitted = False

    def _set_mappings(self, interactions: pd.DataFrame) -> None:
        """
        Create dictionaries to map external IDs (users, items) to internal IDs and vice versa.
        """
        unique_users = interactions[Columns.User].unique()
        self.users_inv_mapping = dict(enumerate(unique_users))
        self.users_mapping = {v: k for k, v in self.users_inv_mapping.items()}

        unique_items = interactions[Columns.Item].unique()
        self.items_inv_mapping = dict(enumerate(unique_items))
        self.items_mapping = {v: k for k, v in self.items_inv_mapping.items()}

    def _get_user_item_matrix(self, interactions: pd.DataFrame) -> spmatrix:
        """
        Construct a sparse user-item matrix in CSR format.
        Rows represent users, and columns represent items.
        """
        user_idx = interactions[Columns.User].map(self.users_mapping.get)
        item_idx = interactions[Columns.Item].map(self.items_mapping.get)
        data = interactions[Columns.Weight].astype(np.float64)

        user_item_coo = coo_matrix((data, (user_idx, item_idx)))
        return user_item_coo.tocsr()

    def _set_interacted_items_dataframe(self, interactions: pd.DataFrame) -> None:
        """
        Groups interactions by user to get item_id list for each user
        """
        self.interacted_items_dataframe = (
            interactions.groupby(Columns.User, as_index=False)
            .agg({Columns.Item: list})
            .rename(columns={Columns.User: self.SIMILAR_USER_COLUMN})
        )

    @staticmethod
    def idf(n: int, x: float):
        """
        Calculates IDF for one item
        """
        return np.log((1 + n) / (1 + x) + 1)

    def _count_item_idf(self, interactions: pd.DataFrame) -> None:
        """
        Calculate IDF values for all items present in the interactions dataset
         and store the result in self.item_idf.
        """
        item_freqs = Counter(interactions[Columns.Item].values)
        item_idf_df = (
            pd.DataFrame
            .from_dict(item_freqs, orient="index", columns=["doc_freq"])
            .reset_index()
        )
        total_interactions = len(interactions)
        item_idf_df[self.IDF_COLUMN] = item_idf_df["doc_freq"].apply(
            lambda x: self.idf(total_interactions, x)
        )
        self.item_idf = item_idf_df

    def _prepare_for_model(self, train_interactions: pd.DataFrame) -> None:
        """
        Sets mappings, grouped interactions, calculates idf
        """
        self._set_mappings(train_interactions)
        self._set_interacted_items_dataframe(train_interactions)
        self._count_item_idf(train_interactions)

    def fit_cold_model(self, train_interactions: pd.DataFrame) -> None:
        """
        Fit a model for cold recommendations.

        Parameters:
        train_interactions (pd.DataFrame): interaction data used to train the model.
        """
        self.cold_items = (
          train_interactions[Columns.Item]
          .value_counts()
          .head(1000)
          .index.tolist()
        )
        self.cold_model_fitted = True

    def recommend_cold(self, users: Union[list, np.array],
                        k: int = 100) -> pd.DataFrame:
        """
        Return recommendations for the given cold users.
        Can be called separately or within the class. Supports both list and numpy array as input.

        Parameters:
        users (list | np.array): List or array of users for whom recommendations will be generated.
        k (int, optional): Number of recommendations to generate per user. Default is 100.

        Returns:
        pd.DataFrame: A dataframe containing user-item recommendations.
        """

        all_recs = []
        for user in users:
            recs = pd.DataFrame({
                Columns.User: [user] * min(k, len(self.cold_items)),
                Columns.Item: self.cold_items[:k],
                Columns.Score: np.arange(k, 0, -1),
            })
            all_recs.append(recs)

        return pd.concat(all_recs).reset_index(drop=True)

    def fit(self, train_interactions: pd.DataFrame) -> None:
        """
        Fit the model on the provided training data.

        Internally:
        1) Prepare mappings, watchlist DataFrame, and item IDF.
        2) Create a user-item matrix and fit the underlying Implicit model.
        """
        self._prepare_for_model(train_interactions)
        user_item_matrix = self._get_user_item_matrix(train_interactions)
        self.model.fit(user_item_matrix.T)


    def _get_similar_users(self, external_user_id: int) -> tuple[list[int], list[float]]:
        """
        Retrieve a list of similar users and corresponding similarities
        from the underlying Implicit model.
        """
        if external_user_id not in self.users_mapping:
            # if user doesn't exist in mapping, return sentinel (-1).
            return [-1], [-1]

        internal_user_id = self.users_mapping[external_user_id]
        user_ids, similarities = self.model.similar_items(
            internal_user_id,
            N=self.N_similar_users
        )
        # convert back to external IDs
        external_user_ids = [self.users_inv_mapping[u_id] for u_id in user_ids]
        return external_user_ids, similarities

    @staticmethod
    def get_rank(recs: pd.DataFrame, k: int) -> pd.DataFrame:
        """
        Sort recommendations by score in descending order,
        assign ranks within each user group, and then truncate by top-k.
        """
        recs = recs.sort_values([Columns.User, Columns.Score], ascending=False)
        recs = recs.drop_duplicates([Columns.User, Columns.Item])
        recs[Columns.Rank] = recs.groupby(Columns.User).cumcount() + 1
        recs = recs[recs[Columns.Rank] <= k][
            [Columns.User, Columns.Item, Columns.Score, Columns.Rank]
        ]

        return recs



    def recommend(self, users: np.ndarray, k: int) -> pd.DataFrame:
        """
        Generate top-k recommendations for the specified list of users.

        Steps:
        1) Find similar users for each target user.
        2) Join watched items from these similar users.
        3) Compute a final score as similarity * IDF.
        4) Return top-k items per user.
        """

        recs = pd.DataFrame({Columns.User: users}) # создаем датафрейм из списка юзерайди, которое передали
        recs[self.SIMILAR_USER_COLUMN], recs[self.SIMILARITY_COLUMN] = zip(
            *recs[Columns.User].map(lambda user_id: self._get_similar_users(user_id))
        ) # смотрим на похожих юзеров + на их силу близости
        recs = recs.set_index(Columns.User).apply(pd.Series.explode).reset_index()

        recs = (
            recs[~(recs[Columns.User] == recs[self.SIMILAR_USER_COLUMN])]
            .merge(
                self.interacted_items_dataframe,
                on=[self.SIMILAR_USER_COLUMN],
                how="left",
            )
            .explode(Columns.Item)
            .sort_values([Columns.User, self.SIMILARITY_COLUMN], ascending=False)
            .drop_duplicates([Columns.User, Columns.Item], keep="first")
            .merge(self.item_idf, left_on=Columns.Item, right_on="index", how="left")
        )

        recs[Columns.Score] = recs[self.SIMILARITY_COLUMN] * recs[self.IDF_COLUMN]
        recs = recs[[Columns.User, Columns.Item, Columns.Score]]

        recs = self.get_rank(recs, k=k)

        if self.cold_model_fitted:
            input_users = pd.DataFrame({Columns.User: users})
            user_counts = recs.groupby(Columns.User)[Columns.Item].count().reset_index(name='count')
            user_counts = input_users.merge(user_counts, on=Columns.User, how='left').fillna(0)
            users_to_fill = user_counts[user_counts['count'] < k][Columns.User].tolist()

            if users_to_fill:
                cold_recs = self.recommend_cold(users=users_to_fill, k=k)
                existing_pairs = set(recs.apply(lambda row: (row[Columns.User], row[Columns.Item]), axis=1))
                cold_recs['pair'] = cold_recs.apply(lambda row: (row[Columns.User], row[Columns.Item]), axis=1)
                cold_recs_filtered = cold_recs[~cold_recs['pair'].isin(existing_pairs)].drop(columns='pair')

                if not cold_recs_filtered.empty:
                    min_score = recs[Columns.Score].min() if not recs.empty else 0
                    cold_score = min_score - 1.0 if not recs.empty else 0.0
                    cold_recs_filtered[Columns.Score] = cold_score

                    combined_recs = pd.concat([recs, cold_recs_filtered], ignore_index=True)
                    combined_recs = combined_recs.sort_values(
                        by=[Columns.User, Columns.Score],
                        ascending=[True, False]
                    )
                    combined_recs = combined_recs.drop_duplicates(
                        subset=[Columns.User, Columns.Item],
                        keep='first'
                    )
                    recs = self.get_rank(combined_recs, k=k)

        return recs


# Рекомендации для холодных пользователей

Используйте метод класса UseKnn, который выдает рекомендации холодным пользователям.

### Порекомендуйте k=10 уникальных айтемов для каждого холодного пользователя из тестовой выборки. Сохраните рекомендации в pandas DataFrame и назовите его `reco_cold`.

Датафрейм `reco_cold` должен иметь обязательные поля `user_id`, `item_id`, `rank`.

⚠️ Холодными считаем пользователей, у которых нет интеракций в train.


In [95]:
test_users = test['user_id'].unique()
train_users = train['user_id'].unique()
cold_users = [user for user in test_users if user not in train_users]

user_knn = UserKnn(model=ItemItemRecommender(K=10), N_similar_users=10)

train_interactions = interactions[interactions['datetime'] <= cutoff_date]
user_knn.fit_cold_model(train_interactions)

reco_cold = user_knn.recommend_cold(cold_users, k=10)
reco_cold = reco_cold.sort_values(['user_id', 'score'], ascending=[True, False])
reco_cold['rank'] = reco_cold.groupby('user_id').cumcount() + 1
reco_cold = reco_cold[['user_id', 'item_id', 'rank']]
reco_cold.columns = ['user_id', 'item_id', 'rank']

## Проверка рекомендаций по холодным пользователям - 4 балла

Внимание! Есть скрытые тесты

In [96]:
# проверка правильности формирования холодных рекомендаций - 2 балл

assert isinstance(reco_cold, pd.DataFrame)

expected_columns = {'user_id', 'item_id', 'rank'}
assert expected_columns.issubset(set(reco_cold.columns))

assert reco_cold.user_id.nunique() == 111690

assert (reco_cold.groupby('user_id')['item_id'].nunique() == 10).all(), \
    "Ошибка: у каждого user_id должно быть ровно 10 уникальных item_id"


In [97]:
# проверка метрик качества холодных рекомендаций на полном test - 2 балла
assert (MAP(k=10).calc(reco_cold, test) >= 0.04
    and MAP(k=10).calc(reco_cold, test) < 0.98)


# Рекомендации для горячих пользователей

Допишите метод `recommend` класса UseKnn так, чтобы он выдавал ровно k рекомендаций (сейчас он выдает некоторым пользователям меньше k, особенность implicit knn модели)

### Порекомендуйте k=10 уникальных айтемов для каждого горячего пользователя из тестовой выборки. Сохраните рекомендации в pandas DataFrame и назовите его `reco_hot`

датафрейм `reco_hot` ддолжен иметь обязательные поля `user_id`, `item_id`, `rank`.

⚠️ горячими считаем пользователей, у которых ЕСТЬ любое количество интеракции в train.

## Hack в помощь

Дело к близится к вечеру, продакт менеджер торопит Вас.

А код userknn фитится долго, потому что кто-то запихнул в него весь train с over 900 тыс пользователей.

Вы решаете аккуратно уменьшить трейн

- `возьмите только последние 30 дней датасета в трейн`


In [98]:
max_date = train['datetime'].max()
cutoff_date = max_date - pd.Timedelta(days=31)

train30 = train[train['datetime'] > cutoff_date].copy()

In [99]:
test_users = test['user_id'].unique()
train_users = train30['user_id'].unique()

hot_users = [user for user in test_users if user in train_users]

In [100]:
user_knn = UserKnn(model=ItemItemRecommender(K=10), N_similar_users=10)
user_knn.fit_cold_model(train30)
user_knn.fit(train30)



  0%|          | 0/398800 [00:00<?, ?it/s]

In [101]:
reco_hot = user_knn.recommend(hot_users, k=10)
reco_hot = reco_hot.sort_values(['user_id', 'score'], ascending=[True, False])
reco_hot['rank'] = reco_hot.groupby('user_id').cumcount() + 1
reco_hot = reco_hot[['user_id', 'item_id', 'rank']]
reco_hot.columns = ['user_id', 'item_id', 'rank']

Обучите модель, сделайте рекомендации `reco_hot`

## Проверка рекомендаций по горячим пользователям - 8 баллов

Внимание! Есть скрытые тесты

In [102]:
# проверки на правильность формирования reco_hot - 3 балла за все проверки в ячейке

assert isinstance(reco_hot, pd.DataFrame)

expected_columns = {'user_id', 'item_id', 'rank'}
assert expected_columns.issubset(set(reco_hot.columns))

assert reco_hot.user_id.nunique() == 128070


In [103]:
# проверки на метрики - 5 баллов за все проверки в ячейке

assert (reco_hot.groupby('user_id')['item_id'].nunique() == 10).all(), \
    "Ошибка: у каждого user_id должно быть ровно 10 уникальных item_id"

assert (MAP(k=10).calc(reco_hot, test) <= 0.04
    and MeanInvUserFreq(k=10).calc(reco_hot, test) > 7.)


# Как сдать ноутбук `knn.ipynb` на проверку

⚠️ Важное замечание: чтобы ваш ноутбук смог пройти проверку, скопируйте код класса `UserKnn` из `userknn.py` в этот ноутбук. Мы не можем гарантировать, что импорты из py файла будут работать.

1. Прогоните весь код ноутбука - проверьте, что нет ошибок и тесты проходят
2. Выложите готовый ноутбук в ваш репозиторий с сервисом из домашнего задания №1 по пути `notebooks/hw_3/knn.ipynb` в ветке `hw_3`

3. Проверьте, что есть доступ к вашему репозиторию для аккаунтов `https://github.com/feldlime`

5. Откройте PR в main ветку и добавьте в ревьюеры **своего ментора**

6. Не проводите мердж в `main` ветку, пока не увидите оценку за это ДЗ в ведомости. Файл с ноутбуком должен находиться в ветке `hw_3`

Обратите внимание, что сборка ноутбуков на проверку автоматизирована. В случае неправильного пути, имени файла или ветки (а также при отсутствии доступа у `@feldlime`) ваша работа не попадёт на проверку и получит `0` баллов.

# Баллы по ДЗ №3: максимум 25 баллов

1. прохождение проверки кода в ноутбуке `knn.ipynb` - **15 баллов**
2. обернуть модель UserKnn в сервис и побить безлайн `map@10 = 0.063` на лидерборде. Оценивается только лидерборд, без код ревью - **10 баллов**

## Комментарии  

- Вы можете переспользовать в своем сервисе код из `userknn.py`, который использован в ноутбуке `knn.ipynb`

Как реализовать модель в сервисе:

- онлайн вариант: обучаете модель, сохраняете обученную модель (pickle, dill), при запуске сервиса ее поднимаете и запрашиваете рекомендации "на лету"
- оффлайн вариант: предварительно посчитайте рекомендации для всех пользователей, сохраните и запрашивайте их
- в приватном тесте лидерборда есть как холодные, так и горячие пользователи