In [66]:
import numpy as np
import tensorflow as tf
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import yaml

mnist = tf.keras.datasets.mnist

In [67]:
# Load data from mnist into train set and test set
(x_train, y_train),(x_test, y_test) = mnist.load_data()

# Turn data into one dimensional array (for KMeans fitting)
train_features = np.array([np.concatenate(i) for i in x_train])

test_features = np.array([np.concatenate(i) for i in x_test])
test_set = np.hstack((test_features, np.resize(y_test, (test_features.shape[0], 1))))

In [2]:
with open(f'classes_{10}.yaml', 'r') as f:
    clustering = yaml.load(f)

In [113]:
import numpy as np
from sklearn.cluster import KMeans
from kklib import knn

class kkMachine:
    def __init__(self, dataset = [], data_labels = [], kmeans = 120, knn = 5):
        self.features = np.array([np.concatenate(i) for i in dataset])
        self.labels = data_labels
        self.k_means = kmeans
        self.k_nn = knn
    
    def split_classes(self):
        # Split dataset into classes to be minimized (k-means)
        n_classes = len(set(self.labels))
        classes = [[] for i in range(n_clusters)]
        for i in range(n_classes):
            classes[i] = self.features[np.where(self.labels == i)]
        
        return classes
    
    def train(self, training_set = None, label_set = None, kmeans = None):
        # Load training set
        if training_set:
            self.features = np.array([np.concatenate(i) for i in training_set])
            self.labels = label_set
        else:
            if len(self.features) == 0:
                raise Exception('Empty dataset')
        
        if not kmeans:
            kmeans = self.k_means 
            
        self.classes = self.split_classes()
        new_features = []
        new_labels = []
        i = 0
        
        for c in self.classes:
            # Reduce number of elements in each class of dataset
            cluster = KMeans(n_clusters = kmeans, random_state = 0).fit(c)
            new_features.append(cluster.cluster_centers_)
            new_labels.append([i for x in range(len(cluster.cluster_centers_))])
            i += 1
        # Standardize dataset to be processed
        new_features = np.concatenate(new_features)
        new_labels = np.concatenate(new_labels)
        self.features = np.hstack((new_features, np.resize(new_labels, (new_features.shape[0], 1))))
        self.labels = new_labels

    def predict(self, sample_features, new_knn = None):
        # Predict sample using k-NN
        if not new_knn:
            new_knn = self.k_nn
        prediction = knn(self.features, sample_features, new_knn)
        
        return prediction

In [115]:
mac = kkMachine(x_train, y_train)

In [116]:
%%time
mac.train()

CPU times: user 12min 26s, sys: 4min 44s, total: 17min 11s
Wall time: 5min 44s


In [100]:
x_test[0]
y_test[0]

7

In [99]:
test_features = np.array([np.concatenate(i) for i in x_test])
test_set = np.hstack((test_features, np.resize(y_test, (test_features.shape[0], 1))))

In [118]:
correct = 0
for i in range(len(test_features)):
    n = mac.predict(test_features[i])
    if n == y_test[i]:
        correct += 1
        
print(correct/i)

0.9515951595159516


In [112]:
import pdb

pdb.run('mac.predict(test_features[0])')

> <string>(1)<module>()
(Pdb) n
TypeError: 'int' object is not callable
> <string>(1)<module>()
(Pdb) p knn
<function knn at 0x7f1738a2cea0>
(Pdb) quit
