# Approximate Bayesian Computation As An Informed Fuzzy Inference System

Author: Chris Vaisnor

This notebook is a companion to the paper "Approximate Bayesian Computation As An Informed Fuzzy Inference System" by Chris Vaisnor as part of the M.S. in Articial Intelligence program at The Johns Hopkins University.

In [153]:
import torch
import tqdm
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.figure_factory as ff
import random

# GLOBALS
torch.manual_seed(0)

<torch._C.Generator at 0x7fa4c41a2450>

# Black Box Function

This function is used to represent a software/program that we are trying to fuzzily infer the parameters of. 

The both the prior and the synthetic posterior data are passed to the black box function and the output is the number of times the synthetic data is within the prior bounds.

In [154]:
def fuzz_test(particles):
    '''
    This function will test if a batch of particles satisfies the following criteria:

    Criteria:
    For each particle:
    - if the first dimension is within the range of -0.5 and 0.5, then the particle passes the test

    The count represents the number of particles that pass the fuzz test
    '''
    count = 0
    for particle in particles:
        # if particle[0] is within the range of -0.5 and 0.5, add 1 to the counter
        if particle[0] >= -0.5 and particle[0] <= 0.5:
            count += 1
    
    # return the percentage of particles that pass the fuzz test
    return count / len(particles) * 100 

# Prior Functions

In [155]:
def prior(N, D, mu=0, sigma=1.0, shuffle=True, modify=False, dimension_to_modify=None, percentage_to_modify=None) -> torch.Tensor:
    '''
    Each dimension will have a mean of mu and a standard deviation of sigma.
    The prior is assumed to be a Gaussian distribution.
    '''
    particles = torch.normal(mean=mu, std=sigma, size=(N, D))

    if shuffle:
        indices = torch.randperm(N)
        particles = particles[indices]

    if modify:
        particles = prior_modification(particles, dimension_to_modify, percentage_to_modify)
    
    return particles


def prior_modification(particles, dimension_to_modify, percentage_to_modify):
    '''
    Manual modification of the prior
    '''
    particles_modified = particles.clone()

    # base cases
    if percentage_to_modify is None:
        percentage_to_modify = 0.3 # modify 30% of the particles

    if dimension_to_modify is None:
        dimension_to_modify = 0 # select dimension 0

    modified_dimension_value = 0 # set the new dimension value to 0

    if particles_modified.shape[0] == 1:
        particles_modified[0, dimension_to_modify] = modified_dimension_value
        print(f"Modifying 1 particle out of 1 particle")
        return particles_modified

    # modify the particles
    num_particles_to_modify = int(particles.shape[0] * percentage_to_modify)
    print(f"Modifying {num_particles_to_modify} particles out of {particles.shape[0]} particles")
    particles_modified[:num_particles_to_modify, dimension_to_modify] = modified_dimension_value

    return particles_modified

### Testing prior functions

In [156]:
# generate N prior samples
prior_samples = prior(N=10, D=3, modify=False)
prior_samples_modified = prior(N=10, D=3, modify=True)

print("Prior samples:")
print(prior_samples[:3])
print('Shape:', prior_samples.shape)
print()
print("Prior samples (modified):")
print(prior_samples_modified[:3])
print('Shape:', prior_samples_modified.shape)
print()

# fuzz test the prior samples
print("Fuzz test (prior samples):")
print(fuzz_test(prior_samples))
print()
print("Fuzz test (prior samples modified):")
print(fuzz_test(prior_samples_modified))

Modifying 3 particles out of 10 particles
Prior samples:
tensor([[-1.2633,  0.3500,  0.3081],
        [-0.1023,  0.7924, -0.2897],
        [-0.4339,  0.8487,  0.6920]])
Shape: torch.Size([10, 3])

Prior samples (modified):
tensor([[ 0.0000,  1.1687,  0.3945],
        [ 0.0000,  0.5073, -0.5515],
        [ 0.0000,  0.3582, -0.0033]])
Shape: torch.Size([10, 3])

Fuzz test (prior samples):
80.0

Fuzz test (prior samples modified):
50.0


In [157]:
# distribution of non-modified (normal-gaussian) prior

# for loop to get the data from each dimension if more than 1 dimension
data = []
group_labels = []
for i in range(prior_samples.shape[1]):
    data.append(prior_samples[:, i].numpy())
    group_labels.append(f'Dimension {i}')

fig = ff.create_distplot(data, group_labels, bin_size=0.2)
fig.update_layout(title='Prior Distribution')
fig.show()

In [158]:
# plotly surface plot for the prior
fig = go.Figure(data=[go.Surface(z=prior_samples.numpy())])
# set z axis range
fig.update_layout(scene = dict(zaxis = dict(range=[-5,5])))
fig.update_layout(title='Prior Distribution (Surface Plot)')
fig.show()

In [159]:
# distribution of modified prior

# for loop to get the data from each dimension if more than 1 dimension
data = []
group_labels = []

for i in range(prior_samples_modified.shape[1]):
    data.append(prior_samples_modified[:, i].numpy())
    group_labels.append(f'Dimension {i}')

