In [1]:
import pandas as pd
import numpy as np
import math

train_data = pd.read_csv("/content/train-gaussian.csv",skiprows=1,names=['class','x1','x2','x3'])
test_data = pd.read_csv("/content/test-gaussian.csv",skiprows=1,names=['class','x1','x2','x3'])


In [2]:
X = train_data[['x1','x2','x3']]
Y = train_data['class']
c1 = train_data.loc[train_data['class']=='A']
c2 = train_data.loc[train_data['class']=='B']
pr1= c1.shape[0]/train_data.shape[0]
pr2= c2.shape[0]/train_data.shape[0]

In [3]:
k = 16
centroids = []
for i in range(k):
  a = (X.sample()).to_numpy()[0]
  centroids.append(a)
centroids = np.array(centroids)

In [4]:
def dist(x, y):
  return np.linalg.norm(x-y)

In [5]:
def closestCentroid(c, x):
  assigned_cent = []
  for i in x:
    distance = []
    for j in c:
      distance.append(dist(i,j))
    assigned_cent.append(np.argmin(distance))
  return np.array(assigned_cent)

In [6]:
def genNewCent(X,cluster, k):
  centroidmu = []
  centroidsigma = []
  for j in range(k):
    xk = X[cluster == j]
    xk = pd.DataFrame(xk, columns = ['x1','x2','x3'])
    centroidmu.append(xk.mean())
    centroidsigma.append(xk.cov())
  return np.array(centroidmu), np.array(centroidsigma)

In [7]:
def pdf(x, mu, sigma):
  '''
  pdf for large dataset to utilize np array vectorization
  '''
  det = np.linalg.det(sigma)
  const = 1 / (((2 * np.pi) ** (mu.shape[0] / 2)) * (det ** 0.5))
  xmu = x - mu
  xmu = xmu.reshape(xmu.shape[0],xmu.shape[1],1)
  xmut = np.transpose(xmu, (0,2,1))
  test = xmut @ np.linalg.inv(sigma) @ xmu
  test = test.reshape(test.shape[0],test.shape[1])
  exponential = np.exp(-0.5 * test)
  return const * exponential

In [8]:
def pdf1x(x, mu, sigma):
  '''
  pdf for individual data points
  '''
  p1 = (1/(math.pow(2*math.pi,x.shape[0]/2) * np.linalg.det(sigma)))
  p2 = ((x-mu).T) @ np.linalg.inv(sigma) @ (x-mu)
  p3 = math.exp(-(1/2) * p2)
  return p1 * p3

In [9]:
# k-mean clustering initialization
Xdat = X.to_numpy()
for i in range(10):
  get_cent = closestCentroid(centroids,Xdat)
  centroids, centroidsigma = genNewCent(Xdat,get_cent,k)
w = np.ones((k)) /k

In [10]:
print(get_cent)
clusterTarget = []
for j in range(k):
  xk = Y[get_cent == j]
  clusY = xk.to_numpy()
  clusterTarget.append(clusY[int(clusY.shape[0]/2)])
  #print(clusY.median())
  

[ 0  0  0 ... 15 15 15]


In [11]:
likelihood = []
#E Step
for i in range(k):
  likelihood.append(pdf(Xdat, centroids[i], centroidsigma[i]))
likelihood = np.array(likelihood)
means = centroids.copy()
sigmas = centroidsigma.copy()
for i in range(k):
  prior = w[i] * likelihood[i] / (np.sum([likelihood[j] * w[j] for j in range(k)], axis=0))
  #M step
  means[i] = np.sum(prior * Xdat) / np.sum(prior)
  xmumle = Xdat - means[i]
  xmumle = xmumle.reshape(xmumle.shape[0],xmumle.shape[1],1)
  xmumlet = np.transpose(xmumle, (0,2,1))
  priorre = prior.reshape(prior.shape[0], prior.shape[1], 1)
  sigmas[i] = np.sum(priorre * (xmumle  @ xmumlet),axis=0)/np.sum(prior)
  w[i] = np.mean(prior)

In [12]:
correct = 0

for i in range(test_data.shape[0]):
  test = test_data.iloc[i]
  target = test.loc['class']
  test = test.loc[['x1','x2','x3']]
  test = test.to_numpy()
  #uncomment and inversely comment the choice of matrix
  maxscore = 0
  maxvalue = ''
  for j in range(k):
    #full covariance matrix
    score = pdf1x(test,means[j], sigmas[j]) * w[j]
    #diagonal covariance matrix
    #score = pdf1x(test,means[j], np.diag(np.diag(sigmas[j]))) * w[j]
    if score > maxscore:
      maxscore = score
      maxvalue = clusterTarget[j]
  choice = ''
  if target == maxvalue:
    correct += 1
print("classification rate:")
print(correct / test_data.shape[0])

classification rate:
0.7759036144578313
