In [27]:
import numpy as np
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern, WhiteKernel, ConstantKernel as C
import math
from scipy.special import erf  # for Gaussian CDF approximation
import warnings

# --- 1. Simulate data ---
def simulate_data(n=200, noise_std=0.1, random_state=None):
    """Simulate noisy observations of one period of a sine wave on [0, 1].

    Parameters
    ----------
    n : int
        Number of equally spaced design points.
    noise_std : float
        Standard deviation of the additive Gaussian noise.
    random_state : int or None
        Seed for the noise. When given, a private ``RandomState`` seeded with
        this value is used, so the draw is reproducible and NumPy's global
        generator is left untouched. When ``None`` the noise is drawn from
        the global generator.

    Returns
    -------
    X, y, f : ndarray of shape (n,)
        Design points, noisy responses ``f + noise`` and the noise-free
        signal ``sin(2*pi*X)``.
    """
    # RandomState(seed) produces the same stream that np.random.seed(seed)
    # followed by np.random.normal would, without reseeding the caller's RNG.
    rng = np.random if random_state is None else np.random.RandomState(random_state)
    X = np.linspace(0, 1, n)
    f = np.sin(2 * np.pi * X)
    y = f + rng.normal(0, noise_std, size=n)
    return X, y, f

# --- 2. Nadaraya-Watson kernel regression ---
def nw_predict(X_train, y_train, X_test, h, kernel='gaussian'):
    """Predict with a Nadaraya-Watson (kernel-weighted average) smoother.

    Parameters
    ----------
    X_train, y_train : array-like
        Training inputs and responses; both are flattened to 1-D, so lists
        and column vectors are accepted.
    X_test : array-like
        Points at which to predict; flattened to 1-D.
    h : float
        Bandwidth; every distance is divided by it.
    kernel : {'gaussian', 'epanechnikov'}
        Weighting kernel.

    Returns
    -------
    ndarray of shape (len(X_test),)
        Weighted average of ``y_train`` for each test point. A test point
        whose weights are all exactly zero gets the plain mean of ``y_train``.

    Raises
    ------
    ValueError
        If ``kernel`` is not one of the supported names.
    """
    X_train = np.asarray(X_train).ravel()
    # Coerce the responses the same way as the inputs so that lists and
    # (n, 1) arrays broadcast against the (n_test, n_train) weight matrix.
    y_train = np.asarray(y_train).ravel()
    X_test = np.asarray(X_test).ravel()
    u = (X_test[:, None] - X_train[None, :]) / h
    if kernel == 'gaussian':
        w = np.exp(-0.5 * u**2) / (h * np.sqrt(2 * np.pi))
    elif kernel == 'epanechnikov':
        mask = np.abs(u) <= 1
        w = np.zeros_like(u)
        w[mask] = 0.75 * (1 - u[mask]**2) / h
    else:
        raise ValueError(f"Unknown kernel '{kernel}'")
    numerator = (w * y_train).sum(axis=1)
    denom = w.sum(axis=1)
    # Rows with no weight at all (compact kernel with no neighbour in range,
    # or Gaussian underflow) would give 0/0; fall back to the global mean.
    zero_mask = denom == 0
    if np.any(zero_mask):
        denom[zero_mask] = len(X_train)
        numerator[zero_mask] = y_train.sum()
    return numerator / denom

# --- 3. Cross-validation scorer ---
class CVScorer:
    """K-fold cross-validated MSE of the kernel smoother for a given bandwidth.

    The folds come from a shuffled ``KFold`` seeded with ``random_state=0``.
    ``evals`` is a running counter of fold-level fits: it grows by one for
    every fold processed, i.e. by the number of folds per ``score`` call.
    """

    def __init__(self, X, y, folds=5, kernel='gaussian'):
        """Store flattened data, the fold splitter and the kernel name."""
        self.X = np.asarray(X).ravel()
        self.y = np.asarray(y).ravel()
        self.kernel = kernel
        self.evals = 0
        self.kf = KFold(n_splits=folds, shuffle=True, random_state=0)

    def score(self, h):
        """Return the mean over folds of the held-out MSE at bandwidth ``h``."""
        fold_errors = []
        for fit_rows, held_out_rows in self.kf.split(self.X):
            predicted = nw_predict(
                self.X[fit_rows],
                self.y[fit_rows],
                self.X[held_out_rows],
                h,
                kernel=self.kernel,
            )
            fold_errors.append(mean_squared_error(self.y[held_out_rows], predicted))
            # One unit of cost per fold, not per score() call.
            self.evals += 1
        return np.mean(fold_errors)

