# Advanced KNN from Scratch (NumPy core)
This notebook extends the NumPy-only KNN you already requested with **four advanced upgrades**:

1. **Learn k per query (adaptive k):** automatically chooses neighborhood size per test point.
2. **Prototype condensation:** reduces training set size using **CNN (Condensed Nearest Neighbor)** and optional **ENN (Edited Nearest Neighbor)** cleaning.
3. **Approximate neighbors:** fast candidate retrieval via **Random Hyperplane LSH** (multiple hash tables), then exact KNN on candidates.
4. **Calibrated conformal prediction:** split calibration set, compute quantile threshold, output **set-valued predictions** with coverage control.

Dataset demo uses **UCI Mammographic Mass** (961 instances, missing values). citeturn3view0


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import urllib.request
from pathlib import Path

def set_seed(seed: int = 42) -> None:
    np.random.seed(seed)

set_seed(42)


## 1) Load + preprocess Mammographic Mass (NumPy only)

In [None]:
DATA_URL = "https://archive.ics.uci.edu/ml/machine-learning-databases/mammographic-masses/mammographic_masses.data"
local_path = Path("mammographic_masses.data")

if not local_path.exists():
    urllib.request.urlretrieve(DATA_URL, local_path.as_posix())

raw = local_path.read_text().strip().splitlines()

def parse_mammographic_mass(lines):
    X_list, y_list = [], []
    for line in lines:
        parts = [p.strip() for p in line.split(",")]
        if len(parts) != 6:
            continue
        *feat, target = parts
        row = [np.nan if v == "?" else float(v) for v in feat]
        if target == "?":
            continue
        X_list.append(row)
        y_list.append(int(float(target)))
    return np.array(X_list, dtype=float), np.array(y_list, dtype=int)

X_raw, y = parse_mammographic_mass(raw)
X_raw.shape, np.bincount(y), np.mean(np.isnan(X_raw), axis=0)


In [None]:
def train_test_split_np(X, y, test_size=0.2, seed=42, stratify=True):
    rng = np.random.default_rng(seed)
    n = X.shape[0]
    idx = np.arange(n)

    if stratify:
        idx0 = idx[y == 0]
        idx1 = idx[y == 1]
        rng.shuffle(idx0); rng.shuffle(idx1)

        n_test0 = int(round(len(idx0) * test_size))
        n_test1 = int(round(len(idx1) * test_size))
        test_idx = np.concatenate([idx0[:n_test0], idx1[:n_test1]])
        train_idx = np.setdiff1d(idx, test_idx, assume_unique=False)

        rng.shuffle(train_idx); rng.shuffle(test_idx)
    else:
        rng.shuffle(idx)
        n_test = int(round(n * test_size))
        test_idx = idx[:n_test]
        train_idx = idx[n_test:]

    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]

def nanmedian_impute_fit(X):
    return np.nanmedian(X, axis=0)

def nanmedian_impute_transform(X, med):
    X2 = X.copy()
    for j in range(X.shape[1]):
        m = np.isnan(X2[:, j])
        X2[m, j] = med[j]
    return X2

def standardize_fit(X):
    mu = np.mean(X, axis=0)
    sigma = np.std(X, axis=0)
    sigma = np.where(sigma == 0, 1.0, sigma)
    return mu, sigma

def standardize_transform(X, mu, sigma):
    return (X - mu) / sigma

X_train_raw, X_test_raw, y_train, y_test = train_test_split_np(X_raw, y, test_size=0.2, seed=42, stratify=True)

med = nanmedian_impute_fit(X_train_raw)
X_train_imp = nanmedian_impute_transform(X_train_raw, med)
X_test_imp  = nanmedian_impute_transform(X_test_raw,  med)

mu, sigma = standardize_fit(X_train_imp)
X_train = standardize_transform(X_train_imp, mu, sigma)
X_test  = standardize_transform(X_test_imp,  mu, sigma)

X_train.shape, X_test.shape


## 2) Base KNN (NumPy core) + predict_proba

