
# 03 — Сборка фичей → LightGBM реранкер → сабмит (оптимизировано)
- Сборка по ключу `['query_id','item_id']` через **Polars Lazy** (минимум памяти, один `collect(streaming=True)`).
- Быстрый QC: *coverage* считается через left join без материализации anti-join, *non-finite* — за один проход по данным.
- Обучение `LightGBM LGBMRanker (lambdarank)` с hold-out по `query_id` и отчётом кастомного `NDCG@10` (`0.97^pos`).
- Сабмит `solution.csv`.


In [1]:

##%%
# Требования:
# pip install polars pyarrow lightgbm tqdm numpy


In [2]:

##%%
import os, json, math
import numpy as np
import polars as pl
from tqdm.auto import tqdm

# ML
try:
    import lightgbm as lgb
    HAS_LGBM = True
except Exception as e:
    HAS_LGBM = False
    print("[warn] LightGBM не установлен:", e)


  from .autonotebook import tqdm as notebook_tqdm


In [3]:

##%%
# ---- Поиск корня проекта ----
def find_project_root(start=None):
    """Return the nearest ancestor directory that contains both Data/ and Features/.

    The search begins at *start* (the current working directory when *start*
    is falsy) and climbs one level at a time.

    Raises:
        RuntimeError: the filesystem root was reached without finding a
            directory that holds both sub-folders.
    """
    required = ("Data", "Features")
    here = os.path.abspath(start or os.getcwd())
    while not all(os.path.isdir(os.path.join(here, sub)) for sub in required):
        above = os.path.dirname(here)
        # dirname() of the filesystem root is the root itself: nowhere left to climb.
        if above == here:
            raise RuntimeError("Не нашёл корень проекта (ожидаю папки Data/ и Features/).")
        here = above
    return here

# Every path below hangs off the directory that holds Data/ and Features/.
BASE_DIR = find_project_root()
DATA_DIR, FEAT_DIR = (os.path.join(BASE_DIR, sub) for sub in ("Data", "Features"))

TRAIN_PATH, TEST_PATH = (
    os.path.join(DATA_DIR, fname)
    for fname in ("train-dset.parquet", "test-dset-small.parquet")
)

TAB_DIR = os.path.join(FEAT_DIR, "table-basic")
COS_DESC_BASE_DIR = os.path.join(FEAT_DIR, "cos-e5-base-desc")
# Directories probed, in this order, for the query-title cosine files.
COS_TIT_DIR_CAND = [os.path.join(FEAT_DIR, "cos-e5-small"), DATA_DIR]

def pick_cos_tit_dir():
    """Return the first directory in COS_TIT_DIR_CAND that holds both cosine files.

    A candidate qualifies only when `train_cos.parquet` and `test_cos.parquet`
    both exist inside it.

    Raises:
        FileNotFoundError: no candidate directory contains the pair of files.
    """
    needed = ("train_cos.parquet", "test_cos.parquet")
    for cand in COS_TIT_DIR_CAND:
        if all(os.path.exists(os.path.join(cand, fname)) for fname in needed):
            return cand
    raise FileNotFoundError("Не нашёл train_cos.parquet/test_cos.parquet ни в Features/cos-e5-small, ни в Data/.")

COS_TIT_DIR = pick_cos_tit_dir()

# Echo the resolved locations so a wrong project root is obvious right away.
print("[paths]")
print(
    *(
        f"{label} {path}"
        for label, path in (
            ("BASE_DIR =", BASE_DIR),
            ("DATA_DIR =", DATA_DIR),
            ("FEAT_DIR =", FEAT_DIR),
            ("TAB_DIR  =", TAB_DIR),
            ("COS_TIT_DIR =", COS_TIT_DIR),
            ("COS_DESC_BASE_DIR =", COS_DESC_BASE_DIR),
        )
    ),
    sep="\n",
)


[paths]
BASE_DIR = C:\Users\idine\PycharmProjects\Avito_Test
DATA_DIR = C:\Users\idine\PycharmProjects\Avito_Test\Data
FEAT_DIR = C:\Users\idine\PycharmProjects\Avito_Test\Features
TAB_DIR  = C:\Users\idine\PycharmProjects\Avito_Test\Features\table-basic
COS_TIT_DIR = C:\Users\idine\PycharmProjects\Avito_Test\Features\cos-e5-small
COS_DESC_BASE_DIR = C:\Users\idine\PycharmProjects\Avito_Test\Features\cos-e5-base-desc


