In [1]:
import numpy as np
import scipy
from scipy import stats

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
dis = stats.multivariate_normal(mean=[0, 0], cov=np.array([[10, 6], [6, 10]]))
samples = dis.rvs(1000)
# plt.scatter(samples[:, 0], samples[:, 1])

In [3]:
# sample using gibbs sampler

def gibbs_sampler_of_gaussian(mean, cov, burnin=0, num=100):
    ''' 2 dims '''
    # calculate conditional distribution p(x_i | x_{-i})
    x = [mean[0], mean[1]]
    precision = cov.I
    samples = []
    i = 0
    while True:
        # p(x0 | x1)
        x0_var = 1 / precision[0, 0]
        x0_mean = mean[0] - x0_var * precision[0, 1] * (x[1] - mean[1])
        x0_sample = np.random.normal(x0_mean, x0_var)
        x[0] = x0_sample
        # p(x1 | x0)
        x1_var = 1 / precision[1, 1]
        x1_mean = mean[1] - x1_var * precision[0, 1] * (x[0] - mean[0])
        x1_sample = np.random.normal(x1_mean, x1_var)
        x[1] = x1_sample
        if i > burnin:
            samples.append([x[0], x[1]])
        if len(samples) >= num:
            break
        i += 1
        
    return np.array(samples)

samples = gibbs_sampler_of_gaussian(mean=[0, 0], cov=np.matrix([[10, 6], [6, 10]]), burnin=1000, num=1000)
# plt.scatter(samples[:, 0], samples[:, 1])

# HMC 参考资料

- [**blog & visualizations**: Hamiltonian Monte Carlo explained](http://arogozhnikov.github.io/2016/12/19/markov_chain_monte_carlo.html)

- [**blog & code**: Hamiltonian Monte Carlo from scratch](https://colindcarroll.com/2019/04/11/hamiltonian-monte-carlo-from-scratch/)

In [4]:
from autograd import grad
from autograd import numpy as auto_np

def hamiltonian_monte_carlo(n_samples, negative_log_prob, initial_position, path_len=1, step_size=0.5):
    """Run Hamiltonian Monte Carlo sampling.

    Parameters
    ----------
    n_samples : int
        Number of samples to return
    negative_log_prob : callable
        The negative log probability to sample from
    initial_position : np.array
        A place to start sampling from.
    path_len : float
        How long each integration path is. Smaller is faster and more correlated.
    step_size : float
        How long each integration step is. Smaller is slower and more accurate.

    Returns
    -------
    np.array
        Array of length `n_samples`.
    """
    # autograd magic
    dVdq = grad(negative_log_prob)

    # collect all our samples in a list
    samples = [initial_position]

    # Keep a single object for momentum resampling
    momentum = stats.norm(0, 1)

    # If initial_position is a 10d vector and n_samples is 100, we want
    # 100 x 10 momentum draws. We can do this in one call to momentum.rvs, and
    # iterate over rows
    size = (n_samples,) + initial_position.shape[:1]
    for p0 in momentum.rvs(size=size):
        # Integrate over our path to get a new position and momentum
        q_new, p_new = leapfrog(
            samples[-1],
            p0,
            dVdq,
            path_len=path_len,
            step_size=step_size,
        )

        # Check Metropolis acceptance criterion
        start_log_p = negative_log_prob(samples[-1]) - auto_np.sum(momentum.logpdf(p0))
        new_log_p = negative_log_prob(q_new) - auto_np.sum(momentum.logpdf(p_new))
        if auto_np.log(auto_np.random.rand()) < start_log_p - new_log_p:
            samples.append(q_new)
        else:
            samples.append(auto_np.copy(samples[-1]))

    return auto_np.array(samples[1:])


def leapfrog(q, p, dVdq, path_len, step_size):
    """Leapfrog integrator for Hamiltonian Monte Carlo.

    Parameters
    ----------
    q : np.floatX
        Initial position
    p : np.floatX
        Initial momentum
    dVdq : callable
        Gradient of the velocity
    path_len : float
        How long to integrate for
    step_size : float
        How long each integration step should be

    Returns
    -------
    q, p : np.floatX, np.floatX
        New position and momentum
    """
    q, p = auto_np.copy(q), auto_np.copy(p)

    p -= step_size * dVdq(q) / 2  # half step
    for _ in range(int(path_len / step_size) - 1):
        q += step_size * p  # whole step
        p -= step_size * dVdq(q)  # whole step
    q += step_size * p  # whole step
    p -= step_size * dVdq(q) / 2  # half step

    # momentum flip at end
    return q, -p

In [5]:
# sample using HMC

def neg_log_prob(x, mean, cov):
    return  auto_np.dot(auto_np.dot(x-mean, cov.I.getA()), x-mean)

mean = auto_np.array([0.0, 0.0])
cov = auto_np.matrix([[10.0, 6.0], [6.0, 10.0]])

samples = hamiltonian_monte_carlo(n_samples=1000, 
                                  negative_log_prob=lambda x: neg_log_prob(x, mean, cov), 
                                  initial_position=auto_np.array([0.0, 0.0]), 
                                  path_len=1.0, 
                                  step_size=0.5)

# plt.scatter(samples[:, 0], samples[:, 1])