In [None]:
class KNN:
    def __init__(
        self,
        k=5,
        task="classification",
        p=2.0,
        metric="minkowski",
        weights="uniform",
        distance_power=1.0,
        gaussian_sigma=1.0,
        eps=1e-12,
        nominal_idx=None,
        missing="ignore",
        learn_diagonal_metric=False,
        metric_strength=1.0
    ):
        self.k = int(k)
        self.task = task
        self.p = float(p)
        self.metric = metric
        self.weights = weights
        self.distance_power = float(distance_power)
        self.gaussian_sigma = float(gaussian_sigma)
        self.eps = float(eps)
        self.nominal_idx = set([] if nominal_idx is None else list(nominal_idx))
        self.missing = missing
        self.learn_diagonal_metric = bool(learn_diagonal_metric)
        self.metric_strength = float(metric_strength)

        self.X_ = None
        self.y_ = None
        self.diag_W_ = None

    def fit(self, X, y):
        X = np.asarray(X, dtype=float)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError("X must be 2D")
        if X.shape[0] != y.shape[0]:
            raise ValueError("X,y mismatch")
        if self.k <= 0 or self.k > X.shape[0]:
            raise ValueError("k invalid")
        self.X_ = X
        self.y_ = y
        self.diag_W_ = self._learn_diagonal_weights(X, y) if self.learn_diagonal_metric else np.ones(X.shape[1])
        return self

    def _learn_diagonal_weights(self, X, y):
        d = X.shape[1]
        eps = self.eps
        if self.task == "classification":
            classes = np.unique(y)
            if classes.size == 2:
                X0 = X[y == classes[0]]
                X1 = X[y == classes[1]]
                mu0 = np.nanmean(X0, axis=0); mu1 = np.nanmean(X1, axis=0)
                s0 = np.nanstd(X0, axis=0);  s1 = np.nanstd(X1, axis=0)
                pooled = np.sqrt((s0**2 + s1**2) / 2.0) + eps
                effect = np.abs(mu1 - mu0) / pooled
                w = (effect + eps) ** self.metric_strength
            else:
                scores = np.zeros(d, dtype=float)
                for c in classes:
                    scores += self._learn_diagonal_weights(X, (y == c).astype(int))
                w = scores / max(classes.size, 1)
        else:
            y2 = y.astype(float)
            y2 = (y2 - np.mean(y2)) / (np.std(y2) + eps)
            scores = np.zeros(d, dtype=float)
            for j in range(d):
                xj = X[:, j]
                m = ~np.isnan(xj)
                if np.sum(m) < 3:
                    scores[j] = 0.0
                    continue
                xj2 = (xj[m] - np.mean(xj[m])) / (np.std(xj[m]) + eps)
                scores[j] = np.abs(np.mean(xj2 * y2[m]))
            w = (scores + eps) ** self.metric_strength
        w = w / (np.mean(w) + eps)
        return w

    def _pairwise_distances(self, X_query):
        Xq = np.asarray(X_query, dtype=float)
        Xt = self.X_
        if Xq.ndim == 1:
            Xq = Xq[None, :]
        n_q, d = Xq.shape
        if d != Xt.shape[1]:
            raise ValueError("dim mismatch")

        sqrtW = np.sqrt(self.diag_W_)[None, :]
        XqW = Xq * sqrtW
        XtW = Xt * sqrtW

        if self.metric == "cosine":
            if self.missing == "ignore":
                raise ValueError("cosine requires imputation")
            Xq2 = np.nan_to_num(XqW, nan=0.0)
            Xt2 = np.nan_to_num(XtW, nan=0.0)
            qn = np.linalg.norm(Xq2, axis=1, keepdims=True) + self.eps
            tn = np.linalg.norm(Xt2, axis=1, keepdims=True) + self.eps
            sim = (Xq2 @ Xt2.T) / (qn @ tn.T)
            return 1.0 - sim

        D = np.empty((n_q, Xt.shape[0]), dtype=float)
        for i in range(n_q):
            qi = XqW[i]
            diff = XtW - qi[None, :]

            if self.missing == "ignore":
                mask = ~np.isnan(diff)
                if self.nominal_idx:
                    for j in self.nominal_idx:
                        mj = mask[:, j]
                        if np.any(mj):
                            a = self.X_[:, j][mj]
                            b = Xq[i, j]
                            diff[mj, j] = (a != b).astype(float) * sqrtW[0, j]
                present = np.sum(mask, axis=1)
                ad = np.abs(np.where(mask, diff, 0.0))
                dist_p = np.sum(ad ** self.p, axis=1)
                dist = dist_p ** (1.0 / self.p)
                D[i] = np.where(present > 0, dist, np.inf)
            else:
                diff2 = np.nan_to_num(diff, nan=0.0)
                if self.nominal_idx:
                    for j in self.nominal_idx:
                        a = self.X_[:, j]
                        b = Xq[i, j]
                        diff2[:, j] = (a != b).astype(float) * sqrtW[0, j]
                ad = np.abs(diff2)
                D[i] = (np.sum(ad ** self.p, axis=1) + self.eps) ** (1.0 / self.p)
        return D

    def _weights_from_distances(self, dist):
        if self.weights == "uniform":
            return np.ones_like(dist)
        if self.weights == "distance":
            return 1.0 / ((dist + self.eps) ** self.distance_power)
        if self.weights == "gaussian":
            s2 = (self.gaussian_sigma ** 2) + self.eps
            return np.exp(-(dist ** 2) / (2.0 * s2))
        raise ValueError("unknown weights")

    def kneighbors(self, X_query):
        D = self._pairwise_distances(X_query)
        k = self.k
        idx = np.argpartition(D, kth=k-1, axis=1)[:, :k]
        row = np.arange(D.shape[0])[:, None]
        dist_k = D[row, idx]
        order = np.argsort(dist_k, axis=1)
        return idx[row, order], dist_k[row, order]

    def predict_proba(self, X_query):
        if self.task != "classification":
            raise ValueError("predict_proba only for classification")
        idx, dist = self.kneighbors(X_query)
        neigh_y = self.y_[idx]
        w = self._weights_from_distances(dist)
        classes = np.unique(self.y_)
        votes = np.zeros((idx.shape[0], classes.size), dtype=float)
        for ci, c in enumerate(classes):
            votes[:, ci] = np.sum(w * (neigh_y == c), axis=1)
        proba = votes / (np.sum(votes, axis=1, keepdims=True) + self.eps)
        return classes, proba

    def predict(self, X_query):
        if self.task == "regression":
            idx, dist = self.kneighbors(X_query)
            neigh_y = self.y_[idx].astype(float)
            w = self._weights_from_distances(dist)
            return np.sum(w * neigh_y, axis=1) / (np.sum(w, axis=1) + self.eps)
        classes, proba = self.predict_proba(X_query)
        return classes[np.argmax(proba, axis=1)]


