In [1]:
import numpy as np
from functools import partial
from typing import Callable, Iterable, Any, Dict, Tuple
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import StratifiedKFold, KFold, cross_val_score

def tune_and_train_1nn(
    X: np.ndarray,
    y: np.ndarray,
    metric_func: Callable[[np.ndarray, np.ndarray, Any], float],
    w_candidates: Iterable[Any],
    k: int = 5,
    stratify: bool = True,
    random_state: int | None = None,
    n_jobs: int = 1,
) -> Tuple[KNeighborsClassifier, Any, Dict[Any, float]]:
    """
    Tune hyperparameter w for a custom distance metric d(a,b,w) using k-fold CV,
    then fit a 1-NN classifier with the chosen w.

    Parameters
    ----------
    X : (n_samples, n_features) array
    y : (n_samples,) array of labels
    metric_func : callable(a, b, w) -> float
        Black-box distance between two 1-D vectors a and b, using hyperparam w.
    w_candidates : iterable of candidate w values
    k : number of folds for CV (default 5)
    stratify : whether to use StratifiedKFold when possible
    random_state : int or None for CV shuffling
    n_jobs : number of jobs for CV (set to 1 to avoid pickling issues with some callables)

    Returns
    -------
    clf_best : fitted KNeighborsClassifier (n_neighbors=1) using best w
    best_w : chosen hyperparameter
    cv_results : dict mapping w -> mean CV accuracy
    """
    X = np.asarray(X)
    y = np.asarray(y)
    if X.ndim != 2:
        raise ValueError("X must be 2D: (n_samples, n_features)")
    if X.shape[0] != y.shape[0]:
        raise ValueError("X and y must have same number of samples")

    # choose CV splitter
    if stratify and len(np.unique(y)) > 1:
        cv_splitter = StratifiedKFold(n_splits=k, shuffle=True, random_state=random_state)
    else:
        cv_splitter = KFold(n_splits=k, shuffle=True, random_state=random_state)

    cv_results = {}
    # IMPORTANT: use n_jobs=1 here by default because cross_val_score may pickle the callable metric.
    # If your metric and environment are picklable, you can set n_jobs > 1.
    for w in w_candidates:
        # create a callable metric that binds w
        bound_metric = lambda a, b, w=w: metric_func(a, b, w)
        # KNeighborsClassifier accepts a callable metric; this will use brute-force search
        clf = KNeighborsClassifier(n_neighbors=1, metric=bound_metric, algorithm='brute')
        scores = cross_val_score(clf, X, y, cv=cv_splitter, scoring='accuracy', n_jobs=n_jobs)
        cv_results[w] = float(np.mean(scores))

    # pick best w (max mean accuracy). tie -> first encountered in w_candidates
    best_w = max(w_candidates, key=lambda w: cv_results[w])

    # fit final classifier on entire data with the chosen w
    best_metric = lambda a, b, w=best_w: metric_func(a, b, w)
    clf_best = KNeighborsClassifier(n_neighbors=1, metric=best_metric, algorithm='brute')
    clf_best.fit(X, y)

    return clf_best, best_w, cv_results

# -----------------------
# Example usage
# -----------------------
if __name__ == "__main__":
    from sklearn.datasets import make_classification

    # example: weighted euclidean metric where w is a scalar weight or vector of per-feature weights
    def weighted_euclidean(a, b, w):
        a = np.asarray(a); b = np.asarray(b)
        if np.isscalar(w):
            wvec = np.full(a.shape, w)
        else:
            wvec = np.asarray(w)
            if wvec.shape != a.shape:
                raise ValueError("w must be scalar or same shape as vectors")
        diff = (a - b) * wvec
        return np.linalg.norm(diff)

    X, y = make_classification(n_samples=300, n_features=5, n_informative=3,
                               n_redundant=0, n_classes=3, random_state=0)

    w_candidates = [0.5, 1.0, 2.0, 5.0]
    clf, best_w, results = tune_and_train_1nn(X, y, weighted_euclidean, w_candidates,
                                              k=5, stratify=True, random_state=0, n_jobs=1)

    print("CV mean accuracies:")
    for w, acc in results.items():
        print(f"  w={w}: {acc:.4f}")
    print("Best w:", best_w)
    print("Example predictions:", clf.predict(X[:5]))


CV mean accuracies:
  w=0.5: 0.7533
  w=1.0: 0.7533
  w=2.0: 0.7533
  w=5.0: 0.7533
