In [None]:
#Imports

from sklearn import datasets, metrics
import numpy as np
import random
import math
from sklearn.cluster import KMeans 
from collections import Counter 

In [None]:
def load_data(augment=False, normalize=False, seed=None):
    """Load the scikit-learn digits dataset and split it 70/30 into train/test.

    Args:
        augment: If True, quantize every pixel into three levels before any
            other processing: values below 5 become 0, values from 5 up to
            (but not including) 10 become 1, and all other values become 2.
        normalize: If True, divide every pixel value by 16 (presumably the
            maximum pixel intensity of the digits images -- confirm against
            the dataset documentation).
        seed: Optional seed for the shuffle. With the default ``None`` the
            module-level ``random`` generator is used, so the split changes
            between runs unless the caller seeded ``random`` beforehand.

    Returns:
        A tuple ``(train_data, train_target, test_data, test_target,
        target_names)``. The first four are NumPy arrays; the last one is
        ``target_names`` exactly as provided by ``datasets.load_digits()``.
    """
    digits = datasets.load_digits()

    # Work on a float copy so the object returned by load_digits() is never
    # modified in place.
    data = np.array(digits.data, dtype=float)
    targets = np.asarray(digits.target)

    if augment:
        # Assigning to a for-loop variable does not write back into the array,
        # so the thresholds are applied array-wide instead.
        # np.digitize with bins [5, 10]: x < 5 -> 0, 5 <= x < 10 -> 1, x >= 10 -> 2.
        data = np.digitize(data, [5, 10]).astype(float)

    if normalize:
        data = data / 16

    # Shuffle sample indices instead of (row, label) pairs; rows and labels
    # stay aligned because both are indexed with the same permutation.
    order = list(range(len(data)))
    if seed is None:
        random.shuffle(order)
    else:
        random.Random(seed).shuffle(order)
    order = np.array(order, dtype=int)

    split = int(len(order) * 0.7)
    train_idx, test_idx = order[:split], order[split:]

    return (data[train_idx], targets[train_idx],
            data[test_idx], targets[test_idx], digits.target_names)

