In [1]:
import numpy as np

In [2]:
states = ('Cold', 'Hot')
     
observations = ('1', '2', '3')

start_probability = {'Cold': 0.2, 'Hot': 0.8}

transition_probability = {
    'Cold': {'Cold': 0.5, 'Hot': 0.5},
    'Hot': {'Cold': 0.4, 'Hot': 0.6},
}

emission_probability = {
    'Cold': {'1': 0.5, '2': 0.4, '3': 0.1},
    'Hot': {'1': 0.2, '2': 0.4, '3': 0.4},
}

In [3]:
class HMM:
        """
        Order 1 Hidden Markov Model
     
        Attributes
        ----------
        A : numpy.ndarray
            State transition probability matrix
        B: numpy.ndarray
            Output emission probability matrix with shape(N, number of output types)
        pi: numpy.ndarray
            Initial state probablity vector
        """
     
        def __init__(self, A, B, pi):
            self.A = A
            self.B = B
            self.pi = pi
            
        def forward(self, obs_seq):
            N = self.A.shape[0]
            T = len(obs_seq)

            F = np.zeros((N,T))
            F[:,0] = self.pi * self.B[:, obs_seq[0]]

            for t in range(1, T):
                for n in range(N):
                    F[n,t] = np.dot(F[:,t-1], (self.A[:,n])) * self.B[n, obs_seq[t]]

            return F

In [4]:
def generate_index_map(lables):
    index_label = {}
    label_index = {}
    i = 0
    for l in lables:
        index_label[i] = l
        label_index[l] = i
        i += 1
    return label_index, index_label


states_label_index, states_index_label = generate_index_map(states)
observations_label_index, observations_index_label = generate_index_map(observations)


def convert_observations_to_index(observations, label_index):
    list = []
    for o in observations:
        list.append(label_index[o])
    return list


def convert_map_to_vector(map, label_index):
    v = np.empty(len(map), dtype=float)
    for e in map:
        v[label_index[e]] = map[e]
    return v


def convert_map_to_matrix(map, label_index1, label_index2):
    m = np.empty((len(label_index1), len(label_index2)), dtype=float)
    for line in map:
        for col in map[line]:
            m[label_index1[line]][label_index2[col]] = map[line][col]
    return m

In [5]:
A = convert_map_to_matrix(transition_probability, states_label_index, states_label_index)
print(A)
B = convert_map_to_matrix(emission_probability, states_label_index, observations_label_index)
print(B)
observations_index = convert_observations_to_index(observations, observations_label_index)
pi = convert_map_to_vector(start_probability, states_label_index)
print(pi)


h = HMM(A, B, pi)

[[0.5 0.5]
 [0.4 0.6]]
[[0.5 0.4 0.1]
 [0.2 0.4 0.4]]
[0.2 0.8]


In [6]:
obs_seq = ('1','3','3','2','3','3','3','2','1','2','1','1','3','1','3','3','3')
obs_seq = convert_observations_to_index(obs_seq, observations_label_index)
obs_seq

[0, 2, 2, 1, 2, 2, 2, 1, 0, 1, 0, 0, 2, 0, 2, 2, 2]

In [7]:
F = h.forward(obs_seq)
F

array([[1.00000000e-01, 1.14000000e-02, 2.90600000e-03, 3.18856000e-03,
        3.39117600e-04, 8.55898640e-05, 2.34645902e-05, 2.58494868e-05,
        1.37479259e-05, 3.86259018e-06, 1.84946608e-06, 6.45676547e-07,
        4.40811324e-08, 5.10041110e-08, 3.68614195e-09, 8.64964721e-10,
        2.36095201e-10],
       [1.60000000e-01, 5.84000000e-02, 1.62960000e-02, 4.49224000e-03,
        1.71584960e-03, 4.79627424e-04, 1.32228555e-04, 3.64277711e-05,
        6.95628121e-06, 4.41909267e-06, 9.16550139e-07, 2.94932625e-07,
        1.99919139e-07, 2.83984100e-08, 1.70164406e-08, 4.82117413e-09,
        1.33007474e-09]])

In [8]:
print (F.sum())

0.3595539987636594
