# Imports and auxiliary functions

In [1]:
import numpy as np
rng = np.random.default_rng()
from scipy.stats import truncnorm
from dataclasses import dataclass

def print_results(mean, std, info):
    # Use 2 significant digits unless standard deviation is zero.
    digits = 10 if std==0 else int(np.ceil(-np.log10(abs(std))))+1
    print(f"   *{info} result: theta = "
          f"{myround(mean,digits)} ± {myround(std,digits)}")

def myround(x, Ndig):
    '''
    To do {:.nf}.format() for arbitrary n. It works like 'round', but
    perserves zeros when printing, i.e. format(1.000,2) = 1.00 while
    round(1.000,2) = 1.0.
    '''
    xf = f"{{:.{Ndig}f}}".format(x)
    return xf

def get_truncated_normal(mean, sd, low=0, upp=10):
    '''
      Because Scipy truncnorm's arguments are impractical.
    '''
    return truncnorm(
        (low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd)

# SMC base class

In [2]:
class SMCsampler():
    def __init__(self, Npart: int, model, thr = 0.5):

        self.Npart = Npart # Number of particles.
        self.model = model # Generative model.
        self.thr = thr # Resampling threshold (fraction of ESS).
        self.str = None # ID for the resampler (RWM or LW).

        # Init sampler to a flat prior on [0, scale].
        self.locs = np.random.uniform(size = Npart) # Particle locations.
        self.locs *= model.scale # Domain upper bound (lower is assumed 0).
        self.weights = np.ones(Npart)/Npart # Particle weights (here uniform).

        # Keep track of the sum of weights so it's easier to normalize.
        self.wsum = 1

    def offline_inference(self, data):
        '''
        Use SMC to sample from an incremental dataset containing 1 datum, then
        2 data, ..., then all data. Despite the inference being offline, we want
        full granularity to allow for resampling when needed.

        For the weight updates, only the last datum, which should be the latest
        at the current stage, is used. For resampling, the entire dataset may be
        used, and needs to if we are to fully preserve the distribution.
        '''
        for i in range(1, len(data)+1):
            self.update(data.partial_data(i))

        results = self.mean_and_std()
        print_results(*results, f"{self.str.ljust(3)}")

    def update(self, data):
        '''
        Update distribution based on the latest datum.

        It is assumed the data arrive sequentially, 'data' being the latest
        available dataset (chronologically ordered).
        '''
        # Get the latest datum for the weight updates.
        ctrl, outcome = data.ctrls[-1], data.outcomes[-1]

        # Get the likelihoods associated with each particle.
        likelihood = self.model.likelihood
        ls = likelihood(self.locs, ctrl, outcome)

        # Reweight.
        self.weights = self.weights*ls

        # Calculate effective sample size.
        self.wsum = np.sum(self.weights)
        wsum2 = np.sum(self.weights**2)
        ESS = self.wsum**2/wsum2

        # Resample if the weights are dominated by too small a fraction of
        # the particles (statistical power is critically low).
        ESS_thr = self.thr*self.Npart
        if ESS < ESS_thr:
            self.resample(data)

    def resample(self, data):
        '''
        Change particle locations to introduce variability. The mechanism may
        vary. There is a trade-off between statisticall correctness, processing
        cost, and space exploration.
        '''
        # Multinomial sampling with replacement from the current particle cloud.
        new_locs = rng.choice(self.locs, size=self.Npart,
                              p=self.weights/self.wsum)

        # Introduce variability by resampling.
        new_locs = self.resampler(new_locs, data)

        # After resampling the distribution is uniform.
        self.locs = new_locs
        self.weights = np.ones(self.Npart)/self.Npart

         # Update norm.
        self.wsum = 1

    def mean_and_std(self):
        mean = self.mean()
        var = np.sum((self.locs-mean)**2*self.weights)/self.wsum
        return mean, np.sqrt(var)

    def mean(self):
        mean = np.sum(self.locs*self.weights)/self.wsum
        return mean

# Liu-West

The resampling mechanism is the Liu-West filter (LW).

I use a truncated Gaussian to ensure the proposals don't fall out of bounds.

In [3]:
class LiuWestSampler(SMCsampler):

    def __init__(self, Npart: int, model, a = 0.98, **kwargs):
        super().__init__(Npart, model, **kwargs)
        self.a = a # Resampling factor.
        self.str = "LW"

    def resampler(self, old_locs, data):
        # The data are not used but we need them in the signature.
        currmean, currstd = self.mean_and_std()
        a = self.a
        means = a*old_locs+(1-a)*currmean
        h = (1-a**2)**0.5
        std = h*currstd

        Tnorm = get_truncated_normal(means, std, low=0, upp=self.model.scale)
        new_locs = Tnorm.rvs()
        return new_locs

# Metropolis (RWM)

The resampling mechanism is random walk Metropolis (RWM) with samples drawn from a Gaussian.

The variance of the Gaussian is a free parameter that strongly affects the performance. There's a trade-off involved in assuring proper space exploration: the proposals should be bold enough that they cover enough ground, but conservative enough that their acceptance probabilities are not too low. The effectiveness can be assessed by the acceptance probabilities.

In MCMC, this parameter may be chosen in a warm up phase. In SMC, we don't expect a fixed parameter to do well for the entire execution, as the distribution shape changes drastically through it. On the other hand, and unusually, we have access to handy approximate statistics of the distribution we are sampling from.

I choose the proposal variance to be proportional to the variance of the current SMC distribution, so that the proposals become closer to the original particles as the distribution becomes sharper. In the beginning, where the distribution they're targeting is flatter, the proposals can (should) be more distant and still have a fair chance of acceptance.

The proportionality coeffiecient can be chosen heuristically; here I used 1.

In [4]:
class MetropolisSampler(SMCsampler):
    def __init__(self, Npart: int, model, factor=1.5, **kwargs):
        super().__init__(Npart, model, **kwargs)
        self.factor = factor
        self.str = "RWM"

    def resampler(self, old_locs, data):
        # Propose new locations.
        proposals = self.generate_proposals(old_locs, data)
        # Calculate acceptance rates.
        acc_rates = self.acceptance_rates(proposals, old_locs, data)
        # Probabilistic rejection step.
        new_locs = self.rejection_step(proposals, acc_rates, old_locs)
        return new_locs

    def generate_proposals(self, old_locs, data):
        '''
        Propose new locations via a Gaussian perturbation of the old ones.

        The variance of the Gaussian is chosen to be proportional to that of the
        current SMC distribution, which approximates the target distribution.
        '''
        # Get the parameter for the proposal distribution.
        _, currsd = self.mean_and_std()
        sd = self.factor*currsd

        # Get the samples.
        Tnorm = get_truncated_normal(old_locs, sd, low=0, upp=self.model.scale)
        proposals = Tnorm.rvs()
        return proposals

    def acceptance_rates(self, proposals, old_locs, data):
        '''
        Calculate the Metropolis acceptance rates that assure detailed balance.
        '''
        old_likelihoods = self.model.batch_likelihood(old_locs, data)
        new_likelihoods = self.model.batch_likelihood(proposals, data)
        acc_rates = np.where(old_likelihoods>0, new_likelihoods/old_likelihoods,
                             1)
        acc_rates = self.filter_acc_rates(acc_rates, new_likelihoods,
                                          old_likelihoods)
        return acc_rates

    @staticmethod
    def filter_acc_rates(acc_rates, new_likelihoods, old_likelihoods):
        '''
        Some calculated acceptance rates may not be valid probabilities.
        This function:
        * Caps > 1 values at 1:  Metropolis rate should be min(1, Pnew/Pold).
        * Fixes NaNs created by division by zero: when Pold=0, we just accept
        the new proposal.

        '''
        # Not the clearest way but the fastest.
        acc_rates = np.where(np.logical_or(old_likelihoods==0, acc_rates>1),
                      np.ceil(new_likelihoods), acc_rates)
        return acc_rates

    @staticmethod
    def rejection_step(proposals, acc_rates, old_locs):
        '''
        Probabilistically accept or reject the new samples.
        '''
        accept = np.random.binomial(1, acc_rates)
        new_locs = np.where(accept, proposals, old_locs)
        return new_locs

# Data
To store information from a sequence of single shot binary measurements with one experimental control.

In [5]:
@dataclass
class MeasurementData:
    ctrls: [float]
    outcomes: [int]

    def partial_data(self, Ndata):
      if Ndata > len(self):
          print("> The requested partial dataset is exceeds the available "
                "dataset's length.")
      return MeasurementData(self.ctrls[:Ndata], self.outcomes[:Ndata])

    def __len__(self):
      l1 = len(self.ctrls)
      l2 = len(self.outcomes)
      if l1 != l2:
          print("> The length of the dataset controls and outcomes is unmatched.")
          return -1
      else:
          return l1

    def __str__(self):
      s = "MeasurementData instance:\n"
      s += f"* {len(self.ctrls)} controls: " + str(self.ctrls)
      s += f"\n* {len(self.ctrls)} outcomes: " + str(self.outcomes)
      return s

# Model
A simple test model for binary measurements $x \in \{0,1\}$ with $P(1 \mid \theta; t) = \sin^2(\theta \cdot t)$, where $\theta \in ]0, \pi/2]$ is a model parameter and $t$ an  experimental control.

