In [23]:
# pip install mlflow

In [24]:
# pip install implicit

In [25]:
from datetime import timedelta
from scipy.sparse import csr_matrix
from catboost import CatBoostClassifier
import pandas as pd
import polars as pl
import numpy as np
import implicit
import mlflow
import itertools
import time
import os
from lightgbm import LGBMClassifier
from lightgbm import early_stopping, log_evaluation

mlflow.set_tracking_uri('http://51.250.35.156:5000/')
mlflow.set_experiment(experiment_name='homework-iamozolin.ext')

<Experiment: artifact_location='s3://mlflow/54', creation_time=1747415315534, experiment_id='54', last_update_time=1747415315534, lifecycle_stage='active', name='homework-iamozolin.ext', tags={}>

In [4]:
# Уменьшаем многопоточность для стабильности на Kaggle
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"

from threadpoolctl import threadpool_limits
threadpool_limits(1, "blas")

<threadpoolctl.threadpool_limits at 0x7eace75b91d0>

In [5]:
DATA_DIR = '/kaggle/input/experiments1/'

In [6]:
clickstream_df = pl.read_parquet(f'{DATA_DIR}clickstream.pq')
cat_features_df = pl.read_parquet(f'{DATA_DIR}cat_features.pq')
events_df = pl.read_parquet(f'{DATA_DIR}events.pq')
test_users_df = pl.read_parquet(f'{DATA_DIR}test_users.pq')
text_features_df = pl.read_parquet(f'{DATA_DIR}text_features.pq')

print("Event type columns:", events_df.columns)
display(events_df.head(5))
print("Number of test users:", test_users_df.height)
display(test_users_df.head(5))

Event type columns: ['event', 'is_contact']


event,is_contact
i64,i64
0,1
11,0
4,1
16,0
15,1


Number of test users: 92319


cookie
i64
52564
105000
57152
87303
37755


In [7]:
def recall_at(df_solution: pl.DataFrame, df_pred: pl.DataFrame, k: int = 40) -> float:
    return df_solution[["node", "cookie"]].join(
        df_pred.group_by("cookie").head(k).with_columns(value=1).select(["node", "cookie", "value"]),
        on=["cookie", "node"],
        how="left"
    ).select([
        pl.col("value").fill_null(0), "cookie"
    ]).group_by("cookie").agg(
        (pl.col("value").sum() / pl.col("value").count()).alias("recall")
    )["recall"].mean()

In [8]:
# Разделим события по времени: 14 дней на валидацию
DAYS_THR = 14
thr = clickstream_df['event_date'].max() - timedelta(days=DAYS_THR)

df_train = clickstream_df.filter(clickstream_df['event_date'] <= thr)
df_eval = (
    clickstream_df
    .filter(pl.col("event_date") > thr)
    .join(df_train, on=["cookie", "node"], how="anti")
    .filter(pl.col("event").is_in(events_df.filter(pl.col("is_contact") == 1)["event"].unique()))
    .filter(pl.col("cookie").is_in(df_train["cookie"].unique()))
    .filter(pl.col("node").is_in(df_train["node"].unique()))
    .unique(["cookie", "node"])
)

In [9]:
def prepare_matrix(users: pd.Series, nodes: pd.Series) -> csr_matrix:

    user_ids = users.unique().to_list()
    item_ids = nodes.unique().to_list()
        
    user_id_to_index = {user_id: idx for idx, user_id in enumerate(user_ids)}
    item_id_to_index = {item_id: idx for idx, item_id in enumerate(item_ids)}
    index_to_item_id = {v:k for k,v in item_id_to_index.items()}
    
    rows = users.replace_strict(user_id_to_index).to_list()
    cols = nodes.replace_strict(item_id_to_index).to_list()
    
    values = [1] * len(users)
    
    sparse_matrix = csr_matrix((values, (rows, cols)), shape=(len(user_ids), len(item_ids)))

    return sparse_matrix, user_id_to_index, index_to_item_id

