# HVSM Notebook: hvsm_prod_b.ipynb

- Runs with: slurm_scripts/hvsm_job_b.sh
- Purpose: GPU models (cuML) with CPU TF-IDF and tuning.



In [None]:
# Parameters (papermill)
# Folder name probed (in the working directory and its ancestors) to locate
# the project root, and used as the fallback location for bare file names.
DATA_DIR = "data"
# Train/validation CSVs must carry a `label` column; the test CSV must carry
# `id` and no `label` (asserted in the "Load data" cell).
TRAIN_CSV = "data/train.csv"
VAL_CSV = "data/val.csv"
TEST_CSV = "data/test.csv"

# HVSM: CPU TF-IDF + GPU cuML LR/NB (tuning)

This notebook is a GPU-first rewrite using Polars + cuML, with random-search tuning on the validation split and expanded diagnostics.

**Inputs (strict):** `data/train.csv`, `data/val.csv`, `data/test.csv` in the `data/` folder. `data/test.csv` must have `id` and no `label`. The notebook creates `outputs/submission_hvsm_prod_b.csv`.


## Imports and guardrails

In [None]:
from __future__ import annotations
import os, re, warnings, json, hashlib
import gc
import time
import random
from typing import List
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import numpy as np
import polars as pl
from scipy import stats
import matplotlib.pyplot as plt
import scipy.sparse as sp
from sklearn.feature_extraction.text import (
    TfidfVectorizer as SkTfidfVectorizer,
)
from sklearn.model_selection import ParameterSampler

# The GPU stack is mandatory: any import failure is re-raised as a single
# RuntimeError so the job stops before any work is done.
try:
    import cupy as cp
    import cupyx.scipy.sparse as cpx_sparse
    import cuml
    from cuml.linear_model import LogisticRegression
    from cuml.naive_bayes import MultinomialNB
except Exception as e:
    raise RuntimeError(
        "cuML + CUDA (cupy/cudf) required for GPU-first run."
    ) from e
# seaborn is optional: violin_by_label draws a matplotlib boxplot when
# `sns` is None.
try:
    import seaborn as sns
except Exception:
    sns = None
# TextBlob is optional: _sentiment emits constant 0.0 columns when
# `TextBlob` is None.
try:
    from textblob import TextBlob
except Exception:
    TextBlob = None
    warnings.warn("TextBlob missing; sentiment features set to zeros.")
np.set_printoptions(linewidth=79)
# Request the "cupy" output type from cuML for all estimators.
cuml.set_global_output_type("cupy")
RANDOM_SEED = 42
# NOTE(review): `rng` is not referenced anywhere else in the visible notebook.
rng = np.random.default_rng(RANDOM_SEED)


## Configuration

In [None]:
@dataclass
class Config:
    """Tunable settings for feature extraction, search budget and plotting."""

    # Vocabulary cap passed to the TF-IDF vectorizer (`max_features`).
    tfidf_max_features: int = 5000
    # Number of documents transformed per TF-IDF batch.
    tfidf_chunk_size: int = 5000
    # Number of rows scored per `predict_proba` batch.
    proba_chunk_size: int = 20000
    # Upper bound of the word n-gram range, i.e. ngram_range=(1, this value).
    tfidf_ngram_max: int = 7
    # NOTE(review): not referenced in the visible notebook.
    use_char_ngrams: bool = False
    # Minimum document frequency passed to the TF-IDF vectorizer (`min_df`).
    min_df: int = 2
    # NOTE(review): not referenced in the visible notebook.
    kfolds: int = 3
    # Requested number of random-search candidates for LR and NB respectively.
    lr_iter: int = 8
    nb_iter: int = 6
    # NOTE(review): not referenced in the visible notebook.
    plot_level: str = "full"


CFG = Config()
print(CFG)

## Logging, memory, metric, and plotting helpers

In [None]:
def _gc() -> None:
    """Collect Python garbage, then release cached CuPy memory (best effort).

    The pinned-memory pool is only released if releasing the default device
    pool succeeded; any error from either step is swallowed.
    """
    gc.collect()
    try:
        cp.get_default_memory_pool().free_all_blocks()
    except Exception:
        return
    try:
        cp.get_default_pinned_memory_pool().free_all_blocks()
    except Exception:
        pass


# Maps a step name to the perf_counter() value captured when it started.
_STEP_STARTS = {}


def log_step(msg: str) -> None:
    """Print ``msg`` prefixed with a wall-clock timestamp and flush stdout."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] {msg}"
    print(line, flush=True)


def log_step_start(name: str) -> None:
    """Record the start time of step ``name`` and log a START line."""
    _STEP_STARTS[name] = time.perf_counter()
    log_step(f"START: {name}")


def log_step_end(name: str) -> None:
    """Log an END line for ``name`` and release cached CuPy memory.

    The elapsed time is included only when a matching ``log_step_start`` call
    was recorded; the recorded start is removed either way. Releasing the
    memory pools is best effort and never raises.
    """
    began = _STEP_STARTS.pop(name, None)
    if began is not None:
        elapsed = time.perf_counter() - began
        log_step(f"END: {name} (elapsed {elapsed:.1f}s)")
    else:
        log_step(f"END: {name}")
    try:
        cp.get_default_memory_pool().free_all_blocks()
    except Exception:
        return
    try:
        cp.get_default_pinned_memory_pool().free_all_blocks()
    except Exception:
        pass




def _cp_asarray_with_backoff(
    arr,
    *,
    max_attempts: int = 16,
    base_sleep: float = 0.5,
    max_sleep: float = 60.0,
):
    """Copy ``arr`` to the GPU with ``cp.asarray``, retrying while CUDA is busy.

    Only errors whose message contains "devicesunavailable" or
    "busy or unavailable" (case-insensitive) are retried; every other error
    propagates immediately. The wait doubles per attempt starting at
    ``base_sleep``, is capped at ``max_sleep``, and gets up to 30% random
    jitter added.

    Args:
        arr: Anything accepted by ``cp.asarray``.
        max_attempts: Total number of attempts, must be at least 1.
        base_sleep: Wait in seconds before the first retry.
        max_sleep: Upper bound in seconds for the un-jittered wait.

    Returns:
        The array produced by ``cp.asarray``.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1. Previously the
            loop was skipped and ``None`` was returned silently.
        Exception: The last busy/unavailable error once attempts are
            exhausted, or any non-retryable error as soon as it occurs.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")
    last_attempt = max_attempts - 1
    for attempt in range(max_attempts):
        try:
            return cp.asarray(arr)
        except Exception as exc:
            msg = str(exc).lower()
            retryable = (
                "devicesunavailable" in msg or "busy or unavailable" in msg
            )
            if not retryable or attempt >= last_attempt:
                raise
            sleep = min(max_sleep, base_sleep * (2 ** attempt))
            # Jitter de-synchronises jobs that are waiting on the same device.
            wait = sleep + random.uniform(0, sleep * 0.3)
            log_step(
                "CUDA busy/unavailable; retrying in "
                f"{wait:.1f}s (attempt {attempt + 1}/{max_attempts})"
            )
            time.sleep(wait)

