In [88]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
from scipy.stats import gamma, norm

from functools import reduce

from gproc.generative import sample_at_x
from gproc.laplace import chol_inverse

In [89]:
# Data sizes and dimension
D = 2
N = 50
M = 50

x_1 = np.random.uniform(-1, 1, N * D).reshape(-1, D) # Reshape to N x D matrix
x_2 = np.random.uniform(-1, 1, M * D).reshape(-1, D) # Reshape to N x D matrix

In [90]:
class BaseKernel:
    def __init__(self):
        pass

    def make_gram(self, x_1, x_2):
        raise NotImplementedError()
        
    def invert_gram(self):
        raise NotImplementedError()
    
    def constrain_params(self, unconstrained_param_array):
        raise NotImplementedError()

    def prior_log_pdf(self, param_array):
        raise NotImplementedError()

In [91]:
def squared_exponential(x_1, x_2, lengthscale=0.5, variance=1.0):
    """
    Also known as RBF.
    
    :param x_1, N x d matrix
    :param x_2, M x d matrix

    :param lengthscale, float
    :param variance, float

    :returns K, N x M matrix, K_{ij} = k(x_i, x_j; lengthscale, variance)
    """
    sq_diffs = cdist(x_1, x_2, metric = 'sqeuclidean')
    return variance * np.exp(-0.5 * sq_diffs / lengthscale)

squared_exponential(x_1, x_2).shape

(50, 50)

In [92]:
class SquaredExponential(BaseKernel):
    def __init__(self, lengthscale=0.5, variance=1.0):
        self.lengthscale = lengthscale
        self.variance = variance
        self.param_dim = 2
        super().__init__()
    
    def make_gram(self, x_1, x_2):
        self.gram = squared_exponential(x_1, x_2, self.lengthscale, self.variance)
        return self.gram 
    
    def invert_gram(self):
        self.inverse_gram = chol_inverse(self.gram)
        return self.inverse_gram
    
    def constrain_params(self, unconstrained_params):
        if unconstrained_params.shape[0] != self.param_dim:
            raise AssertionError('Parameter array not the same size as kernel parameter dimension')
        self.constrained_params = np.exp(unconstrained_params)
        return self.constrained_params
    
    def update_params(self):
        self.lengthscale = self.constrained_params[0]
        self.variance = self.constrained_params[1]
    
    def prior_log_pdf(self, d):
        self.prior = gamma.logpdf(self.constrained_params[0], a = 1, scale = np.sqrt(d))
        self.prior += gamma.logpdf(self.constrained_params[1], a = 1.2, scale = 1/0.2)
        return self.prior

kernel = SquaredExponential(lengthscale = 1, variance = 1)
kernel.make_gram(x_1, x_1)
kernel.constrain_params(np.ones(2))
kernel.update_params()
kernel.make_gram(x_1, x_1)

array([[2.71828183, 2.28506574, 2.70784883, ..., 2.14915104, 2.46156486,
        2.37930197],
       [2.28506574, 2.71828183, 2.2912546 , ..., 2.56572821, 1.83153792,
        1.93586545],
       [2.70784883, 2.2912546 , 2.71828183, ..., 2.21977572, 2.36397548,
        2.47684215],
       ...,
       [2.14915104, 2.56572821, 2.21977572, ..., 2.71828183, 1.50466813,
        2.16685022],
       [2.46156486, 1.83153792, 2.36397548, ..., 1.50466813, 2.71828183,
        1.78019102],
       [2.37930197, 1.93586545, 2.47684215, ..., 2.16685022, 1.78019102,
        2.71828183]])

In [93]:
def rational_quadratic(x_1, x_2, lengthscale=0.5, variance=1.0, weighting=1.0):
    """
    Rational Quadratic Kernel, equivalent to adding together many Squared Exponential kernels with different 
    lengthscales. Weight parameter determine relative weighting of large and small scale variations. When
    the weighting goes to infinity, RQ = SE.
    
    :param x_1, N x d matrix
    :param x_2, M x d matrix

    :param lengthscale, float
    :param variance, float
    :param weighting, float

    :returns K, N x M matrix, K_{ij} = k(x_i, x_j; lengthscale, variance, weighting)
    """
    sq_diffs = cdist(x_1, x_2, metric = 'sqeuclidean')
    return variance * ( (1 + sq_diffs / 2*lengthscale * weighting) ** (-weighting) )

