In [2]:
import numpy as np
import pandas as pd

In [3]:
seq = pd.read_csv("seq.csv")

# Viterbi Algorithm 

In [4]:
o = np.array(["a", "b", "c"]) # observation space
s = np.array(["St1", "St2"]) # state space
y = np.asarray(seq.iloc[:, 2].copy()) # observation sequence
x_true = np.asarray(seq.iloc[:, 1].copy()) # true result
a = np.array([[0.981, 0.007],[0.019, 0.992]]) # transition probability matrix
b = np.array([[0.076, 0.802, 0.122], [0.179, 0.151, 0.669]]) # emission probability matrix
p = np.array([0.723, 0.277]) # prior probability

In [5]:
y[y == "a"] = 0
y[y == "b"] = 1
y[y == "c"] = 2
x_true[x_true == "St1"] = 0
x_true[x_true == "St2"] = 1

In [6]:
def viterbi(o, s, y, a, b, p):
    
    k = a.shape[0]    
    t = len(y)
    tState = np.empty((k, t), 'd')
    tIndex = np.empty((k, t), 'B')
    x = np.empty(t, 'B')
    
    tState[:, 0] = p * b[:, y[0]]
    tIndex[:, 0] = 0

    for i in range(1, t):
        tState[:, i] = np.max(tState[:, i-1] * a.T * b[np.newaxis, :, y[i]].T, 1)
        tIndex[:, i] = np.argmax(tState[:, i-1] * a.T * b[np.newaxis, :, y[i]].T, 1)

    x[-1] = np.argmax(tState[:, t-1])
    for i in reversed(range(1, t)):
        x[i - 1] = tIndex[x[i], i]

    return x, tState, tIndex

In [7]:
x, tState, tIndex = viterbi(o, s, y, a, b, p)

In [8]:
a = x == 0
b = x_true == 0
Tp = sum(a == b)

a = x == 1
b = x_true == 1
Tn = sum(a == b)

a = x == 0
b = x_true == 1
Fp = sum(a == b)

a = x == 1
b = x_true == 0
Fn = sum(a == b)

In [9]:
Tn / (Tn+Fp)

0.9925

In [10]:
Tp / (Tp+Fn)

0.9925

# Forward–backward algorithm

In [49]:
o = np.array(["a", "b", "c"]) # observation space
s = np.array(["St1", "St2"]) # state space 
y = np.asarray(seq.iloc[:, 2].copy()) # observation sequence
e = 2 # number of end state
x_true = np.asarray(seq.iloc[:, 1].copy()) # true result
a = np.array([[0.981, 0.007, 0.01], [0.019, 0.992, 0.01]]) # transition probability matrix
b = np.array([[0.076, 0.802, 0.122], [0.179, 0.151, 0.669]]) # emission probability matrix
p = np.array([0.723, 0.277]) # prior probability

In [116]:
def forward_backward(o, s, p, a, b, e):

    # forward part of the algorithm
    fwd = []
    f_prev = []
    for i, observation_i in enumerate(o):
        f_curr = []
        for st in range(len(s)):
            if i == 0:
                # base case for the forward part
                prev_f_sum = p[st]
            else:
                prev_f_sum = sum(f_prev[k]*a[k, st] for k in range(len(s)))
            
            f_curr.append(b[st, i] * prev_f_sum)

        fwd.append(f_curr)
        f_prev = f_curr
        print(fwd)

    p_fwd = sum(f_curr[k] * a[k, e] for k in range(len(s)))

    # backward part of the algorithm
    bkw = []
    b_prev = []
    for i, observation_i_plus in enumerate(range(len(s))):
        b_curr = []
        for st in range(len(s)):
            if i == 0:
                # base case for backward part
                b_curr.append(a[st, e])
            else:
                b_curr.append(sum(a[st, l] * b[l, observation_i_plus] * b_prev[l] for l in range(len(s))))

        bkw.insert(0,b_curr)
        b_prev = b_curr

    p_bkw = sum(p[l] * b[l, 0] * b_curr[l] for l in range(len(s)))

    # merging the two parts
    posterior = []
    for i in range(len(o)):
        posterior.append({st: fwd[i][st] * bkw[i][st] / p_fwd for st in range(len(s))})

    assert p_fwd == p_bkw
    return posterior

In [117]:
forward_backward(o, s, p, a, b, e)

[[0.054948, 0.049583]]
[[0.054948, 0.049583], [0.04398654413, 0.007485216772000001]]
[[0.054948, 0.049583], [0.04398654413, 0.007485216772000001], [0.005281748307044155, 0.005173538126465047]]


IndexError: list index out of range

In [60]:
states = ('Healthy', 'Fever')
end_state = 'E'
 
observations = ('normal', 'cold', 'dizzy')
 
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
 
transition_probability = {
   'Healthy' : {'Healthy': 0.69, 'Fever': 0.3, 'E': 0.01},
   'Fever' : {'Healthy': 0.4, 'Fever': 0.59, 'E': 0.01},
   }
 
emission_probability = {
   'Healthy' : {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
   'Fever' : {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
   }


def fwd_bkw(observations, states, start_prob, trans_prob, emm_prob, end_st):
    # forward part of the algorithm
    fwd = []
    f_prev = {}
    for i, observation_i in enumerate(observations):
        f_curr = {}
        for st in states:
            if i == 0:
                # base case for the forward part
                prev_f_sum = start_prob[st]
            else:
                prev_f_sum = sum(f_prev[k]*trans_prob[k][st] for k in states)

            f_curr[st] = emm_prob[st][observation_i] * prev_f_sum

        fwd.append(f_curr)
        f_prev = f_curr

    p_fwd = sum(f_curr[k] * trans_prob[k][end_st] for k in states)

    # backward part of the algorithm
    bkw = []
    b_prev = {}
    print(observations[1:]+(None,))
    for i, observation_i_plus in enumerate(reversed(observations[1:]+(None,))):
        b_curr = {}
        for st in states:
            if i == 0:
                # base case for backward part
                b_curr[st] = trans_prob[st][end_st]
            else:
                b_curr[st] = sum(trans_prob[st][l] * emm_prob[l][observation_i_plus] * b_prev[l] for l in states)

        bkw.insert(0,b_curr)
        b_prev = b_curr

    p_bkw = sum(start_prob[l] * emm_prob[l][observations[0]] * b_curr[l] for l in states)

    # merging the two parts
    posterior = []
    for i in range(len(observations)):
        posterior.append({st: fwd[i][st] * bkw[i][st] / p_fwd for st in states})

    assert p_fwd == p_bkw
    return fwd, bkw, posterior

fwd_bkw(observations,
                   states,
                   start_probability,
                   transition_probability,
                   emission_probability,
                   end_state)

('cold', 'dizzy', None)


([{'Healthy': 0.3, 'Fever': 0.04000000000000001},
  {'Healthy': 0.0892, 'Fever': 0.03408},
  {'Healthy': 0.007518, 'Fever': 0.028120319999999997}],
 [{'Healthy': 0.0010418399999999998, 'Fever': 0.00109578},
  {'Healthy': 0.00249, 'Fever': 0.00394},
  {'Healthy': 0.01, 'Fever': 0.01}],
 [{'Healthy': 0.8770110375573259, 'Fever': 0.1229889624426741},
  {'Healthy': 0.623228030950954, 'Fever': 0.3767719690490461},
  {'Healthy': 0.2109527048413057, 'Fever': 0.7890472951586943}])

In [66]:
for i in enumerate(reversed(observations[1:]+(None,))):
    print(i)

(0, None)
(1, 'dizzy')
(2, 'cold')
