In [20]:
import numpy as np
from operator import itemgetter

In [9]:
states = ['INIT','Onset','Mid','End','FINAL']
observations = ['C1','C2','C3','C4','C5','C6','C7']
A = [[0,1,0,0,0],
    [0,0.3,0.7,0,0],
    [0,0,0.9,0.1,0],
    [0,0,0,0.4,0.6]]
B = [[0,0,0,0,0,0,0],
    [0.5,0.2,0.3,0,0,0,0],
    [0,0,0.2,0.7,0.1,0,0],
    [0,0,0,0.1,0,0.5,0.4]]
seq = [['C1','C2','C3','C4','C4','C6','C7'],
      ['C2','C2','C5','C4','C4','C6','C6']]

In [66]:
class HMM:

    def __init__(self, A=None, B=None, name_states=None, name_observations=None):
        self.A = A
        self.B = B
        self.name_states = name_states
        self.name_observations = name_observations

    def gen_sequence(self, num_sequences=1):
        def draw_from(probabilities):
            return np.random.choice(len(probabilities), 1, p=probabilities)[0]
        sequences = []
        for i in np.arange(0,num_sequences):
            states = []
            observations = []
            states.append(draw_from(self.A[0]))
            observations.append(draw_from(self.B[states[0]]))
            t = 1
            state = draw_from(self.A[states[t - 1]])
            while state < len(self.name_states) - 1:
                states.append(state)
                observations.append(draw_from(self.B[state]))
                t += 1
                state = draw_from(self.A[states[t - 1]])
            sequences.append(itemgetter(*observations)(self.name_observations))
        return sequences

In [67]:
hmm = HMM(A,B,states,observations)

In [78]:
hmm.gen_sequence(3)

[('C3', 'C1', 'C4', 'C4', 'C4', 'C4', 'C4', 'C3', 'C6', 'C7'),
 ('C1', 'C2', 'C4', 'C7'),
 ('C1', 'C4', 'C3', 'C4', 'C5', 'C4', 'C4', 'C3', 'C6')]

In [23]:
itemgetter(*[1, 3, 6])(observations)

('C2', 'C4', 'C7')

In [14]:
states = ('Onset','Mid','End')
end_state = 'FINAL'
 
observations = ('C1','C2','C3','C4','C5','C6','C7')
 
start_probability = {'Onset': 1, 'Mid': 0,'End':0}
 
transition_probability = {
   'Onset' : {'Onset': 0.3, 'Mid': 0.7, 'End': 0, 'FINAL': 0},
   'Mid' : {'Onset': 0, 'Mid': 0.9, 'End': 0.1, 'FINAL': 0},
   'End' : {'Onset': 0, 'Mid': 0, 'End': 0.4, 'FINAL': 0.6},
   }
 
emission_probability = {
   'Onset' : {'C1': 0.5, 'C2': 0.2, 'C3': 0.3,'C4':0,'C5':0,'C6':0,'C7':0},
   'Mid' : {'C1': 0, 'C2': 0, 'C3': 0.2,'C4':0.7,'C5':0.1,'C6':0,'C7':0},
   'End' : {'C1': 0, 'C2': 0, 'C3': 0,'C4':0.1,'C5':0,'C6':0.5,'C7':0.4},
   }

In [None]:
states = ('Healthy', 'Fever')
end_state = 'E'
 
observations = ('normal', 'cold', 'dizzy')
 
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
 
transition_probability = {
   'Healthy' : {'Healthy': 0.69, 'Fever': 0.3, 'E': 0.01},
   'Fever' : {'Healthy': 0.4, 'Fever': 0.59, 'E': 0.01},
   }
 
