In [1]:
%matplotlib inline
%load_ext autoreload
%autoreload 2
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import functools
import scipy

# Some useful utilities
from mcmc_utils_and_plot import scatter_matrix, build_cov_mat, lognormpdf, plot_bivariate_gauss, eval_func_on_grid

def compose(*functions):
    "Compose a list of functions"
    return functools.reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)

The Metropolis-Hastings variants of MCMC algorithms rely on an accept-reject probability. Specifically, given a proposal PDF (could be un-normalized) $q$, the current sample $x^{(k)}$ and a proposed sample $y$, the acceptance probability is
\begin{equation}
a(x^{(k)}, y) = \min \left\{ \frac{f_X(y)}{f_X(x^{(k)})} \frac{q(x^{(k)} \mid y)}{q(y \mid x^{(k)})}, \  1 \right\}
\end{equation}

In [7]:
def mh_acceptance_prob(current_target_logpdf,proposed_target_logpdf, current_sample, proposed_sample, proposal_func):
    """Compute the metropolis-hastings accept-reject probability
    
    Inputs
    ------
    current_target_logpdf : float, logpdf at the current sample in the chain f_X(x^{(k)})
    proposed_target_logpdf : float, logpdf at the proposed sample in the chain
    current_sample : (d, ), current sample
    proposed_sample : (d, ), proposed sample
    proposal_func: f(x, y) callable that gives the log probability of y given x
    
    Returns
    -------
    acceptance probability
    """
    
    prop_reverse = proposal_func(proposed_sample, current_sample)
    prop_forward = proposal_func(current_sample, proposed_sample)
    check = proposed_target_logpdf - current_target_logpdf + prop_reverse - prop_forward
    if check < 0:
        return np.exp(check)
    else:
        return 1        

Now we have implement the MHMC algorithm
1. Propose sample y
2. Compute acceptance probability
3. Accept or reject sample y
4. Repeat

In [8]:
def mhmcmc(starting_sample, num_samples, target_logpdf, proposal_logpdf, proposal_sampler):
    """Metropolis-Hastings MCMC
    
    Inputs
    ------
    starting_sample: (d, ) the initial sample
    num_sample: positive integer, the number of total samples
    target_logpdf: function(x) -> logpdf of the target distribution
    proposal_logpdf: function (x, y) -> logpdf of proposing y if current sample is x
    proposal_sampler: function (x) -> y, generate a sample if you are currently at x
    
    Returns
    -------
    Samples: (num_samples, d) array of samples
    accept_ratio: ratio of proposed samples that were accepted
    """

    d = starting_sample.shape[0]
    samples = np.zeros((num_samples, d))
    samples[0, :] = starting_sample
    current_target_logpdf = target_logpdf(samples[0, :])
    
    num_accept = 0
    for ii in range(1, num_samples):
        # propose
        proposed_sample = proposal_sampler(samples[ii-1, :])
        proposed_target_logpdf = target_logpdf(proposed_sample)
        
        # determine acceptance probability
        a = mh_acceptance_prob(current_target_logpdf, proposed_target_logpdf, samples[ii-1,:], proposed_sample, proposal_logpdf)
        
        # Accept or reject the sample
        if a == 1: #guaranteed to accept
            samples[ii, :] = proposed_sample
            current_target_logpdf = proposed_target_logpdf
            num_accept += 1
        else:
            ## Accept with probability a
            u = np.random.rand()
            if u < a: # accept
                samples[ii, :] = proposed_sample
                current_target_logpdf = proposed_target_logpdf
                num_accept += 1
            else: # reject
                samples[ii, :] = samples[ii-1, :]
                
    return samples, num_accept / float(num_samples-1)

Covariance update based on

\begin{equation}
S_{k+1} = \frac{k-1}{k}S_k + \frac{s_d}{k} \left[ \xi I + k \bar{x}^{k-1}\bar{x}^{k-1^T} - (k+1)\bar{x}^k \bar{x}^{k^T} + x^{(k)}x^{(k)^T} \right]
\end{equation}

