In [1]:
import numpy as np
import pandas as pd
from scipy.stats import norm as gaussian
from scipy import linalg
import math

In [2]:
with open('Sample input and output for HMM/Input/data.txt') as f:
    rainfall = np.loadtxt(f)

In [3]:
with open('Sample input and output for HMM/Input/parameters.txt') as f:
    nstates = int(next(f))
    trans_mat = np.asarray([[float(x) for x in next(f).strip().split('\t')] for i in range(nstates)])
    mu = np.asarray([float(x) for x in next(f).strip().split('\t')])
    sigma = np.asarray([float(x) for x in next(f).strip().split('\t')])

In [4]:
emit_mat = np.asarray([[gaussian.pdf(x, loc=mu[i], scale=math.sqrt(sigma[i])) for x in rainfall] for i in range(nstates)]).T

In [5]:
def viterbi(obs, states, init_prob, trans_mat, emit_mat):
    prev_prob = np.log(init_prob * emit_mat[0]).reshape(1, -1)
    prev_st = np.full((1, states), -1)
    
    for ep in emit_mat[1:]:
        probs = prev_prob[-1].reshape(-1, 1) + np.log(trans_mat * ep)
        prev_prob = np.concatenate((prev_prob, probs.max(axis=0).reshape(1, -1)))
        prev_st = np.concatenate((prev_st, probs.argmax(axis=0).reshape(1, -1)))
    
    most_likely_st = np.array([prev_prob[-1].argmax()])
    for st in prev_st[:0:-1]:
        most_likely_st = np.append(most_likely_st, st[most_likely_st[-1]])
    
    st, counts = np.unique(most_likely_st, return_counts=True)
    
    return most_likely_st, dict((st[i], counts[i]) for i in range(states))
    

In [6]:
init_prob = np.full(nstates, 1/nstates)
print('init_prob: ', init_prob)
most_likely_states, state_counts = viterbi(rainfall, nstates, init_prob, trans_mat, emit_mat)

init_prob:  [0.5 0.5]


In [7]:
most_likely_states

