<a href="https://colab.research.google.com/github/GwenTsang/SVM/blob/main/20newsgroup_hyperparameters_optimizations_with_gpu_assistance.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from __future__ import annotations

import itertools
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple

import numpy as np

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import accuracy_score, f1_score, classification_report


def make_calibrated(estimator, cv=5, method: str = "sigmoid", ensemble: Optional[bool] = None):
    """
    Build a ``CalibratedClassifierCV`` across scikit-learn versions.

    The constructor changed twice over time: the wrapped model keyword was
    renamed from ``base_estimator`` to ``estimator``, and the ``ensemble``
    keyword was added later than ``base_estimator``.  The signatures are
    tried from newest to oldest, so ``ensemble`` is only dropped when the
    installed version really does not accept it.

    Parameters:
        estimator: classifier to calibrate.
        cv: forwarded unchanged as ``cv``.
        method: forwarded unchanged as ``method``.
        ensemble: forwarded as ``ensemble`` when not ``None`` and supported.

    Returns:
        An unfitted ``CalibratedClassifierCV`` instance.

    Raises:
        TypeError: if none of the known constructor signatures is accepted.
    """
    common = {"cv": cv, "method": method}

    # (estimator keyword, remaining keywords), newest signature first.
    attempts = []
    if ensemble is not None:
        with_ensemble = dict(common, ensemble=ensemble)
        attempts.append(("estimator", with_ensemble))
        # Releases that still use `base_estimator` may already know `ensemble`.
        attempts.append(("base_estimator", with_ensemble))
    else:
        attempts.append(("estimator", common))
    # Oldest signature: no `estimator`, no `ensemble`.
    attempts.append(("base_estimator", common))

    last_error: Optional[TypeError] = None
    for est_key, kwargs in attempts:
        try:
            return CalibratedClassifierCV(**{est_key: estimator}, **kwargs)
        except TypeError as err:
            # An unknown keyword for this version; try the next older signature.
            last_error = err
    raise last_error


def weighted_vote_proba(probas: Sequence[np.ndarray], weights: Sequence[float]) -> np.ndarray:
    """
    Blend per-model class probabilities with a normalized weighted average.

    Parameters:
        probas: one array per model, all with the same shape (they are
            stacked along a new leading axis).
        weights: one weight per model; they are rescaled to sum to 1.

    Returns:
        A float32 array with the shape of a single entry of ``probas``.

    Raises:
        ValueError: if the number of weights differs from the number of
            models, or if the weights sum to zero / a non-finite value.
    """
    # np.array (not np.asarray) always copies, so the in-place normalization
    # below can never modify a float32 array owned by the caller.
    w = np.array(weights, dtype=np.float32)
    if w.ndim != 1 or w.shape[0] != len(probas):
        raise ValueError(
            f"expected one weight per model: got {len(probas)} probability "
            f"arrays and weights of shape {w.shape}"
        )

    total = w.sum()
    if total == 0 or not np.isfinite(total):
        # Dividing by this total would silently fill the result with NaN/inf.
        raise ValueError("weights must have a finite, non-zero sum")
    w /= total

    # stack -> (n_models, ...); contract the models axis against the weights
    P = np.stack(probas, axis=0).astype(np.float32, copy=False)
    return np.tensordot(w, P, axes=(0, 0))


def _maybe_gpu_logreg_proba(
    X_train_csr,
    y_train: np.ndarray,
    X_pred_csr,
    *,
    C: float,
    max_iter: int,
    tol: float,
) -> Optional[np.ndarray]:
    """
    Optional GPU path using cuML + CuPy.
    If anything fails (missing libs or unsupported sparse), returns None.
    """
    try:
        import cupy as cp
        import cupyx.scipy.sparse as cpx_sp
        from cuml.linear_model import LogisticRegression as cuLR
    except Exception:
        return None

    try:
        Xg_tr = cpx_sp.csr_matrix(X_train_csr)
        yg_tr = cp.asarray(y_train, dtype=cp.int32)
        Xg_pr = cpx_sp.csr_matrix(X_pred_csr)

        # cuML API is sklearn-like, but exact params vary by version
        model = cuLR(
            C=float(C),
            penalty="l2",
            max_iter=int(max_iter),
            tol=float(tol),
            fit_intercept=True,
            verbose=0,
        )
        model.fit(Xg_tr, yg_tr)
        Pg = model.predict_proba(Xg_pr)
        return cp.asnumpy(Pg)
    except Exception:
        return None


