In [2]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Protocol, Tuple
from typing import Iterator, Optional, Sequence, List

import numpy as np
import pandas as pd
import polars as pl

from catboost import CatBoostClassifier, Pool
from sklearn.metrics import average_precision_score
from sklearn.model_selection import GroupShuffleSplit
import pyarrow.parquet as pq


In [None]:
class Logger:
    """Minimal stdout logger: one line per message, prefixed with a level tag."""

    def info(self, msg: str) -> None:
        """Print ``msg`` tagged as informational."""
        rendered = f"[INFO] {msg}"
        print(rendered)

    def warn(self, msg: str) -> None:
        """Print ``msg`` tagged as a warning (still goes to stdout)."""
        rendered = f"[WARN] {msg}"
        print(rendered)


@dataclass(frozen=True)
class Config:
    """Immutable settings shared by every stage of this notebook's pipeline."""

    # paths
    # Train/test parquet files are resolved under `data_dir`; the submission CSV is written there too.
    data_dir: Path = Path("data/competition")
    train_file: str = "beauty_train.parquet"
    test_file: str = "beauty_test.parquet"

    # columns
    # `row_id` must be unique in train (LeakageHeuristicCheck) and is the key column of the submission.
    # `id1`/`id2` name the two sides of a pair: used for the group split and excluded from model features.
    row_id: str = "id"
    id1: str = "id1"
    id2: str = "id2"
    target: str = "target"  # cast to int before training and evaluation

    # features
    # Passed to CatBoost `Pool` as the categorical / text feature column lists.
    cat_features: Tuple[str, ...] = ("parentname1", "parentname2", "subjectname1", "subjectname2")
    text_features: Tuple[str, ...] = ("title1", "title2", "description1", "description2")

    # drop
    # Columns excluded when reading both the train and the test parquet.
    drop_cols: Tuple[str, ...] = ("characteristics1", "characteristics2")

    # split
    # Validation fraction for GroupShuffleSplit; `seed` seeds both that split and CatBoost.
    test_size: float = 0.2
    seed: int = 42

    # CatBoost
    # Forwarded to CatBoostClassifier as task_type / eval_metric / verbose.
    task_type: str = "GPU"
    eval_metric: str = "PRAUC"
    verbose: int = 200

    # batching
    # Rows per batch when streaming the test parquet for inference.
    batch_size: int = 10_000

    # output
    # File name of the submission CSV written under `data_dir`.
    submission_name: str = "submission.csv"

# Notebook-wide singletons reused by every later cell.
log = Logger()
cfg = Config()

# Display the resolved configuration as this cell's output.
cfg


In [None]:
class BatchSource(Protocol):
    """Structural type for any iterable that yields pandas DataFrame batches (e.g. test-set chunks)."""

    def __iter__(self) -> Iterator[pd.DataFrame]: ...


@dataclass
class PyArrowParquetBatchIterator:
    """Stream a Parquet file as pandas DataFrame batches.

    Iterator: yields one DataFrame per record batch.
    Adapter: adapts pyarrow's Parquet batch reader to pandas.

    Attributes:
        path: Parquet file to read.
        batch_size: maximum rows per yielded batch (forwarded to ``iter_batches``).
        columns: columns to read; ``None`` means every top-level column of the file.
        drop_cols: columns to exclude. They are removed from the selection *before*
            reading in both modes, so they are never decoded from disk. Names that
            are not selected are ignored.
    """
    path: Path
    batch_size: int = 10_000
    columns: Optional[Sequence[str]] = None
    drop_cols: Optional[Sequence[str]] = None

    def __iter__(self) -> Iterator[pd.DataFrame]:
        pf = pq.ParquetFile(str(self.path))
        # `iter_batches(columns=...)` selects top-level Arrow fields; the Parquet-schema
        # `.names` are leaf column names, which differ for nested columns.
        selected: List[str] = (
            list(self.columns) if self.columns is not None else list(pf.schema_arrow.names)
        )
        drop_set = set(self.drop_cols or ())
        cols = [c for c in selected if c not in drop_set]

        for batch in pf.iter_batches(batch_size=self.batch_size, columns=cols):
            yield batch.to_pandas()


