In [None]:
import os
import glob
import math
import gc
import sys
import warnings
import subprocess
import random
from typing import List, Tuple, Dict, Optional, Set
from datetime import timedelta, datetime
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Default each native thread-pool variable to "1" unless the environment already
# defines it. This runs before numpy/scipy are imported below.
# NOTE(review): presumably meant to avoid oversubscription when the notebook runs
# its own thread pool — confirm.
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")

import numpy as np
import polars as pl
import scipy.sparse as sp

def ensure_pkg(pkg, import_name=None, version=None):
    """Import a package, installing it with pip first if the import fails.

    Args:
        pkg: Distribution name passed to ``pip install``.
        import_name: Module name to import; defaults to ``pkg``.
        version: Optional exact version; when given, ``pkg==version`` is installed.

    Raises:
        subprocess.CalledProcessError: If the pip command exits with a non-zero status.
    """
    import importlib  # local import: only this helper needs it

    try:
        __import__(import_name or pkg)
    except Exception:
        requirement = pkg + (f"=={version}" if version else "")
        cmd = [sys.executable, "-m", "pip", "install", "-q", requirement]
        print("Installing:", " ".join(cmd))
        subprocess.check_call(cmd)
        # A package installed while the interpreter is running may stay invisible to
        # the import system until its cached directory listings are invalidated.
        importlib.invalidate_caches()

# Make sure the recommender dependencies are importable; ensure_pkg installs them
# with pip when the import fails.
ensure_pkg("implicit", "implicit")
ensure_pkg("annoy", "annoy")

# CatBoost is optional: record whether it could be imported in CATBOOST_AVAILABLE
# and emit a warning when it could not.
try:
    from catboost import CatBoostRanker, Pool
    CATBOOST_AVAILABLE = True
except Exception:
    CATBOOST_AVAILABLE = False
    warnings.warn("CatBoost не найден. Реранкинг отключён.")

from sklearn.model_selection import GroupShuffleSplit
from implicit.nearest_neighbours import bm25_weight, BM25Recommender
from implicit.als import AlternatingLeastSquares
from annoy import AnnoyIndex

# -----------------------
# 1) Config
# -----------------------
# Seed numpy's legacy global RNG and the stdlib RNG.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
CPU_COUNT = os.cpu_count() or 4  # falls back to 4 when the CPU count is unknown
NUM_THREADS = min(8, CPU_COUNT)  # thread budget passed to implicit and CatBoost

# Scratch directory, created on import of this cell.
TMP_DIR = "./tmp_stream"
os.makedirs(TMP_DIR, exist_ok=True)

# Minimum number of delivered orders for an item to be kept (see get_popular_items_from_orders).
MIN_DELIVERED_ORDERS = 3
TRACKER_SAMPLE_FRAC = 0.4

# Length, in days, of the training window that ends at the train/validation split.
TRAIN_WINDOW_DAYS = 120

# ALS
ALS_FACTORS = 16
ALS_ITERS = 8
ALS_REG = 0.01
ALS_REFRESH_EVERY = 0

# Candidate sizes
ALS_TOPN = 150
I2I_TOPN = 200
EMB_TOPN = 150
BLEND_TOPK = 100 
RECENT_K_FOR_I2I = 20

# Split
VAL_DAYS = 30  # the validation period is the last VAL_DAYS days before the max timestamp
MAX_VAL_USERS = 3000
TRAIN_CANDS_PER_USER = 100

# CatBoost
CB_ITER = 120
CB_OD_WAIT = 40
CB_LR = 0.08
CB_DEPTH = 6

# Weights
# Per-action weights for tracker events, keyed by action_type.
track_w = {"page_view": 2.0, "view_description": 3.0, "to_cart": 1.0, "unfavorite": 0.0, "favorite": 5.0, "remove": 0.0, "review_view": 0.5}
# Per-status weights for orders, keyed by last_status. The key spelling (including
# "proccesed_orders") must match the literal values compared in build_events_tables.
orders_w = {"delivered_orders": 5.0, "proccesed_orders": 4.0, "canceled_orders": 1.5}

# Embeddings
ANNOY_TREES = 20  # number of trees passed to AnnoyIndex.build

In [None]:
# -----------------------
# 2) Поиск партов
# -----------------------
def find_parts(base: str) -> List[str]:
    """Return the part files located at ``base``.

    A directory yields the sorted, de-duplicated union of entries matching
    ``part-*.snappy.parquet``, ``part-*.parquet`` and ``part-*``. A regular file
    yields a one-element list holding that path. Anything else yields ``[]``.
    """
    if os.path.isfile(base):
        return [base]
    if not os.path.isdir(base):
        return []

    patterns = ("part-*.snappy.parquet", "part-*.parquet", "part-*")
    matches = {
        path
        for pattern in patterns
        for path in glob.glob(os.path.join(base, pattern))
    }
    return sorted(matches)

# Dataset locations under the Kaggle input root. Each training directory path
# repeats its last component (the data sits one level deeper).
root = "/kaggle/input/testststs"
test_path  = f"{root}/ml_ozon_recsys_test.snappy.parquet"
catal_dir  = f"{root}/ml_ozon_recsys_train_final_categories_tree/ml_ozon_recsys_train_final_categories_tree"
tracker_dir= f"{root}/ml_ozon_recsys_train_final_apparel_tracker_data/ml_ozon_recsys_train_final_apparel_tracker_data"
orders_dir = f"{root}/ml_ozon_recsys_train_final_apparel_orders_data/ml_ozon_recsys_train_final_apparel_orders_data"
items_dir  = f"{root}/ml_ozon_recsys_train_final_apparel_items_data/ml_ozon_recsys_train_final_apparel_items_data"

# Resolve every location to a list of part files (an empty list when nothing is found).
print("Собираем списки партов...")
test_files   = find_parts(test_path)
catal_files  = find_parts(catal_dir)
tracker_files= find_parts(tracker_dir)
orders_files = find_parts(orders_dir)
items_files  = find_parts(items_dir)
print(f"Найдено файлов: items={len(items_files)}, tracker={len(tracker_files)}, orders={len(orders_files)}")

# Load the test set. NOTE: test_files[0] raises IndexError when no test file was found.
print("Чтение test...")
test_df = pl.concat([pl.read_parquet(f) for f in test_files]) if len(test_files) > 1 else pl.read_parquet(test_files[0])
# Keep one row per user (the last occurrence) when the column is present.
if "user_id" in test_df.columns:
    test_df = test_df.unique(subset=["user_id"], keep="last")
# Users to produce recommendations for. NOTE: this line requires the "user_id" column
# regardless of the check above.
test_users = test_df["user_id"].unique().to_list()

