In [1]:
import numpy as np
from scipy.sparse import csr_matrix, vstack, hstack
from scipy.optimize import LinearConstraint, Bounds, minimize
from typing import List, Union

## Markov Models, Sequential Data


The likelihood of a sequence $(x_1, x_2,\ldots,x_N)$ can be defined as a markov model of $M^{\mathrm{th}}$ order by

\begin{equation}
p(x_1,\ldots,x_N) = p(x_1) p(x_2|x_1)\ldots p(x_{M}|x_{M-1},\ldots x_1)\prod_{n=M}^N p(x_n| x_{n-1},\ldots x_{n-M}).
\end{equation}

The simplest order model where $M=1$ is defined as

\begin{equation}
p(x_1,\ldots,x_N) = p(x_1) \prod_{n=2}^N p(x_n | x_{n-1}).
\end{equation}

For this model, if $x\in\mathcal{N}$ is discrete with $K$ possible values, we can define $p(x_n|x_{n-1})$ by the transition matrix $\mathbf{A}: A_{ij}=p(x_n=i|x_{n-1}=j)$. This is a $K\times K$ rank 2 tensor but because we know that

\begin{equation}
\sum_i p(x_n=i|x_{n-1}=j) = 1, \forall j
\end{equation}

there are $K-1$ free variables per row of $\mathbf{A}$ and $K(K-1)$ free variables in total. For $M=1$,

\begin{equation}
\mathbf{A} =
\begin{bmatrix}
a_0 & 1-a_0 \\
a_1 & 1-a_1
\end{bmatrix}
\end{equation}

In [26]:
class ConditionalDistribution:
    """
    Conditional probability distribution p(x_n | x_n-1, .., x_n-M) for 
    a M^th order Markox sequence.
    """
    def __init__(self, K: int, M: int, random_init: bool=False):
        if K < 2:
            raise ValueError(f'Must have at least 2 possible categorical value.')
        if M < 0:
            raise ValueError('Model order must be at least 0.')
        
        # number of possible categorical values
        self.K = K
        
        # order of Markovian dependence
        self.M = M
        
        self.init_A(random=random_init)
        
        # number of free parameters
        self.N = (self.K-1)*(self.K**self.M)
        
    def init_A(self, random: bool=False):
        """
        Instantiate and set state transition probabilities
        """
        if random:
            # N(0,1)
            self.A = np.random.normal(size=[self.K]*(self.M+1))
            
            # make positive
            self.A = np.exp(self.A)
            
            # normalize
            self.A /= np.tile(np.expand_dims(np.sum(self.A, axis=0), axis=0),
                         [self.A.shape[0]]+[1]*(len(self.A.shape)-1))
        else:
            # uniform initial values for transition probabilities
            self.A = np.ones([self.K]*(self.M+1))/self.K
        
    def prob(self, *args) -> float:
        """
        Return p(args[0]|args[1],args[2],..,args[M])
        
        args[0] = x_n
        args[1] = x_n-1
        .
        .
        .
        args[M] = x_n-M
        
        Assume that elements of *args are integers in the range 0 to K-1
        inclusive.
        """
        if len(args) != self.M+1:
            raise Exception('Input shape doesnt match expected shape')
            
        return self.A[tuple(args)]
    
    def log_prob(self, *args) -> float:
        return np.log(self.prob(*args))
            
    def grad_log_prob(self, *args) -> np.ndarray:
        """
        Returns the gradient of ln p(x_n|x_n-1..) with respect to A.
        
        A[-1,...] = 1-A[:-1,...] therefore 
        
        grad A[:-1,...] = grad A[:-1,...] - grad A[-1,...]
        """
        out = np.zeros(self.A.shape)
        out[tuple(args)] = 1.0/self.A[tuple(args)]
        #  shape = (K,...,K)
        #return out
        # shape = (K-1,...,K) -> (-1, )
        return np.reshape(out[:-1,...] - out[-1,...], (-1, ))
    
    def get_params(self) -> np.ndarray:
        """
        Returns 1-d array of free parameters.
        """
        # p(x_n=K-1 | x_n-1 = j) = 1 - sum_k=0^K-2 p(x_n=k | x_n-1 = j)
        return np.reshape(self.A[:-1, ...], (-1, ))
    
    def set_params(self, A_small: np.ndarray):
        """
        Set self.A, the (K,...,K) rank M+1 tensor for conditional 
        transition probabilities from the (K-1, K, ..., K) rank M tensor
        A_small.
        
        Keyword arguments
        -----------------
        A_small, np.ndarray, shape = [(K-1)*K^M, ]
        """
        # rank M tensor, shape = (K-1, K, ...,K)
        A = np.reshape(A_small, tuple([self.K-1]+[self.K]*self.M))
        
        # rank M tensor, shape = (1, K, ..., K)
        A_ = 1.0 - np.reshape(np.sum(A, axis=0), tuple([1]+[self.K]*self.M))
        
        if self.M > 0:
            # rank M tensor, shape = (K,...,K)
            self.A = np.vstack((A, A_))
        else:
            self.A = np.hstack((A, A_))
        
    def sample(self, *args) -> int:
        """
        Return a sample conditioned on the Markov blanket, *args. 
        
        
        For example, for a model of order self.M=2, this class models
        the transition states p(x3|x2,x1). *args=[x2, x1] will generate
        a sample x3 ~ p(x3|x2,x1). Note the assumed order of x2 and x1
        in *args.
        
        Keyword arguments
        -----------------
        *args: List[float] -- for *args=[x2,x1] (self.M=2), return x3
            such that x3 ~ p(x3|x2,x1). 
        """
        if len(args) != self.M:
            raise ValueError('Arguments must be a list of values of Markov blanket.')
            
        # transition probabilities to state k, shape = (K, )
        probs = self.A.T[tuple(args[::-1])]
        
        return np.where(np.random.multinomial(1, pvals=probs))[0][0]
       
    def generate_constraint_matrix(self) -> csr_matrix:
        """
        Generate a matrix C such that 0 <= C self.get_params() <= 1
        define the constraints that sum_{x_n} p(x_n | x_n-1,...) = 1
        
        C.shape = ((K-1)^M, N)
        """
        C = np.zeros((self.K**self.M, self.N))
        
        for cc in range(C.shape[0]):
            C[cc, (self.K-1)*cc:(self.K-1)*(cc+1)] = 1.0
            
        return csr_matrix(C)
        
        
            