def load_parquet_full(path: Path, drop_cols: Sequence[str] = ()) -> pd.DataFrame:
    """Read an entire Parquet file into a pandas DataFrame, skipping ``drop_cols``.

    Args:
        path: Parquet file to read.
        drop_cols: column names to exclude; names absent from the file are ignored.
            Excluded columns are not read from disk.

    Returns:
        DataFrame with the remaining top-level columns, in file order.
    """
    drop_set = set(drop_cols)  # built once instead of once per column
    pf = pq.ParquetFile(str(path))
    # `read(columns=...)` selects top-level Arrow fields; the Parquet-schema `.names`
    # are leaf column names, which differ for nested columns.
    cols = [c for c in pf.schema_arrow.names if c not in drop_set]
    return pf.read(columns=cols).to_pandas()


In [None]:
class SimilarityStrategy(Protocol):
    """Strategy interface: score how similar two texts are (implementations here return values in [0, 1])."""

    def score(self, a: str, b: str) -> float: ...


class JaccardWordStrategy:
    """Jaccard similarity of the lower-cased, whitespace-split word sets of two texts.

    ``None`` or empty inputs count as having no words; two word-less inputs score 1.0.
    """

    def score(self, a: str, b: str) -> float:
        words_a = set((a or "").lower().split())
        words_b = set((b or "").lower().split())
        union_size = len(words_a | words_b)
        if union_size == 0:
            # Nothing to compare on either side: treat as identical.
            return 1.0
        return len(words_a & words_b) / union_size


class TokenSetLikeStrategy:
    """Word-set Jaccard boosted by a factor of 1.2 and capped at 1.0.

    Texts are lower-cased and whitespace-split; ``None`` counts as empty. Two
    word-less inputs score 1.0.
    """

    def score(self, a: str, b: str) -> float:
        left, right = (set((text or "").lower().split()) for text in (a, b))
        total = len(left | right)
        if not total:
            return 1.0
        shared = len(left & right)
        return min(1.0, 1.2 * shared / total)


class FeatureStrategy(Protocol):
    """Strategy interface: turn a raw frame into the model's feature frame.

    `is_train` tells the implementation whether it is building training or inference features.
    """

    def transform(self, df_raw: pd.DataFrame, cfg: Config, is_train: bool) -> pd.DataFrame: ...


class BaselineFeatures:
    """Strategy: identity feature set - returns an independent copy of the input, unchanged."""

    def transform(self, df_raw: pd.DataFrame, cfg: Config, is_train: bool) -> pd.DataFrame:
        passthrough = df_raw.copy(deep=True)
        return passthrough


class SimilarityAugmentFeatures:
    """Strategy: append pairwise text-similarity columns ``sim_title`` and ``sim_desc``.

    ``sim_title`` scores ``title1`` vs ``title2`` and ``sim_desc`` scores
    ``description1`` vs ``description2`` with the injected ``SimilarityStrategy``.
    Missing values (NaN/None) are passed to the strategy as ``""``. The input frame
    is not modified; a copy with the two extra columns is returned.
    """

    def __init__(self, sim: SimilarityStrategy):
        self.sim = sim

    @staticmethod
    def _as_text(col: pd.Series) -> pd.Series:
        # Replace missing values with "" *before* the str cast. A plain astype(str) turns
        # NaN/None into the literal tokens "nan"/"None", which bypasses the strategies'
        # `a or ""` guard and can spuriously match real words.
        return col.astype(object).where(col.notna(), "").astype(str)

    def _pair_scores(self, df: pd.DataFrame, left: str, right: str) -> List[float]:
        # Row-wise similarity between two text columns of `df`.
        return [
            self.sim.score(a, b)
            for a, b in zip(self._as_text(df[left]), self._as_text(df[right]))
        ]

    def transform(self, df_raw: pd.DataFrame, cfg: Config, is_train: bool) -> pd.DataFrame:
        df = df_raw.copy()
        df["sim_title"] = self._pair_scores(df, "title1", "title2")
        df["sim_desc"] = self._pair_scores(df, "description1", "description2")
        return df