In [None]:
# -----------------------
# 3) max ts
# -----------------------
def _detect_epoch_unit(sample: pl.Series) -> str:
    # Очень грубая, но рабочая эвристика по порядку величины
    if len(sample) == 0:
        return "ms"
    mx = int(pl.Series(sample).drop_nulls().max())  # защита от пустоты
    if mx > 10**14:
        return "ns"
    elif mx > 10**12:
        return "us"
    elif mx > 10**10:
        return "ms"
    else:
        return "s"

def robust_max_ts_from_files(files: List[str], ts_candidates: Tuple[str, ...]) -> Optional[datetime]:
    """Return the maximum timestamp found across ``files``.

    The first name in ``ts_candidates`` that exists in the lazily scanned schema is
    used. String columns are parsed to Datetime (non-strict), integer columns are
    treated as epoch values whose unit is guessed from up to 500 rows of the first
    file, and any other dtype is used as is.

    Returns:
        The maximum value of the chosen column, or ``None`` when ``files`` is empty
        or none of the candidate columns exists.
    """
    if not files:
        return None

    lf = pl.scan_parquet(files)
    schema = lf.schema

    # Pick the first candidate column that is actually present.
    ts_col = next((name for name in ts_candidates if name in schema), None)
    if ts_col is None:
        print(f"[robust_max_ts] Нет подходящих колонок времени среди {ts_candidates}. Доступны: {list(schema.keys())[:20]}")
        return None

    integer_dtypes = (pl.Int64, pl.Int32, pl.UInt64, pl.UInt32, pl.Int16, pl.UInt16, pl.Int8, pl.UInt8)
    col_dtype = schema[ts_col]
    ts_expr = pl.col(ts_col)

    if col_dtype == pl.Utf8:
        # String timestamps -> Datetime
        ts_expr = ts_expr.str.strptime(pl.Datetime, strict=False)
    elif col_dtype in integer_dtypes:
        # Epoch integers -> Datetime; the unit is inferred from a sample of the first file
        try:
            probe = pl.read_parquet(files[0], columns=[ts_col], n_rows=500)[ts_col]
        except Exception:
            probe = pl.Series([], dtype=pl.Int64)
        ts_expr = pl.from_epoch(ts_expr, time_unit=_detect_epoch_unit(probe))
    # Any other dtype (e.g. already Datetime) is left untouched.

    max_expr = ts_expr.drop_nulls().max().alias("max_ts")
    result = lf.select(max_expr).collect(streaming=True)
    return result["max_ts"][0]

# Look up the latest timestamp in each source, trying the candidate column names in order.
t1 = robust_max_ts_from_files(tracker_files, ("timestamp", "event_time", "event_timestamp"))
t2 = robust_max_ts_from_files(orders_files,  ("created_timestamp", "created_at", "timestamp", "event_time", "event_timestamp"))

if t1 is None and t2 is None:
    # Diagnostics: print both schemas to show which fields exist at all.
    print("Схема tracker:", pl.scan_parquet(tracker_files).schema if tracker_files else {})
    print("Схема orders:", pl.scan_parquet(orders_files).schema if orders_files else {})
    raise RuntimeError("Не удалось определить max_ts — не нашли колонок времени или все пустые")

# Latest timestamp over whichever sources produced one.
max_ts = t1 if t2 is None else (t2 if t1 is None else max(t1, t2))
# T is the train/validation boundary: the last VAL_DAYS days form the validation period.
T = max_ts - timedelta(days=VAL_DAYS)
# Events older than TRAIN_FROM are dropped when the event tables are built.
TRAIN_FROM = T - timedelta(days=TRAIN_WINDOW_DAYS)
print(f"Robust max timestamp: {max_ts}, Train/Val split T: {T}, Train from: {TRAIN_FROM}")

In [None]:
# -----------------------------------------------
# 3.5) Популярные товары
# -----------------------------------------------
def get_popular_items_from_orders(order_files: List[str], min_deliveries: int) -> Set[int]:
    """Collect the ids of items with at least ``min_deliveries`` delivered orders.

    Only order rows whose ``last_status`` equals ``"delivered_orders"`` are counted.
    When ``order_files`` is empty a warning is emitted and an empty set is returned.
    """
    print(f"Определяем популярные товары (минимум {min_deliveries} доставок)...")
    if not order_files:
        warnings.warn("Файлы заказов не найдены.")
        return set()

    delivered = pl.scan_parquet(order_files).filter(pl.col("last_status") == "delivered_orders")
    per_item = delivered.group_by("item_id").agg(pl.len().alias("delivery_count"))
    frequent = per_item.filter(pl.col("delivery_count") >= min_deliveries).select("item_id")

    popular_set = set(frequent.collect(streaming=True)["item_id"].to_list())
    print(f"Найдено {len(popular_set)} популярных товаров.")
    return popular_set

# Items with enough delivered orders; used as the item filter when the event tables are built.
popular_items_set = get_popular_items_from_orders(orders_files, MIN_DELIVERED_ORDERS)
# An empty set aborts the run here.
if not popular_items_set:
    raise RuntimeError("Не удалось сформировать список популярных товаров.")

# ----------------------------------------------------
# 4) Категории (id->cat_l0)
# ----------------------------------------------------
def extract_cat_levels(ids_list: Optional[List[int]], n_levels: int = 4) -> List[Optional[int]]:
    """Return the first ``n_levels`` category ids from ``ids_list``.

    Entries equal to ``-1`` are dropped; the result is padded with ``None`` up to
    ``n_levels`` and truncated to ``n_levels``. ``None`` input yields all ``None``.
    """
    if ids_list is None:
        return [None] * n_levels

    levels = []
    for cat_id in ids_list:
        if cat_id != -1:
            levels.append(cat_id)

    missing = n_levels - len(levels)
    if missing > 0:
        levels.extend([None] * missing)
    return levels[:n_levels]

# Mapping catalogid -> first non-(-1) entry of the catalog's "ids" list (None when there is none).
catalog_to_cat0: Dict[int, Optional[int]] = {}
print("Строим mapping catalogid -> cat_l0 ...")
for f in catal_files:
    df = pl.read_parquet(f, columns=["catalogid","ids"])
    if df.is_empty(): continue
    # One row per catalogid; the last occurrence within the file wins.
    df = df.unique(subset=["catalogid"], keep="last")
    cat_lvls = [extract_cat_levels(x) for x in df["ids"].to_list()]
    # extract_cat_levels pads with None, so lv[0] is None when no level is available.
    cat0 = [lv[0] if len(lv) > 0 else None for lv in cat_lvls]
    # Later files overwrite earlier entries for the same catalogid.
    for cid, c0 in zip(df["catalogid"].to_list(), cat0):
        catalog_to_cat0[int(cid)] = c0