Best w: 0.5
Example predictions: [2 0 0 2 2]


In [2]:
import numpy as np
from typing import Callable, Iterable, Any, Dict, Tuple, List
from multiprocessing import Pool
from tqdm import tqdm
from sklearn.model_selection import KFold
from sklearn.neighbors import KNeighborsClassifier

# ---- module-level globals for worker processes (set via initializer) ----
_METRIC_FUNC = None
_X = None
_Y = None
_K = None
_RANDOM_STATE = None

def _worker_init(metric_func, X, y, k, random_state):
    """Initializer for worker processes: set globals once per worker."""
    global _METRIC_FUNC, _X, _Y, _K, _RANDOM_STATE
    _METRIC_FUNC = metric_func
    _X = X
    _Y = y
    _K = k
    _RANDOM_STATE = random_state

def _evaluate_w(w) -> Tuple[Any, float]:
    """
    Worker: evaluate one w via KFold CV (plain KFold since labels are balanced).
    Returns (w, mean_accuracy).
    """
    global _METRIC_FUNC, _X, _Y, _K, _RANDOM_STATE
    cv = KFold(n_splits=_K, shuffle=True, random_state=_RANDOM_STATE)
    scores: List[float] = []
    for train_idx, val_idx in cv.split(_X):
        X_tr, y_tr = _X[train_idx], _Y[train_idx]
        X_val, y_val = _X[val_idx], _Y[val_idx]

        correct = 0
        for xi, yi in zip(X_val, y_val):
            # brute-force distances (metric must be picklable and top-level)
            dists = [float(_METRIC_FUNC(xi, xt, w)) for xt in X_tr]
            nn = int(np.argmin(dists))
            if y_tr[nn] == yi:
                correct += 1
        scores.append(correct / len(y_val))
    return (w, float(np.mean(scores)))

def tune_and_train_1nn_parallel(
    X: np.ndarray,
    y: np.ndarray,
    metric_func: Callable[[np.ndarray, np.ndarray, Any], float],
    w_candidates: Iterable[Any],
    k: int = 5,
    random_state: int | None = None,
    n_procs: int = 8,
    verbose: bool = True,
) -> Tuple[KNeighborsClassifier, Any, Dict[Any, float]]:
    """
    Parallel search over w_candidates using multiprocessing.Pool and KFold CV.
    Returns: (fitted_1nn_classifier, best_w, cv_results_dict)
    """
    X = np.asarray(X)
    y = np.asarray(y)
    w_list = list(w_candidates)
    if len(w_list) == 0:
        raise ValueError("w_candidates must be non-empty")

    if verbose:
        print(f"Starting evaluation of {len(w_list)} candidates on {n_procs} processes...")

    init_args = (metric_func, X, y, k, random_state)
    with Pool(processes=n_procs, initializer=_worker_init, initargs=init_args) as pool:
        results_iter = pool.imap(_evaluate_w, w_list)
        results = list(tqdm(results_iter, total=len(w_list)))

    cv_results: Dict[Any, float] = {w: acc for (w, acc) in results}
    # pick best w (highest mean accuracy); tie -> first in w_list order
    best_w = max(w_list, key=lambda w: cv_results[w])

    # final classifier on full data using chosen best_w
    def bound_metric(a, b, w=best_w):
        return metric_func(a, b, w)

    clf = KNeighborsClassifier(n_neighbors=1, metric=bound_metric, algorithm='brute')
    clf.fit(X, y)

    if verbose:
        print("CV results (w -> mean accuracy):")
        for w in w_list:
            print(f"  {w}: {cv_results[w]:.4f}")
        print("Best w:", best_w)

    return clf, best_w, cv_results