In [4]:
# ---- Lazy источники ----

KEYS = ["query_id", "item_id"]

# Base (query, item) pairs; the train side also carries the target column.
train_base_lf = pl.scan_parquet(TRAIN_PATH).select(KEYS + ["item_contact"])
test_base_lf = pl.scan_parquet(TEST_PATH).select(KEYS)

# Tabular features.
train_tab_lf = pl.scan_parquet(os.path.join(TAB_DIR, "train_feats_tab.parquet"))
test_tab_lf = pl.scan_parquet(os.path.join(TAB_DIR, "test_feats_tab.parquet"))

# Query-title cosine similarity.
train_cos_tit_lf = pl.scan_parquet(os.path.join(COS_TIT_DIR, "train_cos.parquet"))
test_cos_tit_lf = pl.scan_parquet(os.path.join(COS_TIT_DIR, "test_cos.parquet"))

# Query-description cosine similarity is optional: it is used only when both
# the train and the test file are present.
has_desc_cos = all(
    os.path.exists(os.path.join(COS_DESC_BASE_DIR, fname))
    for fname in ("train_cos_desc_base.parquet", "test_cos_desc_base.parquet")
)
if has_desc_cos:
    train_cos_desc_lf = pl.scan_parquet(
        os.path.join(COS_DESC_BASE_DIR, "train_cos_desc_base.parquet")
    ).select(KEYS + ["cos_q_desc_base"])
    test_cos_desc_lf = pl.scan_parquet(
        os.path.join(COS_DESC_BASE_DIR, "test_cos_desc_base.parquet")
    ).select(KEYS + ["cos_q_desc_base"])
else:
    train_cos_desc_lf = None
    test_cos_desc_lf = None


# ---- Вспомогательные функции ----

def schema_keys(lf: pl.LazyFrame) -> set[str]:
    """Return the column names of *lf*, taken from its collected schema, as a set."""
    schema = lf.collect_schema()
    return {name for name in schema.keys()}


def norm_keys(lf: pl.LazyFrame, extra: list[str] | None = None) -> pl.LazyFrame:
    """Keep only the requested columns that exist and cast the join keys to Int64.

    Args:
        lf: Lazy frame to normalise.
        extra: Additional column names to keep next to ``KEYS``. Names that are
            absent from the schema of *lf* are skipped, so the select cannot
            fail on a missing feature column.

    Returns:
        A lazy frame with the existing ``extra`` columns followed by the key
        columns, where ``query_id`` and ``item_id`` are cast to ``pl.Int64``.
    """
    have = schema_keys(lf)
    # dict.fromkeys de-duplicates while keeping order, so a key that is also
    # listed in `extra` is selected once instead of producing a duplicate column.
    wanted = dict.fromkeys([*(extra or []), *KEYS])
    cols = [c for c in wanted if c in have]
    # KEYS are always part of `wanted`, so any key present in the schema is
    # already in `cols`; no separate "re-add the key" step is needed.
    return lf.select(cols).with_columns(
        pl.col("query_id").cast(pl.Int64),
        pl.col("item_id").cast(pl.Int64),
    )


# после получения схем:
# Resolve the tabular schemas once; they decide which columns count as features.
train_tab_schema = train_tab_lf.collect_schema()
test_tab_schema  = test_tab_lf.collect_schema()

EXCLUDE = set(KEYS) | {"item_contact"}  # keys and the target are never features

num_cols_train = [c for c, t in train_tab_schema.items() if t.is_numeric() and c not in EXCLUDE]
num_cols_test  = [c for c, t in  test_tab_schema.items()  if t.is_numeric() and c not in EXCLUDE]

train_tab_lf = norm_keys(train_tab_lf, num_cols_train)
test_tab_lf  = norm_keys(test_tab_lf,  num_cols_test)

