# **import Libraries**

In [2]:
import os
from PIL import Image

import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import StratifiedShuffleSplit, train_test_split

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader

import torchvision.transforms as transforms

# **Create Data Matrix & Labels**

In [29]:
import os
import numpy as np
from PIL import Image

# Check if the directory exists
input_path = "/kaggle/input/"
# if not os.path.exists(input_path):
#     print("Error: Dataset directory not found. Check the path or add the dataset.")
#     # Try listing available datasets (for Kaggle)
#     print("Available datasets in /kaggle/input/:")
#     print(os.listdir("/kaggle/input/"))
# else:
index = 0
data = np.zeros((400, 10304))
labels = np.zeros(400)

for folder_name in os.listdir(input_path):
    if folder_name == "README":
        continue

    root_path = os.path.join(input_path, folder_name)
    
    for img in os.listdir(root_path):
        img_path = os.path.join(root_path, img)
        img = np.asarray(Image.open(img_path))
        img = img / 255.0
        
        flatten_img = np.ravel(img)
        data[index] = flatten_img
        
        labels[index] = int(folder_name[1:])  # Assumes folder names are like 's1', 's2', etc.
        index += 1

## **Split Data Into Train and Test**

In [46]:
X_train = train_data   = data[::2]
y_train = train_labels = labels[::2]
y_train = y_train.astype(int)
X_test = test_data   = data[1::2]
y_test = test_labels = labels[1::2]
y_test = y_test.astype(int)
print(np.max(y_train))
np.max(y_test)

40


40

## **Create Custom Dataset**

In [47]:
class FaceDataset(Dataset):
    def __init__(self, data, transform=None):
        self.data = data
        self.transform = transform
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        img = self.data[idx]
        img.resize((112, 92))
        
        if self.transform:
            img = self.transform(img)
        else:
            img = torch.from_numpy(img).float()
            img = img.unsqueeze(0)
        return img, img

## **Create Dataloaders**

In [42]:
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(p = 0.5),
    transforms.ColorJitter(brightness = 0.2, contrast = 0.2),
    transforms.ToTensor(),
])

train_idx, val_idx, y_train, y_test = train_test_split(np.arange(len(train_data)), np.arange(len(train_labels)), test_size = 0.2, random_state = 42)
train_images = train_data[train_idx]
val_images   = train_data[val_idx]
y_train      = train_labels[y_train]
y_test       = train_labels[y_test]


train_dataset = FaceDataset(train_images, transform = train_transform)
val_dataset   = FaceDataset(val_images)

batch_size   = 4  # You can adjust this
train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
val_loader   = DataLoader(val_dataset, batch_size   = batch_size, shuffle = False)


In [48]:
from sklearn.decomposition import PCA

pca = PCA(n_components=40) 
print(f"Original train shape: {X_train.shape}")
print(f"Original test shape: {X_test.shape}")
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)

print(f"PCA-reduced train shape: {X_train.shape}")
print(f"PCA-reduced test shape: {X_test.shape}")

Original train shape: (200, 10304)
Original test shape: (200, 10304)
PCA-reduced train shape: (200, 40)
PCA-reduced test shape: (200, 40)


In [49]:
print(y_train, y_test)

