In [16]:
all(np.ones([2,2]).sum(axis=1)==1)

False

In [566]:
import numpy as np
from scipy.stats import entropy

class MarkovChain:
    
    '''
    A MarkovChain
    '''
    
    def __init__(self, transition_matrix=None, state_vector=None):
        self.transition_matrix = transition_matrix
        self.state_vector = state_vector

    @property
    def transition_matrix(self):
        
        '''
        The Markov state transition probability matrix.
        Needs to be square.
        '''
        
        return self._transition_matrix
        
    @transition_matrix.setter
    def transition_matrix(self, transition_matrix):
        if transition_matrix:        
            transition_matrix = np.array(transition_matrix)
            assert transition_matrix.shape[0] == transition_matrix.shape[1], \
                'transition matrix needs to be square'
            assert all(transition_matrix.sum(axis=1) == 1), \
                'transition matrix rows need to sum to one'
            if hasattr(self, 'state_vector'):
                assert transition_matrix.shape[0] == self.state_vector.shape[1], \
                    'state vector dimension mismatch'
            self._transition_matrix = transition_matrix
        else:
            self._transition_matrix = None
        
    
    @property
    def state_vector(self):
        
        '''
        The current state vector.
        '''
        
        return self._state_vector
    
    @state_vector.setter
    def state_vector(self, state_vector):
        if state_vector:
            state_vector = np.array(state_vector).reshape(1,-1)
            assert state_vector.sum(axis=1) == 1, \
                'state vector needs to sum to one'
            assert (state_vector>=0).all() and (state_vector<=1).all(), \
                'probabilites need to be bounded between zero and one'
            if hasattr(self, 'transition_matrix'):
                assert state_vector.shape[1] == self.transition_matrix.shape[0], \
                    'transition matrix dimension mismatch'
            self._state_vector = state_vector
        else:
            self._state_vector = None
    

    def steady_state(self, set_state=False):
        
        '''
        Returns the steady state probabilities of the Markov chain.
        If set_state=True, MarkovChain object is modified in place.
        '''
        
        dim = np.array(self.transition_matrix).shape[0]
        q = np.c_[(self.transition_matrix-np.eye(dim)),np.ones(dim)]
        QTQ = np.dot(q, q.T)
        steady_state = np.linalg.solve(QTQ,np.ones(dim))
        if set_state:
            self.state_vector = steady_state
        else:
            return steady_state
        
        
    @property
    def expected_durations(self):
        
        '''
        Returns the expected state durations of the MarkovChain object.
        '''
        
        expected_durations = (np.ones(self.n_states)-np.diag(self.transition_matrix))**-1
        return expected_durations
    
    
    @property
    def n_states(self):
        
        '''
        Returns the number of states of the MarkovChain object.
        '''
        
        return self.state_vector.shape[1]


    def iterate(self, steps=1, set_state=False):
        
        '''
        Iterates the MarkovChain object in place.
        steps needs to be a positive integer.
        (negative steps work, but tend to break before the initial state)
        If set_state=True, MarkovChain object is modified in place.
        '''
        
        new_state = np.dot(self.state_vector, np.linalg.matrix_power(self.transition_matrix, steps))
        
        # ensure total probability is 1
        if new_state.sum() != 1:
            new_state = new_state.round(16)/new_state.round(16).sum()
        
        if set_state:
            self.state_vector = new_state
        else:
            return new_state
        
        
    def forecast(self, horizons=[1]):
        
        '''
        Returns forecasted state probabilities for a set of horizons.
        horizons needs to be an iterable.
        '''
        
        horizons_states = np.array([]).reshape(0, self.n_states)
        for horizon in horizons:
            pi_ = np.dot(self.state_vector, np.linalg.matrix_power(self.transition_matrix, horizon))
            horizons_states = np.concatenate([horizons_states, pi_.reshape(1, self.n_states)], axis=0)
        return horizons_states
    

    def rvs(self, t_steps=0, random_state=1):
        
        '''
        Draws a random sample sequence from the MarkovChain object.
        t_steps is the number of time steps forward to be drawn.
        If t_steps is zero, only the current state is drawn.
        '''
        
        sample = np.random.choice(mc.n_states, size=1, p=mc.state_vector.squeeze())
        for t in range(1, t_steps+1):
            sample = np.concatenate([sample, np.random.choice(mc.n_states, size=1, p=mc.transition_matrix[draw])])
        return sample
    
    
    def entropy(self, horizons=None):
        
        '''
        Calculate Shannon's entropy of the n state probabilities based on logarithms with base n.
        '''
        
        if horizons is None:
            state_entropy = entropy(self.state_vector.squeeze(), base=self.n_states)
        
        else:
            horizon_states = self.forecast(horizons)
            state_entropy = []
            for horizon in horizon_states:
                state_entropy += [entropy(horizon.squeeze(), base=self.n_states)]
            
        return np.array(state_entropy)
    

