# PSL Coding Assignment 4

Members: 
-  Amy Hwang (ahwang22)
-  Christian Tam (cmtam2)
-  Monil Kaneria (kaneria2)

**Contributions:** 

Christian Tam worked on the following: eStep function, mStep function, and part 1 testing. 

Amy Hwang worked on the following: loglik function, myEM function, refactoring of part 2, and the latent state question in part 2. 

Monil Kaniera worked on the following: all the functions of part 2

In [1]:
import numpy as np
import math
from scipy.stats import norm

## Part 1: Gaussian Mixtures

### Functions

In [2]:
# Return an n-by-G matrix, where the (i, j)th entry is the conditional probability P(Zi = k | xi). 
# i ranges from 1 to n and k ranges from 1 to G.

def eStep(sigma, G, p, x, mu):
    U, D, UT = np.linalg.svd(sigma)
    dBar = np.diag(1.0 / np.sqrt(D))

    xBar = x @ UT @ dBar
    muBar = mu @ UT @ dBar

    diff = xBar[:, np.newaxis, :] - muBar
    distances = np.sum(diff ** 2, axis=2)

    probs = np.exp(distances * -0.5)
    probs *= p
    probs = (probs / probs.sum(axis=1, keepdims=True))

    return probs

In [3]:
#  Return the updated parameters for the Gaussian mixture model.

# Input:
#    data: nxp matrix
#    mu_k: mean vector for component k
#    p: nx1 vector, the probability of each sample belonging to the kth component

def mStep(data, probs, G):
    n, d = data.shape
    sigma_new = np.zeros((G, d, d))

    weighted_sums = probs.sum(axis=0)
    p_new = weighted_sums / n
    mu_new = (probs.T @ data) / weighted_sums[:, np.newaxis]
    weighted_cov = np.zeros((d, d))
    for k in range(G):
        x_centered = data - mu_new[k]
        weighted_cov += (probs[:, k][:, np.newaxis] * x_centered).T @ x_centered
    sigma_new = weighted_cov / np.sum(weighted_sums)

    return p_new, mu_new, sigma_new

In [4]:
#  Computes the log-likelihood of the data given the parameters.
# Input:
#    data: nxp matrix
#    mu: mean vector for component k
#    p: nx1 vector, the probability of each sample belonging to the kth component

def loglik(data, G, sigma, mu, p):
    density = np.zeros((np.shape(data)[0], G))
    D = data.shape[1]
    
    # Calculate the multivariable normal pdf
    for k in range(G):
        # calculate mahalanobis distance between each data point and mean of component k
        mu_k = mu[k]
        diff = data - mu_k
        
        inv_cov = np.linalg.inv(sigma)
        mahal_distance_sq = np.dot(np.dot(diff, inv_cov), diff.T).diagonal()

        # Use mahalanobis distance and the determinant of the covariance matrix of component k to get the multivar normal pdf.
        norm_const = (2 * np.pi) ** (D / 2) * np.linalg.det(sigma) ** 0.5
        exponent = -0.5 * (mahal_distance_sq)
        normal_pdf = np.exp(exponent) / norm_const

        # Multiply the pdf by the mixture weight p_k to get the probability density of the data point under component k.
        density[:, k] = p[k] * normal_pdf

    # Get the log of the sum of the probability densities across all components
    log_row_sums = np.log(np.sum(density, axis=1))

    # Sum the log-likelihoods of all data points to get the total log-likelihood.
    log_likelihood = np.sum(log_row_sums)
    
    return log_likelihood

In [5]:
# Main function. Call the Estep and Mstep functions. Returns the estimated parameters and log-likelihood (via
# the loglik function)
#
# Input:
#   data: the dataset.
#   G: The number of components.
#   params: Initial parameters.
#   itmax: The number of iterations.
# Output:
#   prob: A G-dimensional probability vector (p1,…,pG)
#   mean: A p-by-G matrix with the k-th column being μk, the p-dimensional mean for the k-th Gaussian component.
#   Sigma: A p-by-p covariance matrix Σ shared by all G components