In [None]:
class Model:
    """Gaussian mixture with per-attribute (diagonal) variances, fitted by EM.

    After fitting, every mixture component ("cluster") is mapped to the class
    label that occurs most often among the training samples assigned to it.

    Attributes set by ``fit``:
        n_classes, n_attributes, n_pics: number of components, attributes per
            sample and training samples.
        theta: ``[pi, means, variances]`` -- mixing weights of shape
            ``(n_classes,)`` and two ``(n_classes, n_attributes)`` arrays.
        means, vars: aliases for ``theta[1]`` and ``theta[2]``.
        cluster2label: dict mapping component index -> class label.
        n_iter, converged: number of EM iterations run and whether the stop
            criterion was met.
    """

    # Constant added to every variance in the M-step so that no variance is
    # ever zero (it is used as a divisor in the density).
    VARIANCE_FLOOR = 0.001

    def __init__(self, stop, max_iter=1000, verbose=True):
        """Create an unfitted model.

        Args:
            stop: EM stops once the largest absolute change of both the means
                and the variances between two iterations is below this value.
            max_iter: Upper bound on EM iterations; ``None`` means unbounded.
            verbose: If True, print the two parameter changes every iteration.
        """
        self.stop_criteria = stop
        self.max_iter = max_iter
        self.verbose = verbose

    def fit(self, data, labels, n_classes):
        """Fit the mixture to ``data`` and build ``cluster2label`` from ``labels``.

        Args:
            data: 2-D array-like, one sample per row.
            labels: Class label of every row of ``data``.
            n_classes: Number of mixture components.

        Raises:
            ValueError: If ``data`` is empty or not two-dimensional.
        """
        data = np.asarray(data, dtype=float)
        if data.ndim != 2 or data.shape[0] == 0:
            raise ValueError("data must be a non-empty 2-D array")

        self.n_classes = n_classes
        self.n_attributes = data.shape[1]
        self.n_pics = data.shape[0]

        # Random initialisation; the mixing weights are normalised so they
        # form a distribution and the variances are kept away from zero.
        pi = np.random.rand(self.n_classes)
        pi = pi / pi.sum()
        means = np.random.rand(self.n_classes, self.n_attributes)
        variances = np.random.rand(self.n_classes, self.n_attributes) + self.VARIANCE_FLOOR
        theta = [pi, means, variances]

        self.n_iter = 0
        self.converged = False
        while not self.converged and (self.max_iter is None or self.n_iter < self.max_iter):
            r = self.E_step(data, theta)
            new_pi, new_mean, new_variances = self.M_step(r, data)
            diff_mean = np.max(np.abs(theta[1] - new_mean))
            diff_variances = np.max(np.abs(theta[2] - new_variances))
            if self.verbose:
                print(diff_mean, diff_variances)
            theta = [new_pi, new_mean, new_variances]
            self.n_iter += 1
            self.converged = bool(diff_mean < self.stop_criteria
                                  and diff_variances < self.stop_criteria)

        self.theta = theta
        self.means = theta[1]
        self.vars = theta[2]

        # Map every component to the majority label of its training samples.
        assignments = np.argmax(self._log_likelihood(data, self.theta), axis=1)
        fallback = None
        self.cluster2label = {}
        for k in range(self.n_classes):
            member_labels = [labels[i] for i in np.flatnonzero(assignments == k)]
            if member_labels:
                self.cluster2label[k] = self.most_frequent(member_labels)
            else:
                # A component that attracted no training sample has no votes;
                # fall back to the overall majority label instead of failing.
                if fallback is None:
                    fallback = self.most_frequent(labels)
                self.cluster2label[k] = fallback

    def most_frequent(self, List):
        """Return the most common element of ``List``.

        Raises:
            IndexError: If ``List`` is empty.
        """
        occurence_count = Counter(List)
        return occurence_count.most_common(1)[0][0]

    def predict_cluster(self, pic):
        """Return the index of the component under which ``pic`` is most likely.

        The mixing weights are not taken into account, only the component
        densities. The comparison is done on log-densities so that very small
        densities do not all collapse to zero.
        """
        sample = np.asarray(pic, dtype=float)[None, :]
        return np.argmax(self._log_likelihood(sample, self.theta)[0])

    def P_X_theta(self, x, theta, k):
        """Return the density of sample ``x`` under component ``k`` of ``theta``.

        The density is the product over attributes of univariate normal
        densities with mean ``theta[1][k][j]`` and variance ``theta[2][k][j]``.
        It is accumulated as a sum of logs and exponentiated once at the end.
        """
        x = np.asarray(x, dtype=float)
        mean = np.asarray(theta[1], dtype=float)[k]
        var = np.asarray(theta[2], dtype=float)[k]
        log_p = -0.5 * np.sum(np.log(2 * np.pi * var) + (x - mean) ** 2 / var)
        return float(np.exp(log_p))

    def _log_likelihood(self, X, theta):
        """Return an ``(n_samples, n_classes)`` array of log component densities."""
        X = np.asarray(X, dtype=float)
        means = np.asarray(theta[1], dtype=float)
        variances = np.asarray(theta[2], dtype=float)
        # Squared, variance-scaled distance of every sample to every component.
        scaled_sq = (X[:, None, :] - means[None, :, :]) ** 2 / variances[None, :, :]
        log_norm = np.sum(np.log(2 * np.pi * variances), axis=1)
        return -0.5 * (scaled_sq.sum(axis=2) + log_norm[None, :])

    def E_step(self, X, theta):
        """Compute responsibilities ``r[i][k]`` of component ``k`` for sample ``i``.

        ``r[i][k] = pi_k * p(x_i | k) / sum_j pi_j * p(x_i | j)``, so every row
        sums to one. Rows for which no component has a finite weighted
        log-density are left as all zeros.
        """
        log_lik = self._log_likelihood(X, theta)
        with np.errstate(divide="ignore"):
            # log(0) = -inf is fine here: such a component simply gets weight 0.
            log_weighted = np.log(np.asarray(theta[0], dtype=float))[None, :] + log_lik

        # Subtract the row maximum before exponentiating (log-sum-exp trick)
        # so the largest term of every row is exp(0) = 1.
        row_max = np.max(log_weighted, axis=1, keepdims=True)
        usable = np.isfinite(row_max[:, 0])

        r = np.zeros(log_weighted.shape)
        shifted = np.exp(log_weighted[usable] - row_max[usable])
        r[usable] = shifted / shifted.sum(axis=1, keepdims=True)
        return r

    def M_step(self, r, X):
        """Re-estimate ``(pi, means, variances)`` from responsibilities ``r``.

        Returns:
            ``(new_pi, new_mean, new_variances)`` where ``new_pi`` has shape
            ``(n_classes,)`` and the other two ``(n_classes, n_attributes)``.
            ``VARIANCE_FLOOR`` is added to every variance.
        """
        r = np.asarray(r, dtype=float)
        X = np.asarray(X, dtype=float)

        class_weight = r.sum(axis=0)
        new_pi = class_weight / X.shape[0]

        # A component with zero total responsibility would divide by zero;
        # clamping the divisor yields a zero mean and floor variance instead
        # of NaN for such a component.
        safe_weight = np.maximum(class_weight, np.finfo(float).tiny)

        new_mean = (r.T @ X) / safe_weight[:, None]
        new_variances = np.empty_like(new_mean)
        for k in range(r.shape[1]):
            # Only per-attribute variances are needed, so no outer product.
            centered_sq = (X - new_mean[k]) ** 2
            new_variances[k] = (r[:, k] @ centered_sq) / safe_weight[k]

        new_variances += self.VARIANCE_FLOOR
        return new_pi, new_mean, new_variances

