In [1]:
import math
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal as mvn
from sklearn.metrics import DetCurveDisplay
from math import sqrt
import math
import time
import os

from numba import njit
import numba as nb

execfile("../sharedFunctions.py")

# K - Means

In [2]:
@njit
def kMeans(X, K, k_iterations):
    N = len(X)
    D = X[0].shape[0]
    
    # Randomly Choose k - points as the means
    mu = [ X[k] for k in range(K) ]
    cls = [ i for i in range(N) ]
    
    for _ in range(k_iterations):
        isSame = True
        
        # Assign Data points
        for j in range(N):
            mn = 0
            currCls = 0
            for k in range(K):
                currDist = np.linalg.norm(mu[k] - X[j])
                if(k == 0 or currDist < mn):
                    mn = currDist
                    currCls = k
            if(cls[j] != currCls):
                isSame = False
                cls[j] = currCls
        
        
        mu = [ np.zeros(D) for k in range(K) ]
        cnt = [ 0 for k in range(K) ]
        for i in range(N):
            mu[cls[i]] += X[i]
            cnt[cls[i]] += 1
        
        for i in range(K):
            mu[i] /= cnt[i]
        
        if(isSame): break
        
    return cls, mu

# GMM EM Algorithm

In [3]:
def my_mvn(mu, sigma, X):
    D = sigma.shape[0]
    return np.exp(-0.5 * (X-mu).T@np.linalg.pinv(sigma)@(X-mu))/(np.linalg.det(sigma))**0.5/(2 * 3.14)**(D/2)

# E Step
# Input: pi, mu, sigma, X
# Output: gamma
def e_step(pi, mu, sigma, X):
    N = len(X)
    K = len(mu)
    D = X[0].shape[0]
    
    gamma = np.zeros([N, K], dtype = np.float64)
    for n in range(N):
        denom = 0.0
        for j in range(K):
#             denom += pi[j] * mvn(mu[j],sigma[j], allow_singular=False).pdf(X[n])
            denom += pi[j] * my_mvn(mu[j],sigma[j], X[n])
        for k in range(K):
#             gamma[n][k] = pi[k] * mvn(mu[k],sigma[k], allow_singular=False).pdf(X[n]) / denom
            gamma[n][k] = pi[k] * my_mvn(mu[k],sigma[k],X[n]) / denom
            
    return gamma

# M Step
# Input:
    # gamma - 2D numpy array N * K => N points, K mixtures
    # x - N length list of numpy arrays of dim([D, 1]) => N points, D features
# Output:
    # theta_new = [pi, mu, sigma]
def m_step(gamma, X):
    N = gamma.shape[0]
    K = gamma.shape[1]
    D = X[0].shape[0]
    
    # Compute Nk for every class
    Nk = np.zeros(K, dtype = np.float64)
    for k in range(K):
        for n in range(N):
            Nk[k] += gamma[n][k]

    # Compute pi
    pi = np.zeros(K, dtype = np.float64)
    for k in range(K):
        pi[k] = Nk[k] / N

    # Compute mu
    mu = [ np.zeros(D, dtype = np.float64) for k in range(K) ]
    for k in range(K):
        for n in range(N):
            mu[k] += gamma[n][k] * X[n]
        mu[k] /= Nk[k]
        
    # Compute sigma
    sigma = [ np.matrix(np.zeros((D, D), dtype = np.float64)) for k in range(K) ]
    for k in range(K):
        for n in range(N):
            sigma[k] += gamma[n][k] * (np.matrix(X[n] - mu[k]).T @ np.matrix(X[n] - mu[k]))
        sigma[k] /= Nk[k]
    
    return pi, mu, sigma

