In [1]:
import numpy as np
from collections import deque

In [2]:
def forward(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    alpha = np.empty((T, num_states))
    alpha[0] = pi * B[seq[0]]
    for t in range(1, T):
        alpha[t] = alpha[t - 1] @ A * B[seq[t]]
    return alpha

def backward(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    beta = np.empty((T, num_states))
    beta[T - 1] = 1
    for t in range(T - 2, -1, -1):
        beta[t] = A * B[seq[t + 1]] @ beta[t + 1]
    return beta

def alpha_prob(alpha):
    return sum(alpha[-1])

def beta_prob(pi, B, beta, seq):
    return sum(pi * B[seq[0]] * beta[0])

In [3]:
pi = np.array([0.4, 0.3, 0.3])
# A[i, j] from state i+1 to j+1
A = np.array([
    [0.8, 0.2, 0. ],
    [0.3, 0.4, 0.3],
    [0. , 0.3, 0.7]])
# B[o, i] emitting o at state i+1
B = np.array([
    [0.9, 0.5, 0.2],
    [0.1, 0.5, 0.8]])

seq = np.array([0, 0, 1, 0])
alpha = forward((pi, A, B), seq)
beta = backward((pi, A, B), seq)
alpha_prob(alpha), beta_prob(pi, B, beta, seq)

(0.054768480000000015, 0.054768480000000015)

In [None]:
def compute_xi():
    xi = np.zeros((3, 3, 3))
    for t in range(3):
        for i in range(3):
            for j in range(3):
                xi[t, i, j] = alpha[t, i] * A[i, j] * B[seq[t + 1], j] * beta[t + 1, j]
    xi /= alpha_prob(alpha)
    for t in range(3):
        assert np.isclose(xi[t].sum(), 1)
    return xi

In [None]:
def compute_xi_fast():
    xi = np.zeros((3, 3, 3))
    for t in range(3):
        xi[t] = np.outer(alpha[t], beta[t + 1]) * A * B[seq[t + 1]]
    xi /= alpha_prob(alpha)
    return xi

In [None]:
xi1 = compute_xi()
xi2 = compute_xi_fast()
np.allclose(xi1, xi2)

In [None]:
%timeit -n 50 compute_xi()
%timeit -n 50 compute_xi_fast()

In [None]:
gamma = np.empty((4, 3))
for t in range(4):
    for i in range(3):
        gamma[t, i] = alpha[t, i] * beta[t, i]
gamma

In [None]:
gamma = alpha * beta
gamma /= alpha_prob(alpha)

In [None]:
# update pi gamma[0] equivalent
xi[0].sum(axis=1), gamma[0]

In [None]:
# update A
numerator = xi.sum(axis=0)
denom = gamma[:-1].sum(axis=0)
A = numerator / denom[:, None]
print(A.sum(axis=1)) # row-wise 1
A

In [None]:
# update B
denom = gamma.sum(axis=0) # soft counts of state i over entire seq
for k in range(2):
    B[k] = gamma[seq == k].sum(axis=0)
B /= denom
print(B.sum(axis=0)) # col-wise 1
B

In [None]:
class HMM:
    def __init__(self):
        self.pi = np.array([0.4, 0.3, 0.3])
        # A[i, j] from state i+1 to j+1
        self.A = np.array([
            [0.8, 0.2, 0. ],
            [0.3, 0.4, 0.3],
            [0. , 0.3, 0.7]])
        # B[o, i] emitting o at state i+1
        self.B = np.array([
            [0.9, 0.5, 0.2],
            [0.1, 0.5, 0.8]])
        self.num_states = 3
        self.num_emissions = 2
        self.alpha = None
        self.beta = None
        self.xi = None
        self.gamma = None
        
    def forward(self, seq):
        T = seq.shape[0]
        self.alpha = np.empty((T, self.num_states)) # T * num_states
        self.alpha[0] = self.pi * self.B[seq[0]]
        for t in range(1, T):
            self.alpha[t] = self.alpha[t - 1] @ self.A * self.B[seq[t]]
    
    def backward(self, seq):
        num_states = self.A.shape[0]
        T = seq.shape[0]
        self.beta = np.empty((T, self.num_states))
        self.beta[T - 1] = 1
        for t in range(T - 2, -1, -1):
            self.beta[t] = self.A * self.B[seq[t + 1]] @ self.beta[t + 1]
            
    def alpha_prob(self):
        return sum(self.alpha[-1])
    
    def beta_prob(self, seq):
        return sum(self.pi * self.B[seq[0]] * self.beta[0])
    
    def expectation(self, seq):
        self.forward(seq)
        self.backward(seq)
        
        T = seq.shape[0]
        alpha_p = self.alpha_prob()
        self.xi = np.empty((T - 1, self.num_states, self.num_states))
        for t in range(T - 1):
            for i in range(self.num_states):
                for j in range(self.num_states):
                    numerator = self.alpha[t, i] * self.A[i, j] * \
                    self.B[seq[t + 1], j] * self.beta[t + 1, j]
                    self.xi[t, i, j] = numerator
        self.xi /= self.alpha_prob()
        
        self.gamma = self.alpha * self.beta / alpha_p
    
    def maximization(self, seq):
        # update pi
        self.pi = self.gamma[0] # soft counts of each state at time 1
        # update A
        numerator = self.xi.sum(axis=0) # soft counts of transitions from i to j
        denom = self.gamma[:-1].sum(axis=0) # soft counts of transitions out of i 
        self.A = numerator / denom[:, None]
        
        # update B
        denom = self.gamma.sum(axis=0) # soft counts of state i over entire seq
        for k in range(self.num_emissions):
            self.B[k] = self.gamma[seq == k].sum(axis=0)
        self.B /= denom
        
    def check_probabilities(self):
        assert np.isclose(hmm.pi.sum(), 1)
        assert np.allclose(hmm.A.sum(axis=1), 1)
        assert np.allclose(hmm.B.sum(axis=0), 1)

In [None]:
seq = np.array([0, 0, 1, 0])
hmm = HMM()
hmm.forward(seq)
hmm.backward(seq)
hmm.alpha_prob(), hmm.beta_prob(seq)

In [None]:
np.set_printoptions(precision=5, suppress=True)
for i in range(10):
    hmm.expectation(seq)
    hmm.maximization(seq)
    print(hmm.A, hmm.B, hmm.pi, sep='\n\n**\n\n')
    hmm.check_probabilities()

In [25]:
# viterbi
delta = np.empty((4, 3))
delta[0] = pi * B[seq[0]]
for t in range(1, 4):
    for j in range(3):
        arr = [delta[t - 1, i] * A[i, j] for i in range(3)]
        delta[t, j] = max(arr) * B[seq[t], j]
delta

array([[0.36      , 0.15      , 0.06      ],
       [0.2592    , 0.036     , 0.009     ],
       [0.020736  , 0.02592   , 0.00864   ],
       [0.01492992, 0.005184  , 0.0015552 ]])

In [26]:
delta = np.empty((4, 3))
delta[0] = pi * B[seq[0]]
for t in range(1, 4):
    delta[t] = (delta[t - 1][:, None] * A).max(axis=0) * B[seq[t]]
delta

array([[0.36      , 0.15      , 0.06      ],
       [0.2592    , 0.036     , 0.009     ],
       [0.020736  , 0.02592   , 0.00864   ],
       [0.01492992, 0.005184  , 0.0015552 ]])

In [95]:
delta = np.empty((4, 3))
delta[0] = pi * B[seq[0]]
for t in range(1, 4):
    delta[t] = (delta[t - 1] * A).max(axis=0) * B[seq[t]]
delta

array([[0.36      , 0.15      , 0.06      ],
       [0.2592    , 0.03      , 0.0084    ],
       [0.020736  , 0.006     , 0.004704  ],
       [0.01492992, 0.0012    , 0.00065856]])

In [96]:
t = 1
np.einsum('i,ij->ij', delta[0], A).max(axis=0) * B[0], \
np.einsum('ij,i->ij', A, delta[0]).max(axis=0) * B[0]

(array([0.2592, 0.036 , 0.009 ]), array([0.2592, 0.036 , 0.009 ]))

In [77]:
def viterbi_trans_cached(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    Atrans = A.T
    delta = np.empty((T, num_states))
    delta[0] = pi * B[seq[0]]
    for t in range(1, T):
        delta[t] = (delta[t - 1] * Atrans).max(axis=1) * B[seq[t]]
    return delta

In [104]:
def viterbi_einsum(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    delta = np.empty((T, num_states))
    delta[0] = pi * B[seq[0]]
    for t in range(1, T):
        delta[t] = np.einsum('i,ij->ij', delta[t - 1], A).max(axis=0) * B[seq[t]]
    return delta

In [108]:
def viterbi_newaxis(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    delta = np.empty((T, num_states))
    delta[0] = pi * B[seq[0]]
    for t in range(1, T):
        delta[t] = (delta[t - 1][:, np.newaxis] * A).max(axis=0) * B[seq[t]]
    return delta

In [105]:
def viterbi_einsum2(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    delta = np.empty((T, num_states))
    delta[0] = pi * B[seq[0]]
    for t in range(1, T):
        delta[t] = np.einsum('ij,i->ij', A, delta[t - 1]).max(axis=0) * B[seq[t]]
    return delta

In [109]:
test = np.random.randint(0, 2, 1000)
res1 = viterbi_trans_cached((pi, A, B), test)
res2 = viterbi_newaxis((pi, A, B), test)
res3 = viterbi_einsum((pi, A, B), test)
res4 = viterbi_einsum2((pi, A, B), test)
np.allclose(res1, res2), np.allclose(res2, res3), np.allclose(res4, res3)

(True, True, True)

In [112]:
%timeit -n 50 viterbi_trans_cached((pi, A, B), test)
%timeit -n 50 viterbi_newaxis((pi, A, B), test)
# %timeit -n 50 viterbi_einsum((pi, A, B), test)
# %timeit -n 50 viterbi_einsum2((pi, A, B), test)
%timeit -n 50 viterbi((pi, A, B), test)

10.1 ms ± 1.26 ms per loop (mean ± std. dev. of 7 runs, 50 loops each)
8.92 ms ± 350 µs per loop (mean ± std. dev. of 7 runs, 50 loops each)
8.75 ms ± 353 µs per loop (mean ± std. dev. of 7 runs, 50 loops each)


In [None]:
delta = np.empty((3, 4))
delta[0] = pi * B[seq[0]]
for t in range(1, 4):
    delta[t] = (delta[t - 1] * A.T).max(axis=1) * B[seq[t]]
delta

In [17]:
# backtracing
delta = np.empty((4, 3))
psi = np.empty((4, 3), dtype='int')
psi[0] = np.arange(3) # best state to come from initially, aka. pi
delta[0] = pi * B[seq[0]]
for t in range(1, 4):
    temp = delta[t - 1][:, None] * A
    psi[t] = temp.argmax(axis=0)
    delta[t] = temp.max(axis=0) * B[seq[t]]
print(delta, psi, sep='\n\n')

best = np.argmax(delta[-1])
path = deque([best])
for t in range(3, -1, -1):
    best = psi[t, best]
    path.appendleft(best)
    
print(path)

[[0.36       0.15       0.06      ]
 [0.2592     0.036      0.009     ]
 [0.020736   0.02592    0.00864   ]
 [0.01492992 0.005184   0.0015552 ]]

[[0 1 2]
 [0 0 1]
 [0 0 1]
 [0 1 1]]
deque([0, 0, 0, 0, 0])


In [24]:
def viterbi(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    delta = np.empty((T, num_states))
    psi = np.empty((T, num_states), dtype='int')
    psi[0] = np.arange(num_states) # best state to come from initially, aka. pi
    delta[0] = pi * B[seq[0]]
    for t in range(1, T):
        temp = delta[t - 1][:, None] * A
        psi[t] = temp.argmax(axis=0)
        delta[t] = temp.max(axis=0) * B[seq[t]]
    print(delta, psi, sep='\n\n')
    best = np.argmax(delta[-1])
    path = deque([best])
    for t in range(T - 2, -1, -1):
        best = psi[t, best]
        path.appendleft(best)
    return path

In [25]:
viterbi((pi, A, B), seq)

[[0.36       0.15       0.06      ]
 [0.2592     0.036      0.009     ]
 [0.020736   0.02592    0.00864   ]
 [0.01492992 0.005184   0.0015552 ]]

[[0 1 2]
 [0 0 1]
 [0 0 1]
 [0 1 1]]


deque([0, 0, 0, 0])

In [26]:
test = np.array([0, 1, 0, 1])
viterbi((pi, A, B), test)

[[0.36       0.15       0.06      ]
 [0.0288     0.036      0.036     ]
 [0.020736   0.0072     0.00504   ]
 [0.00165888 0.0020736  0.0028224 ]]

[[0 1 2]
 [0 0 1]
 [0 1 2]
 [0 0 2]]


deque([1, 1, 2, 2])

In [36]:
psi = np.array([[0, 1], [1, 1], [0, 0], [0, 0]])
best = 0
path = deque([best])
for t in range(2, -1, -1):
    best = psi[t, best]
    path.appendleft(best)
path

deque([1, 1, 0, 0])