train_cos_tit_lf = norm_keys(train_cos_tit_lf, ["cos_q_title"])
test_cos_tit_lf = norm_keys(test_cos_tit_lf, ["cos_q_title"])

train_base_lf = norm_keys(train_base_lf, ["item_contact"])
test_base_lf = norm_keys(test_base_lf)

if has_desc_cos:
    train_cos_desc_lf = norm_keys(train_cos_desc_lf, ["cos_q_desc_base"])
    test_cos_desc_lf = norm_keys(test_cos_desc_lf, ["cos_q_desc_base"])

# ---- Lazy joins ----
# Left joins keep every base pair even when a feature source has no row for it.
train_lf = (
    train_base_lf
    .join(train_tab_lf, on=KEYS, how="left")
    .join(train_cos_tit_lf, on=KEYS, how="left")
)
test_lf = (
    test_base_lf
    .join(test_tab_lf, on=KEYS, how="left")
    .join(test_cos_tit_lf, on=KEYS, how="left")
)

if has_desc_cos:
    train_lf = train_lf.join(train_cos_desc_lf, on=KEYS, how="left")
    test_lf = test_lf.join(test_cos_desc_lf, on=KEYS, how="left")

# ---- Single materialisation per frame ----
# `engine="streaming"` replaces the deprecated `streaming=True` flag, which
# triggers a warning on this line in the current environment.
train_full = train_lf.collect(engine="streaming")
test_full = test_lf.collect(engine="streaming")

print("[assembled] train_full:", train_full.shape, "test_full:", test_full.shape)


  train_full = train_lf.collect(streaming=True)


[assembled] train_full: (7781790, 49) test_full: (335348, 48)


  test_full = test_lf.collect(streaming=True)


In [5]:

##%%
# ---- QC: быстрый coverage и non-finite ----

def coverage_count(pairs: pl.DataFrame, feats: pl.DataFrame, name: str):
    """Print how many rows of *pairs* have no matching key in *feats*.

    The key columns of *feats* are tagged with a constant marker and
    left-joined onto the key columns of *pairs*; rows whose marker is null
    after the join are counted as missing.
    """
    tagged = feats.select(KEYS).with_columns(pl.lit(1).alias("__hit"))
    joined = pairs.select(KEYS).join(tagged, on=KEYS, how="left")
    n_missing = joined.select(pl.col("__hit").is_null().sum().alias("missing")).item()
    print(f"[coverage] {name}: missing pairs =", n_missing)

# источники пар
# Key pairs read straight from the raw datasets.
# `engine="streaming"` replaces the deprecated `streaming=True` flag, which
# triggers a warning on these lines in the current environment.
train_pairs = pl.scan_parquet(TRAIN_PATH).select(KEYS).collect(engine="streaming")
test_pairs  = pl.scan_parquet(TEST_PATH ).select(KEYS).collect(engine="streaming")

# NOTE(review): train_full/test_full are built by left-joining features onto
# these same pairs, so this check looks like it can only report 0 unless keys
# were changed along the way. Confirm whether coverage should be measured
# against each feature source instead.
coverage_count(train_pairs, train_full, "train")
coverage_count(test_pairs,  test_full,  "test")

def non_finite_report_fast(df: pl.DataFrame, name: str, limit_print=20):
    """Print, per numeric feature column, how many values are null or non-finite.

    Args:
        df: Frame to inspect. The columns ``query_id``, ``item_id`` and
            ``item_contact`` are ignored.
        name: Label used in the printed messages.
        limit_print: Maximum number of offending columns to print.

    All counts are computed in a single ``select``. Columns without bad values
    are left out of the report.
    """
    skip = ("query_id", "item_id", "item_contact")
    num_cols = [c for c, t in df.schema.items() if t.is_numeric() and c not in skip]
    if not num_cols:
        print(f"[non-finite] {name}: нет числовых фич")
        return
    # `is_finite()` yields null for null inputs and `sum()` skips nulls, so
    # without `fill_null(True)` missing values would go unreported.
    res = df.select(
        [(~pl.col(c).is_finite()).fill_null(True).sum().alias(c) for c in num_cols]
    )
    # `unpivot` is the replacement for the deprecated `melt`.
    bad = (
        res.unpivot(variable_name="col", value_name="bad")
        .sort("bad", descending=True)
        .filter(pl.col("bad") > 0)
    )
    if bad.height == 0:
        print(f"[non-finite] {name}: всё ок")
    else:
        print(f"[non-finite] {name}: проблемные колонки (первые {limit_print})")
        print(bad.head(limit_print))

