In [1]:
VERSION = '1'

PARAMS = dict(
    als_iterations = 20,
    als_factors = 150,
    als_regularization = 0.1,
    als_alpha = 5.0,
    als_N_cand = 300,
    als_w_contact = 20,
    
    pop_N_cand = 100,
    proj_N_cand = 100,

    cb_learning_rate = 0.2,
    cb_iterations = 3000,
    cb_depth = 6,
    cb_l2_leaf_reg = 3.0,
    cb_loss_function = 'YetiRank',
    
    do_test_pred = True,
)

FILES = dict(
    VAL = dict(
        cands_als       = None,
        cands_pop       = None,
        cands_proj      = None,
        cookie_features = None,
        node_features   = None,
        distances       = None,
    ),
    
    TEST = dict(
        cands_als       = None,
        cands_pop       = None,
        cands_proj      = None,
        cookie_features = None,
        node_features   = None,
        distances       = None,
    )
)

PROJECTION_COOKIE_PATH = '/kaggle/input/aaa-recsys-ods-dataset/projection_cookie.parquet'
PROJECTION_NODE_PATH   = '/kaggle/input/aaa-recsys-ods-dataset/projection_node.parquet'

DATA_DIR = '/kaggle/input/avito-cup-2025-recsys'

In [2]:
!pip install implicit mlflow faiss-cpu polars==1.25.2 >> _

In [3]:
import os
import gc
import random
from datetime import timedelta
from tqdm import tqdm

import numpy as np
import pandas as pd
import polars as pl
import implicit

from scipy.sparse import csr_matrix
from sklearn.model_selection import train_test_split, GroupShuffleSplit
from catboost import Pool, CatBoostClassifier, CatBoostRanker

import faiss

import warnings
warnings.simplefilter('ignore')


SEED = 27
np.random.seed(SEED)
random.seed(SEED)

# Functions

In [4]:
def get_data() -> tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame, pl.DataFrame, pl.DataFrame]:
    """
    Load and preprocess the core datasets.

    Returns:
        df_test (pl.DataFrame): test users to predict nodes
        df_clickstream (pl.DataFrame): clickstream joined with events (is_contact)
        df_events (pl.DataFrame): event metadata
        df_cat (pl.DataFrame): categorical features to items
        df_text (pl.DataFrame): text feature projections to items
    """
    df_test        = pl.read_parquet(f'{DATA_DIR}/test_users.pq').select(pl.all().shrink_dtype())
    df_clickstream = pl.read_parquet(f'{DATA_DIR}/clickstream.pq').select(pl.all().shrink_dtype())
    df_events      = pl.read_parquet(f'{DATA_DIR}/events.pq').select(pl.all().shrink_dtype())
    df_cat         = pl.read_parquet(f'{DATA_DIR}/cat_features.pq').select(pl.all().shrink_dtype())
    df_text        = pl.read_parquet(f'{DATA_DIR}/text_features.pq').select(pl.all().shrink_dtype())

    df_clickstream = (df_clickstream.join(df_events, on='event', how='left'))
    return df_test, df_clickstream, df_events, df_cat, df_text