## 3) (NEW) Learn **k per query** (adaptive k)

We compute distances to all training points, sort them, then choose k using an **elbow/ratio rule**:
- Let d1 <= d2 <= ... be sorted distances.
- Find the smallest i such that (d_{i+1} / (d_i + eps)) > tau.
- Set k = clip(i, min_k, max_k). If no elbow, k = max_k.

This makes k **small** in dense regions and **larger** in sparse regions.


In [None]:
class AdaptiveKNN(KNN):
    def __init__(self, *args, min_k=5, max_k=35, tau=1.35, **kwargs):
        super().__init__(*args, **kwargs)
        self.min_k = int(min_k)
        self.max_k = int(max_k)
        self.tau = float(tau)

    def _choose_k_from_sorted_distances(self, d_sorted):
        # d_sorted: [n_train] sorted ascending
        # find elbow using ratio rule
        eps = self.eps
        ratios = d_sorted[1:] / (d_sorted[:-1] + eps)
        elbow = np.where(ratios > self.tau)[0]
        if elbow.size == 0:
            k = self.max_k
        else:
            k = int(elbow[0] + 1)  # index+1 gives count
        k = max(self.min_k, min(self.max_k, k))
        k = min(k, self.X_.shape[0])
        return k

    def predict_proba(self, X_query):
        if self.task != "classification":
            raise ValueError("predict_proba only for classification")
        Xq = np.asarray(X_query, dtype=float)
        if Xq.ndim == 1:
            Xq = Xq[None, :]

        D = self._pairwise_distances(Xq)  # [n_q, n_train]
        classes = np.unique(self.y_)
        proba_out = np.zeros((Xq.shape[0], classes.size), dtype=float)

        for i in range(Xq.shape[0]):
            order = np.argsort(D[i])
            d_sorted = D[i, order]
            k_i = self._choose_k_from_sorted_distances(d_sorted)
            idx = order[:k_i]
            dist = d_sorted[:k_i][None, :]  # shape [1,k_i] for weight helper
            w = self._weights_from_distances(dist)[0]
            neigh_y = self.y_[idx]
            votes = np.zeros(classes.size, dtype=float)
            for ci, c in enumerate(classes):
                votes[ci] = np.sum(w * (neigh_y == c))
            proba_out[i] = votes / (np.sum(votes) + self.eps)

        return classes, proba_out


## 4) (NEW) Prototype condensation

### 4.1 Condensed Nearest Neighbor (CNN)
CNN builds a subset S such that 1-NN on S classifies the full training set correctly (greedy).

### 4.2 Edited Nearest Neighbor (ENN)
ENN removes training points that disagree with the majority of their k neighbors (noise cleaning).


