In [7]:
import numpy as np
import cv2
from pathlib import Path
from PIL import Image
from skimage.color import rgb2gray
from skimage.feature import hog
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import StratifiedKFold, ParameterGrid
from sklearn.metrics import accuracy_score
from joblib import dump, load
from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import time
import os
import gc

# ---- Configuration: everything a reader might want to tune lives here ----
# NOTE: the space inside "OCT2017 /train" is deliberate per the original author's note;
# do not "fix" it without checking the on-disk directory name.
DATA_DIR = Path("../data/OCT2017 /train")
TARGET_SIZE = 128  # side length (pixels) of the square canvas produced by load_and_preprocess_image
IMG_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff")  # lower-case suffixes accepted as images
BATCH_SIZE = 64  # images loaded per batch during feature extraction; tune for your RAM / disk
RANDOM_STATE = 192  # seed passed to StratifiedKFold, PCA and LinearSVC

# Output locations. Only REPORTS_DIR is created here; the "checkpoints" directory is not.
CHECKPOINT_PATH = Path("checkpoints/tuning_state.pkl")
BEST_MODEL_PATH = Path("checkpoints/best_model.pkl")
REPORTS_DIR = Path("reports")
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
PDF_PATH = REPORTS_DIR / "feature_examples.pdf"

