In [2]:
import numpy as np
import tqdm
from tqdm import tqdm_notebook as tqdm_ntb
from scipy.stats import multinomial
from functools import partial
import csv
import matplotlib.pyplot as plt
import time
import multiprocessing
from functools import partial
from multiprocessing import Pool
import scipy.interpolate

# KSD Class

The KSD class if the main object used in the tests.

It is calculated using a finite dimensional approximation of the data using the basis corresponding to the base Gaussian measure.

For example, if we are considering functions over L^{2}([0,1]) and the base Gaussian measure is a Brownian motion. Then the basis is the standard Brownian basis e_{n}(t) = \sqrt{2}\sin((n-0.5)\pi t)

We approximate L^{2}([0,1]) inner products by <f,g> = \sum_{n=1}^{\infty}<f,e_{n}><g,e_{n}> \approx \sum_{n=1}^{n_freqs}<f,e_{n}><g,e_{n}> for some value n_freqs standing for number of frequencies.

So the data one inputs into KSD is the n_freqs coefficients with respect to the Brownian motion basis.

This is different from the projections used in functional data analysis which aim to use a low number of frequencies to represent signals, often no more than 12-15. Whereas we use a high number, n_freqs = 100 in our experiments, as it is purely a technique to make inner products easier to compute rather than for statistical efficiency.

In [3]:
def form_distance_mat(x1,y1,x2,y2,A,B = None):
    """
    Description:
        Forms a distance matrix with ij-th entry <A(x1_i-y1_j),B(x2_i-y2_j)> where <.,.> is Euclidean inner product
        and x_i = x[:,i] is i-th column of data, analogous for y_i. 
        If B = None then B becomes the identity.
    Arg:
        x1,x2: (d,n) matrix, data in columns
        y1,y2: (d,m) matrix, data in columns
        A: (d,d) matrix
        B: (d,d) matrix
    Return:
        dist_mat: (n,m) matrix with ij-th entry <A(x1_i - y1_j), B(x2_i - y2_j)> 
                  where x1_i = x[:,i] is i-th data column, analogous for ,x2_i,y1_j,y2_j
    """  
    d = x1.shape[0]
    n = x1.shape[1]
    m = y1.shape[1]
    
    if (B is None) or (B.all() is None):
        B = np.eye(d)
    
    mat_x1x2 = np.einsum("ji,ji -> i", A @ x1, B @ x2)
    mat_x1x2 = np.reshape(mat_x1x2,(n,1))
    mat_x1x2 = np.tile(mat_x1x2,(1,m))
    
    mat_y1y2 = np.einsum("ji,ji -> i", A @ y1, B @ y2)
    mat_y1y2 = np.reshape(mat_y1y2,(1,m))
    mat_y1y2 = np.tile(mat_y1y2,(n,1))
    
    mat_x1y2 = np.einsum("ji,jk -> ik",A @ x1, B @ y2)
    mat_y1x2 = np.einsum("jk,ji -> ik",A @ y1, B @ x2)
    
    dist_mat = mat_x1x2 + mat_y1y2 - mat_x1y2 - mat_y1x2
    
    return dist_mat

class KSD:
    """
    Description:
        Class to represent an instance of kernel Stein discrepancy.
        Has methods to ouput the Stein kernel evaluated on data
    """  
    def __init__(self,C,T,DU = 0,kernel_type = "SE", gamma = -1):
        """
        Arg:
            C: (d,d) matrix representing the covariance operator
            T: (d,d) matrix representing the hyperparameter
            DU: Function for the DU term in KSD. Default is DU = 0 which makes the DU term be 0.
            kernel_type: either "SE" or "IMQ"
            gamma: lengthscale, if -1 then median heuristic is employed
        """  
        self.C = C
        self.T = T
        self.DU = DU
        self.kernel_type = kernel_type
        self.gamma = gamma
        

    def __call__(self, x, y):
        """
        Arg:
            x: (d,n) data matrix
            y: (d,m) data matrix
        Return:
            Stein_mat: (n,m) matrix with ij-th entry the Stein kernel h evaluated at x_i,y_j 
        """

        n = np.shape(x)[1]
        m = np.shape(y)[1] 
        
        sqr_dist_mat = form_distance_mat(x,y,x,y,T,T)
            
        # median heuristic
        if self.gamma == -1:
            self.gamma = np.sqrt(np.median(sqr_dist_mat[sqr_dist_mat > 0]))
        # changes the T which will be used later
        self.T = self.T/self.gamma
        # renormalises the squared distance matrix already computed that'll be used later
        sqr_dist_mat = sqr_dist_mat/(self.gamma**2)
        
        # introduces variable S to make calculations easier
        S = self.C @ np.transpose(self.T) @ self.T
        # form the CDU terms
        if self.DU == 0:
            CDUx = np.zeros(np.shape(x))
            CDUy = np.zeros(np.shape(y))
        else:
            CDUx = C @ self.DU(x)
            CDUy = C @ self.DU(y)
        
        # <x + CDU(x),y+CDU(y)> term
        term1 = np.einsum("ji,jk -> ik",x+CDUx,y + CDUy)
        # - <S(x-y),x-y> term
        term2 = -1 * form_distance_mat(x,y,x,y,S)
        # - <S(x-y),CDU(x)-CDU(y)> term
        term3 = -1 * form_distance_mat(x,y,CDUx,CDUy,S)
        # Tr(SC) term
        term4 = np.trace(S @ C)
        # ||S(x-y)||^2 term
        term5 = -1 * form_distance_mat(x,y,x,y,S,S)
    
        # calculations are taken from example of Stein kernels in paper associated with SE and IMQ base kernels
        if self.kernel_type == "SE":
            
            SE_mat = np.exp(-0.5 * sqr_dist_mat)
            
            Stein_mat = SE_mat * (term1 + term2 + term3 + term4 + term5)
            
            return Stein_mat
        
        if self.kernel_type == "IMQ":
            
            IMQ_mat = (sqr_dist_mat + 1)**(-0.5)
            
            Stein_mat = (term1 * IMQ_mat)  + ((term2 + term3 + term4) * (IMQ_mat**3)) + (3 * term5 * (IMQ_mat**5))
            
            return Stein_mat
            

