# Homework Program HMM Timur Khuzin

### Function that read parameters.

First line of file: M,K,L tab separated<br>
Than M lines of M numbers (MxM transition matrix)<br>
Row index is old state, column index is next state<br>
Than M lines of K numbers (MxK emisstion matrix)<br>
Each row of emission matrix is probabilities of K emissions.<br>
Than M numbers in line (Vector of M — initial distribution)<br>

In [56]:
import numpy as np

class HMM_Params:
    def __init__(self, state_num, emission_num, to_observe):
        self.state_num = state_num
        self.emission_num = emission_num
        self.observation_length = to_observe
        self.transition_matrix = np.zeros((state_num, state_num), dtype='float64')
        self.emission_matrix = np.zeros((state_num, emission_num), dtype='float64')
        self.beginning_distribution = np.zeros((state_num), dtype='float64')

def read_params(filename):
    with open(filename, 'r') as f:
        M, K, L = map(int, f.readline().strip().split())
        params = HMM_Params(M, K, L)
        
        for i in range(M):
            params.transition_matrix[i,:] = [float(x) for x in f.readline().strip().split()]
            
        for i in range(M):
            params.emission_matrix[i,:] = [float(x) for x in f.readline().strip().split()]
        
        params.beginning_distribution[:] = np.array([float(x) for x in f.readline().strip().split()])
        
        return params

### HMM generator

In [57]:
from numpy.random import choice as npchoice

# params is HMM_Params
# returns sequence of tuples: (pi, x)
def generate_random_HMM(params):
    current_pos = npchoice(params.state_num,1, p=params.beginning_distribution)[0]
    current_emission = npchoice(params.emission_num, 1, p=params.emission_matrix[current_pos,:])[0]
    result = []
    while len(result)<params.observation_length:
        result.append((current_pos, current_emission))
        current_pos = npchoice(params.state_num, 1, p=params.transition_matrix[current_pos,:])[0]
        current_emission = npchoice(params.emission_num, 1, p=params.emission_matrix[current_pos,:])[0]
    return result

def beautify_HMM_sequence(hmm_seq):
    def beautify_pair(p):
        return 'p%d'%p[0], 'e%d'%p[1]
    return [beautify_pair(x) for x in hmm_seq]

def print_HMM_to_file(hmm, filename):
    hmm = beautify_HMM_sequence(hmm)
    with open(filename, 'w') as f:
        f.write('\n'.join(map(lambda p:'%s\t%s'%p, hmm)))

In [58]:
params = read_params('test.txt')
hmm = generate_random_HMM(params)
print_HMM_to_file(hmm, 'generated_path.txt')
#beautify_HMM_sequence(hmm)

In [65]:
# input is HMM_Params and observation list (with ints inside)
# output is hidden states in integer list
def viterbi_solver(params, observations):
    # matrix LxM where stored previous states for each state in row
    parents = np.zeros((params.observation_length, params.state_num), dtype='float64')
    # current probabilities
    current_iter = params.beginning_distribution * params.emission_matrix[:,observations[0]]
    # probabilities of next step
    next_generation = np.zeros(params.state_num,dtype='float64')
    for i in range(1, params.observation_length):
        for state in range(params.state_num):
            probs = current_iter * params.transition_matrix[:,state]
            prev = probs.argmax()
            next_generation[state] = probs[prev] * params.emission_matrix[state, observations[i]]
            parents[i,state] = prev
        # avoid garbage collection
        current_iter, next_generation = next_generation, current_iter
    
    path = np.ones(params.observation_length, dtype='int64')
    path[-1] = current_iter.argmax()
    for i in range(params.observation_length - 2, -1, -1):
        p = parents[i+1, path[i+1]]
        path[i] = p
    return path

In [76]:
# checking viterbi
params = read_params('test.txt')
data = [x.strip() for x in open('generated_path.txt', 'r')]
observations = [int(x.split()[1][1:]) for x in data]
actual_states = [int(x.split()[0][1:]) for x in data]
most_probable = viterbi_solver(params, observations)
with open('viterbi.txt', 'w') as f:
    f.write("RPath\tObserv\tViterbi\n")
    p = map(lambda x: '%s\tp%d'%x, zip(data, most_probable))
    p = list(p)
    f.write('\n'.join(p))
same_indices = [i for i in range(len(data)) if actual_states[i]==most_probable[i]]
print("Accuracy: %f"%(len(same_indices)/len(data)))

Accuracy: 0.780000
