# Cross-entropy kernel fit

In [None]:
import sys
from pathlib import Path
sys.path.insert(0, str(Path('..') / 'src'))
from kl_decomposition import rectangle_rule
from kl_decomposition.kernel_fit import fit_exp_sum_ce
import numpy as np

In [None]:
import numpy as np
from numba import njit, prange


@njit(parallel=True, fastmath=True)
def objective_numba(params: np.ndarray,
                    d: np.ndarray,
                    target: np.ndarray,
                    w: np.ndarray) -> np.ndarray:
    """
    Vectorised objective for K parameter sets.

    Parameters
    ----------
    params  : (K, 2*n_terms) log-space parameters
    d       : (N,)           data values (distances); will use d**2
    target  : (N,)           reference curve
    w       : (N,)           weights

    Returns
    -------
    obj_val : (K,)           objective for each parameter row
    """
    K, P = params.shape
    n_terms = P // 2
    N = d.size
    d2 = d * d                      # pre-compute d²
    out = np.empty(K, dtype=params.dtype)

    for k in prange(K):                  # loop over parameter sets in parallel
        # split, exponentiate (back to linear scale)
        a = np.exp(params[k, :n_terms])
        b = np.exp(params[k, n_terms:])

        acc = 0.0                        # accumulate Σ w*(pred-target)²
        for j in range(N):               # loop over data points
            dj2 = d2[j]
            predj = 0.0
            for i in range(n_terms):     # Σ_i a_i * exp(-b_i * d²)
                predj += a[i] * np.exp(-b[i] * dj2)
            diff = predj - target[j]
            acc += w[j] * diff * diff

        out[k] = acc

    return out

In [None]:
import time
import numpy as np
from typing import Callable, Dict, Tuple


def cross_entropy_numba(
    *,
    d: np.ndarray,
    target: np.ndarray,
    w: np.ndarray,
    n_terms: int,
    max_iter: int = 50,
    pop_size: int = 50,
    elite_frac: float = 0.1,
    mean: np.ndarray | None = None,
    cov: np.ndarray | None = None,
    rng: np.random.Generator | None = None,
) -> Tuple[np.ndarray, Dict[str, float | list[float] | np.ndarray]]:
    """
    Cross-entropy minimiser that evaluates the whole population
    via the parallel `objective_numba`.

    Returns
    -------
    best_params : (2*n_terms,)  vector giving the best solution found
    stats       : dict with keys ('iterations', 'best_score', 'eval_count',
                                   'history', 'runtime')
    """
    rng = rng or np.random.default_rng()
    dim = 2 * n_terms
    mean = np.zeros(dim) if mean is None else np.asarray(mean, dtype=float)
    cov = np.eye(dim)*9 if cov is None else np.asarray(cov, dtype=float)

    history: list[float] = []
    eval_count = 0
    best_params = mean.copy()
    best_score = float("inf")
    elite_num = max(1, int(pop_size * elite_frac))

    # --- warm-up call to compile objective_numba just once ---
    _ = objective_numba(np.zeros((1, dim)), d, target, w)

    start = time.time()
    for _ in range(max_iter):
        samples = rng.multivariate_normal(mean, cov, size=pop_size)      # (pop, dim)
        # --- keep (a_i , b_i) pairs sorted by ascending b_i -----------------
        a = samples[:, :n_terms]                 # first  half (log-a)
        b = samples[:, n_terms:]                 # second half (log-b)

        order = np.argsort(b, axis=1)            # sort indices for every row
        rows  = np.arange(samples.shape[0])[:, None]

        samples[:, :n_terms]  = a[rows, order]   # reorder a’s with same indices
        samples[:, n_terms:]  = b[rows, order]   # reorder b’s (already sorted)
        # --------------------------------------------------------------------

        scores = objective_numba(samples, d, target, w)                 # (pop,)
        eval_count += pop_size

        idx = np.argsort(scores)
        elite = samples[idx[:elite_num]]
        mean_new = elite.mean(axis=0)
        cov_new = np.cov(elite, rowvar=False)
        mean = 0.75 * mean + 0.25 * mean_new
        cov = 0.75 * cov + 0.25 * cov_new  # update covariance with some inertia

        best_iter_score = float(scores[idx[0]])
        history.append(best_iter_score)
        if best_iter_score < best_score:
            best_score = best_iter_score
            best_params = samples[idx[0]]
        if np.linalg.norm(cov_new) < 1e-6:
            # covariance matrix is too small, stop early
            print("Covariance matrix is too small, stopping early.")
            break

    runtime = time.time() - start
    stats = {"iterations": len(history),
             "best_score": best_score,
             "eval_count": eval_count,
             "history": history,
             "runtime": runtime,
             "final_cov": cov,
             "final_mean": mean}
    return best_params, stats

