In [4]:
import numpy as np
import pandas as pd
from ucimlrepo import fetch_ucirepo 
from kmodes.kmodes import KModes

from sklearn.metrics import fowlkes_mallows_score, adjusted_rand_score, normalized_mutual_info_score
from sklearn.metrics.cluster import adjusted_rand_score, normalized_mutual_info_score
from sklearn.metrics import fowlkes_mallows_score
from sklearn.preprocessing import LabelEncoder
from scipy.special import logsumexp

class StochasticBernoulliMixture:
    
    def __init__(self, n_components, max_iter, batch_size=100, tol=1e-3):
        self.n_components = n_components
        self.max_iter = max_iter
        self.batch_size = batch_size
        self.tol = tol
    
    def fit(self, x):
        self.x = x
        self.init_params()
        log_bernoullis = self.get_log_bernoullis(self.x)
        self.old_logL = self.get_log_likelihood(log_bernoullis)
        for step in range(self.max_iter):
            if step > 0:
                self.old_logL = self.logL
            self.gamma = self.get_responsibilities(log_bernoullis)
            self.remember_params()
            self.get_Neff()
            self.get_mu(self.x)
            self.get_pi()
            log_bernoullis = self.get_log_bernoullis(self.x)
            self.logL = self.get_log_likelihood(log_bernoullis)
            if np.isnan(self.logL):
                self.reset_params()
                print(self.logL)
                break

    def iterate_batches(self):
        n_samples = len(self.x)
        indices = np.arange(n_samples)
        np.random.shuffle(indices)
        for start_idx in range(0, n_samples, self.batch_size):
            end_idx = min(start_idx + self.batch_size, n_samples)
            batch_indices = indices[start_idx:end_idx]
            yield self.x.iloc[batch_indices]

    def reset_params(self):
        self.mu = self.old_mu.copy()
        self.pi = self.old_pi.copy()
        self.gamma = self.old_gamma.copy()
        self.get_Neff()
        log_bernoullis = self.get_log_bernoullis(self.x)
        self.logL = self.get_log_likelihood(log_bernoullis)
        
    def remember_params(self):
        self.old_mu = self.mu.copy()
        self.old_pi = self.pi.copy()
        self.old_gamma = self.gamma.copy()
    
    def init_params(self):
        self.n_samples = self.x.shape[0]
        self.n_features = self.x.shape[1]
        self.pi = 1/self.n_components * np.ones(self.n_components)
        self.mu = np.random.RandomState(seed=0).uniform(low=0.25, high=0.75, size=(self.n_components, self.n_features))
        self.normalize_mu()
        self.old_mu = None
        self.old_pi = None
        self.old_gamma = None
    
    def normalize_mu(self):
        sum_over_features = np.sum(self.mu, axis=1)
        for k in range(self.n_components):
            self.mu[k,:] /= sum_over_features[k]
            
    def get_responsibilities(self, log_bernoullis):
        gamma = np.zeros(shape=(log_bernoullis.shape[0], self.n_components))
        Z =  logsumexp(np.log(self.pi[None,:]) + log_bernoullis, axis=1)
        for k in range(self.n_components):
            gamma[:, k] = np.exp(np.log(self.pi[k]) + log_bernoullis[:,k] - Z)
        return gamma
        
    def get_log_bernoullis(self, x):
        log_bernoullis = self.get_save_single(x, self.mu)
        log_bernoullis += self.get_save_single(1-x, 1-self.mu)
        return log_bernoullis
    
    def get_save_single(self, x, mu):
        epsilon = 1e-15
        mu_place = np.clip(mu, epsilon, 1 - epsilon)
        return np.tensordot(x, np.log(mu_place), (1,1))

    def get_Neff(self):
        self.Neff = np.sum(self.gamma, axis=0)
    
    def get_mu(self, batch):
        self.mu = np.einsum('ik,id -> kd', self.gamma, batch) / self.Neff[:, None]
        
    def get_pi(self):
        self.pi = self.Neff / self.n_samples
    
    def predict(self, x):
        log_bernoullis = self.get_log_bernoullis(x)
        gamma = self.get_responsibilities(log_bernoullis)
        return np.argmax(gamma, axis=1)
        
    def get_sample_log_likelihood(self, log_bernoullis):
        return logsumexp(np.log(self.pi[None,:]) + log_bernoullis, axis=1)
    
    def get_log_likelihood(self, log_bernoullis):
        return np.mean(self.get_sample_log_likelihood(log_bernoullis))
        
    def score(self, x):
        log_bernoullis = self.get_log_bernoullis(x)
        return self.get_log_likelihood(log_bernoullis)
    
    def score_samples(self, x):
        log_bernoullis = self.get_log_bernoullis(x)
        return self.get_sample_log_likelihood(log_bernoullis)