def predict_proba_chunks(model, X, chunk_size: int = 50000):
    """Return the column-1 output of ``model.predict_proba`` for ``X``, scored in row batches.

    Rows are scored ``chunk_size`` at a time and written into a preallocated
    float32 CuPy vector; ``_gc()`` runs after every batch.
    """
    total = X.shape[0]
    probs = cp.empty(total, dtype=cp.float32)
    for lo in range(0, total, chunk_size):
        hi = lo + chunk_size
        if hi > total:
            hi = total
        batch = model.predict_proba(X[lo:hi])
        probs[lo:hi] = batch[:, 1]
        _gc()
    return probs


def _to_numpy(x):
    """Return ``x`` as a NumPy array.

    NumPy arrays are returned unchanged, objects exposing ``get`` are
    converted by calling ``x.get()``, and anything else goes through
    ``np.asarray``.
    """
    if not isinstance(x, np.ndarray):
        x = x.get() if hasattr(x, "get") else np.asarray(x)
    return x


def f1_score_np(y_true, y_pred) -> float:
    """Return the F1 score of class 1 for two label vectors.

    Inputs are converted with ``_to_numpy`` and cast to int. A 1e-12 term in
    each denominator avoids division by zero.
    """
    truth = _to_numpy(y_true).astype(int)
    guess = _to_numpy(y_pred).astype(int)
    true_pos = truth == 1
    pred_pos = guess == 1
    tp = int((true_pos & pred_pos).sum())
    fp = int(((truth == 0) & pred_pos).sum())
    fn = int((true_pos & (guess == 0)).sum())
    precision = tp / (tp + fp + 1e-12)
    recall = tp / (tp + fn + 1e-12)
    score = 2 * precision * recall / (precision + recall + 1e-12)
    return float(score)


def confusion_matrix_np(y_true, y_pred) -> np.ndarray:
    """Return the 2x2 confusion matrix ``[[tn, fp], [fn, tp]]`` for labels 0/1."""
    truth = _to_numpy(y_true).astype(int)
    guess = _to_numpy(y_pred).astype(int)
    # Rows are the true label, columns the predicted label.
    counts = [
        [int(((truth == t) & (guess == p)).sum()) for p in (0, 1)]
        for t in (0, 1)
    ]
    return np.array(counts)


def classification_report_np(y_true, y_pred) -> str:
    """Build a text report with per-class, macro and weighted metrics.

    The report lists precision, recall, F1 and support for labels 0 and 1,
    followed by accuracy, the unweighted (macro) averages and the
    support-weighted averages.
    """
    truth = _to_numpy(y_true).astype(int)
    guess = _to_numpy(y_pred).astype(int)

    def _per_class(label):
        """Return (precision, recall, f1, support) for one label."""
        is_true = truth == label
        is_pred = guess == label
        tp = int((is_true & is_pred).sum())
        fp = int((~is_true & is_pred).sum())
        fn = int((is_true & ~is_pred).sum())
        precision = tp / (tp + fp + 1e-12)
        recall = tp / (tp + fn + 1e-12)
        f1 = 2 * precision * recall / (precision + recall + 1e-12)
        return precision, recall, f1, int(is_true.sum())

    (p0, r0, f0, s0), (p1, r1, f1, s1) = _per_class(0), _per_class(1)
    pairs = ((p0, p1), (r0, r1), (f0, f1))
    total = s0 + s1
    denom = max(total, 1)
    acc = float((truth == guess).mean())
    macro_p, macro_r, macro_f = ((a + b) / 2 for a, b in pairs)
    w_p, w_r, w_f = ((a * s0 + b * s1) / denom for a, b in pairs)
    return "\n".join(
        [
            "              precision    recall  f1-score   support",
            f"           0       {p0:0.3f}      {r0:0.3f}      {f0:0.3f}      {s0:5d}",
            f"           1       {p1:0.3f}      {r1:0.3f}      {f1:0.3f}      {s1:5d}",
            "",
            f"    accuracy                           {acc:0.3f}      {total:5d}",
            f"   macro avg       {macro_p:0.3f}      {macro_r:0.3f}      {macro_f:0.3f}      {total:5d}",
            f"weighted avg       {w_p:0.3f}      {w_r:0.3f}      {w_f:0.3f}      {total:5d}",
        ]
    )