In [None]:
# --- synthetic data ---------------------------------------------------
n_terms = 4                 # number of (a_i, b_i) pairs
N       = 500               # data points

d, w = rectangle_rule(0.0, 2.0, N)
target = np.exp(-d)                  # any reference curve you like

means = np.zeros(2 * n_terms)  # initial mean for (a_i, b_i)
cov = np.eye(2 * n_terms) * 9  # initial covariance matrix
# --- optimiser call ---------------------------------------------------
best, stats = cross_entropy_numba(
    d=d, target=target, w=w,
    n_terms=n_terms,
    max_iter=1000,
    pop_size=10000,
    elite_frac=0.01,
    mean=means,
    cov=cov
)

print("Best score :", stats["best_score"])
print("Best params:", best)
print("Iterations :", stats["iterations"])
print("Runtime    :", stats["runtime"], "s")

In [None]:

# --- synthetic data ---------------------------------------------------

N = 500               # data points

d, w = rectangle_rule(0.0, 2.0, N)
target = np.exp(-d)                  # any reference curve you like
best_init = np.array([0,0])

for n_terms in range(1,10):                 # number of (a_i, b_i) pairs
    means = best_init  # initial mean for (a_i, b_i)
    cov = np.eye(2 * n_terms) * 4  # initial covariance matrix
    # --- optimiser call ---------------------------------------------------
    best, stats = cross_entropy_numba(
        d=d, target=target, w=w,
        n_terms=n_terms,
        max_iter=10000,
        pop_size=10000,
        elite_frac=0.1,
        mean=means,
        cov=cov
    )

    print("Best score :", stats["best_score"])
    print("Best params:", best)
    print("Iterations :", stats["iterations"])
    print("Runtime    :", stats["runtime"], "s")

    first_half = best[:n_terms]
    second_half = best[n_terms:]

    # Original equidistant indices (0 to n_terms-1), new indices from 0 to n_terms
    x_old = np.linspace(0, 1, n_terms)
    x_new = np.linspace(0, 1, n_terms + 1)

    interp_first = np.interp(x_new, x_old, first_half)
    interp_second = np.interp(x_new, x_old, second_half)

    best_init = np.hstack([interp_first, interp_second])

In [None]:
best_init

In [None]:
import matplotlib.pyplot as plt
plt.plot(stats["history"])
plt.xlabel("Iteration")
plt.ylabel("Best score")
plt.title("Convergence of the Cross-Entropy Method")
plt.grid()
plt.yscale('log')
plt.show()

In [None]:
plt.plot(best[n_terms:], 'o-')
plt.xlabel("Index")
plt.ylabel("b_i (log scale)")
plt.title("Fitted b_i parameters")
plt.grid()
plt.show()

In [None]:
from scipy.optimize import lsq_linear

# compute using least squares the coefficients for approximation with squared exponential kernels
# Construct the design matrix for squared exponential kernels
A = np.zeros((len(d), n_terms))
b_params = np.exp(best[n_terms:])

for i in range(n_terms):
    A[:, i] = np.exp(-b_params[i] * d**2)

# Solve the weighted least squares problem: minimize ||W(Aa - target)||^2
W = np.sqrt(w)
Aw = A * W[:, None]
tw = target * W

res = lsq_linear(Aw, tw, bounds=(0, np.inf))
squared_exponential_coeffs = res.x
print("Squared exponential coefficients:", np.log(squared_exponential_coeffs))

# compute fit with this coefficients
params = np.hstack([np.log(squared_exponential_coeffs), np.log(b_params)])

res = objective_numba(np.array([params]), d, target, w)
print("Objective value with least squares coefficients:", res[0])



In [None]:
x, w = rectangle_rule(0.0, 2.0, 500)
f = lambda t: np.exp(-t)
a, b, info = fit_exp_sum_ce(5, x, w, f, iterations=2000, pop_size=1000)
print('a:', a)
print('b:', b)
print('best_score:', info.best_score)