In [4]:
class GoodnessOfFitTest:
    """
    Description: A single goodness-of-fit test which can produce a p-value given data and a KSD object
    """
    def __init__(self, discrepancy, x):
        """
        Args:
            discrepancy: A callable that returns a matrix of Stein kernel evaluations
            x: (d,n) matrix of data
        """
        self.d = discrepancy
        self.x = x
        self.n = x.shape[1]
        

    def compute_pvalue(self, nbootstrap):
        """
        Arg:
            nbootstrap: Number of bootstrap samples.
        Return:
            bootstrap_stats: bootstraped test statistics
            test_stat: the test statistic calculated using observed data
            pvalue: p-value based on comparing test_stat with bootstrap_stats
        """
        # Form the test statistic from evaluations of the Stein kernel
        stein_matrix = self.d(self.x, self.x)
        u_matrix = stein_matrix - np.diag(np.diag(stein_matrix))
        test_stat = u_matrix.sum() / self.n / (self.n-1)
        
        # Obtain bootstrap samples using multi-nomial distribution
        bootstrap_stats = np.zeros(n_bootstrap)
        for i in range(n_bootstrap):
            W = np.random.multinomial(self.n,(1./self.n)*np.ones(self.n))
            W = (W-1)/self.n
            bootstrap_stats[i] = W @ u_matrix @ W
        
        # Calculate p-value
        pvalue = (bootstrap_stats > test_stat).mean()

        return (bootstrap_stats, test_stat, pvalue)
    

# Model specification

These functions specify the base Gaussian measure, a Brownian bridge for this experiment, and the DU term in the KSD, which involves sin and cos functions. As discussed above the Brownian bridge basis will be used to represent our data which is why it is called in sin_DU

In [5]:
def BB_basis(n_freqs,obs,TT):
    """
        Arg:
            n_freqs: number of basis frequencies
            obs: observation grid to evaluate the basis elements
            TT: time frame over which the basis is evaluated
        Return:
            X: (n_freqs,obs) matrix of n_freqs many basis elements each evaluated at obs locations
    """
    X = np.zeros((n_freqs,len(obs)))
    for i in range(1,n_freqs+1):
        X[i-1,:] = (np.sqrt(2) / np.sqrt(TT)) * np.sin(i*np.pi*obs/TT)
    return X

def sin_DU(x,obs, TT = 50.0):
    """
        Arg:
            x: point to evaluate DU
            obs: the observation grid over which to compute the inner product terms in DU
            TT: the time frame over which the L^2 inner products are evaluated
            random_state: random seed
        Return:
            DUx_freqs: (n_freqs,) array of DU evaluated at point x
    """
    alpha = 0.7
    dim = np.shape(x)[0]
    x_recon = np.dot(BB_basis(dim,obs,TT).T,x)
    DUx = ((alpha**2) * np.sin(x_recon)*np.cos(x_recon)) - ((alpha / 2)*np.sin(x_recon))
    DUx_freqs = (TT/len(obs))*np.dot(BB_basis(dim,obs,TT),DUx)
    return DUx_freqs

# EM sampler

Two functions are used to generate our samples. The first is the standard Euler-Maruyama method foor oour non-linear SDE. The second repeatedly generates EM samples and then only acceptes those whose final value at terminal time is within the given threshld of zero.

The last function then linearly interpolates the samples up to a given resolution for further processings in the KSD tests.