In [5]:
import warnings
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
# Suppress the FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)
from ucimlrepo import fetch_ucirepo   
# fetch dataset 
zoo = fetch_ucirepo(id=111) 

# data (as pandas dataframes) 
X = zoo.data.features
y = zoo.data.targets 
zoo_df = pd.merge(X, y, left_index=True, right_index=True)
zoo_df = zoo_df.dropna()

encoder = LabelEncoder()
encoded_df = zoo_df.copy()
for column in zoo_df.columns:
        if zoo_df[column].dtype == 'object':
            encoded_df[column] = encoder.fit_transform(zoo_df[column])

In [7]:
# Perform clustering
n_components = 3  # Number of clusters
max_iter = 100  # Maximum number of iterations
batch_size = 50  # Size of mini-batch

X =  encoded_df.iloc[:, :-1]
true_labels = encoded_df.iloc[:, -1]
# Initialize the StochasticBernoulliMixture model
stochastic_bmm = StochasticBernoulliMixture(n_components=n_components, max_iter=max_iter, batch_size=batch_size)

# Fit the model to the binary encoded data
stochastic_bmm.fit(X)

# Extract cluster assignments
labels = stochastic_bmm.predict(X)
if true_labels is not None:
        fmi = fowlkes_mallows_score(true_labels, labels)
        ari = adjusted_rand_score(true_labels, labels)
        nmi = normalized_mutual_info_score(true_labels, labels)

# Print the results
print("Stochastic Bernoulli Mixture:")
print("Adjusted Rand Index (ARI):", ari)
print("Normalized Mutual Information (NMI):", nmi)
print("Folkes-Mallows Index (FMI):", fmi)

Stochastic Bernoulli Mixture:
Adjusted Rand Index (ARI): 0.5973920064427068
Normalized Mutual Information (NMI): 0.6342850921689538
Folkes-Mallows Index (FMI): 0.7226653463342378


In [10]:
km = KModes(n_clusters = 3, max_iter = 100)
km_labels = km.fit_predict(X)
if true_labels is not None:
        fmi = fowlkes_mallows_score(true_labels, labels)
        ari = adjusted_rand_score(true_labels, labels)
        nmi = normalized_mutual_info_score(true_labels, labels)

# Print the results
print("KModes:")
print("Adjusted Rand Index (ARI):", ari)
print("Normalized Mutual Information (NMI):", nmi)
print("Folkes-Mallows Index (FMI):", fmi)

KModes:
Adjusted Rand Index (ARI): 0.5973920064427068
Normalized Mutual Information (NMI): 0.6342850921689538
Folkes-Mallows Index (FMI): 0.7226653463342378


<br>

$\textbf{Report}$
<br>
<br>
The similarity in performance measures between the Stochastic Bernoulli Mixture and KModes indicates that both clustering algorithms performed similarly on the provided dataset. The ratings for the Adjusted Rand Index (ARI), Normalized Mutual Information (NMI), and Folkes-Mallows Index (FMI) are consistent across both approaches, showing that they produced clusterings that were similar to ground truth labels and of equivalent quality. This shows that the underlying structures recorded by the two techniques are comparable, resulting in identical clustering results.