# Coding Assignment 4

CS 598 Practical Statistical Learning

2023-11-06

UIUC Fall 2023

**Authors**
* Ryan Fogle
    - rsfogle2@illinois.edu
    - UIN: 652628818
* Sean Enright
    - seanre2@illinois.edu
    - UIN: 661791377

**Contributions**

Part I:
- Ryan contributed to implementing the E-step; Sean contributed to refactoring and completing the implementation of the EM algorithm.


Part II:
- Sean implemented the Baum-Welch Algorithm, Ryan implemented the Viterbi Algorithm. 

## Part I: Gaussian Mixtures

In [1]:
import numpy as np
import pandas as pd

# Seed NumPy's global RNG with the sum of the last four digits of our UINs
# (8818 and 1377), plus one
np.random.seed(8818 + 1377 + 1)

### Define Functions

In [2]:
def Estep(x: np.ndarray, G: int, pi: np.ndarray, mu: np.ndarray, sigma: np.ndarray):
    """Expectation step of the EM algorithm for a Gaussian mixture.

    For every observation, compute the posterior probability of belonging to each
    class under the current parameter estimates.

    Args:
        x (np.ndarray): Data matrix, (n, p)
        G (int): Number of classes
        pi (np.ndarray): Mixing weights, (G,)
        mu (np.ndarray): Mean values for each class, (p, G)
        sigma (np.ndarray): Shared covariance matrix, (p, p)

    Returns:
        np.ndarray: The responsibility matrix of shape (n, G); each row sums to 1
    """
    # Every column is assigned in the loop below, so no zero-fill is required
    weighted = np.empty((x.shape[0], G))
    for k in range(G):
        density_k = multivariate_normal_density(x, mu[:, k], sigma)
        # Unnormalized responsibility: mixing weight times class density
        weighted[:, k] = pi[k] * density_k
    # Normalize across classes so each observation's responsibilities sum to 1
    row_totals = weighted.sum(axis=1, keepdims=True)
    return weighted / row_totals

def Mstep(x: np.ndarray, G: int, resp: np.ndarray, mu: np.ndarray):
    """Maximization step of the EM algorithm for a Gaussian mixture with a
    covariance matrix shared by all classes.

    Args:
        x (np.ndarray): Data matrix, (n, p)
        G (int): Number of classes
        resp (np.ndarray): Responsibility matrix, (n, G)
        mu (np.ndarray): Previous mean values, (p, G). Not used by the update;
            retained so existing callers keep working.

    Returns:
        pi_new (np.ndarray): Updated mixing weights, (G,)
        mu_new (np.ndarray): Updated mean values per dimension, (p, G)
        sigma_new (np.ndarray): Updated covariance matrix, (p, p)
    """
    n, p = x.shape
    # Effective number of observations assigned to each class
    class_mass = resp.sum(axis=0)

    # Pi: share of the total responsibility held by each class
    pi_new = class_mass / n

    # Mu: responsibility-weighted average of the observations
    mu_new = (x.T @ resp) / class_mass

    # Sigma: pooled weighted scatter. The per-class weight pi_k / n_k equals 1 / n,
    # so the class scatters are summed and divided by n once at the end.
    # The shape comes from the data rather than from a global variable.
    sigma_new = np.zeros((p, p))
    for k in range(G):
        centered = x.T - mu_new[:, k].reshape(-1, 1)
        # Broadcasting the weights avoids materializing an (n, n) diagonal matrix
        sigma_new += (centered * resp[:, k]) @ centered.T
    sigma_new /= n
    return pi_new, mu_new, sigma_new

def loglik(x: np.ndarray, G: int, pi: np.ndarray, mu: np.ndarray, sigma: np.ndarray):
    """Log-likelihood of the data under a Gaussian mixture with shared covariance.

    Args:
        x (np.ndarray): Input data, shape (n, p)
        G (int): Number of classes
        pi (np.ndarray): Mixing weights, shape (G,)
        mu (np.ndarray): Distribution means for each class, shape (p, G)
        sigma (np.ndarray): Shared covariance matrix, shape (p, p)

    Returns:
        float: log likelihood
    """
    # Mixture density of each observation, accumulated one class at a time
    mixture_density = np.zeros(x.shape[0])
    for k in range(G):
        class_density = multivariate_normal_density(x, mu[:, k], sigma)
        mixture_density += pi[k] * class_density
    # Observations are treated as independent, so the log densities add up
    return np.log(mixture_density).sum()