In [525]:
mc = MarkovChain(np.array([[0.99,0.01],[0.2,0.8]]),np.array([0,1]))
mc.iterate(0)
mc.state_vector
mc.transition_matrix
mc.forecast([5,1,7,123,45,0])

array([[0.65932796, 0.34067204],
       [0.2       , 0.8       ],
       [0.76948658, 0.23051342],
       [0.95238095, 0.04761905],
       [0.9523574 , 0.0476426 ],
       [0.        , 1.        ]])

In [526]:
mc.entropy()

array(0.)

In [527]:
mc.state_vector.squeeze()

array([0, 1])

In [528]:
mc.entropy()

array(0.)

In [529]:
sample = np.random.choice(mc.n_states, size=1, p=mc.state_vector.squeeze())
#sample = [draw]

for t in range(1, 20):
    sample = np.concatenate([sample, np.random.choice(mc.n_states, size=1, p=mc.transition_matrix[draw])])
        
sample

array([1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1])

In [530]:
mc.rvs(3)

array([1, 1, 1, 0])

In [531]:
mc.steady_state(set_state=True)
mc.state_vector

array([[0.95238095, 0.04761905]])

In [554]:
mc.iterate(10, set_state=True)
mc.state_vector

array([[0.95238095, 0.04761905]])

In [567]:
mc=MarkovChain()

In [562]:
import pandas as pd
import numpy as np
import warnings

from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression
from hmmlearn.hmm import GaussianHMM

#from src.dists import GaussianMixtureDistribution
#from src.markov import MarkovChain