# Run the null / non-finite report on both assembled frames.
non_finite_report_fast(df=train_full, name="train")
non_finite_report_fast(df=test_full, name="test")


  train_pairs = pl.scan_parquet(TRAIN_PATH).select(KEYS).collect(streaming=True)
  test_pairs  = pl.scan_parquet(TEST_PATH ).select(KEYS).collect(streaming=True)


[coverage] train: missing pairs = 0
[coverage] test: missing pairs = 0
[non-finite] train: всё ок
[non-finite] test: всё ок


  bad = res.melt(variable_name="col", value_name="bad").sort("bad", descending=True).filter(pl.col("bad") > 0)


In [6]:
# ---- Подготовка матриц для LGBM ----
# Keys and the target never enter the feature matrix.
drop_cols = {"query_id", "item_id", "item_contact"}

train_num = {c for c, t in train_full.schema.items() if t.is_numeric() and c not in drop_cols}
test_num  = {c for c, t in  test_full.schema.items() if t.is_numeric() and c not in drop_cols}

# Only features present on both sides are usable (this also drops stray
# columns such as a suffixed duplicate of the target). Walk the train columns
# rather than the set intersection: set iteration order for strings varies
# between interpreter runs, which would reshuffle the feature columns and make
# the seeded training run non-reproducible.
feat_cols = [c for c in train_full.columns if c in train_num and c in test_num]

def sanitize(df: pl.DataFrame, cols: list[str]) -> np.ndarray:
    """Build a float32 feature matrix from *cols* with every bad value set to 0.

    Args:
        df: Source frame.
        cols: Columns to extract, in the order they should appear in the matrix.

    Returns:
        A float32 array in which nulls, NaN and +/-inf are replaced by 0.0.
    """
    # The array is patched in place below, so ask for a writable buffer:
    # polars may otherwise return a read-only zero-copy view of its data.
    X = df.select(
        [pl.col(c).cast(pl.Float32).fill_null(0.0).alias(c) for c in cols]
    ).to_numpy(writable=True)
    # fill_null only covers nulls; NaN and infinities are zeroed here.
    X[~np.isfinite(X)] = 0.0
    return X.astype(np.float32, copy=False)

# Feature matrices share one column order (feat_cols) on train and test.
X_train = sanitize(train_full, feat_cols)
X_test = sanitize(test_full, feat_cols)

# Target and query ids stay row-aligned with X_train.
y_train = train_full["item_contact"].to_numpy().astype(np.int32)
qids = train_full["query_id"].to_numpy()

print("[matrix] X_train:", X_train.shape, "X_test:", X_test.shape, "features:", len(feat_cols))


[matrix] X_train: (7781790, 46) X_test: (335348, 46) features: 46


In [7]:

##%%
# ---- Hold-out по query_id ----
# Hold out 10% of the queries (at least one) so no query is split across sets.
rng = np.random.default_rng(42)
uniq_q = np.unique(qids)
rng.shuffle(uniq_q)
n_val = max(1, int(0.1 * len(uniq_q)))
val_set = set(uniq_q[:n_val])

val_mask = np.isin(qids, list(val_set))
tr_mask = np.logical_not(val_mask)

# Row indices of each part, reordered so rows of one query are contiguous;
# the stable sort keeps the original row order inside a query.
tr_idx = np.flatnonzero(tr_mask)
va_idx = np.flatnonzero(val_mask)
tr_idx = tr_idx[np.argsort(qids[tr_idx], kind="mergesort")]
va_idx = va_idx[np.argsort(qids[va_idx], kind="mergesort")]

X_tr = X_train[tr_idx]
y_tr = y_train[tr_idx]
q_tr = qids[tr_idx]

X_va = X_train[va_idx]
y_va = y_train[va_idx]
q_va = qids[va_idx]