In [None]:
# -----------------------
# 5) События одним ленивым пайплайном (NEW)
# -----------------------
def _make_ts_expr(lf: pl.LazyFrame, col_name: str, files: List[str]) -> pl.Expr:
    """Build an expression that yields ``col_name`` as a timestamp.

    String columns are parsed to Datetime (non-strict); integer columns are treated
    as epoch values whose unit is guessed from up to 500 rows of ``files[0]``; any
    other dtype is returned as a plain column reference.

    Raises:
        pl.ColumnNotFoundError: If ``col_name`` is not in the schema of ``lf``.
    """
    schema = lf.schema
    if col_name not in schema:
        raise pl.ColumnNotFoundError(f"Колонка {col_name} не найдена")

    integer_dtypes = (pl.Int64, pl.Int32, pl.UInt64, pl.UInt32, pl.Int16, pl.UInt16, pl.Int8, pl.UInt8)
    col_dtype = schema[col_name]
    column = pl.col(col_name)

    if col_dtype == pl.Utf8:
        return column.str.strptime(pl.Datetime, strict=False)
    if col_dtype not in integer_dtypes:
        return column

    # Integer epoch: fall back to an empty sample if the probe read fails.
    try:
        probe = pl.read_parquet(files[0], columns=[col_name], n_rows=500)[col_name]
    except Exception:
        probe = pl.Series([], dtype=pl.Int64)
    return pl.from_epoch(column, time_unit=_detect_epoch_unit(probe))

def _event_weight_expr(col_name: str, weights: Dict[str, float], default: float = 1.0) -> pl.Expr:
    """Map string column ``col_name`` to its configured weight; unmatched values get ``default``."""
    chain = None
    for value, weight in weights.items():
        cond = pl.col(col_name) == value
        branch = pl.lit(float(weight))
        chain = pl.when(cond).then(branch) if chain is None else chain.when(cond).then(branch)
    if chain is None:
        return pl.lit(float(default))
    return chain.otherwise(pl.lit(float(default)))


def _weighted_events(lf: pl.LazyFrame, ts_expr: pl.Expr, weight_expr: pl.Expr,
                     item_filter: Set[int], decay: float) -> pl.LazyFrame:
    """Filter one event source and attach the time-decayed ``strength`` column."""
    # Event age in whole days relative to max_ts, never negative.
    age_days = (pl.lit(max_ts) - pl.col("timestamp")).dt.total_days().clip(0, None)
    return (
        lf
        .with_columns(ts_expr.alias("timestamp"))
        .filter(pl.col("item_id").is_in(item_filter))
        .filter(pl.col("timestamp") >= TRAIN_FROM)
        .with_columns(weight_expr.alias("weight"))
        # A configured weight of 0 means "ignore this event": drop it so it creates
        # neither a training interaction nor a validation positive.
        .filter(pl.col("weight") > 0)
        .with_columns((pl.col("weight") * (decay * age_days).exp()).alias("strength"))
        .select(["user_id","item_id","timestamp","strength"])
    )


def build_events_tables(tracker_files, orders_files, item_filter: Set[int]):
    """Build the user-item training table, validation labels and item popularity.

    Tracker and order events are restricted to ``item_filter`` and to timestamps at or
    after ``TRAIN_FROM``. Each event is weighted through ``track_w`` (by ``action_type``)
    or ``orders_w`` (by ``last_status``), with 1.0 for values missing from the dict, and
    decayed exponentially with a 30-day half-life relative to ``max_ts``.

    Every ``track_w`` entry is honoured, including the zero weights configured for
    "unfavorite" and "remove": those events are dropped rather than falling back to
    the default weight of 1.0.

    Reads the module-level ``max_ts``, ``T``, ``TRAIN_FROM``, ``track_w`` and ``orders_w``.

    Args:
        tracker_files: Parquet files with user_id, item_id, action_type, timestamp.
        orders_files: Parquet files with user_id, item_id, created_timestamp, last_status.
        item_filter: Item ids to keep.

    Returns:
        ``(ui_train, ui_val, item_pop_df)``:
        ``ui_train`` has summed strength per (user_id, item_id) before ``T`` as ``ui_strength``;
        ``ui_val`` has summed strength at or after ``T``, clipped to [0, 1], as ``label``;
        ``item_pop_df`` has summed strength per item over all kept events as ``item_pop``.
    """
    decay = (-math.log(2.0)/30.0)  # exp(decay * 30 days) == 0.5

    # tracker
    ltr = pl.scan_parquet(tracker_files).select(["user_id","item_id","action_type","timestamp"])
    tr = _weighted_events(
        ltr,
        _make_ts_expr(ltr, "timestamp", tracker_files),
        _event_weight_expr("action_type", track_w),
        item_filter,
        decay,
    )

    # orders (created_timestamp is exposed under the common name "timestamp")
    lod = pl.scan_parquet(orders_files).select(
        [pl.col("user_id"), pl.col("item_id"), pl.col("created_timestamp").alias("timestamp"), pl.col("last_status")]
    )
    od = _weighted_events(
        lod,
        _make_ts_expr(lod, "timestamp", orders_files),
        _event_weight_expr("last_status", orders_w),
        item_filter,
        decay,
    )

    ev = pl.concat([tr, od])

    ui_train = (
        ev.filter(pl.col("timestamp") < pl.lit(T))
          .group_by(["user_id","item_id"])
          .agg(pl.col("strength").sum().alias("ui_strength"))
          .collect(streaming=True)
    )

    ui_val = (
        ev.filter(pl.col("timestamp") >= pl.lit(T))
          .group_by(["user_id","item_id"])
          .agg(pl.col("strength").sum().alias("label"))
          .with_columns(pl.col("label").clip(0, 1.0))
          .collect(streaming=True)
    )

    # Popularity is aggregated over both the training and the validation period.
    item_pop_df = (
        ev.group_by("item_id")
          .agg(pl.col("strength").sum().alias("item_pop"))
          .collect(streaming=True)
    )

    return ui_train, ui_val, item_pop_df

# Build the train/validation interaction tables and the per-item popularity table.
print("Строим финальные ui_train/ui_val/pop (ленивый пайплайн)...")
ui_train, ui_val, item_pop_df = build_events_tables(tracker_files, orders_files, popular_items_set)

# item_id -> popularity lookup, plus the 400 most popular items used as fallback candidates.
item_to_pop: Dict[int, float] = dict(zip(item_pop_df["item_id"].to_list(), item_pop_df["item_pop"].to_list()))
top_pop = item_pop_df.sort("item_pop", descending=True).head(400)["item_id"].to_list()

