In [None]:
import cv2
import itertools as it
import matplotlib.pyplot as plt
import mxnet as mx
import numpy as np
import pandas as pd
import pickle
import seaborn as sn
from collections import Counter
from sklearn.metrics import accuracy_score, confusion_matrix

In [None]:
# Seed both MXNet's and NumPy's global random number generators with a fixed
# value so that repeated runs of the notebook draw the same random numbers.
mx.random.seed(1)
np.random.seed(1)
# Maps each single-letter label code to the legend name used in plot titles.
# Samples labelled 'z' are the ones process_data() filters out.
legend_names = {
    'a': 'Wraith',
    'b': 'Bangalore',
    'c': 'Caustic',
    'g': 'Gibraltar',
    'i': 'Lifeline',
    'l': 'Bloodhound',
    'm': 'Mirage',
    'o': 'Octane',
    'p': 'Pathfinder',
    'r': 'Crypto',
    'w': 'Wattson',
    'z': '(None)',
}

def load_data(file_path):
    """Read consecutive pickled objects from a file and return them in pairs.

    The file is expected to hold an even number of pickled objects written
    back to back; each two consecutive objects are returned as one tuple.

    Args:
        file_path: path of the pickle file to read.

    Returns:
        A list of ``(first, second)`` tuples in file order. An empty file
        yields an empty list. If the file ends after the first object of a
        pair, that dangling object is discarded.

    Raises:
        OSError: if the file cannot be opened.
        pickle.UnpicklingError (or another unpickling error): if the file
            content is corrupt. Only a clean end-of-file stops the loop, so
            damaged data is reported instead of being silently truncated.
    """
    # NOTE(review): pickle.load can execute arbitrary code; only use this on
    # files from a trusted source.
    data = []
    with open(file_path, 'rb') as fin:
        while True:
            try:
                data.append((pickle.load(fin), pickle.load(fin)))
            except EOFError:
                # pickle.load signals "no more objects" with EOFError.
                break
    return data

def process_data(data):
    """Filter, shuffle and batch a list of ``(image, label)`` pairs.

    Pairs whose label equals ``'z'`` are dropped, the remaining pairs are
    shuffled with NumPy's global random generator, and the images are stacked
    into a single array whose last axis is then moved to position 1.

    Args:
        data: iterable of ``(image, label)`` pairs; it is not modified.

    Returns:
        ``(images, labels)`` where ``images`` is the stacked array and
        ``labels`` is a list in the same (shuffled) order.
    """
    kept = [(picture, tag) for picture, tag in data if tag != 'z']
    # Shuffle the filtered copy in place; the caller's list keeps its order.
    np.random.shuffle(kept)
    labels = [tag for _, tag in kept]
    stacked = np.stack([picture for picture, _ in kept])
    return np.moveaxis(stacked, -1, 1), labels

def plot_confusion_matrix(label_names, y_true, y_pred):
    """Show a confusion-matrix heat map with one row and column per class name.

    Args:
        label_names: display names of the classes, in class order.
        y_true: ground-truth class labels.
        y_pred: predicted class labels, aligned with ``y_true``.
    """
    cm = confusion_matrix(y_true, y_pred)
    if cm.shape[0] != len(label_names):
        # confusion_matrix only creates rows/columns for labels that occur in
        # y_true or y_pred, so a class missing from both would make the matrix
        # smaller than label_names and break the DataFrame below. Rebuild it
        # with an explicit label list.
        # Assumes the labels are integer indices into label_names, as in the
        # training loop that calls this function.
        cm = confusion_matrix(y_true, y_pred, labels=list(range(len(label_names))))
    df_cm = pd.DataFrame(cm, index=label_names, columns=label_names)
    plt.figure(figsize=(5, 5))
    plt.title('Validation Confusion Matrix')
    # fmt='d' prints the integer counts as-is; the default format switches to
    # scientific notation for counts of 100 or more.
    sn.heatmap(df_cm, annot=True, fmt='d')
    plt.show()

In [None]:
pickle_file_path = r'C:\Users\cidzerda\Documents\GitHub\strevr-data\unsupervised.pickle'
label_file_path = r'C:\Users\cidzerda\Documents\GitHub\strevr-data\unsupervised\unsupervised.txt'

# Load the pickled images, flatten each one to a vector and divide by 255.0.
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open(pickle_file_path, 'rb') as fin:
    images = np.array([a.reshape(-1) / 255.0 for a in pickle.load(fin)]).astype('float32')