def multivariate_normal_density(x: np.ndarray, mu_k: np.ndarray, sigma: np.ndarray):
    """Evaluate the multivariate normal probability density at each row of x.
    This is used in the E-step and in the log-likelihood calculation.

    Args:
        x (np.ndarray): data, shape (n, p)
        mu_k (np.ndarray): mean for a given class, shape (p,)
        sigma (np.ndarray): covariance matrix, shape (p, p)

    Returns:
        np.ndarray: n-dimensional probability densities
    """
    p = sigma.shape[0]
    # Centered data with one column per observation, shape (p, n)
    A_mu = x.T - mu_k.reshape(-1, 1)
    # Squared Mahalanobis distance of every observation; solving the linear
    # system avoids forming the explicit inverse of sigma
    mahalanobis_sq = np.multiply(A_mu, np.linalg.solve(sigma, A_mu)).sum(axis=0)
    # Normalizing constant (2*pi)^(p/2) * |sigma|^(1/2), valid for any dimension p
    # (it reduces to 2*pi*sqrt(|sigma|) when p == 2)
    norm_const = (2 * np.pi) ** (p / 2) * np.sqrt(np.linalg.det(sigma))
    return 1 / norm_const * np.exp(-0.5 * mahalanobis_sq)

def myEM(data: np.ndarray, G: int, prob: np.ndarray,
         mean: np.ndarray, Sigma: np.ndarray, itmax: int):
    """Main EM algorithm: alternate E- and M-steps for a fixed number of iterations.

    Args:
        data (np.ndarray): Input data, shape (n, p)
        G (int): Number of classes
        prob (np.ndarray): Initial mixing weights, shape (G,)
        mean (np.ndarray): Initial distribution means for each class, shape (p, G)
        Sigma (np.ndarray): Initial shared covariance matrix, shape (p, p)
        itmax (int): Number of EM iterations to perform

    Returns:
        (np.ndarray, np.ndarray, np.ndarray, float): probability vector, means, covariance and
                                                     log-likelihood of the final parameters
    """
    for _ in range(itmax):
        resp = Estep(data, G, prob, mean, Sigma)
        prob, mean, Sigma = Mstep(data, G, resp, mean)
    # Only the final log-likelihood is returned, so evaluate it once after the loop.
    # This also keeps `ll` defined when itmax == 0 (initial parameters are scored).
    ll = loglik(data, G, prob, mean, Sigma)
    return prob, mean, Sigma, ll

### Testing

In [3]:
# Load in data: whitespace-delimited file whose first row holds the column names.
# The separator is a regular expression, so it is written as a raw string to avoid
# the invalid-escape warning that '\s' triggers in an ordinary string literal.
data = pd.read_csv('faithful.dat', header=0, sep=r'\s+')
# Downstream code indexes by position, so continue with a plain NumPy array
data = data.to_numpy()

#### Case 1: G=2

In [4]:
# Two classes: the first 10 observations initialize class 1, the rest class 2
G = 2
n = data.shape[0]
first_group, second_group = data[:10], data[10:]

# Initial mixing weights are the group proportions
p1 = 10 / n
p2 = 1 - p1

# Initial class means, each stored as a (p, 1) column
mu1 = first_group.mean(axis=0).reshape(-1, 1)
mu2 = second_group.mean(axis=0).reshape(-1, 1)

# Initial shared covariance: within-group scatter matrices summed and divided by n
dev1 = first_group.T - mu1
dev2 = second_group.T - mu2
sigma = 1 / n * (dev1 @ dev1.T + dev2 @ dev2.T)

pi = np.array((p1, p2))  # Shape (G,)
mu = np.column_stack((mu1, mu2))  # Shape (p, G)