# Seed both random generators used by this notebook (``random`` shuffles the
# train/test split, ``np.random`` initialises the EM parameters) so that a
# fresh "Restart & Run All" reproduces the same split and the same fit.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)

train_data, train_target, test_data, test_target, labels = load_data(normalize=True)

# Stop EM once means and variances both change by less than 0.005; fit with
# 10 mixture components.
model_2 = Model(0.005)
model_2.fit(train_data, train_target, 10)

In [None]:
def predict(model, pic):
    """Return the class label predicted by ``model`` for one sample.

    The sample is scored against each of the ``model.n_classes`` components
    with ``model.P_X_theta``; the label stored in ``model.cluster2label`` for
    the highest-scoring component is returned.

    Args:
        model: Fitted model exposing ``n_classes``, ``theta``, ``P_X_theta``
            and ``cluster2label``.
        pic: One sample (sequence of attribute values).
    """
    # Size the score vector from the model instead of assuming 10 components.
    cluster_prob = np.zeros(model.n_classes)
    for number in range(model.n_classes):
        cluster_prob[number] = model.P_X_theta(pic, model.theta, number)
    return model.cluster2label[int(np.argmax(cluster_prob))]

def predict_cluster(model, pic):
    """Return the index of the component of ``model`` that scores ``pic`` highest.

    Args:
        model: Fitted model exposing ``n_classes``, ``theta`` and ``P_X_theta``.
        pic: One sample (sequence of attribute values).

    Returns:
        The result of ``np.argmax`` over the ``model.n_classes`` component
        scores; ties resolve to the lowest index.
    """
    # Size the score vector from the model instead of assuming 10 components.
    cluster_prob = np.zeros(model.n_classes)
    for number in range(model.n_classes):
        cluster_prob[number] = model.P_X_theta(pic, model.theta, number)
    return np.argmax(cluster_prob)