@dataclass(frozen=True)
class VecCfg:
    """
    Immutable description of one TF-IDF vectorizer candidate.

    ``_build_vectorizer`` turns an instance into a ``TfidfVectorizer``; the
    first three fields are forwarded to the keyword arguments of the same name.
    """

    ngram_range: Tuple[int, int]  # forwarded as ``ngram_range``
    min_df: int  # forwarded as ``min_df``
    max_df: float  # forwarded as ``max_df``
    # "word" selects the word analyzer with English stop words; any other
    # value is built as a "char_wb" vectorizer by ``_build_vectorizer``.
    analyzer: str  # "word" or "char_wb"


def _build_vectorizer(cfg: VecCfg, *, max_features: Optional[int]) -> TfidfVectorizer:
    """
    Create an unfitted ``TfidfVectorizer`` for the given configuration.

    All candidates share accent stripping, lowercasing, sublinear TF scaling
    and float32 output.  ``cfg.analyzer == "word"`` yields a word-level
    vectorizer with English stop words; every other value yields a
    ``char_wb`` vectorizer.  ``max_features`` caps the vocabulary size
    (``None`` means no cap).
    """
    if cfg.analyzer == "word":
        analyzer_opts = {"stop_words": "english", "analyzer": "word"}
    else:
        analyzer_opts = {"analyzer": "char_wb"}

    return TfidfVectorizer(
        strip_accents="unicode",
        lowercase=True,
        sublinear_tf=True,
        dtype=np.float32,
        max_features=max_features,
        ngram_range=cfg.ngram_range,
        min_df=cfg.min_df,
        max_df=cfg.max_df,
        **analyzer_opts,
    )


def _two_stage_select_linear_svc(
    X_fit_text: Sequence[str],
    y_fit: np.ndarray,
    X_tune_text: Sequence[str],
    y_tune: np.ndarray,
    *,
    cfgs: List[VecCfg],
    C_grid: np.ndarray,
    top_k_cfg: int,
    max_features: Optional[int],
    random_state: int,
) -> Tuple[VecCfg, float]:
    """
    Pick a vectorizer configuration and ``C`` for ``LinearSVC`` on a holdout.

    Two-stage search:
    1) score every vectorizer cfg with a baseline ``C=1`` model;
    2) keep the ``top_k_cfg`` best cfgs (at least one) and sweep ``C_grid``
       on each, reusing their already-vectorized matrices.

    Models are fitted on ``X_fit_text`` / ``y_fit`` and scored by accuracy on
    ``X_tune_text`` / ``y_tune``.

    Returns:
        ``(best_cfg, best_C)`` from stage 2.  If ``C_grid`` is empty, the best
        stage-1 cfg is returned together with the baseline ``C=1.0``.

    Raises:
        ValueError: if ``cfgs`` is empty.
    """
    if not cfgs:
        raise ValueError("cfgs must contain at least one vectorizer configuration")

    baseline_C = 1.0
    keep = max(1, top_k_cfg)

    def _holdout_accuracy(C: float, Xf, Xt) -> float:
        clf = LinearSVC(C=float(C), dual="auto", max_iter=2000, random_state=random_state)
        clf.fit(Xf, y_fit)
        return accuracy_score(y_tune, clf.predict(Xt))

    # Stage 1.  Only the current top-k candidates keep their matrices alive,
    # so peak memory no longer grows with the number of cfgs.  The sort is
    # stable, hence ties still favour the cfg that appears first in `cfgs`,
    # exactly as a single sort over all candidates would.
    top: List[Tuple[float, VecCfg, Any, Any]] = []
    for cfg in cfgs:
        vec = _build_vectorizer(cfg, max_features=max_features)
        Xf = vec.fit_transform(X_fit_text)
        Xt = vec.transform(X_tune_text)
        top.append((_holdout_accuracy(baseline_C, Xf, Xt), cfg, Xf, Xt))
        top.sort(key=lambda t: t[0], reverse=True)
        del top[keep:]

    # Stage 2.  Strict ">" keeps the first best (cfg, C) pair on ties.
    best_acc = -1.0
    best_cfg = top[0][1]
    best_C = float(baseline_C)

    for _, cfg, Xf, Xt in top:
        for C in C_grid:
            acc = _holdout_accuracy(C, Xf, Xt)
            if acc > best_acc:
                best_acc = acc
                best_cfg = cfg
                best_C = float(C)

    return best_cfg, best_C


