In [1]:
import numpy as np
from scipy.special import logsumexp

Importing data

In [2]:
data = np.load("exercise-02-data.npz")
x = data["x"]

In [3]:
print(f"{x.shape=}")
print(f"{x[0]=}")

x.shape=(1000, 5)
x[0]=array([1, 0, 2, 2, 2])


In [4]:
# where:

WALK = 0
SHOP = 1
CLEAN = 2

In [6]:
# quick one hot encoding
N, T, K, D = 1000, 5, 2, 3

x_one_hot = np.zeros((N, T, D))
x_one_hot[..., 0] = x == 0
x_one_hot[..., 1] = x == 1
x_one_hot[..., 2] = x == 2

assert x_one_hot.sum() == N * T

print(f"{x_one_hot[0]}")

[[0. 1. 0.]
 [1. 0. 0.]
 [0. 0. 1.]
 [0. 0. 1.]
 [0. 0. 1.]]


Copying over `forward_backward_v` from Exercise 2, changing it to take log_pi, log_a, log_b

In [7]:
def forward_backward_v(x, log_pi, log_a, log_b, debug=True):
    from scipy.special import logsumexp

    num_states = log_pi.size
    seq_len = len(x)

    log_alpha = np.zeros((seq_len, num_states))
    log_beta = np.zeros((seq_len, num_states))

    # calculate alpha
    log_alpha[0] = log_pi + log_b[:, x[0]]
    for t in range(seq_len-1):
        log_alpha[t+1] = logsumexp(log_alpha[t, :].reshape(-1, 1) + log_a[:, :] + log_b[:, x[t+1]].reshape(1, -1), axis=0)

    # calculate beta
    log_beta[-1] = 0  # redundant, due to initialization above 0 = log(1)
    for t in range(seq_len-2,-1,-1):
        log_beta[t] = logsumexp(log_a[:, :] + log_b[:, x[t+1]].reshape(1, -1) + log_beta[t+1, :], axis=1)
    
    # calculate log_gamma (unnormalized)
    log_gamma_unnormalized = log_alpha + log_beta

    if debug == True:
        # calculate log_p_x_lambda (and check if values are as expected)
        log_p_x_lambda = np.zeros(seq_len)
        for t in range(seq_len):
            log_p_x_lambda[t] = logsumexp(log_alpha[t] + log_beta[t])
        
        assert np.allclose(log_p_x_lambda, log_p_x_lambda[0])
        try:
            assert 0 <= np.exp(log_p_x_lambda[0]) <= 1
        except:
            raise ValueError(f"{log_p_x_lambda=}")

        gamma = np.exp(log_gamma_unnormalized - log_p_x_lambda.reshape(-1, 1))
        try:
            assert (0 <= gamma).all() and (gamma <= 1).all()
        except:
            raise ValueError(f"gamma:\n{gamma}")
    else:
        log_p_x_lambda = None
    
    # get argmax over gammas
    y = np.argmax(np.exp(log_gamma_unnormalized), axis=1)
        
    return y, log_alpha, log_beta, log_p_x_lambda

Implementing Baum-Welch (not optimized)

In [8]:
# dummy parameters for testing

num_states = 2
num_observations = 3

pi_est = np.ones((num_states)) / num_states
a_est = np.ones((num_states, num_states)) / num_states
b_est = np.ones((num_states, num_observations)) / num_observations

In [9]:
print(f"{pi_est=}")
print(f"{a_est=}")
print(f"{b_est=}")

pi_est=array([0.5, 0.5])
a_est=array([[0.5, 0.5],
       [0.5, 0.5]])
b_est=array([[0.33333333, 0.33333333, 0.33333333],
       [0.33333333, 0.33333333, 0.33333333]])


In [10]:
log_pi_est, log_a_est, log_b_est = np.log(pi_est), np.log(a_est), np.log(b_est)

In [11]:
def e_step(x, log_pi, log_a, log_b):
    N, T = x.shape
    K, K_ = log_a.shape
    assert K == K_
    K_, D = log_b.shape
    assert K == K_

    y = -1 * np.ones((N, T))
    log_alpha = -1 * np.ones((N, T, K))
    log_beta = -1 * np.ones((N, T, K))
    log_p_x_lambda = -1 * np.ones((N, T))  # output is duplicated T times

    for idx, seq in enumerate(x):
        out = forward_backward_v(x=seq, log_pi=log_pi, log_a=log_a, log_b=log_b, debug=True)
        y[idx], log_alpha[idx], log_beta[idx], log_p_x_lambda[idx] = out
    
    return y, log_alpha, log_beta, log_p_x_lambda

In [12]:
# testing e_step

_, log_alpha, log_beta, log_p_x_lambda = e_step(x=x, log_pi=log_pi_est, log_a=log_a_est, log_b=log_b_est)

