<a href="https://colab.research.google.com/github/Nwanna-Joseph/GaussianMixtureModel/blob/main/GMM_Test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt


class KMeans():
    def __init__(self, K, epsilon):
        self.K = K
        self.epsilon = epsilon

    def initialize(self, X, K):
        """
        Initilize K random Centroids

        """
        indices = np.arange(len(X))
        np.random.shuffle(indices)
        X = np.array(X)
        X = X[indices]
        Centroids = X[:K]
        return Centroids

    def EuclideanDistance(self, x1, x2):
        """
        The sum of the squared differences of the elements

        """
        dist = np.sqrt(np.sum((x1 - x2) ** 2))
        return dist

    def Classify(self, centroids, X):
        """
        Classify item to the cluster with minimum distance

        """

        distances = np.zeros((len(centroids), len(X)))
        for i in range(len(centroids)):
            for j in range(len(X)):
                distances[i][j] = self.EuclideanDistance(centroids[i], X[j])

        labels = np.argmin(distances, axis=0)

        return labels

    def update_centroids(self, Assignments, X):
        """
        Calculate the new means

        """
        centroids = np.zeros((self.K, X.shape[1]))
        """
        for i in range(self.K):
          arr = np.array([X[j] for j,q in enumerate(Assignments) if q==i])
          centroids[i] = np.sum(arr, axis=0)/len(arr)
        """
        for i in range(self.K):
            centroids[i] = np.mean(X[Assignments == i], axis=0)

        return centroids

    def fit(self, data):
        """
        Fit the K-Means algorithm until no changes happens or changes < epsilon

        """
        old_centroids = self.initialize(data, self.K)
        assignments = self.Classify(old_centroids, np.array(data))

        error = 1

        while (error >= self.epsilon):
            new_centroids = self.update_centroids(assignments, np.array(data))
            assignments = self.Classify(new_centroids, np.array(data))
            error = np.mean(np.abs(new_centroids - old_centroids))
            print("loss", error)
            old_centroids = new_centroids

        return assignments, new_centroids

    def visualize_clusters(self, X, assignments, new_centroids):

        num_centers = max(assignments)

        for i in range(num_centers + 1):
            c_ = [index for index, value in enumerate(assignments) if value == i]
            c = X.loc[c, :]

            plt.scatter(
                c.iloc[:, 0], c.iloc[:, 1],
                s=50,
                marker='s', edgecolor='black',
                label='cluster' + str(i + 1)
            )

        # plot the centroids

        plt.scatter(
            new_centroids[:, 0], new_centroids[:, 1],
            s=250, marker='*',
            c='red', edgecolor='black',
            label='centroids'
        )

        plt.legend(scatterpoints=1)
        plt.grid()
        plt.show()


In [None]:
import numpy as np
class Gaussian:
  def __init__(self, mean, std, weight):
    self.mean = mean
    self.std = std
    self.weight = weight

    self.weights = []

