In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import re


Coding Baum-Welch Algorithm from scratch

In [None]:
def forward(states, sequence, a, b, pi, key):
    N = len(states)
    T = len(sequence)
    pi = pi[key] # prob of state i, since 2 states, let's half it be 0.5, 0.5 initially
    i = key # holds the first state

    # Pseudocount to handle zeros
    pseudocount = 1e-100
    # for all possible states, and the first actual state (alpha)
    # i.e. alpha i for all i has been caluclated given yt
    alpha = np.zeros((N, T))
    alpha[:,0] = pi * b[:,int(sequence[0])] + pseudocount


    # next, we have to do iterations to calculate alpha at different times t
    # we need all alpha values since it is going to be summed up to calculate gamma
    
    for t in range(1, T):
        for j in range(N):
            alpha[j][t] = sum(alpha[i][t-1]*a[i][j]*b[j][int(sequence[t])] for i in range(N)) + pseudocount

    return alpha

In [None]:
def backward(states, sequence, a, b):
    N = len(states)
    T = len(sequence)
    beta = np.zeros((N, T))

    # Pseudocount to handle zeros
    pseudocount = 1e-100

    # Initialization
    beta[:, -1] = 1  # Set the last column to 1

    # Recursion
    for t in range(T - 2, -1, -1):
        for i in range(N):
            beta[i, t] = sum(a[i, j] * b[j, int(sequence[t + 1])] * beta[j, t + 1] for j in range(N)) + pseudocount

    return beta

In [None]:
def train(a, b, pi, sequence, states, key, n_iterations = 100, tol=1e-6):
    #Baum-Welch algorithm for HMM
    # calculate gamma, xi, and then update a and b parameters
    N = len(states)
    T = len(sequence)
    
    # M is the number of possible observations i.e. number of columns
    M = b.shape[1]

    prev_log_likelihood = 0

    for iteration in range(n_iterations):
        alpha = forward(states, sequence, a, b, pi, key)
        beta = backward(states, sequence, a, b)

        print(f"Alpha: {alpha}")
        print(f"Beta:{beta}")

        # Pseudocount to handle zeros
        pseudocount = 1e-100
        gamma = alpha * beta
        # print(gamma)
        denominator = np.sum(gamma, axis=0, keepdims=True) # same for all i
        gamma = gamma/denominator + pseudocount

        print(f"gamma:{gamma}") 

        xi = np.zeros((N, N, T - 1))

        for i in range(N):
            for j in range(N):
                for t in range(T - 1):
                    numerator = alpha[i, t] * a[i, j] * b[j, int(sequence[t + 1])] * beta[j, t + 1]
                    denominator = np.sum(alpha[k, t] * a[k, l] * b[l, int(sequence[t + 1])] * beta[l, t + 1] for k in range(N) for l in range(N))
                    xi[i, j, t] = (numerator / denominator) + pseudocount

        print(f"Xi: {xi}")


        # update a and b
        # M-step
        '''
        sequence == k creates a boolean array of the same length as sequence, where each element is True if the corresponding element in sequence is equal to k, and False otherwise.
    mask = (sequence == k) assigns this boolean array to the variable mask.
    In the context of the Baum-Welch algorithm or similar algorithms for Hidden Markov Models (HMMs), this kind of mask is often used to select specific observations in the computation of probabilities. For example, 
    it might be used to sum over only the observations that match a particular value, which is relevant when updating the emission matrix b.
        '''
        # a = (np.sum(xi, axis=2) + pseudocount)/ np.sum(gamma[:, :-1], axis=1, keepdims=True) 
        for i in range(N):  # N is the number of states
            for j in range(N):  # N is the number of states
                numerator = np.sum(xi[i, j, :])
                denominator = np.sum(gamma[i, :])
                a[i, j] = (numerator+pseudocount) / (denominator+pseudocount) 


        b = np.zeros((N, M))
        # print(gamma.shape)
        gamma_sum = np.sum(gamma, axis=1)
        
        obs = []
        for i in sequence:
            obs.append(int(i))
        obs = np.array(obs)

        for j in range(N):
            for k in range(M):
                mask = (obs==k) # for indicative function i.e. 1 if observed = yt, else 0
                b[j, k] = (np.sum(gamma[j]*mask)+ pseudocount) / (np.sum(gamma[j]) + pseudocount) 
        

        # Normalize rows to ensure each row sums to 1.0
        a = a / np.sum(a, axis=1)[:, np.newaxis]
        b = b / np.sum(b, axis=1)[:, np.newaxis]

        print(f"a = {a}, b = {b}")

        # Log Likelihood Calculation
        log_likelihood = np.sum(np.log(np.sum(alpha, axis=0)))

        # Convergence Check
        if np.abs(log_likelihood - prev_log_likelihood) < tol:
            print(f"Converged after {iteration + 1} iterations.")
            break

        prev_log_likelihood = log_likelihood

    return a, b, pi

