Baum-Welch (i.e., EM) algorithm and the Viterbi algorithm from scratch for a Hidden Markov Model that produces an outcome sequence of discrete random variables with three distinct values.  

Parameters for Discrete HMM:  
• mx: Count of distinct values X can take.  
• mz: Count of distinct values Z can take.  
• w: An mz-by-1 probability vector representing the initial distribution for Z1.  
• A: The mz-by-mz transition probability matrix that models the progression from Zt to Zt+1.  
• B: The mz-by-mx emission probability matrix, indicating how X is produced from Z.  

Parameters A and B get updated and the value for mx is given and mz is specified.  

## Baum-Welch Algorihtm

In [None]:
import numpy as np

# Forward
def forward(data, A, B, w):
    T = len(data)
    mz = len(A)
    alpha = np.zeros((T, mz))
    alpha[0] = w * B[:, data[0] - 1]

    for t in range(1, T):
        for j in range(mz):
            alpha[t, j] = np.sum(alpha[t-1] * A[:, j]) * B[j, data[t] - 1]

    return alpha

# Backward
def backward(data, A, B):
    T = len(data)
    mz = len(A)
    beta = np.zeros((T, mz))
    beta[T-1] = 1

    for t in range(T-2, -1, -1):
        for i in range(mz):
            beta[t, i] = np.sum(beta[t+1] * A[i, :] * B[:, data[t+1] - 1])

    return beta

# Baum-Welch EM Algorithm
def BW_onestep(data, A, B, w):
    alpha = forward(data, A, B, w)
    beta = backward(data, A, B)
    T = len(data)
    mz, mx = B.shape

    # gamma and xi
    gamma = np.zeros((T, mz))
    xi = np.zeros((T-1, mz, mz))

    for t in range(T-1):
        denom = np.sum(alpha[t] * beta[t])
        gamma[t] = (alpha[t] * beta[t]) / denom
        for i in range(mz):
            xi[t, i] = alpha[t, i] * A[i] * B[:, data[t+1] - 1] * beta[t+1] / denom

    gamma[T-1] = (alpha[T-1] * beta[T-1]) / np.sum(alpha[T-1] * beta[T-1])

    # A, B update
    A = np.sum(xi, axis=0) / np.sum(gamma[:-1], axis=0)[:, None]
    for k in range(mx):
        B[:, k] = np.sum(gamma[np.array(data) == k + 1], axis=0) / np.sum(gamma, axis=0)

    return A, B

def myBW(data, A, B, w, iterations):
    for _ in range(iterations):
        A, B = BW_onestep(data, A, B, w)
    return A, B


## Viterbi Algorihtm

In [None]:
# Viterbi
def myViterbi(data, A, B, w):
    T = len(data)
    mz = len(A)
    log_prob = np.zeros((T, mz))
    path = np.zeros((T, mz), dtype=int)

    log_prob[0] = np.log(w) + np.log(B[:, data[0] - 1])

    for t in range(1, T):
        for j in range(mz):
            prob = log_prob[t-1] + np.log(A[:, j]) + np.log(B[j, data[t] - 1])
            path[t, j] = np.argmax(prob)
            log_prob[t, j] = np.max(prob)

    Z = np.zeros(T, dtype=int)
    Z[T-1] = np.argmax(log_prob[T-1])
    for t in range(T-2, -1, -1):
        Z[t] = path[t+1, Z[t+1]]

    return Z

## Testing

### Test 1

In [None]:
import urllib.request
url = "data.txt"
response = urllib.request.urlopen(url)
data = list(map(int, response.read().decode('utf-8').split()))

w = np.array([0.5, 0.5])
A = np.array([[0.5, 0.5], [0.5, 0.5]])
B = np.array([[1/9, 3/9, 5/9], [1/6, 2/6, 3/6]])


iterations = 100
A_updated, B_updated = myBW(data, A, B, w, iterations)

print("Updated Transition Matrix A after Baum-Welch:")
print(A_updated)
print("\nUpdated Emission Matrix B after Baum-Welch:")
print(B_updated)

Z = myViterbi(data, A_updated, B_updated, w)
Z = Z + 1
print("\nState Sequence from Viterbi Algorithm:")
print(Z)

url2 = "Z.txt"
response2 = urllib.request.urlopen(url2)
Z_true = list(map(int, response2.read().decode('utf-8').split()))
Z_true = np.array(Z_true)
print("\nTrue State Sequence:")
print(Z_true)

Updated Transition Matrix A after Baum-Welch:
[[0.49793938 0.50206062]
 [0.44883431 0.55116569]]

Updated Emission Matrix B after Baum-Welch:
[[0.22159897 0.20266127 0.57573976]
 [0.34175148 0.17866665 0.47958186]]

State Sequence from Viterbi Algorithm:
[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2 2 2 2 2 1 1 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 2 2 1 1 1 1 1
 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1]

True State Sequence:
[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2

### Test 2

In [None]:
w = np.array([0.5, 0.5])
A = np.array([[0.5, 0.5], [0.5, 0.5]])
B = np.array([[1/3, 1/3, 1/3], [1/3, 1/3, 1/3]])


iterations = 20
A_updated, B_updated = myBW(data, A, B, w, iterations)

print("Updated Transition Matrix A after Baum-Welch:")
print(A_updated)
print("\nUpdated Emission Matrix B after Baum-Welch:")
print(B_updated)

Updated Transition Matrix A after Baum-Welch:
[[0.5 0.5]
 [0.5 0.5]]

Updated Emission Matrix B after Baum-Welch:
[[0.285 0.19  0.525]
 [0.285 0.19  0.525]]


In [None]:
iterations = 100
A_updated, B_updated = myBW(data, A, B, w, iterations)

print("Updated Transition Matrix A after Baum-Welch:")
print(A_updated)
print("\nUpdated Emission Matrix B after Baum-Welch:")
print(B_updated)

Updated Transition Matrix A after Baum-Welch:
[[0.5 0.5]
 [0.5 0.5]]

Updated Emission Matrix B after Baum-Welch:
[[0.285 0.19  0.525]
 [0.285 0.19  0.525]]


The updated A and B matrices from the EM algorithm are the same after both 20 and 100 iterations, which shows that the matrices are not getting updated properly. If the emission matrix B has the same values for different hidden states, it cannot tell the difference between the probabilities of getting different observations. Because of this, the B matrix does not update well since the algorithm cannot tell the hidden states apart based on the observations. This issue causes both the A and B matrices to stay mostly the same.  