Similar to Experiment 76 but here the list of $\epsilon$s is adaptively chosen based on the distance of the particles, as done in Chang's code. The only thing to be careful about is that in Chang's code he was using a uniform kernel, whereas here are using a normal kernel.

In [1]:
import numpy as np
from numpy.random import rand, randn
from numpy import ones, exp, log, diag, vstack, pi, array, r_, isfinite, logspace, zeros, eye
from numpy.linalg import norm, solve
from numpy.random import default_rng, choice
from scipy.optimize import fsolve
from scipy.stats import multivariate_normal as MVN
from scipy.special import ndtri, ndtr
from scipy.stats import uniform as udist
from scipy.stats import norm as ndist
from scipy.linalg import block_diag

import time
from math import prod
from warnings import catch_warnings, filterwarnings, resetwarnings


import matplotlib.pyplot as plt
from matplotlib import rc
from ipywidgets.widgets import interact


from RWM import RWM
from Manifolds.Manifold import Manifold
from tangential_hug_functions import HugTangentialMultivariate
from utils import ESS_univariate, prep_contour
from copy import deepcopy

# G-and-K Functions and Settings

In [2]:
class GKManifold(Manifold):
    def __init__(self, ystar):
        self.m = len(ystar)            # Number constraints = dimensionality of the data
        self.d = 4                     # Manifold has dimension 4 (like the parameter θ)
        self.n = self.d + self.m       # Dimension of ambient space is m + 4
        self.ystar = ystar
        # N(0, 1) ---> U(0, 10).
        self.G    = lambda θ: 10*ndtr(θ)
        # U(0, 10) ---> N(0, 1)
        self.Ginv = lambda θ: ndtri(θ/10)

    def q(self, ξ):
        """Constraint for G and K."""
        ξ = r_[self.G(ξ[:4]), ξ[4:]]   # expecting theta part to be N(0, 1)
        with catch_warnings():
            filterwarnings('error')
            try:
                return (ξ[0] + ξ[1]*(1 + 0.8*(1 - exp(-ξ[2]*ξ[4:]))/(1 + exp(-ξ[2]*ξ[4:]))) * ((1 + ξ[4:]**2)**ξ[3])*ξ[4:]) - self.ystar
            except RuntimeWarning:
                raise ValueError("Constraint found Overflow warning.")
                
    def _q_raw_uniform(self, ξ):
        """Constraint function expecting ξ[:4] ~ U(0, 10). It doesn't do any warning check."""
        return (ξ[0] + ξ[1]*(1 + 0.8*(1 - exp(-ξ[2]*ξ[4:]))/(1 + exp(-ξ[2]*ξ[4:]))) * ((1 + ξ[4:]**2)**ξ[3])*ξ[4:]) - self.ystar
    def _q_raw_normal(self, ξ):
        """Same as `_q_raw_uniform` except expects ξ[:4]~N(0,1)."""
        ξ = r_[self.G(ξ[:4]), ξ[4:]] 
        return self._q_raw_uniform(ξ)

    def Q(self, ξ):
        """Transpose of Jacobian for G and K. """
        ξ = r_[self.G(ξ[:4]), ξ[4:]]
        return vstack((
        ones(len(ξ[4:])),
        (1 + 0.8 * (1 - exp(-ξ[2] * ξ[4:])) / (1 + exp(-ξ[2] * ξ[4:]))) * ((1 + ξ[4:]**2)**ξ[3]) * ξ[4:],
        8 * ξ[1] * (ξ[4:]**2) * ((1 + ξ[4:]**2)**ξ[3]) * exp(ξ[2]*ξ[4:]) / (5 * (1 + exp(ξ[2]*ξ[4:]))**2),
        ξ[1]*ξ[4:]*((1+ξ[4:]**2)**ξ[3])*(1 + 9*exp(ξ[2]*ξ[4:]))*log(1 + ξ[4:]**2) / (5*(1 + exp(ξ[2]*ξ[4:]))),
        diag(ξ[1]*((1+ξ[4:]**2)**(ξ[3]-1))*(((18*ξ[3] + 9)*(ξ[4:]**2) + 9)*exp(2*ξ[2]*ξ[4:]) + (8*ξ[2]*ξ[4:]**3 + (20*ξ[3] + 10)*ξ[4:]**2 + 8*ξ[2]*ξ[4:] + 10)*exp(ξ[2]*ξ[4:]) + (2*ξ[3] + 1)*ξ[4:]**2 + 1) / (5*(1 + exp(ξ[2]*ξ[4:]))**2))
    ))
    
    def J(self, ξ):
        """Safely computes Jacobian."""
        with catch_warnings():
            filterwarnings('error')
            try:
                return self.Q(ξ).T
            except RuntimeWarning:
                raise ValueError("J computation found Runtime warning.")
                
    def fullJacobian(self, ξ):
        """J_f(G(ξ)) * J_G(ξ)."""
        JGbar = block_diag(10*np.diag(ndist.pdf(ξ[:4])), eye(len(ξ[4:])))
        return self.J(ξ) @ JGbar
                
    def log_parameter_prior(self, θ):
        """IMPORTANT: Typically the prior distribution is a U(0, 10) for all four parameters.
        We keep the same prior but since we don't want to work on a constrained space, we 
        reparametrize the problem to an unconstrained space N(0, 1)."""
        with catch_warnings():
            filterwarnings('error')
            try:
                return udist.logpdf(self.G(θ), loc=0.0, scale=10.0).sum() + ndist.logpdf(θ).sum()
            except RuntimeWarning:
                return -np.inf
            
    def logprior(self, ξ):
        """Computes the prior distribution for G and K problem. Notice this is already reparametrized."""
        return self.log_parameter_prior(ξ[:4]) - ξ[4:]@ξ[4:]/2

    def logη(self, ξ):
        """log posterior for c-rwm. This is on the manifold."""
        try:
            J = self.J(ξ)
            logprior = self.logprior(ξ)
            correction_term  = - prod(np.linalg.slogdet(J@J.T))/2 
            return  logprior + correction_term
        except ValueError as e:
            return -np.inf
        
    def generate_logηϵ(self, ϵ, kernel='normal'):
        """Returns the log abc posterior for THUG."""
        if kernel not in ['normal']:
            raise NotImplementedError
        else:
            def log_abc_posterior(ξ):
                """Log-ABC-posterior."""
                u = self.q(ξ)
                m = len(u)
                return self.logprior(ξ) - u@u/(2*ϵ**2) - m*log(ϵ) - m*log(2*pi)/2
            return log_abc_posterior
            
    def logp(self, v):
        """Log density for normal on the tangent space."""
        return MVN(mean=zeros(self.d), cov=eye(self.d)).logpdf(v)
    
    def is_on_manifold(self, ξ, tol=1e-8):
        """Checks if ξ is on the ystar manifold."""
        return np.max(abs(self.q(ξ))) < tol
    
    
