# Avito Search Reranking

Этавы решения:
- Читаем только нужные колонки из `.parquet` (Arrow Dataset), без загрузки всего в память.
- Строим эмбеддинги `E5` **чанками** (батчами), напрямую на GPU через `transformers` (`AutoModel` + `AutoTokenizer`).
- Сохраняем эмбеддинги в **меммапы** (`np.memmap`, dtype=float16), чтобы не держать весь массив в RAM.
- Далее считаем косинусы по строкам **пакетами** на GPU.

In [1]:
# !pip -q install --upgrade pip > /dev/null
!pip -q install transformers>=4.41.0 torch>=2.1.0 sentencepiece pyarrow==16.0.0 catboost faiss-cpu

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
bigframes 2.8.0 requires google-cloud-bigquery-storage<3.0.0,>=2.30.0, which is not installed.
datasets 3.6.0 requires fsspec[http]<=2025.3.0,>=2023.1.0, but you have fsspec 2025.5.1 which is incompatible.
pandas-gbq 0.29.1 requires google-api-core<3.0.0,>=2.10.2, but you have google-api-core 1.34.1 which is incompatible.
bigframes 2.8.0 requires google-cloud-bigquery[bqstorage,pandas]>=3.31.0, but you have google-cloud-bigquery 3.25.0 which is incompatible.
bigframes 2.8.0 requires rich<14,>=12.4.4, but you have rich 14.0.0 which is incompatible.[0m[31m
[0m

In [2]:
import os, gc, sys, glob, math, json, random
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import torch
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, AutoModel

SEED = 42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)
assert torch.cuda.is_available(), 'GPU is not available.'
device = torch.device('cuda')
print('CUDA device:', torch.cuda.get_device_name(0))

# for P100
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128'
torch.backends.cudnn.benchmark = True
torch.set_float32_matmul_precision('high')

WORK_DIR = Path('/kaggle/working') if Path('/kaggle/working').exists() else Path('.')
INPUT_DIR = Path('/kaggle/input') if Path('/kaggle/input').exists() else Path('.')
CACHE_DIR = WORK_DIR / 'cache'; CACHE_DIR.mkdir(parents=True, exist_ok=True)

TRAIN_PATH, TEST_PATH = "/kaggle/input/avito-contest/train-dset.parquet", "/kaggle/input/avito-contest/test-dset-small.parquet"
print('Data:', TRAIN_PATH, TEST_PATH)

CUDA device: Tesla P100-PCIE-16GB
Data: /kaggle/input/avito-contest/train-dset.parquet /kaggle/input/avito-contest/test-dset-small.parquet


## 1. Считываем нужные колонки Arrow Dataset
Компактные таблицы уникальных `queries` и `items` без загрузки полного датафрейма.

In [4]:
import os
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
import pandas as pd
from tqdm.auto import tqdm

os.environ["PYARROW_NUM_THREADS"] = str(os.cpu_count() or 4)

def load_unique_from_parquet(train_path, test_path, id_col, text_cols, batch_rows=200_000):
    dataset = ds.dataset([train_path, test_path], format='parquet')
    cols = [id_col] + text_cols
    total = dataset.count_rows()
    pbar = tqdm(total=total, desc=f"scan {id_col}", unit="rows")

    uniq = {}  # id -> text
    scanner = dataset.scanner(columns=cols, batch_size=batch_rows, use_threads=True)

    for b in scanner.to_batches():
        arrays = [pc.fill_null(b[c], pa.scalar('')) for c in text_cols]
        if len(arrays) == 1:
            text_arr = arrays[0]
        else:
            parts = []
            for i, arr in enumerate(arrays):
                if i > 0:
                    parts.append(pa.scalar(' '))
                parts.append(arr)
            text_arr = pc.binary_join_element_wise(*parts)

        text_arr = pc.replace_substring_regex(text_arr, pattern=r"\s+", replacement=" ")
        text_arr = pc.fill_null(text_arr, pa.scalar(''))

        mini_tbl = pa.table({id_col: b[id_col], "text": text_arr})
        df = mini_tbl.to_pandas(types_mapper=None, use_threads=True)

        id_vals = df[id_col].to_numpy()
        text_vals = df["text"].astype(str).to_numpy()
        for k, v in zip(id_vals, text_vals):
            if k not in uniq:
                uniq[k] = v

        pbar.update(len(df))

    pbar.close()
    out = pd.DataFrame({id_col: list(uniq.keys()), "text": list(uniq.values())})
    out = out.drop_duplicates(subset=[id_col]).reset_index(drop=True)
    return out

queries_df = load_unique_from_parquet(TRAIN_PATH, TEST_PATH, 'query_id', ['query_text'])
items_df   = load_unique_from_parquet(TRAIN_PATH, TEST_PATH, 'item_id', ['item_title', 'item_description'])
print('Unique queries/items:', queries_df.shape, items_df.shape)

# queries_df.to_parquet(CACHE_DIR/'queries_unique.parquet', index=False)
# items_df.to_parquet(CACHE_DIR/'items_unique.parquet', index=False)

scan query_id:   0%|          | 0/8117138 [00:00<?, ?rows/s]

scan item_id:   0%|          | 0/8117138 [00:00<?, ?rows/s]

Unique queries/items: (690695, 2) (5986464, 2)


## 2. Получение текстовых эмбедингов
Используем `intfloat/multilingual-e5-small`

In [None]:
import os, gc
import numpy as np
from tqdm.auto import tqdm
import torch
from transformers import AutoTokenizer, AutoModel

os.environ.setdefault("TOKENIZERS_PARALLELISM", "true")
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True

MODEL_NAME = 'intfloat/multilingual-e5-small'
MAX_LEN    = 256
BATCH_TOK  = 1024
DTYPE      = torch.float16

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
model     = AutoModel.from_pretrained(MODEL_NAME, torch_dtype=DTYPE).eval().to(device)

def mean_pooling(last_hidden_state, attention_mask):
    mask = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
    summed = torch.sum(last_hidden_state * mask, dim=1)
    counts = torch.clamp(mask.sum(dim=1), min=1e-9)
    return summed / counts

@torch.inference_mode()
def encode_texts_iter(texts, is_query: bool, init_batch=BATCH_TOK, desc="emb"):
    prompt = 'query: ' if is_query else 'passage: '
    n  = len(texts)
    i  = 0
    bs = max(8, int(init_batch))
    pbar = tqdm(total=n, desc=desc, unit="texts")

    while i < n:
        tried = False
        while True:
            j = min(i + bs, n)
            batch_texts = [prompt + (t if isinstance(t, str) else str(t)) for t in texts[i:j]]
            try:
                tok = tokenizer(batch_texts, padding=True, truncation=True, max_length=MAX_LEN,
                                return_tensors='pt')
                for k in tok:
                    tok[k] = tok[k].pin_memory().to(device, non_blocking=True)

                with torch.cuda.amp.autocast(enabled=True, dtype=DTYPE):
                    out = model(**tok)
                    emb = mean_pooling(out.last_hidden_state, tok['attention_mask'])
                    emb = torch.nn.functional.normalize(emb, p=2, dim=1)

                yield emb.detach().to('cpu', dtype=torch.float16).numpy()
                pbar.update(j - i)
                i = j
                if tried and bs < init_batch:
                    bs = min(init_batch, int(bs * 1.25))
                break
            except torch.cuda.OutOfMemoryError:
                torch.cuda.empty_cache()
                if bs <= 8:
                    raise
                bs = max(8, bs // 2)
                tried = True
            finally:
                del tok
                gc.collect()
    pbar.close()

def estimate_token_lengths(texts, is_query: bool, batch=4096):
    prompt = 'query: ' if is_query else 'passage: '
    n = len(texts)
    lens = np.empty(n, dtype=np.int32)
    idx = 0
    for s in tqdm(range(0, n, batch), desc="length pass", unit="texts"):
        e = min(s + batch, n)
        bt = [prompt + (t if isinstance(t, str) else str(t)) for t in texts[s:e]]
        enc = tokenizer(bt, padding=False, truncation=False, return_length=True)

        arr = np.array(enc["length"], dtype=np.int32)
        arr = np.minimum(arr, MAX_LEN)
        lens[idx:idx + len(arr)] = arr
        idx += len(arr)
    return lens

def embed_to_memmap_grouped(texts, out_path, dtype='float16', is_query=False,
                            init_batch=BATCH_TOK, flush_every=16, desc="emb(grouped)"):
    n = len(texts)
    if n == 0:
        raise ValueError("texts is empty")

    lens = estimate_token_lengths(texts, is_query, batch=8192)
    order = np.argsort(lens, kind="mergesort")
    texts_sorted = [texts[i] for i in order]

    it = encode_texts_iter(texts_sorted, is_query, init_batch=init_batch, desc=desc)
    first = next(it)
    dim = first.shape[1]
    print(dim)

    mm = np.memmap(out_path, mode='w+',
                   dtype=(np.float16 if dtype == 'float16' else np.float32),
                   shape=(n, dim))

    wrote = 0
    mm[order[wrote:wrote + first.shape[0]]] = first
    wrote += first.shape[0]
    del first

    k = 1
    for block in it:
        mm[order[wrote:wrote + block.shape[0]]] = block
        wrote += block.shape[0]
        if (k % flush_every) == 0:
            mm.flush()
        k += 1

    mm.flush(); del mm
    gc.collect(); torch.cuda.empty_cache()
    return dim

q_texts = queries_df['text'].tolist()
i_texts = items_df['text'].tolist()

Q_MEM = str(CACHE_DIR/'q_emb_test.fp16.memmap')
I_MEM = str(CACHE_DIR/'i_emb._test.fp16.memmap')

print('Embedding queries to memmap')
q_dim = embed_to_memmap_grouped(q_texts, Q_MEM, dtype='float16', is_query=True,
                                init_batch=BATCH_TOK, desc="emb(queries grouped)")
print('Embedding items to memmap')
i_dim = embed_to_memmap_grouped(i_texts, I_MEM, dtype='float16', is_query=False,
                                init_batch=BATCH_TOK, desc="emb(items grouped)")

assert q_dim == i_dim, "Dims mismatch"
EMB_DIM = q_dim
print("Done. Embedding dim =", EMB_DIM)

## 3. Получаем `e5_cos`

In [5]:
from tqdm.auto import tqdm

EMB_DIM = 384

Q_MEM = "/kaggle/input/mmaps-01/cache/q_emb.fp16.memmap"
I_MEM = "/kaggle/input/mmaps-01/cache/i_emb.fp16.memmap"

qid_to_idx = {int(i): idx for idx, i in enumerate(queries_df['query_id'].astype(int).tolist())}
iid_to_idx = {int(i): idx for idx, i in enumerate(items_df['item_id'].astype(int).tolist())}

q_mm = np.memmap(Q_MEM, mode='r', dtype=np.float16, shape=(len(queries_df), EMB_DIM))
i_mm = np.memmap(I_MEM, mode='r', dtype=np.float16, shape=(len(items_df), EMB_DIM))

def stream_pairs_from_parquet(path, batch_rows=200_000, with_progress=True, desc="scan"):
    d = ds.dataset(path, format='parquet')
    cols = [c for c in d.schema.names if c in ['query_id','item_id','item_contact']]
    total = d.count_rows() if with_progress else None
    pbar = tqdm(total=total, desc=desc, unit="rows") if with_progress else None

    scanner = d.scanner(columns=cols, batch_size=batch_rows, use_threads=True)
    for rec_batch in scanner.to_batches():
        df = rec_batch.to_pandas()
        if pbar: pbar.update(len(df))
        yield df
    if pbar: pbar.close()

@torch.no_grad()
def cosine_for_df(df, bs=50_000, desc="cosine"):
    # map ids to indices
    q_idx = df['query_id'].astype(int).map(qid_to_idx).to_numpy()
    i_idx = df['item_id'].astype(int).map(iid_to_idx).to_numpy()

    out = np.empty(len(df), dtype=np.float32)
    # pbar = tqdm(total=len(df), desc=desc, unit="pairs")

    for s in range(0, len(df), bs):
        e = min(s + bs, len(df))
        Q_cpu = torch.from_numpy(q_mm[q_idx[s:e]]).pin_memory()
        I_cpu = torch.from_numpy(i_mm[i_idx[s:e]]).pin_memory()
        Q = Q_cpu.to(device, non_blocking=True)
        I = I_cpu.to(device, non_blocking=True)

        with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):
            sc = torch.sum(Q * I, dim=1)

        out[s:e] = sc.float().cpu().numpy()

        del Q_cpu, I_cpu, Q, I, sc
        torch.cuda.empty_cache()

        # pbar.update(e - s)

    # pbar.close()
    return out

print('Compute e5_cos for TRAIN')
train_parts = []
for part in stream_pairs_from_parquet(TRAIN_PATH, batch_rows=200_000, with_progress=True, desc="scan TRAIN"):
    part = part.copy()
    part['e5_cos'] = cosine_for_df(part, bs=50_000, desc="cos(TRAIN)")
    train_parts.append(part)
train = pd.concat(train_parts, axis=0, ignore_index=True)
print('Train with e5_cos:', train.shape)

print('Compute e5_cos for TEST.')
test_parts = []
for part in stream_pairs_from_parquet(TEST_PATH, batch_rows=200_000, with_progress=True, desc="scan TEST"):
    part = part.copy()
    if 'item_contact' not in part.columns:
        part['item_contact'] = 0
    part['e5_cos'] = cosine_for_df(part, bs=50_000, desc="cos(TEST)")
    test_parts.append(part[['query_id','item_id','e5_cos']])
test = pd.concat(test_parts, axis=0, ignore_index=True)
print('Test with e5_cos:', test.shape)

Compute e5_cos for TRAIN (streaming)...


scan TRAIN:   0%|          | 0/7781790 [00:00<?, ?rows/s]

cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/48576 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/48576 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/48576 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/48576 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/48576 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/48576 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/48576 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TRAIN):   0%|          | 0/41758 [00:00<?, ?pairs/s]

Train with e5_cos: (7781790, 4)
Compute e5_cos for TEST (streaming)...


  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


scan TEST:   0%|          | 0/335348 [00:00<?, ?rows/s]

cos(TEST):   0%|          | 0/200000 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


cos(TEST):   0%|          | 0/135348 [00:00<?, ?pairs/s]

  with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):


