In [1]:
import numpy as np
from sklearn.datasets import make_spd_matrix
from scipy.stats import multivariate_normal
import pandas as pd
from collections import Counter
from sklearn.preprocessing import StandardScaler


In [2]:
class k_means():
    
    def __init__(self,n_components,dataset):
        
        self.n_components = n_components
        self.dataset = dataset
        self.covariance_matrix = []
        self.weights = []
        self.cluster_centroid = []
        self.scaler = StandardScaler()
        self.fit_kmeans()
        
    def final_covariance_matrix(self,points):
        '''For generating the covariance matrix of each cluster'''
        for cluster,points in points.items():
            self.covariance_matrix.append(np.cov(np.asarray(points).T))
        
        
    def cluster_assigment_step(self):
        '''Cluster assignment step based on euclidean distance b/w point and cluster'''
        
        cluster_assignment = {k:[] for k in range(self.n_components)}
        for point in self.dataset:
            cluster_dist =[]
            for cluster in self.cluster_centroid:
                cluster_dist.append(np.sqrt(np.sum((point-cluster)**2)))
            located_cluster = cluster_dist.index(min(cluster_dist))
            cluster_assignment[located_cluster].append(point)


        groupby_points_previous = {cluster:len(points) for cluster,points in cluster_assignment.items()}
        
        return groupby_points_previous,cluster_assignment

    def scaled_dataset(self):
        '''Scaling the dataset'''
        
        self.dataset = self.scaler.fit_transform(self.dataset)
        
        
    
    def fit_kmeans(self):
        '''Fitting k-means with convergence criterion that no. of points assigned to different cluster do not change'''
        #sclaing of the dataset
        self.scaled_dataset()
        
        #initialisation of cluster centroid
        
        np.random.shuffle(dataset)
        
        self.cluster_centroid = [self.dataset[i] for i in range(self.n_components)]
        
        Counter_list = []
        
        while True:
            
            # Assignment step
            
            grouped_cluster_previous,assigned_clusters_previous = self.cluster_assigment_step()
            
            # Cluster centroid update step 
            self.cluster_centroid = [np.mean(np.asarray(points),axis = 0) for cluster,points in assigned_clusters_previous.items()]
            
            groupby_points_next,assigned_clusters_new = self.cluster_assigment_step()
            
            # check for no changes in assignment of points to the cluster
            if groupby_points_next==grouped_cluster_previous:
                break
        #calcuation of weights after clustering using kmeans, for initialising for GMM
        self.weights = [value/sum([value for value in groupby_points_next.values()]) for key,value in groupby_points_next.items()]
        self.final_covariance_matrix(assigned_clusters_new)
        
            
        
        
        
    

In [4]:
class Gaussian_mixture_models(k_means):
    
    def __init__(self,n_components,dataset,max_iter=100,conv_threshold=.001):
        # Initialising cluster centroids and cluster covariances matrix using kmeans
        k_means.__init__(self,n_components,dataset) 
        self.max_iter = max_iter
        self.conv_threshold = conv_threshold
        self.dim = self.dataset.shape[1]
        self.n_iter_ = 0
        self.converged_ = False
        self.eps = 0 # for zero division error
        self.log_likehood= []
    
    
    def calculate_likelihood(self):
        '''calclulating pdf of samples using scipy multivariate normal distribution'''
        likelihood = []
        responsbility = []
        for i in range(self.n_components):
            likelihood.append(multivariate_normal.pdf(self.dataset,self.cluster_centroid[i],self.covariance_matrix[i]))
        for i in range(self.n_components):
            responsbility.append((self.weights[i]*likelihood[i])/(np.sum([self.weights[i]*likelihood[i] for i in range(self.n_components)],axis = 0)+self.eps))
        return responsbility
    
    def m_step(self,responsbility):
        '''Parameters update step'''
        
        for i in range(self.n_components):
            self.cluster_centroid[i]=np.sum(responsbility[i].reshape(len(self.dataset),1)*self.dataset,axis = 0)/(np.sum(responsbility[i])+self.eps)
            self.covariance_matrix[i] = np.dot((responsbility[i].reshape(len(self.dataset),1)*(self.dataset-self.cluster_centroid[i])).T,
                                                    (self.dataset-self.cluster_centroid[i]))/(np.sum(responsbility[i])+self.eps)
            self.weights[i]=np.mean(responsbility[i])
                                             
        
    
    def calculate_log_likelihood(self,mean_vector,covariance_matrix,weights):
        '''Calculation of complete log-likelihood across all the clusters and data points'''
        
        sum_across_cluster = np.sum([weights[i]*multivariate_normal.pdf(self.dataset,mean_vector[i],covariance_matrix[i]) for i in range(self.n_components)],1)
        
        
        sum_across_data_points = np.sum(np.log(sum_across_cluster))
        
        return sum_across_data_points
    
    def fit(self):
        '''Iterative training of parameters using Expectation-Maximisation algorithm'''
        
        for i in range(self.max_iter):
            initial_likelihood = self.calculate_log_likelihood(self.cluster_centroid,self.covariance_matrix,self.weights)
            self.log_likehood.append(initial_likelihood)    
            
            self.n_iter_+=1
            
            # calculate responsbility (E-step)
            responsbility = self.calculate_likelihood()
            
            # calculate the updated parameters (M-step)
            self.m_step(responsbility)
            
            # calculate new loglikelihood for checking conevergence
            final_likelihood = self.calculate_log_likelihood(self.cluster_centroid,self.covariance_matrix,self.weights)
            
            if final_likelihood-initial_likelihood <= self.conv_threshold: #convergence criterion
                ''' Covergence to be achived when change in log-likelihood is less than convergence threshold'''
                self.converged_ = True
                break
        
        if self.converged_ == False:
            raise ValueError('Max iteration reached, either increase number of EM iteration or change conv_threshold')
        
    def predict(self,dataset):
        '''Prediction of cluster for each point'''
        # scaling of the dataset
        
        prediction = []
        scaled_dataset = self.scaler.transform(dataset)
        for point in scaled_dataset:
            cluster_prob = [multivariate_normal.pdf(point,mean,covariance) for mean,covariance in zip(self.cluster_centroid,self.covariance_matrix)]
            cluster = cluster_prob.index(max(cluster_prob))
            prediction.append(cluster)
        
        return prediction