In [None]:
def condensed_nearest_neighbor(X, y, seed=42, max_passes=10):
    rng = np.random.default_rng(seed)
    n = X.shape[0]
    order = np.arange(n)
    rng.shuffle(order)

    # start with one example from each class (if possible)
    classes = np.unique(y)
    S_idx = []
    for c in classes:
        first = order[y[order] == c][0]
        S_idx.append(int(first))
    S_idx = list(dict.fromkeys(S_idx))  # unique

    base = KNN(k=1, task="classification", weights="uniform", missing="impute").fit(X[S_idx], y[S_idx])

    changed = True
    passes = 0
    while changed and passes < max_passes:
        changed = False
        passes += 1
        for i in order:
            pred = base.predict(X[i])[0]
            if pred != y[i]:
                S_idx.append(int(i))
                base.fit(X[S_idx], y[S_idx])
                changed = True

    S_idx = np.array(sorted(set(S_idx)), dtype=int)
    return S_idx

def edited_nearest_neighbor(X, y, k=7):
    knn = KNN(k=k, task="classification", weights="uniform", missing="impute").fit(X, y)
    y_pred = knn.predict(X)
    keep = (y_pred == y)
    keep_idx = np.where(keep)[0]
    return keep_idx


## 5) (NEW) Approximate neighbors with Random Hyperplane LSH

We build L hash tables.
Each table hashes x by sign(R @ x) where R is random Gaussian.
We only search points in the same buckets (union across tables), then run exact KNN on those candidates.


In [None]:
class RandomHyperplaneLSH:
    def __init__(self, n_tables=8, n_bits=14, seed=42):
        self.n_tables = int(n_tables)
        self.n_bits = int(n_bits)
        self.rng = np.random.default_rng(seed)
        self.R_ = None
        self.tables_ = None
        self.X_ = None

    def _hash_codes(self, X, R):
        # X: [n,d], R: [b,d] => proj: [n,b] => bits
        proj = X @ R.T
        bits = (proj >= 0).astype(np.uint8)
        # pack bits into int code
        codes = np.zeros(X.shape[0], dtype=np.uint32)
        for b in range(bits.shape[1]):
            codes |= (bits[:, b].astype(np.uint32) << np.uint32(b))
        return codes

    def fit(self, X):
        X = np.asarray(X, dtype=float)
        self.X_ = X
        d = X.shape[1]
        self.R_ = self.rng.normal(size=(self.n_tables, self.n_bits, d)).astype(float)
        self.tables_ = []

        for t in range(self.n_tables):
            R = self.R_[t]
            codes = self._hash_codes(X, R)
            table = {}
            for i, c in enumerate(codes):
                table.setdefault(int(c), []).append(i)
            self.tables_.append(table)
        return self

    def query_candidates(self, x):
        x = np.asarray(x, dtype=float).reshape(1, -1)
        cand = set()
        for t in range(self.n_tables):
            R = self.R_[t]
            c = int(self._hash_codes(x, R)[0])
            if c in self.tables_[t]:
                cand.update(self.tables_[t][c])
        return np.array(sorted(cand), dtype=int)

class ApproxKNN:
    def __init__(self, base_knn: KNN, lsh: RandomHyperplaneLSH, min_candidates=50):
        self.base = base_knn
        self.lsh = lsh
        self.min_candidates = int(min_candidates)

    def fit(self, X, y):
        self.X_ = np.asarray(X, dtype=float)
        self.y_ = np.asarray(y)
        self.base.fit(self.X_, self.y_)
        self.lsh.fit(self.X_)
        return self

    def predict(self, X_query):
        Xq = np.asarray(X_query, dtype=float)
        if Xq.ndim == 1:
            Xq = Xq[None, :]

        preds = []
        for i in range(Xq.shape[0]):
            cand = self.lsh.query_candidates(Xq[i])
            # fallback if bucket too small: take random subset + all candidates
            if cand.size < self.min_candidates:
                rng = np.random.default_rng(123)
                extra = rng.choice(self.X_.shape[0], size=min(self.min_candidates, self.X_.shape[0]), replace=False)
                cand = np.unique(np.concatenate([cand, extra]))

            # exact KNN on candidate subset
            sub = KNN(
                k=self.base.k,
                task=self.base.task,
                p=self.base.p,
                metric=self.base.metric,
                weights=self.base.weights,
                distance_power=self.base.distance_power,
                gaussian_sigma=self.base.gaussian_sigma,
                eps=self.base.eps,
                nominal_idx=list(self.base.nominal_idx),
                missing=self.base.missing,
                learn_diagonal_metric=False
            ).fit(self.X_[cand], self.y_[cand])

            preds.append(sub.predict(Xq[i])[0])

        return np.array(preds)