In [19]:
def covariance_update(S_k, k, xk, xk_1_m, sd, xi): # m for means
    
    xk_m = 1/(k+1) * x_k + k/(k+1) * xk_1_m
    
    return xk_m, (k-1)/k*S_k + sd/k*( xi*np.eye(np.shape(S_k)[0]) + k * xk_1 @ xk_1.T - (k+1) * xk_m @ xk_m.T + xk @ xk.T )

Gaussian Random Walk Proposal

In [18]:
def proposal_rw_sampler(x, cov=1.0):
    """Sample from a random walk proposal with identity covariance"""
    sample_x = x.reshape(-1,1)
    cov_sqrt = scipy.linalg.sqrtm(cov)
    y = cov_sqrt @ np.random.randn(x.shape[0],1) + x
    return y.reshape(1,-1)[0]

def proposal_rw_logpdf(x, y, cov=1.0):
    """Probability of moving from x to y (in this case it is symmetric)"""
    delta = (x.reshape(-1,1) - y.reshape(-1,1))
    logpdf = -0.5 * delta.T @ np.linalg.inv(cov) @ delta
    return logpdf[0,0]

# proposal_rw_sampler(np.array([ [1],[2] ]), np.array( [ [1.0, 0.5],[0.5, 1.0] ] ))
# proposal_rw_logpdf(np.array([ 1,2 ]), np.array([0,0]), np.array( [ [1.0, 0.5],[0.5, 1.0] ] ))

In [21]:
def adaptive_mhmcmc(starting_sample, starting_cov, num_samples, target_logpdf, proposal_logpdf, proposal_sampler):
    """Metropolis-Hastings MCMC
    
    Inputs
    ------
    starting_sample: (d, ) the initial sample
    num_sample: positive integer, the number of total samples
    target_logpdf: function(x) -> logpdf of the target distribution
    proposal_logpdf: function (x, y) -> logpdf of proposing y if current sample is x
    proposal_sampler: function (x) -> y, generate a sample if you are currently at x
    
    Returns
    -------
    Samples: (num_samples, d) array of samples
    accept_ratio: ratio of proposed samples that were accepted
    """
    
    d = starting_sample.shape[0]
    samples = np.zeros((num_samples, d))
    cur_cov = starting_cov
    samples[0, :] = starting_sample
    current_target_logpdf = target_logpdf(samples[0, :])
    sample_mean = np.zeros(np.shape(starting_sample))
    
    num_accept = 0
    
    burn_in = 10
    xi = 0.01
    sd = 2.4/d**2
    
    for ii in range(1, num_samples):
        
        # Adapt covariance
        if ii>burn_in:
            sample_mean, cur_cov = covariance_update(cur_cov, ii, samples[ii-1], sample_mean, sd, xi) # m for means
        elif ii>1:
            sample_mean = 1/(ii+1) *  proposed_sample + ii/(ii+1) * sample_mean
        
        # propose
        proposed_sample = proposal_sampler(samples[ii-1, :], cur_cov)
        proposed_target_logpdf = target_logpdf(proposed_sample, cur_cov)
                
        
        # determine acceptance probability
        a = mh_acceptance_prob(current_target_logpdf, proposed_target_logpdf, samples[ii-1,:], proposed_sample, proposal_logpdf)
        
        # Accept or reject the sample
        if a == 1: #guaranteed to accept
            samples[ii, :] = proposed_sample
            current_target_logpdf = proposed_target_logpdf
            num_accept += 1
        else:
            ## Accept with probability a
            u = np.random.rand()
            if u < a: # accept
                samples[ii, :] = proposed_sample
                current_target_logpdf = proposed_target_logpdf
                num_accept += 1
            else: # reject
                samples[ii, :] = samples[ii-1, :]
                
    return samples, num_accept / float(num_samples-1)