fig = ff.create_distplot(data, group_labels, bin_size=0.2)
fig.update_layout(title='Prior Distribution (Modified)')
fig.show()

In [160]:
# plotly surface plot for the prior (modified)
fig = go.Figure(data=[go.Surface(z=prior_samples_modified.numpy())])
# set z axis range
fig.update_layout(scene = dict(zaxis = dict(range=[-5,5])))
fig.update_layout(title='Prior Distribution (Modified, Surface Plot)')
fig.show()

# Sequential Monte Carlo Helper Functions

In [161]:
def transition(states):
    """
    The transition model.
    We will just add a small noise to each dimension of each state.
    """
    noise = torch.normal(mean=0, std=0.1, size=states.shape)
    return states + noise


def likelihood(state_seq, dimensions):
    """
    Returns a weighted likelihood considering zero in the first dimension and the Euclidean distance to a specific target point for a sequence of states.
    Returns a Torch tensor.
    """
    target = torch.tensor([0.0] * dimensions) # target point at the center
    square_diff = (state_seq - target) ** 2
    euclidean_distance = torch.sqrt(square_diff.sum(dim=-1))
    
    # Penalty for not being within -0.5 and 0.5 in the first dimension
    first_dimension_penalty = torch.where(state_seq[:, 0] >= -0.5, torch.where(state_seq[:, 0] <= 0.5, torch.zeros_like(state_seq[:, 0]), torch.abs(state_seq[:, 0] - 0.5)), torch.abs(state_seq[:, 0] + 0.5))

    # The alpha parameter controls how much more we care about the first dimension being zero than the Euclidean distance.
    alpha = 0.7
    return alpha*first_dimension_penalty + (1 - alpha)*euclidean_distance


def plot_convergence_smc(weight_history):
    """
    Plots the total weight over time to check for convergence of SMC
    """
    fig = go.Figure()
    
    # Line plot of weight history
    fig.add_trace(go.Scatter(x=list(range(len(weight_history))), y=weight_history,
                            mode='lines',
                            name='Total Weight'))

    # Layout
    fig.update_layout(title='Convergence of Sequential Monte Carlo',
                      xaxis_title='Iteration',
                      yaxis_title='Sum of Weights')

    fig.show()

# Sequential Monte Carlo Algorithm

In [162]:
def smc(prior, timesteps, num_samples, record_weights=False):
    """
    The SMC loop
    """

    dimensions = prior.shape[-1]
    weight_history = []

    for t in tqdm.tqdm(range(timesteps)):
        proposed_states = transition(states=prior)
        # we will apply the negative exponential to convert distances into weights,
        # which results in a higher weight to closer states because the close distance results in a large exponential value
        weights = torch.exp(-likelihood(proposed_states, dimensions))
        weights /= weights.sum() # normalize weights
        if record_weights:
            weight_history.append(weights.sum().item())
        indices = torch.multinomial(weights, num_samples=num_samples, replacement=True) 
        states = proposed_states[indices]

    return states, weight_history if record_weights else states

# Markov Chain Monte Carlo Helper Functions

In [163]:
def proposal(state, std=0.1):
    """
    Adds Gaussian noise to the current state to make it more likely to generate adjacent states
    """
    return torch.normal(mean=state, std=std)


def acceptance(curr, prop, dimensions):
    """
    Determines whether to accept or reject the proposed state
    """
    likelihood_curr = torch.exp(-likelihood(curr, dimensions))
    likelihood_prop = torch.exp(-likelihood(prop, dimensions))
    return min(1, (likelihood_prop / likelihood_curr).item())


def plot_convergence_mcmc(mcmc_samples, dimension=0):
    """
    Plots the trace and histogram of the MCMC sampler, which can help to
    diagnose whether the Markov chain has converged.
    """
    values = mcmc_samples[:, 0, dimension].numpy()

    fig = go.Figure()
    
    # Trace plot
    fig.add_trace(go.Scatter(x=list(range(len(values))), y=values,
                               mode='lines',
                               name='Trace'))

    # Layout
    fig.update_layout(title=f'Convergence for Dimension {dimension}',
                      xaxis_title='Iteration',
                      yaxis_title='Value',
                      bargap=0.2,
                      bargroupgap=0.1)
    fig.show()

# Markov Chain Monte Carlo Algorithm

In [164]:
def mcmc(init_state, iterations, dimensions, burn_in=0, lag=1):
    """
    The MCMC loop
    """
    state = init_state.clone()
    samples = []
    
    for _ in tqdm.tqdm(range(iterations)):
        proposed_state = proposal(state)
        if random.random() < acceptance(state, proposed_state, dimensions):
            state = proposed_state
        samples.append(state)

    return torch.stack(samples[burn_in::lag])

# SMC Model Parameters

In [165]:
# Params
DIMENSIONS = 100 # State dimension
N_prior = 10 # initial number of particles
MU = 0.0 # Mean of the prior
SIGMA = 10.0 # Variance of the prior

TIMESTEPS = 1000 # Number of timesteps
N_smc = 1000 # Number of particles for SMC to generate