In [13]:
def m_step(x, x_one_hot, log_a, log_b, log_alpha, log_beta, log_p_x_lambda):
    N, T, K = log_alpha.shape
    log_gamma = log_alpha + log_beta - log_p_x_lambda.reshape(N, T, 1)

    # max pi
    log_pi_new = logsumexp(log_gamma[:, 0, :], axis=0) - np.log(N)
    assert np.allclose(np.exp(logsumexp(log_pi_new)), 1)

    # max a
    log_a_new = np.ones_like(log_a) * -1
    for j in range(K):
        for k in range(K):
            log_xi_jk = np.ones((N, T-1)) * -1
            for n in range(N):
                for t in range(T-1):
                    log_xi_jk[n, t] = log_alpha[n, t, j] + log_a[j, k] + log_b[k, x[n, t+1]] + log_beta[n, t+1, k] - log_p_x_lambda[n, 0]
            log_a_new[j, k] = logsumexp(log_xi_jk) - logsumexp(log_gamma[:, :-1, j])

    # max b
    log_b_new = np.ones_like(log_b) * -1
    for i in range(D):
        log_b_new[..., i] = logsumexp(log_gamma + np.expand_dims(np.log(x_one_hot[..., i]), axis=-1), axis=(0, 1)) - logsumexp(log_gamma, axis=(0, 1))

    return log_pi_new, log_a_new, log_b_new

In [14]:
# testing m_step

log_pi_new, log_a_new, log_b_new = m_step(x=x, x_one_hot=x_one_hot, log_a=log_a_est, log_b=log_b_est, log_alpha=log_alpha, log_beta=log_beta, log_p_x_lambda=log_p_x_lambda)

  log_b_new[..., i] = logsumexp(log_gamma + np.expand_dims(np.log(x_one_hot[..., i]), axis=-1), axis=(0, 1)) - logsumexp(log_gamma, axis=(0, 1))


In [45]:
# initialize parameters (with fixed random.seed ...)

num_states = 2
num_observations = 3

pi_est = np.ones((num_states)) / num_states
np.random.seed(0)
a_est = np.random.uniform(low=0.25, high=0.75, size=(num_states, num_states))
b_est = np.random.uniform(low=0.25, high=0.75, size=(num_states, num_observations))
a_est = a_est / a_est.sum(axis=-1, keepdims=True)
b_est = b_est / b_est.sum(axis=-1, keepdims=True)
print(f"{pi_est=}")
print(f"{a_est=}")
print(f"{b_est=}")
log_pi_est, log_a_est, log_b_est = np.log(pi_est), np.log(a_est), np.log(b_est)

pi_est=array([0.5, 0.5])
a_est=array([[0.46325626, 0.53674374],
       [0.51347526, 0.48652474]])
b_est=array([[0.3071543 , 0.38105828, 0.31178742],
       [0.37224356, 0.3914712 , 0.23628524]])


In [46]:
# running Baum-Welch

NUM_EPOCHS = 500
for epoch in range(NUM_EPOCHS):
    _, log_alpha, log_beta, log_p_x_lambda = e_step(x=x, log_pi=log_pi_est, log_a=log_a_est, log_b=log_b_est)
    log_pi_est, log_a_est, log_b_est = m_step(x=x, x_one_hot=x_one_hot, log_a=log_a_est, log_b=log_b_est, log_alpha=log_alpha, log_beta=log_beta, log_p_x_lambda=log_p_x_lambda)
    log_p_X_lambda = log_p_x_lambda[:, 0].sum()
    print(f"{epoch=:05}: {log_p_X_lambda=}")

    # Note: log_p_X_lambda (the log-likelihood) should always increase, we could test for this and use the difference in log-likelihoods as a stopping criteria ...

  log_b_new[..., i] = logsumexp(log_gamma + np.expand_dims(np.log(x_one_hot[..., i]), axis=-1), axis=(0, 1)) - logsumexp(log_gamma, axis=(0, 1))


epoch=00000: log_p_X_lambda=-5475.4594027374205
epoch=00001: log_p_X_lambda=-5438.252450623976
epoch=00002: log_p_X_lambda=-5438.23565193709
epoch=00003: log_p_X_lambda=-5438.219753782536
epoch=00004: log_p_X_lambda=-5438.204678805732
epoch=00005: log_p_X_lambda=-5438.1903560875935
epoch=00006: log_p_X_lambda=-5438.176720459356
epoch=00007: log_p_X_lambda=-5438.163711899495
epoch=00008: log_p_X_lambda=-5438.151275002316
epoch=00009: log_p_X_lambda=-5438.139358507848
epoch=00010: log_p_X_lambda=-5438.127914885402
epoch=00011: log_p_X_lambda=-5438.116899963719
epoch=00012: log_p_X_lambda=-5438.1062726015
epoch=00013: log_p_X_lambda=-5438.095994393847
epoch=00014: log_p_X_lambda=-5438.0860294097565
epoch=00015: log_p_X_lambda=-5438.076343956816
epoch=00016: log_p_X_lambda=-5438.066906370639
epoch=00017: log_p_X_lambda=-5438.057686825122
epoch=00018: log_p_X_lambda=-5438.048657162162
epoch=00019: log_p_X_lambda=-5438.039790737895
epoch=00020: log_p_X_lambda=-5438.031062284081
epoch=00021: 

