<a href="https://colab.research.google.com/github/budennovsk/AuthorBooksComments/blob/master/v2_SASRec_vs_two_level.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the recommender libraries used by this notebook (IPython shell
# command); replay-rec is pinned to the 0.21.2rc0 release candidate.
!pip install implicit catboost rectools[lightfm] replay-rec==0.21.2rc0 #replay-rec
# Optional PySpark re-pin, currently disabled:
# !pip -q uninstall -y pyspark
# !pip -q install "pyspark==3.4.3"
import sys
import pyspark
# Print the interpreter and PySpark versions of the current session.
print("python:", sys.version)
print("pyspark:", pyspark.__version__)

In [None]:
import numpy as np
import pandas as pd


def _ensure_datetime(df: pd.DataFrame, col: str = "datetime") -> pd.DataFrame:
    """Return a copy of ``df`` in which column ``col`` is datetime-typed.

    Args:
        df: Input frame; it is never modified.
        col: Name of the column to convert. If the column is absent the
            copy is returned unchanged.

    Returns:
        A copy of ``df``. When ``col`` exists and is not already a datetime
        column (naive or timezone-aware) it is parsed with
        ``pd.to_datetime(..., errors="coerce")``, so unparsable values
        become ``NaT``.
    """
    out = df.copy()
    # is_datetime64_any_dtype accepts pandas extension dtypes (tz-aware
    # datetimes, nullable strings, categoricals); np.issubdtype raises
    # TypeError on those instead of answering the question.
    if col in out.columns and not pd.api.types.is_datetime64_any_dtype(out[col]):
        out[col] = pd.to_datetime(out[col], errors="coerce")
    return out


def _safe_log1p(s: pd.Series) -> pd.Series:
    """Return ``log(1 + x)`` of ``s`` after casting the values to float64."""
    as_float = s.astype("float64")
    return np.log1p(as_float)


def _add_quantiles(
    features: pd.DataFrame,
    group: pd.core.groupby.generic.SeriesGroupBy,
    prefix: str,
    quantiles: tuple = (0.05, 0.5, 0.95),
):
    """Join per-group quantiles of ``group`` onto ``features`` as columns.

    Args:
        features: Frame indexed by the group keys; it is left-joined with
            the quantile table on its index.
        group: Grouped series whose quantiles are computed.
        prefix: Column-name prefix; each column is named
            ``{prefix}_qNN`` where ``NN`` is the quantile level in percent.
        quantiles: Quantile levels in ``[0, 1]``. Defaults to the 5th,
            50th and 95th percentiles.

    Returns:
        ``features`` with one extra column per requested quantile.
    """
    q = group.quantile(list(quantiles)).unstack(level=-1)
    # Round instead of truncating: 0.29 * 100 is 28.999999999999996 in binary
    # floating point, so int() alone would label that level "q28".
    q.columns = [f"{prefix}_q{int(round(float(p) * 100)):02d}" for p in q.columns]
    return features.join(q, how="left")


def _add_count_features(
    df: pd.DataFrame,
    user_col: str,
    item_col: str,
    cold_user_threshold: int,
    cold_item_threshold: int,
) -> pd.DataFrame:
    """Attach interaction counts, their logs, neighbour averages and cold flags."""
    u_cnt = df.groupby(user_col).size().rename("user_interactions_cnt")
    i_cnt = df.groupby(item_col).size().rename("item_interactions_cnt")
    df = df.join(u_cnt, on=user_col).join(i_cnt, on=item_col)

    df["user_log_interactions_cnt"] = _safe_log1p(df["user_interactions_cnt"])
    df["item_log_interactions_cnt"] = _safe_log1p(df["item_interactions_cnt"])

    # Mean log-activity of the users who interacted with each item.
    item_avg_user_log = (
        df.groupby(item_col)["user_log_interactions_cnt"]
        .mean()
        .rename("item_avg_user_log_interactions_cnt")
    )
    # Mean log-popularity of the items each user interacted with.
    user_avg_item_log = (
        df.groupby(user_col)["item_log_interactions_cnt"]
        .mean()
        .rename("user_avg_item_log_interactions_cnt")
    )
    df = df.join(item_avg_user_log, on=item_col).join(user_avg_item_log, on=user_col)

    # Both operands are on the log scale. Subtracting a mean of logs from a
    # raw count mixes units and makes the difference track the raw count.
    df["user_diff_cnt_vs_avg_items"] = (
        df["user_log_interactions_cnt"]
        - df["user_avg_item_log_interactions_cnt"].fillna(0.0)
    )
    df["item_diff_cnt_vs_avg_users"] = (
        df["item_log_interactions_cnt"]
        - df["item_avg_user_log_interactions_cnt"].fillna(0.0)
    )

    df["user_cold_flag"] = (df["user_interactions_cnt"] <= cold_user_threshold).astype("int8")
    df["item_cold_flag"] = (df["item_interactions_cnt"] <= cold_item_threshold).astype("int8")
    return df