This will be used to generate data for the inference.

To show a scenario where Liu West fails, I made up an alternative way of generating trickier data, where we introduce a temporary decoy parameter. For the first half of the measurements, we alternate between measurements based on the true parameter $\theta_{real}$ and measurements based on a decoy parameter $\theta_{decoy}$. I.e., we sample from:

$$\frac{P(x \mid \theta_{real}; t)}{2} + \frac{P(x \mid \theta_{decoy}; t)}{2}$$

This will introduce bimodality in the posterior, with a mode at $\theta_{real}$ and another at $\theta_{decoy}$.

After these measurements, we eliminate the decoy and start drawing samples exclusively from the true distribution. This will resolve the ambiguity, as the probability of $\theta_{decoy}$ will drop.

We expect that this will throw off Liu-West, because its unimodality assumption will drag the particles to the midpoint between these modes. After the decoy phase is over, it will not be able to recover, because there will be particle depletion in the correct region of space.

It is clear Liu-West cannot salvage such a misled sampler, because it doesn't use any information not already contained in it. On the contrary: it does not even use all of the available information.

On the other hand, with Metropolis we expect that an equitative fraction of particles will gather around each mode. When the redundance is lifted, the ones covering the decoy mode should join the others around the true value. Even if that were not the case, in theory, a good Markov kernel could correct the problem, because it converges to the real distribution rather than the SMC approximation (or even a simplification thereof, as with LW).