Test with e5_cos: (335348, 3)


## 4. Простые табличные фичи
Добавляем к ранее полученному датафрейму

In [7]:
def stream_extra_features(path, cols, batch_rows=200_000):
    d = ds.dataset(path, format='parquet')
    keep = ['query_id','item_id'] + cols
    scanner = d.scanner(columns=[c for c in keep if c in d.schema.names], batch_size=batch_rows, use_threads=True)
    for b in scanner.to_batches():
        yield b.to_pandas()

def attach_basic_tabular(train_df, test_df):
    feats = [
        'price','item_query_click_conv',
        'query_mcat','query_cat','query_loc',
        'item_mcat_id','item_cat_id','item_loc'
    ]

    # train
    tmp_parts = []
    for part in stream_extra_features(TRAIN_PATH, feats):
        tmp_parts.append(part)
    aux_train = pd.concat(tmp_parts, ignore_index=True) if tmp_parts else pd.DataFrame(columns=['query_id','item_id'])
    train_df = train_df.merge(aux_train, on=['query_id','item_id'], how='left')

    # читаем test
    tmp_parts = []
    for part in stream_extra_features(TEST_PATH, feats):
        tmp_parts.append(part)
    aux_test = pd.concat(tmp_parts, ignore_index=True) if tmp_parts else pd.DataFrame(columns=['query_id','item_id'])
    test_df = test_df.merge(aux_test, on=['query_id','item_id'], how='left')

    def sget(df, col, default):
        if col in df.columns:
            return df[col]
        return pd.Series(default, index=df.index)

    for df in (train_df, test_df):
        # click_conv
        click_conv = pd.to_numeric(sget(df, 'item_query_click_conv', 0.0), errors='coerce').fillna(0.0)
        df['click_conv'] = click_conv.astype('float32')

        # price
        price_raw = sget(df, 'price', np.nan)
        price = pd.to_numeric(price_raw, errors='coerce')
        df['price_log1p'] = np.log1p(price.fillna(0.0)).astype('float32')

        qid = sget(df, 'query_id', np.nan)
        grp = price.groupby(qid)
        mean = grp.transform('mean')
        std  = grp.transform('std').replace(0.0, np.nan)

        df['price_z']        = ((price - mean) / std).fillna(0.0).astype('float32')
        df['price_rank']     = grp.rank(method='average').astype('float32')
        size                 = grp.transform('size').astype('float32')
        df['price_rank_pct'] = (df['price_rank'] / size).fillna(0.0).astype('float32')

        # equality flags
        def eq(a, b):
            A = sget(df, a, 0).apply(lambda x: int(x) if pd.notna(x) else 0)
            B = sget(df, b, 0).apply(lambda x: int(x) if pd.notna(x) else 0)
            return (A == B).astype('int8')
        df['mcat_eq'] = eq('query_mcat', 'item_mcat_id')
        df['cat_eq']  = eq('query_cat',  'item_cat_id')
        df['loc_eq']  = eq('query_loc',  'item_loc')

    return train_df, test_df