# dataloader etc
def load_and_preprocess_image(path, target_size=TARGET_SIZE):
    """
    Load an image and letterbox it onto a white square canvas.

    The image is converted to RGB, scaled so that its longer side equals
    ``target_size`` (aspect ratio preserved), and pasted centered on a white
    ``target_size`` x ``target_size`` canvas.

    Parameters
    ----------
    path : path-like
        Image file to open with PIL.
    target_size : int
        Side length of the square output canvas, in pixels.

    Returns
    -------
    numpy.ndarray
        Array view of the RGB canvas (``np.asarray`` of the PIL image).
    """
    # Use a context manager so the underlying file handle is released right away;
    # convert() forces the pixel data to be read before the file is closed.
    with Image.open(path) as src:
        img = src.convert("RGB")

    w, h = img.size
    scale = target_size / max(w, h)
    # Clamp to at least 1 px: for extreme aspect ratios int() would truncate the
    # short side to 0, and PIL refuses to resize to a zero-sized image.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    img = img.resize((new_w, new_h), Image.LANCZOS)

    canvas = Image.new("RGB", (target_size, target_size), (255, 255, 255))  # pad with white
    canvas.paste(img, ((target_size - new_w) // 2, (target_size - new_h) // 2))
    return np.asarray(canvas)

def enumerate_paths_and_labels(base_dir=DATA_DIR):
    """
    Walk a ``base_dir/<class_name>/<image>`` tree and collect image paths with labels.

    Sub-directories of ``base_dir`` are the classes; they are sorted by name and
    numbered from 0. Within each class, entries are visited in sorted order and
    kept only when their lower-cased suffix is in ``IMG_EXTS``.

    Returns
    -------
    (paths, y, label_map)
        ``paths`` is a list of Path objects, ``y`` an int64 array of the matching
        class indices, and ``label_map`` a dict mapping class name -> index.
    """
    class_names = [entry.name for entry in base_dir.iterdir() if entry.is_dir()]
    class_names.sort()
    label_map = dict(zip(class_names, range(len(class_names))))

    paths = []
    labels = []
    for class_idx, class_name in enumerate(class_names):
        image_files = [
            candidate
            for candidate in sorted((base_dir / class_name).glob("*"))
            if candidate.suffix.lower() in IMG_EXTS
        ]
        paths.extend(image_files)
        labels.extend([class_idx] * len(image_files))

    y = np.array(labels, dtype=np.int64)
    return paths, y, label_map

def batch_iterator(paths, indices=None, batch_size=BATCH_SIZE):
    """
    Lazily yield ``(images, idxs)`` batches loaded from disk.

    ``indices`` selects which entries of ``paths`` to load (all of them when
    None). Each yielded ``images`` is the ``np.stack`` of the preprocessed
    images for that batch, and ``idxs`` is the matching slice of ``indices``.
    """
    selected = np.arange(len(paths)) if indices is None else indices
    total = len(selected)
    for start in range(0, total, batch_size):
        chunk = selected[start:start + batch_size]
        loaded = [load_and_preprocess_image(paths[k]) for k in chunk]
        yield np.stack(loaded), chunk

# HOG
class HOGTransformer(BaseEstimator, TransformerMixin):
    """
    Stateless transformer turning a batch of RGB images into HOG feature vectors.

    Each image is scaled to [0, 1] float32, converted to grayscale, and passed to
    ``skimage.feature.hog``; the per-image vectors are stacked into an (N, F) array.
    The output dtype is whatever ``hog`` returns for that input.
    """
    def __init__(self, orientations=9, pixels_per_cell=(8, 8), cells_per_block=(2, 2), block_norm="L2-Hys", visualize=False):
        self.orientations = orientations
        self.pixels_per_cell = pixels_per_cell
        self.cells_per_block = cells_per_block
        self.block_norm = block_norm
        self.visualize = visualize  # stored only; transform() never reads it

    @staticmethod
    def _to_gray(img):
        """Scale to [0, 1] float32 first, then convert RGB -> grayscale."""
        return rgb2gray(img.astype(np.float32) / 255.0)

    def _hog_kwargs(self):
        """HOG settings shared by transform() and transform_with_viz()."""
        return {
            "orientations": self.orientations,
            "pixels_per_cell": self.pixels_per_cell,
            "cells_per_block": self.cells_per_block,
            "block_norm": self.block_norm,
        }

    def fit(self, X, y=None):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X):
        """Compute one flattened HOG vector per image in X and stack them row-wise."""
        settings = self._hog_kwargs()
        descriptors = [
            hog(self._to_gray(image), feature_vector=True, **settings)
            for image in X
        ]
        return np.vstack(descriptors)

    def transform_with_viz(self, img_single):
        """Return (features, hog_image) for one image, to visualize in the PDF."""
        gray = self._to_gray(img_single)
        features, rendered = hog(gray, visualize=True, feature_vector=True, **self._hog_kwargs())
        return features, rendered

# SIFT
class SIFTTransformer(BaseEstimator, TransformerMixin):
    """
    Extracts SIFT descriptors per image and pools to a fixed-length vector:
        feature = concat(mean(desc, axis=0), std(desc, axis=0), [count])
    Resulting dimension = 128 + 128 + 1 = 257

    The OpenCV detector is created lazily and cached. The cache is rebuilt when
    any hyperparameter changes (e.g. via ``set_params``) and is dropped when the
    estimator is pickled, because the cv2 handle is a native object.
    """
    def __init__(self, n_features=0, contrastThreshold=0.04, edgeThreshold=10, sigma=1.6):
        # n_features=0 lets SIFT choose; you can clamp e.g., 500 to speed up
        self.n_features = n_features
        self.contrastThreshold = contrastThreshold
        self.edgeThreshold = edgeThreshold
        self.sigma = sigma
        self._sift = None      # cached cv2 SIFT detector
        self._sift_key = None  # hyperparameters the cached detector was built with

    def __getstate__(self):
        """Pickle without the native cv2 detector; it is rebuilt on next use."""
        state = dict(super().__getstate__())  # copy so the live instance is untouched
        state["_sift"] = None
        state["_sift_key"] = None
        return state

    def _ensure_sift(self):
        """Create the cv2 detector if it is missing or was built with other settings."""
        key = (self.n_features, self.contrastThreshold, self.edgeThreshold, self.sigma)
        if getattr(self, "_sift", None) is not None and getattr(self, "_sift_key", None) == key:
            return
        if not hasattr(cv2, "SIFT_create"):
            raise RuntimeError(
                "SIFT not available in your OpenCV build. Install `opencv-contrib-python`."
            )
        self._sift = cv2.SIFT_create(
            nfeatures=self.n_features,
            contrastThreshold=self.contrastThreshold,
            edgeThreshold=self.edgeThreshold,
            sigma=self.sigma
        )
        self._sift_key = key

    def fit(self, X, y=None):
        """Nothing is learned from X; only makes sure the detector exists."""
        self._ensure_sift()
        return self

    def _img_to_gray(self, img):
        """Convert a 3-channel image to grayscale; anything else is returned unchanged."""
        if img.ndim == 3 and img.shape[2] == 3:
            return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        return img

    def transform(self, X):
        """
        Pool SIFT descriptors of every image in X into a (N, 257) float32 array.

        Images without keypoints get an all-zero row. An empty X yields an
        empty (0, 257) array instead of failing inside ``np.vstack``.
        """
        self._ensure_sift()
        out = []
        for img in X:
            g = self._img_to_gray(img)
            kps, desc = self._sift.detectAndCompute(g, None)
            if desc is None or len(desc) == 0:
                # No keypoints: zeros
                mean = np.zeros(128, dtype=np.float32)
                std = np.zeros(128, dtype=np.float32)
                count = 0.0
            else:
                mean = desc.mean(axis=0)
                std = desc.std(axis=0)
                count = float(len(desc))
            vec = np.concatenate([mean, std, np.array([count], dtype=np.float32)], axis=0)
            out.append(vec.astype(np.float32))
        if not out:
            return np.empty((0, 128 + 128 + 1), dtype=np.float32)
        return np.vstack(out)

    def detect_keypoints(self, img_single):
        """Return keypoints for visualization purposes."""
        self._ensure_sift()
        g = self._img_to_gray(img_single)
        kps = self._sift.detect(g, None)
        return kps

# HOG+SIFT
class HOGSIFTConcatTransformer(BaseEstimator, TransformerMixin):
    """
    Computes HOG and SIFT features, then concatenates them:
      X -> [HOG(X) | SIFT(X)]
    Optional per-branch weights let you emphasize one feature family.

    Constructor arguments are stored unmodified, as scikit-learn's
    ``get_params`` / ``set_params`` / ``clone`` require. The two sub-transformers
    are built lazily from the current parameter values, so ``set_params`` takes
    effect on the next ``fit`` / ``transform``. Invalid keys in ``hog_params`` or
    ``sift_params`` therefore surface on first use rather than at construction.
    """
    def __init__(
        self,
        hog_params=None,
        sift_params=None,
        hog_weight=1.0,
        sift_weight=1.0,
    ):
        # Store verbatim: clone() checks that each parameter is the very object passed in.
        self.hog_params = hog_params
        self.sift_params = sift_params
        self.hog_weight = hog_weight
        self.sift_weight = sift_weight
        self._hog = None
        self._sift = None
        self._branch_key = None  # parameter snapshot the branches were built from

    def _ensure_branches(self):
        """(Re)build the HOG and SIFT sub-transformers when missing or out of date."""
        hog_kwargs = dict(self.hog_params or {})
        sift_kwargs = dict(self.sift_params or {})
        key = (hog_kwargs, sift_kwargs)
        stale = (
            getattr(self, "_hog", None) is None
            or getattr(self, "_sift", None) is None
            or getattr(self, "_branch_key", None) != key
        )
        if stale:
            self._hog = HOGTransformer(**hog_kwargs)
            self._sift = SIFTTransformer(**sift_kwargs)
            self._branch_key = key

    def fit(self, X, y=None):
        """Prepare both branches (SIFT allocates its detector); nothing is learned from X."""
        self._ensure_branches()
        self._hog.fit(X, y)
        self._sift.fit(X, y)
        return self

    def transform(self, X):
        """Return the float32 horizontal stack of weighted HOG and SIFT features."""
        self._ensure_branches()
        hog_weight = float(self.hog_weight)
        sift_weight = float(self.sift_weight)
        H = self._hog.transform(X).astype(np.float32, copy=False)
        S = self._sift.transform(X).astype(np.float32, copy=False)
        if hog_weight != 1.0:
            H = H * hog_weight
        if sift_weight != 1.0:
            S = S * sift_weight
        return np.hstack([H, S]).astype(np.float32, copy=False)

def make_pipeline(
    feat_type="hog",
    hog_params=None,
    sift_params=None,
    pca_n=None,
    C=1.0,
    hog_weight=1.0,
    sift_weight=1.0,
):
    """
    Assemble an end-to-end Pipeline: feature extractor -> StandardScaler ->
    optional PCA -> LinearSVC.

    ``feat_type`` selects the extractor ("hog", "sift", or one of the aliases
    "hog+sift" / "concat" / "hog_sift"); any other value raises ValueError.
    The PCA step is added only when ``pca_n`` is not None.
    """
    hog_kwargs = hog_params or {}
    sift_kwargs = sift_params or {}

    if feat_type == "hog":
        extractor = HOGTransformer(**hog_kwargs)
    elif feat_type == "sift":
        extractor = SIFTTransformer(**sift_kwargs)
    elif feat_type in ("hog+sift", "concat", "hog_sift"):
        extractor = HOGSIFTConcatTransformer(
            hog_params=hog_kwargs,
            sift_params=sift_kwargs,
            hog_weight=hog_weight,
            sift_weight=sift_weight,
        )
    else:
        raise ValueError("feat_type must be 'hog', 'sift', or 'hog+sift'")

    reducer = []
    if pca_n is not None:
        reducer = [("pca", PCA(n_components=pca_n, svd_solver="auto", random_state=RANDOM_STATE))]

    classifier = LinearSVC(C=C, dual=False, max_iter=10000, random_state=RANDOM_STATE)
    return Pipeline(
        [("feat", extractor), ("scaler", StandardScaler(with_mean=True))]
        + reducer
        + [("svm", classifier)]
    )

def _feature_dim_of_transformer(transformer, example_img):
    """Probe feature dimensionality with one image."""
    f = transformer.transform(np.expand_dims(example_img, 0))
    return f.shape[1]

def _build_feats_memmap(n_rows, n_cols, dtype=np.float32):
    tmp_dir = Path("feature_cache")
    tmp_dir.mkdir(exist_ok=True)
    path = tmp_dir / f"feats_{n_rows}x{n_cols}_{int(time.time())}.dat"
    arr = np.memmap(path, mode="w+", dtype=dtype, shape=(n_rows, n_cols))
    return arr, path

def _extract_features_batched(paths, indices, transformer, probe_img, batch_size=BATCH_SIZE, pbar_desc=None,
                              pbar_position=2, pbar_leave=False):
    """
    Featurize ``paths[indices]`` batch by batch into a float32 memmap.

    The feature width is probed once with ``probe_img``; rows are written in the
    order the batches arrive, and a tqdm bar tracks progress per image.

    Returns
    -------
    (feats, path)
        The flushed memmap and the path of its backing file.
    """
    total = len(indices)
    n_features = _feature_dim_of_transformer(transformer, probe_img)
    feats, path = _build_feats_memmap(total, n_features, dtype=np.float32)
    label = pbar_desc or f"Extract {type(transformer).__name__} (n={total})"

    progress = tqdm(total=total, desc=label, unit="img", position=pbar_position, leave=pbar_leave, dynamic_ncols=True)
    with progress:
        row = 0
        for batch_imgs, batch_idx in batch_iterator(paths, indices, batch_size):
            stop = row + len(batch_idx)
            feats[row:stop] = transformer.transform(batch_imgs).astype(np.float32, copy=False)
            progress.update(stop - row)
            row = stop

    feats.flush()
    return feats, path

def _transformer_from_grid_params(params):
    """Build the feature transformer described by one grid-search parameter dict."""
    feat_type = params["feat__type"]  # "hog" or "sift" or "hog+sift"

    def _hog_kwargs():
        # HOG geometry is mandatory for HOG-based runs; only block_norm has a default.
        return dict(
            orientations=params["hog__orientations"],
            pixels_per_cell=params["hog__pixels_per_cell"],
            cells_per_block=params["hog__cells_per_block"],
            block_norm=params.get("hog__block_norm", "L2-Hys"),
        )

    def _sift_kwargs():
        return dict(
            n_features=params.get("sift__n_features", 0),
            contrastThreshold=params.get("sift__contrastThreshold", 0.04),
            edgeThreshold=params.get("sift__edgeThreshold", 10),
            sigma=params.get("sift__sigma", 1.6),
        )

    if feat_type == "hog":
        return HOGTransformer(**_hog_kwargs())
    if feat_type == "sift":
        return SIFTTransformer(**_sift_kwargs())
    if feat_type in ("hog+sift", "concat", "hog_sift"):
        return HOGSIFTConcatTransformer(
            hog_params=_hog_kwargs(),
            sift_params=_sift_kwargs(),
            hog_weight=params.get("concat__hog_weight", 1.0),
            sift_weight=params.get("concat__sift_weight", 1.0),
        )
    raise ValueError("Unknown feat__type")


def _remove_temp_feature_files(file_paths):
    """Best-effort deletion of temporary feature files; failures are reported, not raised."""
    for file_path in file_paths:
        if file_path is None:
            continue
        try:
            os.remove(file_path)
        except OSError as exc:
            tqdm.write(f"[WARN] Could not remove temporary feature file {file_path}: {exc}")


def fit_eval_pipeline(paths, y, params, cv_splits=5, random_state=RANDOM_STATE):
    """
    Train/validate one param set using stratified K-fold; returns mean accuracy and per-fold scores.
    Featurization happens in batches directly from disk paths.

    Parameters
    ----------
    paths : sequence of path-like
        Image files; ``paths[0]`` is also used to probe the feature dimensionality.
    y : numpy.ndarray
        Class labels aligned with ``paths``.
    params : dict
        Grid-search parameters. ``feat__type`` and ``svm__C`` are required;
        ``hog__*`` geometry is required for HOG-based feature types;
        ``pca__n_components`` is optional.
    cv_splits : int
        Number of stratified folds.
    random_state : int
        Seed for the fold shuffling, PCA and LinearSVC.

    Returns
    -------
    (float, list of float)
        Mean validation accuracy and the individual fold accuracies.

    Raises
    ------
    ValueError
        If ``feat__type`` is not recognised.
    """
    transformer = _transformer_from_grid_params(params)

    pca_n = params.get("pca__n_components", None)
    C = params["svm__C"]

    # For probing feature dim, grab one image
    probe_img = load_and_preprocess_image(paths[0])

    skf = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=random_state)
    fold_scores = []

    with tqdm(total=cv_splits, desc="CV folds", position=1, leave=False, dynamic_ncols=True) as foldbar:
        for fold_i, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(y)), y), start=1):
            transformer.fit(None)  # initialize

            Xtr_feats = Xval_feats = None
            Xtr = Xval = None
            tr_path = val_path = None
            try:
                Xtr_feats, tr_path = _extract_features_batched(
                    paths, train_idx, transformer, probe_img,
                    pbar_desc=f"Extract {type(transformer).__name__} [train f{fold_i}]", pbar_position=2,
                    pbar_leave=False
                )
                Xval_feats, val_path = _extract_features_batched(
                    paths, val_idx, transformer, probe_img,
                    pbar_desc=f"Extract {type(transformer).__name__} [val   f{fold_i}]",
                    pbar_position=3,
                    pbar_leave=False
                )

                # Convert to ndarray views
                Xtr = np.asarray(Xtr_feats)
                Xval = np.asarray(Xval_feats)

                # Decide SVM dual based on dimensionality
                n_tr, d_tr = Xtr.shape
                dual = d_tr > n_tr

                steps = [("scaler", StandardScaler(with_mean=True))]
                # Clamp PCA to valid range
                use_pca_n = None
                if pca_n is not None:
                    max_pca = min(n_tr, d_tr) - 1
                    if max_pca >= 1 and pca_n <= max_pca:
                        use_pca_n = pca_n
                    else:
                        tqdm.write(f"  -> Skipping PCA (n_components={pca_n} > max={max_pca})")
                if use_pca_n is not None:
                    steps.append(("pca", PCA(n_components=use_pca_n, svd_solver="auto", random_state=random_state)))
                steps.append(("svm", LinearSVC(C=C, dual=dual, max_iter=10000, random_state=random_state)))
                clf = Pipeline(steps)

                clf.fit(Xtr, y[train_idx])
                preds = clf.predict(Xval)
                acc = accuracy_score(y[val_idx], preds)
                fold_scores.append(acc)
            finally:
                # Always release the memmaps and delete their files, even when
                # extraction or fitting failed, so feature_cache/ does not fill up.
                Xtr = Xval = None
                Xtr_feats = Xval_feats = None
                gc.collect()
                _remove_temp_feature_files((tr_path, val_path))

            foldbar.update(1)

    return float(np.mean(fold_scores)), fold_scores