def roc_curve_np(y_true, y_score):
    """Compute ROC curve points for binary labels and scores.

    Samples are ranked by decreasing score and one point is emitted per
    distinct score value, so tied scores form a single step instead of an
    arbitrary, sort-order-dependent staircase. The curve starts at the
    origin, paired with an infinite threshold.

    Args:
        y_true: Labels, where 1 is the positive class and 0 the negative.
        y_score: Scores, higher meaning more likely positive.

    Returns:
        Tuple ``(fpr, tpr, thresholds)`` of equally long 1-D arrays. Empty
        input yields just the origin point.
    """
    y_true = _to_numpy(y_true).astype(int)
    y_score = _to_numpy(y_score).astype(float)
    if y_true.size == 0:
        return np.zeros(1), np.zeros(1), np.array([np.inf])
    # A stable sort keeps the ranking reproducible across runs.
    order = np.argsort(-y_score, kind="stable")
    y_true = y_true[order]
    y_score = y_score[order]
    # Last position of every run of equal scores: the only places where a
    # threshold can actually separate samples.
    cuts = np.r_[np.flatnonzero(np.diff(y_score)), y_true.size - 1]
    tps = np.cumsum(y_true == 1)[cuts]
    fps = np.cumsum(y_true == 0)[cuts]
    tpr = np.r_[0.0, tps / max(tps[-1], 1)]
    fpr = np.r_[0.0, fps / max(fps[-1], 1)]
    thresholds = np.r_[np.inf, y_score[cuts]]
    return fpr, tpr, thresholds


def precision_recall_curve_np(y_true, y_score):
    """Return ``(precision, recall, sorted_scores)`` along the score ranking.

    Entry ``k`` describes the prediction set made of the ``k + 1``
    highest-scoring samples. The third element is the score vector sorted in
    decreasing order.
    """
    labels = _to_numpy(y_true).astype(int)
    scores = _to_numpy(y_score).astype(float)
    ranking = np.argsort(-scores)
    labels, scores = labels[ranking], scores[ranking]
    hits = np.cumsum(labels == 1)
    false_alarms = np.cumsum(labels == 0)
    precision = hits / np.maximum(hits + false_alarms, 1)
    recall = hits / max(hits[-1], 1)
    return precision, recall, scores


def roc_auc_score_np(y_true, y_score) -> float:
    """Return the area under the ROC curve by trapezoidal integration.

    The curve comes from ``roc_curve_np``; TPR is integrated over FPR.
    """
    fpr, tpr, _ = roc_curve_np(y_true, y_score)
    # NumPy 2.x deprecates `trapz` in favour of `trapezoid`; use whichever
    # the installed version provides.
    integrate = getattr(np, "trapezoid", None) or np.trapz
    return float(integrate(tpr, fpr))


def _tight() -> None:
    """Apply ``plt.tight_layout()`` to the current figure."""
    plt.tight_layout()


def qq_plot(residuals: np.ndarray, title: str) -> None:
    """Show a normal probability (Q-Q) plot of ``residuals`` titled ``title``."""
    fig = plt.figure(figsize=(5, 4))
    ax = fig.add_subplot(111)
    stats.probplot(residuals, dist="norm", plot=ax)
    # probplot sets its own title; replace it with the caller's.
    ax.set_title(title)
    _tight()
    plt.show()


def residual_plot(y_true: np.ndarray, y_prob: np.ndarray, title: str) -> None:
    """Scatter ``y_true - y_prob`` against ``y_prob`` with a zero reference line."""
    fig, ax = plt.subplots(figsize=(5, 4))
    ax.scatter(y_prob, y_true - y_prob, s=8)
    ax.axhline(0.0, linestyle="--")
    ax.set_xlabel("p(y=1)")
    ax.set_ylabel("residual")
    ax.set_title(title)
    _tight()
    plt.show()


def violin_by_label(
    df: pl.DataFrame, label_col: str, feat_col: str, title: str
) -> None:
    """Plot the distribution of ``feat_col`` for each value of ``label_col``.

    Draws a seaborn violin plot when seaborn is available; otherwise falls
    back to a matplotlib boxplot comparing labels 0 and 1.

    Args:
        df: Frame holding both columns.
        label_col: Name of the label column.
        feat_col: Name of the numeric feature column.
        title: Figure title.
    """
    y = df.select(label_col).to_numpy().ravel()
    x = df.select(feat_col).to_numpy().ravel()
    plt.figure(figsize=(5, 4))
    if sns is None:
        plt.boxplot([x[y == 0], x[y == 1]])
        # Label the two boxes through the ticks: the `labels=` keyword of
        # boxplot is deprecated in recent Matplotlib releases. Boxes sit at
        # the default positions 1 and 2.
        plt.xticks([1, 2], ["0", "1"])
    else:
        sns.violinplot(x=y, y=x)
    plt.title(title)
    _tight()
    plt.show()


def plot_roc_pr(y_true: np.ndarray, y_prob: np.ndarray, title: str) -> None:
    """Show the ROC curve and the precision-recall curve side by side.

    Args:
        y_true: Binary labels.
        y_prob: Predicted scores for the positive class.
        title: Figure-level title. It was previously accepted but never
            drawn; it is now rendered as the figure's suptitle.
    """
    fpr, tpr, _ = roc_curve_np(y_true, y_prob)
    prec, rec, _ = precision_recall_curve_np(y_true, y_prob)
    fig, ax = plt.subplots(1, 2, figsize=(10, 4))
    ax[0].plot(fpr, tpr)
    ax[0].set_title(f"ROC AUC={roc_auc_score_np(y_true, y_prob):.3f}")
    ax[0].set_xlabel("FPR")
    ax[0].set_ylabel("TPR")
    ax[1].plot(rec, prec)
    ax[1].set_title("Precision-Recall")
    ax[1].set_xlabel("Recall")
    ax[1].set_ylabel("Precision")
    fig.suptitle(title)
    _tight()
    plt.show()