# --- 4. Bandwidth selectors ---
def grid_search_cv(scorer, h_grid):
    """Score every bandwidth in ``h_grid`` and keep the one with the lowest score.

    Returns
    -------
    (best_h, best_score, evals)
        The first grid value attaining the strictly lowest score, that score,
        and how much ``scorer.evals`` grew during the search. An empty grid
        yields ``(None, np.inf, 0)``.
    """
    evals_before = scorer.evals
    winner, lowest = None, np.inf
    for candidate in h_grid:
        cv_error = scorer.score(candidate)
        # Strict comparison: ties keep the earlier grid point.
        if not cv_error < lowest:
            continue
        winner, lowest = candidate, cv_error
    return winner, lowest, scorer.evals - evals_before


def plug_in_bandwidth(X):
    """Rule-of-thumb bandwidth ``1.06 * sigma * n**(-1/5)``.

    Parameters
    ----------
    X : array-like
        Sample of inputs; flattened to 1-D, so row and column vectors give
        the same result as the equivalent 1-D array.

    Returns
    -------
    float
        ``1.06 * sigma * n**(-1/5)`` where ``sigma`` is the sample standard
        deviation (``ddof=1``) and ``n`` the number of values in ``X``.
    """
    x = np.asarray(X).ravel()
    sigma = np.std(x, ddof=1)
    # Use the flattened size so n agrees with the data sigma was computed on;
    # len(X) would report 1 for a (1, n) row vector.
    n = x.size
    return 1.06 * sigma * n ** (-1/5)


def newton_hessian_cv(scorer, h_init, h_min=1e-3, tol=1e-3, max_iter=10, eps=1e-4):
    """Newton iteration on the CV score using central finite differences.

    Parameters
    ----------
    scorer : object
        Must provide ``score(h)`` and an ``evals`` counter.
    h_init : float
        Starting bandwidth (raised to ``h_min`` if smaller).
    h_min : float
        Lower bound enforced on every iterate.
    tol : float
        Stop once two successive iterates differ by less than this.
    max_iter : int
        Maximum number of Newton steps.
    eps : float
        Half-width of the three-point finite-difference stencil.

    Returns
    -------
    (h, score, evals)
        Final bandwidth, its CV score (re-evaluated once at the end) and the
        growth of ``scorer.evals`` during the call.

    Notes
    -----
    The iteration stops without moving when the estimated curvature is not
    strictly positive or a derivative estimate is not finite.
    """
    h = max(h_init, h_min)
    start = scorer.evals
    for _ in range(max_iter):
        # Keep the whole stencil inside [h_min, inf). Clamping only the
        # backward point would make the stencil asymmetric while the formulas
        # below still assume a symmetric step, corrupting both derivatives
        # whenever the iterate sits on the lower bound.
        center = max(h, h_min + eps)
        s0 = scorer.score(center)
        s_plus = scorer.score(center + eps)
        s_minus = scorer.score(center - eps)
        grad = (s_plus - s_minus) / (2 * eps)
        hess = (s_plus + s_minus - 2 * s0) / (eps ** 2)
        if not (np.isfinite(grad) and np.isfinite(hess)) or hess <= 0:
            break
        h_new = max(h_min, center - grad / hess)
        if abs(h_new - h) < tol:
            h = h_new
            break
        h = h_new
    # Final score at the returned bandwidth; this evaluation is counted too.
    score = scorer.score(h)
    evals = scorer.evals - start
    return h, score, evals


