In [32]:
import numpy as np
import struct
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

In [33]:
def load_images(file_path):
    """
    Load MNIST images from the IDX file format.
    
    Parameters:
    - file_path: str, path to the idx3-ubyte file
    
    Returns:
    - images: numpy.ndarray, shape (num_images, 28*28)
    """
    with open(file_path, 'rb') as f:
        # Read the magic number and dimensions
        magic, num_images, rows, cols = struct.unpack('>IIII', f.read(16))
        if magic != 2051:
            raise ValueError(f'Invalid magic number {magic} in image file: {file_path}')
        
        # Read the image data
        image_data = f.read()
        images = np.frombuffer(image_data, dtype=np.uint8)
        images = images.reshape(num_images, rows * cols)
        return images

def load_labels(file_path):
    """
    Load MNIST labels from the IDX file format.
    
    Parameters:
    - file_path: str, path to the idx1-ubyte file
    
    Returns:
    - labels: numpy.ndarray, shape (num_labels,)
    """
    with open(file_path, 'rb') as f:
        # Read the magic number and number of labels
        magic, num_labels = struct.unpack('>II', f.read(8))
        if magic != 2049:
            raise ValueError(f'Invalid magic number {magic} in label file: {file_path}')
        
        # Read the label data
        label_data = f.read()
        labels = np.frombuffer(label_data, dtype=np.uint8)
        return labels

In [34]:
# Paths to your MNIST files
train_images_path = 'data/train-images.idx3-ubyte'
train_labels_path = 'data/train-labels.idx1-ubyte'
test_images_path = 'data/t10k-images.idx3-ubyte'
test_labels_path = 'data/t10k-labels.idx1-ubyte'

# Load the data
X_train = load_images(train_images_path)
y_train = load_labels(train_labels_path)
X_test = load_images(test_images_path)
y_test = load_labels(test_labels_path)

In [35]:
# Normalize the data
X_train = X_train.astype(np.float64) / 255.0
X_test = X_test.astype(np.float64) / 255.0

In [36]:
# Apply PCA to reduce dimensionality to 20
pca = PCA(n_components=20, random_state=42)
X_train_pca = pca.fit_transform(X_train)
X_test_pca = pca.transform(X_test)

In [40]:
def initialize_gmm(X, K):

    #Initialize GMM parameters.
    num_samples, num_features = X.shape
    # Initialize means by randomly selecting K data points
    indices = np.random.choice(num_samples, K, replace=False)
    means = X[indices]
    # Initialize variances to the variance of the data
    variances = np.var(X, axis=0) + 1e-6  # Add a small value to prevent division by zero
    variances = np.tile(variances, (K, 1))
    # Initialize weights equally
    weights = np.full(K, 1 / K)
    return means, variances, weights

def compute_gaussian_density(X, means, variances):
    
    #Compute the Gaussian density for each data point and each component.
    num_samples, num_features = X.shape
    K = means.shape[0]
    densities = np.zeros((num_samples, K))
  

    for k in range(K):
        # Compute the exponent term
        exponent = -0.5 * np.sum(((X - means[k]) ** 2) / variances[k], axis=1)
        # Compute the coefficient term
        coef = 1 / np.sqrt((2 * np.pi) ** num_features * np.prod(variances[k]))
        densities[:, k] = coef * np.exp(exponent)
    
    return densities

def em_gmm(X, K, max_iters=500, tol=1e-5):
    #Perform EM algorithm to fit a diagonal GMM.
    
    means, variances, weights = initialize_gmm(X, K)
    log_likelihood = 0  #tracks the log probability as a debug tool
    for iteration in range(max_iters):
        # E-step: compute responsibilities
        densities = compute_gaussian_density(X, means, variances)
        weighted_densities = densities * weights
        total_density = np.sum(weighted_densities, axis=1, keepdims=True)
        responsibilities = weighted_densities / total_density  
        
        # M-step: update parameters
        Nk = np.sum(responsibilities, axis=0)  
        weights = Nk / X.shape[0]  
        means = (responsibilities.T @ X) / Nk[:, np.newaxis] 
        
        for k in range(K):
            diff = X - means[k]
            variances[k] = np.sum(responsibilities[:, k][:, np.newaxis] * (diff ** 2), axis=0) / Nk[k]
            variances[k][variances[k] < 1e-6] = 1e-6  # Prevent variances from becoming too small
       
        # Compute log likelihood as a debug tool
        new_log_likelihood = np.sum(np.log(total_density + 1e-300))  # Add small value to prevent log(0)
        
        if np.abs(new_log_likelihood - log_likelihood) < tol:
            print(f"  Converged at iteration {iteration+1}")
            break
        log_likelihood = new_log_likelihood
        
        if (iteration + 1) % 10 == 0 or iteration == 0:
            print(f"  Iteration {iteration+1}, Log Likelihood: {log_likelihood:.4f}")
        
    return means, variances, weights


