In [3]:
import torch
import numpy as np
import matplotlib.pyplot as plt

from typing import Tuple, Callable, Union, Optional
from tensor_type import Tensor
from scipy.stats import norm, expon, uniform
from torch import mm, exp, log, det, minimum, sqrt, mul
from torch.linalg import inv, det

import time
import pdb

In [4]:
def KE(x:  Tensor,
       mu: Tensor,
       Minv: Optional[Tensor]=None,
       detM: Optional[float] =0.0) -> Tensor:
    '''
    This is a kinetic energy function KE(x) = -log pi(p|q), where
    pi is of general form xT.A.x
    x: is a point
    mu: is the distribution mean
    '''
    if(Minv):
        return mul(exp((x-mu).T.mm(Minv).mm((x-mu))) + log(Minv), .5)
    else:
        return mul(exp((x-mu).T.mm((x-mu))), .5)

def PE(x:  Tensor,
       mu: Tensor,
       Minv: Optional[Tensor] = None,
       detM: Optional[float] = 0.0) -> Tensor:
    '''
    -log pi(q) where pi(q) is Gaussian 
    '''
    if(Minv):
        return .5 * exp((x-mu).T.mm(Minv).mm(x-mu)) + log(det(Minv))
    else:
        return mul(exp((x-mu).T.mm((x-mu))), .5)

In [5]:
def get_autograd(x:    Tensor,
                 mu:   Tensor,
                 Mat:  Tensor,
                 func: Callable) -> Tuple[Tensor, Tensor]:
    '''
    dH calculates K(p) or V(q) along with associated gradient.
    x: is a tensor
    Mat: is a mass matrix
    func: is the K or V function to calculate
    returns function output and gradient
    '''
    #pdb.set_trace()
    x.requires_grad_(True) # track math operations
    x.retain_grad() # keep gradient after backward()
    out = func(x, mu, Mat)  # calculate given function
    out.backward(gradient=torch.ones(out.size()))  # Calculate grads
    x_grad = x.grad.data  # get gradients only
    x.grad = None  # reset gradient 
    x.requires_grad_(False)  # stop tracking
    return out.detach().clone(), x_grad 

# Unit Test
#t = torch.tensor([6., 4.]).view(2,1)
#tout, dt = dH(t, Minv, Tst2)

In [6]:
def stormer_verlet(V:    Callable,
                   q0:   Tensor,
                   p0:   Tensor,
                   mu:   Tensor,
                   Minv: Tensor,
                   eps:  float,
                   T:    int) -> Tuple[Tensor, Tensor]:
    '''
    Stormer-Verlet should propagate a point in phase space
    (p,q) according to Hamiltonian H.
    
    dVdq: references a function yielding the derivative of
    potential energy V.
    q0: is initial position tensor
    p0: is initial momentum tensor
    epsilon: scalar time increment
    T: time interval length
    returns numpy arrays q and p
    '''
    q, p = q0, p0    
    for n in range(int(T//eps)):
        pdb.set_trace()
        start = time.time()
        _, dHdq = get_autograd(q, mu, Minv, V)
        d1 = time.time()-start
        
        p -= eps * dHdq/2 # half momentum step
        q += eps * p # full parameter step
        
        start = time.time()
        _, dHdq = get_autograd(q, mu, Minv, V)
        d2 = time.time()-start
        p -= epsilon * dHdq # full momentum step
        
        
        print(f'n={n}  derivative time {d1:.2f} --- {d2:.2f}')
        
    return q,-p 

In [7]:
def acceptance(q0: Tensor,
               p0: Tensor,
               q:  Tensor,
               p:  Tensor) -> Tuple[Tensor, bool]:
    r = min(1.,exp(KE(q0) + PE(p0) - KE(q) - PE(p)).item())
    obs = uniform.rvs()
    if obs < r:
        point = q
        acceptFlag = True
    else:
        point = q0
        acceptFlag = False
    return point, acceptFlag

In [None]:
T = 9  # Full time-steps
eps = 0.3  # Discretization of time-steps
Nq = 100  # Number of q's to test 
D = 2  # Dimension of q
M = torch.tensor([[1., 0.8],
                  [0.8, 1.]])
Minv = inv(M)
detM = det(M).type(torch.float32)
mu = torch.tensor([[0.],[0.]],dtype=torch.float32)

accepted = 0
rejected = 0

acceptedPoints = torch.zeros((D,Nq))

# pre-select random q's
Q0 = np.random.uniform(low=-4, high=4, size=(D,Nq)) #(2,100)
# Loop iterations
for n in range(Nq):
    # sample a parameter point from Q-space
    q0 = torch.from_numpy(Q0[:,n].reshape(2,1)).type(torch.float32)
    # sample momentum p0 from normal
    p0 = torch.from_numpy(norm.rvs(size = 2)).view(2,1)
    # q0,p0 are (2,1) tensors
    q, p = stormer_verlet(PE, q0, p0, mu, Minv, eps, T)
    q1, aflag = acceptance(q0, p0, q, p)
    acceptedPoints[:,n] = torch.squeeze(q)

    if(aFlag):
        accepted +=1
    else:
        rejected +=1
print(f'Acceptance rate: {accepted/Nq}')    