class HMM(MarkovChain):
    def __init__(self, emission_models=(), transition_matrix=None, start_probas=None, switch_var=True, switch_const=True, k=None):
        
        '''
        
        '''

        self.emission_models = emission_models
        self.transition_matrix = transition_matrix
        self.start_probas = start_probas
        
        self.switch_var = switch_var
        self.switch_const = switch_const
        self.k = k
        
        self.params_ = None
        self.se_ = None
        self.tstats_ = None

        self.metrics_ = None
        self.smooth_prob_ = None
        self.filt_prob_ = None
    
    
    def fit(self, y, package='baumwelch', start_params=None, iter=100, **kwargs):
        
        '''
        Fits the Gaussian HMM to the series y.
        '''
        
        assert package in ['statsmodels', 'hmmlearn', 'baumwelch'], 'package unknown'
        
        if package == 'statsmodels':
            #if start_params is None:
                #start_params = np.random.randn(self.k+self.k**2)*0.01
                #m = y.mean()
                #s = y.std()
                #v = y.var()
                #start_params = np.full(self.**2-self.k, 1/self.k).tolist()\
                #                        +(np.random.randn(self.k)*s/2+m).tolist()\
                #                        +(np.random.randn(self.k)*v+s).tolist()
            model = MarkovRegression(endog=y, switching_variance=self.switch_var, switching_trend=self.switch_const, k_regimes=self.k)\
                                .fit(start_params=start_params, maxiter=iter, **kwargs)
            self.params_ = model.params
            self.se_ = model.bse
            self.tstats_ = model.tvalues
            self.metrics_ = pd.Series({'llf': model.llf, 'aic': model.aic, 'bic': model.bic,})
            self.smooth_prob_ = model.smoothed_marginal_probabilities
            self.filt_prob_ = model.filtered_marginal_probabilities
        
        if package == 'hmmlearn':

            assert self.switch_var is True and self.switch_const is True, 'only implemented for fully parametrised components'
            t_index = y.index
            y = np.expand_dims(y.values, axis=1)
            model = GaussianHMM(n_components=self.k, n_iter=iter, **kwargs).fit(y)
            trans_probas = model.transmat_.T.reshape(self.k**2,1)[:self.k**2-self.k]
            states = np.arange(self.k)
            p_index=[f'p[{j}->{i}]' for i in states[:-1] for j in states]\
                        +[f'const[{i}]' for i in states]\
                        +[f'sigma2[{i}]' for i in states]
            self.params_ = pd.Series(np.concatenate((trans_probas, model.means_, model.covars_.squeeze(axis=1))).squeeze(), index=p_index)
            llf = model.score(y)
            self.metrics_ = pd.Series({'llf': llf,
                                       'aic': 2*len(self.params_)-2*llf,
                                       'bic': len(self.params_)*np.log(len(y))-2*llf})
            self.smooth_prob_ = pd.DataFrame(model.predict_proba(y), index=t_index)

        if package == 'baumwelch':
            self = self._estimate_baum_welch(np.array(y), max_iter=iter, **kwargs)

        return self
    
    
    @property
    def estimates_(self):
        estimates = pd.DataFrame({'estimate': self.params_,
                                  's.e.': self.se_,
                                  't-stat': self.tstats_})
        return estimates
    

    @property
    def transition_matrix_(self):
        k = self.k
        trans_matrix = np.matrix(self.params_[:k**2-k].values.reshape(k-1, k).T)
        trans_matrix = np.append(trans_matrix, 1-trans_matrix.sum(axis=1), axis=1)
        return trans_matrix


    @property
    def steady_state_(self):
        mc = MarkovChain(transition_matrix=self.transition_matrix_)
        steady = mc.steady_state_probabilities
        return steady


    def get_mixture_distribution(self, state='steady_state'):
        if state == 'steady_state':
            probas = self.steady_state_
        elif state == 'latest':
            probas = self.filt_prob_.iloc[-1]
        else:
            assert len(state) == self.k, 'wrong number of state probabilities'
            probas = state

        components = [(self.params_[f'const[{i}]'], self.params_[f'sigma2[{i}]']**0.5, probas[i]) for i in range(self.k)]
        mix = GaussianMixtureDistribution(components=components)
        return mix


    def filtered_moments(self):
        filt_mom = pd.DataFrame(index=self.filt_prob_.index, columns=['mean','var','skew','kurt','entropy'])
        for date, probas in self.filt_prob_.iterrows():
            mix = self.get_mixture_distribution(state=probas.values)
            filt_mom.loc[date] = [*mix.mvsk(), mix.entropy()]

        return filt_mom


    def smoothened_moments(self):
        smooth_mom = pd.DataFrame(index=self.smooth_prob_.index, columns=['mean','var','skew','kurt','entropy'])
        for date, probas in self.smooth_prob_.iterrows():
            mix = self.get_mixture_distribution(state=probas.values)
            smooth_mom.loc[date] = [*mix.mvsk(), mix.entropy()]
        return smooth_mom
        

    def _check(self, A, pi, models):
        '''OK'''
        assert len(models) == A.shape[0] == A.shape[1] == pi.shape[1], 'dimension mismatch'
    
    @property
    def steady_state(self):
        '''FIX'''
        k = self.transition_matrix.shape[0]
        steady_state = np.full(k, 1/k).reshape(1, -1) @ self.transition_matrix
        return steady_state
        
    def _initialise_baum_welch(self, Y):
        '''FIX'''
        if self.start_probas is None:
            self.start_probas = self.steady_state
        
        A = np.array(self.transition_matrix)
        models = self.emission_models
        pi = np.array(self.start_probas).reshape(1, -1)
        return A, pi, models
    
    def _evaluate_emission_models(self, Y, emission_models):
        '''OK'''
        B = np.concatenate([model.pdf(Y).reshape(-1, 1) for model in emission_models], axis=1)
        return B
        
    def _forward_pass(self, A, B, pi):
        '''OK'''
        # initialise forward pass with first observation
        alpha_0 = pi * B[0]
        c_0 = 1/alpha_0.sum()
        
        # save values & scaling factor
        Alpha = alpha_0*c_0
        C = [c_0]
        
        # iterate
        for b_t in B[1:]:
            # calculate
            alpha_t = (b_t * Alpha[-1] @ A).reshape(1, -1)
            c_t = 1/alpha_t.sum()
            
            # save
            Alpha = np.concatenate((Alpha, alpha_t*c_t), axis=0)
            C += [c_t]
            
        C = np.array(C).reshape(-1, 1)
        return Alpha, C
            
    def _backward_pass(self, A, B, pi, C):
        '''OK'''
        # initialise backward pass as one
        beta_T = np.ones(pi.shape)
        
        # save values & scaling factor
        Beta = beta_T*C[-1]
        
        # iterate
        for b_t, c_t in zip(B[:0:-1],C[len(C)-2::-1]):
            # calculate
            beta_t = (b_t * Beta[0] @ A.T).reshape(1, -1)
            
            # save
            Beta = np.concatenate((beta_t*c_t, Beta), axis=0)
            
        return Beta
    
    def _emission_odds(self, Alpha, Beta):
        '''OK'''
        total = Alpha * Beta
        Gamma = total/total.sum(axis=1).reshape(-1, 1)
        return Gamma
    
    def _transition_odds(self, A, B, Alpha, Beta):
        '''OK'''
        Alpha_block = np.kron(Alpha[:-1], np.ones(A.shape[0]))
        B_Beta_block = np.kron(np.ones(A.shape[0]), B[1:]*Beta[1:])
        total = Alpha_block * B_Beta_block * A.reshape(1, -1)
        Xi = total/total.sum(axis=1).reshape(-1, 1)
        return Xi
        
    def _do_e_step(self, Y, A, B, pi):
        '''OK'''
        Alpha, C = self._forward_pass(A, B, pi)
        Beta = self._backward_pass(A, B, pi, C)
        Gamma = self._emission_odds(Alpha, Beta)
        Xi = self._transition_odds(A, B, Alpha, Beta)
        return Alpha, Gamma, Xi
    
    def _update_transition_matrix(self, Gamma, Xi):
        '''OK'''
        numerator = Xi.sum(axis=0)
        denominator = np.kron(Gamma[:-1], np.ones(Gamma.shape[1])).sum(axis=0)
        A_ = (numerator/denominator).reshape(Gamma.shape[1], Gamma.shape[1])
        return A_
    
    def _update_parameters(self, Y, emission_models, Gamma):
        '''OK'''
        models_ = []
        for model, weights in zip(emission_models, Gamma.T):
            model.fit(Y, weights)
            models_ += [model]
        return tuple(models_)
    
    def _update_initial_state(self, Gamma):
        '''OK'''
        return Gamma[0].reshape(1, -1)
    
    def _do_m_step(self, Y, models, Gamma, Xi):
        '''OK'''
        A_ = self._update_transition_matrix(Gamma, Xi)
        models_ = self._update_parameters(Y, models, Gamma)
        pi_ = self._update_initial_state(Gamma)
        return A_, models_, pi_
    
    def _score(self, Y, emission_models, Gamma):
        '''OK'''
        B = self._evaluate_emission_models(Y, emission_models)
        score = np.log((B * Gamma).sum(axis=1)).sum(axis=0)
        return score, B
    
    def _update_attributes(self, A_, models_, pi_, Gamma, Alpha):
        '''OK'''
        self.transition_matrix = A_
        self.emission_models = models_
        self.start_probas = pi_
        self.smooth_prob_ = Gamma
        self.filt_prob_ = Alpha
    
    def _estimate_baum_welch(self, Y, max_iter=100, threshold=1e-6):
        '''OK'''
        # initialise
        A_, pi_, models_ = self._initialise_baum_welch(Y)
        self._check(A_, pi_, models_)
        score_, B_ = self._score(Y, models_, pi_)
        
        # store
        iteration = 0
        scores = {iteration: score_}
        
        while iteration < max_iter:
            iteration += 1
            Alpha, Gamma, Xi = self._do_e_step(Y, A_, B_, pi_)            
            A_, models_, pi_ = self._do_m_step(Y, models_, Gamma, Xi)
            score_, B_ = self._score(Y, models_, Gamma)
            scores[iteration] = score_
            
            if abs(scores[iteration]-scores[iteration-1]) < threshold:
                break
        else:
            warnings.warn('maximum number of iterations reached')
                
        self._update_attributes(A_, models_, pi_, Gamma, Alpha)
        self.convergence_ = scores
        
        return self
            
    # def fit(self, Y, method='baumwelch', **kwargs):
    #     '''OK'''
    #     assert method in ['baumwelch'], 'method unknown'
        
    #     if method == 'baumwelch':
    #         self = self._estimate_baum_welch(Y, **kwargs)

In [568]:
hmm = HMM(transition_matrix=np.array([[0.99,0.01],[0.2,0.8]]))#, state_vector=np.array([0,1]))

In [569]:
HMM.transition_matrix

<property at 0x7fb7461a2408>