In [None]:
class Check(Protocol):
    """Chain-of-responsibility interface for data-quality checks on a DataFrame."""

    # Attach the next link; returns `nxt` so links can be chained fluently.
    def set_next(self, nxt: "Check") -> "Check": ...
    # Run this check, then (typically) delegate to the next link; returns the DataFrame.
    def handle(self, df: pd.DataFrame, cfg: Config) -> pd.DataFrame: ...


class BaseCheck:
    """Chain-of-responsibility link that performs no check and forwards to its successor.

    Subclasses run their own check in ``handle`` and end with
    ``super().handle(df, cfg)`` so the rest of the chain runs; the last link
    returns ``df``.
    """

    def __init__(self, log: Logger):
        self._next: Optional[Check] = None
        self.log = log

    def set_next(self, nxt: Check) -> Check:
        """Attach ``nxt`` after this link and return it, enabling fluent chaining."""
        self._next = nxt
        return self._next

    def handle(self, df: pd.DataFrame, cfg: Config) -> pd.DataFrame:
        successor = self._next
        if not successor:
            return df
        return successor.handle(df, cfg)


class SchemaCheck(BaseCheck):
    """Fail fast when any required column is absent from the DataFrame."""

    def __init__(self, log: Logger, required: Sequence[str]):
        super().__init__(log)
        self.required = list(required)

    def handle(self, df: pd.DataFrame, cfg: Config) -> pd.DataFrame:
        present = df.columns
        missing = [name for name in self.required if name not in present]
        if not missing:
            self.log.info("SchemaCheck OK")
            return super().handle(df, cfg)
        raise ValueError(f"SchemaCheck failed. Missing: {missing}")


class NullRateCheck(BaseCheck):
    """Warn (never fail) when a monitored column's null fraction exceeds ``max_null_rate``.

    Monitored columns that are absent from the frame are skipped silently.
    """

    def __init__(self, log: Logger, cols: Sequence[str], max_null_rate: float = 0.35):
        super().__init__(log)
        self.cols = list(cols)
        self.max_null_rate = max_null_rate

    def handle(self, df: pd.DataFrame, cfg: Config) -> pd.DataFrame:
        present = (name for name in self.cols if name in df.columns)
        for name in present:
            null_rate = float(df[name].isna().mean())
            if null_rate > self.max_null_rate:
                self.log.warn(f"NullRateCheck: {name} null_rate={null_rate:.3f} > {self.max_null_rate}")
        self.log.info("NullRateCheck OK (warnings possible)")
        return super().handle(df, cfg)


class LeakageHeuristicCheck(BaseCheck):
    """Cheap leakage heuristics: row ids must be unique; self-pairs (id1 == id2) are flagged."""

    def handle(self, df: pd.DataFrame, cfg: Config) -> pd.DataFrame:
        has_duplicate_ids = df[cfg.row_id].duplicated().any()
        if has_duplicate_ids:
            raise ValueError("LeakageHeuristicCheck: duplicated row_id")

        self_pairs = df[cfg.id1] == df[cfg.id2]
        if self_pairs.any():
            self.log.warn("LeakageHeuristicCheck: found id1==id2 rows (suspicious)")

        self.log.info("LeakageHeuristicCheck OK")
        return super().handle(df, cfg)


class ClassBalanceCheck(BaseCheck):
    """Log the positive rate of ``cfg.target`` and warn when it is below ``min_pos_rate``."""

    def __init__(self, log: Logger, min_pos_rate: float = 0.001):
        super().__init__(log)
        self.min_pos_rate = min_pos_rate

    def handle(self, df: pd.DataFrame, cfg: Config) -> pd.DataFrame:
        labels = df[cfg.target].astype(int)
        pos_rate = float(labels.mean())
        too_rare = pos_rate < self.min_pos_rate
        if too_rare:
            self.log.warn(f"ClassBalanceCheck: pos_rate too low = {pos_rate:.6f}")
        self.log.info(f"ClassBalanceCheck: pos_rate={pos_rate:.6f}")
        return super().handle(df, cfg)