def _add_time_features(
    df: pd.DataFrame, user_col: str, item_col: str, dt_col: str
) -> pd.DataFrame:
    """Attach first/last timestamps, history length, active days and recency."""
    by_user = df.groupby(user_col)[dt_col]
    by_item = df.groupby(item_col)[dt_col]
    df = (
        df.join(by_user.min().rename("user_min_ts"), on=user_col)
        .join(by_user.max().rename("user_max_ts"), on=user_col)
        .join(by_item.min().rename("item_min_ts"), on=item_col)
        .join(by_item.max().rename("item_max_ts"), on=item_col)
    )

    seconds_per_day = 86400.0
    df["user_history_timedelta"] = df["user_max_ts"] - df["user_min_ts"]
    df["item_history_timedelta"] = df["item_max_ts"] - df["item_min_ts"]
    df["user_history_days"] = df["user_history_timedelta"].dt.total_seconds() / seconds_per_day
    df["item_history_days"] = df["item_history_timedelta"].dt.total_seconds() / seconds_per_day

    # Number of distinct calendar days on which an interaction happened.
    interaction_day = df[dt_col].dt.floor("D")
    u_days = interaction_day.groupby(df[user_col]).nunique().rename("user_interaction_days_cnt")
    i_days = interaction_day.groupby(df[item_col]).nunique().rename("item_interaction_days_cnt")
    df = df.join(u_days, on=user_col).join(i_days, on=item_col)

    df["user_log_interaction_days_cnt"] = _safe_log1p(df["user_interaction_days_cnt"])
    df["item_log_interaction_days_cnt"] = _safe_log1p(df["item_interaction_days_cnt"])

    # Recency is measured against the latest timestamp present in the log.
    last_log_ts = df[dt_col].max()
    df["user_days_since_last_interaction"] = (
        (last_log_ts - df["user_max_ts"]).dt.total_seconds() / seconds_per_day
    )
    df["item_days_since_last_interaction"] = (
        (last_log_ts - df["item_max_ts"]).dt.total_seconds() / seconds_per_day
    )
    return df


def _add_relevance_features(
    df: pd.DataFrame, user_col: str, item_col: str, relevance_col: str
) -> pd.DataFrame:
    """Attach mean, population std and 5/50/95% quantiles of relevance per user and item."""
    g_u = df.groupby(user_col)[relevance_col]
    g_i = df.groupby(item_col)[relevance_col]

    u_mean = g_u.mean().rename("user_rel_mean")
    i_mean = g_i.mean().rename("item_rel_mean")
    df = (
        df.join(u_mean, on=user_col)
        .join(g_u.std(ddof=0).rename("user_rel_std"), on=user_col)
        .join(i_mean, on=item_col)
        .join(g_i.std(ddof=0).rename("item_rel_std"), on=item_col)
    )

    # Quantiles are computed once per group and then joined back, using the
    # shared helper so the column naming lives in one place.
    u_q = _add_quantiles(pd.DataFrame(index=u_mean.index), g_u, "user_rel")
    i_q = _add_quantiles(pd.DataFrame(index=i_mean.index), g_i, "item_rel")
    return df.join(u_q, on=user_col).join(i_q, on=item_col)