def evaluate_model(model):
    """Re-derive the cluster -> label mapping of ``model`` and report test metrics.

    Uses the module-level ``test_data`` / ``test_target`` arrays. The function

    1. predicts a cluster index for every test sample and prints the confusion
       matrix of true labels (rows) against cluster indices (columns);
    2. overwrites ``model.cluster2label`` with a mapping derived from that
       matrix;
    3. predicts labels through the new mapping and prints a classification
       report and the final confusion matrix.

    Labels are assumed to be the integers ``0 .. n-1`` because they are used
    directly as row indices of the confusion matrix.

    NOTE(review): the mapping is derived from the *test* labels, so the
    reported scores are optimistic -- confirm this is intended.
    """
    n_samples = len(test_target)
    # Cover every cluster index and every true label so the matrix is always
    # square with a known size, even if some value never occurs.
    n_ids = max(model.n_classes, int(np.max(test_target)) + 1)

    y_pred = [predict_cluster(model, test_data[i]) for i in range(n_samples)]
    conf_mat = metrics.confusion_matrix(test_target, y_pred, labels=list(range(n_ids)))
    print("Confusion matrix:")
    print(conf_mat)

    # Fallback label used when a cluster's majority label "belongs" to another
    # cluster: the first row whose arg-max column index is largest. It does not
    # depend on the cluster, so it is computed once.
    # NOTE(review): this ranks rows by a column *index*, not by a count, which
    # looks unintended; behaviour is kept as-is pending confirmation.
    fallback_label = int(np.argmax(np.argmax(conf_mat, axis=1)))

    model.cluster2label = {x: [] for x in range(n_ids)}
    for i in range(n_ids):
        # Most frequent true label among the samples assigned to cluster i.
        max_index = int(np.argmax(conf_mat[:, i]))
        max_value = conf_mat[max_index][i]
        max_in_row = np.max(conf_mat[max_index])
        print(max_value, max_in_row)
        if max_value == max_in_row:
            # Cluster i is also the dominant cluster of that label.
            print("Assigned " + str(i) + " to " + str(max_index))
            model.cluster2label[i] = max_index
        else:
            model.cluster2label[i] = fallback_label

    y_pred = [predict(model, test_data[i]) for i in range(n_samples)]

    print("Classification report:\n%s\n"
      % (metrics.classification_report(test_target, y_pred)))
    print("Confusion matrix:")
    print(metrics.confusion_matrix(test_target, y_pred))

# Show the cluster -> label mapping currently stored on the model, then
# evaluate it on the test split. The print has to come first because
# evaluate_model() overwrites model_2.cluster2label.
print(model_2.cluster2label)
evaluate_model(model_2)

In [None]:
# Repeat of the print + evaluation above. When the cells are run top to
# bottom, the mapping printed here is the one written by the previous
# evaluate_model() call, since that call overwrites model_2.cluster2label.
print(model_2.cluster2label)
evaluate_model(model_2)

In [None]:
def most_frequent(List):
    """Return the element that appears most often in ``List``.

    When several elements share the highest count, the one that was
    encountered first wins (the ordering ``Counter.most_common`` uses).

    Raises:
        IndexError: If ``List`` is empty.
    """
    ranked = Counter(List).most_common(1)
    top_element, _count = ranked[0]
    return top_element

# Baseline: k-means with 10 clusters on the same training split.
kmeans = KMeans(n_clusters=10, random_state=0).fit(train_data)

# Collect the training labels that ended up in each cluster, then replace
# each list by its most common label.
class2label = {x: [] for x in range(kmeans.n_clusters)}
for cluster_id, target in zip(kmeans.labels_, train_target):
    class2label[cluster_id].append(target)
for i in range(len(class2label)):
    class2label[i] = most_frequent(class2label[i])

# Assign all test samples to clusters in one vectorised call instead of one
# predict() call per sample, then translate cluster ids to labels.
y_pred = [class2label[cluster_id] for cluster_id in kmeans.predict(test_data)]

print("Classification report:\n%s\n"
  % (metrics.classification_report(test_target, y_pred)))
print("Confusion matrix:")
print(metrics.confusion_matrix(test_target, y_pred))