In [6]:
# Imports
import os

import dask.dataframe as dd
import matplotlib.pyplot as plt
import numpy as np
from dask.diagnostics import ProgressBar
from dask.distributed import Client

# Default plot appearance: 8x4 inch figures with the grid switched on.
plt.rcParams.update({"figure.figsize": (8, 4), "axes.grid": True})

# Reproducibility: seed NumPy's global random generator.
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

# Paths (replace with your own)
DATA_DIR = "../data/raw"
DEBUG_OUT = "../data/transformed/train-debug.parquet"
TRAIN_PATH = os.path.join(DATA_DIR, "train-dset-001.parquet")
# Use "test-dset.parquet" instead if you have the full test set.
TEST_PATH = os.path.join(DATA_DIR, "test-dset-small.parquet")

# Connect to the Dask scheduler at this address; the bare name on the last
# line makes the notebook render the client summary.
client = Client("127.0.0.1:8786")
client

0,1
Connection method: Direct,
Dashboard: http://127.0.0.1:8787/status,

0,1
Comm: tcp://127.0.0.1:8786,Workers: 1
Dashboard: http://127.0.0.1:8787/status,Total threads: 1
Started: Just now,Total memory: 8.38 GiB

0,1
Comm: tcp://127.0.0.1:41041,Total threads: 1
Dashboard: http://127.0.0.1:39783/status,Memory: 8.38 GiB
Nanny: tcp://127.0.0.1:39399,
Local directory: /tmp/dask-scratch-space/worker-63_rl1v5,Local directory: /tmp/dask-scratch-space/worker-63_rl1v5
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 8.0%,Last seen: Just now
Memory usage: 57.96 MiB,Spilled bytes: 0 B
Read bytes: 14.47 kiB,Write bytes: 14.47 kiB


In [8]:
# Columns read from the raw parquet files.
USE_COLS = [
    "query_id",
    "item_id",
    "query_text",
    "item_title",
    "item_description",
    "query_cat",
    "query_mcat",
    "query_loc",
    "item_cat_id",
    "item_mcat_id",
    "item_loc",
    "price",
    "item_query_click_conv",
    "item_contact",
]

# Lazy readers: the train frame gets every column in USE_COLS, the test frame
# gets the same list minus "item_contact".
train_dd = dd.read_parquet(
    TRAIN_PATH,
    engine="pyarrow",
    columns=USE_COLS,
    blocksize="256MB",
)
test_dd = dd.read_parquet(
    TEST_PATH,
    engine="pyarrow",
    columns=[name for name in USE_COLS if name != "item_contact"],
    blocksize="256MB",
)

# Identifier-like columns that are stored as categoricals.
CATEG_COLS = [
    "query_cat",
    "query_mcat",
    "query_loc",
    "item_cat_id",
    "item_mcat_id",
    "item_loc",
]
# Cast only the columns that are actually present in each frame.
train_dd = train_dd.astype(
    {name: "category" for name in CATEG_COLS if name in train_dd.columns}
)
test_dd = test_dd.astype(
    {name: "category" for name in CATEG_COLS if name in test_dd.columns}
)

# NOTE(review): dask.diagnostics.ProgressBar is documented for the local
# schedulers; with a distributed Client attached it may display nothing —
# confirm, and consider dask.distributed.progress instead.
with ProgressBar():
    print("Train rows (approx):", train_dd.shape[0].compute())
    print("Test rows  (approx):", test_dd.shape[0].compute())

# Preview of the first three training rows.
train_dd.head(3)


Train rows (approx): 7781790
Test rows  (approx): 335348


Unnamed: 0,query_id,item_id,query_text,item_title,item_description,query_cat,query_mcat,query_loc,item_cat_id,item_mcat_id,item_loc,price,item_query_click_conv,item_contact
0,4,7349717282,ботинки детские zara 21,Ботинки детские Zara,Новые полуботинки фирмы Zara. \nразмеры 21 сте...,29.0,38.0,624480.0,29,2179540,638660,500.0,-1.0,0.0
1,4,7519735286,ботинки детские zara 21,Детские ботинки Zara унисекс,"Крутые ботинки, в отличном состоянии",29.0,38.0,624480.0,29,2179540,637640,250.0,-1.0,0.0
2,4,4384449104,ботинки детские zara 21,Ботинки детские zara,Челси димесезонные Zara \nВ идеальном состояни...,29.0,38.0,624480.0,29,2179540,623880,1500.0,-1.0,0.0