In [None]:
# -----------------------
# 6) ALS на всем ui_train
# -----------------------
def train_als(ui_train: pl.DataFrame):
    """Fit an implicit ALS model on the aggregated user-item strengths.

    Args:
        ui_train: Frame with columns ``user_id``, ``item_id`` and ``ui_strength``.

    Returns:
        dict with
        ``als_model``: the fitted AlternatingLeastSquares model;
        ``user_items_csr``: users x items CSR matrix of raw (unweighted) strengths;
        ``uid_to_uix`` / ``iid_to_iix``: id -> matrix index mappings;
        ``iix_to_itemid``: list mapping item index -> item id.
    """
    unique_users = ui_train["user_id"].unique().sort().to_list()
    unique_items = ui_train["item_id"].unique().sort().to_list()
    uid_to_uix = {u: i for i, u in enumerate(unique_users)}
    iid_to_iix = {i: j for j, i in enumerate(unique_items)}
    iix_to_itemid = unique_items

    user_map_df = pl.DataFrame({"user_id": list(uid_to_uix.keys()), "uix": list(uid_to_uix.values())}).with_columns(pl.col("user_id").cast(ui_train["user_id"].dtype))
    item_map_df = pl.DataFrame({"item_id": list(iid_to_iix.keys()), "iix": list(iid_to_iix.values())}).with_columns(pl.col("item_id").cast(ui_train["item_id"].dtype))

    # Attach matrix indices; rows that fail to map (null -> -1) are discarded.
    ui_np = (
        ui_train.join(user_map_df, on="user_id", how="left")
                .join(item_map_df, on="item_id", how="left")
                .fill_null(-1)
                .filter((pl.col("uix") >= 0) & (pl.col("iix") >= 0))
    )

    data = ui_np["ui_strength"].cast(pl.Float32).to_numpy()
    rows = ui_np["uix"].cast(pl.Int32).to_numpy()
    cols = ui_np["iix"].cast(pl.Int32).to_numpy()

    n_users, n_items = len(uid_to_uix), len(iid_to_iix)
    user_items_csr = sp.csr_matrix((data, (rows, cols)), shape=(n_users, n_items), dtype=np.float32)

    # BM25 weighting is computed on the items x users view, as before.
    item_users_bm25 = bm25_weight(user_items_csr.T, K1=100, B=0.8).astype(np.float32)
    # implicit >= 0.5 expects fit() to receive a users x items CSR matrix. The rest of
    # this notebook already relies on that API generation: als_candidates passes a
    # single-user row slice to recommend(), and similar_items() is unpacked as
    # (ids, scores). Fitting on items x users would swap the user and item factors,
    # so transpose the weighted matrix back before fitting.
    user_items_bm25 = item_users_bm25.T.tocsr()

    als_model = AlternatingLeastSquares(
        factors=ALS_FACTORS, iterations=ALS_ITERS, regularization=ALS_REG,
        random_state=SEED, num_threads=NUM_THREADS
    )
    als_model.fit(user_items_bm25)

    return dict(
        als_model=als_model,
        user_items_csr=user_items_csr,
        uid_to_uix=uid_to_uix,
        iid_to_iix=iid_to_iix,
        iix_to_itemid=iix_to_itemid,
    )

# Train ALS and expose the bundle's pieces as module-level names used by the helpers below.
print("Обучаем ALS...")
als_bundle = train_als(ui_train)
als_model = als_bundle["als_model"]
user_items_csr = als_bundle["user_items_csr"]   # users x items CSR of raw strengths
uid_to_uix      = als_bundle["uid_to_uix"]      # user_id -> row index
iid_to_iix      = als_bundle["iid_to_iix"]      # item_id -> column index
iix_to_itemid   = als_bundle["iix_to_itemid"]   # column index -> item_id

In [None]:
# -----------------------
# 7) I2I on-demand через BM25Recommender (NEW, заменяет co-vis)
# -----------------------
class I2IOnDemand:
    """Item-to-item neighbours from a BM25 item-item model, computed on demand and cached.

    Neighbour lists are memoised per item index in a lock-protected, LRU-ordered
    dictionary that holds at most ``capacity`` entries.
    """

    def __init__(self, item_users: sp.csr_matrix, topk: int, iix_to_itemid: List[int],
                 capacity: int = 200000):
        """Fit the BM25 model.

        Args:
            item_users: items x users CSR matrix of interaction strengths.
            topk: Number of neighbours kept per item (also the model's ``K``).
            iix_to_itemid: List mapping item index -> item id.
            capacity: Maximum number of cached neighbour lists.
        """
        self.topk = topk
        self.model = BM25Recommender(K=topk, num_threads=NUM_THREADS)
        # implicit >= 0.5 expects fit() to receive a users x items matrix (this class
        # already relies on that API generation: similar_items() is unpacked as
        # (ids, scores)). Transpose the items x users input so that the similarities
        # are item-item rather than user-user.
        self.model.fit(item_users.T.tocsr())
        self.iix_to_itemid = iix_to_itemid
        self._cache = OrderedDict()
        self._lock = threading.Lock()
        self._capacity = capacity  # LRU bound for cached neighbour lists

    def neighbors(self, item_id: int, iid_to_iix: Dict[int, int]):
        """Return ``(neighbour_item_ids, scores)`` for ``item_id``.

        Args:
            item_id: Item to look up.
            iid_to_iix: Mapping item id -> item index in the fitted matrix.

        Returns:
            Two aligned arrays (int64 ids, float32 scores) with at most ``topk``
            entries, excluding the item itself. Both are empty for an unknown item.
        """
        iix = iid_to_iix.get(item_id)
        if iix is None:
            return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float32)

        with self._lock:
            if iix in self._cache:
                self._cache.move_to_end(iix)
                return self._cache[iix]

        # Computed outside the lock; concurrent misses for the same item may both
        # compute the list, and the later write simply overwrites the earlier one.
        ids, scores = self.model.similar_items(iix, N=self.topk + 1)  # +1: the item itself is returned too
        nbrs, scs = [], []
        for j, s in zip(ids, scores):
            if int(j) == iix:
                continue
            if 0 <= int(j) < len(self.iix_to_itemid):
                nbrs.append(self.iix_to_itemid[int(j)])
                scs.append(float(s))
        nbrs = np.array(nbrs[:self.topk], dtype=np.int64)
        scs = np.array(scs[:self.topk], dtype=np.float32)

        with self._lock:
            self._cache[iix] = (nbrs, scs)
            if len(self._cache) > self._capacity:
                self._cache.popitem(last=False)  # evict the least recently used entry
        return nbrs, scs

# Item-to-item model over the transposed (items x users) training matrix, keeping 120 neighbours per item.
i2i_model = I2IOnDemand(user_items_csr.T.tocsr(), topk=120, iix_to_itemid=iix_to_itemid)