def split_train_val(
    df_clickstream: pl.DataFrame, 
    df_events: pl.DataFrame, 
    days: int = 14
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """
    Split clickstream history into training and validation datasets.

    Args:
        df_clickstream: full click history with 'event_date'
        df_events: event metadata (including is_contact)
        days: how many days at end to use as validation

    Returns:
        df_train: interactions up to cutoff
        df_val: new unique (cookie, node) pairs with contact in the holdout period 
    """
    treshold = df_clickstream['event_date'].max() - timedelta(days=days)

    df_train = df_clickstream.filter(df_clickstream['event_date'] <= treshold)
    df_eval = (
        df_clickstream
        .filter(df_clickstream['event_date'] > treshold)
        .select(['cookie', 'node', 'event'])
        .join(df_train, on=['cookie', 'node'], how='anti')
        .filter(pl.col('event').is_in(df_events.filter(pl.col('is_contact') == 1)['event'].unique()))
        .filter(pl.col('cookie').is_in(df_train['cookie'].unique()))
        .filter(pl.col('node').is_in(df_train['node'].unique()))
        .unique(['cookie', 'node'])
    )
    return df_train, df_eval


def get_labels(
    df_val: pl.DataFrame, 
    df_cands: pl.DataFrame
) -> pl.DataFrame:
    """
    Join candidate pairs (cookie, node)  with true labels for validation.

    Args:
        df_val: true (cookie, node) contact pairs
        df_cands: candidate (cookie, node, ...) rows

    Returns:
        df_cands labeled with 0/1 in 'label' column
    """
    df_positive_val = (
        df_val
        .select(['cookie','node'])
        .with_columns(pl.lit(1).alias('label'))
    )
    
    return (
        df_cands
        .join(df_positive_val, on=['cookie','node'], how='left')
        .with_columns(pl.col('label').fill_null(0).alias('label'))
    )
    

def split_cands_train_val(
    df_cands: pl.DataFrame
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """
    Split candidates for ranker training (80%) and validation (20%) by cookie group.

    Args:
        df_cands: full candidate DataFrame with 'cookie' and 'label'

    Returns:
        df_train: training subset for ranker
        df_val: validation subset for ranker
    """
    df_cands = df_cands.to_pandas()
    
    gss = GroupShuffleSplit(test_size=0.2, random_state=SEED)
    train_idx, val_idx = next(gss.split(
        X=df_cands,
        y=df_cands['label'],
        groups=df_cands['cookie']
    ))
    df_cands_train = pl.from_pandas(df_cands.iloc[train_idx])
    df_cands_val   = pl.from_pandas(df_cands.iloc[val_idx])
    return df_cands_train, df_cands_val


def recall_candidates(
    df_val: pl.DataFrame, 
    df_cands: pl.DataFrame
) -> float:
    """
    Candidate-stage recall: fraction of true contacts covered by candidates in df_cands.

    Args:
        df_val: true (cookie, node) contact pairs
        df_cands: candidate (cookie, node) pairs

    Returns:
        Average recall over cookies
    """
    df_true = df_val.select(['cookie','node']).unique()
    df_hit = df_true.join(
        df_cands.select(['cookie','node']),
        on=['cookie','node'],
        how='inner'
    )
    per_user = (
        df_true
        .group_by('cookie')
        .agg(pl.count().alias('true_cnt'))
        .join(
            df_hit.group_by('cookie').agg(pl.count().alias('hit_cnt')),
            on='cookie', how='left'
        )
        .with_columns(
            (pl.col('hit_cnt').fill_null(0) / pl.col('true_cnt'))
            .alias('recall_cand')
        )
    )
    return per_user['recall_cand'].mean()


def recall_at(
    df_true: pl.DataFrame,
    df_pred: pl.DataFrame,
    k: int = 40,
) -> float:
    """
    Compute Recall@k: fraction of true nodes appearing in top-k predictions per cookie.

    Args:
        df_true: true (cookie, node) pairs
        df_pred: scored predictions with 'cookie', 'node', 'score'
        k: cutoff

    Returns:
        scalar recall@k averaged over cookies
    """
    # sanity checks
    assert df_pred.group_by(['cookie']).agg(pl.col('node').count())['node'].max() < 41 , 'send more then 40 nodes per cookie'
    assert 'node' in df_pred.columns, 'node columns does not exist'
    assert 'cookie' in df_pred.columns, 'cookie columns does not exist'
    assert df_pred.with_columns(v = 1).group_by(['cookie','node']).agg(pl.col('v').count())['v'].max() == 1 , 'more then 1 cookie-node pair'
    
    return (
        df_true[['node', 'cookie']]
        .join(
            df_pred.group_by('cookie').head(k).with_columns(value=1)[['node', 'cookie', 'value']],
            how='left',
            on=['cookie', 'node']
        )
        .select([pl.col('value').fill_null(0), 'cookie'])
        .group_by('cookie')
        .agg([pl.col('value').sum() / pl.col('value').count()])['value']
        .mean()
    )


In [5]:
class ALSRecommender:
    """
    ALS-based collaborative filtering recommender.
    """
    
    def __init__(self, params: dict[str, float]):
        self.params = {key: value for key, value in params.items() if key.startswith('als_')}
        
        self.iterations     = self.params['als_iterations']
        self.factors        = self.params['als_factors']
        self.regularization = self.params['als_regularization']
        self.alpha          = self.params['als_alpha']
        self.N_cand         = self.params['als_N_cand']
        self.w_contact      = self.params['als_w_contact']
        
        self.user_to_index = {}
        self.node_to_index = {}
        self.index_to_node = {}
        self.matrix = None
        self.model = None

    def fit(self, df_train: pl.DataFrame):
        users = df_train['cookie'].to_list()
        nodes = df_train['node'].to_list()
        # build mappings
        self.user_to_index = {u: i for i, u in enumerate(sorted(set(users)))}
        self.node_to_index = {n: j for j, n in enumerate(sorted(set(nodes)))}
        self.index_to_node = {j: n for n, j in self.node_to_index.items()}
        # build sparse matrix
        rows = [self.user_to_index[u] for u in users]
        cols = [self.node_to_index[n] for n in nodes]
        data = df_train['is_contact'].map_elements(
            lambda el: 
            self.w_contact if el == 1 
            else 1
        ).to_numpy()
        
        self.matrix = csr_matrix(
            (data, (rows, cols)), 
            shape=(len(self.user_to_index), len(self.node_to_index))
        )
        
        self.model = implicit.als.AlternatingLeastSquares(
            iterations=self.iterations,
            factors=self.factors,
            regularization=self.regularization,
            alpha=self.alpha,
            num_threads=0,
            random_state=SEED
        )
        self.model.fit(self.matrix)
        return self

    def recommend(self, df_users: pl.DataFrame ) -> pl.DataFrame:
        """
        Generate top-als_N_cand recommendations per cookie.
        """
        user_list = df_users['cookie'].unique().to_list()
        idxs = [self.user_to_index[c] for c in user_list]
        recs, scores = self.model.recommend(
            userid=idxs,
            user_items=self.matrix[idxs],
            N=self.N_cand,
            filter_already_liked_items=True
        )
        
        df_pred = pl.DataFrame({
            'cookie': np.repeat(user_list, self.N_cand),
            'node':  np.concatenate([[self.index_to_node[j] for j in r] for r in recs]),
            'als_score': np.concatenate(scores)
        })
        return df_pred

In [6]:
class FeatureGenerator:
    """
    Generate cookie- and node-level features from clickstream.
    """
    
    def __init__(self):
        self.cookie_features = None
        self.node_features   = None

    def fit(
        self, 
        df_clickstream: pl.DataFrame, 
        df_events: pl.DataFrame, 
        df_cat: pl.DataFrame, 
        mode: str
    ):
        if FILES[mode]['cookie_features'] and FILES[mode]['node_features']:
            self.cookie_features = pl.read_parquet(FILES[mode]['cookie_features'])
            self.node_features   = pl.read_parquet(FILES[mode]['node_features'])
            return self
        
        self.df_cat  = df_cat.drop('clean_params')
        df = (
            df_clickstream
            .join(df_events, on='event', how='left')
            .drop('event')
            .join(self.df_cat.select(['item', 'location', 'category']), on='item', how='left')
        )
        self.cookie_features = self.get_cookie_features(df)
        self.node_features   = self.get_node_features(df)
        
        # cache
        self.cookie_features.write_parquet(f'cookie_features_{mode}_v{VERSION}.parquet')
        self.node_features.write_parquet(f'node_features_{mode}_v{VERSION}.parquet')
        return self
        
    def transform(self, df: pl.DataFrame) -> pl.DataFrame:       
        return (
            df
            .join(self.cookie_features, on='cookie', how='left')
            .join(self.node_features,   on='node',   how='left')
        )

    def get_cookie_features(self, df: pl.DataFrame) -> pl.DataFrame:
        return (
            df.select('cookie').unique('cookie')
            .join(self.get_cookie_events_contacts(df), on='cookie', how='left')
            .join(self.get_cookie_recency(df),         on='cookie', how='left')
            .join(
                self.get_obj_top_features(df, "cookie", ["surface", "platform", "location", "category"]),  
                on='cookie', how='left'
            )
        )

    def get_cookie_events_contacts(self, df: pl.DataFrame) -> pl.DataFrame:
        """
        Кол-во events, contacts и доля contact_rate у каждого cookie
        """
        df_total = (
            df
            .select('cookie')
            .group_by('cookie')
            .agg(pl.count().alias('cookie_events'))
        )
        df_contacts = (
            df
            .select(['cookie', 'is_contact'])
            .filter(pl.col('is_contact') == 1)
            .group_by('cookie')
            .agg(pl.count().alias('cookie_contacts'))
        )
        df_feats = (
            df_total
            .join(df_contacts, on='cookie', how='left')
            .fill_null(0)
            .with_columns(
                (pl.col('cookie_contacts') / pl.col('cookie_events'))
                .alias('cookie_contact_rate')
            )
        )
        return df_feats

    def get_cookie_recency(self, df: pl.DataFrame) -> pl.DataFrame:
        """
        Кол-во дней с последнего event у cookie
        """
        cookie_last_event = (
            df
            .select(['cookie', 'event_date'])
            .group_by('cookie')
            .agg(pl.col('event_date').max().alias('last_event'))
        )
        max_date = df['event_date'].max()
        return (
            cookie_last_event
            .with_columns(
                ((max_date - pl.col('last_event')).dt.total_days())
                .alias('cookie_recency')
            )
            .select(['cookie','cookie_recency'])
        )
        
    def get_obj_top_features(self, df: pl.DataFrame, obj: str, features: list[str]) -> pl.DataFrame:
        """
        Для каждого obj (cookie или node) вычисляет самое частое значение фичи из списка features
        """
        df_top_features = df.select(obj).unique()
        for f in features:
            top_f = (
                df
                .select([obj, f])
                .group_by([obj, f])
                .agg(pl.count().alias("cnt"))
                .sort([obj, "cnt"], descending=True)
                .group_by(obj)
                .agg(pl.first(f).alias(f"{obj}_top_{f}"))
            )
            df_top_features = df_top_features.join(top_f, on=obj, how="left")
        return df_top_features
    
    def get_node_features(self, df: pl.DataFrame) -> pl.DataFrame:
        return (
            df.select('node').unique('node')
            .join(self.get_node_events_contacts(df), on='node', how='left')
            .join(self.get_node_items(),             on='node', how='left')
            .join(
                self.get_obj_top_features(df, "node", ["location", "category"]),  
                on='node', how='left'
            )
        )
    
    def get_node_events_contacts(self, df: pl.DataFrame) -> pl.DataFrame:
        """
        Кол-во events, contacts и доля contact_rate у каждого node
        """
        df_total = (
            df
            .select('node')
            .group_by('node')
            .agg(pl.count().alias('node_events'))
        )
        df_contacts = (
            df
            .select(['node', 'is_contact'])
            .filter(pl.col('is_contact') == 1)
            .group_by('node')
            .agg(pl.count().alias('node_contacts'))
        )
        df_feats = (
            df_total
            .join(df_contacts, on='node', how='left')
            .fill_null(0)
            .with_columns(
                (pl.col('node_contacts') / pl.col('node_events'))
                .alias('node_contact_rate')
            )
        )
        return df_feats

    def get_node_items(self) -> pl.DataFrame:
        """
        Кол-во уникальных item у node
        """
        return (
            self.df_cat
            .select(['node', 'item'])
            .group_by('node')
            .agg(pl.col('item').n_unique().alias('node_items'))
        )


In [7]:
class RankerCatBoost:
    def __init__(self, params: dict[str, float]):
        self.params = {key[3:]: value for key, value in params.items() if key.startswith('cb_')}
        self.params_without_val = self.params | dict(
            task_type = 'GPU',
            eval_metric = 'RecallAt:top=40', 
            metric_period = 10,
            random_seed = SEED, 
            thread_count = -1
        )
        self.params_with_val = self.params_without_val | dict(
            early_stopping_rounds = 100, 
            use_best_model = True
        )

        self.features = None
        self.best_iteration = None
        self.model = None

    
    def fit(self, df_train: pl.DataFrame, df_val: pl.DataFrame):
        self.features = [col for col in df_train.columns if col not in ('cookie', 'node', 'label')]

        train_pool = Pool(
            data     = df_train[self.features].to_pandas(),
            label    = df_train['label'].to_pandas(),
            group_id = df_train['cookie'].to_pandas(),
        )
        val_pool = Pool(
            data     = df_val[self.features].to_pandas(),
            label    = df_val['label'].to_pandas(),
            group_id = df_val['cookie'].to_pandas(),
        )

        self.model = CatBoostRanker(**self.params_with_val)
        self.model.fit(
            train_pool,
            eval_set=val_pool,
            verbose=50
        )
        self.params_without_val['iterations'] = self.model.get_best_iteration()
        return self

    
    def refit_full(self, df_full: pl.DataFrame):
        full_pool = Pool(
            data     = df_full[self.features].to_pandas(),
            label    = df_full['label'].to_pandas(),
            group_id = df_full['cookie'].to_pandas(),
        )
        
        self.model = CatBoostRanker(**self.params_without_val)
        self.model.fit(
            full_pool,
            verbose=50
        )
        return self

    
    def predict(self, df_cand: pl.DataFrame) -> pl.DataFrame:
        """
        Score df_cand and return top-40 per cookie.
        """
        pred_pool = Pool(data = df_cand[self.features].to_pandas(),)
        scores  = self.model.predict(pred_pool)
        return (
            df_cand
            .with_columns(pl.Series(scores).alias('ranker_score'))
            .sort(['cookie','ranker_score'], descending=[False,True])
            .group_by('cookie')
            .head(40)
        )


In [8]:
def get_cands(
    df_clickstream: pl.DataFrame, 
    df_to_pred: pl.DataFrame, 
    df_events: pl.DataFrame, 
    mode: str = 'VAL'
) -> pl.DataFrame:
    """
    Generate combined candidate set from ALS, popularity, and text projection.
    """
    if mode not in ('VAL', 'TEST'):
        raise ValueError("mode must be 'VAL' or 'TEST'")
    
    df_cands_als = get_cands_als(df_clickstream, df_to_pred, mode)
    df_cands_popular = get_cands_popular(df_clickstream, df_to_pred, df_events, mode)
    df_cands_projection = get_cands_projection(df_clickstream, df_to_pred, mode)
    
    df_cands = (
        pl.concat([df_cands_als, df_cands_popular, df_cands_projection], how='diagonal_relaxed')
        .unique(['cookie', 'node'])
        .sort('cookie')
    )
    return filter_cands_not_seen(df_cands, df_clickstream)


def get_cands_als(
    df_clickstream: pl.DataFrame, 
    df_to_pred: pl.DataFrame, 
    mode: str
) -> pl.DataFrame:
    """
    Load or compute ALS-based candidates.
    """
    N_cands = PARAMS['als_N_cand']
    als_path = FILES[mode]['cands_als']
    
    if als_path is not None:
        return (
            pl.read_parquet(als_path)
            .sort(['cookie', 'als_score'], descending=[False, True])
            .group_by('cookie')
            .head(N_cands)
        )
    
    als = ALSRecommender(PARAMS)
    als.fit(df_clickstream)
    df_cands_als = als.recommend(df_to_pred)
    
    df_cands_als.write_parquet(f'cands_als_{mode}_v{VERSION}_n{N_cands}.parquet')
    return df_cands_als


def get_cands_popular(
    df_clickstream: pl.DataFrame,
    df_to_pred: pl.DataFrame,
    df_events: pl.DataFrame,
    mode: str
) -> pl.DataFrame:
    """
    Load or compute popularity-based candidates.
    """
    N_cands = PARAMS['pop_N_cand']
    pop_path = FILES[mode]['cands_pop']
    
    if pop_path is not None:
        df_popular_nodes = (
            pl.read_parquet(pop_path)
            .head(N_cands)
        )

    else:
        df = (
            df_clickstream
            .join(df_events, on='event', how='left')
            .filter(pl.col('is_contact') == 1)
            .select(['node', 'cookie', 'event_date'])
        )
        max_date = df_clickstream['event_date'].max()
        df_popular_nodes = (
            df
            .filter(pl.col('event_date') >= max_date - pl.duration(days=14))
            .group_by('node')
            .agg(pl.count('cookie').alias('popular_score'))
            .sort('popular_score', descending=True)
            .head(N_cands)
            .select(['node', 'popular_score'])
        )
        df_popular_nodes.write_parquet(f'cands_pop_{mode}_v{VERSION}_n{N_cands}.parquet')
    
    df_cands_popular = (
        df_to_pred.select('cookie').unique()
        .join(df_popular_nodes, how='cross')
    )
    return df_cands_popular


def get_cands_projection(
    df_clickstream: pl.DataFrame,
    df_to_pred: pl.DataFrame,
    mode: str
) -> pl.DataFrame:
    """
    Load or compute text-projection-based candidates using FAISS.
    """
    N_cands = PARAMS['proj_N_cand']
    proj_path = FILES[mode]['cands_proj']
    
    if proj_path is not None:
        return (
            pl.read_parquet(proj_path)
            .sort(['cookie', 'text_score'], descending=[False, True])
            .group_by('cookie')
            .head(N_cands)
        )

    node_projection   = pl.read_parquet(PROJECTION_NODE_PATH)
    cookie_projection = pl.read_parquet(PROJECTION_COOKIE_PATH)
    df_users = (
        df_to_pred.select('cookie').unique('cookie').sort('cookie')
        .join(cookie_projection, on='cookie', how='left')
    )

    nodes = node_projection['node'].to_numpy()
    X = np.vstack(node_projection['node_projection'].to_list()).astype('float32')
    faiss.normalize_L2(X)
    
    index = faiss.IndexFlatIP(X.shape[1])
    index.add(X)

    records = []
    for row in tqdm(df_users.rows(named=True), mininterval=60):
        u_id = row['cookie']
        u_vec = np.array(row['cookie_projection'], dtype='float32').reshape(1, -1)
        faiss.normalize_L2(u_vec)
    
        D, I = index.search(u_vec, N_cands)
        for dist, idx in zip(D[0], I[0]):
            records.append({
                'cookie': u_id, 
                'node': int(nodes[idx]), 
                'text_score': float(dist)
            })

    df_cands_projection = pl.DataFrame(records)
    df_cands_projection.write_parquet(f'cands_proj_{mode}_v{VERSION}_n{N_cands}.parquet')
    return df_cands_projection


def filter_cands_not_seen(
    df_cands: pl.DataFrame,
    df_clickstream: pl.DataFrame
) -> pl.DataFrame:
    """
    Remove (cookie,node) pairs that appear in historical clickstream (train dataset).
    """
    return df_cands.join(df_clickstream, on=['cookie', 'node'], how='anti')


# Models

## 1. Baseline - popularity

Каждому cookie будем рекомендовать топ-40 самых популярных node.  
Под популярностью подразумевается кол-во уникальных cookie, которые совершили любое действие с node

In [9]:
# Get data
df_test, df_clickstream, df_events, df_cat, df_text = get_data()
df_train, df_val = split_train_val(df_clickstream, df_events)

In [10]:
df_popular_nodes = (
    df_train
    .group_by('node')
    .agg(pl.count('cookie').alias('popular_score'))
    .sort('popular_score', descending=True)
    .head(40)
    .select('node')
)

df_predict = (
    df_val.select('cookie').unique()
    .join(df_popular_nodes, how='cross')
    )

print(f'Validation Recall_40 = {recall_at(df_val, df_predict, k=40):.5f}')

Validation Recall_40 = 0.05807


## 2. Baseline - ALS

Обучим ALS на действиях всех пользователей в train части clickstream. Будем присваивать разный вес разным действиям:  
- $w = 1$ - для действий, которые **НЕ** являются контактом
- $w = 20$ - для действий, которые являются контактом

Затем для каждого cookie предскажем топ-40 самых релевантных nodes

In [11]:
als_params = dict(
    als_iterations = 20,
    als_factors = 150,
    als_regularization = 0.1,
    als_alpha = 5.0,
    als_N_cand = 40,
    als_w_contact = 20,
)

als = ALSRecommender(als_params)
als.fit(df_train)
df_predict = als.recommend(df_val)

print(f'Validation Recall_40 = {recall_at(df_val, df_predict, k=40):.5f}')

  0%|          | 0/20 [00:00<?, ?it/s]

Validation Recall_40 = 0.15701


## 3. ALS + CatBoost

Перейдем к двухстадийной модели:
1. ALS обучается аналогично baseline, но генерирует больше кандидатов (300 - 500)
2. Ранкер CatBoostRanker ранжирует кандидатов на основе дополнительных фич и отбирает топ-40

Фичи генерируются в классе FeatureGenerator для объектов cookie и node в каждой паре (cookie, node), которые нужно отранжировать.  
Примеры фич: кол-во контактов, действий у node, кол-во items у node, отношение неконтактных действий к контактным.

Для нахождения лучшей итерации у CatBoostRanker, все кандидаты от ALS `df_cands` делятся на два датасета: `df_cands_train` и `df_cands_val`. Разбиение выполняется через `GroupShuffleSplit`, где группа - это кандидаты каждого cookie.  

Вначале ранкер обучается на `df_cands_train` и валидируется на `df_cands_val`. После этого, получив лучшую итерацию, можно заново обучить ранкер на всех данных через `ranker.refit_full(df_cands)` для получения максимального качества.

In [12]:
als.N_cand = 300

df_cands = als.recommend(df_val)
df_cands = get_labels(df_val, df_cands)

feature_gen = FeatureGenerator()
feature_gen.fit(df_train, df_events, df_cat, mode='VAL')
df_cands = feature_gen.transform(df_cands)

# Уменьшаем размер df_cands
df_cands = df_cands.select(pl.all().shrink_dtype())
df_cands = df_cands.cast({'cookie': pl.Int64, 'node': pl.Int64})

df_cands_train, df_cands_val = split_cands_train_val(df_cands)

ranker = RankerCatBoost(PARAMS)
ranker.fit(df_cands_train, df_cands_val)
# ranker.refit_full(df_cands)

df_predict = ranker.predict(df_cands)

print(f'Validation Recall_40 = {recall_at(df_val, df_predict, k=40):.5f}')

Metric PFound is not implemented on GPU. Will use CPU for metric computation, this could significantly affect learning time
Metric RecallAt:top=40 is not implemented on GPU. Will use CPU for metric computation, this could significantly affect learning time


0:	learn: 0.5622485	test: 0.5660626	best: 0.5660626 (0)	total: 17.4s	remaining: 14h 28m 6s
50:	learn: 0.6576561	test: 0.6549810	best: 0.6552184 (49)	total: 36.4s	remaining: 35m 5s
100:	learn: 0.6612197	test: 0.6572273	best: 0.6574121 (99)	total: 55.5s	remaining: 26m 32s
150:	learn: 0.6620482	test: 0.6595958	best: 0.6599150 (146)	total: 1m 14s	remaining: 23m 25s
200:	learn: 0.6632060	test: 0.6607583	best: 0.6609023 (181)	total: 1m 33s	remaining: 21m 41s
250:	learn: 0.6639782	test: 0.6608232	best: 0.6609831 (207)	total: 1m 52s	remaining: 20m 33s
300:	learn: 0.6644850	test: 0.6605407	best: 0.6609831 (207)	total: 2m 11s	remaining: 19m 39s
bestTest = 0.6609831277
bestIteration = 207
Shrink model to first 208 iterations.
Validation Recall_40 = 0.17921


In [13]:
del als, df_cands, df_cands_train, df_cands_val, ranker, df_predict
gc.collect()

0

## 4. ALS, popularity, projections + CatBoost

Сделаем несколько моделей для генерации кандидатов ранкеру:
1. **ALS** - остается с прошлого шага
2. **Popularity** - рекомендует топ-100 самых популярных nodes каждому cookie
3. **Projection distance** - рекомендует топ-100 nodes с самыми близкими векторами к вектору cookie

Подробнее про вектора:  
В соревновании были даны вектора для items. Из них получались вектора для node и cookie.  
Вектора для node рассчитывались как среднее векторов items, относящихся к node. Вектора для cookie рассчитывались как среднее векторов nodes, с которыми взаимодействовал cookie.  
Вектора nodes помещались в `faiss.IndexFlatIP()` и для каждого cookie искали топ-100 ближайших векторов по cosine similarity.

Далее для кандидатов от всех источников генерировались фичи, и затем они подавались в CatBoostRanker

In [14]:
df_cands = get_cands(df_train, df_val, df_events, mode='VAL')
df_cands = get_labels(df_val, df_cands)

df_cands = feature_gen.transform(df_cands)

# Уменьшаем размер df_cands
df_cands = df_cands.select(pl.all().shrink_dtype())
df_cands = df_cands.cast({'cookie': pl.Int64, 'node': pl.Int64})

df_cands_train, df_cands_val = split_cands_train_val(df_cands)

ranker = RankerCatBoost(PARAMS)
ranker.fit(df_cands_train, df_cands_val)
# ranker.refit_full(df_cands)

df_predict = ranker.predict(df_cands)

print(f'Validation Recall_40 = {recall_at(df_val, df_predict, k=40):.5f}')

  0%|          | 0/20 [00:01<?, ?it/s]

100%|██████████| 55081/55081 [07:39<00:00, 119.89it/s]
Metric PFound is not implemented on GPU. Will use CPU for metric computation, this could significantly affect learning time
Metric RecallAt:top=40 is not implemented on GPU. Will use CPU for metric computation, this could significantly affect learning time


0:	learn: 0.4594105	test: 0.4583124	best: 0.4583124 (0)	total: 1.44s	remaining: 1h 11m 51s
50:	learn: 0.5689375	test: 0.5630968	best: 0.5630968 (50)	total: 30.6s	remaining: 29m 32s
100:	learn: 0.5731994	test: 0.5649992	best: 0.5652972 (98)	total: 59.9s	remaining: 28m 38s
150:	learn: 0.5751450	test: 0.5674908	best: 0.5679771 (147)	total: 1m 29s	remaining: 28m 3s
200:	learn: 0.5764936	test: 0.5686706	best: 0.5688031 (193)	total: 1m 58s	remaining: 27m 29s
250:	learn: 0.5773610	test: 0.5699216	best: 0.5699216 (250)	total: 2m 27s	remaining: 26m 58s
300:	learn: 0.5777012	test: 0.5710842	best: 0.5710842 (300)	total: 2m 57s	remaining: 26m 27s
350:	learn: 0.5782304	test: 0.5710774	best: 0.5716400 (327)	total: 3m 26s	remaining: 25m 57s
400:	learn: 0.5789032	test: 0.5714365	best: 0.5717315 (399)	total: 3m 55s	remaining: 25m 27s
450:	learn: 0.5791322	test: 0.5717880	best: 0.5725231 (432)	total: 4m 24s	remaining: 24m 56s
500:	learn: 0.5796227	test: 0.5725602	best: 0.5725980 (496)	total: 4m 54s	rema