In [37]:
import numpy as np
from scipy.stats import multivariate_normal
from scipy.special import logsumexp


In [4]:

class GaussianHMM:
    """
    Hidden Markov model with multivariate-gaussian emissions, fitted with the
    expectation-maximisation (Baum-Welch) algorithm.

    Given a sequence of observations and a number of states, the model learns the
    initial state probabilities (pi), one time-homogeneous transition matrix (A)
    and the mean / covariance of the gaussian emitted by each state. All
    recursions are carried out in log space with logsumexp for numerical stability.

    Attributes set by __init__ and updated by the M-step:
        k        - number of states
        pi/logpi - initial state probabilities and their logs, shape (k,)
        A/logA   - transition matrix and its log, shape (k, k); entry [i, j] is used
                   as the probability of moving from state i to state j
        mu_mat   - state means, shape (k, d)
        cv_mat   - state covariances, shape (k, d, d)
        distribs - list of k emission distributions exposing .logpdf
        ll_pts   - log-likelihood recorded after every EM iteration
    """

    def __init__(self, k, pi=None, logpi=None, A=None, logA=None, mu_mat=None,
                 cv_mat=None, distribs=None, ll_pts=None):
        """
        initialises the parameters of the HMM, requires k to be given by the user

        Any parameter left as None falls back to a built-in default. For k = 3 the
        defaults are the original hand-picked starting values. Built-in means and
        covariances exist for at most 3 states (the first k are used); pi and A have
        generic defaults for any k.

        k        - number of hidden states (positive int)
        pi       - initial state probabilities, length k (takes precedence over logpi)
        logpi    - log of the initial state probabilities, used when pi is None
        A        - (k, k) transition matrix (takes precedence over logA)
        logA     - log of the transition matrix, used when A is None
        mu_mat   - (k, d) state means
        cv_mat   - (k, d, d) state covariances
        distribs - k emission distributions whose .logpdf accepts a (T, d) batch;
                   built from mu_mat / cv_mat when None
        ll_pts   - previously recorded log-likelihoods to continue from

        raises ValueError when k < 1, when a parameter does not match k, or when a
        built-in mean / covariance default is requested for k > 3
        """
        if k < 1:
            raise ValueError("k must be a positive number of states")

        # K - number of states
        self.k = k

        # mu matrix - means of each state
        # stored as a float copy so the M-step neither truncates to ints nor
        # writes into the caller's array
        if mu_mat is None:
            mu_mat = self._builtin_default([[-1, 2], [0.2, 0.2], [3, 2.5]], "mu_mat")
        self.mu_mat = np.array(mu_mat, dtype=float)
        if self.mu_mat.ndim != 2 or self.mu_mat.shape[0] != k:
            raise ValueError(f"mu_mat must have shape (k, d) with k={k}, got {self.mu_mat.shape}")
        dim = self.mu_mat.shape[1]

        # cv matrix - covariance of each state
        if cv_mat is None:
            cv_mat = self._builtin_default([[[1, 0.8], [0.8, 1]]] * 3, "cv_mat")
        self.cv_mat = np.array(cv_mat, dtype=float)
        if self.cv_mat.shape != (k, dim, dim):
            raise ValueError(f"cv_mat must have shape {(k, dim, dim)}, got {self.cv_mat.shape}")

        # emission probability distributions
        if distribs is None:
            self.up_distribs()
        else:
            self.distribs = list(distribs)
            if len(self.distribs) != k:
                raise ValueError(f"distribs must contain k={k} distributions, got {len(self.distribs)}")

        # list of LLs to update at each iteration
        self.ll_pts = [] if ll_pts is None else list(ll_pts)

        # zero probabilities are legitimate (e.g. forbidden transitions) and
        # simply become -inf in log space, so silence the divide warning
        with np.errstate(divide="ignore"):
            # initial probabilities
            if pi is not None:
                self.pi = np.array(pi, dtype=float)
                self.logpi = np.log(self.pi)
            elif logpi is not None:
                self.logpi = np.array(logpi, dtype=float)
                self.pi = np.exp(self.logpi)
            else:
                self.pi = np.array([0.3, 0.3, 0.4]) if k == 3 else np.full(k, 1.0 / k)
                self.logpi = np.log(self.pi)

            # transition matrix
            if A is not None:
                self.A = np.array(A, dtype=float)
                self.logA = np.log(self.A)
            elif logA is not None:
                self.logA = np.array(logA, dtype=float)
                self.A = np.exp(self.logA)
            else:
                if k == 3:
                    self.A = np.array([[0.7, 0.2, 0.1],
                                       [0.2, 0.7, 0.1],
                                       [0.1, 0.2, 0.7]])
                elif k == 1:
                    self.A = np.ones((1, 1))
                else:
                    # "sticky" default: stay with probability 0.7, spread the
                    # remaining mass evenly over the other states
                    self.A = np.full((k, k), 0.3 / (k - 1))
                    np.fill_diagonal(self.A, 0.7)
                self.logA = np.log(self.A)

        if self.logpi.shape != (k,):
            raise ValueError(f"pi / logpi must have length k={k}, got shape {self.logpi.shape}")
        if self.logA.shape != (k, k):
            raise ValueError(f"A / logA must have shape {(k, k)}, got {self.logA.shape}")

    def _builtin_default(self, rows, name):
        """returns the first k built-in per-state defaults, raising ValueError when k exceeds what is built in"""
        if self.k > len(rows):
            raise ValueError(
                f"no built-in default {name} for k={self.k} (only up to {len(rows)} states); "
                f"pass {name} explicitly"
            )
        return rows[:self.k]

    def _log_emissions(self, data):
        """
        evaluates every state's emission log-density once for the whole sequence

        data - (T, d) observations
        returns a (T, k) array whose entry [t, j] is distribs[j].logpdf(data[t])
        """
        data = np.asarray(data, dtype=float)
        n_obs = data.shape[0]
        if n_obs == 0:
            raise ValueError("data must contain at least one observation")

        log_b = np.empty((n_obs, self.k))
        for state in range(self.k):
            # scipy squeezes a single-row result to a scalar, hence the reshape
            log_b[:, state] = np.reshape(self.distribs[state].logpdf(data), n_obs)
        return log_b

    def calc_logalpha(self, data):
        """
        calculates alpha of the forward-backward algorithm, E-step

        data - (T, d) observations
        returns logalpha, shape (T, k)
        """
        log_b = self._log_emissions(data)
        n_obs = log_b.shape[0]

        logalpha = np.zeros((n_obs, self.k))
        logalpha[0] = self.logpi + log_b[0]

        for t in range(1, n_obs):
            # rows index the previous state j, columns the current state;
            # summing over axis 0 marginalises the previous state out
            step = logalpha[t - 1][:, np.newaxis] + self.logA
            logalpha[t] = logsumexp(step, axis=0) + log_b[t]

        return logalpha

    def calc_logbeta(self, data):
        """
        calculates beta of the forward-backward algorithm, E-step

        data - (T, d) observations
        returns logbeta, shape (T, k); the last row is log(1) = 0
        """
        log_b = self._log_emissions(data)
        n_obs = log_b.shape[0]

        logbeta = np.zeros((n_obs, self.k))

        for t in range(n_obs - 2, -1, -1):
            # the 1-D terms broadcast over columns, i.e. over the next state j,
            # which is then summed out along axis 1
            step = self.logA + log_b[t + 1] + logbeta[t + 1]
            logbeta[t] = logsumexp(step, axis=1)

        return logbeta

    def calc_loggamma(self, data, logalpha, logbeta):
        """
        calculates gamma, the expectation of the latent - the marginal posterior distribution, E-step

        data     - observations (unused here, kept for a uniform E-step signature)
        logalpha - (T, k) forward log-probabilities
        logbeta  - (T, k) backward log-probabilities
        returns loggamma, shape (T, k); each row is normalised so exp(row) sums to 1
        """
        loggamma = np.asarray(logalpha, dtype=float) + np.asarray(logbeta, dtype=float)
        loggamma -= logsumexp(loggamma, axis=1, keepdims=True)
        return loggamma

    def calc_logxi(self, data, logalpha, logbeta):
        """
        calculates xi, the expectation of the latent transitions - the joint posterior distribution, E-step

        data     - (T, d) observations
        logalpha - (T, k) forward log-probabilities
        logbeta  - (T, k) backward log-probabilities
        returns logxi, shape (T-1, k, k); entry [t, i, j] is the log posterior of
        being in state i at t and state j at t+1, normalised per time step
        """
        logalpha = np.asarray(logalpha, dtype=float)
        logbeta = np.asarray(logbeta, dtype=float)
        log_b = self._log_emissions(data)
        n_obs = log_b.shape[0]

        # a single observation has no transitions
        if n_obs < 2:
            return np.zeros((0, self.k, self.k))

        arrive = log_b[1:] + logbeta[1:]
        logxi = (logalpha[:-1, :, np.newaxis]
                 + self.logA[np.newaxis, :, :]
                 + arrive[:, np.newaxis, :])

        # normalisation step
        logxi -= logsumexp(logxi, axis=(1, 2), keepdims=True)
        return logxi

    def up_pi(self, loggamma):
        """updates pi parameter in M-step from the posterior over the first time step"""
        # copy so the stored parameter does not alias the caller's loggamma array
        self.logpi = np.array(loggamma[0], dtype=float)
        self.pi = np.exp(self.logpi)
        return

    def up_A(self, logxi, loggamma):
        """
        updates A parameter in M-step

        logxi    - (T-1, k, k) log transition posteriors
        loggamma - (T, k) log state posteriors
        """
        logxi = np.asarray(logxi, dtype=float)
        if logxi.shape[0] == 0:
            # no observed transitions to learn from - keep the current matrix
            return

        # expected i -> j transition counts over expected visits to i
        # (the last time step is excluded: nothing transitions out of it)
        log_visits = logsumexp(np.asarray(loggamma, dtype=float)[:-1], axis=0)
        self.logA = logsumexp(logxi, axis=0) - log_visits[:, np.newaxis]
        self.A = np.exp(self.logA)
        return

    def up_mu(self, loggamma, data):
        """
        updates mu in M-step as the gamma-weighted mean of the observations

        loggamma - (T, k) log state posteriors
        data     - (T, d) observations
        """
        gamma = np.exp(loggamma)
        data = np.asarray(data, dtype=float)

        state_weight = gamma.sum(axis=0)
        self.mu_mat = (gamma.T @ data) / state_weight[:, np.newaxis]
        return

    def up_cv(self, loggamma, data):
        """
        updates CV in M-step as the gamma-weighted scatter around the current means

        loggamma - (T, k) log state posteriors
        data     - (T, d) observations
        Uses self.mu_mat as it stands, so call after up_mu.
        """
        gamma = np.exp(loggamma)
        data = np.asarray(data, dtype=float)
        dim = data.shape[1]

        new_cv = np.empty((self.k, dim, dim))
        for state in range(self.k):
            centred = data - self.mu_mat[state]
            weighted = gamma[:, state, np.newaxis] * centred
            # sum over n of gamma[n] * outer(x_n - mu, x_n - mu), as one matrix product
            new_cv[state] = (weighted.T @ centred) / gamma[:, state].sum()

        self.cv_mat = new_cv
        return

    def up_distribs(self):
        """updates emission distributions in M-step from the current mu_mat / cv_mat"""
        self.distribs = [
            multivariate_normal(mean=self.mu_mat[state], cov=self.cv_mat[state])
            for state in range(self.k)
        ]
        return

    def EM(self, data):
        """Expectation Maximisation Algorithm, takes expectation of the latent, then uses the argmax of Q-function to update parameter of model"""

        #Expectation Step
        logalpha = self.calc_logalpha(data)
        logbeta = self.calc_logbeta(data)
        loggamma = self.calc_loggamma(data, logalpha, logbeta)
        logxi = self.calc_logxi(data, logalpha, logbeta)

        #Maximisation Step
        # order matters: covariances use the new means, and the distributions
        # are rebuilt only once both are in place
        self.up_pi(loggamma)
        self.up_A(logxi, loggamma)
        self.up_mu(loggamma, data)
        self.up_cv(loggamma, data)
        self.up_distribs()
        return

    def calc_ll(self, logalpha):
        """calculates the log-likelihood from the last forward row, appends it to ll_pts and returns it"""
        ll = logsumexp(logalpha[-1, :])

        self.ll_pts.append(ll)

        return ll

    def fit(self, data, max_iter, a_tol):
        """
        given data - observations, max iteration - the max iteration of EM alg, a_tol - smallest step of conversion, fits HMM

        data     - (T, d) observations
        max_iter - maximum number of EM iterations
        a_tol    - stop as soon as the log-likelihood improves by less than this

        returns (mu_mat, cv_mat, pi, ll_pts, A, increasing) where increasing is True
        when every recorded log-likelihood is strictly greater than the previous one.
        ll_pts keeps accumulating across repeated calls on the same instance.
        """
        data = np.asarray(data, dtype=float)
        ll = float('-inf')

        for _ in range(max_iter):

            self.EM(data)
            # a fresh forward pass scores the parameters just produced by the M-step
            logalpha = self.calc_logalpha(data)
            ll_new = self.calc_ll(logalpha)

            if (ll_new - ll) < a_tol:
                break

            ll = ll_new

        print(f"LL: {self.ll_pts}")

        increasing = all(self.ll_pts[i] > self.ll_pts[i - 1] for i in range(1, len(self.ll_pts)))
        return (self.mu_mat, self.cv_mat, self.pi, self.ll_pts, self.A, increasing)