emission_probability = {
   'Healthy' : {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
   'Fever' : {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
   }

In [22]:
def fwd_bkw(x, states, a_0, a, e, end_st):
    L = len(x)
 
    fwd = []
    f_prev = {}
    # forward part of the algorithm
    for i, x_i in enumerate(x):
        f_curr = {}
        for st in states:
            if i == 0:
                # base case for the forward part
                prev_f_sum = a_0[st]
            else:
                prev_f_sum = sum(f_prev[k]*a[k][st] for k in states)
 
            f_curr[st] = e[st][x_i] * prev_f_sum
 
        fwd.append(f_curr)
        f_prev = f_curr
 
    p_fwd = sum(f_curr[k]*a[k][end_st] for k in states)
 
    bkw = []
    b_prev = {}
    # backward part of the algorithm
    for i, x_i_plus in enumerate(reversed(x[1:]+(None,))):
        b_curr = {}
        for st in states:
            if i == 0:
                # base case for backward part
                b_curr[st] = a[st][end_st]
            else:
                b_curr[st] = sum(a[st][l]*e[l][x_i_plus]*b_prev[l] for l in states)
 
        bkw.insert(0,b_curr)
        b_prev = b_curr
 
    p_bkw = sum(a_0[l] * e[l][x[0]] * b_curr[l] for l in states)
 
    # merging the two parts
    posterior = []
    for i in range(L):
        posterior.append({st: fwd[i][st]*bkw[i][st]/p_fwd for st in states})
 
#     assert p_fwd == p_bkw
    return fwd, bkw, posterior, p_fwd, p_bkw

In [23]:
fwd_bkw(observations,
                   states,
                   start_probability,
                   transition_probability,
                   emission_probability,
                   end_state)

([{'End': 0, 'Mid': 0, 'Onset': 0.5},
  {'End': 0.0, 'Mid': 0.0, 'Onset': 0.03},
  {'End': 0.0, 'Mid': 0.0042, 'Onset': 0.0026999999999999997},
  {'End': 4.2000000000000004e-05, 'Mid': 0.0039689999999999994, 'Onset': 0.0},
  {'End': 0.0, 'Mid': 0.00035720999999999995, 'Onset': 0.0},
  {'End': 1.78605e-05, 'Mid': 0.0, 'Onset': 0.0},
  {'End': 2.85768e-06, 'Mid': 0.0, 'Onset': 0.0}],
 [{'End': 0.0, 'Mid': 0.0, 'Onset': 3.4292160000000006e-06},
  {'End': 0.0, 'Mid': 4.898880000000002e-05, 'Onset': 5.7153600000000015e-05},
  {'End': 0.0, 'Mid': 0.0002721600000000001, 'Onset': 0.00021168000000000003},
  {'End': 0.0, 'Mid': 0.00043200000000000015, 'Onset': 0.00033600000000000004},
  {'End': 0.019200000000000005, 'Mid': 0.004800000000000001, 'Onset': 0.0},
  {'End': 0.09600000000000002, 'Mid': 0.024000000000000004, 'Onset': 0.0},
  {'End': 0.6, 'Mid': 0, 'Onset': 0}],
 [{'End': 0.0, 'Mid': 0.0, 'Onset': 1.0000000000000002},
  {'End': 0.0, 'Mid': 0.0, 'Onset': 1.0000000000000002},
  {'End': 0.

In [18]:
def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]
    for st in states:
        V[0][st] = {"prob": start_p[st] * emit_p[st][obs[0]], "prev": None}
    # Run Viterbi when t > 0
    for t in range(1, len(obs)):
        V.append({})
        for st in states:
            max_tr_prob = max(V[t-1][prev_st]["prob"]*trans_p[prev_st][st] for prev_st in states)
            for prev_st in states:
                if V[t-1][prev_st]["prob"] * trans_p[prev_st][st] == max_tr_prob:
                    max_prob = max_tr_prob * emit_p[st][obs[t]]
                    V[t][st] = {"prob": max_prob, "prev": prev_st}
                    break
    for line in dptable(V):
        print(line)
    opt = []
    # The highest probability
    max_prob = max(value["prob"] for value in V[-1].values())
    previous = None
    # Get most probable state and its backtrack
    for st, data in V[-1].items():
        if data["prob"] == max_prob:
            opt.append(st)
            previous = st
            break
    # Follow the backtrack till the first observation
    for t in range(len(V) - 2, -1, -1):
        opt.insert(0, V[t + 1][previous]["prev"])
        previous = V[t + 1][previous]["prev"]

    print('The steps of states are ' + ' '.join(opt) + ' with highest probability of %s' % max_prob)

def dptable(V):
    # Print a table of steps from dictionary
    yield " ".join(("%12d" % i) for i in range(len(V)))
    for state in V[0]:
        yield "%.7s: " % state + " ".join("%.7s" % ("%f" % v[state]["prob"]) for v in V)

In [19]:
viterbi(observations,
                   states,
                   start_probability,
                   transition_probability,
                   emission_probability)

           0            1            2            3            4            5            6
Mid: 0.00000 0.00000 0.00420 0.00264 0.00023 0.00000 0.00000
End: 0.00000 0.00000 0.00000 0.00004 0.00000 0.00001 0.00000
Onset: 0.50000 0.03000 0.00270 0.00000 0.00000 0.00000 0.00000
The steps of states are Onset Onset Mid Mid Mid End End with highest probability of 1.9051200000000005e-06