def _two_stage_select_logreg(
    X_fit_text: Sequence[str],
    y_fit: np.ndarray,
    X_tune_text: Sequence[str],
    y_tune: np.ndarray,
    *,
    cfgs: List[VecCfg],
    C_grid: np.ndarray,
    top_k_cfg: int,
    max_features: Optional[int],
    random_state: int,
    prefer_gpu: bool,
) -> Tuple[VecCfg, float]:
    """
    Pick a vectorizer configuration and ``C`` for logistic regression.

    Two-stage search:
    1) score every vectorizer cfg with a baseline ``C=1`` model
       (``max_iter=1500``);
    2) keep the ``top_k_cfg`` best cfgs (at least one) and sweep ``C_grid``
       on each (``max_iter=2000``), reusing their vectorized matrices.

    Models are fitted on ``X_fit_text`` / ``y_fit`` and scored by accuracy on
    ``X_tune_text`` / ``y_tune``.  When ``prefer_gpu`` is true each fit first
    tries ``_maybe_gpu_logreg_proba`` and falls back to scikit-learn's
    ``LogisticRegression`` (saga solver) if that returns ``None``.

    Returns:
        ``(best_cfg, best_C)`` from stage 2.  If ``C_grid`` is empty, the best
        stage-1 cfg is returned together with the baseline ``C=1.0``.

    Raises:
        ValueError: if ``cfgs`` is empty.
    """
    if not cfgs:
        raise ValueError("cfgs must contain at least one vectorizer configuration")

    baseline_C = 1.0
    keep = max(1, top_k_cfg)

    def _holdout_accuracy(C: float, Xf, Xt, max_iter: int) -> float:
        # Try GPU first (if asked); otherwise CPU directly.
        proba = (
            _maybe_gpu_logreg_proba(Xf, y_fit, Xt, C=float(C), max_iter=max_iter, tol=1e-3)
            if prefer_gpu
            else None
        )
        if proba is not None:
            return accuracy_score(y_tune, proba.argmax(axis=1))

        # `multi_class` is intentionally not passed: it is deprecated in
        # recent scikit-learn, and the default already fits a multinomial
        # model for multiclass targets with the saga solver.
        clf = LogisticRegression(
            solver="saga",
            penalty="l2",
            C=float(C),
            max_iter=max_iter,
            tol=1e-3,
            n_jobs=-1,
            random_state=random_state,
        )
        clf.fit(Xf, y_fit)
        return accuracy_score(y_tune, clf.predict(Xt))

    # Stage 1.  Only the current top-k candidates keep their matrices alive,
    # so peak memory no longer grows with the number of cfgs.  The sort is
    # stable, hence ties still favour the cfg that appears first in `cfgs`.
    top: List[Tuple[float, VecCfg, Any, Any]] = []
    for cfg in cfgs:
        vec = _build_vectorizer(cfg, max_features=max_features)
        Xf = vec.fit_transform(X_fit_text)
        Xt = vec.transform(X_tune_text)
        top.append((_holdout_accuracy(baseline_C, Xf, Xt, 1500), cfg, Xf, Xt))
        top.sort(key=lambda t: t[0], reverse=True)
        del top[keep:]

    # Stage 2.  Strict ">" keeps the first best (cfg, C) pair on ties.
    best_acc = -1.0
    best_cfg = top[0][1]
    best_C = float(baseline_C)

    for _, cfg, Xf, Xt in top:
        for C in C_grid:
            acc = _holdout_accuracy(C, Xf, Xt, 2000)
            if acc > best_acc:
                best_acc = acc
                best_cfg = cfg
                best_C = float(C)

    return best_cfg, best_C


