In [1]:
import numpy as np
from mnist import MNIST
from sklearn.model_selection import train_test_split
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import accuracy_score
from PIL import Image
from matplotlib import pyplot as plt
import copy
import pandas as pd

In [2]:
def shuffle(X, y):
    permutation = np.arange(X.shape[0])
    np.random.shuffle(permutation)
    return X[permutation], y[permutation]

def load_dataset():
    mndata = MNIST('./data/')
    X_train, labels_train = map(np.array, mndata.load_training())
    X_test, labels_test = map(np.array, mndata.load_testing())
    return X_train, labels_train, X_test, labels_test

def get_scene(img, proj):
    return np.dot(img, proj.T)

# Transform the image vectors into the hypervectors
def get_scenes(images, proj):
    return np.dot(images[:NUM_SAMPLES, :], proj.T)

def classify(images, digit_vectors):
    similarities = cosine_similarity(images, digit_vectors)
    classifications = np.argmax(similarities, axis=1)
    return classifications

In [3]:
X_train, labels_train, _, _ = load_dataset()
# X_train, labels_train = shuffle(X_train, labels_train)
X_train, X_test, y_train, y_test = train_test_split(X_train, labels_train, test_size=0.33, random_state=42)

In [4]:
D = 10000 # dimensions in random space
IMG_LEN = 28
NUM_SAMPLES = X_train.shape[0]

In [None]:
def HD_classifiers(seed, encoding="float"):
    # print("Generating random projection...")
    # proj = np.random.rand(D, IMG_LEN * IMG_LEN)
    print("Seed: ", seed)
    print("Encoding: ", encoding)
    print("Generating random projection...")
    np.random.seed(seed)
    proj = np.random.rand(D, IMG_LEN * IMG_LEN)
    if encoding == "bipolar":
        proj[proj==0] = -1
    print(proj.shape)
    print("Projecting images to higher dim space...")
    X_train_copy = get_scenes(X_train, proj)
    
    digit_vectors = np.zeros((10, D))
    print("Dimension of digit vector: ", digit_vectors.shape)
    
    for i in range(NUM_SAMPLES):
        digit_vectors[y_train[i]] += X_train_copy[i]
    digit_vectors = np.array(digit_vectors)
    
    if encoding == "bipolar":
        digit_vectors[digit_vectors > 0] = 1
        digit_vectors[digit_vectors <= 0] = -1
    
    predictions = classify(X_train_copy, digit_vectors)
    acc = accuracy_score(y_train[:X_train_copy.shape[0]], predictions)
    print("Train accuracy: ", acc)
    
    X_test_copy = get_scenes(X_test, proj)
    predictions = classify(X_test_copy, digit_vectors)
    acc = accuracy_score(y_test[:X_test_copy.shape[0]], predictions)
    print("Test accuracy: ", acc)
        
    return digit_vectors, proj, X_train_copy, X_test_copy

In [None]:
digit_vector, proj, _, _ = HD_classifiers(40)

In [None]:
def retraining(seeds, epochs, retrain_on='train', method="direct"):
    models = []
    projs = []
    X_train_projs = []
    X_test_projs = []
    for i in range(len(seeds)):
        digit_vector, proj, X_train_copy, X_test_copy = HD_classifiers(seeds[i])
        models.append(digit_vector)
        projs.append(proj)
        X_train_projs.append(X_train_copy)
        X_test_projs.append(X_test_copy)
        
    for epoch in range(epochs):
        print("Retraining Started: ")
        print(f"Epoch {epoch+1}")
        results_test = []
        results_train = []
        for i in range(len(seeds)):
            predictions_train = classify(X_train_projs[i], models[i])
            predictions_test = classify(X_test_projs[i], models[i])
            results_train.append(predictions_train)
            results_test.append(predictions_test)
        
        results_train_dict = {}
        results_test_dict = {}
        model_names = []
        for i in range(len(seeds)):
            model_names.append(f"model_{seeds[i]}")
            results_train_dict[f"model_{seeds[i]}"] = list(results_train[i])
            results_test_dict[f"model_{seeds[i]}"] = list(results_test[i])
        
        df_train = pd.DataFrame(results_train_dict)
        df_train["y"] = y_train
        df_test = pd.DataFrame(results_test_dict)
        df_test["y"] = y_test
        
        if retrain_on.lower() == "train":
            for i in range(len(model_names)):
                mask = (df_train[model_names[i]] != df_train[model_names[i+1]])
                for j in range(i+2, len(model_names)):
                    mask = mask + (df_train[model_names[i]] != df_train[model_names[j]])
                break
            df_discrepencies_train = df_train[mask]
            df_discrepencies_train.reset_index(inplace=True)
            
            if method == "direct":
                print("Retraining on training set...")
                for epoch in range(epochs):
                    for row in df_discrepencies_train.iterrows():
                        idx = row[1]["index"]
                        for i in range(len(model_names)):
                            y_false = row[1][model_names[i]]
                            y_true = row[1]["y"]
                            hv = get_scene(X_train[idx].reshape((1, -1)), projs[i])
                            models[i][y_false] -= hv[0]
                            models[i][y_true] += hv[0]
                    
                    print(f"Epoch {epoch+1}")
                for i in range(len(seeds)):
                    predictions = classify(X_train_projs[i], models[i])
                    acc = accuracy_score(y_train[:X_train_projs[i].shape[0]], predictions)
                    print(model_names[i] + ": " + str(acc))
                print("Retraining Stopped...")
                return models
        if retrain_on.lower() == "test":
            for i in range(len(model_names)):
                mask = (df_test[model_names[i]] != df_test[model_names[i+1]])
                for j in range(i+2, len(model_names)):
                    mask = mask + (df_test[model_names[i]] != df_test[model_names[j]])
            df_discrepencies_test = df_train[mask]
            
        if retrain_on.lower() == 'both':
            for i in range(len(model_names)):
                mask_test = (df_test[model_names[i]] != df_test[model_names[i+1]])
                mask_train = (df_train[model_names[i]] != df_train[model_names[i+1]])
                for j in range(i+2, len(model_names)):
                    mask_test = mask_test + (df_test[model_names[i]] != df_test[model_names[j]])
                    mask_train = mask_train + (df_train[model_names[i]] != df_train[model_names[j]])
            df_discrepencies_test = df_train[mask_test]
            df_discrepencies_train = df_train[mask_train]

In [None]:
models = retraining([30, 40, 50], 10)

Seed:  30
Encoding:  float
Generating random projection...
(10000, 784)
Projecting images to higher dim space...
Dimension of digit vector:  (10, 10000)
Train accuracy:  0.8133084577114428
Test accuracy:  0.8077777777777778
Seed:  40
Encoding:  float
Generating random projection...
(10000, 784)
Projecting images to higher dim space...
Dimension of digit vector:  (10, 10000)
Train accuracy:  0.8139800995024875
Test accuracy:  0.8082323232323232
Seed:  50
Encoding:  float
Generating random projection...
(10000, 784)
Projecting images to higher dim space...
Dimension of digit vector:  (10, 10000)
Train accuracy:  0.8131094527363184
Test accuracy:  0.8085858585858586
Retraining Started: 
Epoch 1


  .format(op=op_str, alt_op=unsupported[op_str]))


Retraining on training set...
Epoch 1
Epoch 2
Epoch 3
Epoch 4
Epoch 5
Epoch 6
Epoch 7
Epoch 8
Epoch 9
