In [56]:
import pandas as pd
import numpy as np
import collections

In [60]:
file = open('/Users/swimmingcircle/cs156_code/session24/speaker.txt', 'r').read()

symbols = {'A':0, 'o':1, 'e':2, 't':3, 'p':4, 'g':5, 'k':6}
nums = np.array([symbols[i] for i in file])

In [61]:
collections.Counter(nums)

Counter({2: 141, 1: 188, 5: 121, 0: 176, 4: 167, 3: 168, 6: 39})

In [69]:
def initial_prob_dist(nums):
    '''Initial prob distribution'''
    max_num = nums.max()
    length = len(nums)
    counter=collections.Counter(nums)
    dist = []
    for i in range(max_num+1): 
        dist.append(counter[i]/length) 
    return dist 
initial_dist = initial_prob_dist(nums)

In [41]:
def transition_matrix(transitions):
    '''
    the following code takes a list such as
    [1,1,2,6,8,5,5,7,8,8,1,1,4,5,5,0,0,0,1,1,4,4,5,1,3,3,4,5,4,1,1]
    with states labeled as successive integers starting with 0
    and returns a transition matrix, M,
    where M[i][j] is the probability of transitioning from i to j
    Resource: https://stackoverflow.com/questions/46657221/generating-markov-transition-matrix-in-python
    '''

    n = 1+ max(transitions) #number of states

    M = [[0]*n for _ in range(n)]

    for (i,j) in zip(transitions,transitions[1:]):
        M[i][j] += 1

    #now convert to probabilities:
    for row in M:
        s = sum(row)
        if s > 0:
            row[:] = [f/s for f in row]
    return M

def print_matrix(m): 
    for row in m: 
        print(' '.join('{0:.2f}'.format(x) for x in row))
    return None
M = transition_matrix(nums)

In [55]:
print_matrix(M)
M = np.array(M)

0.31 0.12 0.05 0.31 0.09 0.09 0.04
0.15 0.28 0.09 0.06 0.31 0.06 0.04
0.11 0.16 0.29 0.09 0.08 0.26 0.03
0.27 0.11 0.09 0.36 0.08 0.05 0.04
0.09 0.33 0.09 0.05 0.31 0.09 0.04
0.07 0.12 0.29 0.11 0.12 0.25 0.06
0.26 0.13 0.23 0.18 0.10 0.08 0.03


- The emission probabilities: are the maximum likelihood estimates of the letters in each column. 
- The transition probabilities: are obtained by counting the number of times each transition would be taken.

In [28]:
def forward(V, a, b, initial_distribution):
    alpha = np.zeros((V.shape[0], a.shape[0]))
    alpha[0, :] = initial_distribution * b[:, V[0]]
    for t in range(1, V.shape[0]):
        for j in range(a.shape[0]):
            # Matrix Computation Steps
            #                  ((1x2) . (1x2))      *     (1)
            #                        (1)            *     (1)
            alpha[t, j] = alpha[t - 1] @ a[:, j] * b[j, V[t]]
 
    return alpha

In [29]:
def backward(V, a, b):
    beta = np.zeros((V.shape[0], a.shape[0]))
 
    # setting beta(T) = 1
    beta[V.shape[0] - 1] = np.ones((a.shape[0]))
 
    # Loop in backward way from T-1 to
    # Due to python indexing the actual loop will be T-2 to 0
    for t in range(V.shape[0] - 2, -1, -1):
        for j in range(a.shape[0]):
            beta[t, j] = (beta[t + 1] * b[:, V[t + 1]]) @ a[j, :]
 
    return beta

In [30]:
def baum_welch(V, a, b, initial_distribution, n_iter=100):
    M = a.shape[0]
    T = len(V)

    for n in range(n_iter):
        ###estimation step
        alpha = forward(V, a, b, initial_distribution)
        beta = backward(V, a, b)

        xi = np.zeros((M, M, T - 1))
        for t in range(T - 1):
            # joint probab of observed data up to time t @ transition prob * emisssion prob as t+1 @
            # joint probab of observed data from time t+1
            denominator = (alpha[t, :].T @ a * b[:, V[t + 1]].T) @ beta[t + 1, :]
            for i in range(M):
                numerator = alpha[t, i] * a[i, :] * b[:, V[t + 1]].T * beta[t + 1, :].T
                xi[i, :, t] = numerator / denominator

        gamma = np.sum(xi, axis=1)
        ### maximization step
        a = np.sum(xi, 2) / np.sum(gamma, axis=1).reshape((-1, 1))

        # Add additional T'th element in gamma
        gamma = np.hstack((gamma, np.sum(xi[:, :, T - 2], axis=0).reshape((-1, 1))))

        K = b.shape[1]
        denominator = np.sum(gamma, axis=1)
        for l in range(K):
            b[:, l] = np.sum(gamma[:, V == l], axis=1)

        b = np.divide(b, denominator.reshape((-1, 1)))

    return a, b