def _calibrate_prefit(fitted_clf, X_cal, y_cal, *, method: str = "sigmoid"):
    """
    Fit a probability calibrator on top of an already-trained classifier.

    Newer scikit-learn deprecates ``cv="prefit"`` in favour of wrapping the
    fitted model in ``FrozenEstimator``; that route is used when available and
    ``cv="prefit"` remains the fallback for older versions.
    """
    try:
        from sklearn.frozen import FrozenEstimator
    except ImportError:
        calibrated = make_calibrated(fitted_clf, cv="prefit", method=method)
    else:
        # The frozen model is never refitted, so with ensemble=False the folds
        # only partition the calibration data and a single calibrator is
        # fitted on all of it.  cv=2 keeps the per-class sample requirement
        # as low as possible.
        calibrated = make_calibrated(
            FrozenEstimator(fitted_clf), cv=2, method=method, ensemble=False
        )
    calibrated.fit(X_cal, y_cal)
    return calibrated


def _logreg_proba(
    X_tr,
    y_tr,
    X_pr,
    *,
    C: float,
    max_iter: int,
    random_state: int,
    prefer_gpu: bool,
) -> np.ndarray:
    """
    Fit logistic regression on ``X_tr`` and return probabilities for ``X_pr``.

    Tries the optional GPU path first when ``prefer_gpu`` is true and falls
    back to scikit-learn's saga solver when it is unavailable or fails.
    """
    if prefer_gpu:
        proba = _maybe_gpu_logreg_proba(X_tr, y_tr, X_pr, C=C, max_iter=max_iter, tol=1e-3)
        if proba is not None:
            return proba

    # `multi_class` is intentionally not passed: it is deprecated in recent
    # scikit-learn, and the default already fits a multinomial model for
    # multiclass targets with the saga solver.
    clf = LogisticRegression(
        solver="saga",
        penalty="l2",
        C=C,
        max_iter=max_iter,
        tol=1e-3,
        n_jobs=-1,
        random_state=random_state,
    )
    clf.fit(X_tr, y_tr)
    return clf.predict_proba(X_pr)