In [None]:
# -----------------------
# 8) Эмбеддинги и категории: строим ТОЛЬКО для используемых items (NEW)
# -----------------------
# Lookup tables filled by the loop below:
#   item_to_cat            item_id -> top-level category (or None)
#   itemid_to_embix        item_id -> Annoy index
#   embix_to_itemid_cache  Annoy index -> item_id
item_to_cat: Dict[int, Optional[int]] = {}
itemid_to_embix: Dict[int, int] = {}
embix_to_itemid_cache: Dict[int, int] = {}
emb_idx = None       # AnnoyIndex, created lazily once the embedding dimension is known
emb_dim = None       # dimension of the first embedding seen
annoy_next_ix = 0    # next free Annoy index

# Target items: everything seen in training plus the most popular items.
target_items = set(ui_train["item_id"].unique().to_list()) | set(top_pop)
target_items = list(target_items)

print(f"Строим cat map и Annoy только для {len(target_items)} items...")
target_set = set(target_items)

for f in items_files:
    try:
        df = pl.read_parquet(f, columns=["item_id","catalogid","fclip_embed"])
    except pl.ColumnNotFoundError:
        # File without embeddings: still use it for the category map.
        df = pl.read_parquet(f, columns=["item_id","catalogid"])
    if df.is_empty():
        continue
    df = df.filter(pl.col("item_id").is_in(target_set))
    if df.is_empty():
        continue
    # Rename only when the column was actually read; an unconditional rename would
    # raise on the fallback frame that has no "fclip_embed" column.
    if "fclip_embed" in df.columns:
        df = df.rename({"fclip_embed": "embed"})

    # cat
    for it, cid in zip(df["item_id"].to_list(), df["catalogid"].to_list()):
        # A null catalogid maps to "no category" instead of failing on int(None).
        item_to_cat[int(it)] = catalog_to_cat0.get(int(cid), None) if cid is not None else None

    # embed
    if "embed" in df.columns:
        for it, emb in zip(df["item_id"].to_list(), df["embed"].to_list()):
            if emb is None:
                continue
            vec = np.asarray(emb, dtype=np.float32)
            if emb_dim is None:
                emb_dim = len(vec)
                emb_idx = AnnoyIndex(emb_dim, "angular")
            if len(vec) != emb_dim:
                # Skip vectors whose length differs from the first one seen.
                continue
            emb_idx.add_item(annoy_next_ix, vec.tolist())
            itemid_to_embix[int(it)] = annoy_next_ix
            embix_to_itemid_cache[annoy_next_ix] = int(it)
            annoy_next_ix += 1
    del df; gc.collect()

if emb_idx is not None and annoy_next_ix > 0:
    emb_idx.build(ANNOY_TREES)
else:
    print("Annoy: эмбеддинги не найдены или пусты.")


In [None]:
# -----------------------
# 9) Вспомогательные ф-и и кандидаты
# -----------------------
def normalize_scores(pairs: List[Tuple[int, float]]) -> Dict[int, float]:
    """Min-max scale the scores of ``(item, score)`` pairs into ``{item: scaled}``.

    The minimum and maximum are taken from a float32 copy of the scores. When all
    scores are equal the divisor is 1.0, so every item maps to 0.0. An empty input
    yields an empty dict; a repeated item keeps the value of its last pair.
    """
    if not pairs:
        return {}

    raw = np.array([score for _item, score in pairs], dtype=np.float32)
    lo = float(raw.min())
    hi = float(raw.max())
    span = (hi - lo) if hi > lo else 1.0

    scaled: Dict[int, float] = {}
    for item, score in pairs:
        scaled[int(item)] = float((score - lo) / span)
    return scaled

def get_user_row(u: int):
    """Return user ``u``'s row of ``user_items_csr``, or ``None`` for a user without a matrix index."""
    user_index = uid_to_uix.get(u)
    return None if user_index is None else user_items_csr.getrow(user_index)

def topk_hist_from_row(row, k=50):
    """Return up to ``k`` ``(item_id, strength)`` pairs of a user row, strongest first.

    ``row`` is a sparse row as returned by ``get_user_row``; ``None`` or a row
    without stored entries yields ``[]``.
    """
    if row is None or row.nnz == 0:
        return []

    item_cols, strengths = row.indices, row.data
    if len(strengths) > k:
        # Partial selection of the k largest values, then an exact sort of just those.
        shortlist = np.argpartition(-strengths, k-1)[:k]
        ranked = shortlist[np.argsort(-strengths[shortlist])]
    else:
        ranked = np.argsort(-strengths)

    history = []
    for pos in ranked:
        history.append((iix_to_itemid[int(item_cols[pos])], float(strengths[pos])))
    return history

def get_seen_from_row(row):
    """Return the set of item ids stored in a user row (empty for ``None`` or an empty row)."""
    if row is None or row.nnz == 0:
        return set()
    return {iix_to_itemid[int(col)] for col in row.indices}

def get_pref_cats_from_row(row, k=200):
    """Return the user's top three categories by summed strength.

    Only the ``k`` strongest history items are considered; items whose category in
    ``item_to_cat`` is missing or ``None`` are ignored. Returns an empty set when no
    category could be resolved.
    """
    totals: Dict[int, float] = {}
    for item_id, weight in topk_hist_from_row(row, k=k):
        cat = item_to_cat.get(int(item_id))
        if cat is not None:
            totals[cat] = totals.get(cat, 0.0) + float(weight)

    if not totals:
        return set()

    cat_ids = list(totals)
    cat_weights = np.array(list(totals.values()), dtype=np.float32)
    best = np.argsort(-cat_weights)[:3]
    return {int(cat_ids[i]) for i in best}

def build_user_profile_from_row(row, use_k=50):
    """Return an L2-normalised, strength-weighted sum of the user's item embeddings.

    Only the ``use_k`` strongest history items that have an entry in
    ``itemid_to_embix`` contribute. Returns ``None`` when there is no embedding
    index, no contributing item, or a non-positive total weight.
    """
    if emb_idx is None:
        return None

    profile = None
    total_weight = 0.0
    for item_id, weight in topk_hist_from_row(row, k=use_k):
        emb_ix = itemid_to_embix.get(int(item_id))
        if emb_ix is None:
            continue
        w = float(weight)
        contribution = np.asarray(emb_idx.get_item_vector(emb_ix), dtype=np.float32) * w
        profile = contribution if profile is None else (profile + contribution)
        total_weight += w

    if profile is None or total_weight <= 0:
        return None

    # The small epsilon keeps the division defined for a zero vector.
    unit = profile / (np.linalg.norm(profile) + 1e-12)
    return unit.astype(np.float32)