# GMM
def GMM(X, K, iterations, k_iterations):
    N = len(X)
    D = X[0].shape[0]
    
    # Initialise pi, mu, sigma using k-Means
    cls, mu = kMeans(X, K , k_iterations)
    print("K-Means Initialisation Done")
    pi = [ 0 for i in range(K) ]
    sigma = [ np.matrix(np.zeros([D, D], dtype = np.float64)) for k in range(K) ]
    Nk = [ 0 for i in range(K) ]
    
    for i in range(N):
        currCls = cls[i]
        pi[currCls] += 1
        Nk[currCls] += 1
        sigma[currCls] += np.matrix(X[i] - mu[currCls]).T @ np.matrix(X[i] - mu[currCls])
        
    for i in range(K):
        pi[i] /= N
        sigma[i] /= Nk[i]
        if(np.linalg.det(sigma[i]) == 0): print(sigma)
    
    for _ in range(iterations):
        start_time = time.time()
        gamma = e_step(pi, mu, sigma, X)
        print("E STEP")
        print("--- %s seconds ---" % (time.time() - start_time))
        
        start_time = time.time()
        pi_new, mu_new, sigma_new = m_step(gamma, X)
        print("M STEP")
        print("--- %s seconds ---\n\n" % (time.time() - start_time))
        
    return pi, mu, sigma

# Other Common Functions

In [4]:
# Convert normal matrix to diagonal matrix
def make_dcov(cov):
    dim = len(cov)
    dcov = np.matrix(np.zeros(shape = (dim,dim)))
    for i in range(dim) : dcov[i,i] = cov[i,i]
    return dcov

# Synthetic Data

X = []
N = []
nClasses = 0

# Directory to save images
dir_path = 'Results/Synthetic_Png/'

with open('Synthetic/train.txt') as train_syn:
    lines = train_syn.readlines()
    
    for line in lines:
        _, _, c = line.strip().split(',')
        nClasses = max(nClasses, int(c))
        
    N = [ 0 for _ in range(len(lines)) ]
    X = [ [] for _ in range(nClasses) ]
    for line in lines:
        x, y, c = line.strip().split(',')
        x, y, c = float(x), float(y), int(c) - 1
        X[c].append(np.array([x, y]))
        N[c] += 1
        
"""Scatter Plot visualization of training data """      
for c in range(nClasses):
    x = []
    y = []
    for a,b in X[c]:
        x.append(a)
        y.append(b)
    plt.scatter(x,y)
plt.savefig(dir_path + 'syn_train.png', dpi = 150)
plt.clf()

X_dev = []
groundtruth_dev = []
nTests = 0
N_dev = []
nClasses_dev = 0

with open('Synthetic/dev.txt') as dev_syn:
    lines = dev_syn.readlines()
    
    for line in lines:
        _, _, c = line.strip().split(',')
        nClasses_dev = max(nClasses_dev, int(c))
        
    N_dev = [0 for c in range(nClasses_dev)]
    for line in lines:
        nTests += 1
        x, y, c = line.strip().split(',')
        x, y, c = float(x), float(y), int(c) - 1
        X_dev.append(np.array([x, y]))
        groundtruth_dev.append(c)
        N_dev[c] += 1

"""Scatter Plot visualization of developement data """      
for c in range(nClasses_dev):
    x = []
    y = []
    for [a,b] in X_dev:
        x.append(a)
        y.append(b)
    plt.scatter(x,y)
plt.savefig(dir_path + 'syn_dev.png', dpi = 150)

"""WARNING : RUNNING OF THIS CELL WILL TAKE TOO LONG TIME """
""" Main part of program """
gmm_all_pis = []
gmm_all_means = []
gmm_all_covs = []
gmm_all_dcovs = []
# K_list = [2,3,4,5,6,11,16,20,25]
K_list = [2]
for K in K_list:
    all_pis = []
    all_means = []
    all_covs = []
    all_dcovs = []

    for c in range(nClasses):
        pis,means,covs = GMM(X[c],K,4,20)
        dcovs = []
        for cov in covs:dcovs.append(make_dcov(cov))
        all_pis.append(pis)
        all_means.append(means)
        all_covs.append(covs)
        all_dcovs.append(dcovs)

    gmm_all_pis.append(all_pis)
    gmm_all_means.append(all_means)
    gmm_all_covs.append(all_covs)
    gmm_all_dcovs.append(all_dcovs)

""" ROC PLOTS """

leg = []
for idx in range(len(K_list)):
    K = K_list[idx]

    
    all_pis = gmm_all_pis[idx]
    all_means = gmm_all_means[idx]
    all_covs = gmm_all_covs[idx]
    
    
    
    all_likelihood_dev = [[] for c in range(nClasses)]
    all_priors_dev = [0.5,0.5]
    for c in range(nClasses):
        for [x,y] in X_dev:
            fvec = np.array([x,y])
            all_likelihood_dev[c].append(likelihood(fvec,all_pis[c],all_means[c],all_covs[c],K))
            
    area = ROC(all_likelihood_dev,all_priors_dev,nTests,nClasses_dev,groundtruth_dev)
    leg.append(f'Area of ROC for K = {K} is : {area}')
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC Curves for GMM with different K')
plt.legend(leg)