class MarkovModel:
    def __init__(self, K: int, M: int, random_init: bool=False):
        self.K = K
        self.M = M
        
        self.models = [ConditionalDistribution(K=K, M=_m, random_init=random_init)
                       for _m in range(M+1)]
        
        # total number of parameters
        self.N = sum([_m.N for _m in self.models])
        
    def log_prob(self, x: np.ndarray) -> float:
        """
        Returns ln p(x0) + ln p(x1|x0) ... + ln p(x_M-1|...x0) + sum_n p(x_n | x_n-1...)
        """
        if x.shape[0] < self.M+1:
            raise ValueError(f"Input data must have length of at least {self.M+1}")
        
        if self.M > 0:
            # ln p(x0) +...+ ln p(x_M-1 | ...x0)
            out = sum([self.models[_m].log_prob(*x[0:_m+1][::-1]) for _m in range(self.M)])
        else:
            out = 0.0
        
        for _n in range(self.M, x.shape[0]):
            out += self.models[self.M].log_prob(*x[_n-self.M:_n+1][::-1])
            
        return out
    
    def get_params(self) -> np.ndarray:
        """
        Returns a 1-d array of concatenated parameters over all
        self.models, in order of increasing order.
        """
        return np.concatenate(tuple(_m.get_params() for _m in self.models))
    
    def set_params(self, A_small: np.ndarray):
        """
        Sets _m.A for _m in self.models from a concatenated list of 
        parameter values.
        
        Keyword arguments
        -----------------
        A_small: np.ndarray, shape = [Ntotal, ] -- All Ntotal free
            parameters of all m^th order models.
        """
        # number of free params in each m^th order model
        N = [_m.N for _m in self.models]
        
        if sum(N) != A_small.shape[0]:
            raise Error('Inconsistent shape of input argument. Expected [{N}, ]')
            
        # start and finish slice indices for each model into A_small
        idx = [sum(N[0:i]) for i in range(self.M+1)]
        
        for mm in range(self.M):
            self.models[mm].set_params(A_small[idx[mm]: idx[mm+1]])
        self.models[-1].set_params(A_small[idx[-1]: ])
            
    def grad_log_prob(self, x: np.ndarray) -> np.ndarray:
        """
        Returns the jacobian of the log likelihood with respect to 
        model free parameters (state transition probabilities). The 
        jacobian of each m^th order model are concatenated into a 1-d
        array in increading model order.
        
        Keyword arguments
        -----------------
        x: np.ndarray, shape = [N, ] -- categorical data of length N
            and with self.K possible values = [0, self.K-1]
        """
        
        grad2 = np.sum([self.models[self.M].grad_log_prob(*x[_n-self.M:_n+1][::-1])
                    for _n in range(self.M, x.shape[0])], axis=0)
            
        if self.M > 0:
            grad1 = self.models[0].grad_log_prob(x[0])
            
            for _m in range(1, self.M):
                grad1 = np.concatenate((grad1, self.models[_m].grad_log_prob(*x[0:_m+1][::-1]) ))
                
            return np.concatenate((grad1, grad2))
        else:
            return grad2

    def sample(self, N: int, x: Union[List, np.ndarray]=[]) -> np.ndarray:
        """
        Return a sampled sequence of length N.
        
        Keyword arguments
        -----------------
        N: int -- The length of the sequence to return.
        x: Union[List, np.ndarray], default = [] -- An optional seed to
            begin sampling from.
            
        Examples
        ---------
        > from pymm import MarkovModel
        >
        > model = MarkovModel(2, 0)
        > model.fit([0, 1, 0, 1, 0, 1])
        >
        > # sequence depends on seed values
        > print(f'sequence 1: {model.sample(10, [0])}')
        > print(f'sequence 2: {model.sample(10, [1])}')
        
        """
        if N<1:
            raise ValueError('Must sample at least 1 value.')
        
        x = list(deepcopy(x))
        
        if self.M > 0:
            for _m in range(self.M):
                if len(x) >= N:
                    break

                if len(x)-1 < _m:

                    # ConditionalDistribution.sample(*args) expects
                    # *args = [x_{n-1}, x_{n-2},...]
                    x.append(self.models[_m].sample(*x[:_m+1][::-1]))
                
        for _n in range(self.M, N):
            if len(x)-1 < _n:
                # ConditionalDistribution.sample(*args) expects
                # *args = [x_{n-1}, x_{n-2},...]
                x.append(self.models[self.M].sample(*x[_n-self.M:_n+1][::-1]))
        
        # shape = [N, ]
        return np.asarray(x)
        
    def generate_constraint_matrix(self) -> csr_matrix:
        """
        Concatenate constraint matrix for all models into a single
        constraint matrix C such that 0 <= C * self.get_params() <= 1,
        which corresponds to the probability normalisation contraint.
        """
        # List[csr_matrix], len = self.M+1
        constraints = [_m.generate_constraint_matrix() for _m in self.models]
        
        if len(constraints) < 2:
            return constraints[0]
        else:
            out = self.stack(constraints[0], constraints[1])
            if len(constraints) < 3:
                return out
            else:
                for _c in constraints[2: ]:
                    out = self.stack(out, _c)
                    
                return out
    
    def generate_constraint(self) -> LinearConstraint:
        """
        Returns a scipy.optimizse.LinearConstraint instance
        """
        C = self.generate_constraint_matrix()
        
        # avoid 1/0 probabilities
        delta = 1e-6
        
        return LinearConstraint(C, np.zeros(C.shape[0])+delta, np.ones(C.shape[0])-delta)
    
    def generate_bounds(self) -> Bounds:
        """
        Return a bounds object, constraining each transition probability
        component to be positive.
        """
        return Bounds(np.zeros(self.N), np.ones(self.N))
    
    def _log_prob(self, params: np.ndarray) -> float:
        """
        Returns log likelihood as an explicit function of free
        parameters, assuming self.x has been previously set.
        """
        self.set_params(params)
        return -self.log_prob(self.x)
    
    def _grad_log_prob(self, params: np.ndarray) -> np.ndarray:
        """
        Returns gradient of log-likelihood with respect to transition
        state probability free parameters, assuming self.x: np.ndarray
        has been set.
        """
        self.set_params(params)
        return -self.grad_log_prob(self.x)
    
    def fit(self, x: np.ndarray):
        self.x = x
        
        bounds = self.generate_bounds()
        constraints = self.generate_constraint()
        
        self.log = minimize(fun=self._log_prob,
                            jac=self._grad_log_prob,
                            x0=self.get_params(),
                            bounds=bounds,
                            constraints=constraints
                            )

    @classmethod
    def stack(cls, c1: csr_matrix, c2: csr_matrix) -> csr_matrix:
        """
        c1.shape = (n1, d1) -- n1 linear constraints and d1 associated
            parameters.
        """
        (n1, d1) = c1.shape
        (n2, d2) = c2.shape
        
        # shape = [n1+n2, d1+d2]
        C = csr_matrix((n1+n2, d1+d2))
        
        C[:n1, :d1] = c1[:, :]
        C[n1:, d1:] = c2[:, :]
        
        return C