prob, mean, Sigma, ll = myEM(data, G, pi, mu, sigma, 20)
print("Case G=2")
print(f"prob\n{prob}\n\nmean\n{mean}\n\nSigma\n{Sigma}\n\nloglik\n{ll}\n")

Case G=2
prob
[0.04297883 0.95702117]

mean
[[ 3.49564188  3.48743016]
 [76.79789154 70.63205853]]

Sigma
[[  1.29793612  13.92433626]
 [ 13.92433626 182.58009247]]

loglik
-1289.5693549424109



#### Case 2: G=3

In [5]:
# Three classes: observations 1-10, 11-30 and the remainder initialize the classes.
# `n` (the number of observations) is reused from the G=2 cell.
G = 3
group1, group2, group3 = data[:10], data[10:30], data[30:]

# Initial mixing weights are the group proportions
p1 = 10 / n
p2 = 20 / n
p3 = 1 - p1 - p2

# Initial class means, each stored as a (p, 1) column
mu1 = group1.mean(axis=0).reshape(-1, 1)
mu2 = group2.mean(axis=0).reshape(-1, 1)
mu3 = group3.mean(axis=0).reshape(-1, 1)

# Initial shared covariance: within-group scatter matrices summed and divided by n
dev1 = group1.T - mu1
dev2 = group2.T - mu2
dev3 = group3.T - mu3
sigma = 1 / n * (dev1 @ dev1.T + dev2 @ dev2.T + dev3 @ dev3.T)

pi = np.array((p1, p2, p3))  # Shape (G,)
mu = np.column_stack((mu1, mu2, mu3))  # Shape (p, G)

prob, mean, Sigma, ll = myEM(data, G, pi, mu, sigma, 20)
print("Case G=3")
print(f"prob\n{prob}\n\nmean\n{mean}\n\nSigma\n{Sigma}\n\nloglik\n{ll}\n")

Case G=3
prob
[0.04363422 0.07718656 0.87917922]

mean
[[ 3.51006918  2.81616674  3.54564083]
 [77.10563811 63.35752634 71.25084801]]

Sigma
[[  1.26015772  13.51153756]
 [ 13.51153756 177.96419105]]

loglik
-1289.350958862739



## Part II: HMM

### Baum-Welch Algorithm