# The label file holds a Python expression that evaluates to a sequence of
# (start_index, letter) entries.
# NOTE(review): eval() runs whatever the file contains; if the file is a pure
# literal, ast.literal_eval would be a safer drop-in.
with open(label_file_path, 'rt') as fin:
    labels = eval(fin.read())
print(images.shape, len(labels))
print(labels[:9])

# Expand the entries to one label per image: scanning from the end, take the
# first entry whose start index is not greater than the image index.
labels = [
    next(entry for entry in reversed(labels) if not i < entry[0])[1]
    for i in range(len(images))
]
print(len(labels), labels[:9], labels[111:119])

# Pair images with labels, then filter, shuffle and stack them.
data = list(zip(images, labels))
images, labels = process_data(data)
data.clear()

# Convert letter labels into numerical labels: each distinct letter gets its
# position in the alphabetically sorted list of letters that actually occur.
label_names = sorted(set(labels))
label_dict = {name: index for index, name in enumerate(label_names)}
labels = list(map(label_dict.__getitem__, labels))
print(images.shape, len(labels))

In [None]:
# Split the (already shuffled) data 80/10/10 into training, validation and
# testing subsets by position.
eighty_percent, ninety_percent = (round(fraction * len(images)) for fraction in (0.8, 0.9))
training_indices = np.arange(eighty_percent)
validation_indices = np.arange(eighty_percent, ninety_percent)
testing_indices = np.arange(ninety_percent, len(images))

training_images, validation_images, testing_images = (
    images[subset] for subset in (training_indices, validation_indices, testing_indices)
)
training_labels, validation_labels, testing_labels = (
    [labels[i] for i in subset] for subset in (training_indices, validation_indices, testing_indices)
)

# Display the subset sizes as the cell output.
(
    training_images.shape, len(training_labels),
    validation_images.shape, len(validation_labels),
    testing_images.shape, len(testing_labels),
)

In [None]:
# Run on the first GPU when one is available; otherwise fall back to the CPU
# so the notebook still executes on machines without a GPU.
ctx = mx.gpu() if mx.context.num_gpus() > 0 else mx.cpu()

# Define the hyper-parameters of the learning system.
batch_size = 1500      # number of samples per batch in the training/validation/testing loops
epochs = 30            # number of passes over the training set
learning_rate = 0.01   # learning rate handed to the SGD trainer

# Create the network: a flatten layer, two 2048-unit ReLU hidden layers and a
# linear output layer with one unit per class.
num_outputs = len(label_names)
net = mx.gluon.nn.Sequential()
with net.name_scope():
    net.add(
        mx.gluon.nn.Flatten(),
        mx.gluon.nn.Dense(2048, activation="relu"),
        mx.gluon.nn.Dense(2048, activation="relu"),
        mx.gluon.nn.Dense(num_outputs),
    )

# Initialize the network's parameters on the selected device.
xavier_initializer = mx.init.Xavier(magnitude=2.24)
net.collect_params().initialize(xavier_initializer, ctx=ctx)
print(net)

# Define the loss and the trainer.
# (The loss variable's name is misspelled; it is kept because the training
# cell refers to it by this name.)
softmax_cross_etropy_loss = mx.gluon.loss.SoftmaxCrossEntropyLoss()
trainer = mx.gluon.Trainer(
    net.collect_params(),
    'sgd',
    {'learning_rate': learning_rate},
)

In [None]:
# Sanity-check the training data: show the first labels, the first image
# vector, and the first five images side by side.
print(training_labels[:9])
print(training_images[0])
# Each vector is reshaped to 30x35x3 and its channel order is reversed before
# display (presumably BGR -> RGB for matplotlib -- TODO confirm).
_ = plt.imshow(np.hstack([
    training_images[i].reshape(30, 35, 3)[:, :, ::-1]
    for i in range(5)
]))