def i2i_candidates_cov(u: int, topn: int = I2I_TOPN, recent_k: int = RECENT_K_FOR_I2I):
    """Score item-to-item candidates for user ``u``.

    Each of the user's ``recent_k`` strongest history items contributes its
    neighbours' similarity multiplied by the history strength; contributions are
    summed per candidate. Returns up to ``topn`` ``(item_id, score)`` pairs,
    best first, or ``[]`` when the user has no history or no neighbours.
    """
    history = topk_hist_from_row(get_user_row(u), k=recent_k)
    if not history:
        return []

    acc: Dict[int, float] = {}
    for seed_item, seed_weight in history:
        nbr_ids, nbr_scores = i2i_model.neighbors(int(seed_item), iid_to_iix)
        for nbr, sim in zip(nbr_ids, nbr_scores):
            key = int(nbr)
            acc[key] = acc.get(key, 0.0) + float(sim) * float(seed_weight)
    if not acc:
        return []

    cand_ids = np.fromiter(acc.keys(), dtype=np.int64)
    cand_vals = np.fromiter(acc.values(), dtype=np.float32)
    keep = min(topn, len(cand_vals))
    top = np.argpartition(-cand_vals, keep-1)[:keep]
    top = top[np.argsort(-cand_vals[top])]
    return [(int(cand_ids[i]), float(cand_vals[i])) for i in top]

def als_candidates(u: int, topn: int = ALS_TOPN):
    """Return up to ``topn`` ALS recommendations for user ``u`` as ``(item_id, score)`` pairs.

    Items already present in the user's row are filtered out by the model. Unknown
    users yield ``[]``; a failing ``recommend`` call is reported as a warning and
    also yields ``[]``. Returned indices outside ``iix_to_itemid`` are skipped.
    """
    uix = uid_to_uix.get(u)
    if uix is None:
        return []

    try:
        rec_ids, rec_scores = als_model.recommend(
            userid=uix, user_items=user_items_csr[uix], N=topn,
            filter_already_liked_items=True, recalculate_user=False
        )
    except Exception as e:
        warnings.warn(f"ALS recommend failed for user {u}: {e}")
        return []

    n_known = len(iix_to_itemid)
    return [
        (iix_to_itemid[int(i)], float(s))
        for i, s in zip(rec_ids, rec_scores)
        if 0 <= int(i) < n_known
    ]

def embed_knn(u: int, topn: int = EMB_TOPN):
    """Return embedding nearest neighbours of user ``u``'s profile as ``(item_id, score)`` pairs.

    The score is the dot product of the stored item vector with the user profile.
    Returns ``[]`` when there is no embedding index, no profile, or the Annoy query
    fails (a warning is emitted in the last case).
    """
    if emb_idx is None:
        return []

    prof = build_user_profile_from_row(get_user_row(u))
    if prof is None:
        return []

    try:
        # The distances are requested but not used; scores are recomputed below.
        nn_ixs, _dists = emb_idx.get_nns_by_vector(prof.tolist(), n=topn, include_distances=True)
    except Exception as e:
        warnings.warn(f"Annoy KNN failed for user {u}: {e}")
        return []

    results = []
    for ix in nn_ixs:
        item_id = embix_to_itemid_cache.get(ix)
        if item_id is not None:
            item_vec = np.asarray(emb_idx.get_item_vector(ix), dtype=np.float32)
            results.append((int(item_id), float(item_vec @ prof)))
    return results

def blend_user(u: int, topk: int = BLEND_TOPK):
    """Blend item-to-item, ALS and embedding candidates into one ranked list for user ``u``.

    Each source's scores are min-max normalised and combined with weights
    0.6 (i2i), 0.25 (ALS) and 0.15 (embeddings). A candidate in one of the user's
    preferred categories gets +0.03, and every candidate gets ``log1p(popularity) * 1e-3``.
    Items already in the user's history are excluded. When fewer than ``topk``
    candidates remain, the list is padded with zero-score popular items: first from
    ``top_pop``, then from all items in ``item_to_pop`` by descending popularity.

    Returns:
        Up to ``topk`` tuples ``(item_id, blended_score, i2i_score, als_score, emb_score)``,
        where the three source scores are the normalised values (0.0 when absent).
    """
    c_cov = i2i_candidates_cov(u, topn=I2I_TOPN, recent_k=RECENT_K_FOR_I2I)
    c_als = als_candidates(u, topn=ALS_TOPN)
    c_emb = embed_knn(u, topn=EMB_TOPN)

    s_cov = normalize_scores(c_cov)
    s_als = normalize_scores(c_als)
    s_emb = normalize_scores(c_emb)

    w_cov, w_als, w_emb = 0.6, 0.25, 0.15
    all_items = set(s_cov) | set(s_als) | set(s_emb)

    row = get_user_row(u)
    seen = get_seen_from_row(row)
    pref_cats = get_pref_cats_from_row(row, k=200)

    scores = []
    for it in all_items:
        if it in seen:
            continue
        cov_s = s_cov.get(it, 0.0)
        als_s = s_als.get(it, 0.0)
        emb_s = s_emb.get(it, 0.0)
        sc = w_cov * cov_s + w_als * als_s + w_emb * emb_s
        c0 = item_to_cat.get(int(it))
        if c0 is not None and c0 in pref_cats:
            sc += 0.03
        pop = item_to_pop.get(int(it), 0.0)
        sc += math.log1p(pop) * 1e-3
        scores.append((int(it), float(sc), float(cov_s), float(als_s), float(emb_s)))

    scores.sort(key=lambda x: x[1], reverse=True)

    missing = topk - len(scores)
    if missing > 0:
        # One running set of everything that may not be added again; this replaces
        # rebuilding a set of the padded items on every loop iteration.
        taken = {s[0] for s in scores} | seen
        padding = []

        for it in top_pop:
            if len(padding) >= missing:
                break
            if it not in taken:
                taken.add(it)
                padding.append((int(it), 0.0, 0.0, 0.0, 0.0))

        if len(padding) < missing:
            # top_pop was not enough: walk all known items by descending popularity.
            for it in sorted(item_to_pop, key=item_to_pop.get, reverse=True):
                if len(padding) >= missing:
                    break
                if it not in taken:
                    taken.add(it)
                    padding.append((int(it), 0.0, 0.0, 0.0, 0.0))

        scores.extend(padding)

    return scores[:topk]