def plot_confusion(y_true: np.ndarray, y_hat: np.ndarray, title: str) -> None:
    """Show the 2x2 confusion matrix as an annotated heat map.

    Rows are true labels and columns predicted labels, matching
    ``confusion_matrix_np``.
    """
    cm = confusion_matrix_np(y_true, y_hat)
    fig, ax = plt.subplots(figsize=(4, 3))
    image = ax.imshow(cm, cmap="Blues")
    ax.set_title(title)
    fig.colorbar(image, ax=ax)
    # imshow puts columns on the x axis, so each count is drawn at (col, row).
    for row, col in np.ndindex(*cm.shape):
        ax.text(col, row, int(cm[row, col]), ha="center", va="center")
    ax.set_xlabel("Pred")
    ax.set_ylabel("True")
    _tight()
    plt.show()

## Processing and feature engineering

In [None]:
# This cell only defines the helpers; the files are actually processed in the
# "Load data" and "Sentiment features" cells, so this timer measures
# definition time only.
log_step_start("Processing and feature engineering")


def _ttr(text: str) -> float:
    """Return the type-token ratio of ``text``.

    Tokens are the lower-cased runs of non-whitespace characters. The ratio
    is distinct tokens over total tokens, or 0.0 when there are no tokens.
    """
    tokens = re.findall(r"\S+", text.lower())
    if not tokens:
        return 0.0
    return float(len(set(tokens)) / len(tokens))


def _sentiment(df: pl.DataFrame) -> pl.DataFrame:
    """Add ``sentiment_polarity`` and ``sentiment_subjectivity`` columns.

    Both values are read from ``TextBlob(text).sentiment`` for every row of
    the ``text`` column. When TextBlob is unavailable both columns are the
    constant 0.0.
    """
    if TextBlob is None:
        zero_columns = [
            pl.lit(0.0).alias(column)
            for column in ("sentiment_polarity", "sentiment_subjectivity")
        ]
        return df.with_columns(zero_columns)

    def _scorer(attribute: str):
        """Return a per-text function reading one attribute of the sentiment."""

        def _score(x: str) -> float:
            return float(getattr(TextBlob(x).sentiment, attribute))

        return _score

    text = pl.col("text")
    polarity = text.map_elements(
        _scorer("polarity"), return_dtype=pl.Float64
    ).alias("sentiment_polarity")
    subjectivity = text.map_elements(
        _scorer("subjectivity"), return_dtype=pl.Float64
    ).alias("sentiment_subjectivity")
    return df.with_columns([polarity, subjectivity])


def process_text_file(filename: str) -> pl.DataFrame:
    """Read a CSV and derive length, punctuation and vocabulary features.

    Added columns: ``text_length``, ``word_count``, ``sentence_count``
    (at least 1), ``punct_count``, ``bangs``, ``questions``,
    ``avg_sentence_length`` (capped at 100), ``punct_ratio`` (capped at 0.3),
    ``digit_ratio``, ``upper_ratio`` and ``ttr``.

    Args:
        filename: Path of the CSV file; it must contain a ``text`` column.

    Returns:
        The frame read from ``filename`` with the feature columns appended.

    Raises:
        ValueError: If the CSV has no ``text`` column.
    """
    df = pl.read_csv(filename)
    # Explicit raise instead of `assert`, which is stripped under `python -O`.
    if "text" not in df.columns:
        raise ValueError("CSV must contain a text column.")
    # Missing text becomes "" so that every derived feature is a real number
    # (0) rather than null, and downstream vectorizers always get strings.
    df = df.with_columns(pl.col("text").cast(pl.Utf8).fill_null(""))
    text = pl.col("text")
    df = df.with_columns(
        [
            text.str.len_chars().alias("text_length"),
            text.str.count_matches(r"\S+").alias("word_count"),
            text.str.count_matches(r"[.!?]+").alias("sentence_count"),
            text.str.count_matches(r"[^\w\s]").alias("punct_count"),
            text.str.count_matches(r"\d").alias("digit_count"),
            text.str.count_matches(r"[A-Z]").alias("upper_count"),
            text.str.count_matches(r"!").alias("bangs"),
            text.str.count_matches(r"\?").alias("questions"),
        ]
    )
    # Guard the denominators used below: treat texts without terminal
    # punctuation as one sentence and empty texts as length 1.
    df = df.with_columns(
        [
            pl.when(pl.col("sentence_count") == 0)
            .then(1)
            .otherwise(pl.col("sentence_count"))
            .alias("sentence_count"),
            pl.when(pl.col("text_length") == 0)
            .then(1)
            .otherwise(pl.col("text_length"))
            .alias("text_length_safe"),
        ]
    )
    avg_sentence_expr = pl.col("word_count") / pl.col("sentence_count")
    punct_expr = pl.col("punct_count") / pl.col("text_length_safe")
    df = df.with_columns(
        [
            pl.when(avg_sentence_expr > 100)
            .then(100)
            .otherwise(avg_sentence_expr)
            .alias("avg_sentence_length"),
            pl.when(punct_expr > 0.3)
            .then(0.3)
            .otherwise(punct_expr)
            .alias("punct_ratio"),
            (pl.col("digit_count") / pl.col("text_length_safe")).alias(
                "digit_ratio"
            ),
            (pl.col("upper_count") / pl.col("text_length_safe")).alias(
                "upper_ratio"
            ),
            text.map_elements(_ttr, return_dtype=pl.Float64).alias("ttr"),
        ]
    )
    # Helper columns that only served as numerators/denominators.
    return df.drop(["digit_count", "upper_count", "text_length_safe"])


# Closes the timer opened at the top of this cell; only the function
# definitions ran in between.
log_step_end("Processing and feature engineering")

## Load data

In [None]:
log_step_start("Load data")

from pathlib import Path
import hashlib