train, test = attach_basic_tabular(train, test)
print('Train/Test with tabular:', train.shape, test.shape)

Train/Test with tabular: (7781790, 20) (335348, 19)


## 5. Обучение модели, получение выхода
Обучаем `CatBoostRanker`, записываем в `solution.csv`

In [8]:
train

Unnamed: 0,query_id,item_id,item_contact,e5_cos,price,item_query_click_conv,query_mcat,query_cat,query_loc,item_mcat_id,item_cat_id,item_loc,click_conv,price_log1p,price_z,price_rank,price_rank_pct,mcat_eq,cat_eq,loc_eq
0,4,7349717282,0.0,0.891773,500.0,-1.0,38.0,29.0,624480.0,2179540,29,638660,-1.0,6.216606,-0.319819,5.0,0.500000,0,1,0
1,4,7519735286,0.0,0.883741,250.0,-1.0,38.0,29.0,624480.0,2179540,29,637640,-1.0,5.525453,-0.799164,3.0,0.300000,0,1,0
2,4,4384449104,0.0,0.872572,1500.0,-1.0,38.0,29.0,624480.0,2179540,29,623880,-1.0,7.313887,1.597561,9.0,0.900000,0,1,0
3,4,7283365509,0.0,0.915256,220.0,-1.0,38.0,29.0,624480.0,2179540,29,628530,-1.0,5.398163,-0.856686,2.0,0.200000,0,1,0
4,4,4452768560,1.0,0.914797,1648.0,-1.0,38.0,29.0,624480.0,2179540,29,637640,-1.0,7.407924,1.881334,10.0,1.000000,0,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7781785,824764,2410860885,0.0,0.000000,1000.0,-1.0,63.0,114.0,107620.0,1178214,114,638120,-1.0,6.908755,-0.217910,24.0,0.500000,0,1,0
7781786,824764,7467310892,0.0,0.000000,1500.0,-1.0,63.0,114.0,107620.0,1178215,114,639320,-1.0,7.313887,-0.206444,31.0,0.645833,0,1,0
7781787,824764,7513725672,0.0,0.000000,10000.0,-1.0,63.0,114.0,107620.0,1178215,114,637800,-1.0,9.210441,-0.011516,41.5,0.864583,0,1,0
7781788,824764,7550685038,0.0,0.000000,35000.0,-1.0,63.0,114.0,107620.0,1178212,114,637900,-1.0,10.463132,0.561803,46.5,0.968750,0,1,0