In [None]:
class ModelBackend(Protocol):
    """Model interface used by Trainer and the batched predictor."""

    # Train on `train_pool`, using `valid_pool` as the evaluation set.
    def fit(self, train_pool: Pool, valid_pool: Pool, cfg: Config) -> None: ...
    # One score per row of `pool`; the CatBoost backend returns positive-class probabilities.
    def predict_proba(self, pool: Pool) -> np.ndarray: ...


class CatBoostBackend(ModelBackend):
    """ModelBackend that trains a CatBoostClassifier with early stopping on the validation pool.

    Run-level settings (eval metric, task type, seed, verbosity) are read from ``cfg``
    at fit time. Model hyper-parameters are constructor arguments whose defaults are
    the values this backend always used. Any ``extra_params`` are forwarded to
    ``CatBoostClassifier`` and override the parameters built here.
    """

    def __init__(
        self,
        log: Logger,
        iterations: int = 5000,
        learning_rate: float = 0.05,
        depth: int = 8,
        early_stopping_rounds: int = 200,
        **extra_params: Any,
    ):
        self.log = log
        self.model: Optional[CatBoostClassifier] = None
        self.iterations = iterations
        self.learning_rate = learning_rate
        self.depth = depth
        self.early_stopping_rounds = early_stopping_rounds
        self.extra_params: Dict[str, Any] = dict(extra_params)

    def fit(self, train_pool: Pool, valid_pool: Pool, cfg: Config) -> None:
        """Train a fresh model; keeps the best iteration on ``valid_pool`` (use_best_model=True)."""
        params: Dict[str, Any] = dict(
            loss_function="Logloss",
            eval_metric=cfg.eval_metric,
            task_type=cfg.task_type,
            random_seed=cfg.seed,
            verbose=cfg.verbose,
            iterations=self.iterations,
            learning_rate=self.learning_rate,
            depth=self.depth,
        )
        params.update(self.extra_params)

        model = CatBoostClassifier(**params)
        model.fit(
            train_pool,
            eval_set=valid_pool,
            early_stopping_rounds=self.early_stopping_rounds,
            use_best_model=True,
        )
        # Publish only after a successful fit, so a failed fit never leaves a
        # half-built model behind for predict_proba to use.
        self.model = model
        self.log.info(f"CatBoost best_iteration={model.get_best_iteration()}")

    def predict_proba(self, pool: Pool) -> np.ndarray:
        """Return the positive-class probability for every row of ``pool``.

        Raises:
            RuntimeError: if ``fit`` has not completed successfully yet.
        """
        if self.model is None:
            raise RuntimeError("Model not fitted")
        return self.model.predict_proba(pool)[:, 1]


In [None]:
class BackendProxy(ModelBackend):
    """Caching proxy in front of a ModelBackend: memoizes ``predict_proba`` per Pool object.

    Entries are keyed by ``id(pool)``, but each entry also holds a strong reference to
    its pool, and a hit requires the stored pool to be *the same object* as the queried
    one. Keying on ``id()`` alone is unsafe: CPython reuses the id of a freed object, so
    a Pool built for a later inference batch could be served an earlier, already-freed
    batch's cached predictions. Holding the reference prevents that reuse while the
    entry lives. ``max_entries`` bounds how many pools stay pinned in memory
    (least-recently-used entries are evicted first; ``0`` disables caching).
    """

    def __init__(self, real: ModelBackend, log: Logger, max_entries: int = 4):
        if max_entries < 0:
            raise ValueError(f"max_entries must be >= 0, got {max_entries}")
        self.real = real
        self.log = log
        self.max_entries = max_entries
        # id(pool) -> (pool, predictions); dict insertion order doubles as LRU order.
        self._cache: Dict[int, Tuple[Any, np.ndarray]] = {}

    def fit(self, train_pool: Pool, valid_pool: Pool, cfg: Config) -> None:
        """Delegate training, then drop every cached prediction (they belong to the old model)."""
        self.log.info("Proxy.fit -> backend.fit")
        try:
            self.real.fit(train_pool, valid_pool, cfg)
        finally:
            # Invalidate even if fit failed: the wrapped model's state may have changed.
            self._cache.clear()

    def predict_proba(self, pool: Pool) -> np.ndarray:
        """Return cached predictions for this exact ``pool`` object, else delegate and cache."""
        key = id(pool)
        entry = self._cache.get(key)
        if entry is not None and entry[0] is pool:
            self.log.info("Proxy.predict_proba cache hit")
            # Re-insert to mark as most recently used.
            self._cache[key] = self._cache.pop(key)
            return entry[1]

        self.log.info("Proxy.predict_proba cache miss -> backend.predict_proba")
        pred = self.real.predict_proba(pool)
        if self.max_entries > 0:
            self._cache[key] = (pool, pred)
            while len(self._cache) > self.max_entries:
                # Evict the least recently used entry (first in insertion order).
                del self._cache[next(iter(self._cache))]
        return pred