def _add_abnormality_feature(df: pd.DataFrame, user_col: str, item_col: str) -> pd.DataFrame:
    """Attach per-user KL divergence between the user's item shares and global item shares."""
    # q(item): share of all interactions in the log that involve the item.
    item_pop = (df.groupby(item_col).size() / len(df)).rename("q_item_pop")

    # p_u(item): share of the user's interactions that involve the item.
    ui_cnt = df.groupby([user_col, item_col]).size().rename("ui_cnt").reset_index()
    ui_cnt["u_total"] = ui_cnt.groupby(user_col)["ui_cnt"].transform("sum")
    ui_cnt["p_ui"] = ui_cnt["ui_cnt"] / ui_cnt["u_total"]
    ui_cnt = ui_cnt.join(item_pop, on=item_col)

    eps = 1e-12  # keeps the ratio and the logarithm finite
    ui_cnt["kl_term"] = ui_cnt["p_ui"] * np.log(
        (ui_cnt["p_ui"] + eps) / (ui_cnt["q_item_pop"] + eps)
    )
    user_abn = ui_cnt.groupby(user_col)["kl_term"].sum().rename("user_pref_abnormality_kl")
    return df.join(user_abn, on=user_col)


def add_pairwise_features(
    interactions: pd.DataFrame,
    user_col: str = "user_id",
    item_col: str = "item_id",
    dt_col: str = "datetime",
    relevance_col: str = "watched_pct",
    cold_user_threshold: int = 1,
    cold_item_threshold: int = 1,
) -> pd.DataFrame:
    """Enrich an interaction log with user- and item-level aggregate features.

    Every aggregate is computed per user or per item and merged back onto
    the interaction rows, so the result has one row per input row.

    Args:
        interactions: Interaction log. The original notebook describes it as
            having columns ``[user_id, item_id, datetime, total_dur,
            watched_pct, weight]``; only ``user_col`` and ``item_col`` are
            required by the code.
        user_col: Name of the user id column.
        item_col: Name of the item id column.
        dt_col: Name of the timestamp column. Time-based features are added
            only if the column exists and is datetime-typed after
            ``_ensure_datetime``.
        relevance_col: Name of the relevance column. Relevance statistics
            are added only if the column exists.
        cold_user_threshold: Users with at most this many interactions get
            ``user_cold_flag == 1``.
        cold_item_threshold: Items with at most this many interactions get
            ``item_cold_flag == 1``.

    Returns:
        A new frame containing the input columns plus the feature columns.
        The input frame is not modified.
    """
    df = _ensure_datetime(interactions, dt_col)
    df = _add_count_features(df, user_col, item_col, cold_user_threshold, cold_item_threshold)

    # is_datetime64_any_dtype also accepts timezone-aware columns, on which
    # np.issubdtype raises TypeError.
    if dt_col in df.columns and pd.api.types.is_datetime64_any_dtype(df[dt_col]):
        df = _add_time_features(df, user_col, item_col, dt_col)

    if relevance_col in df.columns:
        df = _add_relevance_features(df, user_col, item_col, relevance_col)

    # The abnormality score uses interaction counts only, so it is computed
    # whether or not a relevance column is present.
    return _add_abnormality_feature(df, user_col, item_col)

In [None]:
from pathlib import Path
import typing as tp
import warnings

import pandas as pd
import numpy as np

from implicit.nearest_neighbours import CosineRecommender
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import RidgeClassifier
from catboost import CatBoostClassifier, CatBoostRanker
try:
    from lightgbm import LGBMClassifier, LGBMRanker
    LGBM_AVAILABLE = True
except ImportError:
    warnings.warn("lightgbm is not installed. Some parts of the notebook will be skipped.")
    LGBM_AVAILABLE = False
