In [70]:
import numpy as np,pandas as pd
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_samples
from typing import Tuple
import numpy as np
import pandas as pd
from scipy.linalg import block_diag
from sklearn.utils import check_random_state
from typing import TypeVar
PandasDataFrame = TypeVar('pandas.core.frame.DataFrame')
import os
os.environ["OMP_NUM_THREADS"] = '1'
import warnings
warnings.filterwarnings('ignore')

In [71]:
#From previous chapter 
def cov2corr(cov):
    std = np.sqrt(np.diag(cov))
    corr = cov/np.outer(std,std)
    corr[corr<-1], corr[corr>1] = -1, 1 
    return corr

In [72]:
class Optimal_Clustering:
    """
    Updated K-means algorithm by:
    1) Introducing objective function as silhouette score and qualify clusters by it
    2) Dealing with initialization problem by having a first loop (which attemps different initialisations) 
        for anothers loops which check different amount of clusters and check quality in them
        -----------------------
        (New improvement of base algorithm:)
        
    3) Deals with clusters of inconsistent quality. The base clustering may capture the more distinct clusters, 
        while missing theless apparent ones
    """
    
    @staticmethod 
    def clusterKMeansBase(corr0: PandasDataFrame, maxNumClusters: int = 10, 
                          n_init: int = 10) -> Tuple[PandasDataFrame, int, pd.Series]:
        """
        Base algorithm for next improved version of clustering. Consisint of first two imporvement of K-means


        Parameters
        ----------
        :param corr0: correlation between of observations 

        :param maxNumClusters: - amount of clusters from 2 to maxNumClusters+1 that will be checked
            and evaluate quality for each amount of clusters

        :param n_init: number of different initialisation 0,1,..,init 
            to be checked to evaluate quality for each initialisation for each amount of clusters


        Returns
        -------
        corr1 - pd.DataFrame
            reduced correlation sorted by best quality observations 

        clstrs - set
            best clustering of observations

        silh -  pd.Series
            silhouette coefficient for each set of clusters

        """

        x = ((1 - corr0.fillna(0))/2.)**.5  #calculating the distance matrix            
        silh = pd.Series(dtype = 'float64')


        for init in range(n_init):
            for i in range(2, maxNumClusters + 1):


                kmeans_ = KMeans(n_clusters = i, n_init = 1)
                kmeans_ = kmeans_.fit(x)


                silh_ = silhouette_samples(x, kmeans_.labels_) #calculate Silhouette Coefficient for each sample
                stat = (silh_.mean()/silh_.std(), silh.mean()/silh.std()) #calculating clustering quality

                #if quality is better then previous one - remember new model and Silhouette Coefficient
                if np.isnan(stat[1]) or stat[0]>stat[1]: 
                    silh, kmeans = silh_, kmeans_

        #creating new distance matrix and sorting by quality
        newIdx = np.argsort(kmeans.labels_) 
        corr1 = corr0.iloc[newIdx] 
        corr1 = corr1.iloc[:,newIdx]

        #getting all clusters with different quality
        clstrs = {i:corr0.columns[np.where(kmeans.labels_ == i)[0]].tolist() for i in np.unique(kmeans.labels_) }  

        silh = pd.Series(silh, index = x.index, dtype = 'float64')

        return corr1, clstrs, silh


    def makeNewOutputs(self, corr0: PandasDataFrame, 
                       clstrs: set, clstrs2: set) -> Tuple[PandasDataFrame, int, pd.Series]:
        """
        function for combining  two different clusters with their quaility and correlation matrix

        Parameters
        ----------
        :param corr0: correlation between of observations 
        :param clstrs: first cluster of observation 
        :param  clstrs2: second cluster of observation


        Returns
        -------
        corrNew - pd.DataFrame
            united correlation of two clusters of observations 

        clstrsNew - set
            united list of clusters of observations

        silhNew -  pd.Series
            united silhouette coefficient for each set of clusters

        """

        clstrsNew = {}

        #getting unioun of lists of clusters
        for i in clstrs.keys():
            clstrsNew[len(clstrsNew.keys())] = list(clstrs[i])

        for i in clstrs2.keys():
            clstrsNew[len(clstrsNew.keys())] = list(clstrs2[i])

        #creating united correlation matrix    
        newIdx = [j for i in clstrsNew for j in clstrsNew[i]]
        corrNew = corr0.loc[newIdx, newIdx]
        x = ((1 - corr0.fillna(0))/2.)**.5 #creating new distance matrix
        kmeans_labels = np.zeros(len(x.columns))

        for i in clstrsNew.keys():
            idxs = [x.index.get_loc(k) for k in clstrsNew[i]]
            kmeans_labels[idxs] = i

        #checking Silhouette Coefficient for new united cluster
        silhNew = pd.Series(silhouette_samples(x,kmeans_labels), index = x.index, dtype = 'float64')

        return corrNew, clstrsNew, silhNew
    
    def clusterKMeansTop(self, corr0: PandasDataFrame,
                         maxNumClusters: int = None, n_init: int = 10) -> Tuple[PandasDataFrame, int, pd.Series]:
        """
        Main function of class that uses all 3 improvement of K-means clustering


        Parameters
        ----------
        :param corr0: correlation between of observations 

        :param maxNumClusters: - amount of clusters that will be checked
            and evaluate quality for each amount of clusters

        :param n_init: number of different initialisation 0,1,..,init 
            to be checked to evaluate quality for each initialisation for each amount of clusters

        Returns
        -------
        corr1 - pd.DataFrame
            reduced correlation sorted by best quality observations 

        clstrs - list
            best clustering of observations

        silh -  pd.Series
            silhouette coefficient for each set of clusters


        """
        #define maximum of clusters if got none
        if maxNumClusters == None: maxNumClusters = corr0.shape[1] - 1


        #evaluating base fucntion for clasterisation 
        corr1, clstrs, silh = self.clusterKMeansBase(corr0, maxNumClusters = 
                                                min(maxNumClusters, corr0.shape[1] - 1), n_init = n_init)

        #evaluate the quality of each cluster
        clusterTstats = {i: np.mean(silh[clstrs[i]])/ (np.std(silh[clstrs[i]])+0.001) for i in clstrs.keys()}


        #finding set of clusters with quality below average
        tStatMean = sum(clusterTstats.values())/len(clusterTstats)
        redoClusters=[i for i in clusterTstats.keys() if clusterTstats[i] < tStatMean]


        #if returned 1 or less clusters then the base algorithm was optimal
        if len(redoClusters) <= 1:
            return corr1, clstrs, silh


        #if number of clusters with quality below average is more than one 
        # we need to rerun clustering in those clusters, while rest considered well clustered
        else:
            keysRedo = [j for i in redoClusters for j in clstrs[i]]
            corrTmp = corr0.loc[keysRedo, keysRedo] #creating new corr maxtrix for reduced observations
                                                    #(those with clusters quality below average)


            #Checking mean quality for clusters with quality below average
            tStatMean = np.mean([clusterTstats[i] for i in redoClusters])
            corr2, clstrs2, silh2 = self.clusterKMeansTop(corrTmp, 
                                                     maxNumClusters = min(maxNumClusters, corrTmp.shape[1] - 1), 
                                                     n_init = n_init)


        #joining optimal clusters that were above average and new ones
        corrNew, clstrsNew, silhNew = self.makeNewOutputs(corr0, 
                                                     {i: clstrs[i] for i in clstrs.keys() if i not in redoClusters}, 
                                                     clstrs2)

        #Checking new mean quality
        newTstatMean = np.mean([np.mean(silhNew[clstrsNew[i]]) / (np.std(silhNew[clstrsNew[i]])+0.001) for i in clstrsNew.keys()])


        #comparing for improvement of algorithm and giving more optimal result
        if newTstatMean <= tStatMean:
            return corr1, clstrs, silh
        else:
            return corrNew, clstrsNew, silhNew
    