In [29]:
from catboost import CatBoostRanker, Pool

FEATS = ['e5_cos','click_conv','price_log1p','price_z','price_rank','price_rank_pct','mcat_eq','cat_eq','loc_eq']


order_tr = np.argsort(train['query_id'].values, kind='mergesort')
train_g  = train.iloc[order_tr].reset_index(drop=True)

X_train = train_g[FEATS].astype('float32')
y       = pd.to_numeric(train_g['item_contact'], errors='coerce').fillna(0).astype(int)
groups  = train_g['query_id'].astype('int64')

In [30]:
X_train

Unnamed: 0,e5_cos,click_conv,price_log1p,price_z,price_rank,price_rank_pct,mcat_eq,cat_eq,loc_eq
0,0.0,-1.0,6.398595,-0.332815,11.5,0.605263,1.0,1.0,1.0
1,0.0,0.0,7.378384,1.309650,16.5,0.868421,1.0,1.0,1.0
2,0.0,0.0,7.438972,1.473897,18.0,0.947368,1.0,1.0,1.0
3,0.0,-1.0,7.170888,0.816910,14.0,0.736842,1.0,1.0,1.0
4,0.0,0.0,6.216606,-0.497062,8.0,0.421053,1.0,1.0,1.0
...,...,...,...,...,...,...,...,...,...
7781785,0.0,-1.0,6.908755,-0.217910,24.0,0.500000,0.0,1.0,0.0
7781786,0.0,-1.0,7.313887,-0.206444,31.0,0.645833,0.0,1.0,0.0
7781787,0.0,-1.0,9.210441,-0.011516,41.5,0.864583,0.0,1.0,0.0
7781788,0.0,-1.0,10.463132,0.561803,46.5,0.968750,0.0,1.0,0.0