## 6) (NEW) Calibrated conformal prediction (classification)

Split data into:
- train (for the KNN model)
- calibration (to compute conformal threshold)
- test

Nonconformity score:
- s_i = 1 - p_model(y_i | x_i) on calibration set

For a new x, return prediction set:
- Γ(x) = { y : 1 - p(y|x) <= q̂ }
where q̂ is the (⌈(n+1)(1-α)⌉ / n) quantile of calibration scores.


In [None]:
class ConformalKNNClassifier:
    def __init__(self, knn: KNN, alpha=0.1):
        if knn.task != "classification":
            raise ValueError("Conformal wrapper expects classification KNN")
        self.knn = knn
        self.alpha = float(alpha)
        self.qhat_ = None
        self.classes_ = None

    def fit(self, X_train, y_train, X_cal, y_cal):
        self.knn.fit(X_train, y_train)
        classes, proba = self.knn.predict_proba(X_cal)
        self.classes_ = classes

        # map y_cal to column index
        class_to_idx = {int(c): i for i, c in enumerate(classes)}
        p_true = np.array([proba[i, class_to_idx[int(y_cal[i])]] for i in range(len(y_cal))], dtype=float)

        scores = 1.0 - p_true
        scores_sorted = np.sort(scores)

        n = scores_sorted.size
        # quantile index for split conformal (classification)
        k = int(np.ceil((n + 1) * (1.0 - self.alpha))) - 1
        k = max(0, min(n - 1, k))
        self.qhat_ = float(scores_sorted[k])
        return self

    def predict_set(self, X_query):
        classes, proba = self.knn.predict_proba(X_query)
        sets = []
        for i in range(proba.shape[0]):
            mask = (1.0 - proba[i]) <= self.qhat_
            sets.append(classes[mask])
        return sets

    def predict(self, X_query):
        return self.knn.predict(X_query)


## 7) Run experiments on Mammographic Mass

In [None]:
def accuracy(y_true, y_pred):
    return float(np.mean(y_true == y_pred))

def confusion_matrix(y_true, y_pred, labels=None):
    if labels is None:
        labels = np.unique(np.concatenate([y_true, y_pred]))
    labels = np.asarray(labels)
    m = np.zeros((labels.size, labels.size), dtype=int)
    for i, a in enumerate(labels):
        for j, b in enumerate(labels):
            m[i, j] = int(np.sum((y_true == a) & (y_pred == b)))
    return labels, m

def plot_confusion_matrix(cm, labels, title="Confusion matrix"):
    plt.figure(figsize=(4.5, 4))
    plt.imshow(cm, interpolation="nearest")
    plt.title(title)
    plt.xticks(np.arange(len(labels)), labels)
    plt.yticks(np.arange(len(labels)), labels)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, str(cm[i, j]), ha="center", va="center")
    plt.tight_layout()
    plt.show()

nominal_idx = [2, 3]  # shape, margin (treat as nominal mismatch)

base = KNN(
    k=11, task="classification",
    weights="distance", distance_power=2.0,
    nominal_idx=nominal_idx,
    missing="impute",
    learn_diagonal_metric=True
).fit(X_train, y_train)

y_pred_base = base.predict(X_test)
acc_base = accuracy(y_test, y_pred_base)

labels, cm = confusion_matrix(y_test, y_pred_base, labels=np.array([0,1]))
plot_confusion_matrix(cm, labels, title="Base KNN confusion matrix")
acc_base


In [None]:
# Adaptive-k KNN
adap = AdaptiveKNN(
    k=11, task="classification",
    weights="distance", distance_power=2.0,
    nominal_idx=nominal_idx,
    missing="impute",
    learn_diagonal_metric=True,
    min_k=5, max_k=45, tau=1.35
).fit(X_train, y_train)

y_pred_adap = adap.predict(X_test)
acc_adap = accuracy(y_test, y_pred_adap)
acc_adap


In [None]:
# Prototype condensation: (optional) ENN cleaning then CNN condensation
enn_idx = edited_nearest_neighbor(X_train, y_train, k=7)
X_enn, y_enn = X_train[enn_idx], y_train[enn_idx]

cnn_idx = condensed_nearest_neighbor(X_enn, y_enn, seed=42, max_passes=10)
X_cnn, y_cnn = X_enn[cnn_idx], y_enn[cnn_idx]