In [41]:
def compute_class_priors(y_train, class_labels):
    
    #Compute the prior probabilities for each class.
    class_priors = np.array([np.mean(y_train == c) for c in class_labels])
    return class_priors

def classify_test_data(X_test, class_labels, gmm_models, class_priors):
    #Classify test data using the fitted GMM models and Bayes classifier.
 
    num_test = X_test.shape[0]
    num_classes = len(class_labels)
    log_probs = np.zeros((num_test, num_classes))
    
    for idx, c in enumerate(class_labels):
        means, variances, weights = gmm_models[c]
        densities = compute_gaussian_density(X_test, means, variances)  
        weighted_densities = densities * weights 
        p_x_given_c = np.sum(weighted_densities, axis=1)  
        # To prevent log(0), add a small epsilon
        p_x_given_c = np.maximum(p_x_given_c, 1e-300)
        log_p_x_given_c = np.log(p_x_given_c)
        log_prior = np.log(class_priors[idx])
        log_probs[:, idx] = log_prior + log_p_x_given_c
    
    # Predict the class with the highest log probability
    y_pred = class_labels[np.argmax(log_probs, axis=1)]
    return y_pred


In [42]:
class_labels = np.unique(y_train)
class_priors = compute_class_priors(y_train, class_labels)


K_values = [5, 10, 20, 30]


error_rates = {}

for K in K_values:
    print(f"\nFitting GMMs with K={K} components per class...")
    gmm_models = {}
    for c in class_labels:
        print(f"  Fitting GMM for class {c}...")
        X_c = X_train_pca[y_train == c]
        means, variances, weights = em_gmm(X_c, K, max_iters=500, tol=1e-5)
        gmm_models[c] = (means, variances, weights)
    print("GMMs fitted.")

    # Classify test data
    print("Classifying test data...")
    y_pred = classify_test_data(X_test_pca, class_labels, gmm_models, class_priors)

    # Compute error rate
    error_rate = np.mean(y_pred != y_test)
    error_rates[K] = error_rate
    print(f"Error rate for K={K}: {error_rate:.4f}")



Fitting GMMs with K=5 components per class...
  Fitting GMM for class 0...
  Iteration 1, Log Likelihood: -210956.9490
  Iteration 10, Log Likelihood: -161929.7184
  Iteration 20, Log Likelihood: -161508.4293
  Iteration 30, Log Likelihood: -161468.8365
  Iteration 40, Log Likelihood: -161463.4028
  Iteration 50, Log Likelihood: -161462.9064
  Iteration 60, Log Likelihood: -161462.8697
  Iteration 70, Log Likelihood: -161462.8670
  Converged at iteration 78
  Fitting GMM for class 1...
  Iteration 1, Log Likelihood: -134373.6901
  Iteration 10, Log Likelihood: -89126.1209
  Iteration 20, Log Likelihood: -87738.6170
  Iteration 30, Log Likelihood: -86841.3846
  Iteration 40, Log Likelihood: -86575.2302
  Iteration 50, Log Likelihood: -86496.2659
  Iteration 60, Log Likelihood: -86486.2672
  Iteration 70, Log Likelihood: -86484.9583
  Iteration 80, Log Likelihood: -86484.7787
  Iteration 90, Log Likelihood: -86484.7543
  Iteration 100, Log Likelihood: -86484.7510
  Iteration 110, Log Li

In [43]:
print("\nSummary of Error Rates:")
for K in K_values:
    print(f"  K={K}: Error Rate = {error_rates[K]:.4f}")


Summary of Error Rates:
  K=5: Error Rate = 0.0882
  K=10: Error Rate = 0.0733
  K=20: Error Rate = 0.0611
  K=30: Error Rate = 0.0512