In [10]:
def als_predict(
    sparse_matrix: csr_matrix,
    users_to_pred: list,
    users_indicies: dict,
    index_items_ids: dict,
    factors: int = 50,
    iters: int = 10,
    regularization: float = 0.01,
    alpha: int = 10,
    use_cg: bool = True,
    random_state: int = 42
):
    als_model = implicit.als.AlternatingLeastSquares(
        factors=factors,
        iterations=iters,
        regularization=regularization,
        alpha=alpha,
        use_cg=use_cg,
        random_state=random_state
    )

    als_model.fit((sparse_matrix * alpha).astype('float64'))

    user4pred = np.array([users_indicies[user_id] for user_id in users_to_pred])

    recommendations, scores = als_model.recommend(
        user4pred, 
        sparse_matrix[user4pred], 
        N=40, 
        filter_already_liked_items=True
    )

    df_pred = pl.DataFrame(
        {
            'node': [
                [index_items_ids[rec_id] for rec_id in rec] for rec in recommendations.tolist()
            ], 
            'cookie': list(users_to_pred),
            'scores': scores.tolist()
        }
    )

    df_pred = df_pred.explode(['node', 'scores'])

    return df_pred

In [11]:
def mlflow_als_predict_log(users_data, als_params):

    als_predicted = []

    with mlflow.start_run(run_name='als'):
        mlflow.log_param('model_type', 'ALS')
        mlflow.log_params(als_params)

        print('Обучение параметрами:', params)

        try:
            train_start_time = time.time()

            als_predicted = als_predict(
                sparse_matrix=users_data['sparse_matrix'],
                users_to_pred=users_data['eval_users'],
                users_indicies=users_data['user_id_to_index'],
                index_items_ids=users_data['index_to_item_id'],
                factors=als_params.get("factors", 50),
                iters=als_params.get("iters", 10),
                regularization=als_params.get("regularization", 0.01),
                alpha=als_params.get("alpha", 10),
                use_cg=als_params.get("use_cg", True),
                random_state=als_params.get("random_state", 42)
            )

            train_time = time.time() - train_start_time
            mlflow.log_metric('seconds_training', train_time)

            print('ALS обучилась за', train_time)

            als_recall40 = 0.0

            if als_predicted is None or als_predicted.height == 0:
                pass
            else:
                als_recall40 = recall_at(
                    df_eval,
                    als_predicted,
                    k=40
                )

            mlflow.log_metric('Recall_40', als_recall40)
            print('Recall@40 =', als_recall40)

        except KeyboardInterrupt as kie:
            mlflow.log_param("error_message", 'Прервано вручную')
            mlflow.set_tag("run_status", "FAILED")

    return als_predicted

In [13]:
users = df_train["cookie"]
nodes = df_train["node"]
eval_users = df_eval['cookie'].unique().to_list()

In [14]:
sparse_matrix, user_id_to_index, index_to_item_id = prepare_matrix(users, nodes)

In [15]:
data = {
    'sparse_matrix': sparse_matrix,
    'eval_users': eval_users,
    'user_id_to_index': user_id_to_index,
    'index_to_item_id': index_to_item_id
}

In [52]:
als_params = {
    'factors': 200,
    'iters': 10,
    'regularization': 0.1,
    'alpha': 3
}

In [53]:
als_predicted_2 = mlflow_als_predict_log(data, als_params)

Обучение параметрами: {'factors': 150, 'iters': 20, 'regularization': 0.01, 'alpha': 5}


  0%|          | 0/10 [00:00<?, ?it/s]

ALS обучилась за 183.45749878883362
Recall@40 = 0.15807169603352375
🏃 View run als at: http://51.250.35.156:5000/#/experiments/54/runs/9a2ed5bc2c9f4335ac11f886a6fd866a
🧪 View experiment at: http://51.250.35.156:5000/#/experiments/54


In [54]:
params_3 = {
    'factors': 200,
    'iters': 20,
    'regularization': 0.1,
    'alpha': 3
}

In [55]:
als_predicted_3 = mlflow_als_predict_log(data, params_3)

Обучение параметрами: {'factors': 150, 'iters': 20, 'regularization': 0.01, 'alpha': 5}


  0%|          | 0/20 [00:00<?, ?it/s]

ALS обучилась за 262.61746048927307
Recall@40 = 0.15582006294940232
🏃 View run als at: http://51.250.35.156:5000/#/experiments/54/runs/f4a7c8c097cf4b52a4158f3d8b505d8f
🧪 View experiment at: http://51.250.35.156:5000/#/experiments/54


In [60]:
params_6 = {
    'factors': 220,
    'iters': 20,
    'regularization': 0.1,
    'alpha': 5
}