class GMM:
  # https://youtu.be/Dn6b9fCIUpM, https://youtu.be/q71Niz856KE

  def __init__(self,unlabelled_data, centers):
    self.k = len(centers)
    self.data = unlabelled_data
    self.gaussians = [Gaussian(i,1,1/self.k) for i in centers]

  def train(self,iter = 100):
    while(iter):
      self.assignGaussianChanceToDatapoints()
      # print("self.gaussians[0].weights",len(self.gaussians[0].weights),np.array(self.gaussians[0].weights))
      self.updateGaussianParameters()
      iter -= 1
      

  def calculateLikeliHood(self, data, gaussian):
    #PDF of gaussian distribution
    key = ( (data - gaussian.mean ) / np.abs(gaussian.std) )
    exp_value = ( (key * key) / -2 ) 
    return ( np.exp( exp_value ) ) / ( np.sqrt(2 * np.pi) * (gaussian.std) )


  def assignGaussianChanceToDatapoints(self):
    # an (N x M) matrix where N = number of datapoints and M = number of Gaussians

    for _gaussian in self.gaussians:
      _gaussian.weights = [] 

    for _point in self.data:
      #for each point
      point_weights = []
      point_weights_sum = 0
      for _gaussian in self.gaussians:
        #calculate the sum across all gaussians
        gaussian_weight = self.calculateLikeliHood(_point,_gaussian) * _gaussian.weight
        point_weights_sum += np.sum(gaussian_weight)
        point_weights.append(gaussian_weight)

      for i, _gaussian in enumerate(self.gaussians):
        # print("lll")
        _gaussian.weights.append(point_weights[i]/point_weights_sum)


  def updateGaussianParameters(self):
    # print(np.array(self.gaussians[0].weights))
    for _gaussian in self.gaussians:
      sum_of_weights = np.sum(_gaussian.weights)
      _gaussian.mean = np.sum( (_gaussian.weights * self.data) / sum_of_weights, axis=0 )
      _gaussian.std = np.sqrt( np.sum( np.array(_gaussian.weights) * np.linalg.norm(self.data - _gaussian.mean ) , axis = 0) / sum_of_weights )
      _gaussian.weight = sum_of_weights / len(self.data)
    # pass

  def calculateMaxLikelihoodOfMean(self, data):
    #https://youtu.be/Dn6b9fCIUpM
    return np.sum(data)/len(data)
  
  def calculateMaxLikelihoodOfSTD(self, data, mean_constant):
    #https://youtu.be/Dn6b9fCIUpM
    return np.sqrt( (np.sum( np.linalg(data - mean_constant))) / len(data) )

  def predict(self, unlabelled_data):
    #calculate the chance of each datapoint belonging to each k Gaussian
    pass



In [None]:
from sklearn.datasets import load_iris
data = load_iris()
train_data = data.data
train_data

In [None]:
len(train_data)

150

In [None]:
qX = np.array([[1, 2], [1, 4], [1, 0], [10, 2], [10, 4], [10, 0]])

# kmeans = KMeans( K=2, epsilon = 0.0001)
# labels, centers = kmeans.fit(train_data)

centers = np.array([
           [5.00566038, 3.36981132, 1.56037736, 0.29056604],
           [6.30103093, 2.88659794, 4.95876289, 1.69587629]
          ])
print(f"centers:\n {centers}")
gmm = GMM(train_data, centers)
gmm.train()

centers:
 [[5.00566038 3.36981132 1.56037736 0.29056604]
 [6.30103093 2.88659794 4.95876289 1.69587629]]


In [None]:
gmm.gaussians[0].std

array([3.60689391, 4.75644273, 4.49910027, 4.88608838])

In [None]:
for g in gmm.gaussians:
  print(g.std)

[3.60689391 4.75644273 4.49910027 4.88608838]
[3.60689391 4.75644273 4.49910027 4.88608838]


In [None]:
import numpy as np
from sklearn.mixture import GaussianMixture
X = np.array([[1, 2], [1, 4], [1, 0], [10, 2], [10, 4], [10, 0]])
gm = GaussianMixture(n_components=2, random_state=0).fit(train_data)
gm
# array([[10.,  2.],
#        [ 1.,  2.]])
# gm.predict([[0, 0], [12, 3]])
# array([1, 0])

GaussianMixture(n_components=2, random_state=0)

In [None]:
gm = GaussianMixture(n_components=2, random_state=0).fit(train_data)
gm.means_

array([[5.00600639, 3.4280142 , 1.46200203, 0.24599932],
       [6.26198886, 2.87199642, 4.90597719, 1.67599129]])