In [27]:
from copy import deepcopy
from scipy.optimize import approx_fprime

def _test_getset(K: int, M: int):
    inst = ConditionalDistribution(K=K, M=M, random_init=True)
    
    Aorig = deepcopy(inst.A)
    inst.set_params(inst.get_params())
    Arecon = deepcopy(inst.A)
    return np.allclose(Aorig, Arecon)

def test_getset():
    out = [[_test_getset(K=_k, M=_m) for _m in range(5)] for _k in range(2, 4)]
    assert np.all(out), 'mismatch between getter and setter'

def _test_grad(model, x: np.ndarray):
    def log_prob(params: np.ndarray):
        # log prob as function of free model params 
        # for fixed sequence x
        _model = deepcopy(model)
        _model.set_params(params)
        if isinstance(_model, ConditionalDistribution):
            return _model.log_prob(*x)
        else:
            return _model.log_prob(x)
    
    numerical_grad = approx_fprime(model.get_params(), log_prob, 1e-8)
    if isinstance(model, ConditionalDistribution):
        analytical_grad = model.grad_log_prob(*x)
    else:
        analytical_grad = model.grad_log_prob(x)

    assert np.allclose(numerical_grad, analytical_grad), f'gradient computation wrong: {analytical_grad} != {numerical_grad}'
    