In [61]:
als_predicted_6 = mlflow_als_predict_log(data, params_6)

Обучение параметрами: {'factors': 150, 'iters': 20, 'regularization': 0.01, 'alpha': 5}


  0%|          | 0/20 [00:00<?, ?it/s]

ALS обучилась за 305.17872309684753
Recall@40 = 0.15057537538313412
🏃 View run als at: http://51.250.35.156:5000/#/experiments/54/runs/b70017c2bedb423eb752bc2c6cc36ee2
🧪 View experiment at: http://51.250.35.156:5000/#/experiments/54


In [16]:
def split_cands_train_val(df_cands, test_size=0.2, seed=42):
    df_cands = df_cands.to_pandas()
    unique_groups = df_cands['cookie'].unique()

    rng = np.random.RandomState(seed)
    rng.shuffle(unique_groups)

    n_test = int(len(unique_groups) * test_size)
    val_groups = unique_groups[:n_test]
    train_groups = unique_groups[n_test:]

    df_cands_train = df_cands[df_cands['cookie'].isin(train_groups)]
    df_cands_val   = df_cands[df_cands['cookie'].isin(val_groups)]

    return pl.from_pandas(df_cands_train), pl.from_pandas(df_cands_val)

In [17]:
def prepare_for_catboost(als_preds):
    df_eval_positive = (
        df_eval.select(['cookie','node'])
        .with_columns(pl.lit(1).alias('label'))
    )

    candidates_eval = (
        als_preds
        .join(df_eval_positive, on=['cookie','node'], how='left')
        .with_columns(pl.col('label').fill_null(0).alias('label'))
    )

    candidates_train, candidates_val = split_cands_train_val(candidates_eval)
    catboost_features = [col for col in candidates_train.columns if col not in ('cookie', 'node', 'label')]

    return (
        candidates_train,
        candidates_val,
        catboost_features,
        candidates_eval
    )

In [20]:
best_als_params = {
    'factors': 200,
    'iters': 10,
    'regularization': 0.1,
    'alpha': 3
}

In [21]:
catboost_params = {
    'iterations': 3000,
    'depth': 6,
    'l2_leaf_reg': 3.0,
    'learning_rate': 0.2,
    'loss_function': 'Logloss'
}

In [22]:
best_als_params_log = {f'als_{key}': value for key, value in best_als_params.items()}
catboost_params_log = {f'catboost_{key}': value for key, value in catboost_params.items()}

In [69]:
def run_als_catboost_experiment(
    catboost_params: dict,
    best_als_params: dict,
    best_als_params_log: dict,
    catboost_params_log: dict,
    data: dict,
    df_eval
):

    with mlflow.start_run(run_name='als + catboost'):
        mlflow.log_param('model_type', 'catboost + als')
        mlflow.log_params(best_als_params_log)
        mlflow.log_params(catboost_params_log)

        print('Начало обучения')
        train_start_time = time.time()

        best_als_predicted = als_predict(
            sparse_matrix=data['sparse_matrix'],
            users_to_pred=data['eval_users'],
            users_indicies=data['user_id_to_index'],
            index_items_ids=data['index_to_item_id'],
            factors=best_als_params.get("factors", 200),
            iters=best_als_params.get("iters", 10),
            regularization=best_als_params.get("regularization", 0.1),
            alpha=best_als_params.get("alpha", 3),
            use_cg=best_als_params.get("use_cg", True),
            random_state=best_als_params.get("random_state", 42)
        )

        candidates_train, candidates_val, catboost_features, candidates_eval = (
            prepare_for_catboost(best_als_predicted)
        )

        X_train = candidates_train.select(catboost_features).to_pandas()
        y_train = candidates_train['label'].to_numpy()
        X_val = candidates_val.select(catboost_features).to_pandas()
        y_val = candidates_val['label'].to_numpy()

        catboost_ranker = CatBoostClassifier(**catboost_params)
        catboost_ranker.fit(
            X_train, y_train,
            eval_set=(X_val, y_val),
            verbose=50
        )

        train_time = time.time() - train_start_time
        mlflow.log_metric('seconds_training', train_time)
        print('Два этапа обучились за', train_time)

        X = candidates_eval.select(catboost_features).to_pandas()
        probas = catboost_ranker.predict_proba(X)[:, 1]

        candidates_eval = candidates_eval.with_columns(pl.Series(probas).alias('ranker_score'))
        catboost_predicted = (
            candidates_eval
            .sort(['cookie', 'ranker_score'], descending=[False, True])
            .group_by('cookie')
            .head(40)
        )

        recall_40 = recall_at(df_eval, catboost_predicted, k=40)
        mlflow.log_metric('Recall_40', recall_40)

        print('Recall@40 =', recall_40)