X_train.shape[0], X_enn.shape[0], X_cnn.shape[0]


In [None]:
# Train on condensed prototypes
proto_knn = KNN(
    k=11, task="classification",
    weights="distance", distance_power=2.0,
    nominal_idx=nominal_idx,
    missing="impute",
    learn_diagonal_metric=True
).fit(X_cnn, y_cnn)

y_pred_proto = proto_knn.predict(X_test)
acc_proto = accuracy(y_test, y_pred_proto)
acc_proto


In [None]:
# Approximate neighbors via Random Hyperplane LSH
lsh = RandomHyperplaneLSH(n_tables=10, n_bits=14, seed=42)
approx_model = ApproxKNN(
    base_knn=KNN(
        k=11, task="classification",
        weights="distance", distance_power=2.0,
        nominal_idx=nominal_idx,
        missing="impute"
    ),
    lsh=lsh,
    min_candidates=80
).fit(X_train, y_train)

y_pred_approx = approx_model.predict(X_test)
acc_approx = accuracy(y_test, y_pred_approx)
acc_approx


In [None]:
# Calibrated conformal prediction
# Split train into (proper train) + calibration
X_prop, X_cal, y_prop, y_cal = train_test_split_np(X_train, y_train, test_size=0.25, seed=123, stratify=True)

knn_for_conf = KNN(
    k=11, task="classification",
    weights="distance", distance_power=2.0,
    nominal_idx=nominal_idx,
    missing="impute",
    learn_diagonal_metric=True
)

conf = ConformalKNNClassifier(knn_for_conf, alpha=0.1).fit(X_prop, y_prop, X_cal, y_cal)

pred_sets = conf.predict_set(X_test)

# Coverage: fraction of times true label is in the set
coverage = np.mean([y_test[i] in set(pred_sets[i].tolist()) for i in range(len(y_test))])

# Set size statistics
set_sizes = np.array([len(s) for s in pred_sets], dtype=int)
coverage, set_sizes.mean(), set_sizes.min(), set_sizes.max()


## 8) Visualization: PCA 2D decision regions (still NumPy PCA)

In [None]:
def pca_fit(X, n_components=2):
    mu = np.mean(X, axis=0, keepdims=True)
    Xc = X - mu
    C = (Xc.T @ Xc) / (Xc.shape[0] - 1)
    eigvals, eigvecs = np.linalg.eigh(C)
    order = np.argsort(eigvals)[::-1]
    W = eigvecs[:, order[:n_components]]
    return mu, W

def pca_transform(X, mu, W):
    return (X - mu) @ W

mu_pca, W_pca = pca_fit(X_train, n_components=2)
Z_train = pca_transform(X_train, mu_pca, W_pca)
Z_test  = pca_transform(X_test,  mu_pca, W_pca)

knn2d = KNN(k=21, task="classification", weights="distance", distance_power=2.0, missing="impute").fit(Z_train, y_train)

pad = 0.5
x_min, x_max = Z_train[:, 0].min() - pad, Z_train[:, 0].max() + pad
y_min, y_max = Z_train[:, 1].min() - pad, Z_train[:, 1].max() + pad

grid_n = 250
xs = np.linspace(x_min, x_max, grid_n)
ys = np.linspace(y_min, y_max, grid_n)
xx, yy = np.meshgrid(xs, ys)
grid = np.c_[xx.ravel(), yy.ravel()]

grid_pred = knn2d.predict(grid).reshape(xx.shape)

plt.figure(figsize=(7, 5))
plt.contourf(xx, yy, grid_pred, alpha=0.35)
plt.scatter(Z_test[:, 0], Z_test[:, 1], c=y_test, s=25)
plt.title("Decision regions (PCA 2D) + test points")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.tight_layout()
plt.show()


## 9) TensorFlow interoperability (optional)

In [None]:
try:
    import tensorflow as tf

    knn_tf = base  # trained NumPy KNN

    @tf.function
    def knn_predict_tf(x_batch):
        y_out = tf.numpy_function(func=lambda a: knn_tf.predict(a), inp=[x_batch], Tout=tf.int64)
        y_out.set_shape([None])
        return y_out

    knn_predict_tf(tf.constant(X_test[:8], dtype=tf.float32)).numpy()
except Exception as e:
    print("TensorFlow not available here, but tf.numpy_function wrapper pattern works.")
    print("Error:", e)


## 5b) (UPGRADE) Multi-probe LSH (neighboring buckets)