# Project root: the working directory if it contains DATA_DIR, otherwise the
# nearest ancestor that does; falls back to the working directory when no
# ancestor qualifies.
PROJECT_ROOT = Path.cwd()
if not (PROJECT_ROOT / DATA_DIR).exists():
    PROJECT_ROOT = next(
        (
            ancestor
            for ancestor in PROJECT_ROOT.parents
            if (ancestor / DATA_DIR).exists()
        ),
        PROJECT_ROOT,
    )


def resolve_path(path_str: str) -> str:
    """Turn a configured path into the path string actually used.

    Absolute paths are returned unchanged. A bare file name (no directory
    part) is looked up inside DATA_DIR first and used if it exists there.
    Everything else is resolved relative to PROJECT_ROOT.
    """
    target = Path(path_str)
    if target.is_absolute():
        return str(target)
    has_no_directory = target.parent == Path(".")
    if has_no_directory:
        base = Path(DATA_DIR)
        base = base if base.is_absolute() else PROJECT_ROOT / base
        in_data_dir = base / target.name
        if in_data_dir.exists():
            return str(in_data_dir)
    return str((PROJECT_ROOT / target).resolve())


# Reassemble chunked CSVs if needed
def ensure_chunked_csv(path: Path) -> None:
    """Make sure ``path`` exists, rebuilding it from ``<name>.part*`` files.

    Nothing happens when ``path`` already exists. Otherwise every sibling
    file named ``<path.name>.part*`` is concatenated into a temporary file.
    If ``<path>.sha256`` exists, the SHA-256 of the concatenation is compared
    with its first whitespace-separated token. The temporary file is then
    renamed onto ``path``.

    Chunks are ordered by the first number found after ``.part`` and then by
    name, so ``.part2`` precedes ``.part10``. Zero-padded and purely
    alphabetic suffixes keep their lexicographic order.

    Args:
        path: Destination file.

    Raises:
        FileNotFoundError: If neither ``path`` nor any chunk exists.
        ValueError: If the checksum file exists and does not match.
    """
    if path.exists():
        return
    prefix = path.name + ".part"

    def _chunk_order(part: Path):
        """Sort key: numeric chunk index first (or -1), then the file name."""
        digits = re.search(r"\d+", part.name[len(prefix):])
        return (int(digits.group()) if digits else -1, part.name)

    parts = sorted(path.parent.glob(prefix + "*"), key=_chunk_order)
    if not parts:
        raise FileNotFoundError(f"Missing {path} and no chunk files found.")
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    if tmp_path.exists():
        tmp_path.unlink()
    hasher = hashlib.sha256()
    try:
        with tmp_path.open("wb") as out:
            for part in parts:
                with part.open("rb") as f:
                    # Stream in 1 MiB pieces to keep memory use flat.
                    for chunk in iter(lambda: f.read(1024 * 1024), b""):
                        out.write(chunk)
                        hasher.update(chunk)
        sha_path = path.with_suffix(path.suffix + ".sha256")
        if sha_path.exists():
            expected = sha_path.read_text().split()[0]
            actual = hasher.hexdigest()
            if expected != actual:
                raise ValueError(
                    f"SHA256 mismatch for {path}: expected {expected} got {actual}"
                )
    except BaseException:
        # Never leave a partial or unverified temp file behind.
        tmp_path.unlink(missing_ok=True)
        raise
    # Rename last so that `path` only ever appears complete and verified.
    tmp_path.replace(path)
    log_step(f"Reassembled {path} from {len(parts)} chunks.")


# Resolve the three configured CSV locations.
train_path, val_path, test_path = (
    Path(resolve_path(csv_name))
    for csv_name in (TRAIN_CSV, VAL_CSV, TEST_CSV)
)
# Rebuild any file that only exists as chunks.
ensure_chunked_csv(train_path)
ensure_chunked_csv(val_path)
ensure_chunked_csv(test_path)

# Read each split and add the hand-crafted text features.
train, val, test = (
    process_text_file(str(split_path))
    for split_path in (train_path, val_path, test_path)
)
# Schema contract: labelled train/val, unlabelled test carrying an id.
assert "label" in train.columns
assert "label" in val.columns
assert "label" not in test.columns
assert "id" in test.columns
print("Rows:", train.height, val.height, test.height)
log_step_end("Load data")


## Sentiment features

In [None]:
# Append sentiment_polarity / sentiment_subjectivity to every split
# (constant 0.0 when TextBlob is unavailable).
log_step_start("Sentiment features")
train = _sentiment(train)
val = _sentiment(val)
test = _sentiment(test)
log_step_end("Sentiment features")

## Numeric and TF–IDF features

In [None]:
log_step_start("Numeric and TF–IDF features")
# Hand-crafted numeric columns, in the order they occupy the first columns of
# the final feature matrices.
# NOTE(review): the values are used unscaled, and sentiment_polarity can
# presumably be negative; these columns are later passed to MultinomialNB —
# confirm that negative/unscaled inputs are acceptable for that model.
feature_cols: List[str] = [
    "text_length",
    "word_count",
    "ttr",
    "sentence_count",
    "avg_sentence_length",
    "punct_ratio",
    "sentiment_polarity",
    "sentiment_subjectivity",
    "digit_ratio",
    "upper_ratio",
    "bangs",
    "questions",
]
# Numeric blocks: float32 on the host, copied to the GPU (with retry on a busy
# device) and wrapped as CSR so they can be hstack-ed with the TF-IDF blocks.
Xtr_basic = cpx_sparse.csr_matrix(
    _cp_asarray_with_backoff(train.select(feature_cols).to_numpy().astype(np.float32))
)
Xva_basic = cpx_sparse.csr_matrix(
    _cp_asarray_with_backoff(val.select(feature_cols).to_numpy().astype(np.float32))
)
Xte_basic = cpx_sparse.csr_matrix(
    _cp_asarray_with_backoff(test.select(feature_cols).to_numpy().astype(np.float32))
)
train_text = train["text"].to_list()
val_text = val["text"].to_list()
test_text = test["text"].to_list()
# Word-level TF-IDF computed on the CPU with scikit-learn; the vocabulary is
# learned from the training texts only.
vec_word = SkTfidfVectorizer(
    ngram_range=(1, CFG.tfidf_ngram_max),
    max_features=CFG.tfidf_max_features,
    min_df=CFG.min_df,
    stop_words="english",
    dtype=np.float32,
)
vec_word.fit(train_text)
_gc()


