In [41]:
import matplotlib.pyplot as plt
import numpy as np
import jax
import jax.numpy as jnp
from jax import random as random
from functools import partial

from mm_sbi_review.examples.misspec_ma1 import assumed_dgp, calculate_summary_statistics, true_dgp

In [42]:
import numpy as np
import torch
import matplotlib.pyplot as plt
from sbi.inference import SNPE
from sbi.utils import BoxUniform
from scipy.stats import gaussian_kde

# Ensure reproducibility
torch.manual_seed(0)
np.random.seed(0)

# Define the prior over θ in [-1, 1]
prior = BoxUniform(low=torch.tensor([-1.0]), high=torch.tensor([1.0]))


In [43]:
# Define the assumed data-generating process (MA(1) model)
def assumed_dgp(theta, n_obs=100):
    """
    Simulate an MA(1) process.

    Args:
        theta (torch.Tensor): Shape [batch_size, 1]
        n_obs (int): Number of observations.

    Returns:
        torch.Tensor: Simulated data, shape [batch_size, n_obs]
    """
    batch_size = theta.shape[0]
    w = torch.randn(batch_size, n_obs + 2)
    x = w[:, 2:] + theta * w[:, 1:-1]
    return x

# Define the autocovariance function
def autocov(x, lag=1):
    """
    Compute the autocovariance at a given lag.

    Args:
        x (torch.Tensor): Shape [batch_size, n_obs]
        lag (int): The lag at which to compute the autocovariance.

    Returns:
        torch.Tensor: Shape [batch_size]
    """
    if lag == 0:
        C = torch.mean(x ** 2, dim=1)
    else:
        C = torch.mean(x[:, lag:] * x[:, :-lag], dim=1)
    return C

# Define the function to calculate summary statistics
def calculate_summary_statistics(x):
    """
    Calculate summary statistics for misspecified MA(1) example.

    Args:
        x (torch.Tensor): Shape [batch_size, n_obs]

    Returns:
        torch.Tensor: Shape [batch_size, 2]
    """
    s0 = autocov(x, lag=0)
    s1 = autocov(x, lag=1)
    return torch.stack([s0, s1], dim=1)


# Define the true data-generating process (stochastic volatility model)
def true_dgp(w=-0.736, rho=0.9, sigma_v=0.36, batch_size=1, n_obs=100):
    """
    Sample from a stochastic volatility model.

    Args:
        w (float): Model parameter.
        rho (float): AR coefficient.
        sigma_v (float): Standard deviation of the volatility process.
        batch_size (int): Number of sequences to generate.
        n_obs (int): Number of observations in each sequence.

    Returns:
        torch.Tensor: Generated samples, shape [batch_size, n_obs]
    """
    w_vec = torch.full((batch_size,), w)
    rho_vec = torch.full((batch_size,), rho)
    sigma_v_vec = torch.full((batch_size,), sigma_v)

    h_mat = torch.zeros(batch_size, n_obs)
    y_mat = torch.zeros(batch_size, n_obs)

    # Initial value
    h_mat[:, 0] = w_vec + torch.randn(batch_size) * sigma_v_vec
    y_mat[:, 0] = torch.exp(h_mat[:, 0] / 2) * torch.randn(batch_size)

    for i in range(1, n_obs):
        h_mat[:, i] = w_vec + rho_vec * h_mat[:, i - 1] + torch.randn(batch_size) * sigma_v_vec
        y_mat[:, i] = torch.exp(h_mat[:, i] / 2) * torch.randn(batch_size)

    return y_mat


In [44]:
n_obs = 100
# Generate observed data
x_o = true_dgp(n_obs=n_obs).squeeze(0)  # Shape [n_obs]
x_o_ss = calculate_summary_statistics(x_o.unsqueeze(0))  # Shape [1, 2]
# Number of simulations
num_simulations = 10_000

# Sample parameters θ from the prior
theta = prior.sample((num_simulations,))  # Shape [num_simulations, 1]

# Simulate data and calculate summary statistics
x_sim = assumed_dgp(theta, n_obs=n_obs)  # Shape [num_simulations, n_obs]
x_ss = calculate_summary_statistics(x_sim)  # Shape [num_simulations, 2]


In [45]:
# # Define the simulator function for sbi
# def simulator(theta, n_obs=100):
#     """
#     Simulator function for sbi.

#     Args:
#         theta (torch.Tensor): Shape [batch_size, 1]

#     Returns:
#         torch.Tensor: Summary statistics, shape [batch_size, 2]
#     """
#     x_sim = assumed_dgp(theta, n_obs=n_obs)
#     x_ss = calculate_summary_statistics(x_sim)
#     return x_ss


