In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Notebook-wide seeded generator: every stochastic helper below draws from it
# by default, so Restart & Run All reproduces the same numbers.
rng = np.random.default_rng(42)

def paraboloid_model(theta, xi=0.0, A=1.0, B=0.5, C=1.5, noise_std=0.2, generator=None):
    """Vectorized noisy paraboloid; supports scalar or vector xi.

    Evaluates, row by row,
    ``A*x1**2 + B*x1*x2*(1 + xi) + C*(x2 + xi)**2`` and adds zero-mean
    Gaussian noise with standard deviation ``noise_std``.

    Parameters
    ----------
    theta : array-like, shape (2,) or (n, 2)
        Model inputs; column 0 is x1 and column 1 is x2.
    xi : scalar or array-like, default 0.0
        Experiment/design variable. A scalar is repeated for every row of
        ``theta``; a 2-D array is flattened; a 1-D array is used as is and
        must broadcast against the rows of ``theta``.
    A, B, C : float
        Coefficients of the three terms of the paraboloid.
    noise_std : float, default 0.2
        Standard deviation of the additive noise (``0.0`` disables it).
    generator : numpy.random.Generator, optional
        Source of the noise. Defaults to the module-level seeded ``rng``
        instead of NumPy's unseeded legacy global state.

    Returns
    -------
    numpy.ndarray
        Shape ``(1,)`` when ``theta`` has exactly one row, otherwise a
        column vector of shape ``(n, 1)``.
    """
    theta = np.atleast_2d(theta).astype(float)
    n = theta.shape[0]
    x1, x2 = theta[:, 0], theta[:, 1]

    xi = np.asarray(xi, float)
    if xi.ndim == 0:
        xi = np.full(n, xi)
    elif xi.ndim == 2:
        xi = xi.ravel()

    y = A * x1**2 + B * x1 * x2 * (1.0 + xi) + C * (x2 + xi) ** 2

    # Draw from the seeded generator rather than np.random.randn so that the
    # seed set at the top of the notebook actually controls the noise.
    gen = rng if generator is None else generator
    y = y + noise_std * gen.standard_normal(n)

    # A single input row yields a flat length-1 array; any other row count
    # (including zero rows) yields a column vector.
    if n == 1:
        return np.array([y.item()])
    return y.reshape(-1, 1)

def theta_sampler(n, lb=-15, ub=15, generator=None):
    """Draw ``n`` two-dimensional θ samples uniformly from ``[lb, ub)``.

    Parameters
    ----------
    n : int
        Number of samples (rows) to draw.
    lb, ub : float
        Lower and upper bounds applied to both coordinates.
    generator : numpy.random.Generator, optional
        Source of randomness. Defaults to the notebook-level seeded ``rng``
        instead of NumPy's unseeded legacy global state, so prior draws are
        reproducible across kernel restarts.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n, 2)``.
    """
    gen = rng if generator is None else generator
    return gen.uniform(lb, ub, size=(n, 2))

def scatter_post(ax, theta, truth=None, title="", alpha=0.30, s=6, label="Posterior"):
    """Scatter θ samples on ``ax`` and optionally overlay reference points.

    Parameters
    ----------
    ax : object exposing the matplotlib Axes methods used below
    theta : 2-D array; columns 0 and 1 are plotted as θ1 and θ2.
    truth : 2-D array or None
        When given, columns 0 and 1 are drawn as red ``x`` markers.
    title : str
        Axes title.
    alpha, s : float
        Transparency and marker size of the sample cloud.
    label : str
        Legend label of the sample cloud.
    """
    theta_1 = theta[:, 0]
    theta_2 = theta[:, 1]
    ax.scatter(theta_1, theta_2, s=s, alpha=alpha, label=label)

    has_truth = truth is not None
    if has_truth:
        ax.scatter(truth[:, 0], truth[:, 1], c="r", marker="x", s=60, label="θ true cloud")

    # Axes decoration, one call per line.
    ax.set_title(title)
    ax.set_xlabel("θ1")
    ax.set_ylabel("θ2")
    ax.grid(True)
    ax.legend()



### Adaptive DB expansion for KNN-based calibration


A semi-Bayesian procedure:
1. generate a data set $D^{sim} =\{(x_i,y_i)\}_{i=1}^{n_{s}}$ of simulated input-output pairs from a simulation model $M$, with inputs sampled according to the prior $f_x(X)$
2. gather a data set $D^{emp} = \{y^{emp}_j\}_{j=1}^{n_{e}}$ of empirical samples from a real system
3. for each empirical vector $y^{emp} \in D^{emp}$ find a set $\mathcal{K}(y^{emp}) \subset D^{sim}$ containing the $k$ inputs that generate the closest output responses, such that, e.g., $\sum\limits_{(x,y)\in \mathcal{K}(y^{emp})}||y^{emp} - y||_2^2$ is minimized
4. combine the selected inputs into a calibration data set $D_{cal} = \bigcup \limits_{y^{emp}\in D^{emp}} \{x : (x,y) \in \mathcal{K}(y^{emp})\}$
5. use it to fit a non-parametric posterior $f_x(X|D^{emp}) \propto f_x(X|D_{cal}) f_x(X)$