def main(random_state: int = 21, prefer_gpu_for_lr: bool = True) -> None:
    """
    Train and evaluate a three-model ensemble on 20 Newsgroups.

    Pipeline:
    1. load the train/test subsets with headers, footers and quotes removed;
    2. split the training data into fit (90%), calibration (5%) and
       validation (5%) parts;
    3. select a vectorizer cfg and ``C`` for a word-level LinearSVC, a
       char-level LinearSVC and a word-level logistic regression;
    4. fit the three models on the fit part, calibrate the SVMs on the
       calibration part and report validation accuracy;
    5. grid-search integer ensemble weights on the validation part;
    6. refit everything on the full training set (SVMs with 3-fold
       calibration) and report test accuracy, macro F1 and a per-class report.

    Parameters:
        random_state: seed used for data loading, splitting and all models.
        prefer_gpu_for_lr: try the optional cuML path for logistic regression
            before falling back to scikit-learn.
    """
    # 1) Load dataset
    data_train = fetch_20newsgroups(
        subset="train",
        remove=("headers", "footers", "quotes"),
        random_state=random_state,
    )
    data_test = fetch_20newsgroups(
        subset="test",
        remove=("headers", "footers", "quotes"),
        random_state=random_state,
    )

    X_train, y_train = data_train.data, data_train.target
    X_test, y_test = data_test.data, data_test.target

    print("Sizes:", (len(X_train), len(X_test), len(y_test)))

    # 2) Splits:
    #    - X_fit: used to fit all candidate models
    #    - X_cal: used for fast selection of vectorizer/C and, afterwards,
    #             for calibrating the selected SVMs
    #    - X_val: used only for validation metrics and ensemble weights
    X_fit, X_tune, y_fit, y_tune = train_test_split(
        X_train,
        y_train,
        test_size=0.10,
        random_state=random_state,
        stratify=y_train,
    )
    X_cal, X_val, y_cal, y_val = train_test_split(
        X_tune,
        y_tune,
        test_size=0.50,
        random_state=random_state,
        stratify=y_tune,
    )

    # 3) Search spaces
    word_cfgs = [
        VecCfg(ngram_range=ng, min_df=md, max_df=xd, analyzer="word")
        for ng in [(1, 1), (1, 2)]
        for md in [1, 2, 5]
        for xd in [0.85, 0.95, 1.0]
    ]
    char_cfgs = [
        VecCfg(ngram_range=ng, min_df=md, max_df=xd, analyzer="char_wb")
        for ng in [(3, 5), (3, 6), (4, 7)]
        for md in [1, 2, 5]
        for xd in [0.90, 0.98, 1.0]
    ]

    # Small log-spaced C grids keep the number of fits low.
    C_svm = np.logspace(-1, 1, 8)   # 0.1 .. 10
    C_lr = np.logspace(-1, 1, 7)    # 0.1 .. 10

    # max_features caps vocab size (big speed win). Increase for max accuracy.
    max_features_word = 250_000
    max_features_char = 400_000

    top_k_cfg = 5

    print("\nSelecting best WORD LinearSVC (fast holdout)...")
    best_word_cfg, best_word_C = _two_stage_select_linear_svc(
        X_fit, y_fit, X_cal, y_cal,
        cfgs=word_cfgs, C_grid=C_svm, top_k_cfg=top_k_cfg,
        max_features=max_features_word, random_state=random_state
    )
    print("Best word SVM:", best_word_cfg, "C=", best_word_C)

    print("\nSelecting best CHAR LinearSVC (fast holdout)...")
    best_char_cfg, best_char_C = _two_stage_select_linear_svc(
        X_fit, y_fit, X_cal, y_cal,
        cfgs=char_cfgs, C_grid=C_svm, top_k_cfg=top_k_cfg,
        max_features=max_features_char, random_state=random_state
    )
    print("Best char SVM:", best_char_cfg, "C=", best_char_C)

    print("\nSelecting best WORD LogisticRegression (fast holdout, optional GPU)...")
    best_lr_cfg, best_lr_C = _two_stage_select_logreg(
        X_fit, y_fit, X_cal, y_cal,
        cfgs=word_cfgs, C_grid=C_lr, top_k_cfg=top_k_cfg,
        max_features=max_features_word, random_state=random_state,
        prefer_gpu=prefer_gpu_for_lr
    )
    print("Best word LR:", best_lr_cfg, "C=", best_lr_C)

    # 4) Fit models on X_fit, calibrate SVMs on X_cal, evaluate on X_val
    def holdout_calibrated_svm(cfg, C, max_features):
        """Return (SVM fitted on X_fit and calibrated on X_cal, vectorized X_val)."""
        vec = _build_vectorizer(cfg, max_features=max_features)
        X_fit_vec = vec.fit_transform(X_fit)
        X_cal_vec = vec.transform(X_cal)
        X_val_vec = vec.transform(X_val)

        svm = LinearSVC(C=C, dual="auto", max_iter=4000, random_state=random_state)
        svm.fit(X_fit_vec, y_fit)
        return _calibrate_prefit(svm, X_cal_vec, y_cal, method="sigmoid"), X_val_vec

    cal_word_svm, Xval_word = holdout_calibrated_svm(
        best_word_cfg, best_word_C, max_features_word
    )
    cal_char_svm, Xval_char = holdout_calibrated_svm(
        best_char_cfg, best_char_C, max_features_char
    )

    # Word LR (GPU optional, falls back automatically)
    vec_lr = _build_vectorizer(best_lr_cfg, max_features=max_features_word)
    Xf_lr = vec_lr.fit_transform(X_fit)
    Xval_lr = vec_lr.transform(X_val)
    P_lr_val = _logreg_proba(
        Xf_lr, y_fit, Xval_lr,
        C=best_lr_C, max_iter=2500,
        random_state=random_state, prefer_gpu=prefer_gpu_for_lr,
    )

    # Validation metrics (on X_val)
    val_pred_word = cal_word_svm.predict(Xval_word)
    val_pred_char = cal_char_svm.predict(Xval_char)
    val_pred_lr = P_lr_val.argmax(axis=1)

    print("\nValidation accuracy (X_val):")
    print("  word SVM (cal):", accuracy_score(y_val, val_pred_word))
    print("  char SVM (cal):", accuracy_score(y_val, val_pred_char))
    print("  word LR      :", accuracy_score(y_val, val_pred_lr))

    # 5) Tune ensemble weights on X_val (exhaustive over a small integer grid)
    P_word_val = cal_word_svm.predict_proba(Xval_word)
    P_char_val = cal_char_svm.predict_proba(Xval_char)

    candidates = [1, 2, 3, 4]
    best_w = (1, 1, 1)
    best_acc = -1.0
    for w in itertools.product(candidates, repeat=3):
        P = weighted_vote_proba([P_word_val, P_char_val, P_lr_val], w)
        pred = P.argmax(axis=1)
        acc = accuracy_score(y_val, pred)
        # Strict ">" keeps the first weight triple on ties.
        if acc > best_acc:
            best_acc = acc
            best_w = w

    print("\nBest ensemble weights (wordSVM, charSVM, wordLR):", best_w)
    print("Validation accuracy (ensemble):", best_acc)

    # 6) FINAL fit on full training set, with cross-validated calibration.
    #    Vectorizers are refitted on all of X_train before scoring X_test.
    print("\nRefitting final models on full training set...")

    def final_svm_proba(cfg, C, max_features):
        """Return test-set probabilities of a CV-calibrated SVM fitted on X_train."""
        vec = _build_vectorizer(cfg, max_features=max_features)
        X_tr = vec.fit_transform(X_train)
        X_te = vec.transform(X_test)
        svm = LinearSVC(C=C, dual="auto", max_iter=6000, random_state=random_state)
        # cv=3 with ensemble=False: one final model plus one calibrator.
        calibrated = make_calibrated(svm, cv=3, method="sigmoid", ensemble=False)
        calibrated.fit(X_tr, y_train)
        return calibrated.predict_proba(X_te)

    P_word_test = final_svm_proba(best_word_cfg, best_word_C, max_features_word)
    P_char_test = final_svm_proba(best_char_cfg, best_char_C, max_features_char)

    # Word LR final (GPU optional)
    final_vec_lr = _build_vectorizer(best_lr_cfg, max_features=max_features_word)
    Xtr_lr = final_vec_lr.fit_transform(X_train)
    Xte_lr = final_vec_lr.transform(X_test)
    P_lr_test = _logreg_proba(
        Xtr_lr, y_train, Xte_lr,
        C=best_lr_C, max_iter=3000,
        random_state=random_state, prefer_gpu=prefer_gpu_for_lr,
    )

    # 7) Test evaluation
    P_ens_test = weighted_vote_proba([P_word_test, P_char_test, P_lr_test], best_w)
    test_pred = P_ens_test.argmax(axis=1)

    acc = accuracy_score(y_test, test_pred)
    f1m = f1_score(y_test, test_pred, average="macro")

    print("\nTEST RESULTS")
    print("Accuracy :", acc)
    print("Macro F1 :", f1m)
    print("\nClassification report:\n")
    print(classification_report(y_test, test_pred, target_names=data_test.target_names))


