Date: 10/30/2024

In [None]:
# Connect to drive to access data
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
import numpy as np
import pandas as pd

# Part 1: Gaussian Mixtures

In [None]:
def calc_mahalanobis_dist(data, sigma, mu):

  U, D, _ = np.linalg.svd(sigma)
  D_tilde = np.diag(1/np.sqrt(D))

  data_tilde = (D_tilde @ U.T @ data.T).T
  data_tilde_reshaped = data_tilde[:, np.newaxis, :]

  mu_tilde = (D_tilde @ U.T @ mu.T).T
  mu_tilde_reshaped = mu_tilde[np.newaxis, :, :]

  mahalanobis_dist = np.sum((data_tilde_reshaped - mu_tilde_reshaped)**2, axis=2)

  return mahalanobis_dist

In [None]:
# Calculate responsibility matrix r_nk
def Estep(data, sigma, mu, p_k):

  n, p = data.shape
  G = mu.shape[0]

  # Compute Mahalanobis Dist: (x_i - mu_k)^T @ sigma^(-1) @ (x_i-mu_k)
  mahalanobis_dist = calc_mahalanobis_dist(data, sigma, mu)

  # Compute responsibility matrix r_nk
  exp_term = np.exp(-0.5 * mahalanobis_dist)
  const_term = 1/np.sqrt(((2 * np.pi)**p) * np.linalg.det(sigma))

  weighted_pdfs = p_k * const_term * exp_term
  denominator = np.sum(weighted_pdfs, axis=1, keepdims=True)

  r_nk = weighted_pdfs/denominator

  return r_nk

In [None]:
# Update parameters p_k, mu, sigma
def Mstep(data, r_nk):

  n, p = data.shape
  G = r_nk.shape[1]

  # Update mixing probability
  p_k = np.sum(r_nk, axis=0)/n

  # Update mean matrix
  mu = (r_nk.T @ data) / (np.sum(r_nk, axis=0)[:, np.newaxis])

  # Update covariance matrix
  sigma = np.zeros((p, p))

  for k in range(G):
    data_centered = data - mu[k]
    sigma = sigma + (r_nk[:, k][:, np.newaxis] * data_centered).T @ data_centered

  sigma = sigma/np.sum(r_nk)

  return p_k, mu, sigma

In [None]:
# Calculate log-likelihood
def loglik(data, sigma, mu, p_k):

  n, p = data.shape
  G = mu.shape[0]

  # 1. Compute Mahalanobis Dist: (x_i - mu_k)^T @ sigma^(-1) @ (x_i-mu_k)
  mahalanobis_dist = calc_mahalanobis_dist(data, sigma, mu)

  # 2. Calculate log-likelihood
  exp_term = np.exp(-0.5 * mahalanobis_dist)
  const_term = 1/np.sqrt(((2 * np.pi)**p) * np.linalg.det(sigma))

  weighted_pdfs = p_k * const_term * exp_term
  total_pdfs = np.sum(weighted_pdfs, axis=1)

  loglik_val = np.sum(np.log(total_pdfs))

  return loglik_val

In [None]:
def myEM(data, G, sigma, mu, p_k, itmax):

  for i in range(itmax):
    # E-step: calculate responsibilities
    r_nk = Estep(data, sigma, mu, p_k)

    # M-step: update parameters based on responsibilities
    p_k, mu, sigma = Mstep(data, r_nk)

  # Compute the final log-likelihood with updated parameters
  loglik_val = loglik(data, sigma, mu, p_k)

  return p_k, mu.T, sigma, loglik_val

## Testing

In [None]:
df = pd.read_csv("datasets/faithful.dat", delim_whitespace=True)
data = df.to_numpy()

In [None]:
data.shape

(272, 2)

In [None]:
data[:5]

array([[ 3.6  , 79.   ],
       [ 1.8  , 54.   ],
       [ 3.333, 74.   ],
       [ 2.283, 62.   ],
       [ 4.533, 85.   ]])

### Case 1: G=2

In [None]:
G = 2
n = data.shape[0]
itmax = 20

# Initial mixing probabilities p_k
p1 = 10/n
p2 = 1 - p1
p_k = np.array([p1, p2])

# Initial mean matrix
mu1 = np.mean(data[:10], axis=0)
mu2 = np.mean(data[10:], axis=0)
mu = np.array([mu1, mu2])

# Initial covariance matrix
centered_data1 = data[:10] - mu1
centered_data2 = data[10:] - mu2

sigma = ((centered_data1.T @ centered_data1) + (centered_data2.T @ centered_data2)) / n