In [6]:
def BW_onestep(data: np.ndarray, mx: int, mz: int,
               w: np.ndarray, A: np.ndarray, B: np.ndarray):
    """Perform one iteration of the Baum-Welch algorithm, improving the estimates of
       the transition probability and emission distribution matrices.

    The input matrices are left untouched; freshly allocated arrays are returned.

    NOTE: the forward/backward probabilities are not rescaled, so very long
    observation sequences may underflow.

    Args:
        data (np.ndarray): Observations, a 1-D integer array with values in [0, mx)
        mx (int): Count of distinct values X can take
        mz (int): Count of distinct values Z can take
        w (np.ndarray): An mz-by-1 probability vector representing the initial distribution for Z1.
        A (np.ndarray): The mz-by-mz transition probability matrix that
                        models the progression from Zt to Zt+1
        B (np.ndarray): The mz-by-mx emission probability matrix,
                        indicating how X is produced from Z

    Returns:
        (np.ndarray, np.ndarray): Updated A and B matrices

    Raises:
        ValueError: If fewer than two observations are supplied, since no
            transition can be observed.
    """
    n = data.shape[0]
    if n < 2:
        raise ValueError("Baum-Welch requires at least two observations")

    # E-step
    # ==========================================================

    # Forward algorithm
    # Alpha is an mz-by-T forward probability matrix
    alpha = np.empty((mz, n))
    # \alpha_1(i) = w(i) B(i, x_1)
    alpha[:, 0] = np.multiply(w, B[:, data[0]])
    for t in range(n - 1):
        # \alpha_{t+1}(i) = \sum_j \alpha_t(j) A(j,i) B(i, x_{t+1})
        alpha[:, t + 1] = (A.T @ alpha[:, t]) * B[:, data[t + 1]]

    # Backward algorithm
    # Beta is an mz-by-T backwards probability matrix
    beta = np.empty((mz, n))
    # \beta_n(i) = 1
    beta[:, n - 1] = 1
    for t in range(n - 2, -1, -1):
        # \beta_{t}(i) = \sum_j A(i, j) B(j, x_{t+1}) \beta_{t+1}(j)
        beta[:, t] = A @ (B[:, data[t + 1]] * beta[:, t + 1])

    # Gamma (unnormalized; the common factor cancels in the ratios below)
    # \gamma_t(i,j) = \alpha_t(i) A(i, j) B(j, x_{t + 1}) \beta_{t+1}(j)
    # Broadcast to shape (mz, mz, n - 1): axis 0 is i, axis 1 is j, axis 2 is t
    gamma = (alpha[:, np.newaxis, :-1]
             * A[:, :, np.newaxis]
             * B[:, data[1:]][np.newaxis, :, :]
             * beta[:, 1:][np.newaxis, :, :])

    # M-step
    # ==========================================================

    # Update A: expected transition counts, normalized per source state
    A_new = gamma.sum(axis=2)  # Sum over time
    A_new = A_new / A_new.sum(axis=1, keepdims=True)

    # Update B
    # Marginalized gamma: mz-by-n
    gamma_marginal = np.empty((mz, n))
    # P(Z_t=i \mid x) = \sum_{j=1}^{m_z} \gamma_t(i, j) for t < n
    gamma_marginal[:, :n - 1] = gamma.sum(axis=1)
    # The last time step has no outgoing transition, so marginalize over the
    # incoming one: P(Z_n=i \mid x) = \sum_{j=1}^{m_z} \gamma_{n-1}(j, i)
    gamma_marginal[:, n - 1] = gamma[:, :, n - 2].sum(axis=0)

    # B^*(i, l) = \frac{\sum_{t:x_t = l} \gamma_t(i)} {\sum_t \gamma_t(i)}
    # Work on a float copy so the caller's emission matrix is not modified
    B_new = np.array(B, dtype=float)
    state_mass = gamma_marginal.sum(axis=1)
    for symbol in range(mx):
        B_new[:, symbol] = gamma_marginal[:, data == symbol].sum(axis=1) / state_mass
    return A_new, B_new

def myBW(data: np.ndarray, mx: int, mz: int, w: np.ndarray,
         A: np.ndarray, B: np.ndarray, itmax: int):
    """Run the Baum-Welch algorithm for a Hidden Markov Model, repeatedly refining
       the transition probability and emission distribution matrices.

    Args:
        data (np.ndarray): Observations, coded starting at 1
        mx (int): Count of distinct values X can take
        mz (int): Count of distinct values Z can take
        w (np.ndarray): An mz-by-1 probability vector representing the initial distribution for Z1.
        A (np.ndarray): The mz-by-mz transition probability matrix that
                        models the progression from Zt to Zt+1
        B (np.ndarray): The mz-by-mx emission probability matrix,
                        indicating how X is produced from Z
        itmax (int): Number of Baum-Welch iterations to perform

    Returns:
        (np.ndarray, np.ndarray): Estimated A and B matrices after itmax iterations
    """
    # Shift the observations from 1-based to 0-based values so they can be used
    # directly as column indices into B
    zero_based_obs = data - 1
    transition, emission = A, B
    for _ in range(itmax):
        transition, emission = BW_onestep(zero_based_obs, mx, mz, w, transition, emission)
    return transition, emission

# Load the observation sequence as a flat 1-D array
data = pd.read_csv('coding4_part2_data.txt', header=None).to_numpy().flatten()

# Establish possible observations and number of latent states
mx = np.unique(data).shape[0] # Unique X values
mz = 2 # Given in instructions

# Initialize transition probability and emission distribution matrices
w = np.array((0.5, 0.5))
A = np.full((2, 2), 0.5)
# np.vstack replaces np.row_stack, an alias deprecated in NumPy 2.0
B = np.vstack([np.array([1, 3, 5]) / 9,
               np.array([1, 2, 3]) / 6])