def group_sizes_from_sorted_ids(ids: np.ndarray) -> np.ndarray:
    """Return the number of occurrences of each distinct id, in ascending id order.

    For ids already sorted ascending this equals the consecutive group sizes.
    """
    sizes = np.unique(ids, return_counts=True)[1]
    return sizes.astype(int)

# Per-query row counts, in the row order of the sorted train / validation parts.
tr_groups, va_groups = (group_sizes_from_sorted_ids(ids) for ids in (q_tr, q_va))

print(
    f"[split] train rows={X_tr.shape[0]}, val rows={X_va.shape[0]}, "
    f"queries train/val={len(np.unique(q_tr))}/{len(np.unique(q_va))}"
)


[split] train rows=7006773, val rows=775017, queries train/val=610371/67819


In [8]:

##%%
# ---- Кастомный NDCG@10 (0.97^pos) ----
def calc_dcg_at_k(v: np.ndarray, k: int = 10) -> float:
    """Return the discounted gain of the first *k* entries of *v*.

    The entry at position ``i`` (0-based) is weighted by ``0.97 ** i``.
    """
    discounts = np.power(0.97, np.arange(len(v)))
    weighted = np.multiply(v, discounts)
    return float(np.sum(weighted[:k]))

def calc_ndcg_at_k(labels: np.ndarray, preds: np.ndarray, groups: np.ndarray, k: int = 10) -> float:
    """Return the mean per-group NDCG@k with the ``0.97 ** position`` discount.

    Args:
        labels: Relevance label of every row.
        preds: Model score of every row.
        groups: Group (query) id of every row; rows need not be pre-sorted.
        k: Cut-off applied to both the predicted and the ideal ranking.

    Returns:
        The average over groups of DCG@k / (ideal DCG@k + 1e-12), or 0.0 when
        there are no groups. A group whose labels are all zero scores 0.
    """
    # A stable sort makes each group's rows contiguous.
    by_group = np.argsort(groups, kind="mergesort")
    labels, preds, groups = labels[by_group], preds[by_group], groups[by_group]

    sizes = np.unique(groups, return_counts=True)[1]
    ends = np.cumsum(sizes)

    per_query = []
    for lo, hi in zip(ends - sizes, ends):
        rel = labels[lo:hi]
        ranking = np.argsort(-preds[lo:hi], kind="mergesort")
        # The epsilon keeps the division defined when a group has no positives.
        ideal = calc_dcg_at_k(np.sort(rel)[::-1], k) + 1e-12
        per_query.append(calc_dcg_at_k(rel[ranking], k) / ideal)

    if not per_query:
        return 0.0
    return float(np.mean(per_query))


In [9]:

##%%
# ---- Обучение LGBM ----
if not HAS_LGBM:
    raise RuntimeError("LightGBM недоступен в окружении. Установи пакет lightgbm.")

# LambdaRank objective, monitored with LightGBM's built-in NDCG@10.
params = {
    "objective": "lambdarank",
    "metric": "ndcg",
    "ndcg_eval_at": [10],
    "learning_rate": 0.05,
    "num_leaves": 127,
    "min_data_in_leaf": 100,
    "feature_fraction": 0.9,
    "bagging_fraction": 0.8,
    "bagging_freq": 1,
    "verbose": -1,
    "seed": 42,
    "device_type": "cpu",  # use "gpu" when a GPU build of LightGBM is installed
}

dtr = lgb.Dataset(data=X_tr, label=y_tr, group=tr_groups, feature_name=feat_cols)
dva = lgb.Dataset(data=X_va, label=y_va, group=va_groups, feature_name=feat_cols, reference=dtr)

model = lgb.train(
    params=params,
    train_set=dtr,
    num_boost_round=3000,
    valid_sets=[dva],
    callbacks=[
        lgb.early_stopping(stopping_rounds=100, verbose=False),
        lgb.log_evaluation(period=100),
    ],
)
print("[train] best_iteration:", model.best_iteration)


[100]	valid_0's ndcg@10: 0.88796
[200]	valid_0's ndcg@10: 0.889113
[300]	valid_0's ndcg@10: 0.889281
[400]	valid_0's ndcg@10: 0.889521
[500]	valid_0's ndcg@10: 0.88967
[600]	valid_0's ndcg@10: 0.889788
[700]	valid_0's ndcg@10: 0.889844
[800]	valid_0's ndcg@10: 0.890122
[900]	valid_0's ndcg@10: 0.890087
[train] best_iteration: 830