rational_quadratic(x_1, x_2).shape

(50, 50)

In [94]:
class RationalQuadratic(BaseKernel):
    def __init__(self, lengthscale=0.5, variance=1.0, weighting=1.0):
        self.lengthscale = lengthscale
        self.variance = variance
        self.weighting = weighting
        self.param_dim = 3
        super().__init__()
    
    def make_gram(self, x_1, x_2):
        self.gram = rational_quadratic(x_1, x_2, self.lengthscale, self.variance, self.weighting)
        return self.gram 
    
    def invert_gram(self):
        self.inverse_gram = chol_inverse(self.gram)
        return self.inverse_gram
    
    def constrain_params(self, unconstrained_params):
        if unconstrained_params.shape[0] != self.param_dim:
            raise AssertionError('Parameter array not the same size as kernel parameter dimension')
        self.constrained_params = np.exp(unconstrained_params)
        return self.constrained_params
    
    def update_params(self):
        self.lengthscale = self.constrained_params[0]
        self.variance = self.constrained_params[1]
        self.weighting = self.constrained_params[2]
    
    def prior_log_pdf(self, d):
        self.prior = gamma.logpdf(self.constrained_params[0], a = 1, scale = np.sqrt(d))
        self.prior += gamma.logpdf(self.constrained_params[1], a = 1.2, scale = 1/0.2)
        self.prior += gamma.logpdf(self.constrained_params[2], a = 0.00001, scale = 1/0.00001) # uninformative prior
        return self.prior

kernel = RationalQuadratic(lengthscale = 1, variance = 1, weighting = 1)
kernel.make_gram(x_1, x_1)
kernel.constrain_params(np.zeros(3))
kernel.prior_log_pdf(1)


-14.558996227068079

In [95]:
def periodic(x_1, x_2, lengthscale=0.5, variance=1.0, period=1.0):
    """
    The periodic kernel allows one to model functions which repeat themselves exactly.
    
    :param x_1, N x d matrix
    :param x_2, M x d matrix

    :param lengthscale, float
    :param variance, float
    :param period, float

    :returns K, N x M matrix, K_{ij} = k(x_i, x_j; lengthscale, variance, period)
    """
    diffs = cdist(x_1, x_2, metric = 'euclidean')
    return variance * np.exp(-2 * np.sin(np.pi * diffs / period)**2 / lengthscale)

periodic(x_1, x_2).shape

(50, 50)

In [96]:
class Periodic(BaseKernel):
    def __init__(self, lengthscale=0.5, variance=1.0, period=1.0):
        self.lengthscale = lengthscale
        self.variance = variance
        self.period = period
        self.param_dim = 3
        super().__init__()
    
    def make_gram(self, x_1, x_2):
        self.gram = periodic(x_1, x_2, self.lengthscale, self.variance, self.period)
        return self.gram 
    
    def invert_gram(self):
        self.inverse_gram = chol_inverse(self.gram)
        return self.inverse_gram
    
    def constrain_params(self, unconstrained_params):
        if unconstrained_params.shape[0] != self.param_dim:
            raise AssertionError('Parameter array not the same size as kernel parameter dimension')
        self.constrained_params = np.exp(unconstrained_params)
        return self.constrained_params
    
    def update_params(self):
        self.lengthscale = self.constrained_params[0]
        self.variance = self.constrained_params[1]
        self.period = self.constrained_params[2]

    def prior_log_pdf(self, d):
        self.prior = gamma.logpdf(self.constrained_params[0], a = 1, scale = np.sqrt(d))
        self.prior += gamma.logpdf(self.constrained_params[1], a = 1.2, scale = 1/0.2)
        self.prior += gamma.logpdf(self.constrained_params[2], a = 0.00001, scale = 1/0.00001) # uninformative prior
        return self.prior

kernel = Periodic(lengthscale = 1, variance = 1, period = 10)
kernel.make_gram(x_1, x_1)
kernel.constrain_params(np.zeros(3))
kernel.prior_log_pdf(1)

-14.558996227068079