In [None]:
# -----------------------
# 10) Генерация рекомендаций батчами
# -----------------------
def generate_recs_for_users(users: List[int], batch_size: int = 4000, max_workers: Optional[int] = None) -> pl.DataFrame:
    """Generate blended candidates for ``users`` in batches, using a thread pool per batch.

    Args:
        users: User ids to generate candidates for.
        batch_size: Number of users processed per batch.
        max_workers: Thread count; defaults to ``min(8, CPU_COUNT // 2 or 1)``.

    Returns:
        Frame with columns ``user_id, item_id, rank, cand_score, i2i_s, als_s, clip_s``,
        where ``rank`` starts at 1 within each user. A user whose blending raised gets
        the first ``BLEND_TOPK`` items of ``top_pop`` with zero scores (a warning is emitted).
    """
    columns = ["user_id", "item_id", "rank", "cand_score", "i2i_s", "als_s", "clip_s"]
    all_dfs = []
    total_users = len(users)
    print(f"Генерируем рекомендации для {total_users} пользователей...")

    # The worker count does not depend on the batch, so compute it once.
    workers = max_workers or min(8, CPU_COUNT // 2 or 1)

    for i in range(0, len(users), batch_size):
        batch = users[i:i+batch_size]
        rows = []

        with ThreadPoolExecutor(max_workers=workers) as ex:
            futs = {ex.submit(blend_user, u, BLEND_TOPK): u for u in batch}
            for fut in as_completed(futs):
                u = futs[fut]
                try:
                    recs = fut.result()
                    if len(recs) < BLEND_TOPK:
                        print(f"Предупреждение: пользователь {u} получил только {len(recs)} рекомендаций")
                    for rank, (it, sc, cov_s, als_s, emb_s) in enumerate(recs, 1):
                        rows.append((u, int(it), int(rank), float(sc), float(cov_s), float(als_s), float(emb_s)))
                except Exception as e:
                    warnings.warn(f"Failed to generate recs for user {u}: {e}")
                    for rank, it in enumerate(top_pop[:BLEND_TOPK], 1):
                        rows.append((u, int(it), int(rank), 0.0, 0.0, 0.0, 0.0))

        print(f"Обработано {min(i+batch_size, total_users)}/{total_users} пользователей")
        # Each tuple is one row. State the orientation explicitly instead of relying
        # on polars' inference, which can warn or pick the wrong orientation.
        dfb = pl.DataFrame(rows, schema=columns, orient="row")
        all_dfs.append(dfb)
        del rows; gc.collect()

    result = pl.concat(all_dfs) if all_dfs else pl.DataFrame(schema=columns)

    user_counts = result.group_by("user_id").agg(pl.len().alias("count"))
    print(f"Статистика количества кандидатов на пользователя до обработки:")
    print(user_counts["count"].describe())

    return result


In [None]:
# -----------------------
# 11) Лёгкие фичи для реранка
# -----------------------
def build_features(cand_df: pl.DataFrame) -> pl.DataFrame:
    """Attach reranker features to a candidate frame.

    Added columns:
        ``sim_up_item``: dot product of the user's embedding profile with the item
            vector (0.0 when the profile or the item vector is unavailable);
        ``item_pop``: popularity from ``item_to_pop`` (0.0 when unknown);
        ``same_cat_l0``: 1 when the item's category is among the user's preferred ones.

    Args:
        cand_df: Frame with at least ``user_id`` and ``item_id``.

    Returns:
        The candidates with the feature columns added, assembled per user.
        An empty input is returned unchanged.
    """
    if cand_df.is_empty():
        return cand_df
    parts = cand_df.partition_by("user_id", as_dict=True)
    out = []
    for key, g in parts.items():
        # partition_by keys may be tuples or plain values.
        u = int(key[0] if isinstance(key, tuple) else key)
        row = get_user_row(u)
        prof = build_user_profile_from_row(row)
        pref_cats = get_pref_cats_from_row(row, k=200)
        item_ids = g["item_id"].to_list()

        if prof is not None and emb_idx is not None:
            sims = []
            for it in item_ids:
                eix = itemid_to_embix.get(int(it))
                if eix is None:
                    sims.append(0.0)
                else:
                    try:
                        v = np.asarray(emb_idx.get_item_vector(eix), dtype=np.float32)
                        sims.append(float(v @ prof))
                    except Exception:
                        # Best effort: a failing vector lookup scores 0.0. Catching
                        # Exception (not a bare except) lets KeyboardInterrupt and
                        # SystemExit propagate.
                        sims.append(0.0)
            g = g.with_columns(pl.Series("sim_up_item", np.array(sims, dtype=np.float32)))
        else:
            g = g.with_columns(pl.lit(0.0).cast(pl.Float32).alias("sim_up_item"))

        pops = [float(item_to_pop.get(int(it), 0.0)) for it in item_ids]
        flags = []
        for it in item_ids:
            c0 = item_to_cat.get(int(it))
            flags.append(1 if (c0 is not None and c0 in pref_cats) else 0)
        g = g.with_columns([
            pl.Series("item_pop", np.array(pops, dtype=np.float32)),
            pl.Series("same_cat_l0", np.array(flags, dtype=np.int8)),
        ])
        out.append(g)
    df = pl.concat(out) if out else cand_df
    # Normalise dtypes so that frames built from different sources line up.
    for col in ["i2i_s", "als_s", "clip_s", "sim_up_item", "item_pop"]:
        if col in df.columns:
            df = df.with_columns(pl.col(col).cast(pl.Float32))
    if "same_cat_l0" in df.columns:
        df = df.with_columns(pl.col("same_cat_l0").cast(pl.Int8))
    return df

In [None]:
# -----------------------
# 12) Реранк
# -----------------------
# user_id -> set of item ids the user interacted with in the validation period.
# Every row of ui_val counts as a positive, regardless of its label value.
positives_by_user: Dict[int, set] = {}
for r in ui_val.iter_rows(named=True):
    positives_by_user.setdefault(int(r["user_id"]), set()).add(int(r["item_id"]))

# Validation users, randomly down-sampled (seeded) to at most MAX_VAL_USERS.
val_users = list(set(ui_val["user_id"].unique().to_list()))
if MAX_VAL_USERS and len(val_users) > MAX_VAL_USERS:
    rng = np.random.default_rng(SEED)
    val_users = rng.choice(val_users, size=MAX_VAL_USERS, replace=False).tolist()

# Build reranker training rows. Only the first 400 validation users are used.
rerank_train_rows = []
for u in val_users[:400]:
    try:
        cands = blend_user(u, topk=TRAIN_CANDS_PER_USER)
    except Exception as e:
        warnings.warn(f"Failed to generate training candidates for user {u}: {e}")
        continue
    if not cands:
        continue
    pos = positives_by_user.get(u, set())
    # Keep the three normalised source scores; the blended score is dropped.
    base = [(u, it, cov_s, als_s, emb_s) for it, _, cov_s, als_s, emb_s in cands]
    dfb = pl.DataFrame(base, schema=["user_id", "item_id", "i2i_s", "als_s", "clip_s"])
    # One row per candidate or positive item; positives that were not retrieved as
    # candidates are included too and get 0.0 for every source score via fill_null.
    g = pl.DataFrame([{"user_id": u, "item_id": int(it), "label": 1.0 if it in pos else 0.0} for it in (set(dfb["item_id"].to_list()) | pos)])
    g = g.join(dfb, on=["user_id", "item_id"], how="left").fill_null(0.0)
    rerank_train_rows.append(g)

# Empty frame with the expected column names when no user produced rows.
rerank_train_df = pl.concat(rerank_train_rows) if rerank_train_rows else pl.DataFrame({"user_id": [], "item_id": [], "label": [], "i2i_s": [], "als_s": [], "clip_s": []})

# Train the CatBoost reranker. ranker stays None when CatBoost is unavailable, there
# is no training data, fewer than two users, an empty split, or training fails.
ranker = None
if CATBOOST_AVAILABLE and not rerank_train_df.is_empty():
    try:
        train_feat = build_features(rerank_train_df)
        features = ["i2i_s", "als_s", "clip_s", "sim_up_item", "item_pop", "same_cat_l0"]
        all_train_users = train_feat["user_id"].unique().to_list()
        if len(all_train_users) > 1:
            # Split by user (10% of users held out) so a user's rows never straddle both parts.
            train_users_idx, eval_users_idx = next(
                GroupShuffleSplit(test_size=0.1, n_splits=1, random_state=SEED).split(all_train_users, groups=all_train_users)
            )
            train_users = {all_train_users[i] for i in train_users_idx}
            eval_users = {all_train_users[i] for i in eval_users_idx}

            train_part = train_feat.filter(pl.col("user_id").is_in(train_users))
            eval_part = train_feat.filter(pl.col("user_id").is_in(eval_users))
            if not train_part.is_empty() and not eval_part.is_empty():
                # Ranking pools grouped by user_id.
                train_pool = Pool(data=train_part.select(features).to_pandas(), label=train_part["label"].to_pandas(), group_id=train_part["user_id"].to_pandas())
                eval_pool = Pool(data=eval_part.select(features).to_pandas(), label=eval_part["label"].to_pandas(), group_id=eval_part["user_id"].to_pandas())

                # GPU is selected only when CUDA_VISIBLE_DEVICES is set to a non-empty value.
                task_type = "GPU" if os.environ.get("CUDA_VISIBLE_DEVICES") else "CPU"
                ranker = CatBoostRanker(
                    iterations=CB_ITER, depth=CB_DEPTH, learning_rate=CB_LR,
                    loss_function="YetiRank", random_seed=SEED,
                    task_type=task_type, devices='0' if task_type == "GPU" else None,
                    thread_count=NUM_THREADS, od_type="Iter", od_wait=CB_OD_WAIT, verbose=100
                )
                ranker.fit(train_pool, eval_set=eval_pool, use_best_model=True)
    except Exception as e:
        # Any failure falls back to the blended score.
        warnings.warn(f"CatBoost training failed: {e}")
        ranker = None
else:
    print("CatBoost недоступен/нет данных — используем blended score.")

# Generate blended candidates for every test user.
print("Генерируем рекомендации для теста...")
test_cand_df = generate_recs_for_users(test_users, batch_size=4000)

# Check how many candidates were generated.
print(f"Всего сгенерировано строк: {len(test_cand_df)}")
print(f"Уникальных пользователей: {test_cand_df['user_id'].n_unique()}")

# Build features from the source scores, then join back to recover rank and cand_score.
# NOTE(review): i2i_s/als_s/clip_s exist on both sides of the join; the right-hand
# copies are presumably suffixed by polars — confirm they are not needed downstream.
test_feat = build_features(test_cand_df.select(["user_id", "item_id", "i2i_s", "als_s", "clip_s"]))
test_feat = test_feat.join(test_cand_df, on=["user_id", "item_id"], how="left")

if CATBOOST_AVAILABLE and ranker is not None and not test_feat.is_empty():
    print("Реранк CatBoost...")
    features = ["i2i_s", "als_s", "clip_s", "sim_up_item", "item_pop", "same_cat_l0"]
    preds = ranker.predict(Pool(data=test_feat.select(features).to_pandas()))
    test_feat = test_feat.with_columns(pl.Series(name="rerank_score", values=preds))
    # Top 100 items per user by reranker score.
    recs_df = (
        test_feat
        .with_columns(pl.col("rerank_score").alias("score"))
        .sort(["user_id", "score"], descending=[False, True])
        .group_by("user_id", maintain_order=True)
        .head(100)
        .select(["user_id", "item_id"])
    )
else:
    print("Используем blended score.")
    # Top 100 items per user by blended candidate score.
    recs_df = (
        test_feat
        .with_columns(pl.col("cand_score").alias("score"))
        .sort(["user_id", "score"], descending=[False, True])
        .group_by("user_id", maintain_order=True)
        .head(100)
        .select(["user_id", "item_id"])
    )

# Recommendation count per user after ranking.
user_rec_counts = recs_df.group_by("user_id").agg(pl.len().alias("rec_count"))
print(f"Статистика количества рекомендаций на пользователя после ранжирования:")
print(user_rec_counts["rec_count"].describe())

# Pad users with fewer than 100 recommendations using popular items.
# Users with no rows in recs_df do not appear in user_rec_counts and are not padded here.
users_needing_more = user_rec_counts.filter(pl.col("rec_count") < 100)
if len(users_needing_more) > 0:
    print(f"Дополняем рекомендации для {len(users_needing_more)} пользователей...")
    additional_recs = []
    
    for row in users_needing_more.iter_rows(named=True):
        u = row["user_id"]
        current_count = row["rec_count"]
        needed = 100 - current_count
        
        # Items this user already has; they must not be recommended twice.
        existing = set(recs_df.filter(pl.col("user_id") == u)["item_id"].to_list())
        
        # First choice: the precomputed most popular items.
        added = 0
        for it in top_pop:
            if it not in existing:
                additional_recs.append({"user_id": u, "item_id": it})
                existing.add(it)
                added += 1
                if added >= needed:
                    break
        
        # Still short: walk every known item by descending popularity.
        if added < needed:
            all_items = list(item_to_pop.keys())
            all_items.sort(key=lambda x: item_to_pop[x], reverse=True)
            for it in all_items:
                if it not in existing:
                    additional_recs.append({"user_id": u, "item_id": it})
                    added += 1
                    if added >= needed:
                        break
    
    # Padded rows are appended after the ranked ones.
    if additional_recs:
        additional_df = pl.DataFrame(additional_recs)
        recs_df = pl.concat([recs_df, additional_df])

In [None]:
# Final check: distribution of the number of recommendations per user.
final_counts = recs_df.group_by("user_id").agg(pl.len().alias("rec_count"))
print(f"\nФинальная статистика количества рекомендаций:")
print(f"Минимум: {final_counts['rec_count'].min()}")
print(f"Максимум: {final_counts['rec_count'].max()}")
print(f"Среднее: {final_counts['rec_count'].mean():.2f}")

# Save the (user_id, item_id) pairs as CSV.
recs_df.write_csv("contribution.csv")
print(f"contribution.csv сохранён. Размер: {len(recs_df)} строк")