# EM-algorithm
p_k, mu, sigma, loglik_val = myEM(data, G, sigma, mu, p_k, itmax)

In [None]:
p_k

array([0.04297883, 0.95702117])

In [None]:
mu

array([[ 3.49564188,  3.48743016],
       [76.79789154, 70.63205853]])

In [None]:
sigma

array([[  1.29793612,  13.92433626],
       [ 13.92433626, 182.58009247]])

In [None]:
loglik_val

-1289.56935494241

### Case 2: G=3

In [None]:
G = 3
itmax = 20

# Initial mixing probabilities p_k
p1 = 10/n
p2 = 20/n
p3 = 1 - p1 - p2
p_k = np.array([p1, p2, p3])

# Initial mean matrix
mu1 = np.mean(data[:10], axis=0)
mu2 = np.mean(data[10:30], axis=0)
mu3 = np.mean(data[30:], axis=0)
mu = np.array([mu1, mu2, mu3])

# Initial covariance matrix
centered_data1 = data[:10] - mu1
centered_data2 = data[10:30] - mu2
centered_data3 = data[30:] - mu3

cov1 = centered_data1.T @ centered_data1
cov2 = centered_data2.T @ centered_data2
cov3 = centered_data3.T @ centered_data3
sigma = (cov1 + cov2 + cov3) / n

# EM-algorithm
p_k, mu, sigma, loglik_val = myEM(data, G, sigma, mu, p_k, itmax)

In [None]:
p_k

array([0.04363422, 0.07718656, 0.87917922])

In [None]:
mu

array([[ 3.51006918,  2.81616674,  3.54564083],
       [77.10563811, 63.35752634, 71.25084801]])

In [None]:
sigma

array([[  1.26015772,  13.51153756],
       [ 13.51153756, 177.96419105]])

In [None]:
loglik_val

-1289.350958862738

# Part 2: HMM

## Baum-Welch Algorithm

In [None]:
def BW_Estep(X, mx, mz, w, A, B):
  # Calculate forward prob, alpha_t
  alpha_t = np.zeros((len(X), mz))
  alpha_t[0] = w * B[:, (X[0] - 1)]

  for t in range(1, len(X)):
      alpha_t[t, :] = (alpha_t[t-1, :] @ A) * B[:, (X[t] - 1)]

  # Calculate backward prob, beta_t1
  beta_t1 = np.zeros((len(X), mz))
  beta_t1[len(X) - 1] = np.ones(mz)

  for t in range(len(X)-2, -1, -1):
      beta_t1[t, :] = A @ (B[:, (X[t+1] - 1)] * beta_t1[t+1, :])

  # Calculate myGamma
  myGamma = np.zeros((len(X)-1, mz, mz))
  for t in range(len(X)-1):
    temp = np.outer(alpha_t[t], beta_t1[t+1])
    temp = temp * A
    myGamma[t] = temp * B[:, (X[t+1] - 1)]

  return myGamma

def BW_Mstep(X, mx, mz, A, B, myGamma):
  # Update A matrix
  numerator = np.sum(myGamma, axis=0)
  A_update = numerator / np.sum(numerator, axis=1, keepdims=True)

  # Calculate gamma_i matrix for B update
  gamma_ti = np.zeros((len(X), mz))

  for t in range(len(X)-1):
     gamma_ti[t] = np.sum(myGamma[t], axis=1)

  # Calculate last element of gamma_i seperately since it sums over first axis
  gamma_ti[-1] = np.sum(myGamma[-1], axis=0)

  # Update B matrix
  B_update = np.zeros((mz, mx))
  for i in range(mz):
    for l in range(mx):
      numerator = 0
      for t in range(len(X)):
        if X[t] == l+1:
          numerator += gamma_ti[t, i]
      B_update[i, l] = numerator / np.sum(gamma_ti[:, i])

  return A_update, B_update

def BW_onestep(X, mx, mz, w, A, B):
  # E-Step
  myGamma = BW_Estep(X, mx, mz, w, A, B)

  # M-Step
  A_update, B_update = BW_Mstep(X, mx, mz, A, B, myGamma)

  return A_update, B_update

# Run for 100 iterations
def myBW(data, mx, mz, w, A, B):
  for i in range(100):
    A, B = BW_onestep(data, mx, mz, w, A, B)

  return A, B

## Viterbi Algorithm

