# Gaussian_Mixture_Model

We will be implementing the following algorithm:

<img src="assets/gmm_alg.png" width="500">

In [None]:
import matplotlib.pyplot as plt
import numpy as np
np.random.seed(123)
from sklearn.datasets import make_blobs
import time


from plot_util import plot_data, plot_training, confidence_ellipse
from data_utils import make_ds, DatasetSize

# Problem Setup

Say we are trying to model a simple problem where we have 105 points that are 2-d. We know that there are roughly 4 clusters* and we know more-or-less where they start. However, we want to learn the parameters to fit them.

*In the next notebook we do not assume that we have this prior knowledge

In [None]:
# Number of mixture components requested from the fitting routine
# (passed to gmm as guess_num_classes further down).
GUESS_NUM_CLASSES = 4

# One [x, y] row per cluster; handed to make_ds below and re-used later as the
# "true center" markers in the final plot.
unknown_centers = np.asarray([
    [1, -1],  # bottom left
    [5, 5],  # middle
    [8, 7],  # mid-right
    [10, 0]  # bottom right
])

# Options are: LARGE, MEDIUM, SMALL
dataset_size = DatasetSize.LARGE


# make_ds returns a pair: the first element unpacks into X and y, the second is
# discarded here.
# NOTE(review): X is presumably the 2-d points and y their per-point cluster
# labels -- confirm against data_utils.make_ds.
(X, y), _ = make_ds(unknown_centers, dataset_size=dataset_size)

In [None]:
# Visualize the dataset with the project's plot_data helper, telling it how
# many centers were used.
# NOTE(review): presumably colors points by y -- confirm in plot_util.
plot_data(X, y, num_centers=len(unknown_centers))

# Diagnostic Functions

The Gaussian Mixture Model is based on the maximum likelihood estimate, which optimizes for the log-likelihood. So, we should probably code it up and ensure that our model is optimizing the log-likelihood correctly.

**Note** this is un-vectorized, but that's fine for our purposes, because we'll leverage jax to speed things up for us. 

**Note** this is step 4 (and 5, really) of the algorithm listed above


![](assets/gaussian_pdf.png)

In [None]:
def gaussian_pdf(X, mus, sigmas, pis):
    """Evaluate the weighted Gaussian density of each sample under each component.

    Args:
        X: Array of shape (n_samples, n_features).
        mus: Per-component means; ``mus[k]`` is subtracted from every row of X.
        sigmas: Stacked per-component covariance matrices; determinants and
            inverses are taken over the leading axis in one call each.
        pis: Per-component weights; ``pis[k]`` scales column ``k``.

    Returns:
        Array of shape (n_samples, n_classes) whose ``[i, k]`` entry is
        ``pis[k]`` times the Gaussian density of ``X[i]`` with mean ``mus[k]``
        and covariance ``sigmas[k]``.
    """
    num_points, dim = X.shape
    weighted = np.zeros((num_points, len(mus)))

    # Both calls operate on every component's covariance at once.
    determinants = np.linalg.det(sigmas)
    precisions = np.linalg.inv(sigmas)
    prefactors = (2 * np.pi) ** (-dim / 2) * determinants ** (-0.5)

    for k, mu in enumerate(mus):
        centered = X - mu
        # Row-wise quadratic form (x - mu)^T Sigma^{-1} (x - mu).
        mahalanobis_sq = np.sum((centered @ precisions[k]) * centered, axis=1)
        weighted[:, k] = prefactors[k] * np.exp(-0.5 * mahalanobis_sq) * pis[k]

    return weighted


def log_likelihood(X, mus, sigmas, pis):
    """Return the total log-likelihood of X under the mixture.

    The weighted component densities from ``gaussian_pdf`` are summed over
    components for each sample; the logs of those per-sample values are then
    summed over all samples.
    """
    weighted_densities = gaussian_pdf(X, mus, sigmas, pis)
    per_sample_density = weighted_densities.sum(axis=1)
    return np.log(per_sample_density).sum()

# E-step

The "Expectation" step of the Expectation-Maximization algorithm, where we compute each component's responsibility for (i.e. the posterior probability of having generated) each data point

<img src="assets/e_step.png" width="500">

