In [1]:
# !pip install implicit

In [2]:
import os, gc, math, json
import numpy as np
import pandas as pd
import polars as pl

from scipy.sparse import csr_matrix
import implicit
from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

from lightgbm import LGBMRanker, early_stopping, log_evaluation

In [3]:
# !pip install polars

In [4]:
pl.Config.set_tbl_rows(20)
pl.Config.set_fmt_str_lengths(120)
SEED = 42
IGNORE_KEEP_FRACTION = 0.10
rng  = np.random.default_rng(SEED)

In [5]:
PATH_TRAIN = "train_interactions.parquet" 
PATH_USERS = "users_meta.parquet"
PATH_ITEMS = "items_meta.parquet"
PATH_TEST  = "test_pairs.csv"
SUBMIT_OUT = "sample_submission.csv"

In [6]:
use_cols = ["user_id","item_id","timespent","like","dislike","share","bookmarks"]

scan = (
    pl.scan_parquet(PATH_TRAIN)
      .select(use_cols)
      .with_columns([
          (pl.col("like") - pl.col("dislike")).alias("explicit_weight"),
          pl.when(pl.col("like")==1).then(2)
            .when(pl.col("dislike")==1).then(0)
            .otherwise(1).alias("rank_label"),
          (pl.col("share") + pl.col("bookmarks")).alias("engage_cnt"),
      ])
)

In [7]:
#даунсемплинг ignore
M = 10_000
K = int(M * IGNORE_KEEP_FRACTION)

scan_posneg = scan.filter((pl.col("like") == 1) | (pl.col("dislike") == 1))

scan_ign = (
    scan
    .filter((pl.col("like") == 0) & (pl.col("dislike") == 0))
    .with_columns(
        (
            (pl.col("user_id").cast(pl.Int64) * 1_000_003
             + pl.col("item_id").cast(pl.Int64) * 10_007
             + SEED) % M
        ).alias("bucket")
    )
    .filter(pl.col("bucket") < K)
    .drop("bucket")
)

scan_small = pl.concat([scan_posneg, scan_ign])

train_df = scan_small.collect(streaming=True)
train_df = train_df.with_columns([
    pl.col("user_id").cast(pl.Int32),
    pl.col("item_id").cast(pl.Int32),
    pl.col("timespent").fill_null(0).cast(pl.Int32),
    pl.col("engage_cnt").cast(pl.Int8),
])

train_df.head()
print(train_df.shape)

  train_df = scan_small.collect(streaming=True)


(20725753, 10)


In [8]:
users = pl.read_parquet(PATH_USERS).with_columns([
    pl.col("user_id").cast(pl.Int32),
    pl.col("gender").cast(pl.Int8),
    pl.col("age").cast(pl.Int16).fill_null(0),
])

items = pl.read_parquet(PATH_ITEMS)
items = items.with_columns([
    pl.col("item_id").cast(pl.Int32),
    pl.col("source_id").cast(pl.Int32),
    pl.col("duration").cast(pl.Int16).fill_null(0),
])

In [9]:
has_emb = "embeddings" in items.columns
if has_emb:
    emb = np.vstack(items["embeddings"].to_list())
    svd = TruncatedSVD(n_components=16, random_state=SEED)
    emb16 = svd.fit_transform(emb)                  # (n_items, 16)

    emb_pl = pl.DataFrame(emb16, schema=[f"emb_{i}" for i in range(16)])
    items  = pl.concat([items.drop("embeddings"), emb_pl], how="horizontal")
else:
    for i in range(16):
        items = items.with_columns(pl.lit(0.0).alias(f"emb_{i}"))

  Q, _ = normalizer(A @ Q)
  Q, _ = normalizer(A @ Q)
  Q, _ = normalizer(A @ Q)
  Q, _ = normalizer(A.T @ Q)
  Q, _ = normalizer(A.T @ Q)
  Q, _ = normalizer(A.T @ Q)
  Q, _ = qr_normalizer(A @ Q)
  Q, _ = qr_normalizer(A @ Q)
  Q, _ = qr_normalizer(A @ Q)
  B = Q.T @ M
  B = Q.T @ M
  B = Q.T @ M
  U = Q @ Uhat
  U = Q @ Uhat
  U = Q @ Uhat
  ret = a @ b
  ret = a @ b
  ret = a @ b


In [10]:
#добавил в train_df source_id для агрегаций по автору
train_df = train_df.join(
    items.select(["item_id", "source_id", "duration"]),
    on="item_id", how="left"
)