In [None]:
# Run epochs of training and validation.
for epoch in range(epochs):
    # Run the network on the training data set and determine the training Softmax cross-entropy loss.
    cumulative_training_loss = 0
    training_predictions = []
    for i in range(0, training_images.shape[0], batch_size):
        data = mx.ndarray.array(training_images[i:i + batch_size]).as_in_context(ctx)
        label = mx.ndarray.array(training_labels[i:i + batch_size]).as_in_context(ctx)
        # Only the forward pass and the loss are needed for back-propagation,
        # so only they are executed inside the recording scope.
        with mx.autograd.record():
            output = net(data)
            loss = softmax_cross_etropy_loss(output, label)
        loss.backward()
        trainer.step(data.shape[0])
        # Bookkeeping happens outside the recording scope so that the running
        # loss sum does not become part of the recorded computation.
        training_predictions.extend(np.argmax(output.asnumpy(), axis=1).tolist())
        cumulative_training_loss = cumulative_training_loss + mx.ndarray.sum(loss)
    training_loss = cumulative_training_loss / len(training_images)

    # Run the network on the validation data set and determine the validation Softmax cross-entropy loss.
    cumulative_validation_loss = 0
    validation_predictions = []
    for i in range(0, validation_images.shape[0], batch_size):
        data = mx.ndarray.array(validation_images[i:i + batch_size]).as_in_context(ctx)
        label = mx.ndarray.array(validation_labels[i:i + batch_size]).as_in_context(ctx)
        output = net(data)
        validation_predictions.extend(np.argmax(output.asnumpy(), axis=1).tolist())
        validation_batch_loss = softmax_cross_etropy_loss(output, label)
        cumulative_validation_loss = cumulative_validation_loss + mx.ndarray.sum(validation_batch_loss)
    # Mean per-sample loss over the whole validation set.
    validation_loss = cumulative_validation_loss / len(validation_images)

    # Calculate training and validation accuracies.
    # accuracy = (TP+TN) / (TP+FP+TN+FN)
    training_accuracy = accuracy_score(training_labels, training_predictions)
    validation_accuracy = accuracy_score(validation_labels, validation_predictions)

    # Print the summary and plot the confusion matrix after each epoch.
    print("Epoch {}, training loss: {:.2f}, validation loss: {:.2f}, training accuracy: {:.2f}, validation accuracy: {:.2f}".format(epoch, training_loss.asnumpy()[0], validation_loss.asnumpy()[0], training_accuracy, validation_accuracy))
    plot_confusion_matrix(label_names, validation_labels, validation_predictions)

In [None]:
# Run the trained network on the held-out testing set. For every sample keep
# the predicted class and the softmax probability assigned to that class.
testing_predictions = []
testing_probabilities = []
for i in range(0, testing_images.shape[0], batch_size):
    data = mx.ndarray.array(testing_images[i:i + batch_size].astype('float32')).as_in_context(ctx)
    output = net(data)
    batch_predictions = np.argmax(output.asnumpy(), axis=1)
    # One softmax and one device-to-host copy per batch instead of per sample.
    batch_probabilities = mx.nd.softmax(output).asnumpy()
    testing_predictions += batch_predictions.tolist()
    testing_probabilities.extend(
        batch_probabilities[np.arange(len(batch_predictions)), batch_predictions].tolist())

# Overall accuracy (reported as 0 when there are no testing samples).
is_correct = [a == b for a, b in zip(testing_labels, testing_predictions)]
n_correct, n_total = sum(is_correct), len(is_correct)
print('predicted', n_correct, 'correct of', n_total, '({:.1%})'.format(n_correct / n_total if n_total else 0.0))

# Confidence range of the wrong predictions, if there are any.
incorrect_probabilities = [p for hit, p in zip(is_correct, testing_probabilities) if not hit]
if incorrect_probabilities:
    print('lowest incorrect probability:  {:.1%}, highest incorrect probability:  {:.1%}'.format(min(incorrect_probabilities), max(incorrect_probabilities)))

# Confidence range and deciles of the right predictions, if there are any.
# Sorting once serves min, max and all nine percentile look-ups.
correct_probabilities = sorted(p for hit, p in zip(is_correct, testing_probabilities) if hit)
if correct_probabilities:
    print('lowest correct probability:  {:.1%}, highest correct probability:  {:.1%}'.format(correct_probabilities[0], correct_probabilities[-1]))
    for i in range(1, 10):
        print('{}0th percentile correct probability:  {:.1%}'.format(i, correct_probabilities[i * len(correct_probabilities) // 10]))

# Show up to nine testing images, titled with the predicted legend, its
# probability and (on the second line) the true legend.
if len(testing_images):
    print(testing_images[0].reshape(30, 35, 3).take([2,1,0], 2).shape)
for i in range(min(9, len(testing_predictions))):
    plt.imshow(testing_images[i].reshape(30, 35, 3).take([2,1,0], 2))
    plt.title('{} {:.1%}?\n{}'.format(legend_names[label_names[testing_predictions[i]]], testing_probabilities[i], legend_names[label_names[testing_labels[i]]]))
    plt.show()

In [None]:
# Write the trained network's parameters to disk at the path below.
model_file_path = r'C:\Users\cidzerda\Documents\GitHub\strevr-data\unsupervised\unsupervised.model'
net.save_parameters(model_file_path)