In [70]:
run_als_catboost_experiment(
    catboost_params=catboost_params,
    best_als_params=best_als_params,
    best_als_params_log=best_als_params_log,
    catboost_params_log=catboost_params_log,
    data=data,
    df_eval=df_eval
)

Начало обучения


  0%|          | 0/10 [00:00<?, ?it/s]

0:	learn: 0.3424376	test: 0.3424673	best: 0.3424673 (0)	total: 412ms	remaining: 20m 35s
50:	learn: 0.0542667	test: 0.0554719	best: 0.0554705 (32)	total: 9.65s	remaining: 9m 17s
100:	learn: 0.0542451	test: 0.0554809	best: 0.0554705 (32)	total: 18s	remaining: 8m 36s
150:	learn: 0.0542333	test: 0.0554908	best: 0.0554705 (32)	total: 26.4s	remaining: 8m 17s
200:	learn: 0.0542247	test: 0.0554994	best: 0.0554705 (32)	total: 34.7s	remaining: 8m 2s
250:	learn: 0.0542188	test: 0.0555066	best: 0.0554705 (32)	total: 43.7s	remaining: 7m 59s
300:	learn: 0.0542156	test: 0.0555126	best: 0.0554705 (32)	total: 52s	remaining: 7m 46s
350:	learn: 0.0542136	test: 0.0555155	best: 0.0554705 (32)	total: 1m	remaining: 7m 33s
400:	learn: 0.0542149	test: 0.0555186	best: 0.0554705 (32)	total: 1m 8s	remaining: 7m 22s
450:	learn: 0.0542140	test: 0.0555215	best: 0.0554705 (32)	total: 1m 17s	remaining: 7m 17s
500:	learn: 0.0542124	test: 0.0555233	best: 0.0554705 (32)	total: 1m 25s	remaining: 7m 7s
550:	learn: 0.054212

In [71]:
catboost_params_2 = {
    'iterations': 2000,
    'depth': 6,
    'l2_leaf_reg': 3.0,
    'learning_rate': 0.05,
    'loss_function': 'Logloss'
}
catboost_params_log_2 = {f'catboost_{key}': value for key, value in catboost_params_2.items()}

In [72]:
run_als_catboost_experiment(
    catboost_params=catboost_params_2,
    best_als_params=best_als_params,
    best_als_params_log=best_als_params_log,
    catboost_params_log=catboost_params_log_2,
    data=data,
    df_eval=df_eval
)

Начало обучения


  0%|          | 0/10 [00:00<?, ?it/s]

0:	learn: 0.5856511	test: 0.5861903	best: 0.5861903 (0)	total: 228ms	remaining: 7m 34s
50:	learn: 0.0548832	test: 0.0560165	best: 0.0560165 (50)	total: 9.12s	remaining: 5m 48s
100:	learn: 0.0542805	test: 0.0554732	best: 0.0554732 (100)	total: 18s	remaining: 5m 37s
150:	learn: 0.0542776	test: 0.0554698	best: 0.0554698 (148)	total: 26.5s	remaining: 5m 24s
200:	learn: 0.0542683	test: 0.0554697	best: 0.0554693 (168)	total: 34.6s	remaining: 5m 9s
250:	learn: 0.0542648	test: 0.0554715	best: 0.0554693 (168)	total: 42.7s	remaining: 4m 57s
300:	learn: 0.0542547	test: 0.0554712	best: 0.0554693 (168)	total: 51.7s	remaining: 4m 51s
350:	learn: 0.0542539	test: 0.0554743	best: 0.0554693 (168)	total: 59.8s	remaining: 4m 40s
400:	learn: 0.0542491	test: 0.0554758	best: 0.0554693 (168)	total: 1m 7s	remaining: 4m 30s
450:	learn: 0.0542474	test: 0.0554796	best: 0.0554693 (168)	total: 1m 16s	remaining: 4m 21s
500:	learn: 0.0542375	test: 0.0554822	best: 0.0554693 (168)	total: 1m 25s	remaining: 4m 14s
550:	l