"""
OTHER FUNCTIONS
"""    
def generate_powers_of_ten(max_exponent, min_exponent):
    """E.g. generate_powers_of_ten(2, -1) will return 100, 10, 0, 0.1."""
    number_of_powers = max_exponent + abs(min_exponent) + 1
    return logspace(start=max_exponent, stop=min_exponent, num=number_of_powers, endpoint=True)


def data_generator(θ0, m, seed):
    """Stochastic Simulator. Generates y given θ."""
    rng = default_rng(seed)
    z = rng.normal(size=m)
    ξ = r_[θ0, z]
    return ξ[0] + ξ[1]*(1 + 0.8*(1 - exp(-ξ[2]*ξ[4:]))/(1 + exp(-ξ[2]*ξ[4:]))) * ((1 + ξ[4:]**2)**ξ[3])*ξ[4:]

def find_point_on_manifold(ystar, ϵ, max_iter=1000, tol=1.49012e-08):
    """Find a point on the data manifold."""
    i = 0
    manifold = GKManifold(ystar=ystar)
    log_abc_posterior = manifold.generate_logηϵ(ϵ)
    with catch_warnings():
        filterwarnings('error')
        while i <= max_iter:
            i += 1
            try: 
                # Sample θ from U(0, 10)
                θfixed = randn(4)
                function = lambda z: manifold._q_raw_normal(r_[θfixed, z])
                z_guess  = randn(manifold.m)
                z_found  = fsolve(function, z_guess, xtol=tol)
                ξ_found  = r_[θfixed, z_found]
                if not isfinite([log_abc_posterior(ξ_found)]):
                    pass
                else:
                    resetwarnings()
                    return ξ_found

            except RuntimeWarning:
                continue
        resetwarnings()
        raise ValueError("Couldn't find a point, try again.") 
        
        
def find_point_on_manifold_from_θ(ystar, θfixed, ϵ, maxiter=2000, tol=1.49012e-08):
    """Same as the above but we provide the θfixed. Can be used to find a point where
    the theta is already θ0."""
    i = 0
    manifold = GKManifold(ystar=ystar)
    log_abc_posterior = manifold.generate_logηϵ(ϵ)
    function = lambda z: manifold._q_raw_normal(r_[θfixed, z])
    with catch_warnings():
        filterwarnings('error')
        while i <= maxiter:
            i += 1
            try:
                z_guess  = randn(manifold.m)
                z_found  = fsolve(function, z_guess, xtol=tol)
                ξ_found  = r_[θfixed, z_found]
                if not isfinite([log_abc_posterior(ξ_found)]):
                    resetwarnings()
                    raise ValueError("Couldn't find a point.")
                else:
                    resetwarnings()
                    return ξ_found
            except RuntimeWarning:
                continue
        resetwarnings()
        raise ValueError("Couldn't find a point, try again.")



In [3]:
def generate_setting(m, ϵs, B, δ, N, thinning=10):
    """Generates an object from which one can grab the settings. This allows one to run multiple scenarios."""
    θ0        = array([3.0, 1.0, 2.0, 0.5])      # True parameter value on U(0, 10) scale.
    d         = 4 + m                            # Dimensionality of ξ=(θ, z)
    ystar     = data_generator(θ0, m, seed=1234) # Observed data
    q         = MVN(zeros(d), eye(d))            # Proposal distribution for THUG
    ξ0        = find_point_on_manifold_from_θ(ystar=ystar, θfixed=ndtri(θ0/10), ϵ=1e-5, maxiter=5000, tol=1e-15)
    resetwarnings()
    manifold  = GKManifold(ystar)
    return {
        'θ0': θ0,
        'm' : m,
        'd' : d,
        'ystar': ystar,
        'q': q,
        'ξ0': ξ0,
        'ϵs': ϵs,
        'B': B,
        'δ': δ,
        'N': N,
        'manifold': manifold,
        'thinning': thinning
    }

# Multivariate Markov Snippets