def analytic_newton_cv(scorer, h_init, h_min=1e-3, tol=1e-3, max_iter=10):
    """
    Damped Newton search for the bandwidth minimising the k-fold CV error,
    using closed-form derivatives of the kernel-weighted fit with respect to h.

    The objective is the mean squared held-out residual over the folds
    produced by ``scorer.kf`` (the same splitter ``scorer`` holds), evaluated
    directly here instead of through ``scorer.score``; ``scorer.evals`` is
    therefore never incremented.

    Parameters
    ----------
    scorer : object
        Must expose ``X``, ``y``, ``kf`` (with ``split``) and ``kernel``.
    h_init : float
        Starting bandwidth (raised to ``h_min`` if smaller).
    h_min : float
        Lower bound enforced on every trial bandwidth.
    tol : float
        Stop once an accepted step moves ``h`` by less than this.
    max_iter : int
        Maximum number of outer iterations.

    Returns
    -------
    (h, 0)
        The selected bandwidth and a constant 0 for the CV-evaluation count.

    Raises
    ------
    ValueError
        If ``scorer.kernel`` is neither 'gaussian' nor 'epanechnikov'.
    """
    if scorer.kernel not in ('gaussian', 'epanechnikov'):
        raise ValueError(f"Unknown kernel '{scorer.kernel}'")

    def obj_grad_hess(h):
        """Mean squared CV residual at ``h`` with its first two h-derivatives."""
        sse, grad, hess = 0.0, 0.0, 0.0
        total = 0
        for train_idx, test_idx in scorer.kf.split(scorer.X):
            Xtr, Xte = scorer.X[train_idx], scorer.X[test_idx]
            ytr, yte = scorer.y[train_idx], scorer.y[test_idx]
            u = (Xte[:, None] - Xtr[None, :]) / h
            if scorer.kernel == 'gaussian':
                w = np.exp(-0.5 * u**2) / (h * np.sqrt(2 * np.pi))
                # log w = -log h - u^2/2 + const with u^2 proportional to h^-2:
                #   (log w)'  = (u^2 - 1) / h
                #   (log w)'' = (1 - 3 u^2) / h^2
                # so w'' = w * ((log w)'^2 + (log w)'') = w (u^4 - 5u^2 + 2) / h^2.
                d_w = w * ((u**2 - 1) / h)
                dd_w = w * ((u**4 - 5 * u**2 + 2) / (h**2))
            else:  # epanechnikov
                mask = np.abs(u) <= 1
                w = np.zeros_like(u)
                w[mask] = 0.75 * (1 - u[mask]**2) / h
                d_w = np.zeros_like(u)
                d_w[mask] = 0.75 * ((-1 + 3*u[mask]**2) / (h**2))
                dd_w = np.zeros_like(u)
                dd_w[mask] = 1.5 * ((1 - 6*u[mask]**2) / (h**3))
            w_sum = w.sum(axis=1)
            num = (w * ytr).sum(axis=1)
            # Test points with no weight at all are predicted by the training
            # mean, which does not depend on h (derivatives zeroed below).
            zero_mask = w_sum == 0
            if np.any(zero_mask):
                w_sum[zero_mask] = len(ytr)
                num[zero_mask] = ytr.mean() * w_sum[zero_mask]
            m = num / w_sum
            residual = yte - m
            sse += np.sum(residual**2)
            d_num = (d_w * ytr).sum(axis=1)
            dd_num = (dd_w * ytr).sum(axis=1)
            d_den = d_w.sum(axis=1)
            dd_den = dd_w.sum(axis=1)
            # Quotient rule for m = num / w_sum, first and second order.
            dm = (d_num * w_sum - num * d_den) / (w_sum**2)
            ddm = (
                dd_num * w_sum - 2*d_num*d_den - num*dd_den
                + 2*num*(d_den**2)/w_sum
            ) / (w_sum**2)
            dm[zero_mask] = 0
            ddm[zero_mask] = 0
            grad += -2 * np.sum(residual * dm)
            hess += 2 * np.sum(dm**2 - residual * ddm)
            total += len(yte)
        # Normalise all three by the same count so the Armijo test compares
        # quantities on one scale.
        return sse / total, grad / total, hess / total

    c1, tau, min_alpha = 1e-4, 0.5, 1e-4
    h = max(h_init, h_min)
    state = obj_grad_hess(h)
    for _ in range(max_iter):
        obj_val, grad, hess = state
        if not np.isfinite(grad):
            break
        # Newton direction when the curvature is usable, else steepest descent.
        if hess > 0 and np.isfinite(hess):
            direction = -grad / hess
        else:
            direction = -grad
        # Backtracking (Armijo) line search on the clamped trial point.
        alpha = 1.0
        accepted = None
        while alpha > min_alpha:
            h_trial = max(h_min, h + alpha * direction)
            trial_state = obj_grad_hess(h_trial)
            if trial_state[0] <= obj_val + c1 * alpha * grad * direction:
                accepted = (h_trial, trial_state)
                break
            alpha *= tau
        if accepted is None:
            # No step gave sufficient decrease: keep the current bandwidth
            # rather than adopting a point that may increase the objective.
            break
        step = abs(accepted[0] - h)
        h, state = accepted
        if step < tol:
            break
    # scorer.score was never called, so the reported CV-eval count is 0.
    return h, 0