from rectools.dataset import Interactions
from lightfm import LightFM
from rectools import Columns
from rectools.dataset import Dataset
from rectools.metrics import Precision, Recall, MeanInvUserFreq, Serendipity,MAP,NDCG,HitRate
from rectools.models import (
    ImplicitALSWrapperModel,
    ImplicitBPRWrapperModel,
    LightFMWrapperModel,
    PureSVDModel,
    ImplicitItemKNNWrapperModel,
    EASEModel,
    PopularModel)
from implicit.als import AlternatingLeastSquares
from implicit.bpr import BayesianPersonalizedRanking
from implicit.nearest_neighbours import CosineRecommender
from rectools.models.base import ExternalIds
from rectools.models.ranking import (
    CandidateRankingModel,
    CandidateGenerator,
    Reranker,
    CatBoostReranker,
    CandidateFeatureCollector,
    PerUserNegativeSampler
)
from rectools.model_selection import cross_validate, TimeRangeSplitter,LastNSplitter,Splitter

In [None]:
path_users = '/content/drive/MyDrive/Colab Notebooks/Симбирсофт/recsys/dataset/data_original/users.csv'
path_items = '/content/drive/MyDrive/Colab Notebooks/Симбирсофт/recsys/dataset/data_original/items.csv'
path_interactions = '/content/drive/MyDrive/Colab Notebooks/Симбирсофт/recsys/dataset/data_original/interactions.csv'

# Seed shared by every model constructed later in the notebook.
RANDOM_STATE = 32

# User and item tables are loaded as-is.
users = pd.read_csv(path_users)
items = pd.read_csv(path_items)

# Interactions: parse the watch date on load, expose it under the rectools
# datetime column name and keep only the first 1,000,000 rows.
interactions = pd.read_csv(path_interactions, parse_dates=["last_watch_dt"])
interactions = interactions.rename(columns={"last_watch_dt": Columns.Datetime})
interactions = interactions.head(1000000)

# Users that occur in the truncated interaction log.
users_clise = users[users['user_id'].isin(interactions['user_id'].unique())]

# Every interaction gets the same unit weight before the dataset is built.
interactions["weight"] = 1
dataset = Dataset.construct(interactions)
# dataset

In [None]:
# from replay.preprocessing.history_based_fp import LogStatFeaturesProcessor
# from replay.utils.session_handler import get_spark_session, State
# from replay.experimental.preprocessing.data_preparator import DataPreparator, Indexer
# spark = State().session
# spark.sparkContext.setLogLevel('ERROR')
# dp = DataPreparator()
# log = dp.transform(data=interactions,
#                   columns_mapping={
#                       "user_id": "user_id",
#                       "item_id":  "item_id",
#                       "relevance": "watched_pct",
#                       "timestamp": "datetime"
#                   })

# log.show(2)

In [None]:
# indexer = Indexer(user_col='user_id', item_col='item_id')
# indexer.fit(users=log.select('user_id'),
#             items=log.select('item_id'))
# log = indexer.transform(df=log)
# log.show(2)

In [None]:
# from replay.preprocessing.history_based_fp import LogStatFeaturesProcessor
# lf = LogStatFeaturesProcessor()
# lf.fit(log)
# log_trsfrm = lf.transform(log)
# log_trsfrm.show(1, vertical=True)

In [None]:
# log_trsfrm = lf.transform(log)

In [None]:
# log_trsfrm.show(1, vertical=True)

In [None]:
# log_pd = log_trsfrm.toPandas()
# rename_map = {
#     "user_idx": "user_id",
#     "item_idx": "item_id",
#     "timestamp": "datetime",
#     "relevance": "watched_pct"}

# log_pd = log_pd.rename(columns=rename_map)
# # log_pd  = Interactions.construct(log_pd)
# log_pd

In [None]:
# interactions_fe = add_pairwise_features(interactions)

In [None]:
# log_pd[log_pd['total_dur']==57581].T

In [None]:
# interactions_fe[interactions_fe['total_dur']==57581].T