def _transform_in_chunks_cpu(texts, chunk_size: int):
    """TF-IDF-transform ``texts`` with ``vec_word`` in batches on the CPU.

    Args:
        texts: Sequence of documents.
        chunk_size: Number of documents transformed per batch.

    Returns:
        A SciPy CSR matrix with one row per document. When no batch is
        produced, the result is an empty float32 matrix that still has one
        column per vocabulary term, so it can be stacked next to the other
        splits. It was previously shaped (0, 0).
    """
    n = len(texts)
    chunks = []
    for start in range(0, n, chunk_size):
        # Slicing clamps at the end of the list, so the last batch may be short.
        chunks.append(vec_word.transform(texts[start:start + chunk_size]))
        _gc()
    if not chunks:
        n_terms = len(vec_word.vocabulary_)
        return sp.csr_matrix((0, n_terms), dtype=np.float32)
    if len(chunks) == 1:
        return chunks[0].tocsr()
    return sp.vstack(chunks).tocsr()


Xtr_w_cpu = _transform_in_chunks_cpu(train_text, CFG.tfidf_chunk_size)
Xva_w_cpu = _transform_in_chunks_cpu(val_text, CFG.tfidf_chunk_size)
Xte_w_cpu = _transform_in_chunks_cpu(test_text, CFG.tfidf_chunk_size)

# Intermediates are deleted as soon as the next stage exists, each time
# followed by _gc().
del train_text, val_text, test_text
_gc()

# Copy the host CSR matrices to the GPU.
Xtr_w = cpx_sparse.csr_matrix(Xtr_w_cpu)
Xva_w = cpx_sparse.csr_matrix(Xva_w_cpu)
Xte_w = cpx_sparse.csr_matrix(Xte_w_cpu)

del Xtr_w_cpu, Xva_w_cpu, Xte_w_cpu
_gc()

# Final design matrices: numeric columns first, TF-IDF columns after them.
X_train = cpx_sparse.hstack([Xtr_basic, Xtr_w]).tocsr()
X_val = cpx_sparse.hstack([Xva_basic, Xva_w]).tocsr()
X_test = cpx_sparse.hstack([Xte_basic, Xte_w]).tocsr()

del Xtr_basic, Xva_basic, Xte_basic, Xtr_w, Xva_w, Xte_w
_gc()

# Labels as int32 CuPy vectors.
y_train = _cp_asarray_with_backoff(train["label"].to_numpy()).astype(cp.int32)
y_val = _cp_asarray_with_backoff(val["label"].to_numpy()).astype(cp.int32)
print("Shapes:", X_train.shape, X_val.shape, X_test.shape)
_gc()
log_step_end("Numeric and TF–IDF features")

## Random-search tuning (GPU)


In [None]:
log_step_start("Random-search tuning (GPU)")


def _lr_space():
    """Return the search space for LogisticRegression: candidate ``C`` values."""
    # NOTE(review): only 4 values are listed while CFG.lr_iter requests 8
    # candidates — confirm how ParameterSampler handles the shortfall.
    return {"C": [0.5, 1.0, 2.0, 4.0]}


def _nb_space():
    """Return the search space for MultinomialNB: candidate ``alpha`` values."""
    return {"alpha": [0.01, 0.05, 0.1, 0.5, 1.0]}


def _search_signature(name, space, n_iter, random_state, X_shape):
    """Return a 10-character hex fingerprint of a search configuration.

    The fingerprint is the truncated MD5 of the key-sorted JSON encoding of
    the arguments. Values JSON cannot encode are stringified.
    """
    raw = json.dumps(
        dict(
            name=name,
            space=space,
            n_iter=n_iter,
            random_state=random_state,
            X_shape=list(X_shape),
        ),
        sort_keys=True,
        default=str,
    )
    digest = hashlib.md5(raw.encode())
    return digest.hexdigest()[:10]


def _load_results(results_path: Path):
    """Read a JSON-lines results file into a list of records.

    Returns an empty list when the file does not exist. Blank lines and
    lines that are not valid JSON are skipped.
    """
    if not results_path.exists():
        return []
    records = []
    with results_path.open() as handle:
        for raw in handle:
            text = raw.strip()
            if text:
                try:
                    records.append(json.loads(text))
                except json.JSONDecodeError:
                    pass
    return records


def _to_jsonable(obj):
    """Recursively replace NumPy scalars with plain Python values.

    Dicts and lists are rebuilt with converted contents; any other object is
    returned as is.
    """
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, list):
        return [_to_jsonable(item) for item in obj]
    if isinstance(obj, dict):
        return {key: _to_jsonable(value) for key, value in obj.items()}
    return obj