def test_grad_ConditionalDistribution():
    # sequence
    x = np.asarray([1, 1], dtype=int)
    
    model = ConditionalDistribution(K=2, M=1, random_init=True)
    
    _test_grad(model, x)
    
def test_grad_MarkovModel():
    """
    Comparae finite and analytical derivative of log likelihood with 
    respect to all model parameters.
    """
    # sequence
    x = np.asarray([1, 1, 0, 1, 0], dtype=int)
    
    for _m in range(4):
        model = MarkovModel(K=2, M=1, random_init=True)

        _test_grad(model, x)
    
def test_logprob_MarkovModel():
    """
    Check cumulative log probability over models is correct
    """
    def _test(M: int):
        # p(x1,x2,x3,x4,x5)=p(x5|x4,x3)p(x4|x3,x2)p(x3|x2,x1)p(x2|x1)p(x1)
        model = MarkovModel(K=2, M=M, random_init=True)

        # sequence
        x = np.asarray([0, 1, 0, 1, 1], dtype=int)

        if M==0:
            true = sum([model.models[0].log_prob(_x) for _x in x])
        elif M==1:
            # ln p(x1)
            true = model.models[0].log_prob(x[0])
            # ln p(x2|x1)
            true += model.models[1].log_prob(x[1], x[0])
            # ln p(x3|x2)
            true += model.models[1].log_prob(x[2], x[1])
            # ln p(x4|x3)
            true += model.models[1].log_prob(x[3], x[2])
            # ln p(x5|x4)
            true += model.models[1].log_prob(x[4], x[3])
        elif M==2:
            # ln p(x1)
            true = model.models[0].log_prob(x[0])
            # += ln p(x2|x1)
            true += model.models[1].log_prob(x[1], x[0])
            # += ln p(x3|x2,x1)
            true += model.models[2].log_prob(x[2], x[1], x[0])
            # += ln p(x4|x3,x2)
            true += model.models[2].log_prob(x[3], x[2], x[1])
            # += ln p(x5|x4,x3)
            true += model.models[2].log_prob(x[4], x[3], x[2])
        else:
            raise ValueError(f'M = {M} unsupported')
            
        model_val = model.log_prob(x)
        assert np.isclose(true, model_val), 'log joint probability incorrect'
        
    _test(M=0)
    _test(M=1)
    _test(M=2)
    
    
test_getset()
test_grad_ConditionalDistribution()
test_grad_MarkovModel()
test_logprob_MarkovModel()

In [32]:
K, M = 2, 2
model = MarkovModel(K=K, M=M)

x = np.asarray([0, 0, 1, 0, 0, 1, 0], dtype=int)
model.fit(x)

In [33]:
model.sample(10, [0])

array([0, 0, 1, 0, 0, 1, 0, 0, 1, 0], dtype=int64)

In [34]:
model.sample(10, [1])

array([1, 0, 0, 1, 0, 0, 1, 0, 0, 1], dtype=int64)

In [37]:
np.allclose(np.asarray([0, 1, 1]),np.asarray([0, 1, 1]))

True