In [None]:
# log_pd[['item_id', 'user_id', 'datetime', 'total_dur', 'watched_pct', 'weight',
#        'u_log_num_interact', 'u_log_interact_days_count',
#        'u_min_interact_date', 'u_max_interact_date', 'u_std', 'u_mean',
#        'u_quantile_05', 'u_quantile_5', 'u_quantile_95',
#        'u_history_length_days', 'u_last_interaction_gap_days', 'abnormality',
#        'abnormalityCR', 'u_mean_i_log_num_interact', 'i_log_num_interact',
#        'i_log_interact_days_count', 'i_min_interact_date',
#        'i_max_interact_date', 'i_std', 'i_mean', 'i_quantile_05',
#        'i_quantile_5', 'i_quantile_95', 'i_history_length_days',
#        'i_last_interaction_gap_days', 'i_mean_u_log_num_interact',
#        'na_u_log_features', 'na_i_log_features', 'u_i_log_num_interact_diff',
#        'i_u_log_num_interact_diff']]

In [None]:
# First-stage models: each one proposes 100 candidates per user and keeps
# its ranks and scores so that the reranker can use them.
first_stage = [
    CandidateGenerator(model, num_candidates=100, keep_ranks=True, keep_scores=True)
    for model in (
        PopularModel(),
        ImplicitItemKNNWrapperModel(CosineRecommender()),
        ImplicitALSWrapperModel(
            AlternatingLeastSquares(
                factors=10,  # latent embeddings size
                regularization=0.1,
                iterations=10,
                alpha=50,  # confidence multiplier for non-zero entries in interactions
                random_state=RANDOM_STATE,
            )
        ),
        LightFMWrapperModel(
            LightFM(no_components=10, loss="bpr", random_state=RANDOM_STATE)
        ),
    )
]

In [None]:
# # Write custom feature collecting funcs for users, items and user/item pairs
# class CustomFeatureCollector(CandidateFeatureCollector):

#     def __init__(self, user_features_path: Path, user_cat_cols: tp.List[str]) -> None:
#         self.user_features_path = user_features_path
#         self.user_cat_cols = user_cat_cols

#     # your any helper functions for working with loaded data
#     def _encode_cat_cols(self, df: pd.DataFrame, cols: tp.List[str]) -> pd.DataFrame:
#         for col in cols:
#             df[col] = df[col].astype("category").cat.codes.astype("category")
#         return df

#     def _get_user_features(
#         self, users: ExternalIds, dataset: Dataset, fold_info: tp.Optional[tp.Dict[str, tp.Any]]
#     ) -> pd.DataFrame:
#         columns = self.user_cat_cols.copy()
#         columns.append(Columns.User)
#         user_features = users_clise[columns]

#         users_without_features = pd.DataFrame(
#             np.setdiff1d(dataset.user_id_map.external_ids, user_features[Columns.User].unique()),
#             columns=[Columns.User]
#         )
#         user_features = pd.concat([user_features, users_without_features], axis=0)
#         user_features = self._encode_cat_cols(user_features, self.user_cat_cols)
#         print(user_features[user_features[Columns.User].isin(users)])
#         return user_features[user_features[Columns.User].isin(users)]
# # To transfer CatBoostRanker we use CatBoostReranker
# splitter = TimeRangeSplitter("1D", n_splits=1)
# pool_kwargs = {
#     "cat_features": ["age", "income", "sex"]
# }

# two_stage_catboost_ranker = CandidateRankingModel(
#     candidate_generators=first_stage,
#     splitter=splitter,
#     reranker=CatBoostReranker(CatBoostRanker(verbose=False, random_state=RANDOM_STATE),pool_kwargs=pool_kwargs),#pool_kwargs=pool_kwargs
#     sampler=PerUserNegativeSampler(n_negatives=3, random_state=RANDOM_STATE), # pass sampler to fix random_state
#      feature_collector=CustomFeatureCollector(
#         user_features_path=path_users,
#         user_cat_cols=["age", "income", "sex"],
#     )
# )
# candidates = two_stage_catboost_ranker.get_train_with_targets_for_reranker(dataset) #log_pd dataset
# # candidates