In [None]:
# Make sure to run calculations in log scale, so use sums of logs instead of log of product
# When outputing sequence path make sure to calculate outputs correctly, e.g. +1 if needed
# Function assumes we have already ran Baum-Welch to get MLE of A, B
def myViterbi(X, mx, mz, w, A, B):
  # Calculate delta_ti
  delta_ti = np.zeros((len(X), mz))
  #delta_ti[0] = np.log(w * B[:, (X[0] - 1)])
  delta_ti[0] = np.log(w) + np.log(B[:, (X[0] - 1)])

  for t in range(1, len(X)):
    for i in range(mz):
      max_prev = max(delta_ti[t-1, j] + np.log(A[j, i]) for j in range(mz))
      # delta_ti[t, i] = np.log(np.exp(max_prev) * B[i, (X[t] - 1)])
      delta_ti[t, i] = max_prev + np.log(B[i, (X[t] - 1)])

  # Calculate Z* hat, which is the most likely single sequence backwards
  Z_hat = np.zeros(len(X), dtype=int)
  Z_hat[-1] = np.argmax(delta_ti[-1]) + 1
  for t in range(len(X)-2, -1, -1):
    Z_hat[t] = np.argmax(delta_ti[t] + np.log(A[:, (Z_hat[t+1] - 1)])) + 1

  return Z_hat

## Testing

In [None]:
# 1. Testing with given intial values

# Initalizing values & Loading Data
w =  np.array([0.5, 0.5])
A = np.array([[0.5, 0.5],
              [0.5, 0.5]])
B = np.array([[1/9, 3/9, 5/9],
              [1/6, 2/6, 3/6]])
mz = 2

x = np.loadtxt("datasets/coding4_part2_data.txt", dtype=int)
mx = len(np.unique(x))

# Testing Baum-Welch

A, B = myBW(x, mx, mz, w, A, B)
print("A: the 2-by-2 transition matrix ")
print(A)
print("B: the 2-by-3 emission matrix ")
print(B)


A: the 2-by-2 transition matrix 
[[0.49793938 0.50206062]
 [0.44883431 0.55116569]]
B: the 2-by-3 emission matrix 
[[0.22159897 0.20266127 0.57573976]
 [0.34175148 0.17866665 0.47958186]]


We see that the results of our Baum-Welch Algorithm matches the results given in the assignment description.

In [None]:
# Testing Viterbi, make sure to run the BW algorithm first (i.e. the cell above) to avoid issues with results not matching up
with open("datasets/Coding4_part2_Z.txt", "r") as f:
    file_content = f.read()
viterbi_actual = np.array([int(x) for x in file_content.replace("\n", " ").split()])


viterbi_predicted = myViterbi(x, mx, mz, w, A, B)

print("Number of mismatches:")
print(np.sum(viterbi_actual != viterbi_predicted))
print("Predicted sequence:")
print(viterbi_predicted)
print("Actual sequence:")
print(viterbi_actual)

Number of mismatches:
0
Predicted sequence:
[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2 2 2 2 2 1 1 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 2 2 1 1 1 1 1
 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1]
Actual sequence:
[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2 2 2 2 2 1 1 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 2 2 1 1 1 1 1
 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1]


We see that the result from the viterbi algorithm also match the assignment description.

In [None]:
# 2. Testing with B having all entries as 1/3
w =  np.array([0.5, 0.5])
A = np.array([[0.5, 0.5],
              [0.5, 0.5]])
mz = 2
B = np.array([[1/3, 1/3, 1/3],
              [1/3, 1/3, 1/3]])

for i in range(20):
  A, B = BW_onestep(x, mx, mz, w, A, B)
print("A: the 2-by-2 transition matrix ")
print(A)
print("B: the 2-by-3 emission matrix ")
print(B)

A: the 2-by-2 transition matrix 
[[0.5 0.5]
 [0.5 0.5]]
B: the 2-by-3 emission matrix 
[[0.285 0.19  0.525]
 [0.285 0.19  0.525]]


Since we initialize B with all entries as $\dfrac{1}{3}$ we see that, the matrix A remains the same as its initial representation and the matrix B has two identical rows. Making the latent states indistinguishable, makes it impossible for the Baum-Welch algorithm to differentiate between latent states. This is because when calculating the forward & backward probabilities and then the myGamma matrix, we just get a matrix that will end up scaling A or B by some amount. In this case, the A matrix was just scaled by 1 while the B matrix was scaled in a way the emission probabilities from both possible states of Z are identical. Then as we continue looping, the probabilities still remain in a way that makes it impossible to distinguish between latent states so then we continue to get updates that just scale the A & B matrices.