In [6]:
def EM_sampler(n_samples,n_points,TT = 50,random_state = None):
    """
        Arg:
            n_samples: the number of trajectories to generate
            n_points: the numbe of Euler-Maruyama steps to perform
            TT: the length of time the trajectory is simulated over
            random_state: random seed
        Return:
            X: (n_samples,n_points) matrix of trajectories
    """
    # initialise grid for the trajectories
    rng = np.random.RandomState(random_state)
    grid = np.linspace(0,TT,n_points,endpoint = True)
    X = np.zeros((n_samples,n_points))
    # set time step size
    dt = grid[1]-grid[0]
    # for each simulated point take a EM step
    for i in range(n_points-1):
        X[:,i+1] = X[:,i] + 0.7*np.sin(X[:,i])*dt + np.sqrt(dt)*rng.randn(n_samples)
    return X

def EM_conditional(n_samples,n_points,eps = 0.05,TT = 50,random_state = None):
    """
        Arg:
            n_samples: the number of trajectories to generate
            n_points: the numbe of Euler-Maruyama steps to perform
            TT: the length of time the trajectory is simulated over
            eps: threshold within zero at time TT which the trajectories must have to be accepted
            random_state: random seed
        Return:
            X: (n_samples,n_points) matrix of accepted trajectories 
    """
    rng = np.random.RandomState(random_state)
    X = np.zeros((n_samples,n_points))
    n_accepted = 0
    pbar = tqdm_ntb(total=n_samples)
    # simulate trajectories until n_samples have been accepted
    while n_accepted < n_samples:
        # generate an EM sample
        X_cand = EM_sampler(1,n_points,TT,None)
        # access if EM trajectory is within threshold at zero
        if np.abs(X_cand[0,-1]) < eps:
            X[n_accepted,:] = X_cand
            pbar.update(1)
            n_accepted = n_accepted + 1
    pbar.close()
    return X

def interpolate_samples(X,inter_grid,original_grid):
    """
        Arg:
            X: (n_samples,n_points) data matrrix
            inter_grid: the grid up to which the data shall be linearly interpolated
            original_grid: the grid of points the trajectories were generated over
        Return:
            X: (n_samples,n_points) matrix of trajectories that have been linearly interpolated up to inter_grid
    """
    Y = np.zeros((len(X),len(inter_grid)))
    # for each trajectory linearly interpolate up to inter_grid
    for i in range(len(X)):
        Y[i,:] = np.interp(inter_grid,original_grid,X[i,:])
    return Y

In [7]:
# Setting parameters of the experiment:

# Time horizon
TT = 50
# Number of frequencies to use of the Brownian bridge basis to represent the data
n_freqs = 100
# The grid the trajectories will be linearly interpolated over
n_inter_points = 100
inter_grid = np.linspace(0,TT,n_inter_points,endpoint = True)
# Number of samples taken to estimate KSD
n_samples = 2000
# Defining a partial evaluation of the DU function 
sin_DU_partial = partial(sin_DU,obs = inter_grid, TT = TT)

In [8]:
# The covariance operator of the base Gaussian measure, in this case Brownian bridge
C = np.diag([(1/(i*np.pi/TT))**(2) for i in np.arange(1,n_freqs + 1)])

# Set hyperparameters
T_1 = np.eye(n_freqs)
n_adjust_freqs = 50
T_2 = np.eye(n_freqs)
T_2[np.diag_indices(n_adjust_freqs)] = C[np.diag_indices(n_adjust_freqs)]**(-1)

# Set lengthscale
gamma = 1


In [None]:
# Set kernel and hyperparameters the experiment will be performed for
kernel_list = ["IMQ"]
T_list = [T_1,T_2]
# Set range of points for the experiment
n_points_arr = np.array([5,10,15,20,25])

for (j,n_points) in enumerate(n_points_arr):
    # Get conditioned samples
    X = EM_conditional(n_samples,n_points,eps = 0.1)
    # Intepolate them over inter_grid
    inter_grid = np.linspace(0,TT,n_inter_points,endpoint=True)
    Y = interpolate_samples(X,inter_grid,np.linspace(0,TT,n_points,endpoint=True))
    # Calculate Brownian bridge basis representation
    Y_freqs = (TT/n_inter_points)*np.dot(BB_basis(n_freqs,inter_grid,TT),Y.T)

    for kernel in kernel_list:
        for i in np.arange(len(T_list)):
            T = T_list[i]
            # Form KSD object
            EM_KSD = KSD(C,T,DU = sin_DU_partial,kernel_type = kernel,gamma = gamma)
            # Calculte KSD using the data
            stein_matrix = EM_KSD(Y_freqs,Y_freqs)
            # Calculte estimator of KSD
            test_stat = stein_matrix.sum() / n_samples**2
            print("n_points: ", n_points, " using kernel ", kernel," with T_", i+1, "and n_samples = ", n_samples, " has value ",test_stat)
        