In [26]:
def run_als_lgbm_experiment(
    lgbm_params: dict,
    best_als_params: dict,
    best_als_params_log: dict,
    lgbm_params_log: dict,
    data: dict,
    df_eval
):

    with mlflow.start_run(run_name='best_als + lgbm'):
        mlflow.log_param('model_type', 'lgbm + als')
        mlflow.log_params(best_als_params_log)
        mlflow.log_params(lgbm_params_log)

        print('Начало обучения')
        train_start_time = time.time()

        best_als_predicted = als_predict(
            sparse_matrix=data['sparse_matrix'],
            users_to_pred=data['eval_users'],
            users_indicies=data['user_id_to_index'],
            index_items_ids=data['index_to_item_id'],
            factors=best_als_params.get("factors", 200),
            iters=best_als_params.get("iters", 10),
            regularization=best_als_params.get("regularization", 0.1),
            alpha=best_als_params.get("alpha", 3),
            use_cg=best_als_params.get("use_cg", True),
            random_state=best_als_params.get("random_state", 42)
        )

        candidates_train, candidates_val, lgbm_features, candidates_eval = (
            prepare_for_catboost(best_als_predicted)
        )

        X_train = candidates_train.select(lgbm_features).to_pandas()
        y_train = candidates_train['label'].to_numpy()
        X_val = candidates_val.select(lgbm_features).to_pandas()
        y_val = candidates_val['label'].to_numpy()

        lgbm_ranker = LGBMClassifier(**lgbm_params, verbose=-1)
        lgbm_ranker.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            callbacks=[]  
        )

        train_time = time.time() - train_start_time
        mlflow.log_metric('seconds_training', train_time)
        print('Два этапа обучились за', train_time)

        X = candidates_eval.select(lgbm_features).to_pandas()
        probas = lgbm_ranker.predict_proba(X)[:, 1]

        candidates_eval = candidates_eval.with_columns(pl.Series(probas).alias('ranker_score'))
        lgbm_predicted = (
            candidates_eval
            .sort(['cookie', 'ranker_score'], descending=[False, True])
            .group_by('cookie')
            .head(40)
        )

        recall_40 = recall_at(df_eval, lgbm_predicted, k=40)
        mlflow.log_metric('Recall_40', recall_40)

        print('Recall@40 =', recall_40)

In [27]:
lgbm_params = {
    'n_estimators': 1000,
    'num_leaves': 64,
    'learning_rate': 0.05,
    'reg_alpha': 1.0,
    'reg_lambda': 1.0,
    'objective': 'binary'
}

lgbm_params_log = {f'lgbm_{key}': value for key, value in lgbm_params.items()}

run_als_lgbm_experiment(
    lgbm_params=lgbm_params,
    best_als_params=best_als_params,
    best_als_params_log=best_als_params_log,
    lgbm_params_log=lgbm_params_log,
    data=data,
    df_eval=df_eval
)

Начало обучения


  0%|          | 0/10 [00:00<?, ?it/s]

Два этапа обучились за 281.57664608955383
Recall@40 = 0.1583176397642549
🏃 View run best_als + lgbm at: http://51.250.35.156:5000/#/experiments/54/runs/4c1259beaf9c4f9cb5c2ed408fd4f528
🧪 View experiment at: http://51.250.35.156:5000/#/experiments/54


In [28]:
from mlflow.tracking import MlflowClient

def tag_best_run(exp_name: str, metric: str = "Recall_40"):
    client = MlflowClient()
    experiment = client.get_experiment_by_name(exp_name)
    runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=[f"metrics.{metric} DESC"],
        max_results=1
    )

    if not runs:
        print("Run'ов не найдено.")
        return

    top_run = runs[0]
    run_id = top_run.info.run_id
    score = top_run.data.metrics.get(metric)
    model = top_run.data.params.get("model_type", "unknown")
    name = getattr(top_run.info, 'run_name', "<unknown>")

    client.set_tag(run_id, "best", "true")
    print(f"Лучший запуск: {name} (модель: {model}) с {metric} = {score}")

In [29]:
tag_best_run("homework-iamozolin.ext")

Лучший запуск: best_als + lgbm (модель: lgbm + als) с Recall_40 = 0.1583176397642549