# -----------------------
# Example usage
# -----------------------
if __name__ == "__main__":
    from sklearn.datasets import make_classification

    # Example metric (top-level function so it's picklable)
    def weighted_euclidean(a, b, w):
        a = np.asarray(a); b = np.asarray(b)
        if np.isscalar(w):
            wvec = np.full(a.shape, float(w))
        else:
            wvec = np.asarray(w, dtype=float)
            if wvec.shape != a.shape:
                raise ValueError("w must be scalar or same shape as input vectors")
        diff = (a - b) * wvec
        return float(np.linalg.norm(diff))

    # Synthetic balanced dataset
    X, y = make_classification(n_samples=800, n_features=8, n_informative=6,
                               n_redundant=0, n_classes=4, weights=None, random_state=0)

    w_candidates = [0.2, 0.5, 1.0, 2.0, 5.0]
    clf, best_w, cv_results = tune_and_train_1nn_parallel(
        X, y, weighted_euclidean, w_candidates,
        k=5, random_state=0, n_procs=3, verbose=True
    )

    # Optional: parallel predictions per-sample using the same imap pattern
    # (keeps your syntax: with Pool(...); pool.imap(...))
    def _pred_worker_init(classifier):
        global _PRED_CLF
        _PRED_CLF = classifier

    def _predict_single(x):
        global _PRED_CLF
        return int(_PRED_CLF.predict([x])[0])

    X_test = X[:200]
    with Pool(processes=3, initializer=_pred_worker_init, initargs=(clf,)) as pool:
        y_pred_list = list(tqdm(pool.imap(_predict_single, X_test), total=len(X_test)))
    y_pred = np.array(y_pred_list)

    print("Example predictions (first 10):", y_pred[:10])


Starting evaluation of 5 candidates on 3 processes...


  0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 5/5 [00:04<00:00,  1.01it/s]

CV results (w -> mean accuracy):
  0.2: 0.7162
  0.5: 0.7162
  1.0: 0.7162
  2.0: 0.7162
  5.0: 0.7162
Best w: 0.2



100%|██████████| 200/200 [00:00<00:00, 467.69it/s]

Example predictions (first 10): [3 0 1 0 0 3 3 1 2 2]





In [3]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import KFold
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# Load dataset
X, y = load_iris(return_X_y=True)

# Define KFold
cv = 5
kf = KFold(n_splits=cv, shuffle=True, random_state=0)

# Store scores
scores = []
print(f"X : ", X)
# KFold loop
for train_idx, test_idx in kf.split(X):
    # Split data
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]
    print(train_idx)
    # Train estimator
    clf = KNeighborsClassifier(n_neighbors=3)
    clf.fit(X_train, y_train)

    # Test estimator
    y_pred = clf.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    scores.append(acc)

print("Scores:", scores)
print("Mean accuracy:", np.mean(scores))


X :  [[5.1 3.5 1.4 0.2]
 [4.9 3.  1.4 0.2]
 [4.7 3.2 1.3 0.2]
 [4.6 3.1 1.5 0.2]
 [5.  3.6 1.4 0.2]
 [5.4 3.9 1.7 0.4]
 [4.6 3.4 1.4 0.3]
 [5.  3.4 1.5 0.2]
 [4.4 2.9 1.4 0.2]
 [4.9 3.1 1.5 0.1]
 [5.4 3.7 1.5 0.2]
 [4.8 3.4 1.6 0.2]
 [4.8 3.  1.4 0.1]
 [4.3 3.  1.1 0.1]
 [5.8 4.  1.2 0.2]
 [5.7 4.4 1.5 0.4]
 [5.4 3.9 1.3 0.4]
 [5.1 3.5 1.4 0.3]
 [5.7 3.8 1.7 0.3]
 [5.1 3.8 1.5 0.3]
 [5.4 3.4 1.7 0.2]
 [5.1 3.7 1.5 0.4]
 [4.6 3.6 1.  0.2]
 [5.1 3.3 1.7 0.5]
 [4.8 3.4 1.9 0.2]
 [5.  3.  1.6 0.2]
 [5.  3.4 1.6 0.4]
 [5.2 3.5 1.5 0.2]
 [5.2 3.4 1.4 0.2]
 [4.7 3.2 1.6 0.2]
 [4.8 3.1 1.6 0.2]
 [5.4 3.4 1.5 0.4]
 [5.2 4.1 1.5 0.1]
 [5.5 4.2 1.4 0.2]
 [4.9 3.1 1.5 0.2]
 [5.  3.2 1.2 0.2]
 [5.5 3.5 1.3 0.2]
 [4.9 3.6 1.4 0.1]
 [4.4 3.  1.3 0.2]
 [5.1 3.4 1.5 0.2]
 [5.  3.5 1.3 0.3]
 [4.5 2.3 1.3 0.3]
 [4.4 3.2 1.3 0.2]
 [5.  3.5 1.6 0.6]
 [5.1 3.8 1.9 0.4]
 [4.8 3.  1.4 0.3]
 [5.1 3.8 1.6 0.2]
 [4.6 3.2 1.4 0.2]
 [5.3 3.7 1.5 0.2]
 [5.  3.3 1.4 0.2]
 [7.  3.2 4.7 1.4]
 [6.4 3.2 4.5 1.5]
 [6.9 3