Now extend it so that $D^{sim}$ is augmented when the variance of $\mathcal{K}(y^{emp})$ is too large (or use some other approach to reduce sparsity in $x$):
1. generate a data set $D^{sim} =\{(x_i,y_i)\}_{i=1}^{n_{s}}$ of simulated input-output pairs from a simulation model $M$, with inputs sampled according to the prior $f_x(X)$
2. gather a data set $D^{emp} = \{y^{emp}_j\}_{j=1}^{n_{e}}$ of empirical samples from a real system
3. for each empirical vector $y^{emp} \in D^{emp}$ find a set $\mathcal{K}(y^{emp}) \subset D^{sim}$ containing the $k$ inputs that generate the closest output responses, such that, e.g., $\sum\limits_{(x,y)\in \mathcal{K}(y^{emp})}||y^{emp} - y||_2^2$ is minimized
4. if the set $\mathcal{X} =\{x : (x,y) \in \mathcal{K}(y^{emp}) \}$ has a large variance (in comparison to the sets obtained for the other $y^{emp} \in D^{emp}$), then resample around $x$ from $f_x(X)$ ... assign weights accordingly? Then repeat step 3? Refine the $\mathcal{K}$ set.

In [None]:

from src.backward import AdaptiveKNNCalibrator
from src.dgm import data_generation_mechanism

# Example simple simulator (replace with your expensive model)
def simulator(X, xi=0.0):
    """Evaluate the example simulator at inputs ``X`` for experiment design ``xi``.

    Delegates to ``paraboloid_model``, forwarding ``X`` as its ``theta``
    argument and ``xi`` unchanged; the model's result is returned as is.
    """
    response = paraboloid_model(X, xi=xi)
    return response

# Prior sampler
def sample_prior(n):
    """Return ``n`` prior draws of θ by delegating to ``theta_sampler``.

    The sampler's default bounds are used.
    """
    prior_draws = theta_sampler(n)
    return prior_draws



Generate empirical evidence (data from an unknown data-generating process)

* Case 1 - 1 sample (y), 1 target (θ point-valued), 1 experiment (ξ)
* Case 2 - 100 samples (y) from 100 samples from the target (θ distribution), and 1 experiment (ξ)
* Case 3 - 100 samples (y) from 100 samples from the target (θ distribution), and for 4 experiments (ξ)

In [None]:

# One call per evidence scenario, evaluated in order (case 1, 2, 3).
# Each call yields a pair that is unpacked into (observations, theta_true).
(
    (observations_c1, theta_true_c1),
    (observations_c2, theta_true_c2),
    (observations_c3, theta_true_c3),
) = (data_generation_mechanism(case=case_id) for case_id in (1, 2, 3))

In [None]:
N_SIM = 2500  # value passed as n_s to every calibration run


def make_calibrator(xi):
    """Build an ``AdaptiveKNNCalibrator`` whose simulator is pinned to ``xi``.

    The lambda closes over this function's own ``xi`` argument rather than the
    notebook-level ``xi`` variable, so the calibrator keeps simulating the
    intended experiment even after the notebook reassigns ``xi``.
    """
    return AdaptiveKNNCalibrator(
        simulator=lambda x: simulator(x, xi),
        sample_prior=sample_prior,
    )


# Case 1: first (and only) observation set of scenario 1.
y_emp, xi = observations_c1[0]
cal = make_calibrator(xi)
D_cal1 = cal.run(Y_emp=y_emp, n_s=N_SIM)

# Case 2: first observation set of scenario 2.
y_emp, xi = observations_c2[0]
cal = make_calibrator(xi)
D_cal2 = cal.run(Y_emp=y_emp, n_s=N_SIM)

# Case 3: one calibration per experiment. The per-experiment result gets its
# own name so that the case-2 posterior held in D_cal2 is not overwritten.
posterior_list = []
for y_emp, xi in observations_c3:
    cal = make_calibrator(xi)
    D_cal3_k = cal.run(Y_emp=y_emp, n_s=N_SIM)
    posterior_list.append(D_cal3_k)

# Combine posteriors across experiments (independent designs)
#posterior = combine_posteriors(posterior_list)

In [None]:
alpha = 0.3

# Cases 1 and 2: one posterior cloud per figure, true θ overlaid in red.
for case_title, samples_k, truth_k in (
    ("Case 1: posterior samples vs. true θ", D_cal1, theta_true_c1),
    ("Case 2: posterior samples vs. true θ", D_cal2, theta_true_c2),
):
    fig, ax = plt.subplots()
    ax.scatter(samples_k[:, 0], samples_k[:, 1], alpha=alpha, label="Posterior")
    ax.scatter(truth_k[:, 0], truth_k[:, 1], alpha=0.9, c="r", label="θ true")
    ax.set_title(case_title)
    ax.set_xlabel("θ1")
    ax.set_ylabel("θ2")
    ax.legend()
    plt.show()

# Case 3: all per-experiment posteriors share one figure.
fig, ax = plt.subplots()
for k, Dcal in enumerate(posterior_list):
    # Label only the first cloud so the legend carries a single posterior entry.
    ax.scatter(Dcal[:, 0], Dcal[:, 1], alpha=alpha, c="b",
               label="Posterior (per experiment)" if k == 0 else None)
# The truth is drawn once, after the loop: redrawing it for every experiment
# stacked semi-transparent markers, so its opacity grew with the number of
# experiments.
ax.scatter(theta_true_c3[:, 0], theta_true_c3[:, 1], alpha=0.9, c="r", label="θ true")
ax.set_title("Case 3: per-experiment posterior samples vs. true θ")
ax.set_xlabel("θ1")
ax.set_ylabel("θ2")
ax.legend()
plt.show()

In [None]:

from src.plot import plot_kde_2d , weighted_mixture_kde

# --- Individual KDEs for the two single-experiment cases ---
for samples_k, truth_k in ((D_cal1, theta_true_c1), (D_cal2, theta_true_c2)):
    plot_kde_2d(samples_k, true_theta=truth_k)

# Uniform mixture weights: an equal share for each case-3 posterior
K = len(posterior_list)
weights = np.full(K, 1.0) / K

weighted_mixture_kde(posterior_list, weights, true_theta=theta_true_c3)