In [1]:
import numpy as np
from scipy.special import logsumexp
from collections import deque
np.random.seed(666)

In [2]:
def forward(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    alpha = np.empty((T, num_states))
    alpha[0] = pi * B[seq[0]]
    for t in range(1, T):
        alpha[t] = alpha[t - 1] @ A * B[seq[t]]
    return alpha

def backward(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    beta = np.empty((T, num_states))
    beta[T - 1] = 1
    for t in range(T - 2, -1, -1):
        beta[t] = A * B[seq[t + 1]] @ beta[t + 1]
    return beta

def alpha_prob(alpha):
    return sum(alpha[-1])

def beta_prob(pi, B, beta, seq):
    return sum(pi * B[seq[0]] * beta[0])

In [3]:
pi = np.array([0.4, 0.3, 0.3])
# A[i, j] from state i+1 to j+1
A = np.array([
    [0.8, 0.2, 0. ],
    [0.3, 0.4, 0.3],
    [0. , 0.3, 0.7]])
# B[o, i] emitting o at state i+1
B = np.array([
    [0.9, 0.5, 0.2],
    [0.1, 0.5, 0.8]])

seq = np.array([0, 0, 1, 0])
alpha = forward((pi, A, B), seq)
beta = backward((pi, A, B), seq)
alpha_prob(alpha), beta_prob(pi, B, beta, seq)

(0.054768480000000015, 0.054768480000000015)

In [4]:
def log_forward(model, seq):
    log_pi, log_A, log_B = model
    num_states = log_A.shape[0]
    T = seq.shape[0]
    log_alpha = np.empty((T, num_states))
    log_alpha[0] = log_pi + log_B[seq[0]]
    for t in range(1, T):
        log_alpha[t] = logsumexp(log_alpha[t - 1] + log_A.T, axis=1) + log_B[seq[t]]
    return log_alpha

def log_backward(model, seq):
    log_pi, log_A, log_B = model
    num_states = log_A.shape[0]
    T = seq.shape[0]
    log_beta = np.empty((T, num_states))
    log_beta[T - 1] = 0 # log(1)
    for t in range(T - 2, -1, -1):
        log_beta[t] = logsumexp(log_A + log_B[seq[t + 1]] + log_beta[t + 1], axis=1)
    return log_beta

def log_alpha_prob(log_alpha):
    return logsumexp(log_alpha[-1])

def log_beta_prob(log_pi, log_B, log_beta, seq):
    return logsumexp(log_pi + log_B[seq[0]] + log_beta[0])

In [5]:
log_pi = np.log(pi)
log_A = np.log(A)
log_B = np.log(B)
log_alpha = log_forward((log_pi, log_A, log_B), seq)
log_beta = log_backward((log_pi, log_A, log_B), seq)
log_alpha_prob(log_alpha), log_beta_prob(log_pi, log_B, log_beta, seq), \
np.log(alpha_prob(alpha))

  log_A = np.log(A)


(-2.90464043299082, -2.9046404329908198, -2.9046404329908198)

In [6]:
# transposed
def log_forward_trans(model, seq):
    log_pi, log_A, log_B = model
    num_states = log_A.shape[0]
    T = seq.shape[0]
    log_alpha = np.empty((num_states, T))
    log_alpha[:, 0] = log_pi + log_B[seq[0]]
    for t in range(1, T):
        log_alpha[:, t] = logsumexp(log_alpha[:, t - 1] + log_A.T, axis=1) + log_B[seq[t]]
    return log_alpha

def log_backward_trans(model, seq):
    log_pi, log_A, log_B = model
    num_states = log_A.shape[0]
    T = seq.shape[0]
    log_beta = np.empty((num_states, T))
    log_beta[:, T - 1] = 0 # log(1)
    for t in range(T - 2, -1, -1):
        log_beta[:, t] = logsumexp(log_A + log_B[seq[t + 1]] + log_beta[:, t + 1], axis=1)
    return log_beta

def log_alpha_prob_trans(log_alpha):
    return logsumexp(log_alpha[:, -1])

def log_beta_prob_trans(log_pi, log_B, log_beta, seq):
    return logsumexp(log_pi + log_B[seq[0]] + log_beta[:, 0])

In [7]:
log_alpha_t = log_forward_trans((log_pi, log_A, log_B), seq)
log_beta_t = log_backward_trans((log_pi, log_A, log_B), seq)
log_alpha_prob_trans(log_alpha_t), log_beta_prob_trans(log_pi, log_B, log_beta_t, seq), \
np.log(alpha_prob(alpha))

(-2.90464043299082, -2.9046404329908198, -2.9046404329908198)

In [9]:
def viterbi(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    delta = np.empty((T, num_states))
    psi = np.empty((T, num_states), dtype='int')
    psi[0] = np.arange(num_states) # best state to come from initially, aka. pi
    delta[0] = pi * B[seq[0]]
    for t in range(1, T):
        temp = delta[t - 1][:, None] * A
        psi[t] = temp.argmax(axis=0)
        delta[t] = temp.max(axis=0) * B[seq[t]]
#     print(delta, psi, sep='\n\n')
    best = np.argmax(delta[-1])
    path = deque([best])
    for t in range(T - 2, -1, -1):
        best = psi[t, best]
        path.appendleft(best)
    return delta, psi, path

In [53]:
# TODO
def viterbi_trans(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    delta = np.empty((num_states, T))
    psi = np.empty((T, num_states), dtype='int')
    psi[0] = np.arange(num_states) # best state to come from initially, aka. pi
    delta[:, 0] = pi * B[seq[0]]
    for t in range(1, T):
        temp = delta[:, t - 1][:, None] * A
        psi[t] = temp.argmax(axis=0)
        delta[:, t] = temp.max(axis=0) * B[seq[t]]
#     print(delta, psi, sep='\n\n')
    best = np.argmax(delta[:, -1])
    path = deque([best])
    for t in range(T - 2, -1, -1):
        best = psi[t, best]
        path.appendleft(best)
    return delta, psi, path

In [54]:
delta1, _, _ = viterbi((pi, A, B), seq)
delta2, _, _ = viterbi_trans((pi, A, B), seq)

In [56]:
np.allclose(delta1.T, delta2)

True

In [57]:
def viterbi_log(model, seq):
    log_pi, log_A, log_B = model
    num_states = log_A.shape[0]
    T = seq.shape[0]
    log_delta = np.empty((T, num_states))
    psi = np.empty((T, num_states), dtype='int')
    psi[0] = np.arange(num_states) # best state to come from initially, aka. pi
    log_delta[0] = log_pi + log_B[seq[0]]
    for t in range(1, T):
        temp = log_delta[t - 1][:, None] + log_A
        psi[t] = temp.argmax(axis=0)
        log_delta[t] = temp.max(axis=0) + log_B[seq[t]]
#     print(log_delta, psi, sep='\n\n')
    
    q = np.empty(T, dtype='int') # best path
    q[T - 1] = np.argmax(log_delta[-1])
    for t in range(T - 2, -1, -1):
        q[t] = psi[t, q[t + 1]]
    return log_delta, psi, q

In [61]:
def viterbi_log_trans(model, seq):
    log_pi, log_A, log_B = model
    num_states = log_A.shape[0]
    T = seq.shape[0]
    log_delta = np.empty((num_states, T))
    psi = np.empty((T, num_states), dtype='int')
    psi[0] = np.arange(num_states) # best state to come from initially, aka. pi
    log_delta[:, 0] = log_pi + log_B[seq[0]]
    for t in range(1, T):
        temp = log_delta[:, t - 1][:, None] + log_A
        psi[t] = temp.argmax(axis=0)
        log_delta[:, t] = temp.max(axis=0) + log_B[seq[t]]
#     print(log_delta, psi, sep='\n\n')
    
    q = np.empty(T, dtype='int') # best path
    q[T - 1] = np.argmax(log_delta[:, -1])
    for t in range(T - 2, -1, -1):
        q[t] = psi[t, q[t + 1]]
    return log_delta, psi, q

In [62]:
delta3, _, _ = viterbi_log((log_pi, log_A, log_B), seq)
delta4, _, _ = viterbi_log_trans((log_pi, log_A, log_B), seq)

In [64]:
np.allclose(np.log(delta2).T, delta3),\
np.allclose(delta3.T, delta4)

(True, True)

In [17]:
# TODO: viterbi log transposed

In [32]:
test = np.random.randint(0, 2, 100)

In [19]:
np.all(viterbi((pi, A, B), test) == viterbi_log((log_pi, log_A, log_B), test))

True

In [20]:
np.all(viterbi((pi, A, B), test) == viterbi_log_trans((log_pi, log_A, log_B), test))

False

In [14]:
def compute_xi(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    
    alpha = forward(model, seq)
    beta = backward(model, seq)
    alpha_p = alpha_prob(alpha)
    
    xi = np.empty((T - 1, num_states, num_states))
    for t in range(T - 1):
        xi[t] = np.outer(alpha[t], beta[t + 1]) * A * B[seq[t + 1]]
    xi /= alpha_p
    return xi

In [15]:
xi = compute_xi((pi, A, B), seq)
xi

array([[[0.56129219, 0.13159394, 0.        ],
        [0.0877019 , 0.10966162, 0.03975097],
        [0.        , 0.03289848, 0.0371009 ]],

       [[0.35897144, 0.29002266, 0.        ],
        [0.03368726, 0.14515648, 0.0953103 ],
        [0.        , 0.02525723, 0.05159464]],

       [[0.34477349, 0.04788521, 0.        ],
        [0.23456192, 0.17374957, 0.05212487],
        [0.        , 0.07598531, 0.07091962]]])

In [18]:
t = 2
np.outer(alpha[t], beta[t + 1]) * A * B[seq[t + 1]] / alpha_prob(alpha)

array([[0.34477349, 0.04788521, 0.        ],
       [0.23456192, 0.17374957, 0.05212487],
       [0.        , 0.07598531, 0.07091962]])

In [20]:
xi[-1], np.log(xi[-1])

  xi[-1], np.log(xi[-1])


(array([[0.34477349, 0.04788521, 0.        ],
        [0.23456192, 0.17374957, 0.05212487],
        [0.        , 0.07598531, 0.07091962]]),
 array([[-1.06486763, -3.03894865,        -inf],
        [-1.45003566, -1.75014025, -2.95411306],
        [       -inf, -2.57721523, -2.6462081 ]]))

In [21]:
np.outer(alpha[t], beta[t + 1]) * A * B[seq[t + 1]]

array([[0.01888272, 0.0026226 , 0.        ],
       [0.0128466 , 0.009516  , 0.0028548 ],
       [0.        , 0.0041616 , 0.00388416]])

In [25]:
np.allclose(np.log(A * B[seq[t + 1]]), log_A + log_B[seq[t + 1]])

  np.allclose(np.log(A * B[seq[t + 1]]), log_A + log_B[seq[t + 1]])


True

In [36]:
np.allclose(
    np.log(np.outer(alpha[t], beta[t + 1])) + log_A + log_B[seq[t + 1]],
    np.log(np.outer(alpha[t], beta[t + 1]) * A * B[seq[t + 1]])
)

  np.log(np.outer(alpha[t], beta[t + 1]) * A * B[seq[t + 1]])


True

In [59]:
# what is equivalent to np.log(np.outer(alpha[t], beta[t + 1]))?
np.allclose(
    np.log(np.outer(alpha[t], beta[t + 1])),
    log_alpha[t][:, None] + log_beta[t+1][None:, ]
)

True

In [63]:
np.allclose(
    log_alpha[t][:, None] + log_beta[t + 1] + log_A + log_B[seq[t + 1]],
    np.log(np.outer(alpha[t], beta[t + 1]) * A * B[seq[t + 1]])
)

  np.log(np.outer(alpha[t], beta[t + 1]) * A * B[seq[t + 1]])


True

In [39]:
v1 = np.array([1, 2])
v2 = np.array([3, 4])
np.outer(v1, v2) == v1[:, None] @ v2[None, :] 

In [67]:
def log_compute_xi(model, seq):
    pi, A, B = model
    num_states = A.shape[0]
    T = seq.shape[0]
    
    log_alpha = log_forward(model, seq)
    log_beta = log_backward(model, seq)
    log_alpha_p = log_alpha_prob(log_alpha)
    
    xi = np.empty((T - 1, num_states, num_states))
    for t in range(T - 1):
        xi[t] = log_alpha[t][:, None] + log_beta[t + 1] + log_A + log_B[seq[t + 1]]
    xi -= log_alpha_p
    return xi

In [70]:
np.allclose(
    log_compute_xi((log_pi, log_A, log_B), seq),
    np.log(xi)
)

  np.log(xi)


True