def tune_model(
    name: str,
    build_fn,
    space,
    X_tr,
    y_tr,
    X_va,
    y_va,
    n_iter: int,
    random_state: int,
):
    """Random-search ``space`` with resumable on-disk checkpoints.

    Candidates are drawn once with ``ParameterSampler`` and stored under
    ``checkpoints/``. Every evaluated candidate is appended to a JSON-lines
    results file, so an interrupted search resumes where it stopped. Each
    candidate is fitted on ``(X_tr, y_tr)`` and scored by F1 on
    ``(X_va, y_va)``.

    Args:
        name: Short model name, used in log lines and checkpoint file names.
        build_fn: Callable receiving the sampled parameters as keyword
            arguments and returning an unfitted estimator.
        space: Parameter space understood by ``ParameterSampler``.
        X_tr, y_tr: Training features and labels.
        X_va, y_va: Validation features and labels.
        n_iter: Number of candidates requested from the sampler.
        random_state: Seed for the sampler.

    Returns:
        The parameter dict with the highest validation F1, or ``None`` when
        no candidate was evaluated successfully.
    """
    ckpt_root = Path(PROJECT_ROOT) if "PROJECT_ROOT" in globals() else Path.cwd()
    ckpt_dir = ckpt_root / "checkpoints"
    ckpt_dir.mkdir(parents=True, exist_ok=True)
    sig = _search_signature(name, space, n_iter, random_state, X_tr.shape)
    base = f"{name.lower()}_search_{sig}"
    candidates_path = ckpt_dir / f"{base}_candidates.json"
    results_path = ckpt_dir / f"{base}_results.jsonl"
    best_path = ckpt_dir / f"{base}_best.json"
    meta_path = ckpt_dir / f"{base}_meta.json"

    # `_f1_from_proba` is not defined in the visible part of this notebook.
    # Calling it blindly raised NameError inside the try block below, which
    # marked every candidate as failed. Use it when it exists; otherwise
    # score with F1 at a 0.5 probability threshold.
    # NOTE(review): confirm 0.5 matches the intended scoring rule.
    score_fn = globals().get("_f1_from_proba")
    if score_fn is None:

        def score_fn(y, proba):
            return f1_score_np(y, _to_numpy(proba) >= 0.5)

    # Reuse the stored candidate list so indices in the results file keep
    # pointing at the same parameters across restarts.
    if candidates_path.exists():
        candidates = json.loads(candidates_path.read_text())
    else:
        candidates = list(
            ParameterSampler(space, n_iter=n_iter, random_state=random_state)
        )
        candidates_path.write_text(json.dumps(_to_jsonable(candidates), indent=2))

    meta_path.write_text(
        json.dumps(
            {
                "signature": sig,
                "name": name,
                "n_iter": n_iter,
                "random_state": random_state,
                "X_shape": list(X_tr.shape),
                "n_candidates": len(candidates),
            },
            indent=2,
        )
    )

    # Replay earlier results: remember which candidates are done and the best
    # successful one so far. Failed candidates count as done and are not
    # retried.
    rows = _load_results(results_path)
    done = set()
    best_params, best_f1 = None, -1.0
    for row in rows:
        idx = row.get("iter_idx")
        if idx is not None:
            done.add(int(idx))
        if row.get("status") == "ok":
            f1 = float(row.get("f1", -1.0))
            if f1 > best_f1:
                best_f1 = f1
                best_params = row.get("params")

    if rows:
        log_step(
            f"{name} resume: {len(done)}/{len(candidates)} candidates done"
        )

    with results_path.open("a") as f:
        for i, params in enumerate(candidates):
            if i in done:
                continue
            iter_name = f"{name} iter {i + 1}/{len(candidates)}"
            log_step_start(iter_name)
            status = "ok"
            f1 = None
            error = None
            model = build_fn(**params)
            try:
                model.fit(X_tr, y_tr)
                p = model.predict_proba(X_va)[:, 1]
                f1 = float(score_fn(y_va, p))
            except Exception as exc:
                status = "fail"
                error = f"{type(exc).__name__}: {exc}"
            record = {
                "iter_idx": i,
                "status": status,
                "f1": f1,
                "params": _to_jsonable(params),
                "error": error,
            }
            # One JSON document per line, forced to disk so a crash cannot
            # lose a finished candidate.
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())
            if status == "ok":
                if f1 > best_f1:
                    best_f1 = f1
                    best_params = _to_jsonable(params)
                    best_path.write_text(
                        json.dumps(
                            {
                                "best_f1": best_f1,
                                "best_params": best_params,
                            },
                            indent=2,
                        )
                    )
                log_step(f"{iter_name} f1={f1:.4f} best={best_f1:.4f}")
            else:
                log_step(f"{iter_name} failed: {error}")
            log_step_end(iter_name)
            del model
            _gc()

    if best_params is None:
        log_step(
            f"{name}: no candidate succeeded; returning None "
            f"(see {results_path} for the recorded errors)"
        )
    print(f"Best {name}: {best_params} | F1={best_f1:.4f}")
    return best_params


# Tune LogisticRegression (C) on the train split, scored on the validation split.
lr_params = tune_model(
    "LR",
    lambda **p: LogisticRegression(max_iter=2000, **p),
    _lr_space(),
    X_train,
    y_train,
    X_val,
    y_val,
    CFG.lr_iter,
    random_state=RANDOM_SEED,
)
# Tune MultinomialNB (alpha) the same way, with a different sampler seed.
nb_params = tune_model(
    "NB",
    lambda **p: MultinomialNB(**p),
    _nb_space(),
    X_train,
    y_train,
    X_val,
    y_val,
    CFG.nb_iter,
    random_state=RANDOM_SEED + 17,
)
log_step_end("Random-search tuning (GPU)")


## Fit final models and calibrate

In [None]:
log_step_start("Fit final models and calibrate")


class PlattCalibrator:
    """Calibrate scores by fitting a one-feature logistic regression on them."""

    def __init__(self):
        self.model = LogisticRegression(max_iter=1000)

    @staticmethod
    def _as_column(scores):
        """Copy ``scores`` to the GPU and reshape them into a single column."""
        return _cp_asarray_with_backoff(scores).reshape(-1, 1)

    def fit(self, scores, y):
        """Fit the calibration model on ``scores`` against labels ``y``; return self."""
        self.model.fit(self._as_column(scores), y)
        return self

    def predict_proba(self, scores):
        """Return column 1 of the calibration model's ``predict_proba`` for ``scores``."""
        calibrated = self.model.predict_proba(self._as_column(scores))
        return calibrated[:, 1]