In [12]:

##%%
# ---- Валидация кастомным NDCG@10 ----
# Score the hold-out at the early-stopped iteration and report the custom metric.
preds_va = model.predict(X_va, num_iteration=model.best_iteration)
ndcg_val = calc_ndcg_at_k(
    labels=y_va.astype(float),
    preds=preds_va.astype(float),
    groups=q_va,
    k=10,
)
print(f"[holdout] custom NDCG@10 (0.97^pos) = {ndcg_val:.5f}")


[holdout] custom NDCG@10 (0.97^pos) = 0.30772


In [13]:

##%%
# ---- Предсказания и сабмит ----
test_pred = model.predict(X_test, num_iteration=model.best_iteration)

# Within each query, order items by descending score; only the ordered
# (query_id, item_id) pairs are written out.
sub = (
    test_full.select(["query_id", "item_id"])
    .with_columns(pl.Series(name="pred", values=test_pred))
    .sort(["query_id", "pred"], descending=[False, True])
    .select(["query_id", "item_id"])
)

SUB_PATH = os.path.join(BASE_DIR, "solution.csv")
sub.write_csv(SUB_PATH, include_header=True)
print("[save] submission ->", SUB_PATH, "rows=", sub.height)


[save] submission -> C:\Users\idine\PycharmProjects\Avito_Test\solution.csv rows= 335348


In [15]:
# ---- Кастомная метрика ndcg97@10 для LightGBM ----
def _dcg_decay97(rel_sorted: np.ndarray, k: int = 10) -> float:
    # Уже отсортированный по убыванию релевантности вектор
    n = min(k, rel_sorted.shape[0])
    if n == 0:
        return 0.0
    w = (0.97 ** np.arange(n)).astype(np.float64)
    return float((rel_sorted[:n] * w).sum())


def _ndcg97_for_group(labels: np.ndarray, scores: np.ndarray, k: int = 10) -> float:
    """Return NDCG@k (``0.97 ** position`` discount) for a single group.

    Returns 0.0 for an empty group and for a group whose ideal DCG is at most
    1e-12 (for example, when every label is zero).
    """
    if labels.size == 0:
        return 0.0
    # Ideal ranking: labels in descending order.
    idcg = _dcg_decay97(np.sort(labels)[::-1], k)
    if idcg <= 1e-12:
        return 0.0
    # Predicted ranking: labels reordered by descending score (stable for ties).
    ranked = labels[np.argsort(-scores, kind="mergesort")]
    return _dcg_decay97(ranked, k) / idcg


def feval_ndcg97_at_10(preds: np.ndarray, train_dataset: "lgb.Dataset"):
    """LightGBM feval: mean ndcg97@10 over the groups of *train_dataset*.

    Args:
        preds: Scores for the rows of *train_dataset*.
        train_dataset: Dataset supplying labels and group sizes.

    Returns:
        The tuple ``("ndcg97@10", value, True)``; the last item marks the
        metric as higher-is-better.
    """
    y = train_dataset.get_label().astype(np.float64, copy=False)
    group = train_dataset.get_group()  # group sizes, in row order
    assert group is not None, "group sizes are required for lambdarank"

    # Turn the sizes into [start, end) row boundaries for each group.
    sizes = [int(g) for g in group]
    bounds = np.concatenate(([0], np.cumsum(sizes))).astype(int)
    per_group = [
        _ndcg97_for_group(y[lo:hi], preds[lo:hi], k=10)
        for lo, hi in zip(bounds[:-1], bounds[1:])
    ]
    value = float(np.mean(per_group)) if per_group else 0.0
    return ("ndcg97@10", value, True)

In [17]:
# ---- Обучение LGBM по ndcg97@10 ----
if not HAS_LGBM:
    raise RuntimeError("LightGBM недоступен. Установи пакет lightgbm.")