# Script entry point: run the whole pipeline with seed 21 and with the
# optional GPU logistic-regression path enabled.
if __name__ == "__main__":
    main(random_state=21, prefer_gpu_for_lr=True)

Sizes: (11314, 7532, 7532)

Selecting best WORD LinearSVC (fast holdout)...
Best word SVM: VecCfg(ngram_range=(1, 2), min_df=1, max_df=0.85, analyzer='word') C= 0.7196856730011519

Selecting best CHAR LinearSVC (fast holdout)...
Best char SVM: VecCfg(ngram_range=(3, 5), min_df=1, max_df=0.9, analyzer='char_wb') C= 0.372759372031494

Selecting best WORD LogisticRegression (fast holdout, optional GPU)...
Best word LR: VecCfg(ngram_range=(1, 1), min_df=1, max_df=0.85, analyzer='word') C= 10.0





Validation accuracy (X_val):
  word SVM (cal): 0.7879858657243817
  char SVM (cal): 0.7685512367491166
  word LR      : 0.7703180212014135

Best ensemble weights (wordSVM, charSVM, wordLR): (3, 2, 4)
Validation accuracy (ensemble): 0.7985865724381626

Refitting final models on full training set...

TEST RESULTS
Accuracy : 0.7140201805629315
Macro F1 : 0.7023330470079996

Classification report:

                          precision    recall  f1-score   support

             alt.atheism       0.54      0.50      0.52       319
           comp.graphics       0.67      0.74      0.70       389
 comp.os.ms-windows.misc       0.68      0.62      0.65       394
comp.sys.ibm.pc.hardware       0.69      0.67      0.68       392
   comp.sys.mac.hardware       0.75      0.71      0.73       385
          comp.windows.x       0.86      0.72      0.78       395
            misc.forsale       0.79      0.79      0.79       390
               rec.autos       0.52      0.78      0.63       396
      