#агрегаты по item
item_aggs = (
    train_df
    .group_by("item_id")
    .agg(
        pl.count().alias("i_cnt"),
        pl.col("like").sum().alias("i_like_sum"),
        pl.col("dislike").sum().alias("i_dislike_sum"),
        pl.col("timespent").mean().alias("i_ts_mean"),
        pl.col("engage_cnt").mean().alias("i_eng_mean"),
    )
    .with_columns(
        (pl.col("i_like_sum") / pl.col("i_cnt")).fill_null(0).alias("i_like_rate"),
        (pl.col("i_dislike_sum") / pl.col("i_cnt")).fill_null(0).alias("i_dislike_rate"),
        pl.when(pl.col("i_cnt") > 0)
          .then((pl.col("i_cnt") + 1).log())
          .otherwise(0)
          .alias("i_pop_log"),
    )
)

#агрегаты по user
user_aggs = (
    train_df
    .group_by("user_id")
    .agg(
        pl.count().alias("u_cnt"),
        pl.col("like").sum().alias("u_like_sum"),
        pl.col("dislike").sum().alias("u_dislike_sum"),
        pl.col("timespent").mean().alias("u_ts_mean"),
        pl.col("engage_cnt").mean().alias("u_eng_mean"),
    )
    .with_columns(
        (pl.col("u_like_sum") / pl.col("u_cnt")).fill_null(0).alias("u_like_rate"),
        (pl.col("u_dislike_sum") / pl.col("u_cnt")).fill_null(0).alias("u_dislike_rate"),
        pl.when(pl.col("u_cnt") > 0)
          .then((pl.col("u_cnt") + 1).log())
          .otherwise(0)
          .alias("u_act_log"),
    )
)

#агрегаты по автору(source_id)
src_aggs = (
    train_df
    .group_by("source_id")
    .agg(
        pl.count().alias("s_cnt"),
        pl.col("like").sum().alias("s_like_sum"),
        pl.col("dislike").sum().alias("s_dislike_sum"),
        pl.col("timespent").mean().alias("s_ts_mean"),
    )
    .with_columns(
        (pl.col("s_like_sum") / pl.col("s_cnt")).fill_null(0).alias("s_like_rate"),
        (pl.col("s_dislike_sum") / pl.col("s_cnt")).fill_null(0).alias("s_dislike_rate"),
        pl.when(pl.col("s_cnt") > 0)
          .then((pl.col("s_cnt") + 1).log())
          .otherwise(0)
          .alias("s_pop_log"),
    )
)

(Deprecated in version 0.20.5)
  pl.count().alias("i_cnt"),
(Deprecated in version 0.20.5)
  pl.count().alias("u_cnt"),
(Deprecated in version 0.20.5)
  pl.count().alias("s_cnt"),


### 6) Латентные факторы ALS (implicit)

In [11]:
# беру явный фидбэк без .collect()
explicit = (
    train_df
    .filter((pl.col("like") + pl.col("dislike")) >= 1)
    .select(["user_id", "item_id", "explicit_weight"])   # weight = like - dislike (добавлен ранее)
)

#размеры (по метаданным)
n_users = int(users["user_id"].max()) + 1
n_items = int(items["item_id"].max()) + 1

train_mat = csr_matrix(
    (
        explicit["explicit_weight"].to_numpy(),
        (explicit["user_id"].to_numpy(), explicit["item_id"].to_numpy()),
    ),
    shape=(n_users, n_items),
)


als = implicit.als.AlternatingLeastSquares(
    factors=32,
    iterations=15,
    regularization=0.1,
    alpha=100.0,
    calculate_training_loss=True,
)
als.fit(train_mat)

# добавил id-колонки через with_row_count
U = pl.DataFrame(als.user_factors, schema=[f"uf_{i}" for i in range(als.factors)]) \
      .with_row_count("user_id") \
      .with_columns(pl.col("user_id").cast(pl.Int32))

I = pl.DataFrame(als.item_factors, schema=[f"if_{i}" for i in range(als.factors)]) \
      .with_row_count("item_id") \
      .with_columns(pl.col("item_id").cast(pl.Int32))

  0%|          | 0/15 [00:00<?, ?it/s]

  .with_row_count("user_id") \
  .with_row_count("item_id") \


### 7) Собираем обучающую выборку для ранкинга (LGBMRanker)