In [36]:
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupKFold
from catboost import CatBoostRanker, Pool

# NDCG@10 0.97^pos 
def _dcg(v, k=10):
    w = 0.97 ** np.arange(len(v))
    return float((v * w)[:k].sum())

def _ndcg_for_group(rel, k=10):
    idcg = _dcg(np.sort(rel)[::-1], k)
    return 0.0 if idcg == 0 else _dcg(rel, k) / idcg

def ndcg_by_query_arr(group_ids, labels, scores, k=10):
    order = np.lexsort((-scores, group_ids))
    g = group_ids[order]
    y = labels[order]
    vals = []

    start = 0
    while start < len(g):
        end = start + 1
        while end < len(g) and g[end] == g[start]:
            end += 1
        vals.append(_ndcg_for_group(y[start:end], k))
        start = end
    return float(np.mean(vals)) if vals else 0.0

def take_rows(X, idx):
    return X.iloc[idx] if hasattr(X, "iloc") else X[idx]

y_arr = np.asarray(y, dtype=np.int32)
g_arr = np.asarray(groups, dtype=np.int64)

params = dict(
    loss_function='YetiRankPairwise',
    eval_metric='NDCG:top=10',
    iterations=3000,
    learning_rate=0.05,
    depth=8,
    l2_leaf_reg=3.0,
    task_type='GPU',
    random_seed=42,
    verbose=200,
    allow_writing_files=False,
)