array([1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0,
       1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
       1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1,
       1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0,
       1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
       0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 0,
       0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0,
       0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1,
       1, 1, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 1,
       0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1,

In [8]:
state_counts

{0: 291, 1: 709}

In [None]:
def forward(obs, init_prob, trans_mat, emit_mat):
    T = obs.shape[0]
    m = trans_mat.shape[0]
    
    alpha = np.zeros((T, m))
    alpha[0] = init_prob * emit_mat[0]
    alpha[0] /= np.sum(alpha[0])
    
    for t in range(1, T):
        for i in range(m):
            alpha[t, i] = alpha[t - 1].dot(trans_mat[:, i] * emit_mat[t, i])
        alpha[t] /= np.sum(alpha[t])
            
    return alpha

In [None]:
def backward(obs, trans_mat, emit_mat):
    T = obs.shape[0]
    m = trans_mat.shape[0]
    
    beta = np.zeros((T, m))
    beta[-1] = np.ones(m)
    beta[-1] /= np.sum(beta[-1])
    
    for t in range(T-2, -1, -1):
        for i in range(m):
            beta[t, i] = trans_mat[i].dot(beta[t+1] * emit_mat[t+1])
        beta[t] /= np.sum(beta[t])
    
    return beta

In [None]:
def baum_welch(obs, init_prob, trans_mat, emit_mat, n_iter):
    T = obs.shape[0]
    m = trans_mat.shape[0]
    
    updated_trans_mat = trans_mat.copy()
    updated_emit_mat = emit_mat.copy()
    
    for n in range(n_iter):
        alpha = forward(obs, init_prob, updated_trans_mat, updated_emit_mat)
        beta = backward(obs, updated_trans_mat, updated_emit_mat)
        
        xi = np.zeros((m, m, T-1))
        for t in range(T-1):
            denom = (alpha[t].dot(updated_trans_mat) * updated_emit_mat[t+1]).dot(beta[t+1])
            for i in range(m):
                numer = alpha[t, i] * updated_trans_mat[i] * updated_emit_mat[t+1] * beta[t+1]
                xi[i, :, t] = numer / denom
        
        gamma = np.sum(xi, axis=1)
        updated_trans_mat = np.sum(xi, axis=2) / np.sum(gamma, axis=1).reshape(-1, 1)
        
        gamma = np.hstack((gamma, np.sum(xi[:, :, T-2], axis=0).reshape(-1, 1)))
        
        denom = np.sum(gamma, axis=1)
        updated_emit_mat /= denom
    
    return updated_trans_mat, updated_emit_mat

In [None]:
trans_mat_trained, emit_mat_trained = baum_welch(rainfall, init_prob, trans_mat, emit_mat, n_iter=30)

In [None]:
trans_mat_trained

In [9]:
def forward(init_prob, trans_mat, emit_mat, norm):
    T = emit_mat.shape[0]
    m = trans_mat.shape[0]
    
    alpha = np.zeros((T, m))
    alpha[0] = init_prob * emit_mat[0]
    norm[0] = 1. / np.sum(alpha[0])
    alpha[0] *= norm[0]
    
    for t in range(1, T):
        for i in range(m):
            alpha[t, i] = alpha[t - 1].dot(trans_mat[:, i] * emit_mat[t, i])
        norm[t] = 1. / np.sum(alpha[t])
        alpha[t] *= norm[t]
            
    return alpha

# def forward_t(init_prob, trans_mat, emit_mat, norm):
#     T = emit_mat.shape[0]
#     m = trans_mat.shape[0]
    
#     alpha = np.zeros((T, m))
#     alpha[0] = init_prob * emit_mat[0]
#     norm[0] = 1. / np.sum(alpha[0])
#     alpha[0] *= norm[0]
    
#     for t in range(1, T):
#         for i in range(m):
#             for j in range(m):
#                 temp = alpha[t-1, j] * trans_mat[j, i] * emit_mat[t, i]
#                 alpha[t, i] += temp
#         norm[t] = 1. / np.sum(alpha[t])
#         alpha[t] *= norm[t]
            
#     return alpha

In [10]:
def backward(trans_mat, emit_mat, norm):
    T = emit_mat.shape[0]
    m = trans_mat.shape[0]
    
    beta = np.zeros((T, m))
    beta[-1] = np.ones(m)
    
    for t in range(T-2, -1, -1):
        for i in range(m):
            beta[t, i] = trans_mat[i].dot(beta[t+1] * emit_mat[t+1])
        beta[t] *= norm[t]
    
    return beta

# def backward_t(trans_mat, emit_mat, norm):
#     T = emit_mat.shape[0]
#     m = trans_mat.shape[0]
#     beta = np.zeros((T, m))
    
#     beta[-1] = np.ones(m)
    
#     for t in range(T-2, -1, -1):
#         for i in range(m):
#             for j in range(m):
#                 temp = beta[t+1, j] * trans_mat[i, j] * emit_mat[t+1, j]
#                 beta[t, i] += temp
#             beta[t, i] *= norm[t]
#     return beta
    

In [11]:
def baum_welch(obs, init_prob, trans_mat, emit_mat, n_iter):
    T = emit_mat.shape[0]
    m = trans_mat.shape[0]
    
    updated_trans_mat = trans_mat.copy()
    updated_emit_mat = emit_mat.copy()
    
    norm = np.zeros(T)
    
    for n in range(n_iter):
        alpha = forward(init_prob, updated_trans_mat, updated_emit_mat, norm)
        beta = backward(updated_trans_mat, updated_emit_mat, norm)

        alphaXbeta = alpha * beta / norm.reshape(-1, 1)
        denoms = alphaXbeta.sum(axis=0)
        for i in range(m):
            for j in range(m):
                numer = 0
                for t in range(T-1):
                    numer += alpha[t, i] * updated_trans_mat[i, j] * beta[t+1, j] * updated_emit_mat[t+1, j]
                updated_trans_mat[i, j] = numer / denoms[i]
                
        alphaXbeta *= norm.reshape(-1, 1)
        alphaXbeta /= alphaXbeta.sum(axis=1).reshape(-1, 1)

        tmp = alphaXbeta * obs.reshape(-1, 1)
        _mu_ = tmp.sum(axis=0) / alphaXbeta.sum(axis=0)

        diff = obs.reshape(-1, 1) - _mu_
        _sigma_ = (alphaXbeta * (diff ** 2)).sum(axis=0) / alphaXbeta.sum(axis=0)

        updated_emit_mat = np.asarray([[gaussian.pdf(x, loc=_mu_[i], scale=math.sqrt(_sigma_[i])) for x in obs] for i in range(m)]).T
        
    return updated_trans_mat, _mu_, _sigma_

In [12]:
trans_mat_trained, mu_trained, sigma_trained = baum_welch(rainfall, init_prob, trans_mat, emit_mat, n_iter=20)

In [13]:
trans_mat_trained

array([[0.82795699, 0.17204301],
       [0.21719457, 0.78054299]])

In [14]:
mu_trained

array([150.1898689 , 100.20940296])

In [15]:
sigma_trained

array([5.031877  , 8.71346514])

In [16]:
emit_mat_trained = np.asarray([[gaussian.pdf(x, loc=mu_trained[i], scale=math.sqrt(sigma_trained[i])) for x in rainfall] for i in range(nstates)]).T

In [17]:
most_likely_states_after_train, state_counts_after_train = viterbi(rainfall, nstates, init_prob, trans_mat_trained, emit_mat_trained)

In [18]:
state_counts_after_train

{0: 558, 1: 442}

In [None]:
with open('viterbi_wo_learning.txt', 'w') as f:
    emit_mat = np.asarray([[gaussian.pdf(x, loc=mu[i], scale=math.sqrt(sigma[i])) for x in rainfall] for i in range(nstates)]).T
    init_prob = np.full(nstates, 1/nstates)
    most_likely_states, state_counts = viterbi(rainfall, nstates, init_prob, trans_mat, emit_mat)
    most_likely_states = ['El Nino' if x == 0 else 'La Nina' for x in most_likely_states]
    f.write('\n'.join(most_likely_states))
    print(state_counts)
    
with open('parameters_learned.txt', 'w') as f:
    trans_mat_trained, mu_trained, sigma_trained = baum_welch(rainfall, init_prob, trans_mat, emit_mat, n_iter=20)
    f.write(str(nstates)+'\n')
    
    [f.write('\t'.join([str(v) for v in row])+'\n') for row in trans_mat_trained]
    f.write('\t'.join([str(mu_x) for mu_x in mu_trained]) + '\n')
    f.write('\t'.join([str(sigma_x) for sigma_x in sigma_trained]))
    
with open('viterbi_after_learning.txt', 'w') as f:
    emit_mat_trained = np.asarray([[gaussian.pdf(x, loc=mu_trained[i], scale=math.sqrt(sigma_trained[i])) for x in rainfall] for i in range(nstates)]).T
    most_likely_states_after_train, state_counts_after_train = viterbi(rainfall, nstates, init_prob, trans_mat_trained, emit_mat_trained)
    most_likely_states_after_train = ['El Nino' if x == 0 else 'La Nina' for x in most_likely_states_after_train]
    f.write('\n'.join(most_likely_states_after_train))
    print(state_counts_after_train)

In [None]:
['\t'.join([str(mu_x) for mu_x in mu_trained]), '\t'.join([str(sigma_x) for sigma_x in sigma_trained])]

In [36]:
def initial_dist(trans_mat):
    w, vl = linalg.eig(trans_mat, left=True, right=False)
    vec = vl[:, np.isclose(w, 1)][:, 0]
    return vec / vec.sum()

In [34]:
np.isclose(w, 1)

array([False,  True])

In [35]:
vl[:, np.isclose(w, 1)]

array([[-0.31622777],
       [-0.9486833 ]])

In [25]:
print(vl[:, np.isclose(w, 1)])

[]


In [26]:
def initial_prob(transition_matrix):
    lam, vec = linalg.eig(transition_matrix, left=True, right=False)
    # print(lam, vec)
    evec1 = vec[:,np.isclose(lam, 1)]
    evec1 = evec1[:,0]
    # print(evec1)
    pi = evec1/evec1.sum()
    return pi

In [37]:
initial_prob(trans_mat)

array([0.25, 0.75])

In [38]:
initial_dist(trans_mat)

array([0.25, 0.75])

In [33]:
vl

array([[-0.70710678, -0.31622777],
       [ 0.70710678, -0.9486833 ]])