[32 32 32 32 32 39 39 39 39 39 26 26 26 26 26 20 20 20 20 20 18 18 18 18
 18 25 25 25 25 25 24 24 24 24 24 14 14 14 14 14 40 40 40 40 40 27 27 27
 27 27 33 33 33 33 33  1  1  1  1  1 11 11 11 11 11 31 31 31 31 31 35 35
 35 35 35 13 13 13 13 13 19 19 19 19 19 29 29 29 29 29 37 37 37 37 37 10
 10 10 10 10  8  8  8  8  8  5  5  5  5  5  7  7  7  7  7 28 28 28 28 28
  9  9  9  9  9 15 15 15 15 15 21 21 21 21 21  2  2  2  2  2  6  6  6  6
  6 30 30 30 30 30  3  3  3  3  3 23 23 23 23 23  4  4  4  4  4 16 16 16
 16 16 38 38 38 38 38 17 17 17 17 17 36 36 36 36 36 22 22 22 22 22 34 34
 34 34 34 12 12 12 12 12] [32 32 32 32 32 39 39 39 39 39 26 26 26 26 26 20 20 20 20 20 18 18 18 18
 18 25 25 25 25 25 24 24 24 24 24 14 14 14 14 14 40 40 40 40 40 27 27 27
 27 27 33 33 33 33 33  1  1  1  1  1 11 11 11 11 11 31 31 31 31 31 35 35
 35 35 35 13 13 13 13 13 19 19 19 19 19 29 29 29 29 29 37 37 37 37 37 10
 10 10 10 10  8  8  8  8  8  5  5  5  5  5  7  7  7  7  7 28 28 28 28 28
  9  9  9  9  9 15 15 15 

# **GMM**

## Main Class

In [44]:
import numpy as np
from sklearn.base import BaseEstimator, RegressorMixin
from scipy.stats import multivariate_normal
from scipy.special import logsumexp  # Added missing import


class GMM(BaseEstimator, RegressorMixin):
    def __init__(self, n_components=1, tol=0.001, reg_covar=1e-06, max_iter=100, verbose=False, n_init=10, early_stop=1):
        self.n_components = n_components
        self.tol = tol
        self.reg_covar = reg_covar
        self.max_iter = max_iter
        self.verbose = verbose
        self.n_init = n_init
        self.early_stop = early_stop
        self.means_ = None
        self.covariances_ = None
        self.weights_ = None
        
        self.converged_ = False

    def fit(self, X):
        best_m = None
        best_c = None
        best_w = None
        best_log_likelihood = -np.inf

        for init in range(self.n_init):
            if self.verbose:
                print(f"Model #: {init}")
            self.__fit(X)
            ll = self.score(X)
            if ll > best_log_likelihood:
                best_log_likelihood = ll
                best_m = self.means_
                best_c = self.covariances_
                best_w = self.weights_
            self.means_ = None
            self.covariances_ = None
            self.weights_ = None
        self.means_ = best_m
        self.covariances_ = best_c
        self.weights_ = best_w
        return self

    def __fit(self, X):
        n_samples, n_features = X.shape
        
        # Initialize means with random samples - ensure they're truly different from each other
        idx = np.random.choice(n_samples, self.n_components, replace=False)
        self.means_ = X[idx].copy()
        
        # Add small random perturbations to ensure uniqueness
        self.means_ += np.random.normal(0, 0.01, self.means_.shape)
        
        # Initialize covariances with data variance - ensure they're different for each component
        base_cov = np.eye(n_features) * np.var(X, axis=0)
        self.covariances_ = np.array([
            base_cov * (0.5 + np.random.rand()) + self.reg_covar 
            for _ in range(self.n_components)
        ])
        
        # Initialize with slightly different weights to break symmetry
        raw_weights = np.random.rand(self.n_components) + 0.5
        self.weights_ = raw_weights / np.sum(raw_weights)

        log_likelihood_old = -np.inf
        patience = self.early_stop
        no_improvement_count = 0

        # Debug
        if self.verbose:
            print(f"Initial weights: {self.weights_}")
            print(f"Initial means shape: {self.means_.shape}")
            print(f"Initial covariances shape: {self.covariances_.shape}")

        for iteration in range(self.max_iter):
            if self.verbose and iteration % 10 == 0:
                print(f"Iteration: {iteration} -> log_likelihood_old -> {log_likelihood_old}")
            
            # E-step: calculate responsibilities - vectorized version
            weighted_log_prob = np.zeros((n_samples, self.n_components))
            
            for k in range(self.n_components):
                # Try-except to catch singular matrix errors
                try:
                    log_pdf = multivariate_normal.logpdf(
                        X, mean=self.means_[k], cov=self.covariances_[k], allow_singular=True)
                    weighted_log_prob[:, k] = np.log(self.weights_[k]) + log_pdf
                except Exception as e:
                    if self.verbose:
                        print(f"Warning in component {k}: {e}")
                    # Fallback to a more robust approach
                    cov_regularized = self.covariances_[k] + np.eye(n_features) * self.reg_covar * 10
                    log_pdf = multivariate_normal.logpdf(
                        X, mean=self.means_[k], cov=cov_regularized, allow_singular=True)
                    weighted_log_prob[:, k] = np.log(self.weights_[k]) + log_pdf
            
            # Normalize log probabilities for numerical stability
            log_prob_norm = logsumexp(weighted_log_prob, axis=1)
            log_resp = weighted_log_prob - log_prob_norm[:, np.newaxis]
            resp = np.exp(log_resp)
            
            # Ensure no numerical issues
            resp = np.maximum(resp, np.finfo(resp.dtype).tiny)
            row_sums = resp.sum(axis=1, keepdims=True)
            resp = resp / row_sums  # Ensure each row sums to 1.0
            
            # Debug
            if self.verbose and iteration % 10 == 0:
                component_resp_sums = resp.sum(axis=0)
                print(f"Component responsibility sums: {component_resp_sums}")
                print(f"Min resp: {resp.min()}, Max resp: {resp.max()}")
            
            # M-step: update parameters
            for k in range(self.n_components):
                resp_sum = np.sum(resp[:, k])
                
                if self.verbose and iteration == 0:
                    print(f"resp_sum for component {k}: {resp_sum}")
                
                if resp_sum > 1e-6:  # Prevent division by very small numbers
                    # Update weights
                    self.weights_[k] = resp_sum / n_samples
                    
                    # Update means
                    weighted_sum = np.sum(resp[:, k, np.newaxis] * X, axis=0)
                    self.means_[k] = weighted_sum / resp_sum
                    
                    # Update covariances with careful handling
                    diff = X - self.means_[k]
                    
                    # Method 1: Direct calculation
                    weighted_diff = resp[:, k, np.newaxis] * diff
                    cov = np.dot(weighted_diff.T, diff) / resp_sum
                    
                    # Ensure positive definiteness
                    min_eig = np.min(np.linalg.eigvalsh(cov))
                    if min_eig < self.reg_covar:
                        cov.flat[::n_features + 1] += (self.reg_covar - min_eig)
                    
                    self.covariances_[k] = cov
                else:
                    # Handle the degenerate case - reinitialize this component
                    if self.verbose:
                        print(f"Reinitializing component {k} due to small responsibility sum")
                    self.weights_[k] = 1e-3  # Small but non-zero weight
                    self.means_[k] = X[np.random.choice(n_samples)] + np.random.normal(0, 0.01, n_features)
                    self.covariances_[k] = np.eye(n_features) * np.var(X, axis=0) * np.random.rand() + self.reg_covar

            # Normalize weights to sum to 1
            self.weights_ = self.weights_ / np.sum(self.weights_)
            
            # Check for convergence
            try:
                current_log_likelihood = self.score(X)
                if self.verbose and iteration % 10 == 0:
                    print(f"Log-likelihood: {current_log_likelihood}")
                    
                if np.abs(current_log_likelihood - log_likelihood_old) < self.tol:
                    no_improvement_count += 1
                else:
                    no_improvement_count = 0

                if no_improvement_count >= patience:
                    self.converged_ = True
                    if self.verbose:
                        print(f"Early stopping at iteration {iteration}")
                    break
                    
                log_likelihood_old = current_log_likelihood
            except Exception as e:
                if self.verbose:
                    print(f"Error in convergence check: {e}")
                # Continue anyway with adjusted parameters

    def predict(self, X):
        if self.means_ is None:
            raise ValueError("Model not fitted yet.")
            
        n_samples = len(X)
        log_responsibilities = np.zeros((n_samples, self.n_components))

        for k in range(self.n_components):
            try:
                # Use log space for numerical stability
                log_responsibilities[:, k] = np.log(self.weights_[k] + 1e-10) + multivariate_normal.logpdf(
                    X, mean=self.means_[k], cov=self.covariances_[k], allow_singular=True)
            except Exception as e:
                # Handle potential numerical issues
                regularized_cov = self.covariances_[k] + np.eye(self.covariances_[k].shape[0]) * self.reg_covar * 10
                log_responsibilities[:, k] = np.log(self.weights_[k] + 1e-10) + multivariate_normal.logpdf(
                    X, mean=self.means_[k], cov=regularized_cov, allow_singular=True)
        
        # Convert log responsibilities to probabilities and find the max
        return np.argmax(log_responsibilities, axis=1)

    def score(self, X):
        if self.means_ is None:
            raise ValueError("Model not fitted yet.")
            
        n_samples = X.shape[0]
        log_prob = np.zeros((n_samples, self.n_components))
        
        for k in range(self.n_components):
            try:
                log_prob[:, k] = np.log(max(self.weights_[k], 1e-10)) + multivariate_normal.logpdf(
                    X, mean=self.means_[k], cov=self.covariances_[k], allow_singular=True)
            except Exception as e:
                # Handle potential numerical issues
                regularized_cov = self.covariances_[k] + np.eye(self.covariances_[k].shape[0]) * self.reg_covar * 10
                log_prob[:, k] = np.log(max(self.weights_[k], 1e-10)) + multivariate_normal.logpdf(
                    X, mean=self.means_[k], cov=regularized_cov, allow_singular=True)
        
        # Use logsumexp for numerical stability
        return np.sum(logsumexp(log_prob, axis=1))

In [51]:
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from scipy.special import logsumexp

import numpy as np

def clusters_meaning(X, y, n_pred_clusters, n_classes):
    # print(len(np.unique(X)), len(np.unique(y)))
    cluster_map = np.zeros((n_pred_clusters, n_classes), dtype=int)
    # print(X, y)
    for i in range(len(X)):
        cluster_map[X[i], (y[i]-1)] += 1

    cluster_meanings = np.zeros(n_pred_clusters, dtype=int)
    for i, row in enumerate(cluster_map):
        cluster_meanings[i] = np.argmax(row)
    return cluster_meanings

# Ensure X, y are already defined
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

n_components_list = [20, 40, 60]
# n_components_list = [40]

for n in n_components_list:
    print(f"\n===== n_components = {n} =====")

    # ▶️ Custom GMM
    gmm = GMM(n_components=n, max_iter=50, tol=1e-3, n_init=3, verbose=False)
    gmm.fit(X_train)
    y_pred = gmm.predict(X_test)
    y_pred_sklearn_mapped = clusters_meaning(y_pred, y_test, n, 40)[y_pred]
    
    acc_sklearn = accuracy_score(y_test, y_pred_sklearn_mapped)
    f1_sklearn = f1_score(y_test, y_pred_sklearn_mapped, average='macro')
    
    print("\n[GMM]")
    print(f"Accuracy: {acc_sklearn:.4f}")
    print(f"F1 Score:  {f1_sklearn:.4f}")
    print(confusion_matrix(y_test, y_pred_sklearn_mapped))

    # ▶️ Sklearn GaussianMixture
    gmm = GaussianMixture(n_components=n, max_iter=50, tol=1e-3, n_init=3, random_state=42)
    gmm.fit(X_train)
    y_pred = gmm.predict(X_test)
    y_pred_sklearn_mapped = clusters_meaning(y_pred, y_test, n, 40)[y_pred]
    
    acc_sklearn = accuracy_score(y_test, y_pred_sklearn_mapped)
    f1_sklearn = f1_score(y_test, y_pred_sklearn_mapped, average='macro')

    print("\n[Sklearn GMM]")
    print(f"Accuracy: {acc_sklearn:.4f}")
    print(f"F1 Score:  {f1_sklearn:.4f}")
    print(confusion_matrix(y_test, y_pred_sklearn_mapped))

    # ▶️ KNN
    knn = KNeighborsClassifier(n_neighbors=n)
    knn.fit(X_train, y_train)
    y_pred_knn = knn.predict(X_test)

    acc_knn = accuracy_score(y_test, y_pred_knn)
    f1_knn = f1_score(y_test, y_pred_knn, average='macro')

    print("\n[KNN]")
    print(f"Accuracy: {acc_knn:.4f}")
    print(f"F1 Score:  {f1_knn:.4f}")
    print(confusion_matrix(y_test, y_pred_knn))



===== n_components = 20 =====

[GMM]
Accuracy: 0.0050
F1 Score:  0.0030
[[0 0 0 ... 0 0 0]
 [2 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]

[Sklearn GMM]
Accuracy: 0.0000
F1 Score:  0.0000
[[0 0 0 ... 0 0 0]
 [5 0 0 ... 0 0 0]
 [0 4 0 ... 0 0 0]
 ...
 [0 1 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]

[KNN]
Accuracy: 0.5550
F1 Score:  0.4893
[[2 1 0 ... 0 0 0]
 [0 5 0 ... 0 0 0]
 [0 0 4 ... 0 0 0]
 ...
 [0 0 0 ... 5 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]

===== n_components = 40 =====

[GMM]
Accuracy: 0.0050
F1 Score:  0.0041
[[0 0 0 ... 0 0 0]
 [2 0 0 ... 0 0 0]
 [0 4 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 1 0 0]
 [0 0 0 ... 0 0 0]]

[Sklearn GMM]
Accuracy: 0.0000
F1 Score:  0.0000
[[0 0 0 ... 0 0 0]
 [3 0 0 ... 0 0 0]
 [0 5 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]

[KNN]
Accuracy: 0.3050
F1 Score:  0.2410
[[2 1 0 ... 0 0 0]
 [0 5 0 ... 0 0 0]
 [0 0 4 ... 0 0 0]
 ...
 [0 0 0 

# Auto Encoder