In [73]:
class Random_Block_Correlation_Matrices:
    """
    Generation of Random Block Correlation Matrices. Monte Carlo
    
    
    Parameters
    ----------
    :param nObs: number of observation 
    
    :param nCols: number of columns
    
    :param sigma: Standard deviation of the distribution
    
    :param random_state: fix the random by entering seed
    
    :paramn Blocks:
    
    :param minBlockSize:   
        
    """

    @staticmethod 
    def getCovSub(nObs: int, nCols: int, sigma: Tuple[float, int] , random_state: int = None) -> np.ndarray:
        """
        Generating random covariance matrix by number of observations and columns
        
        
        Returns
        -------
        
        ar0 - np.ndarray
            random covariance matrix where each column represents a variable, 
            while the rows contain observations.
        
        """

        rng = check_random_state(random_state)
        
        if nCols == 1:
            return np.ones((1,1))

        ar0 = rng.normal(size = (nObs,1))
        ar0 = np.repeat(ar0, nCols, axis = 1)
        ar0 += rng.normal(scale = sigma, size = ar0.shape)
        ar0 = np.cov(ar0, rowvar = False)
        
        
        return ar0


    def getRndBlockCov(self, nCols: int, nBlocks: int, minBlockSize: int = 1, 
                       sigma: Tuple[float, int] = 1., random_state: int = None) -> np.ndarray:
        
        """
        Generating random diagonal of covariance matrix
        
        
        Returns
        -------
        ar0 - np.ndarray
            random covariance matrix with zeros
        
        """

        rng = check_random_state(random_state)
        
        #random sample from a given array
        parts = rng.choice(range(1, nCols - (minBlockSize - 1)*nBlocks),
                           nBlocks - 1, replace = False)
        
        
        parts.sort()
        parts = np.append(parts, nCols - (minBlockSize-1)*nBlocks)
        parts = np.append(parts[0], np.diff(parts)) - 1 + minBlockSize
        cov = None
        
        
        for nCols_ in parts:
            #generate covariance matrix
            nObs = int(max(nCols_*(nCols_ + 1)/2., 100))
            cov_= self.getCovSub(nObs, nCols_, sigma, random_state = rng)
            
            
            if cov is None:
                cov = cov_.copy()
            else:
                cov = block_diag(cov, cov_)
                
        return cov
    
    def randomBlockCorr(self, nCols: int, nBlocks: int, random_state: int = None, minBlockSize: int = 1) -> pd.DataFrame():
        """
        Generating random block of corellation matrix
        
        
        Returns
        -------
        ar0 - np.ndarray
            random covariance matrix with zeros
        
        """

        rng = check_random_state(random_state)
        cov0 = self.getRndBlockCov(nCols, nBlocks, minBlockSize = minBlockSize, sigma = .5, random_state = rng)
        cov1 = self.getRndBlockCov(nCols, 1 ,minBlockSize = minBlockSize, sigma= 1., random_state = rng)
        cov0 += cov1
        corr0 = cov2corr(cov0)
        corr0 = pd.DataFrame(corr0)
        
        return corr0