In [None]:
class Trainer:
    """Train-and-validate driver: data checks -> features -> group split -> fit -> PR-AUC.

    NOTE(review): `_split_group_min_id` groups each pair by ``min(id1, id2)`` compared as
    strings. An id that occurs in several pairs with different partners can still land
    on both sides of the split. For example, pairs (A, B) and (B, C) with A < B < C get
    groups A and B. This therefore reduces, but does not eliminate, id-level leakage
    between train and validation. Grouping by connected components of the id graph
    would be the strict version.
    """

    # Thresholds used by the data-quality chain built in `_validation_chain`.
    max_text_null_rate: float = 0.35
    min_pos_rate: float = 0.001

    def __init__(self, backend: ModelBackend, feature_strategy: FeatureStrategy, cfg: Config, log: Logger):
        self.backend = backend
        self.feature_strategy = feature_strategy
        self.cfg = cfg
        self.log = log

    def _pool(self, X: pd.DataFrame, y: Optional[pd.Series]) -> Pool:
        """Wrap features (and optional labels) in a CatBoost Pool with the configured cat/text columns."""
        return Pool(
            X, y,
            cat_features=list(self.cfg.cat_features),
            text_features=list(self.cfg.text_features),
        )

    def _validation_chain(self) -> BaseCheck:
        """Build schema -> text null-rate -> leakage heuristics -> class balance; return the head."""
        cfg = self.cfg
        required = [cfg.row_id, cfg.id1, cfg.id2, cfg.target, *cfg.cat_features, *cfg.text_features]
        head = SchemaCheck(self.log, required)
        (
            head.set_next(NullRateCheck(self.log, cfg.text_features, self.max_text_null_rate))
            .set_next(LeakageHeuristicCheck(self.log))
            .set_next(ClassBalanceCheck(self.log, self.min_pos_rate))
        )
        return head

    def _to_xy(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
        """Split a featurized frame into (X, y): y is the int target; target and id columns leave X."""
        cfg = self.cfg
        y = df[cfg.target].astype(int)
        X = df.drop(columns=[cfg.target, cfg.row_id, cfg.id1, cfg.id2])
        return X, y

    def _split_group_min_id(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """One GroupShuffleSplit where each row's group is min(str(id1), str(id2)); returns (train, valid)."""
        groups = np.minimum(df[self.cfg.id1].astype(str).values, df[self.cfg.id2].astype(str).values)
        splitter = GroupShuffleSplit(n_splits=1, test_size=self.cfg.test_size, random_state=self.cfg.seed)
        tr_idx, va_idx = next(splitter.split(df, groups=groups))
        return df.iloc[tr_idx].copy(), df.iloc[va_idx].copy()

    def fit_validate(self, df_train_raw: pd.DataFrame) -> float:
        """Run the checks, build features, split, fit the backend, and return validation AP.

        Raises:
            ValueError: from the check chain (missing columns, duplicated row ids).
        """
        df_checked = self._validation_chain().handle(df_train_raw, self.cfg)

        # Strategy
        df_train = self.feature_strategy.transform(df_checked, self.cfg, is_train=True)

        # Split (see the class note on residual leakage)
        df_tr, df_va = self._split_group_min_id(df_train)
        X_tr, y_tr = self._to_xy(df_tr)
        X_va, y_va = self._to_xy(df_va)

        train_pool = self._pool(X_tr, y_tr)
        valid_pool = self._pool(X_va, y_va)

        self.log.info(f"Fit: train={len(df_tr)} valid={len(df_va)}")
        if not y_va.any():
            # AP needs at least one positive to be meaningful; surface it instead of a silent number.
            self.log.warn("Validation split has no positive rows; PR-AUC(AP) is not meaningful")
        self.backend.fit(train_pool, valid_pool, self.cfg)

        pred = self.backend.predict_proba(valid_pool)
        ap = float(average_precision_score(y_va.values, pred))
        base = float(y_va.mean())
        self.log.info(f"Valid PR-AUC(AP)={ap:.6f} baseline≈{base:.6f}")
        return ap


In [None]:
def predict_test_batched_to_csv(
    cfg: Config,
    log: Logger,
    backend: ModelBackend,
    feature_strategy: FeatureStrategy,
    test_source: BatchSource,
) -> Path:
    """Score every batch from ``test_source`` and write ``<row_id>,y_pred`` rows to a CSV.

    The output is ``cfg.data_dir / cfg.submission_name``. It is opened (and truncated)
    once up front, so a source that yields no batches produces a header-only file
    instead of leaving a stale submission from a previous run in place. Target and id
    columns are removed from the features when present.

    Returns:
        Path of the written CSV.
    """
    out_path = cfg.data_dir / cfg.submission_name
    non_feature_cols = [cfg.target, cfg.row_id, cfg.id1, cfg.id2]
    n_batches = 0
    n_rows = 0

    # newline="" as pandas recommends when to_csv writes to an open text handle.
    with open(out_path, "w", newline="", encoding="utf-8") as fh:
        for df_raw in test_source:
            df_feat = feature_strategy.transform(df_raw, cfg, is_train=False)
            X = df_feat.drop(columns=[c for c in non_feature_cols if c in df_feat.columns])

            pool = Pool(
                X,
                cat_features=list(cfg.cat_features),
                text_features=list(cfg.text_features),
            )
            pred = backend.predict_proba(pool)

            sub_part = pd.DataFrame({cfg.row_id: df_raw[cfg.row_id].values, "y_pred": pred})
            sub_part.to_csv(fh, index=False, header=(n_batches == 0))
            n_batches += 1
            n_rows += len(sub_part)

        if n_batches == 0:
            log.warn("predict_test_batched_to_csv: test source yielded no batches; writing header only")
            pd.DataFrame(columns=[cfg.row_id, "y_pred"]).to_csv(fh, index=False)

    log.info(f"Saved submission: {out_path} (rows={n_rows}, batches={n_batches})")
    return out_path


In [None]:
# Training cell: assemble the pipeline, load the full train split, fit, and show validation AP.
feat_strategy = SimilarityAugmentFeatures(sim=JaccardWordStrategy())
backend = BackendProxy(CatBoostBackend(log), log)

train_path = cfg.data_dir / cfg.train_file
test_path = cfg.data_dir / cfg.test_file

trainer = Trainer(backend=backend, feature_strategy=feat_strategy, cfg=cfg, log=log)

log.info("Loading train parquet (full)...")
df_train = load_parquet_full(train_path, drop_cols=cfg.drop_cols)
log.info(f"Train loaded: shape={df_train.shape}")

ap = trainer.fit_validate(df_train)
ap


In [None]:
# Inference cell: stream the test parquet in batches and write predictions to the submission CSV.
log.info("Batched test inference...")
test_iter = PyArrowParquetBatchIterator(
    path=test_path, batch_size=cfg.batch_size, columns=None, drop_cols=cfg.drop_cols
)
out_path = predict_test_batched_to_csv(
    cfg=cfg, log=log, backend=backend, feature_strategy=feat_strategy, test_source=test_iter
)
out_path