**Note** the image above was taken (with permission) from [Prof. Matt Golub](https://homes.cs.washington.edu/~mgolub/)'s course, [Machine Learning for Neuroscience (CSE599N)](https://courses.cs.washington.edu/courses/cse599n/24sp/).


In [None]:
def e_step(X, mus, sigmas, pis):
    """Compute the responsibility of every component for every sample.

    The weighted densities from ``gaussian_pdf`` are divided by their
    per-sample total, so each row of the result sums to one.

    Returns:
        Array with the same shape as the ``gaussian_pdf`` output
        (n_samples, n_classes).
    """
    joint = gaussian_pdf(X, mus, sigmas, pis)
    # keepdims keeps a trailing axis of length 1 so the division broadcasts
    # across the component columns.
    evidence = joint.sum(axis=1, keepdims=True)
    return joint / evidence

# M-step

The "maximization" step of the Expectation-Maximization algorithm, where we do the actual optimization

<img src="assets/m_step.png" width="500">

**Note** the image above was taken (with permission) from [Prof. Matt Golub](https://homes.cs.washington.edu/~mgolub/)'s course, [Machine Learning for Neuroscience (CSE599N)](https://courses.cs.washington.edu/courses/cse599n/24sp/).


In [None]:
def m_step(X, responsibilities, mus, sigmas, pis):
    """Re-estimate the mixture parameters from the current responsibilities.

    Args:
        X: Data array of shape (n_samples, n_features).
        responsibilities: Array of shape (n_samples, n_classes); column ``k``
            weights each sample's contribution to component ``k``.
        mus: Current means. Only its length (number of components) and shape
            are used; the values do not influence the result.
        sigmas: Current covariances; used only as a shape template.
        pis: Current mixing weights; used only as a shape template.

    Returns:
        Tuple ``(new_mus, new_sigmas, new_pis)`` of float32 arrays shaped like
        ``mus``, ``sigmas`` and ``pis`` respectively.
    """
    cls_resp_sum = np.sum(responsibilities, axis=0)
    num_cls = len(mus)

    new_mus = np.zeros_like(mus, dtype=np.float32)
    new_sigmas = np.zeros_like(sigmas, dtype=np.float32)
    new_pis = np.zeros_like(pis, dtype=np.float32)

    for cls_idx in range(num_cls):
        # Column vector so it broadcasts across the feature axis of X.
        cls_resp = np.expand_dims(responsibilities[:, cls_idx], axis=-1)
        scale_factor = 1 / cls_resp_sum[cls_idx]

        # Responsibility-weighted mean of the data.
        updated_mu = scale_factor * np.sum(cls_resp * X, axis=0)
        new_mus[cls_idx] = updated_mu

        # The covariance must be measured around the *updated* mean. Using the
        # previous mean would add the outer product of the mean shift to the
        # estimate, which is not the maximum-likelihood update.
        deviation = X - updated_mu
        scaled_deviation = cls_resp * deviation
        new_sigmas[cls_idx] = scale_factor * (scaled_deviation.T @ deviation)

        # Mixing weight is the average responsibility of this component.
        new_pis[cls_idx] = cls_resp_sum[cls_idx] / len(X)

    return new_mus, new_sigmas, new_pis

# Initialize the Guesses 

- Step 1 of the original algorithm

In [None]:
def initialize_guesses(X, guessed_num_classes, dataset_size):
    """Build the starting means, covariances and mixing weights for EM.

    Every component starts with the covariance of the whole dataset and a
    uniform mixing weight of ``1 / guessed_num_classes``. The starting means
    are the second value returned by ``make_ds``.

    Returns:
        Tuple ``(mus, sigmas, cls_probs)``, all cast to float32. ``cls_probs``
        is a column of shape (guessed_num_classes, 1).
    """
    # The covariance of the entire dataset stands in for each sub-cluster's.
    dataset_cov = np.cov(X.T)
    stacked_covs = np.asarray([dataset_cov] * guessed_num_classes)

    # We simply give every component the same prior probability.
    uniform = np.asarray(
        [1 / guessed_num_classes for _ in range(guessed_num_classes)]
    )
    column_probs = uniform[:, np.newaxis]

    # NOTE(review): the means come from the module-level `unknown_centers`,
    # not from X or guessed_num_classes -- confirm the two always agree.
    _, start_means = make_ds(unknown_centers, dataset_size)

    return (
        start_means.astype(np.float32),
        stacked_covs.astype(np.float32),
        column_probs.astype(np.float32),
    )
    


# Putting it all together

Assemble the final algorithm

In [None]:
def gmm(
    X: np.ndarray,
    guess_num_classes,
    dataset_size,
    verbose=False,
    max_iters=None,
):
    """Fit a Gaussian mixture to X with Expectation-Maximization.

    Alternates ``e_step`` and ``m_step`` until the data log-likelihood changes
    by less than a fixed tolerance between consecutive iterations.

    Args:
        X: Data array passed through to the E/M steps.
        guess_num_classes: Number of mixture components to fit.
        dataset_size: Forwarded to ``initialize_guesses``.
        verbose: When true, print the log-likelihood every 5th iteration.
        max_iters: Optional cap on the number of EM iterations. ``None``
            (the default) keeps the original run-until-converged behavior.

    Returns:
        Tuple ``(mus, sigmas, cls_probs.T, responsibilities.T, lls)`` where
        ``lls`` is the list of log-likelihoods recorded after each
        non-converging iteration.

    Raises:
        FloatingPointError: If the log-likelihood becomes NaN or infinite.
            Such a value can never satisfy the convergence test, so the loop
            would otherwise spin forever.
    """
    mus, sigmas, cls_probs = initialize_guesses(X, guess_num_classes, dataset_size)
    TOL = 0.0001
    # Seeded with +inf so the first convergence check can never succeed; the
    # sentinel is stripped from the returned history.
    ll_container = [np.inf]
    counter = 0

    start_time = time.perf_counter()
    while True:  # Run until converges
        if max_iters is not None and counter >= max_iters:
            print(f"Stopped without converging after: {counter} iterations")
            break

        # e-step
        responsibilities = e_step(X, mus, sigmas, cls_probs)

        # m-step
        mus, sigmas, cls_probs = m_step(X, responsibilities, mus, sigmas, cls_probs)

        # Recalculate the log-likelihood
        ll_curr = float(log_likelihood(X, mus, sigmas, cls_probs))

        if not np.isfinite(ll_curr):
            raise FloatingPointError(
                f"Log-likelihood became non-finite ({ll_curr}) at iteration {counter}"
            )

        if np.abs(ll_container[-1] - ll_curr) < TOL:
            print(f"Converged to within {TOL} after: {counter} iterations")
            break

        ll_container.append(ll_curr)
        if verbose and counter % 5 == 0 and counter > 0:
            print(f"Data Log-Likelihood at iteration: {counter} = {ll_curr:.6f}")
        counter += 1

    # Responsibilities consistent with the final parameters.
    responsibilities = e_step(X, mus, sigmas, cls_probs)
    print(f"Total Training time was: {time.perf_counter() - start_time:.4f}s over {counter} rounds")
    return mus, sigmas, cls_probs.T, responsibilities.T, ll_container[1:]



In [None]:

# Fit the mixture. The fourth return value (the transposed responsibilities)
# is not needed below and is discarded; lls is the per-iteration
# log-likelihood history.
mus, sigmas, cls_priors, _, lls = gmm(
    X,
    guess_num_classes=GUESS_NUM_CLASSES,
    dataset_size=dataset_size,
    verbose=True
)

# Visualize Training Process

In [None]:
# Hand the log-likelihood history returned by gmm to the plotting helper.
plot_training(lls)

In [None]:
fig, axs = plt.subplots(1, 1, figsize=(15, 10))

colors = ["r", "g", "b", "y"]

for i, c in enumerate(colors):

    # Circles mark the centers the data was generated from, triangles the
    # centers recovered by the fit.
    plt.scatter(unknown_centers[i, 0], unknown_centers[i, 1], c=c, marker="o", label=f"Cluster: {i} True Center")
    plt.scatter(mus[i, 0], mus[i, 1], c=c, marker="^", label=f"Cluster: {i} Inferred Center")

    # Empirical mean and covariance of the points whose entry in y equals i.
    mask = y == i
    masked_points = X[mask]
    mu_x = np.mean(masked_points, axis=0)
    sigma = np.cov(masked_points[:, 0], masked_points[:, 1])

    # Solid ellipses: empirical statistics. Dashed ellipses: fitted parameters.
    # Each is drawn at 1, 2 and 3 standard deviations.
    for center, cov, linestyle in ((mu_x, sigma, "-"), (mus[i], sigmas[i], "--")):
        for n_std in (1, 2, 3):
            confidence_ellipse(center, cov, ax=axs, n_std=n_std, edgecolor=c, linestyle=linestyle)
plt.legend(loc="best")
