In [1]:
# Author: Yilin ZHENG
import numpy as np
import random

1. Viterbi Algorithm

In [2]:
# Reference: https://en.wikipedia.org/wiki/Viterbi_algorithm
def viterbi(observations, states, init_state, trans_mat, emission_mat):
    V = [{}]
    # Initialization
    for s in states:
        V[0][s] = {"prob": init_state[s] * emission_mat[s][observations[0]], "prev": None}
    # For later steps
    for t in range(1, len(observations)):
        V.append({})
        for s in states:
            max_trans_prob = V[t-1][states[0]]["prob"]*trans_mat[states[0]][s]
            prev_selected_state = states[0]
            for prev_state in states[1:]:
                trans_prob = V[t-1][prev_state]["prob"]*trans_mat[prev_state][s]
                if trans_prob > max_trans_prob:
                    max_trans_prob = trans_prob
                    prev_selected_state = prev_state 
            max_prob = max_trans_prob * emission_mat[s][observations[t]]
            V[t][s] = {"prob": max_prob, "prev": prev_selected_state}
    opt = []
    max_prob = max(value["prob"] for value in V[-1].values())
    previous = None
    for s, data in V[-1].items():
        if data["prob"] == max_prob:
            opt.append(s)
            previous = s
            break
    for t in range(len(V) - 2, -1, -1):
        opt.insert(0, V[t + 1][previous]["prev"])
        previous = V[t + 1][previous]["prev"]
    return opt, max_prob

In [3]:
# sequences
seq0 = ('Heads', 'Heads', 'Heads')
seq1 = ('Heads', 'Heads', 'Tails')
seq2 = ('Heads', 'Tails', 'Heads')
seq3 = ('Heads', 'Tails', 'Tails')
seq4 = ('Tails', 'Heads', 'Heads')
seq5 = ('Tails', 'Heads', 'Tails')
seq6 = ('Tails', 'Tails', 'Heads')
seq7 = ('Tails', 'Tails', 'Tails')

all_seqs = [seq0, seq1, seq2, seq3, seq4, seq5, seq6, seq7]
observation_seq = random.sample(all_seqs, 5)  # randomly selected 5 sequences

observations = []
for seq in observation_seq:
    observations += [s for s in seq]
print(observations)
states = ("Coin1", "Coin2", "Coin3")
init_state_1 = {"Coin1": 0.3, "Coin2": 0.4, "Coin3": 0.3}
init_state_2 = {"Coin1": 0.34, "Coin2": 0.33, "Coin3": 0.33}
init_state_3 = {"Coin1": 0.2, "Coin2": 0.4, "Coin3": 0.4}
init_state_4 = {"Coin1": 0.4, "Coin2": 0.5, "Coin3": 0.1}
init_states = [init_state_1, init_state_2, init_state_3, init_state_4]
trans_mat = {"Coin1": {"Coin1": 0.6, "Coin2": 0.2, "Coin3": 0.2},
             "Coin2": {"Coin1": 0.3, "Coin2": 0.5, "Coin3": 0.2},
             "Coin3": {"Coin1": 0.5, "Coin2": 0.2, "Coin3": 0.3}}
emission_mat = {"Coin1": {"Heads": 0.7, "Tails": 0.3},
                "Coin2": {"Heads": 0.4, "Tails": 0.6},
                "Coin3": {"Heads": 0.5, "Tails": 0.5}}

['Tails', 'Heads', 'Tails', 'Tails', 'Heads', 'Heads', 'Heads', 'Tails', 'Heads', 'Heads', 'Tails', 'Tails', 'Heads', 'Heads', 'Tails']


In [4]:
for init_state in init_states: 
    result_seq, prob = viterbi(observations, states, init_state, trans_mat, emission_mat)
    print("The sequences are " + " -> ".join(result_seq))
    print("The highest probability is %s" % prob)

The sequences are Coin2 -> Coin2 -> Coin2 -> Coin2 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1
The highest probability is 5.2274369621610784e-09
The sequences are Coin3 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1
The highest probability is 4.5282672684720325e-09
The sequences are Coin3 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1
The highest probability is 5.488808810269131e-09
The sequences are Coin2 -> Coin2 -> Coin2 -> Coin2 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1 -> Coin1
The highest probability is 6.534296202701345e-09


2. Calculate Parameters

In [66]:
# Data "Heads": 0, "Tails": 1
observations_2 = [0 if o == "Heads" else 1 for o in observations]
print(observations_2)
init_state = np.array([0.34, 0.33, 0.33])
trans_mat = np.array([[0.1, 0.3, 0.6],
                       [0.5, 0.3, 0.2],
                       [0.4, 0.3, 0.3]])
emission_mat = np.array([[0.2, 0.8],
                          [0.4, 0.6],
                          [0.5, 0.5]])

[1, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1]


In [67]:
def baum_welch(observations, states, trans, emission, iterations=0):
    if iterations:
        N = trans.shape[0]
        T = len(observations)
        for i in range(iterations):
            #Forward
            F = np.zeros((N, T))
            F[:, 0] = states * emission[:, observations[0]]
            for t in range(1, T):
                for n in range(N):
                    F[n, t] = np.dot(F[:, t-1], (trans[:, n])) * emission[n, observations[t]]
            #Backward
            X = np.zeros((N, T))
            X[:, -1:] = 1
            for t in reversed(range(T-1)):
                for n in range(N):
                    X[n, t] = np.sum(X[:, t+1] * trans[n, :] * emission[:, observations[t+1]])
            #Estimation
            obs_prob = np.sum(F[:, -1])
            x_i = np.zeros((T-1, N, N))
            for t in range(x_i.shape[0]):
                x_i[t, :, :] = trans * F[:, [t]] * emission[:, observations[t+1]] * X[:, t+1] / obs_prob
            gamma = F * X / obs_prob
            gamma_sum_trans = np.sum(gamma[:, :-1], axis=1, keepdims=True)
            rows_to_keep_trans = (gamma_sum_trans == 0)
            gamma_sum_trans[gamma_sum_trans == 0] = 1.
            next_trans = np.sum(x_i, axis=0) / gamma_sum_trans

            gamma_sum_emission = np.sum(gamma, axis=1, keepdims=True)
            rows_to_keep_emission = (gamma_sum_emission == 0)
            gamma_sum_emission[gamma_sum_emission == 0] = 1.

            obs_mat = np.zeros((T, emission.shape[1]))
            obs_mat[range(T), observations] = 1
            next_emission = np.dot(gamma, obs_mat) / gamma_sum_emission

            trans = trans * rows_to_keep_trans + next_trans
            emission = emission * rows_to_keep_emission + next_emission
            states = gamma[:, 0] / np.sum(gamma[:, 0])
    return states, trans, emission

In [68]:
states, trans, emission = baum_welch(observations_2, init_state, trans_mat, emission_mat, iterations=2)
print("states: " + str(states))
print("trans: " + str(trans))
print("emission: " + str(emission))

states: [0.68496556 0.17894851 0.13608593]
trans: [[0.08537028 0.27434949 0.64028023]
 [0.44882755 0.31700537 0.23416708]
 [0.33551023 0.31721426 0.34727551]]
emission: [[0.31107914 0.68892086]
 [0.57220631 0.42779369]
 [0.6868026  0.3131974 ]]