n_splits = 5
kf = GroupKFold(n_splits=n_splits)

oof = np.zeros(len(y_arr), dtype=np.float32)
fold_scores = []
models = []

for fold, (tr_idx, va_idx) in enumerate(kf.split(np.zeros_like(y_arr), y_arr, groups=g_arr), start=1):
    X_tr, y_tr, g_tr = take_rows(X, tr_idx), y_arr[tr_idx], g_arr[tr_idx]
    X_va, y_va, g_va = take_rows(X, va_idx), y_arr[va_idx], g_arr[va_idx]

    order_tr = np.argsort(g_tr, kind='mergesort')
    order_va = np.argsort(g_va, kind='mergesort')

    X_tr_s, y_tr_s, g_tr_s = take_rows(X_tr, order_tr), y_tr[order_tr], g_tr[order_tr]
    X_va_s, y_va_s, g_va_s = take_rows(X_va, order_va), y_va[order_va], g_va[order_va]

    pool_tr = Pool(X_tr_s, label=y_tr_s, group_id=g_tr_s)
    pool_va = Pool(X_va_s, label=y_va_s, group_id=g_va_s)

    model = CatBoostRanker(**params)
    model.fit(pool_tr, eval_set=pool_va, use_best_model=True, early_stopping_rounds=400)
    models.append(model)

    pred_va_sorted = model.predict(pool_va).astype('float32')

    va_idx_sorted = va_idx[order_va]
    oof[va_idx_sorted] = pred_va_sorted

    ndcg10 = ndcg_by_query_arr(g_va_s, y_va_s, pred_va_sorted, k=10)
    fold_scores.append(ndcg10)
    print(f"[Fold {fold}] NDCG = {ndcg10*100:.2f}%")