def myEM(data, G, sigma_init, mu_init, p_init, itmax):
    sigma = sigma_init
    mu = mu_init
    p = p_init
    pi = np.zeros((G))
    li_threshold = 1e-3
    li_previous = 0
    li_current = 1
    loop_count = 0

    while (abs(li_current - li_previous) > li_threshold) and (loop_count < itmax):
        # Call Estep to get the updated probability matrix
        probs = eStep(sigma, G, p, data, mu)

        # Call Mstep to get the updated parameters
        p, mu, sigma = mStep(data, probs, G)

        # Call logik to get the log-likelihood of the data given the updated parameters
        li_current = loglik(data, G, sigma, mu, p)

        loop_count += 1
            
    return p, mu.T, sigma, li_current

### Testing

In [20]:
data = []
with open('faithful.dat') as faithful:
    for row in faithful:
        data.append(row.split()[1:])
        
    # convert to float values
    data = np.array([[float(value) for value in row] for row in data[1:]])

#### Case 1: G = 2

In [27]:
G = 2
itmax = 20
n = len(data)
p1 = 10 / n
p2 = 1 - p1
p_init = [p1, p2]

cluster_1 = data[:10]
cluster_2 = data[10:]
mu1 = np.mean(cluster_1, axis=0)
mu2 = np.mean(cluster_2, axis=0)
mu_init = np.array([mu1, mu2])

cov_matrix_1 = np.sum([(i - mu1).reshape(-1, 1) @ (i - mu1).reshape(-1, 1).T for i in cluster_1], axis=0)
cov_matrix_2 = np.sum([(i - mu2).reshape(-1, 1) @ (i - mu2).reshape(-1, 1).T for i in cluster_2], axis=0)

# 2x2 matrix
cov_matrix_init = (cov_matrix_1 + cov_matrix_2) / n

In [28]:
prob, mean, Sigma, log_likelihood = myEM(data, G, cov_matrix_init, mu_init, p_init, itmax)
print("prob\n", prob, "\n\nmean\n", mean, "\n\nSigma\n", Sigma, "\n\nloglik\n", log_likelihood)

prob
 [0.04297883 0.95702117] 

mean
 [[ 3.49564188  3.48743016]
 [76.79789154 70.63205853]] 

Sigma
 [[  1.29793612  13.92433626]
 [ 13.92433626 182.58009247]] 

loglik
 -1289.5693549424107


#### Case 2: G = 3

In [25]:
G = 3
itmax = 20
x = data
n = len(x)
p1 = 10 / n
p2 = 20 / n
p3 = 1 - p1 - p2
p_init = [p1, p2, p3]

cluster_1 = x[:10]
cluster_2 = x[10:30]
cluster_3 = x[30:]
mu1 = np.mean(cluster_1, axis=0)
mu2 = np.mean(cluster_2, axis=0)
mu3 = np.mean(cluster_3, axis=0)
mu_init = np.array([mu1, mu2, mu3])

cov_matrix_1 = np.sum([(i - mu1).reshape(-1, 1) @ (i - mu1).reshape(-1, 1).T for i in cluster_1], axis=0)
cov_matrix_2 = np.sum([(i - mu2).reshape(-1, 1) @ (i - mu2).reshape(-1, 1).T for i in cluster_2], axis=0)
cov_matrix_3 = np.sum([(i - mu3).reshape(-1, 1) @ (i - mu3).reshape(-1, 1).T for i in cluster_3], axis=0)

# 2x2 matrix
cov_matrix_init = (cov_matrix_1 + cov_matrix_2 + cov_matrix_3) / n

In [26]:
prob, mean, Sigma, log_likelihood = myEM(data, G, cov_matrix_init, mu_init, p_init, itmax)
print("prob\n", prob, "\n\nmean\n", mean, "\n\nSigma\n", Sigma, "\n\nloglik\n", log_likelihood)

prob
 [0.04363422 0.07718656 0.87917922] 