In [None]:
class CustomFeatureCollector(CandidateFeatureCollector):
    """Feature collector that serves user-item features from a precomputed log.

    ``log_df`` is stored as given and must contain a ``user_id`` column.
    In this notebook it is the output of ``add_pairwise_features``.
    """

    # Timestamp and timedelta columns that are excluded from the returned
    # features (presumably because the reranker cannot consume them - confirm).
    _NON_FEATURE_COLUMNS = (
        "datetime",
        "user_min_ts",
        "user_max_ts",
        "item_min_ts",
        "item_max_ts",
        "user_history_timedelta",
        "item_history_timedelta",
    )

    def __init__(self, log_df) -> None:
        self.log_df = log_df

    def _get_user_item_features(
        self, useritem: pd.DataFrame, dataset: Dataset, fold_info: tp.Optional[tp.Dict[str, tp.Any]]
    ) -> pd.DataFrame:
        """Return the log rows of every user present in ``useritem``.

        Args:
            useritem: Candidate pairs; only its ``user_id`` column is read.
            dataset: Unused.
            fold_info: Unused.

        Returns:
            Rows of ``log_df`` whose ``user_id`` occurs in ``useritem``,
            without the columns in ``_NON_FEATURE_COLUMNS`` and with missing
            values replaced by 0.
        """
        # NOTE(review): every remaining column of log_df is returned, including
        # any raw interaction columns it holds (e.g. watched_pct) - confirm this
        # does not leak target information into the reranker.
        feature_columns = [
            column for column in self.log_df.columns
            if column not in self._NON_FEATURE_COLUMNS
        ]
        requested_users = self.log_df['user_id'].isin(useritem['user_id'].values)
        # Drop the timestamp columns before filling gaps so that 0 is never
        # written into datetime or timedelta columns.
        return self.log_df.loc[requested_users, feature_columns].fillna(0)


# The reranker's training targets come from a single 7-day split.
splitter = TimeRangeSplitter("7D", n_splits=1)

# Two-stage model: first-stage candidates are reranked by a CatBoostRanker
# wrapped in CatBoostReranker. The sampler is passed explicitly so that its
# random_state is fixed.
# NOTE(review): the features are computed from the full `interactions` log,
# which presumably also covers the period the splitter holds out - confirm
# that this is intended.
two_stage_catboost_ranker = CandidateRankingModel(
    candidate_generators=first_stage,
    splitter=splitter,
    reranker=CatBoostReranker(
        CatBoostRanker(verbose=False, random_state=RANDOM_STATE)
    ),
    sampler=PerUserNegativeSampler(n_negatives=3, random_state=RANDOM_STATE),
    feature_collector=CustomFeatureCollector(
        log_df=add_pairwise_features(interactions)
    ),
    verbose=1,
)
# candidates = two_stage_catboost_ranker.get_train_with_targets_for_reranker(dataset) #log_pd dataset
# candidates.head(5)

In [None]:
# # To transfer CatBoostRanker we use CatBoostReranker
# splitter = TimeRangeSplitter("1D", n_splits=1)


# two_stage_catboost_ranker = CandidateRankingModel(
#     candidate_generators=first_stage,
#     splitter=splitter,
#     reranker=CatBoostReranker(CatBoostRanker(verbose=False, random_state=RANDOM_STATE)),#pool_kwargs=pool_kwargs
#     sampler=PerUserNegativeSampler(n_negatives=3, random_state=RANDOM_STATE), # pass sampler to fix random_state
#     # feature_collector=CandidateFeatureCollector(),
#     feature_collector=CustomFeatureCollector(log_df= add_pairwise_features(interactions)),
#     verbose=1
# )
# candidates = two_stage_catboost_ranker.get_train_with_targets_for_reranker(dataset) #log_pd dataset
# candidates

In [None]:
# trtt = add_pairwise_features(interactions)
# trtt[trtt['user_id']==684368]

In [None]:

# candi = two_stage_catboost_ranker.split_to_history_dataset_and_train_targets(dataset,splitter)
# candi

In [None]:
# interactions[interactions['user_id']==1014363]

In [None]:
# interactions

In [None]:
# Show the interactions dated 2021-08-22.
interactions.loc[interactions['datetime'] == '2021-08-22']

In [None]:
# Show the interactions of a handful of hand-picked users.
interactions.loc[interactions['user_id'].isin([966733, 424980, 761394, 1014363, 684368])]

In [None]:

# Show the catalogue row of item 913.
items.loc[items['item_id'] == 913]

In [None]:
# Show the profile row of user 684368.
users.loc[users['user_id'] == 684368]

In [None]:
# # To transfer CatBoostRanker we use CatBoostReranker
# splitter = TimeRangeSplitter("7D", n_splits=1)

# # Categorical features are definitely transferred to the pool_kwargs
# pool_kwargs = {
#     "cat_features": ["age", "income", "sex"]
# }

# two_stage_catboost_ranker = CandidateRankingModel(
#     candidate_generators=first_stage,
#     splitter=splitter,
#     reranker=CatBoostReranker(CatBoostRanker(verbose=False, random_state=RANDOM_STATE), pool_kwargs=pool_kwargs),
#     sampler=PerUserNegativeSampler(n_negatives=3, random_state=RANDOM_STATE), # pass sampler to fix random_state
#     # feature_collector=CandidateFeatureCollector(),
#     feature_collector=CustomFeatureCollector(user_features_path=path_users, user_cat_cols=["age", "income", "sex"] ),

# )

In [None]:
# Fit the two-stage model on the whole dataset.
two_stage_catboost_ranker.fit(dataset)

# Recommend for the first 100 external user ids only.
all_users = dataset.user_id_map.external_ids
users_to_recommend = all_users[:100]

reco_catboost_ranker = two_stage_catboost_ranker.recommend(
    users=users_to_recommend, dataset=dataset, k=10, filter_viewed=True
)
reco_catboost_ranker.head(5)

In [None]:
# Cut-off used for every metric and for the length of recommendation lists.
K_RECS = 10

# Single-stage baselines, configured like the first-stage generators, plus
# the two-stage ranker they are compared against.
models = {
    "popular": PopularModel(),
    "cosine_knn": ImplicitItemKNNWrapperModel(CosineRecommender()),
    'iALS': ImplicitALSWrapperModel(
        AlternatingLeastSquares(
            factors=10,  # latent embeddings size
            regularization=0.1,
            iterations=10,
            alpha=50,  # confidence multiplier for non-zero entries in interactions
            random_state=RANDOM_STATE,
        )
    ),
    'LightFM': LightFMWrapperModel(
        LightFM(no_components=10, loss="bpr", random_state=RANDOM_STATE)
    ),
    "two_stage_catboost_ranker": two_stage_catboost_ranker,
}

# Classic accuracy metrics together with "beyond accuracy" ones
# (novelty and serendipity).
metrics = {
    "prec@10": Precision(k=K_RECS),
    "recall@10": Recall(k=K_RECS),
    "novelty@10": MeanInvUserFreq(k=K_RECS),
    "serendipity@10": Serendipity(k=K_RECS),
    "MAP@10": MAP(k=K_RECS),
    "NDCG@10": NDCG(k=K_RECS),
    "HitRate@10": HitRate(k=K_RECS),
}

In [None]:
# Evaluate every model on the folds produced by `splitter`, asking for
# K_RECS recommendations per user with already-viewed items filtered out.
cv_results = cross_validate(
    dataset=dataset,
    models=models,
    metrics=metrics,
    splitter=splitter,
    k=K_RECS,
    filter_viewed=True,
)

In [None]:
# Average each metric over the splits, one row per model in the order the
# models were evaluated.
pivot_results = (
    pd.DataFrame(cv_results["metrics"])
    .drop("i_split", axis=1)
    .groupby(["model"], sort=False)
    .agg(["mean"])
)
pivot_results