In [50]:
print_matrix(M)

0.31 0.12 0.05 0.31 0.09 0.09 0.04
0.15 0.28 0.09 0.06 0.31 0.06 0.04
0.11 0.16 0.29 0.09 0.08 0.26 0.03
0.27 0.11 0.09 0.36 0.08 0.05 0.04
0.09 0.33 0.09 0.05 0.31 0.09 0.04
0.07 0.12 0.29 0.11 0.12 0.25 0.06
0.26 0.13 0.23 0.18 0.10 0.08 0.03


In [77]:
# # Transition Probabilities
a = np.ones((3, 3))
a = a / np.sum(a, axis=1)

# Emission Probabilities
b = np.array(((1, 3, 5, 7, 9, 11, 13), (2, 4, 6, 8 , 10, 12, 14),(2, 4, 6, 8 , 10, 12, 14) ))
b = b / np.sum(b, axis=1).reshape((-1, 1))

# Equal Probabilities for the initial distribution
initial_distribution = np.array(initial_dist)

n_iter = 100
a_model, b_model = baum_welch(nums.copy(), a.copy(), b.copy(), initial_distribution.copy(), n_iter=n_iter)
print(f'Custom model A is \n{a_model} \n \nCustom model B is \n{b_model}')

ValueError: operands could not be broadcast together with shapes (7,) (3,) 

In [None]:
initial_distribution * b[:, V[0]]

In [80]:
initial_distribution 

array([0.176, 0.188, 0.141, 0.168, 0.167, 0.121, 0.039])

In [83]:
b[:, nums[0]]

array([0.10204082, 0.10714286, 0.10714286])

In [74]:
np.array((1, 3, 5), (2, 4, 6))

TypeError: Tuple must have size 2, but has size 3

In [37]:
b

array([[0.11111111, 0.33333333, 0.55555556],
       [0.16666667, 0.33333333, 0.5       ]])

array([0.5, 0.5])

## Solution

In [1]:
import numpy as np

# Observations (1 = Late, 0 = On Time)
obs = np.array([1,1,0,0,1,0,1])

# Initializations
# Make these values sensible
x0 = np.array([0.5, 0.5])

# Transition matrix A_ij: this is the probability that we end up in state i, given we were in state j
A = np.array([[0.9, 0.1],[0.9, 0.1]])

# Emission Matrix B_jk: this is the probability
B = np.array([[0.5, 0.5],[0.5, 0.5]])

# Forward pass to calculate the alphas
def calc_alpha(x0, A, B, y):
    #Initialize
    n = len(x0)
    N = len(y)
    alpha = np.zeros((n,N))
    alpha[:,0] = x0 * B[:,y[0]] #t = 0, the value of hidden state given the observed state 
    for t in range(1,N):
        alpha[:,t] = (A.T @ alpha[:,t-1]) * B[:,y[t]]
    return alpha

# Backward pass to calculate the betas
def calc_beta(x0, A, B, y):
    #Initialize
    n = len(x0)
    N = len(y)
    beta = np.zeros((n,N))
    beta[:,N-1] = np.ones(n)
    for t in range(N-1,0,-1):
        beta[:,t-1] = A @ (beta[:,t] * B[:,y[t]])
    return beta

# E step
def e_step(x0, A, B, y):
    n = len(x0)
    N = len(y)
    alpha = calc_alpha(x0, A, B, y)
    beta = calc_beta(x0, A, B, y)
    # Marginal probability of all observations
    p_y = sum(alpha[:,-1])
    gamma = (alpha * beta) / p_y
    xi = alpha[:,np.newaxis,0:-1] * A[:,:,np.newaxis] * B[np.newaxis,0:,y[1:]] * beta[np.newaxis,0:,1:] / p_y
    return gamma, xi

# M step
def m_step(gamma, xi, y):
    symbols = np.unique(y)
    A_new = np.sum(xi, axis = 2)/np.sum(xi, axis = (1,2))[:,np.newaxis]
    B_new = np.zeros(B.shape)
    for k in range(0,len(symbols)):
        B_new[:,k] = np.sum(gamma[:,y == symbols[k]], axis = 1) / np.sum(gamma, axis = 1)
    return A_new, B_new

# Run the EM algorithm
for steps in range(0,15):
    [gamma, xi] = e_step(x0, A, B, obs)
    [A,B] = m_step(gamma, xi, obs)
    
print(A)
print(B)

[[0.99243258 0.00756742]
 [0.76214952 0.23785048]]
[[4.95835849e-01 5.04164151e-01]
 [6.54703366e-08 9.99999935e-01]]