def save_state(state_path, state):
    """
    Atomically persist ``state`` to ``state_path`` with joblib.

    The state is first dumped to a sibling ``.tmp`` file and then moved over the
    target with ``os.replace``, so a crash mid-write never leaves a truncated
    checkpoint. The parent directory is created when missing.

    Parameters
    ----------
    state_path : str or Path
        Destination file.
    state : object
        Any joblib-serialisable object.
    """
    state_path = Path(state_path)
    # The checkpoint directory is not created anywhere else, so make sure it exists.
    state_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = state_path.with_suffix(".tmp")
    try:
        dump(state, tmp)
        os.replace(tmp, state_path)
    except BaseException:
        # Do not leave a half-written temp file behind; the original error is what matters.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise

def load_state(state_path):
    """
    Read back a checkpoint written by ``save_state``.

    NOTE: joblib files are pickle-based; only load checkpoints you created yourself.
    """
    restored = load(state_path)
    return restored


In [8]:
# ===================== Post-tuning helpers (plots + final refit) =====================
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from pathlib import Path

def results_to_dataframe(state: dict) -> pd.DataFrame:
    """
    Turn the ``state['results']`` records into one tidy DataFrame, sorted by ``idx``.

    Every record contributes ``idx``, ``seconds`` and one column per hyperparameter
    (tuple/list values are rendered as tuple strings so they survive a CSV round trip).
    Successful records add ``mean_accuracy``, JSON-encoded ``fold_scores``,
    ``fold_mean`` and ``fold_std``; failed records get ``mean_accuracy = NaN`` and
    an ``error`` message.

    Raises
    ------
    ValueError
        If there are no records at all.
    """
    def _flatten(record):
        row = {
            "idx": record.get("idx"),
            "seconds": record.get("seconds", np.nan),
        }
        for key, value in record.get("params", {}).items():
            row[key] = str(tuple(value)) if isinstance(value, (tuple, list)) else value

        if "mean_accuracy" not in record:
            # Failed evaluation: keep the row, mark the metric as missing.
            row["mean_accuracy"] = np.nan
            row["error"] = record.get("error", "Unknown error")
            return row

        scores = record.get("fold_scores", [])
        row["mean_accuracy"] = float(record["mean_accuracy"])
        row["fold_scores"] = json.dumps([float(score) for score in scores])
        row["fold_mean"] = float(np.mean(scores)) if scores else np.nan
        row["fold_std"] = float(np.std(scores)) if scores else np.nan
        return row

    rows = [_flatten(record) for record in state.get("results", [])]
    if not rows:
        raise ValueError("No results found in state['results'].")

    frame = pd.DataFrame(rows)
    return frame.sort_values("idx").reset_index(drop=True)