We choose $\theta_{decoy} = \theta_{real} \pm 1$, with whichever sign makes it fall the correct domain. This is just an arbitrary option that seems to illustrate the point well for various values of $\theta_{real}$.

In [6]:
class SineModel():
    def __init__(self, theta, scale = np.pi/2 ):
        self.theta = theta

        # Upper bound to the parameter value. Lower is assumed 0.
        self.scale = scale

    def measure(self, t):
        p1 = np.sin(self.theta*t)**2
        outcome = np.random.binomial(1, p1)
        return outcome

    def get_data(self, ts, tricky = False):
        if tricky:
            return self.get_tricky_data(ts)

        xs = self.measure(ts)
        data = MeasurementData(ts, xs)
        return data

    def get_tricky_data(self, ts, bfrac = 0.5):
        '''
        Data that is initially redundant. The measurements for the first 'bfrac'
        (fractional) times sample from a bimodal distribution. Unimodality holds
        for the rest (at least asymptotically).
        '''
        assert bfrac > 0 and bfrac < 1

        Nmeas = len(ts)
        # Times where the binomial parameter will change between true and decoy.
        ts1 = ts[:int(bfrac*Nmeas)]
        xs1 = self.measure_decoy(ts1)

        # Times where the binomial parameter will be fixed (true only).
        ts2 = ts[int(bfrac*Nmeas):]
        xs2 = self.measure(ts2)

        # Combine the data from the bimodal and unimodal phases, by this order.
        xs = np.concatenate((xs1, xs2))

        data = MeasurementData(ts, xs)
        return data

    def measure_decoy(self, ts):
        '''
        Sample measurement outcomes from a bimodal distribution where one of
        the modes is the true parameter and the other is a decoy.

        The used parameter alternates between the two options (decoy or true)
        through 'ts'.
        '''
        # Save original parameter.
        real_theta = self.theta

        # Set decoy parameter and make measurements accordingly, for even times.
        decoy = self.theta - 1 if self.theta > 1 else self.theta + 1
        self.theta = decoy
        xs1 = self.measure(ts[0::2])

        # Reset true parameter and make measurements accordingly, for odd times.
        self.theta = real_theta
        xs2 = self.measure(ts[1::2])

        # Intercalate elements of each outcome list to make a single array.
        xsboth = np.dstack((xs1,xs2)).flatten()
        return xsboth

    def likelihood(self, thetas, ts, outcomes):
        # Calculate P(1|theta; t).
        p1 = np.sin(thetas*ts)**2

        # Correct for outcome 0.
        ls = p1 if outcomes==1 else 1-p1

        # Ensure theta is in the correct domain, i.e. [0, scale].
        ls = self.enforce_domain(thetas, ls)
        return ls

    def enforce_domain(self, thetas, ls):
        '''
        Correct the likelihood of out-of-bounds parameters. Parameters not in
        [0, scale] should have 0 likelihood.
        '''
        valid = np.logical_and(thetas >= 0, thetas <= self.scale)
        ls = np.where(valid, ls, 0)
        return ls

    def batch_likelihood(self, thetas, data):
        '''
        Calculate the likelihood based on a dataset, rather than a single datum.
        '''
        ctrls, xs = np.array(data.ctrls), np.array(data.outcomes)

        args = np.outer(thetas, ctrls)
        sin2 = np.sin(args)**2
        Ls_joint = np.prod(sin2**xs*(1-sin2)**(1-xs), axis = 1)

        Ls_joint = self.enforce_domain(thetas, Ls_joint)
        return Ls_joint