oof_ndcg10 = ndcg_by_query_arr(g_arr, y_arr, oof, k=10)
print(f"OOF NDCG: {oof_ndcg10*100:.2f}%  |  folds: {[round(s*100,2) for s in fold_scores]}")

Default metric period is 5 because PFound, NDCG is/are not implemented for GPU
Metric PFound is not implemented on GPU. Will use CPU for metric computation, this could significantly affect learning time
Metric NDCG:top=10;type=Base is not implemented on GPU. Will use CPU for metric computation, this could significantly affect learning time


0:	test: 0.7772777	best: 0.7772777 (0)	total: 355ms	remaining: 17m 43s
200:	test: 0.8069594	best: 0.8069594 (200)	total: 54.3s	remaining: 12m 35s
400:	test: 0.8074539	best: 0.8075043 (393)	total: 1m 48s	remaining: 11m 42s
600:	test: 0.8075659	best: 0.8076176 (472)	total: 2m 42s	remaining: 10m 49s
800:	test: 0.8076052	best: 0.8076213 (778)	total: 3m 36s	remaining: 9m 54s
1000:	test: 0.8075685	best: 0.8076752 (845)	total: 4m 30s	remaining: 9m
1200:	test: 0.8075127	best: 0.8076752 (845)	total: 5m 24s	remaining: 8m 6s


KeyboardInterrupt: 

In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
from catboost import CatBoostRanker, Pool


order_te = np.argsort(test['query_id'].values, kind='mergesort')
test_g   = test.iloc[order_te].reset_index(drop=True)
X_te     = test_g[FEATS].astype('float32')

preds = np.mean([m.predict(X_te) for m in models], axis=0).astype('float32')

sub = test_g[['query_id','item_id']].copy()
sub['score'] = preds

solution = sub.sort_values(['query_id','score'], ascending=[True, False])[['query_id','item_id']]
out_csv = (WORK_DIR/'solution.csv') if 'WORK_DIR' in globals() else Path('solution.csv')
solution.to_csv(out_csv, header=['query_id','item_id'], index=False)
print('Saved submission to', out_csv)