def plot_search_figures(df: pd.DataFrame, outdir="reports") -> dict:
    """
    Generate a compact set of figures from the grid search results:
      - progress over evaluations
      - avg accuracy per feature family
      - accuracy vs C (log x), colored by PCA
      - runtime vs accuracy
      - HOG heatmap (orientations x pixels_per_cell) if available
    Saves one PDF + individual PNGs + a CSV with top 20 rows.

    Rows whose ``mean_accuracy`` is NaN (failed evaluations) are excluded from
    every figure and from the CSV. Each optional figure is produced only when
    the columns it needs are present in ``df``.

    Parameters
    ----------
    df : pd.DataFrame
        Results table; must contain ``idx`` and ``mean_accuracy`` columns.
    outdir : str or Path
        Directory for the PDF, PNGs and CSV; created if missing.

    Returns dict with saved paths.
    """
    outdir = Path(outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    pdf_path = outdir / "gridsearch_summary.pdf"
    # (name, figure) pairs; figures are only written to disk in the save loop at the end.
    created_pngs = []

    plt.ioff()  # headless-friendly

    # Only successful evaluations carry a metric worth plotting.
    dff = df.dropna(subset=["mean_accuracy"]).copy()

    # A) progress
    # The pyplot calls below draw on the figure created immediately before them.
    fig = plt.figure(figsize=(8, 4))
    dff_sorted = dff.sort_values("idx")
    plt.plot(dff_sorted["idx"], dff_sorted["mean_accuracy"], marker="o", lw=1)
    plt.xlabel("Evaluation index")
    plt.ylabel("Mean CV accuracy")
    plt.title("Grid search progress")
    plt.grid(True, ls="--", alpha=0.3)
    created_pngs.append(("progress", fig))

    # B) accuracy by feature type
    if "feat__type" in dff.columns:
        fig = plt.figure(figsize=(7, 4))
        # "max" and "count" are aggregated too, but only the "mean" column is plotted.
        agg = dff.groupby("feat__type")["mean_accuracy"].agg(["mean", "max", "count"]).sort_values("mean", ascending=False)
        plt.bar(agg.index, agg["mean"])
        plt.ylabel("Mean CV accuracy")
        plt.title("Average accuracy per feature type")
        created_pngs.append(("acc_by_feat_type", fig))

    # C) accuracy vs C
    if "svm__C" in dff.columns:
        fig = plt.figure(figsize=(7, 5))
        sub = dff.copy()
        if "pca__n_components" in sub.columns:
            # One scatter series per distinct PCA size, plus a separate "x" series for runs without PCA.
            pcs = sorted([p for p in sub["pca__n_components"].dropna().unique()])
            for p in pcs:
                ss = sub[sub["pca__n_components"] == p]
                plt.scatter(ss["svm__C"], ss["mean_accuracy"], label=f"PCA={int(p)}", s=40)
            nopca = sub[sub["pca__n_components"].isna()]
            if not nopca.empty:
                plt.scatter(nopca["svm__C"], nopca["mean_accuracy"], label="PCA=None", s=50, marker="x")
            if pcs or not nopca.empty:
                plt.legend()
        else:
            plt.scatter(sub["svm__C"], sub["mean_accuracy"], s=40)
        # Log x-axis is best-effort: any failure here is ignored and the axis stays linear.
        try:
            plt.xscale("log")
        except Exception:
            pass
        plt.xlabel("C")
        plt.ylabel("Mean CV accuracy")
        plt.title("Accuracy vs C")
        plt.grid(True, ls="--", alpha=0.3)
        created_pngs.append(("acc_vs_C", fig))

    # D) runtime vs accuracy
    if "seconds" in dff.columns:
        fig = plt.figure(figsize=(7, 5))
        plt.scatter(dff["seconds"], dff["mean_accuracy"], s=40)
        plt.xlabel("Seconds per evaluation")
        plt.ylabel("Mean CV accuracy")
        plt.title("Runtime vs accuracy")
        plt.grid(True, ls="--", alpha=0.3)
        created_pngs.append(("time_vs_acc", fig))

    # E) HOG heatmap (max accuracy over cells_per_block/others)
    if set(["hog__orientations", "hog__pixels_per_cell"]).issubset(dff.columns):
        try:
            sub = dff.copy()
            # Cast to str so the cell-size values can be used as pivot column labels.
            sub["hog__pixels_per_cell"] = sub["hog__pixels_per_cell"].astype(str)
            piv = sub.pivot_table(
                index="hog__orientations",
                columns="hog__pixels_per_cell",
                values="mean_accuracy",
                aggfunc="max",
            )
            fig = plt.figure(figsize=(8, 5))
            im = plt.imshow(piv.values, aspect="auto")
            plt.colorbar(im, fraction=0.046, pad=0.04, label="Mean CV accuracy (max)")
            plt.xticks(np.arange(piv.shape[1]), piv.columns, rotation=0)
            plt.yticks(np.arange(piv.shape[0]), piv.index)
            plt.xlabel("pixels_per_cell")
            plt.ylabel("orientations")
            plt.title("HOG — max accuracy by (orientations × pixels_per_cell)")
            created_pngs.append(("hog_heatmap", fig))
        except Exception as e:
            # The heatmap is optional: report the problem and carry on with the other figures.
            print(f"[WARN] Skipped HOG heatmap: {e}")

    # Save one PDF with all figs + individual PNGs
    with PdfPages(pdf_path) as pdf:
        for name, fig in created_pngs:
            fig.tight_layout()
            pdf.savefig(fig)
            fig.savefig(outdir / f"{name}.png", dpi=150, bbox_inches="tight")
            plt.close(fig)  # release the figure once it has been written

    # Top-20 table
    top20_path = outdir / "top20_results.csv"
    dff.sort_values("mean_accuracy", ascending=False).head(20).to_csv(top20_path, index=False)

    print(f"[PLOTS] Saved: {pdf_path}")
    return {
        "pdf": str(pdf_path),
        "pngs": [str(outdir / f"{name}.png") for name, _ in created_pngs],
        "top20_csv": str(top20_path),
    }


def extract_best_params(df: pd.DataFrame) -> dict:
    """
    Return the hyperparameters of the row with the best ``mean_accuracy``.

    Only columns whose name contains ``__`` are treated as hyperparameters.
    The values are cleaned so they can be fed back into the transformers:

    * entries that are missing (None / NaN) for the winning row are dropped, so
      callers fall back to their own defaults via ``dict.get``;
    * tuple strings such as ``"(8, 8)"`` are parsed back into tuples with
      ``ast.literal_eval`` (never ``eval``: the frame may come from a CSV file);
    * integer-valued parameters that pandas upcast to float (e.g. ``6.0``) are
      converted back to ``int``.

    Raises
    ------
    ValueError
        If no row has a non-missing ``mean_accuracy``.
    """
    import ast  # local import: only needed for safe parsing of tuple strings

    # Parameters that downstream constructors expect as integers.
    integer_keys = {"hog__orientations", "sift__n_features", "pca__n_components"}
    pandas_na = getattr(pd, "NA", None)

    dff = df.dropna(subset=["mean_accuracy"])
    if dff.empty:
        raise ValueError("No successful runs with mean_accuracy.")
    best = dff.loc[dff["mean_accuracy"].idxmax()].to_dict()

    def _is_missing(value):
        if value is None or (pandas_na is not None and value is pandas_na):
            return True
        return isinstance(value, (float, np.floating)) and bool(np.isnan(value))

    def _maybe_tuple(x):
        # Convert stringified tuples back; anything unparsable is returned as-is.
        if isinstance(x, str) and x.startswith("(") and x.endswith(")"):
            try:
                return ast.literal_eval(x)
            except Exception:
                return x
        return x

    def _maybe_int(value):
        # Columns holding NaN for other rows become float; undo that for whole numbers.
        if isinstance(value, (float, np.floating)) and float(value).is_integer():
            return int(value)
        if isinstance(value, np.integer):
            return int(value)
        return value

    best_params = {}
    for key, value in best.items():
        # Keep only hyperparameter keys (contain '__') that are set for this row.
        if "__" not in key or _is_missing(value):
            continue
        value = _maybe_tuple(value)
        if key in integer_keys:
            value = _maybe_int(value)
        best_params[key] = value
    return best_params


def refit_on_full(paths, y, best_params: dict, best_model_path="checkpoints/best_model.pkl"):
    """
    Recompute features on ALL data for the selected hyperparams and fit the
    scaler/[optional PCA]/LinearSVC pipeline once. Saves to best_model_path.

    ``best_params`` may come straight from a results DataFrame: optional entries
    that are None/NaN fall back to their defaults, and integer parameters stored
    as whole-number floats (e.g. ``6.0``) are converted back to ``int``.

    Parameters
    ----------
    paths : sequence of path-like
        All image files; ``paths[0]`` is also used to probe the feature width.
    y : numpy.ndarray
        Labels aligned with ``paths``.
    best_params : dict
        Must contain ``feat__type`` and ``svm__C`` (plus the ``hog__*`` geometry
        for HOG-based feature types).
    best_model_path : str or Path
        Where the dict ``{"feature_type", "feature_params", "post_feat_pipeline"}``
        is dumped with joblib.

    Returns
    -------
    str
        The path the model was saved to.

    Raises
    ------
    ValueError
        If ``feat__type`` is not recognised.
    """
    best_model_path = Path(best_model_path)
    best_model_path.parent.mkdir(parents=True, exist_ok=True)

    def _is_missing(value):
        return value is None or (isinstance(value, (float, np.floating)) and bool(np.isnan(value)))

    def _opt(key, default):
        # Optional parameter: treat None/NaN exactly like "not specified".
        value = best_params.get(key, default)
        return default if _is_missing(value) else value

    def _as_int(value):
        # Undo pandas' int -> float upcast for whole numbers; leave everything else alone.
        if isinstance(value, (float, np.floating)) and float(value).is_integer():
            return int(value)
        if isinstance(value, np.integer):
            return int(value)
        return value

    def _hog_kwargs():
        return dict(
            orientations=_as_int(best_params["hog__orientations"]),
            pixels_per_cell=best_params["hog__pixels_per_cell"],
            cells_per_block=best_params["hog__cells_per_block"],
            block_norm=_opt("hog__block_norm", "L2-Hys"),
        )

    def _sift_kwargs():
        return dict(
            n_features=_as_int(_opt("sift__n_features", 0)),
            contrastThreshold=_opt("sift__contrastThreshold", 0.04),
            edgeThreshold=_opt("sift__edgeThreshold", 10),
            sigma=_opt("sift__sigma", 1.6),
        )

    feat_type = best_params["feat__type"]

    if feat_type == "hog":
        save_feat_params = _hog_kwargs()
        transformer = HOGTransformer(**save_feat_params)

    elif feat_type == "sift":
        save_feat_params = _sift_kwargs()
        transformer = SIFTTransformer(**save_feat_params)

    elif feat_type in ("hog+sift", "concat", "hog_sift"):
        hogp = _hog_kwargs()
        siftp = _sift_kwargs()
        hog_weight = _opt("concat__hog_weight", 1.0)
        sift_weight = _opt("concat__sift_weight", 1.0)
        transformer = HOGSIFTConcatTransformer(
            hog_params=hogp,
            sift_params=siftp,
            hog_weight=hog_weight,
            sift_weight=sift_weight,
        )
        save_feat_params = {
            "hog_params": hogp, "sift_params": siftp,
            "hog_weight": hog_weight,
            "sift_weight": sift_weight,
        }
    else:
        raise ValueError(f"Unknown feat__type: {feat_type}")

    # Features on all data (batched)
    probe_img = load_and_preprocess_image(paths[0])
    transformer.fit(None)
    all_indices = np.arange(len(paths))
    X_feats, feats_path = _extract_features_batched(
        paths, all_indices, transformer, probe_img,
        pbar_desc="Extract features [full refit]",
        pbar_position=1, pbar_leave=False
    )

    X_all = None
    try:
        X_all = np.asarray(X_feats)
        n_all, d_all = X_all.shape
        dual = d_all > n_all

        # Post-feature pipeline
        steps = [("scaler", StandardScaler(with_mean=True))]
        pca_n = _as_int(_opt("pca__n_components", None))
        if pca_n is not None:
            max_pca = min(n_all, d_all) - 1
            if max_pca >= 1 and pca_n <= max_pca:
                steps.append(("pca", PCA(n_components=pca_n, svd_solver="auto", random_state=RANDOM_STATE)))
            else:
                print(f"[Refit] Skipping PCA (n_components={pca_n} > max={max_pca})")
        steps.append(("svm", LinearSVC(C=best_params["svm__C"], dual=dual, max_iter=10000, random_state=RANDOM_STATE)))
        final_pipe = Pipeline(steps)
        final_pipe.fit(X_all, y)

        dump({
            "feature_type": feat_type,
            "feature_params": save_feat_params,
            "post_feat_pipeline": final_pipe
        }, best_model_path)
    finally:
        # Always drop the memmap and delete its backing file, even if fitting or
        # saving failed, so feature_cache/ does not accumulate large leftovers.
        X_all = None
        X_feats = None
        gc.collect()
        try:
            os.remove(feats_path)
        except OSError as exc:
            print(f"[Refit] Could not remove temporary feature file {feats_path}: {exc}")

    print(f"[Refit] Saved final model to: {best_model_path}")
    return str(best_model_path)


In [9]:
# Load the saved tuning checkpoint (joblib/pickle: only open files you created yourself).
state2 = load(CHECKPOINT_PATH)
# Flatten the recorded evaluations into one table, one row per parameter set.
df2 = results_to_dataframe(state2)
# Write the summary PDF/PNGs/CSV into REPORTS_DIR; the returned dict of paths is the cell output.
plot_search_figures(df2, REPORTS_DIR)

[PLOTS] Saved: reports/gridsearch_summary.pdf


{'pdf': 'reports/gridsearch_summary.pdf',
 'pngs': ['reports/progress.png',
  'reports/acc_by_feat_type.png',
  'reports/acc_vs_C.png',
  'reports/time_vs_acc.png',
  'reports/hog_heatmap.png'],
 'top20_csv': 'reports/top20_results.csv'}

In [12]:
# Plain-text preview of the first rows of the results table.
print(df2.head())

   idx     seconds feat__type hog__block_norm hog__cells_per_block  \
0    0  208.174597        hog          L2-Hys               (2, 2)   
1    1  205.430000        hog          L2-Hys               (2, 2)   
2    2  206.480794        hog          L2-Hys               (2, 2)   
3    3  206.286557        hog          L2-Hys               (2, 2)   
4    4  202.956786        hog          L2-Hys               (2, 2)   

   hog__orientations hog__pixels_per_cell  pca__n_components  svm__C  \
0                6.0               (8, 8)                128   0.002   
1                6.0               (8, 8)                128   0.250   
2                6.0               (8, 8)                128   1.000   
3                6.0               (8, 8)                128   4.000   
4                6.0               (8, 8)                256   0.002   

   mean_accuracy                               fold_scores  fold_mean  \
0       0.757786  [0.7569477719214183, 0.7586248203162435]   0.757786   


In [11]:
# Export the full results table to gridsearch.csv in the current working directory.
df2.to_csv('gridsearch.csv', index=False)

In [14]:
# Keep only the evaluations that used the SIFT feature extractor.
sift_df = df2[df2["feat__type"] == "sift"]

In [15]:
# Row with the highest mean CV accuracy among the SIFT runs.
# NOTE(review): relies on sift_df having at least one row with a non-NaN mean_accuracy.
best_sift = sift_df.loc[sift_df["mean_accuracy"].idxmax()]

In [17]:
# Show the SIFT settings, SVM C and score of the best SIFT run.
best_sift[["sift__n_features", "sift__contrastThreshold",
           "sift__edgeThreshold", "sift__sigma", "mean_accuracy", "svm__C"]]

sift__n_features                0.0
sift__contrastThreshold        0.02
sift__edgeThreshold            15.0
sift__sigma                     1.2
mean_accuracy              0.771083
svm__C                         0.25
Name: 541, dtype: object