def golden_section_cv(scorer, a, b, tol=1e-3, max_iter=20):
    """Golden-section search for the bandwidth minimising the CV score on [a, b].

    The bracket is shrunk until it is narrower than ``tol`` or ``max_iter``
    shrink steps have been taken; each step costs one ``scorer.score`` call
    on top of the two initial ones.

    Returns
    -------
    (h, evals)
        Midpoint of the final bracket and the growth of ``scorer.evals``.
    """
    golden = (1 + np.sqrt(5)) / 2
    evals_before = scorer.evals
    lo, hi = a, b
    # Two interior probes, placed symmetrically inside the bracket.
    left = hi - (hi - lo) / golden
    right = lo + (hi - lo) / golden
    f_left = scorer.score(left)
    f_right = scorer.score(right)
    steps = 0
    while steps < max_iter and not abs(hi - lo) < tol:
        steps += 1
        if f_left < f_right:
            # Minimum lies in [lo, right]: old left probe becomes the right one.
            hi = right
            right, f_right = left, f_left
            left = hi - (hi - lo) / golden
            f_left = scorer.score(left)
        else:
            # Minimum lies in [left, hi]: old right probe becomes the left one.
            lo = left
            left, f_left = right, f_right
            right = lo + (hi - lo) / golden
            f_right = scorer.score(right)
    return (lo + hi) / 2, scorer.evals - evals_before


def bayes_opt_cv(scorer, a, b, init_points=5, n_iter=10):
    """
    Minimise the CV score over [a, b] with a Gaussian-process surrogate and
    the Expected Improvement acquisition evaluated on a 100-point grid.

    ``init_points`` equally spaced bandwidths are scored first; then, for
    ``n_iter`` rounds, a GP (constant * Matern(nu=2.5) + white noise) is fitted
    to all scores so far and the grid point with the largest Expected
    Improvement is scored next. If the fit emits a ConvergenceWarning, the
    lower bound on the white-noise level is divided by 10 (not below 1e-8) and
    the fit is repeated once.

    Returns
    -------
    (h, evals)
        The scored bandwidth with the lowest CV score (as a float) and the
        growth of ``scorer.evals``.
    """
    from sklearn.exceptions import ConvergenceWarning
    evals_before = scorer.evals
    # Initial design: equally spaced bandwidths.
    tried_h = np.linspace(a, b, init_points)
    tried_scores = [scorer.score(h0) for h0 in tried_h]
    candidates = np.linspace(a, b, 100)
    for _ in range(n_iter):
        design = tried_h.reshape(-1, 1)
        targets = np.array(tried_scores)
        signal = C(1.0, (1e-3, 1e3)) * Matern(nu=2.5)
        noise = WhiteKernel(noise_level=1e-3, noise_level_bounds=(1e-6, 1e6))
        kernel = signal + noise
        # At most two fits: the second uses a relaxed noise lower bound.
        for _retry in range(2):
            with warnings.catch_warnings(record=True) as caught:
                warnings.simplefilter("always", ConvergenceWarning)
                gp = GaussianProcessRegressor(kernel=kernel, normalize_y=True).fit(design, targets)
                converged = not any(
                    issubclass(item.category, ConvergenceWarning) for item in caught
                )
                if not converged:
                    lower, upper = noise.noise_level_bounds
                    relaxed_lower = max(lower / 10, 1e-8)
                    noise = WhiteKernel(
                        noise_level=noise.noise_level,
                        noise_level_bounds=(relaxed_lower, upper),
                    )
                    kernel = signal + noise
            if converged:
                break
        # Expected Improvement (for minimisation) over the candidate grid.
        mean, std = gp.predict(candidates.reshape(-1, 1), return_std=True)
        incumbent = np.min(tried_scores)
        gain = incumbent - mean
        z = gain / np.maximum(std, 1e-8)
        normal_cdf = 0.5 * (1 + erf(z / math.sqrt(2)))
        normal_pdf = np.exp(-0.5 * z**2) / math.sqrt(2*math.pi)
        expected_improvement = gain * normal_cdf + std * normal_pdf
        h_next = candidates[np.argmax(expected_improvement)]
        tried_scores.append(scorer.score(h_next))
        tried_h = np.append(tried_h, h_next)
    winner = np.argmin(tried_scores)
    return float(tried_h[winner]), scorer.evals - evals_before