In [None]:
import numpy as np
import pandas as pd
class GMMl:
    '''
        This class is the implementation of the Gaussian Mixture Models 
        inspired by sci-kit learn implementation.
    '''
    def __init__(self, n_components, max_iter = 100, comp_names=None):
        '''
            This functions initializes the model by seting the following paramenters:
                :param n_components: int
                    The number of clusters in which the algorithm must split
                    the data set
                :param max_iter: int, default = 100
                    The number of iteration that the algorithm will go throw to find the clusters
                :param comp_names: list of strings, default=None
                    In case it is setted as a list of string it will use to
                    name the clusters
        '''
        self.n_componets = n_components
        self.max_iter = max_iter
        if comp_names == None:
            self.comp_names = [f"comp{index}" for index in range(self.n_componets)]
        else:
            self.comp_names = comp_names
        # pi list contains the fraction of the dataset for every cluster
        self.pi = [1/self.n_componets for comp in range(self.n_componets)]

    def multivariate_normal(self, X, mean_vector, covariance_matrix):
        '''
            This function implements the multivariat normal derivation formula,
            the normal distribution for vectors it requires the following parameters
                :param X: 1-d numpy array
                    The row-vector for which we want to calculate the distribution
                :param mean_vector: 1-d numpy array
                    The row-vector that contains the means for each column
                :param covariance_matrix: 2-d numpy array (matrix)
                    The 2-d matrix that contain the covariances for the features
        '''
        return (2*np.pi)**(-len(X)/2)*np.linalg.det(covariance_matrix)**(-1/2)*np.exp(-np.dot(np.dot((X-mean_vector).T, np.linalg.inv(covariance_matrix)), (X-mean_vector))/2)

    def fit(self, X):
        '''
            The function for training the model
                :param X: 2-d numpy array
                    The data must be passed to the algorithm as 2-d array, 
                    where columns are the features and the rows are the samples
        '''
        # Spliting the data in n_componets sub-sets
        new_X = np.array_split(X, self.n_componets)
        # Initial computation of the mean-vector and covarience matrix
        self.mean_vector = [np.mean(x, axis=0) for x in new_X]
        self.covariance_matrixes = [np.cov(x.T) for x in new_X]
        # Deleting the new_X matrix because we will not need it anymore
        del new_X
        for iteration in range(self.max_iter):
            ''' --------------------------   E - STEP   -------------------------- '''
            # Initiating the r matrix, evrey row contains the probabilities
            # for every cluster for this row
            self.r = np.zeros((len(X), self.n_componets))
            # Calculating the r matrix
            for n in range(len(X)):
                for k in range(self.n_componets):
                    self.r[n][k] = self.pi[k] * self.multivariate_normal(X[n], self.mean_vector[k], self.covariance_matrixes[k])
                    self.r[n][k] /= sum([self.pi[j]*self.multivariate_normal(X[n], self.mean_vector[j], self.covariance_matrixes[j]) for j in range(self.n_componets)])
                    # print(self.multivariate_normal(X[n], self.mean_vector[k], self.covariance_matrixes[k]))
            # Calculating the N
            N = np.sum(self.r, axis=0)
            ''' --------------------------   M - STEP   -------------------------- '''
            # Initializing the mean vector as a zero vector
            self.mean_vector = np.zeros((self.n_componets, len(X[0])))
            # Updating the mean vector
            for k in range(self.n_componets):
                for n in range(len(X)):
                    self.mean_vector[k] += self.r[n][k] * X[n]
            self.mean_vector = [1/N[k]*self.mean_vector[k] for k in range(self.n_componets)]
            # Initiating the list of the covariance matrixes
            self.covariance_matrixes = [np.zeros((len(X[0]), len(X[0]))) for k in range(self.n_componets)]
            # Updating the covariance matrices
            for k in range(self.n_componets):
                self.covariance_matrixes[k] = np.cov(X.T, aweights=(self.r[:, k]), ddof=0)
            self.covariance_matrixes = [1/N[k]*self.covariance_matrixes[k] for k in range(self.n_componets)]
            # Updating the pi list
            self.pi = [N[k]/len(X) for k in range(self.n_componets)]
    def predict(self, X):
        '''
            The predicting function
                :param X: 2-d array numpy array
                    The data on which we must predict the clusters
        '''
        probas = []
        for n in range(len(X)):
            probas.append([self.multivariate_normal(X[n], self.mean_vector[k], self.covariance_matrixes[k])
                           for k in range(self.n_componets)])
        
        cluster = []
        for proba in probas:
            cluster.append(self.comp_names[proba.index(max(proba))])
        return cluster

In [None]:
gm = GMMl(2)
gm.fit(train_data)

In [None]:
gm.mean_vector

[array([5.14153846, 3.18461538, 2.01384615, 0.45538462]),
 array([6.38      , 2.96      , 5.09176471, 1.76823529])]

In [None]:
gm.mean_vector