### Checking of random generated data

In [81]:
rnd = Random_Block_Correlation_Matrices()

In [82]:
cor = rnd.randomBlockCorr(10,10,1)

In [83]:
oc = Optimal_Clustering()

In [84]:
corrNew, clstrsNew, silhNew = oc.clusterKMeansTop(cor)

In [85]:
cor

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,1.0,0.28713,0.302079,0.267462,0.290355,0.287395,0.281367,0.299143,0.287887,0.375113
1,0.28713,1.0,0.378757,0.309739,0.314949,0.250392,0.264539,0.293714,0.259873,0.262234
2,0.302079,0.378757,1.0,0.41265,0.314393,0.377985,0.383591,0.305856,0.341106,0.325173
3,0.267462,0.309739,0.41265,1.0,0.317591,0.194235,0.297797,0.191816,0.315236,0.229094
4,0.290355,0.314949,0.314393,0.317591,1.0,0.274655,0.205417,0.231482,0.227272,0.234002
5,0.287395,0.250392,0.377985,0.194235,0.274655,1.0,0.323916,0.278926,0.232884,0.280203
6,0.281367,0.264539,0.383591,0.297797,0.205417,0.323916,1.0,0.29093,0.268295,0.306712
7,0.299143,0.293714,0.305856,0.191816,0.231482,0.278926,0.29093,1.0,0.284218,0.292368
8,0.287887,0.259873,0.341106,0.315236,0.227272,0.232884,0.268295,0.284218,1.0,0.272804
9,0.375113,0.262234,0.325173,0.229094,0.234002,0.280203,0.306712,0.292368,0.272804,1.0


In [86]:
clstrsNew

{0: [2, 3, 6, 8], 1: [0, 5, 7, 9], 2: [1, 4]}

In [87]:
silhNew

0    0.024660
1    0.009376
2    0.025474
3    0.018410
4    0.036012
5    0.001010
6    0.009750
7    0.017933
8    0.024749
9    0.026864
dtype: float64