Basic LSH queries only the **exact bucket** the query hashes into.  
Multi-probe LSH improves recall by also probing **nearby buckets** in Hamming space.

### Random-hyperplane hash recap
For each table we compute a `b`-bit code:
- `bit_j = 1[ (r_j · x) >= 0 ]`

Nearby buckets correspond to flipping a small number of bits.
We probe all codes within Hamming radius `r` (typically r=1..3) and union the candidate indices.

This keeps the same preprocessing/training time but usually boosts recall
(accuracy closer to exact KNN) for a modest increase in query time.


In [None]:
def hamming_neighbors(code: int, n_bits: int, radius: int):
    # Generate integer codes within Hamming distance <= radius by flipping bits.
    # radius=0 -> [code]
    out = [int(code)]
    if radius <= 0:
        return out

    # radius 1
    flips1 = [code ^ (1 << i) for i in range(n_bits)]
    out.extend(int(c) for c in flips1)

    if radius == 1:
        return out

    # radius 2,3,... (combinatorial, keep small)
    prev = flips1
    for r in range(2, radius + 1):
        new_codes = set()
        # to avoid duplicates, enforce increasing bit indices implicitly by building combos
        # We generate combos by picking bit pairs/triples via indices.
        # For small radius this is fine; for larger radius you should do heuristic probing.
        if r == 2:
            for i in range(n_bits):
                for j in range(i + 1, n_bits):
                    new_codes.add(int(code ^ (1 << i) ^ (1 << j)))
        elif r == 3:
            for i in range(n_bits):
                for j in range(i + 1, n_bits):
                    for k in range(j + 1, n_bits):
                        new_codes.add(int(code ^ (1 << i) ^ (1 << j) ^ (1 << k)))
        else:
            # Keep notebook lightweight; for r>3 use a heuristic multi-probe schedule.
            break
        out.extend(sorted(new_codes))
    return out

class RandomHyperplaneLSHMultiProbe(RandomHyperplaneLSH):
    def __init__(self, n_tables=8, n_bits=14, radius=1, seed=42):
        super().__init__(n_tables=n_tables, n_bits=n_bits, seed=seed)
        self.radius = int(radius)

    def query_candidates(self, x):
        x = np.asarray(x, dtype=float).reshape(1, -1)
        cand = set()

        for t in range(self.n_tables):
            R = self.R_[t]
            base_code = int(self._hash_codes(x, R)[0])

            # probe base + neighbors
            probe_codes = hamming_neighbors(base_code, self.n_bits, self.radius)
            table = self.tables_[t]
            for c in probe_codes:
                if c in table:
                    cand.update(table[c])

        return np.array(sorted(cand), dtype=int)


### Multi-probe approximate KNN demo

We compare:
- single-probe LSH (exact bucket only)
- multi-probe LSH (Hamming radius 2)


In [None]:
# Single-probe (already defined): RandomHyperplaneLSH
single_lsh = RandomHyperplaneLSH(n_tables=10, n_bits=14, seed=42)
approx_single = ApproxKNN(
    base_knn=KNN(k=11, task="classification", weights="distance", distance_power=2.0, nominal_idx=[2,3], missing="impute"),
    lsh=single_lsh,
    min_candidates=80
).fit(X_train, y_train)

y_pred_single = approx_single.predict(X_test)
acc_single = accuracy(y_test, y_pred_single)

# Multi-probe
multi_lsh = RandomHyperplaneLSHMultiProbe(n_tables=10, n_bits=14, radius=2, seed=42)
approx_multi = ApproxKNN(
    base_knn=KNN(k=11, task="classification", weights="distance", distance_power=2.0, nominal_idx=[2,3], missing="impute"),
    lsh=multi_lsh,
    min_candidates=80
).fit(X_train, y_train)

y_pred_multi = approx_multi.predict(X_test)
acc_multi = accuracy(y_test, y_pred_multi)

acc_single, acc_multi


## 6b) (UPGRADE) Theoretical note: split conformal coverage

The conformal wrapper in this notebook uses **split conformal prediction** for classification.

### Setup
- Fit the model on a **proper training** set.
- Compute **nonconformity scores** on an independent **calibration** set:
  - `s_i = 1 - p_model(y_i | x_i)`

### Prediction set
For a new `x`, define:
- `Γ(x) = { y : 1 - p_model(y | x) <= q̂ }`

Where `q̂` is the empirical quantile:
- `q̂ = Quantile_{⌈(n_cal+1)(1-α)⌉ / n_cal}( s_1,...,s_ncal )`