In [47]:
print(f"{np.exp(log_pi_est)=}")
print(f"{np.exp(log_a_est)=}")
print(f"{np.exp(log_b_est)=}")

np.exp(log_pi_est)=array([0.41666108, 0.58333892])
np.exp(log_a_est)=array([[0.63155233, 0.36844767],
       [0.32702993, 0.67297007]])
np.exp(log_b_est)=array([[0.11315723, 0.40124073, 0.48560203],
       [0.62583838, 0.27979874, 0.09436288]])


Sanity checks

In [48]:
# checking that the algorithm outputs valid parameters for our HMM (i.e. respecting their summation constraints)

assert np.allclose(np.exp(log_pi_est).sum(), 1)
assert np.allclose(np.exp(log_a_est).sum(axis=-1), 1)
assert np.allclose(np.exp(log_b_est).sum(axis=-1), 1)

Run Viterbi algorithm

In [49]:
# copying over Viterbi from Exercise 2

def viterbi_v(x, pi, a, b):
    num_states = pi.size
    seq_len = len(x)

    # let's work on the log domain (base 2)
    log_pi = np.log2(pi)
    log_a = np.log2(a)
    log_b = np.log2(b)

    log_delta = np.zeros((seq_len, num_states))
    psi = np.full((seq_len, num_states), -1, dtype=np.int32)

    # initilization
    log_delta[0] = log_pi + log_b[:, x[0]]

    for t in range(1, seq_len):
        # Sum log_delta[t-1] (array of log_delta in the previous step) with
        # log_a matrix. Since delta[t-1] is an array, we want to broadcast its
        # second dimension through the first dimension of log_a (which
        # represents the i state on a transition i->j). So that, in the end,
        # we have log_delta_a[i,j] = log_delta[t-1,i] + log_a[i,j]
        log_delta_a = log_delta[t - 1].reshape(-1, 1) + log_a

        # log_delta is the max value plus the emission probability on
        # time-step t. The max is taken along the first dimension, i.e., the max
        # along the previous state `i` for each state `j`.
        log_delta[t] = log_delta_a.max(axis=0) + log_b[:, x[t]]

        # psi is the argmax
        psi[t] = log_delta_a.argmax(axis=0)

    # now we recover the most likely sequence of states backwards, from the last
    # time step (in Python, -1 represents the last index of a list/array) up to
    # the first one.
    y = []
    # here we use the argmax() method of a numpy array to find the best
    # state on the last time step
    y.append(log_delta[-1].argmax())
    for t in range(seq_len - 1, 0, -1):
        y.append(psi[t, y[-1]])

    # the list y is backwards, so reverse it and return
    y.reverse()
    return y, log_delta, psi

In [50]:
example = np.array([WALK, SHOP, CLEAN, SHOP, WALK])
print(example)

[0 1 2 1 0]


In [51]:
y, _, _ = viterbi_v(x=example, pi=np.exp(log_pi_est), a=np.exp(log_a_est), b=np.exp(log_b_est))

In [52]:
print(y)

[1, 0, 0, 0, 1]


In [53]:
# interpretation: in the run above, states are reversed from exercise 1, suggesting, that state 1 might correspond to "Sunny" and state 0 might correspond to "Rainy"
# (this might be different for your run)

Comparison with ground truth model

In [54]:
# ground truth copied from exercise 1

pi = np.array([0.6, 0.4])

a = np.array(
    [
        [0.7, 0.3],  # transitions a_0j (from state 0 (Sunny) to some state j)
        [0.4, 0.6],  # transitions a_1j (from state 1 (Rainy) to some state j)
    ]
)

b = np.array(
    [
        [0.6, 0.3, 0.1],  # emissions b_0(k) (in state 0 (Sunny) emits k)
        [0.1, 0.4, 0.5],  # emissions b_1(k) (in state 1 (Rainy) emits k)
    ]
)

In [55]:
print(f"{np.exp(log_pi_est)=}")
print(f"{np.exp(log_a_est)=}")
print(f"{np.exp(log_b_est)=}")

np.exp(log_pi_est)=array([0.41666108, 0.58333892])
np.exp(log_a_est)=array([[0.63155233, 0.36844767],
       [0.32702993, 0.67297007]])
np.exp(log_b_est)=array([[0.11315723, 0.40124073, 0.48560203],
       [0.62583838, 0.27979874, 0.09436288]])


In [None]:
# interpretation: indeed, if one changes the ordering of states the match looks pretty decent ... (again, this depends on the run)