path = dir_path + 'ROC.svg'
plt.savefig(path)
plt.show()
plt.clf()

ax = plt.gca()
for idx in range(len(K_list)):
    K = K_list[idx]

    
    all_pis = gmm_all_pis[idx]
    all_means = gmm_all_means[idx]
    all_covs = gmm_all_covs[idx]
    
    
    
    all_likelihood_dev = [[0 for c in range(nClasses)] for n in range(nTests)]
    all_priors_dev = [0.5,0.5]
    for c in range(nClasses):
        for i in range(len(X_dev)):
            x = X_dev[i][0]
            y = X_dev[i][1]
            fvec = np.array([x,y])
            all_likelihood_dev[i][c] = likelihood(fvec,all_pis[c],all_means[c],all_covs[c],K)
            
    FPR,FNR = DET(all_likelihood_dev,all_priors_dev,nTests,nClasses_dev,groundtruth_dev)
    DetCurveDisplay(fpr = FPR, fnr = FNR, estimator_name = 'K = ' + str(K)).plot(ax)
#     leg.append(f'Area of DET for K = {K} is : {area}')
plt.xlabel('FPR')
plt.ylabel('FNR')
plt.title('DET Curves for GMM with different K')
# plt.legend(leg)

path = dir_path + 'DET.svg'
plt.savefig(path)
plt.show()
plt.clf()

# Plotting contour-plots of all GMM's, decision surface for synthetic data
def contour_decision_plot(all_pis,all_means,all_covs,K,nClasses,path):
    
    # Contour plots
    for c in range(nClasses):
        for k in range(K):

            mean = all_means[c][k]
            
            count = 100
            x = np.linspace(mean[0] - 3, mean[0] + 3, count) 
            y = np.linspace(mean[1] - 3, mean[1] + 3, count)
            X,Y = np.meshgrid(x,y)
    
            cov = all_covs[c][k]

            Z = np.zeros(X.shape)
            for i in range(X.shape[0]):
                for j in range(X.shape[1]):
                    Z[i,j] = mvn(mean,cov, allow_singular=True).pdf(np.array([X[i,j],Y[i,j]]))
            plt.scatter([mean[0]], [mean[1]])
            plt.contour(X, Y, Z, levels = [0.2, 0.3, 0.4, 0.5, 0.6], cmap = plt.get_cmap('magma'))
    
    # Decision Surface
    # Assuming nClasses := 2
    count = 100
    x = np.linspace(-16, -1, count) 
    y = np.linspace(-10, 6, count)
    X,Y = np.meshgrid(x,y)
    
    Z = np.zeros(X.shape)
    for i in range(X.shape[0]):
        for j in range(X.shape[1]):
            x = X[i,j]
            y = Y[i,j]

            fvec = np.array([x,y])
            p1 = likelihood(fvec,all_pis[0],all_means[0],all_covs[0],K)
            p2 = likelihood(fvec,all_pis[1],all_means[1],all_covs[1],K)
            
            if(p1 >= p2) : Z[i,j] = 1
            else : Z[i,j] = 2
    
    plt.xlabel('X-axis')
    plt.ylabel('Y-axix')
    plt.title(f'Decision and Contour plot for K = {K}')
    plt.contourf(X,Y,Z,cmap=plt.cm.jet, alpha=.5)   
    plt.savefig(path,dpi = 150)

"""DECISION-CONTOUR PLOTS"""
for idx in range(len(K_list)):
    K = K_list[idx]
    
    all_pis = gmm_all_pis[idx]
    all_means = gmm_all_means[idx]
    all_covs = gmm_all_covs[idx]
    
    path = dir_path + 'K' + str(K) + '_contour_decision.png'
    contour_decision_plot(all_pis, all_means, all_covs, K, nClasses,path)
    plt.clf()
    

""" Printing/Saving plots for diagonal covariance matrix case """