## Feature engineering (минимальный бейслайн)

-   Цена: логарифм (`log1p`) + нормализация на медианную цену в категории товара (`item_cat_id`).
-   CTR: очистка (NaN → 0, отрицательные значения → 0, клиппинг сверху на 99-м перцентиле).
-   Текст: длины query/title/description (символы + слова).
-   Match: совпадение категорий и локаций.
-   Сохраняем в parquet для обучения моделей.


In [10]:
import numpy as np
from dask.diagnostics import ProgressBar

# 1) Global aggregates (both collected with a single compute)
# NOTE(review): dask.diagnostics.ProgressBar is documented for the local
# schedulers; with a distributed Client attached it may display nothing —
# confirm, and consider dask.distributed.progress instead.
with ProgressBar():
    med_price_ser = train_dd.groupby("item_cat_id", observed=False)["price"].median()
    p99_click = train_dd["item_query_click_conv"].quantile(0.99)
    med_price_ser, clip_val = dd.compute(med_price_ser, p99_click)

# Lookup table: item category -> median price.
med_price_pdf = med_price_ser.rename("price_median_cat").reset_index()
# The keys are cast straight to object, which is how add_price_features looks
# them up. They must NOT be cast to train_dd["item_cat_id"].dtype first: a Dask
# categorical created with astype("category") carries placeholder categories,
# and casting real values to that dtype silently replaces every key with NaN.
med_price_pdf["item_cat_id"] = med_price_pdf["item_cat_id"].astype("object")
med_price_map = dict(
    zip(med_price_pdf["item_cat_id"], med_price_pdf["price_median_cat"])
)

# The percentile is used as the upper clip bound while the lower bound is 0,
# so a missing or negative percentile is replaced with 0.0 to keep the bounds
# consistent.
if np.isnan(clip_val) or clip_val < 0:
    clip_val = 0.0


# 2) Функции фичей (meta + даункаст)
def add_price_features(df, med_map):
    """Return ``df`` with ``price_log`` and ``price_norm`` columns appended.

    Parameters
    ----------
    df : dataframe with ``price`` and ``item_cat_id`` columns. The ``meta``
        argument passed to ``map`` below is the Dask signature.
    med_map : dict mapping an ``item_cat_id`` value to the median price of
        that category.

    Returns
    -------
    A new dataframe; both added columns are float32.

    ``price_log`` is ``log1p`` of the price with missing prices treated as 0.
    ``price_norm`` is the price divided by its category median. It falls back
    to the neutral value 1.0 when the price is missing, the category is not in
    ``med_map``, or the median is not positive.
    """
    price = df["price"]
    median_cat = (
        df["item_cat_id"]
        .astype("object")
        .map(med_map, meta=("price_median_cat", "float64"))
        .astype("float32")
    )
    # Dividing by a zero median yields +/-inf, which fillna() does not catch.
    # Masking non-positive medians to NaN routes those rows to the 1.0 fallback.
    safe_median = median_cat.where(median_cat > 0)
    return df.assign(
        price_log=np.log1p(price.fillna(0)).astype("float32"),
        price_norm=(price / safe_median).fillna(1.0).astype("float32"),
    )


def add_ctr_features(df, clip_val):
    """Return a new frame with a cleaned ``click_conv`` column appended.

    ``click_conv`` is ``item_query_click_conv`` with missing values replaced
    by 0, then limited to the range ``[0, clip_val]`` and stored as float32.

    Parameters
    ----------
    df : dataframe containing ``item_query_click_conv``.
    clip_val : upper bound applied to the cleaned values.

    The input frame is left untouched: the column is attached with
    ``assign`` rather than by item assignment on the caller's object.
    """
    cleaned = (
        df["item_query_click_conv"]
        .fillna(0)
        .clip(lower=0, upper=clip_val)
        .astype("float32")
    )
    return df.assign(click_conv=cleaned)