In [97]:
def locally_periodic(x_1, x_2, lengthscale_sqe=0.5, variance=1.0, lengthscale_p =0.5, period=1.0):
    """
    A squared exponential kernel multiplied by a periodic kernel. Allows one to model periodic functions
    which can vary slowly over time.
    
    :param x_1, N x d matrix
    :param x_2, M x d matrix

    :param lengthscale, float
    :param variance, float
    :param period, float

    :returns K, N x M matrix, K_{ij} = k(x_i, x_j; lengthscale, variance, period)
    """
    
    diffs = cdist(x_1, x_2, metric = 'euclidean')
    sq_diffs = cdist(x_1, x_2, metric = 'sqeuclidean')
    
    K_period = np.exp(-2 * np.sin(np.pi * diffs / period)**2 / lengthscale_p)
    K_sqe = np.exp(-0.5 * sq_diffs / lengthscale_sqe)
    return variance * np.multiply(K_period, K_sqe)

locally_periodic(x_1, x_2, 1, 0.5, 0.5, 0.5)

array([[0.01943932, 0.29180675, 0.19719793, ..., 0.00698842, 0.03547656,
        0.00929565],
       [0.00697473, 0.00932752, 0.25789686, ..., 0.13097394, 0.2865392 ,
        0.00201134],
       [0.00894003, 0.05146021, 0.0135198 , ..., 0.03231052, 0.00431434,
        0.01294791],
       ...,
       [0.0110584 , 0.01112045, 0.16290165, ..., 0.07276377, 0.09532865,
        0.00609362],
       [0.0233737 , 0.24850693, 0.19878202, ..., 0.00418271, 0.30323217,
        0.01966417],
       [0.01281723, 0.00204036, 0.00641971, ..., 0.20021683, 0.03443829,
        0.17148864]])

In [98]:
class LocallyPeriodic(BaseKernel):
    def __init__(self, lengthscale_sqe=0.5, variance=1.0, lengthscale_p =0.5, period=1.0):
        self.lengthscale_sqe = lengthscale_sqe
        self.variance = variance
        self.lengthscale_p = lengthscale_p
        self.period = period
        self.param_dim = 4
        super().__init__()
    
    def make_gram(self, x_1, x_2):
        self.gram = locally_periodic(x_1, x_2, self.lengthscale_sqe, self.variance, self.lengthscale_p, self.period)
        return self.gram 
    
    def invert_gram(self):
        self.inverse_gram = chol_inverse(self.gram)
        return self.inverse_gram
    
    def constrain_params(self, unconstrained_params):
        if unconstrained_params.shape[0] != self.param_dim:
            raise AssertionError('Parameter array not the same size as kernel parameter dimension')
        self.constrained_params = np.exp(unconstrained_params)
        return self.constrained_params
    
    def update_params(self):
        self.lengthscale_sqe = self.constrained_params[0]
        self.variance = self.constrained_params[1]
        self.lengthscale_p = self.constrained_params[2]
        self.period = self.constrained_params[3]
    
    def prior_log_pdf(self, d):
        self.prior = gamma.logpdf(self.constrained_params[0], a = 1, scale = np.sqrt(d))
        self.prior += gamma.logpdf(self.constrained_params[1], a = 1.2, scale = 1/0.2)
        self.prior += gamma.logpdf(self.constrained_params[2], a = 0.00001, scale = 1/0.00001) # uninformative prior
        self.prior += gamma.logpdf(self.constrained_params[3], a = 0.00001, scale = 1/0.00001) # uninformative prior
        return self.prior

kernel = LocallyPeriodic(lengthscale_sqe = 1, variance = 1, lengthscale_p = 0.5, period = 10)
kernel.make_gram(x_1, x_1)
kernel.constrain_params(np.zeros(4))
kernel.prior_log_pdf(1)

-26.072041049218555

In [99]:
def linear(x_1, x_2, constant_variance=0.5, variance=1.0, offset=1.0):
    """
    A linear kernel is a non-stationary kernel, which when used with a GP, is equivalent to
    Bayesian linear regression.
    
    :param x_1, N x d matrix
    :param x_2, M x d matrix

    :param constant_variance, float
    :param variance, float
    :param offset, float

    :returns K, N x M matrix, K_{ij} = k(x_i, x_j; constant_variance, variance, offset)
    """
    
    return constant_variance + variance*np.dot(x_1 - offset, x_2.T - offset)