In [12]:
#базовые фичи на уровень (user, item)
base = (train_df.select(["user_id","item_id","source_id","duration","rank_label"])
         .join(users, on="user_id", how="left")
         .join(items.drop(["emb_"+str(i) for i in range(16)] if has_emb else []), on="item_id", how="left"))  # duration уже есть

base = (base
    .join(user_aggs, on="user_id", how="left")
    .join(item_aggs, on="item_id", how="left")
    .join(src_aggs,  on="source_id", how="left")
    .join(U,         on="user_id", how="left")
    .join(I,         on="item_id", how="left")
)

uf_cols = [c for c in base.columns if c.startswith("uf_")]
if_cols = [c for c in base.columns if c.startswith("if_")]
base = base.with_columns([
    sum(pl.col(u)*pl.col(v) for u,v in zip(uf_cols, if_cols)).alias("als_dot")
])


if has_emb:
    emb_cols = [f"emb_{i}" for i in range(16)]
    base = base.join(items.select(["item_id", *emb_cols]), on="item_id", how="left")
else:
    emb_cols = []

# итоговые признаки
feat_cols = (["duration","gender","age",
              "i_cnt","i_like_rate","i_dislike_rate","i_ts_mean","i_eng_mean","i_pop_log",
              "u_cnt","u_like_rate","u_dislike_rate","u_ts_mean","u_eng_mean","u_act_log",
              "s_cnt","s_like_rate","s_dislike_rate","s_ts_mean","s_pop_log",
              "als_dot"] + uf_cols + if_cols + emb_cols)

base = base.with_columns([pl.col(c).fill_null(0) for c in feat_cols])


df_rank = base.select(["user_id","rank_label", *feat_cols]).to_pandas()
df_rank.head()

Unnamed: 0,user_id,rank_label,duration,gender,age,i_cnt,i_like_rate,i_dislike_rate,i_ts_mean,i_eng_mean,...,emb_6,emb_7,emb_8,emb_9,emb_10,emb_11,emb_12,emb_13,emb_14,emb_15
0,3163,2,15,1,33,414,0.487923,0.004831,14.248792,0.108696,...,0.032454,-0.072722,0.132638,-0.145529,-0.133499,0.339785,-0.09948,0.168454,0.058356,-0.041035
1,113832,2,6,2,23,870,0.590805,0.0,10.070115,0.111494,...,-0.17354,0.08389,-0.150695,0.270474,0.018849,0.070724,-0.01047,0.014538,-0.073787,0.093624
2,124909,2,22,2,26,1266,0.404423,0.00158,20.250395,0.039494,...,0.006189,0.052681,0.016499,0.044632,-0.100489,-0.025033,-0.019819,-0.073343,-0.054437,-0.127993
3,183076,2,12,2,20,253,0.474308,0.007905,8.893281,0.043478,...,0.179442,0.165594,-0.059439,0.057493,0.048572,0.163245,0.235075,0.097481,0.076888,0.274292
4,156215,2,5,1,24,1153,0.641804,0.001735,10.012142,0.035559,...,-0.067657,0.171057,-0.113981,0.008652,-0.020808,0.037663,0.083025,0.058166,-0.028428,0.027629


#### Тренировочное/валидационное разбиение с группировкой по пользователю

In [13]:
#разбиваю пользователей, чтобы не было утечки по одному и тому же юзеру
u_unique = df_rank["user_id"].unique()
u_tr, u_val = train_test_split(u_unique, test_size=0.1, random_state=SEED)

train_idx = df_rank["user_id"].isin(u_tr)
valid_idx = df_rank["user_id"].isin(u_val)

X_tr = df_rank.loc[train_idx, feat_cols]
y_tr = df_rank.loc[train_idx, "rank_label"]
X_va = df_rank.loc[valid_idx, feat_cols]
y_va = df_rank.loc[valid_idx, "rank_label"]

#размер групп, нужно для ранкера
def sizes_by_user(ids):
    vc = pd.Series(ids).value_counts(sort=False)
    return vc.sort_index().values

group_tr = sizes_by_user(df_rank.loc[train_idx, "user_id"])
group_va = sizes_by_user(df_rank.loc[valid_idx, "user_id"])

Обучаем LGBMRanker (классический бустинг с лоссом LambdaRank)