# Perform Baum-Welch to find estimates of A and B
A, B = myBW(data, mx, mz, w, A, B, 100)
print(f"A: the {mz}-by-{mz} transition matrix\n\n{A}\n\n"
      f"B: the {mz}-by-{mx} emission matrix\n\n{B}\n")

A: the 2-by-2 transition matrix

[[0.49793938 0.50206062]
 [0.44883431 0.55116569]]

B: the 2-by-3 emission matrix

[[0.22159897 0.20266127 0.57573976]
 [0.34175148 0.17866665 0.47958186]]



### Viterbi Algorithm

In [7]:
def myViterbi(data: np.ndarray, mx: int, mz: int, w: np.ndarray,
         A: np.ndarray, B: np.ndarray, itmax: int):
    """Decode the most likely latent state sequence with the Viterbi algorithm.

    The transition and emission matrices are first re-estimated with Baum-Welch
    (via myBW), and the decoding is then carried out on the log scale.

    Args:
        data (np.ndarray): Observations, coded starting at 1
        mx (int): Count of distinct values X can take
        mz (int): Count of distinct values Z can take
        w (np.ndarray): An mz-by-1 probability vector representing the initial distribution for Z1.
        A (np.ndarray): The mz-by-mz transition probability matrix that
                        models the progression from Zt to Zt+1
        B (np.ndarray): The mz-by-mx emission probability matrix,
                        indicating how X is produced from Z
        itmax (int): Number of Baum-Welch iterations used to estimate A and B

    Returns:
        np.ndarray: Most likely latent state for every observation, coded starting at 1
    """
    # Estimate the model parameters from the (1-based) observations
    trans_est, emit_est = myBW(data, mx, mz, w, A, B, itmax)

    # Work with log-probabilities so that products become sums
    log_init = np.log(w)
    log_trans = np.log(trans_est)
    log_emit = np.log(emit_est)

    # Shift observations to 0-based values so they index the columns of log_emit
    obs = data - 1
    T = data.shape[0]

    # score[i, t]: best log-probability of any state path ending in state i at time t
    score = np.zeros((mz, T))
    score[:, 0] = log_init + log_emit[:, obs[0]]
    for t in range(1, T):
        # candidates[i, j]: best path ending in i at t - 1, followed by a move to j
        candidates = log_trans + score[:, t - 1][:, np.newaxis]
        score[:, t] = candidates.max(axis=0) + log_emit[:, obs[t]]

    # Backtrack from the best final state to recover the full path
    path = np.zeros(T, dtype=int)
    path[T - 1] = score[:, T - 1].argmax()
    for t in range(T - 2, -1, -1):
        path[t] = np.argmax(score[:, t] + log_trans[:, path[t + 1]])

    # Report states using 1-based labels
    return path + 1

# Load the observation sequence as a flat 1-D array
data = pd.read_csv('coding4_part2_data.txt', header=None).to_numpy().flatten()

# Establish possible observations and number of latent states
mx = np.unique(data).shape[0] # Unique X values
mz = 2 # Given in instructions

# Initialize transition probability and emission distribution matrices
w = np.array((0.5, 0.5))
A = np.full((2, 2), 0.5)
# np.vstack replaces np.row_stack, an alias deprecated in NumPy 2.0
B = np.vstack([np.array([1, 3, 5]) / 9,
               np.array([1, 2, 3]) / 6])

# Load in valid Z values for comparison.
# split() with no argument separates on any run of whitespace, so line breaks or
# repeated spaces in the file do not produce unparsable tokens.
with open('Coding4_part2_Z.txt', 'r') as f:
    Z_valid = np.array(f.read().split()).astype(int)

# Run Viterbi algorithm
Z = myViterbi(data, mx, mz, w, A, B, 100)

# Output results
print('================== Z Valid ========================\n')
print(Z_valid, '\n')
print('================== Z Calculated ===================\n')
print(Z, '\n')
print('\nZ Valid == Z Calc:', np.array_equal(Z, Z_valid))


[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2 2 2 2 2 1 1 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 2 2 1 1 1 1 1
 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1] 


[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2 2 2 2 2 1 1 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 2 2 1 1 1 1 1
 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1] 


Z Valid == Z Calc: True