dimension_to_modify=0
percentage_to_modify=0.3

# Generate prior
particles_prior_smc = prior(mu=MU, sigma=SIGMA, N=N_prior, D=DIMENSIONS, shuffle=True, modify=True, dimension_to_modify=dimension_to_modify, percentage_to_modify=percentage_to_modify)
print('Prior size (SMC):', particles_prior_smc.shape)

Modifying 3 particles out of 10 particles
Prior size (SMC): torch.Size([10, 100])


In [166]:
# Run SMC and record weights
particles_posterior_smc, weight_history = smc(prior=particles_prior_smc, timesteps=TIMESTEPS, num_samples=N_smc, record_weights=True)

# check size of posterior
print('Posterior size: (SMC)', particles_posterior_smc.shape)

100%|██████████| 1000/1000 [00:00<00:00, 10079.94it/s]

Posterior size: (SMC) torch.Size([1000, 100])





In [167]:
# fuzz test the prior and posterior
print("Fuzz test (prior):")
print(fuzz_test(particles_prior_smc))
print()
print("Fuzz test (posterior):")
print(fuzz_test(particles_posterior_smc))

Fuzz test (prior):
30.0

Fuzz test (posterior):
89.7


In [168]:
# Plot convergence SMC
plot_convergence_smc(weight_history)

In [169]:
# print one of the posterior particles
print("Posterior particle:")
print(particles_posterior_smc[10])

Posterior particle:
tensor([ 1.2779e+00,  1.3795e+01,  2.3919e+00,  2.8090e+00, -7.5565e-01,
         1.6204e+01, -1.2334e+00, -1.3949e+01,  1.9015e+01,  1.8478e+01,
         7.2020e+00,  3.1246e-01, -8.5704e+00,  5.3704e+00, -5.1797e+00,
        -1.3240e+01, -3.5537e+00,  1.3409e+01, -2.0553e+01,  5.1985e+00,
         5.8123e+00, -4.1630e+00,  4.5400e+00, -4.0123e+00,  4.4671e+00,
        -1.4327e+01, -8.9286e-01,  4.8381e+00,  7.0724e+00,  5.1205e-01,
        -4.2980e+00, -6.8372e+00, -3.4354e+00,  1.7759e+01, -1.0544e+01,
        -2.2939e+00, -1.2781e+01,  5.5652e+00,  1.6189e+00,  1.0277e+01,
        -1.2254e+00,  6.7192e+00, -1.8032e+01,  6.8495e+00,  1.1792e+01,
         1.3239e+01, -3.3268e+00,  1.1828e+01,  4.3132e+00,  9.8188e+00,
        -9.3791e+00, -2.1573e+01,  3.9526e+00, -4.1467e+00, -7.5831e+00,
         1.6588e+01,  3.5947e+00,  9.9527e+00,  4.9854e+00,  2.6107e+00,
         1.4936e+01,  3.3727e+00,  1.6338e+01,  6.1012e+00, -9.1671e+00,
         2.1064e+01, -3.1539e-0

# MCMC Model

In [170]:
# Generate single particle for MCMC

init_state = prior(mu=MU, sigma=SIGMA, N=1, D=DIMENSIONS, shuffle=True, modify=True, dimension_to_modify=dimension_to_modify, percentage_to_modify=percentage_to_modify)
print('Prior size (MCMC):', init_state.shape)

Modifying 1 particle out of 1 particle
Prior size (MCMC): torch.Size([1, 100])


In [171]:
burn_in = 500
lag = 5
TIMESTEPS_MCMC = 5000

# Run MCMC
particles_posterior_mcmc = mcmc(init_state=init_state, iterations=TIMESTEPS_MCMC, dimensions=DIMENSIONS, burn_in=burn_in, lag=lag)

# check size of posterior
print('Posterior size (MCMC):', particles_posterior_mcmc.shape)

100%|██████████| 5000/5000 [00:00<00:00, 10275.60it/s]

Posterior size (MCMC): torch.Size([900, 1, 100])





In [172]:
# modification of fuzz_test to work with MCMC tensor sample shape (TIMESTEPS, 1, DIMENSIONS)

def fuzz_test_mcmc(particles):
    '''
    This function will test if a batch of particles satisfies the following criteria:

    Criteria:
    For each particle:
    - if the first dimension is 0, add 1 to the counter

    The count represents the number of particles that pass the fuzz test
    '''
    count = 0
    for particle in particles:
        # if particle[0] is within the range of -0.5 and 0.5, add 1 to the counter
        if particle[0][0] >= -0.5 and particle[0][0] <= 0.5:
            count += 1
    
    # return the percentage of particles that pass the fuzz test
    return count / len(particles) * 100

In [173]:
# fuzz test the prior and posterior
print("Fuzz test (prior):")
print(fuzz_test(init_state))
print()
print("Fuzz test (posterior):")
print(fuzz_test_mcmc(particles_posterior_mcmc))

Fuzz test (prior):
100.0

Fuzz test (posterior):
23.0


In [174]:
# Plot Convergence MCMC
plot_convergence_mcmc(particles_posterior_mcmc)