# Final models are refitted on train + validation with the tuned parameters.
X_trval = cpx_sparse.vstack([X_train, X_val]).tocsr()
y_trval = cp.concatenate([y_train, y_val])
lr_tuned = LogisticRegression(max_iter=2000, **lr_params)
nb_tuned = MultinomialNB(**nb_params)
log_step_start("Fold 1/1 (single split)")
log_step_start("LR fit")
lr_tuned.fit(X_trval, y_trval)
log_step_end("LR fit")
log_step_start("NB fit")
nb_tuned.fit(X_trval, y_trval)
log_step_end("NB fit")
log_step_end("Fold 1/1 (single split)")

# NOTE(review): the calibrators are fitted on predictions for X_val, which is
# part of the data the final models were just fitted on (X_trval) — confirm
# this is intended, since such scores are not out-of-sample.
cal_lr = PlattCalibrator().fit(lr_tuned.predict_proba(X_val)[:, 1], y_val)
cal_nb = PlattCalibrator().fit(nb_tuned.predict_proba(X_val)[:, 1], y_val)
_gc()
log_step_end("Fit final models and calibrate")

## Ensemble and threshold tuning on val

In [None]:
log_step_start("Ensemble and threshold tuning on val")
scores_lr = predict_proba_chunks(
    lr_tuned, X_val, chunk_size=CFG.proba_chunk_size
)
scores_nb = predict_proba_chunks(
    nb_tuned, X_val, chunk_size=CFG.proba_chunk_size
)
p_lr = _to_numpy(cal_lr.predict_proba(scores_lr))
p_nb = _to_numpy(cal_nb.predict_proba(scores_nb))
# Bring the labels to the host once. f1_score_np would otherwise convert the
# GPU label vector on every one of the 21 x 81 grid evaluations below.
y_val_np = _to_numpy(y_val)
thresholds = np.arange(0.1, 0.91, 0.01)
# Grid search over the NB weight `w` (LR gets 1 - w) and the decision
# threshold; the first combination reaching the best F1 is kept.
best_w, best_f1, best_thr = 0.5, -1.0, 0.5
for w in np.linspace(0.0, 1.0, 21):
    p = w * p_nb + (1.0 - w) * p_lr
    for thr in thresholds:
        yhat = (p >= thr).astype(int)
        f1 = f1_score_np(y_val_np, yhat)
        if f1 > best_f1:
            best_w, best_f1, best_thr = float(w), float(f1), float(thr)
print(f"Ensemble w={best_w:.2f} thr={best_thr:.2f} F1={best_f1:.4f}")
_gc()
log_step_end("Ensemble and threshold tuning on val")

## Validation diagnostics

In [None]:
log_step_start("Validation diagnostics")
# Ensemble probabilities and hard labels on validation, using the weight and
# threshold selected in the previous cell.
p_ens = best_w * p_nb + (1.0 - best_w) * p_lr
yhat_val = (p_ens >= best_thr).astype(int)
print(classification_report_np(y_val, yhat_val))
# Residuals here are (label - predicted probability).
residual_plot(_to_numpy(y_val), p_ens, "Residuals: validation ensemble")
qq_plot(_to_numpy(y_val) - p_ens, "QQ: residuals (validation)")
plot_roc_pr(_to_numpy(y_val), p_ens, "Validation ROC/PR (ensemble)")
plot_confusion(_to_numpy(y_val), yhat_val, "Confusion (validation)")
violin_by_label(train, "label", "text_length", "Text length by label")
log_step_end("Validation diagnostics")

## Coefficient snapshots


In [None]:
log_step_start("Coefficient snapshots")
# Best effort: any failure is downgraded to a warning so the run continues.
try:
    if hasattr(lr_tuned, "coef_"):
        coef = _to_numpy(lr_tuned.coef_).ravel()
        # Indices of the 25 largest coefficients by absolute value. They are
        # column positions in the feature matrix: the numeric feature_cols
        # come first, the TF-IDF columns after them.
        idx = np.argsort(np.abs(coef))[::-1][:25]
        print("Top 25 |coef| for LR: indices and values:")
        for i in idx:
            print(i, float(coef[i]))
except Exception as e:
    warnings.warn(f"LR coef summary failed: {e}")
log_step_end("Coefficient snapshots")

## Predict on test and save submission

In [None]:
log_step_start("Predict on test and save submission")
# scores_lr / scores_nb are reused here and now hold test-set scores.
scores_lr = predict_proba_chunks(
    lr_tuned, X_test, chunk_size=CFG.proba_chunk_size
)
scores_nb = predict_proba_chunks(
    nb_tuned, X_test, chunk_size=CFG.proba_chunk_size
)
p_lr_te = _to_numpy(cal_lr.predict_proba(scores_lr))
p_nb_te = _to_numpy(cal_nb.predict_proba(scores_nb))
# Same ensemble weight and threshold as selected on validation.
p_ens_te = best_w * p_nb_te + (1.0 - best_w) * p_lr_te
yhat_te = (p_ens_te >= best_thr).astype(int)
submission = pl.DataFrame({"id": test["id"], "label": yhat_te})
# NOTE(review): "outputs" is relative to the current working directory,
# whereas the tuning checkpoints are written under PROJECT_ROOT — confirm
# the two are meant to differ.
outputs_dir = "outputs"
os.makedirs(outputs_dir, exist_ok=True)
submission_path = os.path.join(outputs_dir, "submission_hvsm_prod_b.csv")
submission.write_csv(submission_path)
print("Saved", submission_path, "with", submission.height, "rows")
_gc()
log_step_end("Predict on test and save submission")

## Final checks

In [None]:
log_step_start("Final checks")
# Sanity check: the submission contains only the labels 0 and 1.
assert set(submission["label"].to_list()).issubset({0, 1})
print("Done. All checks passed.")
log_step_end("Final checks")