In [155]:
# package imports
import numpy as np
from itertools import islice
from scipy.linalg import eig
from scipy.stats import norm
from numpy import float64
import pathlib

In [156]:
# get stationary dustribution of transition matrix
# from stack overflow
def get_stationary_distibution(state_transition_matrix: np.ndarray) -> np.ndarray:
    S, U = eig(state_transition_matrix.T)
    stationary = np.array(U[:, np.where(np.abs(S - 1.) < 1e-8)[0][0]].flat)
    stationary = stationary / np.sum(stationary)
    return stationary

In [157]:
def estimate_sequence(state_transition_matrix:np.ndarray, gaussian_params: np.ndarray, initial_state_probability:np.ndarray, observations:np.ndarray, state_count: int, state_converter:dict):
    observation_count = observations.shape[0]
    state_probability_matrix = np.ndarray((state_count, observation_count))
    path = np.ndarray((state_count, observation_count-1), dtype=int)
    emission_matrix = norm(loc=gaussian_params[0,:], scale=gaussian_params[1,:]).pdf(observations).T
    # emission_matrix = normalize(emission_matrix, axis=0)
    state_probability_matrix[:,0] = np.log(initial_state_probability) + np.log(emission_matrix[:,0])
    for i in range(1, observation_count):
        for j in range(state_count):
            prob = state_probability_matrix[:,i-1] + np.log(state_transition_matrix[:,j]) + np.log(emission_matrix[j,i])
            path[j,i-1] = np.argmax(prob)
            state_probability_matrix[j,i] = max(prob)

    out_path = ['' for _ in range(observation_count)]
    sink_index = np.argmax(state_probability_matrix[:,-1])
    out_path[-1] = state_converter[sink_index]
    for i in range(observation_count-2,-1,-1):
        sink_index = path[sink_index, i]
        out_path[i] = state_converter[sink_index]

    return path, state_probability_matrix, emission_matrix, out_path

In [158]:
# Viterbi algorithm
def viterbi(state_transition_matrix:np.ndarray, gaussian_params: np.ndarray, initial_state_probability:np.ndarray, observations:np.ndarray, state_count: int, state_converter:dict):
    observation_count = observations.shape[0]
    state_probability_matrix = np.ndarray((state_count, observation_count), dtype=float64)
    path = np.ndarray((state_count, observation_count-1), dtype=int)
    emission_matrix = norm(loc=gaussian_params[0,:], scale=gaussian_params[1,:]).pdf(observations).T
    state_probability_matrix[:,0] = np.log(initial_state_probability) + np.log(emission_matrix[:,0])
    for i in range(1, observation_count):
        prob = state_probability_matrix[:,i-1] + np.log(state_transition_matrix) + np.log(emission_matrix[:,i].reshape(-1,1))
        path[:,i-1] = np.argmax(prob, axis=1)
        state_probability_matrix[:,i] = np.max(prob, axis=1)

    out_path = ['' for _ in range(observation_count)]
    sink_index = np.argmax(state_probability_matrix[:,-1])
    out_path[-1] = state_converter[sink_index]
    for i in range(observation_count-2,-1,-1):
        sink_index = path[sink_index, i]
        out_path[i] = state_converter[sink_index]

    return path, state_probability_matrix, emission_matrix, out_path

In [159]:
# Baum-Welch Learning
def baum_welch():
    pass

In [160]:
# read data
observed_states = np.loadtxt('./Input/data.txt', dtype=float).reshape(-1,1)
observed_states.shape

(1000, 1)

In [161]:
# read parameters
with open('./Input/parameters.txt.txt', 'r') as f:
    no_of_states = int(f.readline())

with open('./Input/parameters.txt.txt', 'r') as lines:
    transition_matrix = np.genfromtxt(islice(lines, 1, 1+no_of_states))

with open('./Input/parameters.txt.txt', 'r') as lines:
    gaussian_parameters = np.genfromtxt(islice(lines, 1+no_of_states, 1+2*no_of_states), dtype=int)

gaussian_parameters

array([[200, 100],
       [ 10,  10]])

In [162]:
initial_distribution = get_stationary_distibution(transition_matrix)
index_state_map = {
    0: '\"El Nino\"',
    1: '\"La Nina\"'
}
initial_distribution.shape

(2,)

In [163]:
# estimate_sequence(transition_matrix, gaussian_parameters, initial_distribution, observed_states, no_of_states, index_state_map)

In [164]:
a, b, c, hidden_path = viterbi(transition_matrix, gaussian_parameters, initial_distribution, observed_states, no_of_states, index_state_map)

In [165]:
# norm.pdf(104.524317662043, loc=gaussian_parameters[:,0], scale=gaussian_parameters[:,1])
a.T

array([[1, 1],
       [1, 1],
       [1, 1],
       ...,
       [0, 1],
       [0, 1],
       [1, 1]])

In [166]:
b.T

array([[-5.01858476e+01, -3.61155295e+00],
       [-5.97561869e+01, -6.95296555e+00],
       [-6.01843770e+01, -1.02870653e+01],
       ...,
       [-1.02142021e+04, -1.02144021e+04],
       [-1.02717469e+04, -1.02178047e+04],
       [-1.02735158e+04, -1.02211398e+04]])

In [167]:
c.T

array([[6.40653671e-22, 3.60131591e-02],
       [1.37903101e-24, 3.93188164e-02],
       [2.53963017e-23, 3.96074027e-02],
       ...,
       [8.51062085e-08, 2.56571442e-07],
       [1.45716704e-25, 3.69858576e-02],
       [2.12759494e-24, 3.95700812e-02]])

In [168]:
viterbi_output = []
with open('./Output/states_Viterbi_wo_learning.txt', 'r') as f:
    for line in f.readlines():
        viterbi_output.append(line.rstrip('\n'))

match = 0

for item1, item2 in zip(viterbi_output, hidden_path):
    if item1 == item2:
        match += 1

print(match)

pathlib.Path('./my_output').mkdir(parents=True, exist_ok=True)

with open('./my_output/states_Viterbi_wo_learning.txt', 'w') as f:
    for item in hidden_path:
        f.write(item+'\n')

857