In [14]:
#сортирую по user_id и считаю размер блоков
def make_grouped_frame(df_rank, user_ids, feat_cols):
    df = df_rank[df_rank["user_id"].isin(user_ids)].copy()
    df.sort_values(["user_id"], inplace=True)
    X = df[feat_cols]
    y = df["rank_label"].astype(np.int32)
    group = df.groupby("user_id").size().to_numpy()
    return df, X, y, group

tr_df, X_tr, y_tr, group_tr = make_grouped_frame(df_rank, u_tr, feat_cols)
va_df, X_va, y_va, group_va = make_grouped_frame(df_rank, u_val, feat_cols)

ranker = LGBMRanker(
    objective="lambdarank",
    metric="ndcg",
    eval_at=[10],
    n_estimators=800,
    learning_rate=0.05,
    num_leaves=63,
    subsample=0.8,
    colsample_bytree=0.7,
    reg_alpha=1.0,
    reg_lambda=2.0,
    random_state=SEED,
    n_jobs=-1,
    verbosity=-1,
)

ranker.fit(
    X_tr, y_tr,
    group=group_tr,
    eval_set=[(X_va, y_va)],
    eval_group=[group_va],
    callbacks=[
        log_evaluation(period=50), # вывлодить метрики каждые 50 итераций
        early_stopping(stopping_rounds=100), #ранняя остановка
    ],
)



Training until validation scores don't improve for 100 rounds
[50]	valid_0's ndcg@10: 0.853676
[100]	valid_0's ndcg@10: 0.869189
[150]	valid_0's ndcg@10: 0.878095
[200]	valid_0's ndcg@10: 0.885341
[250]	valid_0's ndcg@10: 0.889846
[300]	valid_0's ndcg@10: 0.893372
[350]	valid_0's ndcg@10: 0.895772
[400]	valid_0's ndcg@10: 0.897348
[450]	valid_0's ndcg@10: 0.898905
[500]	valid_0's ndcg@10: 0.900154
[550]	valid_0's ndcg@10: 0.901153
[600]	valid_0's ndcg@10: 0.901954
[650]	valid_0's ndcg@10: 0.902424
[700]	valid_0's ndcg@10: 0.902916
[750]	valid_0's ndcg@10: 0.903477
[800]	valid_0's ndcg@10: 0.903849
Did not meet early stopping. Best iteration is:
[797]	valid_0's ndcg@10: 0.903877


0,1,2
,boosting_type,'gbdt'
,num_leaves,63
,max_depth,-1
,learning_rate,0.05
,n_estimators,800
,subsample_for_bin,200000
,objective,'lambdarank'
,class_weight,
,min_split_gain,0.0
,min_child_weight,0.001


### 8) Предсказания на test_pairs

In [15]:
test = pl.read_csv(PATH_TEST).with_columns([
    pl.col("user_id").cast(pl.Int32),
    pl.col("item_id").cast(pl.Int32),
])

#присоединяю метаданные и агрегаты
test_feat = (test
    .join(items.select(["item_id","source_id","duration"]), on="item_id", how="left")
    .join(users, on="user_id", how="left")
    .join(user_aggs, on="user_id", how="left")
    .join(item_aggs, on="item_id", how="left")
    .join(src_aggs,  on="source_id", how="left")
    .join(U,         on="user_id", how="left")
    .join(I,         on="item_id", how="left")
)

# dot(u,i)
test_feat = test_feat.with_columns([
    sum(pl.col(u)*pl.col(v) for u,v in zip(uf_cols, if_cols)).alias("als_dot")
])

#эмбеддинги SVD для item
if has_emb:
    test_feat = test_feat.join(items.select(["item_id", *emb_cols]), on="item_id", how="left")

# финальные фичи
test_feat = test_feat.with_columns([pl.col(c).fill_null(0) for c in feat_cols])
X_test = test_feat.select(feat_cols).to_pandas()

pred_lgbm = ranker.predict(X_test)

#бленд с ALS
als_raw = test_feat["als_dot"].to_numpy()
als_std = (als_raw - als_raw.mean()) / (als_raw.std() + 1e-9)

predict = 0.7 * pred_lgbm + 0.3 * als_std


sub = test.select(["user_id","item_id"]).to_pandas()
sub["predict"] = predict
sub.to_csv(SUBMIT_OUT, index=False)
sub.head(), SUBMIT_OUT



(   user_id  item_id   predict
 0        1     7363  0.148169
 1        1    73770 -1.415272
 2        1    75700  0.792167
 3        1    81204  0.775589
 4        1   110249 -0.881769,
 'sample_submission.csv')