def add_text_features(df):
    """Return a new frame with length features for the three text columns.

    For each of ``query_text``, ``item_title`` and ``item_description`` two
    int32 columns are appended: ``<col>_len_chars`` (string length) and
    ``<col>_len_words`` (number of runs of non-whitespace characters).
    Missing text is treated as the empty string.

    The input frame is left untouched: the columns are attached with
    ``assign`` rather than by item assignment on the caller's object.
    """
    length_cols = {}
    for col in ("query_text", "item_title", "item_description"):
        text = df[col].fillna("")
        length_cols[col + "_len_chars"] = text.str.len().astype("int32")
        # Counting regex matches stays vectorised, unlike split + apply(len).
        length_cols[col + "_len_words"] = text.str.count(r"\S+").astype("int32")
    return df.assign(**length_cols)


def add_match_features(df):
    """Return a new frame with query/item agreement flags appended.

    Adds three int8 columns, each 1 where the query-side value equals the
    item-side value and 0 otherwise:

    * ``match_cat``  — ``query_cat``  vs ``item_cat_id``
    * ``match_mcat`` — ``query_mcat`` vs ``item_mcat_id``
    * ``match_loc``  — ``query_loc``  vs ``item_loc``

    Both sides are cast to object only for the comparison itself. The input
    frame is left untouched: the columns are attached with ``assign`` rather
    than by item assignment on the caller's object.
    """
    column_pairs = {
        "match_cat": ("query_cat", "item_cat_id"),
        "match_mcat": ("query_mcat", "item_mcat_id"),
        "match_loc": ("query_loc", "item_loc"),
    }
    flags = {
        flag: (
            df[query_col].astype("object") == df[item_col].astype("object")
        ).astype("int8")
        for flag, (query_col, item_col) in column_pairs.items()
    }
    return df.assign(**flags)


# 3) Применяем лениво
train_dd = add_price_features(train_dd, med_price_map)
test_dd = add_price_features(test_dd, med_price_map)

train_dd = add_ctr_features(train_dd, clip_val)
test_dd = add_ctr_features(test_dd, clip_val)

train_dd = add_text_features(train_dd)
test_dd = add_text_features(test_dd)

train_dd = add_match_features(train_dd)
test_dd = add_match_features(test_dd)

# 4) Явный список фич (и даункаст таргета)
FEATURE_COLS = [
    "price_log",
    "price_norm",
    "click_conv",
    "query_text_len_chars",
    "query_text_len_words",
    "item_title_len_chars",
    "item_title_len_words",
    "item_description_len_chars",
    "item_description_len_words",
    "match_cat",
    "match_mcat",
    "match_loc",
]
ALL_COLS_TRAIN = ["query_id", "item_id", "item_contact"] + FEATURE_COLS
ALL_COLS_TEST = ["query_id", "item_id"] + FEATURE_COLS

train_dd["item_contact"] = train_dd["item_contact"].fillna(0).astype("int8")

# 5) Persist — чтобы не пересчитывать граф заново при записи
train_dd = train_dd[ALL_COLS_TRAIN].persist()
test_dd = test_dd[ALL_COLS_TEST].persist()

# 6) Запись напрямую из Dask → Parquet-директории (без .compute())
OUTPUT_DIR = "../data/transformed"
os.makedirs(OUTPUT_DIR, exist_ok=True)

train_out_dir = os.path.join(OUTPUT_DIR, "train-features")
test_out_dir = os.path.join(OUTPUT_DIR, "test-features")

# Пишем партиционированно; читать потом так: dd.read_parquet(train_out_dir)
with ProgressBar():
    dd.to_parquet(
        train_dd,
        train_out_dir,
        engine="pyarrow",
        write_index=False,
        compression="snappy",
        schema="infer",
    )
    dd.to_parquet(
        test_dd,
        test_out_dir,
        engine="pyarrow",
        write_index=False,
        compression="snappy",
        schema="infer",
    )

print("Saved dirs:", train_out_dir, test_out_dir)

Saved dirs: ../data/transformed/train-features ../data/transformed/test-features