""" ROC PLOTS """
leg = []
for idx in range(len(K_list)):
    K = K_list[idx]
    
    all_pis = gmm_all_pis[idx]
    all_means = gmm_all_means[idx]
    all_dcovs = gmm_all_dcovs[idx]
    
    all_likelihood_dev = [[] for c in range(nClasses)]
    all_priors_dev = [0.5,0.5]
    for c in range(nClasses):
        for [x,y] in X_dev:
            fvec = np.array([x,y])
            all_likelihood_dev[c].append(likelihood(fvec,all_pis[c],all_means[c],all_dcovs[c],K))
    area = ROC(all_likelihood_dev,all_priors_dev,nTests,nClasses_dev,groundtruth_dev)
    leg.append(f'Area of ROC for K = {K} is : {area}')
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC Curves for Diagonal covariance GMM with different K')
plt.legend(leg)

path = dir_path + 'diag_ROC.svg'
plt.savefig(path)
plt.show()
plt.clf()

#######################################

ax = plt.gca()
for idx in range(len(K_list)):
    K = K_list[idx]

    
    all_pis = gmm_all_pis[idx]
    all_means = gmm_all_means[idx]
    all_dcovs = gmm_all_dcovs[idx]
    
    
    
    all_likelihood_dev = [[0 for c in range(nClasses)] for n in range(nTests)]
    all_priors_dev = [0.5,0.5]
    for c in range(nClasses):
        for i in range(len(X_dev)):
            x = X_dev[i][0]
            y = X_dev[i][1]
            fvec = np.array([x,y])
            all_likelihood_dev[i][c] = likelihood(fvec,all_pis[c],all_means[c],all_dcovs[c],K)
            
    FPR,FNR = DET(all_likelihood_dev,all_priors_dev,nTests,nClasses_dev,groundtruth_dev)
    DetCurveDisplay(fpr = FPR, fnr = FNR, estimator_name = 'K = ' + str(K)).plot(ax)
#     leg.append(f'Area of DET for K = {K} is : {area}')
plt.xlabel('FPR')
plt.ylabel('FNR')
plt.title('DET Curves for Diagonal Covariances GMM with different K')
# plt.legend(leg)

path = dir_path + 'diag_DET.svg'
plt.savefig(path)
plt.show()
plt.clf()

######################################


"""DECISION-CONTOUR PLOTS"""
for idx in range(len(K_list)):
    K = K_list[idx]
    
    all_pis = gmm_all_pis[idx]
    all_means = gmm_all_means[idx]
    all_dcovs = gmm_all_dcovs[idx]
    
    path = dir_path + 'diag_K' + str(K) + '_contour_decision.png'
    contour_decision_plot(all_pis, all_means, all_dcovs, K, nClasses,path)
    plt.clf()


# Real Data

In [5]:
# Data Extraction from files
def extractImgData(dir):
    files = os.listdir(dir)
    images = []
    for file in files:
        with open(dir + file, 'r') as f:
            blocks = []
            for line in f.readlines():
                blocks.append(np.array(line.split(), dtype=np.float64))
            images.append(blocks)
    return images

trainData = []
classList = ['coast', 'forest', 'highway', 'mountain', 'opencountry']
nClasses_Real = len(classList)
for cls in classList:
    trainData.append(extractImgData('RealData/' + cls + '/train/'))

X_dev = []
nTests = 0
groundtruth_dev = []
for i in range(len(classList)):
    cls = classList[i]
    images = extractImgData('RealData/' + cls + '/dev/')
    for image in images:
        nTests += 1
        X_dev.append(image)
        groundtruth_dev.append(i)

In [6]:
""" Warning this cell takes a lot of time to run """
gmm_all_pis = []
gmm_all_means = []
gmm_all_covs = []
gmm_all_dcovs = []
# K_list = [2,3,4,5,6,11,16,20,25]
K_list = [2]
for K in K_list:
    all_pis = []
    all_means = []
    all_covs = []
    all_dcovs = []

    for c in range(nClasses_Real):
        X = []
        for image in trainData[c]:
            for block in image:
                X.append(block)
        pis,means,covs = GMM(X,K,4,10)
        dcovs = []
        for cov in covs:dcovs.append(make_dcov(cov))
        all_pis.append(pis)
        all_means.append(means)
        all_covs.append(covs)
        all_dcovs.append(dcovs)

    gmm_all_pis.append(all_pis)
    gmm_all_means.append(all_means)
    gmm_all_covs.append(all_covs)
    gmm_all_dcovs.append(all_dcovs)