mean
 [[ 3.51006918  2.81616674  3.54564083]
 [77.10563811 63.35752634 71.25084801]] 

Sigma
 [[  1.26015772  13.51153756]
 [ 13.51153756 177.96419105]] 

loglik
 -1289.3509588627387


## Part 2: HMM

### Functions

In [11]:
# Forward algorithm to calculate alpha values
def forward_algorithm(data, w, A, B):
    T = len(data)
    alpha = np.zeros((T, mz))
    
    # Initialization step
    alpha[0, :] = w * B[:, data[0]]

    # Recursion step
    for t in range(1, T):
        for i in range(mz):
            alpha[t, i] = np.sum(alpha[t - 1] * A[:, i]) * B[i, data[t]]
            
    return alpha

In [12]:
# Backward algorithm to calculate beta values
def backward_algorithm(data, A, B):
    T = len(data)
    mz = A.shape[0]
    beta = np.zeros((T, mz))

    # Initialization step
    beta[T - 1, :] = 1

    # Recursion step
    for t in range(T - 2, -1, -1):
        for i in range(mz):
            beta[t, i] = np.sum(A[i, :] * B[:, data[t + 1]] * beta[t + 1, :])

    return beta

In [13]:
# Baum-Welch one step (E-step and M-step)
def BW_onestep(data, w, A, B, mz, mx):
    T = len(data)
    
    # E-step
    alpha = forward_algorithm(data, w, A, B)
    beta = backward_algorithm(data, A, B)
    gamma = np.zeros((T, mz))
    xi = np.zeros((T - 1, mz, mz))

    for t in range(T):
        gamma[t, :] = alpha[t, :] * beta[t, :]
        gamma[t, :] /= np.sum(gamma[t, :])

    for t in range(T - 1):
        denom = np.sum(alpha[t, :] * np.sum(A * B[:, data[t + 1]].reshape(1, -1) * beta[t + 1, :], axis=1))
        for i in range(mz):
            xi[t, i, :] = alpha[t, i] * A[i, :] * B[:, data[t + 1]] * beta[t + 1, :]
            xi[t, i, :] /= denom

    # M-step
    A_new = np.sum(xi, axis=0) / np.sum(gamma[:-1, :], axis=0).reshape(-1, 1)
    B_new = np.zeros((mz, mx))

    for i in range(mz):
        for l in range(mx):
            B_new[i, l] = np.sum(gamma[data == l, i])
        B_new[i, :] /= np.sum(gamma[:, i])

    return A_new, B_new

In [14]:
def myBW(data, w_init, A_init, B_init, mz_init, mx_init, iteration):
    # Example usage with provided data
    mz = mz_init
    mx = mx_init
    w = w_init
    A = A_init
    B_new = B_init

    for _ in range(iteration):
        A, B_new = BW_onestep(data, w, A, B_new, mz, mx)
    
    return A, B_new

In [15]:
# Viterbi algorithm to find the most likely sequence of latent states
def myViterbi(data, w, A, B, mz, mx):
    T = len(data)
    mz = len(w)
    delta = np.zeros((T, mz))
    psi = np.zeros((T, mz), dtype=int)

    # Initialization
    delta[0, :] = w * B[:, data[0]]
    psi[0, :] = 0

    # Recursion
    for t in range(1, T):
        for i in range(mz):
            delta[t, i] = np.max(delta[t - 1] * A[:, i]) * B[i, data[t]]
            psi[t, i] = np.argmax(delta[t - 1] * A[:, i])

    # Termination
    Z = np.zeros(T, dtype=int)
    Z[T - 1] = np.argmax(delta[T - 1, :]) + 1

    # Path backtracking
    for t in range(T - 2, -1, -1):
        Z[t] = psi[t + 1, Z[t + 1] - 1] + 1

    return Z

### Testing

#### Part 1: Testing Baum-Welch and Viterbi with Coding4_part2_data.txt where B = [[1/9, 3/9, 5/9], [1/6, 2/6, 3/6]]