In [46]:
# Initialize the NPE inference method
inference = SNPE(prior=prior)

# Train the neural network
density_estimator = inference.append_simulations(theta, x_ss).train()


  x_numel = get_numel(


 Neural network successfully converged after 55 epochs.

In [47]:
# Build the posterior distribution
posterior = inference.build_posterior(density_estimator)

# Sample from the posterior given the observed data summary statistics
num_posterior_samples = 10_000
samples = posterior.sample((num_posterior_samples,), x=x_o_ss)


Drawing 10000 posterior samples:   0%|          | 0/10000 [00:00<?, ?it/s]

In [48]:
# # Convert samples to NumPy array for plotting
# samples_np = samples.numpy().flatten()
# print('a')
# # Plot the posterior samples
# plt.figure(figsize=(8, 6))
# plt.hist(samples_np, bins=50, density=True, alpha=0.6, label='NPE Posterior Samples')
# print('b')

# # Plot the prior distribution
# theta_range = np.linspace(-1, 1, 200)
# prior_density = np.ones_like(theta_range) * 0.5  # Uniform density
# plt.plot(theta_range, prior_density, label='Prior', color='black', linestyle='--')
# print('c')

# # Plot the pseudo-true parameter value
# # Since the model is misspecified, we may consider a pseudo-true value
# theta_pseudo_true = 0.0  # Adjust based on your context
# plt.axvline(theta_pseudo_true, color='red', linestyle='--', label='Pseudo-true θ')

# plt.xlabel(r'$\theta$')
# plt.ylabel('Density')
# plt.title('Posterior Distribution of $\theta$ using NPE')
# plt.legend()
# plt.tight_layout()
# plt.savefig("figs/npe_posterior.pdf")
# plt.clf()


In [49]:
# # Plot the joint distribution of (s0, s1)

# x_pp_ss = np.empty((1000, 2))

# for i in range(1000):
#     theta = samples[i, :]
#     x_sim = assumed_dgp(torch.atleast_2d(theta), n_obs=n_obs)
#     x_ss = calculate_summary_statistics(x_sim)
#     x_pp_ss[i, :] = x_ss

# plt.figure(figsize=(8, 6))
# plt.scatter(x_pp_ss[:, 1], x_pp_ss[:, 0], alpha=0.5, label='Posterior Predictive')
# plt.scatter(x_o_ss[0, 1].item(), x_o_ss[0, 0].item(), color='red', marker='x', s=100, label='Observed Data')
# plt.xlabel('$s_0$ (Autocovariance at lag 0)')
# plt.ylabel('$s_1$ (Autocovariance at lag 1)')
# plt.title('Posterior Predictive Summary Statistics')
# plt.legend()
# plt.tight_layout()
# plt.savefig("figs/posterior_predictive_joint_npe.pdf")
# plt.clf()


In [50]:
import numpy as np
import matplotlib.pyplot as plt

# Update font size for consistency
plt.rcParams.update({'font.size': 24})

# Assume that `samples` contains the posterior samples from NPE
# and that `assumed_dgp` and `calculate_summary_statistics` are defined
# Also, `x_o_ss` contains the observed summary statistics

# Convert samples to NumPy array for plotting
samples_np = samples.numpy().flatten()

# Plot the histogram of posterior samples
plt.figure(figsize=(8, 6))
plt.hist(samples_np, bins=50, density=True, alpha=0.6, color='blue', label='Posterior Samples')

# Plot the prior distribution for comparison
theta_range = np.linspace(-1, 1, 200)

# Plot the pseudo-true parameter value
theta_pseudo_true = 0.0  # Adjust based on your context
plt.axvline(theta_pseudo_true, color='black', linestyle='--', label='Pseudo-true θ')

# Add labels and legend
plt.xlabel(r'$\theta$')
plt.ylabel('Density')
plt.xlim([-1, 1])
plt.xticks([-1, 0, 1])
# plt.title('Posterior Distribution of $\theta$ using NPE')
plt.legend(fontsize=16)
plt.tight_layout()
plt.savefig("figs/fig3a_npe.pdf")
plt.clf()

# Posterior predictive simulations
num_posterior_samples = len(samples_np)
num_pp_samples = 1_000  # Number of posterior predictive samples
thinning_interval = num_posterior_samples // num_pp_samples

# Initialize array to store summary statistics
x_pp_ss = np.empty((num_pp_samples, 2))

# Perform posterior predictive simulations
for i in range(num_pp_samples):
    idx = i * thinning_interval
    theta = samples_np[idx]
    x_sim = assumed_dgp(torch.tensor([[theta]]), n_obs=100)  # Adjust n_obs as needed
    x_ss = calculate_summary_statistics(x_sim)
    x_pp_ss[i, :] = x_ss.numpy()

def b_theta(t):
    return np.array([1 + t ** 2, t])

# Generate values for b_theta
t_vals = np.linspace(-1, 1, 101)
b_theta_vals = np.array([b_theta(t) for t in t_vals])


# Plot the joint distribution of summary statistics from posterior predictive simulations
plt.figure(figsize=(8, 6))
plt.scatter(x_pp_ss[:, 1], x_pp_ss[:, 0], c='blue', alpha=0.5)
plt.plot(b_theta_vals[:, 1], b_theta_vals[:, 0], color='orange', label=r'$b(\theta)$', linewidth=6)
plt.xlim(-2.0, 2.0)
plt.ylim(-0.5, 2.5)
plt.xticks([-2, 0, 2])
plt.yticks([-0.5, 1, 2.5])

# Plot the observed summary statistics
plt.scatter(x_o_ss[0, 1].item(), x_o_ss[0, 0].item(), color='black', marker='x', s=100, label='S(y)')

# Include the function b_theta for reference, if applicable
# plt.plot(b_theta_vals[:, 1], b_theta_vals[:, 0], color='orange', label=r'$b(\theta)$', linewidth=4)

# Set axis limits and labels
plt.xlabel(r'$\zeta_1$')
plt.ylabel(r'$\zeta_2$')
# plt.title('Posterior Predictive Summary Statistics')
plt.legend()
plt.tight_layout()
plt.savefig("figs/fig3b_npe.pdf")
plt.clf()


<Figure size 800x600 with 0 Axes>

<Figure size 800x600 with 0 Axes>

In [51]:
from sbi.inference import SNLE

inference = SNLE(prior=prior)
# Number of rounds and simulations
num_rounds = 1  # Adjust as needed
num_sims = 10_000  # Number of simulations per round

# Set initial proposal
proposal = prior

for round_ in range(num_rounds):
    print(f"Round {round_ + 1}/{num_rounds}")
    # Sample θ from the proposal
    theta = proposal.sample((num_sims,))
    
    # Simulate data and calculate summary statistics
    x_sim = assumed_dgp(theta, n_obs=n_obs)
    x_ss = calculate_summary_statistics(x_sim)
    
    # Append simulations and train the likelihood estimator
    density_estimator = inference.append_simulations(theta, x_ss).train()
    
    # Build the posterior (combines estimated likelihood with prior)
    posterior = inference.build_posterior(mcmc_method="slice_np_vectorized",
                                          mcmc_parameters={"num_chains": 20, "thin": 5})
    
    # Update the proposal to focus on high-probability regions
    proposal = posterior.set_default_x(x_o_ss)


Round 1/1
 Neural network successfully converged after 57 epochs.

In [52]:
# Number of posterior samples
num_posterior_samples = 10_000

# Sample from the posterior given the observed data
samples = posterior.sample((num_posterior_samples,),
                           x=x_o_ss,
                           mcmc_method="slice_np_vectorized",
                           mcmc_parameters={"num_chains": 20, "thin": 5})


  samples = posterior.sample((num_posterior_samples,),
  samples = posterior.sample((num_posterior_samples,),


Running vectorized MCMC with 20 chains:   0%|          | 0/71000 [00:00<?, ?it/s]

In [53]:
# # Convert samples to NumPy array for plotting
# samples_np = samples.numpy().flatten()

# # Plot the posterior samples
# plt.figure(figsize=(8, 6))
# plt.hist(samples_np, bins=50, density=True, alpha=0.6, label='NLE Posterior Samples')

# # Plot the prior distribution
# theta_range = np.linspace(-1, 1, 200)
# prior_density = np.ones_like(theta_range) * 0.5  # Uniform density
# plt.plot(theta_range, prior_density, label='Prior', color='black', linestyle='--')

# # Plot the pseudo-true parameter value
# # Since the model is misspecified, we may consider a pseudo-true value
# theta_pseudo_true = 0.0  # Adjust based on your context
# plt.axvline(theta_pseudo_true, color='red', linestyle='--', label='Pseudo-true θ')

# plt.xlabel(r'$\theta$')
# plt.ylabel('Density')
# plt.title('Posterior Distribution of $\theta$ using NLE')
# plt.legend()
# plt.tight_layout()
# plt.savefig("figs/nle_posterior.pdf")
# plt.clf()


In [54]:
samples.shape

torch.Size([10000, 1])

In [55]:
# # Plot the joint distribution of (s0, s1)

# x_pp_ss = np.empty((1000, 2))

# for i in range(1000):
#     theta = samples[i, :]
#     x_sim = assumed_dgp(torch.atleast_2d(theta),n_obs=n_obs)
#     x_ss = calculate_summary_statistics(x_sim)
#     x_pp_ss[i, :] = x_ss

# plt.figure(figsize=(8, 6))
# plt.scatter(x_pp_ss[:, 1], x_pp_ss[:, 0], alpha=0.5, label='Posterior Predictive')
# plt.scatter(x_o_ss[0, 1].item(), x_o_ss[0, 0].item(), color='red', marker='x', s=100, label='Observed Data')
# plt.xlabel('$s_0$ (Autocovariance at lag 0)')
# plt.ylabel('$s_1$ (Autocovariance at lag 1)')
# plt.title('Posterior Predictive Summary Statistics')
# plt.legend()
# plt.tight_layout()
# plt.savefig("figs/posterior_predictive_joint_nle.pdf")
# plt.clf()


In [59]:
import numpy as np
import matplotlib.pyplot as plt

# Update font size for consistency
plt.rcParams.update({'font.size': 24})

# Assume that `samples` contains the posterior samples from NLE
# and that `assumed_dgp` and `calculate_summary_statistics` are defined
# Also, `x_o_ss` contains the observed summary statistics

# Convert samples to NumPy array for plotting
samples_np = samples.numpy().flatten()

# Plot the histogram of posterior samples
plt.figure(figsize=(8, 6))
plt.hist(samples_np, bins=50, density=True, alpha=0.6, color='blue', label='Posterior Samples')

# Plot the prior distribution for comparison
theta_range = np.linspace(-1, 1, 200)



# Plot the pseudo-true parameter value
theta_pseudo_true = 0.0  # Adjust based on your context
plt.axvline(theta_pseudo_true, color='black', linestyle='--', label='Pseudo-true θ')

# Add labels and legend
plt.xlabel(r'$\theta$')
plt.ylabel('Density')
plt.xlim([-1, 1])
plt.xticks([-1, 0, 1])
# plt.title('Posterior Distribution of $\theta$ using NLE')
plt.legend()
plt.tight_layout()
plt.savefig("figs/fig3a_nle.pdf")
plt.clf()

# Posterior predictive simulations
num_posterior_samples = len(samples_np)
num_pp_samples = 1_000  # Number of posterior predictive samples
thinning_interval = num_posterior_samples // num_pp_samples

# Initialize array to store summary statistics
x_pp_ss = np.empty((num_pp_samples, 2))

# Perform posterior predictive simulations
for i in range(num_pp_samples):
    idx = i * thinning_interval
    theta = samples_np[idx]
    x_sim = assumed_dgp(torch.tensor([[theta]]), n_obs=100)  # Adjust n_obs as needed
    x_ss = calculate_summary_statistics(x_sim)
    x_pp_ss[i, :] = x_ss.numpy()

# Plot the joint distribution of summary statistics from posterior predictive simulations
plt.figure(figsize=(8, 6))
plt.scatter(x_pp_ss[:, 1], x_pp_ss[:, 0], c='blue', alpha=0.5)
def b_theta(t):
    return np.array([1 + t ** 2, t])

# Generate values for b_theta
t_vals = np.linspace(-1, 1, 101)
b_theta_vals = np.array([b_theta(t) for t in t_vals])

# prior_density = np.ones_like(theta_range) * 0.5  # Uniform density over [-1, 1]
plt.plot(b_theta_vals[:, 1], b_theta_vals[:, 0], color='orange', label=r'$b(\theta)$', linewidth=6)


# Plot the observed summary statistics
plt.scatter(x_o_ss[0, 1].item(), x_o_ss[0, 0].item(), color='black', marker='x', s=100, label='S(y)')

# Include the function b_theta for reference, if applicable
# plt.plot(b_theta_vals[:, 1], b_theta_vals[:, 0], color='orange', label=r'$b(\theta)$', linewidth=4)

# Set axis labels
plt.xlabel(r'$\zeta_1$')
plt.ylabel(r'$\zeta_2$')
plt.xlim(-2.0, 2.0)
plt.ylim(-0.5, 2.5)
plt.xticks([-2, 0, 2])
plt.yticks([-0.5, 1, 2.5])

# plt.title('Posterior Predictive Summary Statistics')
plt.legend()
plt.tight_layout()
plt.savefig("figs/fig3b_nle.pdf")
plt.clf()

<Figure size 800x600 with 0 Axes>

<Figure size 800x600 with 0 Axes>