# Tests

Test Liu-West and Metropolis; first using simple data from the sine model, then using the tricky data.

In [7]:
def test_normal_and_tricky(theta, params):
    print(f"> Testing inference for theta = {theta}.")

    # Test inference on the basic sin^2 data.
    print("\n1. Straightforward data.")
    test_both_samplers(**params)
    params["tricky"] = True

    # Test inference on the tricky sin^2 data.
    print(f"\n2. Tricky data (initially bimodal).")
    test_both_samplers(**params)

def test_both_samplers(**args):
    '''
    Test the performance of Liu West and Metropolis.
    '''
    for which in ["LW", "RWM"]:
        test(which, **args)

def test(which, theta, tmax, Nmeas, tricky, Npart, thr, a = None, factor = None):
    '''
    Test the performance of Liu West or Metropolis.
    '''
    # Create model and generate data for the inference.
    model = SineModel(theta)
    ts = np.linspace(0, tmax, Nmeas)
    data = model.get_data(ts, tricky = tricky)

    # Common sampler arguments.
    cargs = {"Npart": Npart, "model": model, "thr": thr}

    # Get requested sampler.
    if which=="LW":
        sampler = LiuWestSampler(**cargs, a = a)
    if which=="RWM":
        sampler = MetropolisSampler(**cargs, factor = factor)

    # Perform inference on the dataset.
    sampler.offline_inference(data)

In [8]:
# True parameter.
theta = 1.2

# Problem and sampler parameters.
params = {"theta": theta,
          "tricky": False,
          "tmax": 50,
          "Nmeas": 300,
          "Npart": 5000,
          "thr": 0.5,
          "a": 0.98,
          "factor": 1}

test_normal_and_tricky(theta, params)

> Testing inference for theta = 1.2.

1. Straightforward data.
   *LW  result: theta = 1.20104 ± 0.00094
   *RWM result: theta = 1.1990 ± 0.0011

2. Tricky data (initially bimodal).
   *LW  result: theta = 0.97 ± 0.15
   *RWM result: theta = 0.18378 ± 0.00082