In [16]:
data = np.loadtxt('Coding4_part2_data.txt', dtype=int) - 1
mz = 2  
mx = 3  
w = np.array([0.5, 0.5])
A = np.array([[0.5, 0.5], [0.5, 0.5]])
B = np.array([[1/9, 3/9, 5/9], [1/6, 2/6, 3/6]])
iteration = 100

In [17]:
# Baum-Welch algorithm
A_updated, B_updated = myBW(data, w, A, B, mz, mx, iteration)
print(f"Transition matrix A: \n{A_updated}") 
print(f"\nEmission matrix B: \n{B_updated}")

# Viterbi algorithm to get the most likely sequence of hidden states
Z = myViterbi(data, w, A_updated, B_updated, mz, mx)
print(f"\nMost likely latent sequence Z: \n{Z}")

Transition matrix A: 
[[0.49793938 0.50206062]
 [0.44883431 0.55116569]]

Emission matrix B: 
[[0.22159897 0.20266127 0.57573976]
 [0.34175148 0.17866665 0.47958186]]

Most likely latent sequence Z: 
[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2 2 2 2 2 1 1 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 2 2 1 1 1 1 1
 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1]


In [18]:
# Checking our Viterbi algorithm implementation with the given benchmark in Coding4_part2_Z.txt
viterbi_benchmark = []

with open('Coding4_part2_Z.txt', 'r') as file:
    for line in file:
        viterbi_benchmark.extend(map(int, line.split()))

viterbi_benchmark = np.asarray(viterbi_benchmark)

are_equal = np.array_equal(Z, viterbi_benchmark)
print("Is our Viterbi implementation equal to the benchmark?: " + str(are_equal))

Is our Viterbi implementation equal to the benchmark?: True


#### Part 2: Testing Baum-Welch and Viterbi with Coding4_part2_data.txt where B = [[1/3, 1/3, 1/3], [1/3, 1/3, 1/3]]

In [19]:
mz = 2
mx = 3
w = np.array([0.5, 0.5])
A = np.array([[0.5, 0.5], [0.5, 0.5]])
B = np.array([[1/3, 1/3, 1/3], [1/3, 1/3, 1/3]])

# Run Baum-Welch algorithm for 20 iterations
iteration = 20
A_20, B_20 = myBW(data, w, A, B, mz, mx, iteration)
print(f"Transition matrix A after 20 iterations with uniform B initialization: \n{A_20}") 
print(f"\nEmission matrix B after 20 iterations with uniform B initialization: \n{B_20}\n")

# Run Baum-Welch algorithm for 100 iterations
iteration = 100
A_100, B_100 = myBW(data, w, A, B, mz, mx, iteration)
print(f"Transition matrix A after 100 iterations with uniform B initialization: \n{A_100}") 
print(f"\nEmission matrix B after 100 iterations with uniform B initialization: \n{B_100}")


Transition matrix A after 20 iterations with uniform B initialization: 
[[0.5 0.5]
 [0.5 0.5]]

Emission matrix B after 20 iterations with uniform B initialization: 
[[0.285 0.19  0.525]
 [0.285 0.19  0.525]]

Transition matrix A after 100 iterations with uniform B initialization: 
[[0.5 0.5]
 [0.5 0.5]]

Emission matrix B after 100 iterations with uniform B initialization: 
[[0.285 0.19  0.525]
 [0.285 0.19  0.525]]


#### Explain why the resulting A and B matrices had these outcomes. You should understand why we cannot initialize our parameters in a way that makes the latent states indistinguishable.

Poor initial probability values in a Hidden Markov Model which cause indistinguishable latent states impact the overall model performance. If the emission probabilities for different hidden states are too similar or incorrect, the model will struggle to distinguish between the states. There would be slow convergence and/or the model would settle into a local minima as a result. Comparing the 20th and 100th iterations we can see that there have been no shift in the transition and emission probabilities. We also see that both iterations have different transmission and emission probabilities from the earlier iteration with differing initial emission probabilities whose Viterbi values matched the benchmark. This implies that the model may have settled into a local minima. 