linear(x_1, x_2)

array([[2.12485862, 3.08382839, 3.33338171, ..., 2.14257458, 3.39988176,
        3.0848776 ],
       [2.96513369, 3.08075935, 3.22094832, ..., 1.04626758, 3.31858539,
        1.64253894],
       [1.89121769, 2.7938565 , 3.02204574, ..., 2.02488153, 3.07918173,
        2.88243903],
       ...,
       [1.99913609, 1.94820288, 2.01187183, ..., 0.65604826, 2.07095395,
        0.94321709],
       [2.96053834, 4.39916247, 4.77465071, ..., 2.96767593, 4.87531942,
        4.38618119],
       [0.57413216, 1.37396248, 1.5199463 , ..., 1.67340633, 1.52481655,
        2.18685359]])

In [100]:
class Linear(BaseKernel):
    def __init__(self, constant_variance=0.5, variance=1.0, offset=1.0):
        self.constant_variance = constant_variance
        self.variance = variance
        self.offset = offset
        self.param_dim = 3
        super().__init__()
    
    def make_gram(self, x_1, x_2):
        self.gram = linear(x_1, x_2, self.constant_variance, self.variance, self.offset)
        return self.gram 
    
    def invert_gram(self):
        self.inverse_gram = chol_inverse(self.gram)
        return self.inverse_gram
    
    def constrain_params(self, unconstrained_params):
        if unconstrained_params.shape[0] != self.param_dim:
            raise AssertionError('Parameter array not the same size as kernel parameter dimension')
        self.constrained_params = np.concatenate((np.exp(unconstrained_params[0:2]), unconstrained_params[2].reshape(-1)))
        return self.constrained_params
    
    def update_params(self):
        self.constant_variance = self.constrained_params[0]
        self.variance = self.constrained_params[1]
        self.offset = self.constrained_params[2]
    
    def prior_log_pdf(self, d):
        self.prior = gamma.logpdf(self.constrained_params[1], a = 1.2, scale = 1/0.2)
        self.prior += gamma.logpdf(self.constrained_params[1], a = 1.2, scale = 1/0.2)
        self.prior += norm.logpdf(self.constrained_params[2]) # Standard Gaussian prior for offset
        return self.prior

kernel = Linear(constant_variance=0.5, variance=1.0, offset=1.0)
kernel.make_gram(x_1, x_1)
kernel.constrain_params(np.zeros(3))
kernel.prior_log_pdf(1)

-5.010841343039882

In [102]:
class Additive(BaseKernel):
    def __init__(self, kernels):
        self.kernels = kernels
        self.param_dim = sum(k.param_dim for k in self.kernels)
        super().__init__()
    
    def make_gram(self, x_1, x_2):
        grams = [k.make_gram(x_1, x_2) for k in self.kernels]
        self.gram = sum(grams)
        return self.gram
        
    def invert_gram(self):
        self.inverse_gram = chol_inverse(self.gram)
        return self.inverse_gram

    def constrain_params(self, unconstrained_params):
        if unconstrained_params.shape[0] != self.param_dim:
            raise AssertionError('Parameter array not the same size as kernel parameter dimension')
            
        self.constrained_params = np.zeros(0)
        dim_count = 0
        for k in self.kernels:
            self.constrained_params = np.concatenate((self.constrained_params, k.constrain_params(unconstrained_params[dim_count:(dim_count + k.param_dim)])))
            dim_count += k.param_dim
        return self.constrained_params
    
    def update_params(self):
        for k in self.kernels:
            k.update_params()
    
    def prior_log_pdf(self, d):
        self.prior = 0
        for k in self.kernels:
            self.prior += k.prior_log_pdf(d)
        return self.prior

k1 = SquaredExponential(lengthscale = 1, variance = 1)
k2 = Periodic(lengthscale = 1, variance = 1, period = 1)
k3 = Linear(constant_variance = 0.5, variance = 1, offset = 1)

kadd = Additive([k1, k2, k3])

kadd.make_gram(x_1, x_1)

kadd.constrain_params(np.array([1, 2, 3, 4, 5, 6, 7, 8]))
kadd.update_params()