Encountered the use of a type that is scheduled for deprecation: type 'reflected list' found for argument 'X' of function 'kMeans'.

For more information visit https://numba.readthedocs.io/en/stable/reference/deprecation.html#deprecation-of-reflection-for-list-and-set-types

File "../../../../../../../var/folders/gw/twbzw73141s0st9tn4gddx2m0000gn/T/ipykernel_9812/3470100165.py", line 1:
<source missing, REPL/exec in use?>



K-Means Initialisation Done
E STEP
--- 6.37533164024353 seconds ---
M STEP
--- 0.45465898513793945 seconds ---




KeyboardInterrupt: 

In [None]:
dir_path = 'Results/Real_Png'

# Calculate likelihood probability value for given image
def log_likelihood(image,pis,means,covs,K):
    log_likelihood = 0
    for block in image:
        prob_block = 0
#         print(K)
        for i in range(K):
            prob_block += pis[i] * mvn(means[i],covs[i], allow_singular=True).pdf(block)
#         print(prob_block)
#         log_likelihood += math.log2(prob_block)
        log_likelihood *= prob_block
    return log_likelihood

for idx in range(len(K_list)):
    
    K = K_list[idx]

    all_pis = gmm_all_pis[idx]
    all_means = gmm_all_means[idx]
    all_covs = gmm_all_covs[idx]
    
    all_likelihood_dev = [[] for c in range(nClasses_Real)]
    all_priors_dev = [1 / nClasses_Real for c in range(nClasses_Real)]

    conf_matrix = [[0 for i in range(5)] for j in range(5)]
    cnt = 0
    for image in X_dev:
        LL = np.zeros(nClasses_Real)
        for c in range(nClasses_Real):
            LL[c] = log_likelihood(image,all_pis[c],all_means[c],all_covs[c],K)
        trueCls = groundtruth_dev[cnt]
        predCls = np.argmin(LL)
        conf_matrix[trueCls][predCls] += 1
        cnt += 1

# """ ROC PLOTS """
# for idx in range(len(K_list)):
#     K = K_list[idx]

#     all_pis = gmm_all_pis[idx]
#     all_means = gmm_all_means[idx]
#     all_covs = gmm_all_covs[idx]
    
#     all_likelihood_dev = [[] for c in range(nClasses_Real)]
#     all_priors_dev = [1 / nClasses_Real for c in range(nClasses_Real)]
#     for c in range(nClasses_Real):
#         for image in X_dev:
#             all_likelihood_dev[c].append(log_likelihood(image,all_pis[c],all_means[c],all_covs[c],K))

#     area = ROC(all_likelihood_dev,all_priors_dev,nTests,nClasses_Real,groundtruth_dev)

# plt.xlabel('FPR')
# plt.ylabel('TPR')
# plt.title('ROC Curves for GMM with different K')

# path = dir_path + 'ROC.svg'
# plt.savefig(path)
# plt.show()
# plt.clf()

# ax = plt.gca()
# for idx in range(len(K_list)):
#     K = K_list[idx]

#     all_pis = gmm_all_pis[idx]
#     all_means = gmm_all_means[idx]
#     all_covs = gmm_all_covs[idx]
    
#     all_likelihood_dev = [[] for n in range(nClasses_Real)]
#     all_priors_dev = [1 / nClasses_Real for c in range(nClasses_Real)]
#     for c in range(nClasses_Real):
#         for image in X_dev:
#             all_likelihood_dev[c].append(log_likelihood(image,all_pis[c],all_means[c],all_covs[c],K))
            
#     FPR,FNR = DET(all_likelihood_dev,all_priors_dev,nTests,nClasses_Real,groundtruth_dev)
#     DetCurveDisplay(fpr = FPR, fnr = FNR, estimator_name = 'K = ' + str(K)).plot(ax)

# plt.xlabel('FPR')
# plt.ylabel('FNR')
# plt.title('DET Curves for GMM with different K')

# path = dir_path + 'DET.svg'
# plt.savefig(path)
# plt.show()
# plt.clf()

In [None]:
print(conf_matrix)