params = {
    "objective": "lambdarank",
    "metric": "None",  # built-in metric off; the custom feval is the only one
    "learning_rate": 0.05,
    "num_leaves": 127,
    "min_data_in_leaf": 100,
    "feature_fraction": 0.9,
    "bagging_fraction": 0.8,
    "bagging_freq": 1,
    "verbose": -1,
    "seed": 42,
    "device_type": "gpu",
}

dtr = lgb.Dataset(data=X_tr, label=y_tr, group=tr_groups, feature_name=feat_cols)
dva = lgb.Dataset(data=X_va, label=y_va, group=va_groups, feature_name=feat_cols, reference=dtr)

model = lgb.train(
    params=params,
    train_set=dtr,
    num_boost_round=4000,
    valid_sets=[dva],                  # datasets only
    valid_names=["valid"],             # display name passed separately
    feval=feval_ndcg97_at_10,          # custom metric (0.97^rank)
    callbacks=[
        lgb.early_stopping(stopping_rounds=150, verbose=False),  # stops on ndcg97@10
        lgb.log_evaluation(period=100),
    ],
)

print("[train] best_iteration:", model.best_iteration)
preds_va = model.predict(X_va, num_iteration=model.best_iteration)
ndcg_val = calc_ndcg_at_k(
    labels=y_va.astype(float),
    preds=preds_va.astype(float),
    groups=q_va,
    k=10,
)
print(f"[holdout] ndcg97@10 = {ndcg_val:.5f}")


[100]	valid's ndcg97@10: 0.306207
[200]	valid's ndcg97@10: 0.306635
[300]	valid's ndcg97@10: 0.306797
[400]	valid's ndcg97@10: 0.306835
[train] best_iteration: 256
[holdout] ndcg97@10 = 0.30701


In [None]:
# Same guard as the other training cells; a bare `assert` disappears under -O.
if not HAS_LGBM:
    raise RuntimeError("LightGBM недоступен. Установи пакет lightgbm.")

# 1) reuse the iteration count that early stopping selected on the hold-out
best_iter = int(model.best_iteration or 1000)
print(f"[full-train] using best_iter = {best_iter}")

# 2) stack train + validation; rows and group sizes are concatenated in the
#    same order, so every group stays contiguous
import numpy as np
import pandas as pd

X_all = np.vstack([X_tr, X_va])
y_all = np.concatenate([y_tr, y_va])
groups_all = np.concatenate([tr_groups, va_groups])

# 3) build the Dataset and retrain without early stopping
params_full = dict(
    objective="lambdarank",
    metric="None",
    learning_rate=0.05,
    num_leaves=127,
    min_data_in_leaf=100,
    feature_fraction=0.9,
    bagging_fraction=0.8,
    bagging_freq=1,
    verbose=-1,
    seed=42,
    device_type="gpu",
)

dall = lgb.Dataset(X_all, label=y_all, group=groups_all, feature_name=feat_cols)

final_model = lgb.train(
    params_full,
    dall,
    num_boost_round=best_iter,     # fixed number of iterations
    valid_sets=[],                 # no validation set
    feval=None,                    # the custom metric is not needed here
    callbacks=[lgb.log_evaluation(200)],
)

print("[full-train] done. num_trees:", final_model.num_trees())

# 4) predict on the test matrix built earlier in the notebook
#    (the previous `X_te` was never defined anywhere)
preds_te = final_model.predict(X_test, num_iteration=best_iter)

# 5) assemble the submission from the test keys; X_test was built from
#    test_full, so predictions and keys are row-aligned
#    (the previous `qid_te` / `doc_ids_te` were never defined anywhere)
# NOTE(review): the column names below differ from the `query_id,item_id`
# ranked list written to solution.csv earlier; confirm which format the
# grader expects.
sub = pd.DataFrame({
    "qid": test_full.get_column("query_id").to_numpy(),
    "doc_id": test_full.get_column("item_id").to_numpy(),
    "score": preds_te.astype(float),
})

# rank inside every qid by descending score; ties keep row order
sub["rank"] = (
    sub.groupby("qid")["score"]
       .rank(method="first", ascending=False)
       .astype(int)
)

# 6) save
sub_path = "submission_full.csv"
sub.to_csv(sub_path, index=False)
print(f"[submit] saved to {sub_path}")