In [None]:
def predict(sequence, states, a, b, pi):
    # Makes use of the viterbi algorithm to predict best path
    # Initialize Variables
    T = len(sequence)
    N = len(states)

    # Pseudocount to handle zeros
    pseudocount = 1e-100

    viterbi_table = np.zeros((N, T)) # delta
    backpointer = np.zeros((N, T)) # psi

    # Initialization step, for t = 0
    print(int(sequence[0]))
    viterbi_table[:, 0] = pi * b[:, int(sequence[0])] + pseudocount

    # Calculate Probabilities
    for t in range(1, T):
        for s in range(N):
            
            max_prob = max(viterbi_table[prev_s][t-1] * a[prev_s][s] for prev_s in range(N)) * b[s][int(sequence[t])] 
            viterbi_table[s][t] = max_prob + pseudocount
            backpointer[s][t] = np.argmax([viterbi_table[prev_s][t-1] * a[prev_s][s]for prev_s in range(N)])

    #Traceback and Find Best Path
    best_path = []
    last_state = np.argmax(viterbi_table[:, -1])

    best_path.append(last_state)
    best_prob = 1.0
    for t in range(T-2, -1, -1):
        last_state = last_state = np.argmax(viterbi_table[:, t])
        best_prob *= (viterbi_table[last_state, t] + pseudocount)
        best_path.append(last_state) # i.e. add to start of list

        
    return best_path


Explain this, prepare some more datasets, read them from csv files etc at least shows data prep etc
Talk about Baum-Welch algorithm and viterbi and show code for it (even though it does not work. Can add a small note towards the end of the blog and say you are still debugging its implementation that would be cute)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

from hmmlearn import hmm

gen_model = hmm.CategoricalHMM(n_components=2, random_state=99)


gen_model.startprob_ = np.array([1.0, 0.0])

gen_model.transmat_ = np.array([[0.95, 0.05],
                                [0.1, 0.9]])

gen_model.emissionprob_ = \
    np.array([[1 / 6, 1 / 6, 1 / 6, 1 / 6, 1 / 6, 1 / 6],
              [1 / 10, 1 / 10, 1 / 10, 1 / 10, 1 / 10, 1 / 2]])

# simulate the loaded dice rolls
rolls, gen_states = gen_model.sample(30000)

print("ROLLS",rolls)

# plot states over time, let's just look at the first rolls for clarity
fig, ax = plt.subplots()
ax.plot(gen_states[:500])
ax.set_title('States over time')
ax.set_xlabel('Time (# of rolls)')
ax.set_ylabel('State')
fig.show()

# plot rolls for the fair and loaded states
fig, ax = plt.subplots()
ax.hist(rolls[gen_states == 0], label='fair', alpha=0.5,
        bins=np.arange(7) - 0.5, density=True)
ax.hist(rolls[gen_states == 1], label='loaded', alpha=0.5,
        bins=np.arange(7) - 0.5, density=True)
ax.set_title('Roll probabilities by state')
ax.set_xlabel('Count')
ax.set_ylabel('Roll')
ax.legend()
fig.show()

# %%
# Now, let's see if we can recover our hidden states, transmission matrix
# and emission probabilities.

# split our data into training and validation sets (50/50 split)
X_train = rolls[:rolls.shape[0] // 2]
X_validate = rolls[rolls.shape[0] // 2:]

# check optimal score
gen_score = gen_model.score(X_validate)

best_score = best_model = None
n_fits = 50
np.random.seed(13)
for idx in range(n_fits):
    model = hmm.CategoricalHMM(
        n_components=2, random_state=idx,
        init_params='se')  # don't init transition, set it below
    # we need to initialize with random transition matrix probabilities
    # because the default is an even likelihood transition
    # we know transitions are rare (otherwise the casino would get caught!)
    # so let's have an Dirichlet random prior with an alpha value of
    # (0.1, 0.9) to enforce our assumption transitions happen roughly 10%
    # of the time
    model.transmat_ = np.array([np.random.dirichlet([0.9, 0.1]),
                                np.random.dirichlet([0.1, 0.9])])
    model.fit(X_train)
    score = model.score(X_validate)
    print(f'Model #{idx}\tScore: {score}')
    if best_score is None or score > best_score:
        best_model = model
        best_score = score

print(f'Generated score: {gen_score}\nBest score:      {best_score}')

# use the Viterbi algorithm to predict the most likely sequence of states
# given the model
states = best_model.predict(rolls)

# plot our recovered states compared to generated (aim 1)
fig, ax = plt.subplots()
ax.plot(gen_states[:500], label='generated')
ax.plot(states[:500] + 1.5, label='recovered')
ax.set_yticks([])
ax.set_title('States compared to generated')
ax.set_xlabel('Time (# rolls)')
ax.set_xlabel('State')
ax.legend()
fig.show()

# %%
# Let's check our learned transition probabilities and see if they match.

print(f'Transmission Matrix Generated:\n{gen_model.transmat_.round(3)}\n\n'
      f'Transmission Matrix Recovered:\n{best_model.transmat_.round(3)}\n\n')

# %%
# Finally, let's see if we can tell how the die is loaded.

print(f'Emission Matrix Generated:\n{gen_model.emissionprob_.round(3)}\n\n'
      f'Emission Matrix Recovered:\n{best_model.emissionprob_.round(3)}\n\n')

# %%
# In this case, we were able to get very good estimates of the transition and
# emission matrices, but decoding the states was imperfect. That's because
# the decoding algorithm is greedy and picks the most likely series of states
# which isn't necessarily what happens in real life. Even so, our model could
# tell us when to watch for the loaded die and we'd have a better chance at
# catching them red-handed.

In [None]:
# Additionally, do tests with comparison so you can do confusion matrix, roc curve or something, but i feel like hmm model would be nice and confusion matrix
# cri time