# --- 5. Robustness Simulation ---
# Driver: for every (noise level, sample size, kernel) combination, run each
# bandwidth selector on `replicates` simulated data sets and print, per method,
# the mean and standard deviation of (a) the MSE of the fitted curve against
# the noise-free signal and (b) the number of CV evaluations consumed.
if __name__ == '__main__':
    replicates = 10
    noise_levels = [0.1, 0.2, 0.5]
    sample_sizes = [200, 500, 1000]
    kernels = ['gaussian', 'epanechnikov']
    # Candidate bandwidths for grid search; its end points also serve as the
    # search interval / lower bound for the other selectors below.
    h_grid = np.linspace(0.01, 0.5, 50)

    for noise in noise_levels:
        for n in sample_sizes:
            for kernel in kernels:
                print(f"\n-- noise={noise}, n={n}, kernel={kernel} --")
                methods = ['Grid','Plug-in','Newton','AnalyticNewton','Golden','Bayes']
                # results[method] collects one (mse, evals) pair per replicate.
                results = {m: [] for m in methods}
                for rep in range(replicates):
                    # The replicate index doubles as the simulation seed.
                    X, y, f_true = simulate_data(n=n, noise_std=noise, random_state=rep)
                    # One scorer is shared by all selectors; each selector
                    # reports the growth of scorer.evals over its own run.
                    scorer = CVScorer(X, y, folds=5, kernel=kernel)
                    # Grid
                    h_g, _, e_g = grid_search_cv(scorer, h_grid)
                    # Quality metric: refit on all data, predict at the same
                    # points and compare with the noise-free signal.
                    mse_g = mean_squared_error(f_true, nw_predict(X, y, X, h_g, kernel))
                    results['Grid'].append((mse_g, e_g))
                    # Plug-in
                    h_p = plug_in_bandwidth(X)
                    # The score itself is discarded; the call is made only so
                    # the plug-in rule is charged for one CV evaluation pass.
                    e_p_start = scorer.evals
                    scorer.score(h_p)
                    e_p = scorer.evals - e_p_start
                    mse_p = mean_squared_error(f_true, nw_predict(X, y, X, h_p, kernel))
                    results['Plug-in'].append((mse_p, e_p))
                    # Newton (finite diff), started from the plug-in bandwidth
                    h_n, _, e_n = newton_hessian_cv(scorer, h_init=h_p, h_min=h_grid[0])
                    mse_n = mean_squared_error(f_true, nw_predict(X, y, X, h_n, kernel))
                    results['Newton'].append((mse_n, e_n))
                    # Analytic Newton, started from the plug-in bandwidth
                    # (its reported eval count is the constant it returns)
                    h_a, e_a = analytic_newton_cv(scorer, h_init=h_p, h_min=h_grid[0])
                    mse_a = mean_squared_error(f_true, nw_predict(X, y, X, h_a, kernel))
                    results['AnalyticNewton'].append((mse_a, e_a))
                    # Golden-section over the grid's range
                    h_o, e_o = golden_section_cv(scorer, a=h_grid[0], b=h_grid[-1])
                    mse_o = mean_squared_error(f_true, nw_predict(X, y, X, h_o, kernel))
                    results['Golden'].append((mse_o, e_o))
                    # Bayesian Optimization over the grid's range
                    h_b, e_b = bayes_opt_cv(scorer, a=h_grid[0], b=h_grid[-1], init_points=5, n_iter=10)
                    mse_b = mean_squared_error(f_true, nw_predict(X, y, X, h_b, kernel))
                    results['Bayes'].append((mse_b, e_b))
                # summary: mean and (population) standard deviation over replicates
                print("Method    MSE(mean±sd)      Evals(mean±sd)")
                for m in methods:
                    mses = [v[0] for v in results[m]]
                    evs  = [v[1] for v in results[m]]
                    print(f"{m:<15}{np.mean(mses):.4f}±{np.std(mses):.4f}    {np.mean(evs):.1f}±{np.std(evs):.1f}")



-- noise=0.1, n=200, kernel=gaussian --
Method    MSE(mean±sd)      Evals(mean±sd)
Grid           0.0009±0.0003    250.0±0.0
Plug-in        0.0301±0.0017    5.0±0.0
Newton         0.0014±0.0003    35.0±0.0
AnalyticNewton 0.0009±0.0003    0.0±0.0
Golden         0.0009±0.0003    75.0±0.0
Bayes          0.0009±0.0003    75.0±0.0

-- noise=0.1, n=200, kernel=epanechnikov --
Method    MSE(mean±sd)      Evals(mean±sd)
Grid           0.0009±0.0004    250.0±0.0
Plug-in        0.0041±0.0008    5.0±0.0
Newton         0.0041±0.0008    20.0±0.0
AnalyticNewton 0.0011±0.0004    0.0±0.0
Golden         0.0009±0.0004    75.0±0.0
Bayes          0.0009±0.0004    75.0±0.0

-- noise=0.1, n=500, kernel=gaussian --
Method    MSE(mean±sd)      Evals(mean±sd)
Grid           0.0005±0.0001    250.0±0.0
Plug-in        0.0185±0.0007    5.0±0.0
Newton         0.0005±0.0001    47.0±6.0
AnalyticNewton 0.0005±0.0001    0.0±0.0
Golden         0.0005±0.0001    75.0±0.0
Bayes          0.0005±0.0001    75.0±0.0

-- noise