array([[4.60733042, 3.54489056, 4.08152105, ..., 2.47155223, 4.77052975,
        2.95145953],
       [3.54489056, 4.21036693, 3.36205472, ..., 2.49761874, 3.1517095 ,
        1.42409117],
       [4.08152105, 3.36205472, 4.23659163, ..., 2.64674966, 4.79607998,
        2.57911438],
       ...,
       [2.47155223, 2.49761874, 2.64674966, ..., 3.09886458, 2.26226533,
        1.9155143 ],
       [4.77052975, 3.1517095 , 4.79607998, ..., 2.26226533, 7.27866731,
        2.58953087],
       [2.95145953, 1.42409117, 2.57911438, ..., 1.9155143 , 2.58953087,
        3.29646968]])

In [65]:
class Multiplicative(BaseKernel):
    def __init__(self, kernels):
        self.kernels = kernels
        self.param_dim = sum(k.param_dim for k in self.kernels)
        super().__init__()
    
    def make_gram(self, x_1, x_2):
        grams = [k.make_gram(x_1, x_2) for k in self.kernels]
        self.gram = reduce(np.multiply, grams)
        return self.gram
        
    def invert_gram(self):
        self.inverse_gram = chol_inverse(self.gram)
        return self.inverse_gram
    
    def constrain_params(self, unconstrained_params):
        if unconstrained_params.shape[0] != self.param_dim:
            raise AssertionError('Parameter array not the same size as kernel parameter dimension')
            
        self.constrained_params = np.zeros(0)
        dim_count = 0
        for k in self.kernels:
            self.constrained_params = np.concatenate((self.constrained_params, k.constrain_params(unconstrained_params[dim_count:(dim_count + k.param_dim)])))
            dim_count += k.param_dim
        return self.constrained_params
    
    def update_params(self):
        for k in self.kernels:
            k.update_params()
    
    def prior_log_pdf(self, d):
        self.prior = 0
        for k in self.kernels:
            self.prior += k.prior_log_pdf(d)
        return self.prior
    
k1 = SquaredExponential(lengthscale = 1, variance = 1)
k2 = Periodic(lengthscale = 1, variance = 1, period = 10)
k3 = Linear(constant_variance = 0.5, variance = 1, offset = 1)

kmul = Multiplicative([k1, k2, k3])

kmul.make_gram(x_1, x_1)
kmul.constrain_params(np.array([1, 2, 3, 4, 5, 6, 7, 8]))

array([   2.71828183,    7.3890561 ,   20.08553692,   54.59815003,
        148.4131591 ,  403.42879349, 1096.63315843,    8.        ])

In [None]:
def add(x_1, x_2, kernels):
    """
    Add together an arbitrary number of kernels.
    
    :param x_1, N x d matrix
    :param x_2, M x d matrix
    
    :param kernels, tuple, tuple consisting of kernel functions, and corresponding parameter dictionaries
    
    :returns K, N x M matrix, sum of kernel matrices 
    """
    
    return sum([
        kernel(x_1, x_2, **kernel_kwargs)
        for kernel, kernel_kwargs in kernels
    ])

kernels = (
    (locally_periodic, {'lengthscale':0.5, 'variance':1.0, 'period':1.0}),
    (periodic, {'lengthscale':0.5, 'variance':1.0, 'period':0.1}),
)

add(x_1, x_2, kernels).shape

(50, 25)

In [None]:
def multiply(x_1, x_2, kernels):
    """
    Multiply together an arbitrary number of kernels.
    
    :param x_1, N x d matrix
    :param x_2, M x d matrix
    
    :param kernels, tuple, kernel functions and corresponding parameter dictionaries
    
    :returns K, N x M matrix, sum of kernel matrices 
    """
    
    grams = [
    kernel(x_1, x_2, **kernel_kwargs) 
    for kernel, kernel_kwargs in kernels
    ]
    
    return reduce(np.multiply, grams)

kernels = (
    (linear, {'constant_variance':0, 'variance':1.0, 'offset':0}),
    (periodic, {'lengthscale':0.5, 'variance':1.0, 'period':0.1}),
    (locally_periodic, {'lengthscale':0.5, 'variance':1.0, 'period':10})
    
)


multiply(x_1, x_2, kernels).shape

(50, 25)