### Guarantee (informal but accurate)
If calibration points are **exchangeable** with test points (i.i.d. is the common case),
then the split conformal set satisfies:

- `P( y_true ∈ Γ(x) ) >= 1 - α`

This guarantee does **not** require the model to be correct or calibrated.
It only relies on exchangeability and using the correct quantile construction.

In finite samples, coverage is often slightly above `1-α` due to quantization.


## 6c) (UPGRADE) Diagnostics plots

We plot:
1. **Coverage vs α** on the test set (empirical).
2. **Histogram of set sizes** (how ambiguous predictions are).
3. **Scatter: max probability vs set size** (how confidence relates to set cardinality).


In [None]:
def conformal_diagnostics(X_train_full, y_train_full, X_test, y_test, alphas):
    # Split train into proper + calibration (fixed split for comparability across alpha)
    X_prop, X_cal, y_prop, y_cal = train_test_split_np(X_train_full, y_train_full, test_size=0.25, seed=123, stratify=True)

    # Base KNN used inside conformal wrapper
    base_knn = KNN(
        k=11, task="classification",
        weights="distance", distance_power=2.0,
        nominal_idx=[2, 3],
        missing="impute",
        learn_diagonal_metric=True
    )

    coverages = []
    mean_set_sizes = []

    # Also compute per-sample confidence once for a representative alpha
    # (we’ll reuse the trained model; qhat changes with alpha)
    base_knn.fit(X_prop, y_prop)
    classes, proba_test = base_knn.predict_proba(X_test)
    maxp = np.max(proba_test, axis=1)

    for a in alphas:
        conf = ConformalKNNClassifier(base_knn, alpha=float(a)).fit(X_prop, y_prop, X_cal, y_cal)
        pred_sets = conf.predict_set(X_test)

        coverage = np.mean([y_test[i] in set(pred_sets[i].tolist()) for i in range(len(y_test))])
        sizes = np.array([len(s) for s in pred_sets], dtype=int)

        coverages.append(float(coverage))
        mean_set_sizes.append(float(np.mean(sizes)))

    return np.array(coverages), np.array(mean_set_sizes), maxp

alphas = np.linspace(0.01, 0.30, 16)
coverages, mean_sizes, maxp = conformal_diagnostics(X_train, y_train, X_test, y_test, alphas)

# Plot 1: Coverage vs alpha
plt.figure(figsize=(6.5, 4))
plt.plot(alphas, coverages, marker="o")
plt.plot(alphas, 1.0 - alphas, linestyle="--")  # target line
plt.xlabel("alpha")
plt.ylabel("empirical coverage")
plt.title("Split conformal: coverage vs alpha")
plt.tight_layout()
plt.show()

# Plot 2: Mean set size vs alpha
plt.figure(figsize=(6.5, 4))
plt.plot(alphas, mean_sizes, marker="o")
plt.xlabel("alpha")
plt.ylabel("mean |prediction set|")
plt.title("Split conformal: mean set size vs alpha")
plt.tight_layout()
plt.show()

# Plot 3: Set size histogram at a chosen alpha
alpha0 = 0.10
X_prop, X_cal, y_prop, y_cal = train_test_split_np(X_train, y_train, test_size=0.25, seed=123, stratify=True)
knn_for_conf = KNN(
    k=11, task="classification",
    weights="distance", distance_power=2.0,
    nominal_idx=[2, 3],
    missing="impute",
    learn_diagonal_metric=True
)
conf0 = ConformalKNNClassifier(knn_for_conf, alpha=alpha0).fit(X_prop, y_prop, X_cal, y_cal)
sets0 = conf0.predict_set(X_test)
sizes0 = np.array([len(s) for s in sets0], dtype=int)

plt.figure(figsize=(6.5, 4))
plt.hist(sizes0, bins=np.arange(sizes0.min(), sizes0.max() + 2) - 0.5, rwidth=0.9)
plt.xlabel("|prediction set|")
plt.ylabel("count")
plt.title(f"Prediction set size histogram (alpha={alpha0})")
plt.tight_layout()
plt.show()

# Plot 4: Confidence vs set size (alpha0)
classes, proba0 = conf0.knn.predict_proba(X_test)
maxp0 = np.max(proba0, axis=1)

plt.figure(figsize=(6.5, 4))
plt.scatter(maxp0, sizes0, s=18)
plt.xlabel("max predicted probability")
plt.ylabel("|prediction set|")
plt.title(f"Confidence vs set size (alpha={alpha0})")
plt.tight_layout()
plt.show()
