In [None]:
import numpy as np
import pickle, gzip
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from tqdm import tqdm
import pandas as pd
import seaborn as sns
import time
import math
import os
from copy import deepcopy
from scipy.special import expit # Vectorized sigmoid function
from scipy.special import softmax as softmax_
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

# Function to show single image
def show(image):
    plt.figure(figsize=(10,10))
    new = image.copy()
    if image.shape == 3:                    # Switch R and B channels so it shows up as correctly as R,G,B, if image is 3-channel
        new[:,:,0] = image[:,:,2]
        new[:,:,2] = image[:,:,0]
    plt.imshow(new, cmap = "gray")

# Function to get adjacency matrix for a given latent distribution
def get_adjacency_matrix(latent_distribution):
    # Create dataframe
    df = []
    for index, element in enumerate(latent_distribution):
        vec, label = element
        df.append({'x': vec[0], 'y': vec[1], 'digit': label})
    df = pd.DataFrame(df)

    # Get centroid coordinates
    centroids = df.groupby('digit').mean()

    # Create adjacency matrix
    adjacency_matrix = np.zeros((10, 10))
    for i in range(10):
        for j in range(10):
            if i == j:
                continue
            else:
                adjacency_matrix[i][j] = np.linalg.norm(centroids.iloc[i] - centroids.iloc[j]) # Euclidean distance (L2 norm)
    
    return adjacency_matrix

# Function to compute adjacency spectral distance
def adjacency_spectral_distance(adjacency_matrix_1, adjacency_matrix_2):
    # Compute spectral distance
    eigvals_1, eigvecs_1 = np.linalg.eig(adjacency_matrix_1)
    eigvals_2, eigvecs_2 = np.linalg.eig(adjacency_matrix_2)
    spectral_distance = np.linalg.norm(eigvals_1 - eigvals_2)

    return spectral_distance

# Function to compute cluster inertias
def get_cluster_inertias(latent_distribution):
    # Create dataframe
    df = []
    for index, element in enumerate(latent_distribution):
        vec, label = element
        df.append({'x': vec[0], 'y': vec[1], 'digit': label})
    df = pd.DataFrame(df)

    # Get centroid coordinates
    centroids = df.groupby('digit').mean()

    # Compute cluster inertias
    cluster_inertias = []
    for i in range(10):
        centroid = centroids.iloc[i]
        coords = df.loc[df['digit'] == i, ['x', 'y']].values
        cluster_inertias.append(sum([(coord[0] - centroid['x'])**2 + (coord[1] - centroid['y'])**2 for coord in coords]))

    return cluster_inertias # returns a list of cluster inertias, one for each digit

# Prepare working directory
for dirname in ['results/', 'results/p1', 'results/p2', 'results/p3']:
    if os.path.isdir(dirname) == False:
        os.mkdir(dirname)
        print(f'{dirname} directory created.')
    else:
        print(f'{dirname} directory contains {len(os.listdir(dirname))} items.')

### Load data

In [None]:
# Load MNIST data

with gzip.open('data/mnist.pkl.gz', 'rb') as f:
    train_data, val_data, test_data = pickle.load(f, encoding="latin1")
    # train_data: tuple of (x_train, y_train), where x_train.shape = (50000, 784) and y_train.shape = (50000, 1)
    # val_data: tuple of (x_val, y_val), where x_val.shape = (10000, 784) and y_val.shape = (10000, 1)
    # test_data: tuple of (x_test, y_test), where x_test.shape = (10000, 784) and y_test.shape = (10000, 1)

x_train, y_train = train_data
x_val, y_val = val_data
x_test, y_test = test_data

# Combine training and validation data
x_train = np.concatenate((x_train, x_val), axis=0)
y_train = np.concatenate((y_train, y_val), axis=0)

# Reshape x
x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], 1)

# One-hot encode labels
y_train = np.eye(10)[y_train].reshape((-1, 10, 1))
y_test = np.eye(10)[y_test].reshape((-1, 10, 1))

# Zip data and labels into tuples
train_data = list(zip(x_train, y_train))
test_data = list(zip(x_test, y_test))

# Shuffle training data
np.random.shuffle(train_data)

### Model constructor

In [None]:
# Constructor for neural network
# Adapted from Nielsen textbook http://neuralnetworksanddeeplearning.com/chap1.html and https://towardsdatascience.com/mnist-handwritten-digits-classification-from-scratch-using-python-numpy-b08e401c4dab and https://github.com/geohot/ai-notebooks/blob/master/mnist_from_scratch.ipynb

class Model(object):

    # Fully connected neural network with layer i having sizes[i] neurons
    def __init__(self, sizes, weights=None, biases=None):
        self.num_layers = len(sizes)
        self.sizes = sizes

        # Initialize weights & biases if not provided
        if weights is None:
            self.weights = [np.random.randn(y, x) for y, x in zip(sizes[1:], sizes[:-1])] # Initialize weights array of shape (num_neurons_i, num_neurons_i-1) for each layer i except the input layer
        else:
            self.weights = weights
        
        if biases is None:
            self.biases = [np.random.randn(y, 1) for y in sizes[1:]] # Initialize biases array of shape (num_neurons_i, 1) for each layer i except the input layer
        else:
            self.biases = biases
        
        # Initialize training history
        self.history = {'train': {'acc':[], 'loss':[]}, 'val': {'acc':[], 'loss':[]}, 'latent':[], 'weights':[], 'biases':[]}

        # Number of parameters
        print(f'Number of model parameters: {sum(np.prod(w.shape) for w in self.weights) + sum(np.prod(b.shape) for b in self.biases)}')

    # Activation functions
    # x is a matrix with nrows = num_hidden_neurons and ncols = batch_size generated by hidden layer affine transformation
    def activation(self, x):
        # Overflow-safe vectorized sigmoid function (https://stackoverflow.com/questions/51976461/optimal-way-of-defining-a-numerically-stable-sigmoid-function-for-a-list-in-pyth) 
        return np.where(x >= 0, 
                    1 / (1 + np.exp(-x)), 
                    np.exp(x) / (1 + np.exp(x)))

    # Derivative of activation function
    def activation_der(self, x):
        return self.activation(x) * (1 - self.activation(x))

    # Softmax function
    # Overflow-safe softmax function (https://timvieira.github.io/blog/post/2014/02/11/exp-normalize-trick/)
    def softmax(self, x):
        # x is a matrix with nrows = num_hidden_neurons and ncols = batch_size generated by hidden layer affine transformation
        # return softmax_(x, axis = 0)
        return self.activation(x)

    # Derivative of softmax function
    def softmax_der(self, x):
        return self.softmax(x) * (1 - self.softmax(x))        

    # Loss function
    def loss(self, y_true, y_pred):
        return np.mean(np.sum(np.nan_to_num(-y_true*np.log(y_pred)-(1-y_true)*np.log(1-y_pred)), axis = 0)) # Cross entropy loss
        # return np.mean(((y_true - y_pred)**2)) # mean squared error

    # Derivative of loss function
    def loss_der(self, y_true, y_pred):
        return y_pred - y_true # Derivative of cross entropy loss
        # return 2*(y_pred - y_true) # Derivative of mean squared error

    # Backpropagation
    # Input x,y are batch matrices with each column being the vector of a single training example
    # Returns (nabla_b, nabla_w), where nabla_b is list of gradients of cost with respect to biases (one matrix for each layer) and nabla_w is list of gradients of cost with respect to weights (one matrix for each layer)
    def backprop(self, x, y):
        # Initialize lists of gradients for each layer
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]
        
        ###### Forward pass, storing weighted inputs (z) and activations for each layer ######
        activation = x # Initialize activation with input
        activations = [x] # list to store all the activations, layer by layer
        zs = [] # list to store all the weighted input z vectors, layer by layer
        # Iterate through each layer
        for i, tup in enumerate(zip(self.biases, self.weights)):
            b, w = tup # Unpack biases and weights
            z = np.dot(w, activation) + b # Compute weighted input z
            zs.append(z) # Store weighted input z          
            if i == (self.num_layers - 2):
                # If last layer, apply softmax
                activation = self.softmax(z)
            else:
                # If not last layer, apply activation function
                activation = self.activation(z)
            activations.append(activation) # Store activation

        ###### Backward pass, computing gradients of cost with respect to biases and weights ######
        # Get gradients for output layer
        delta = self.loss_der(y, activations[-1]) * self.softmax_der(zs[-1]) # Hadamard product of loss gradient and softmax derivative for output layer
        nabla_b[-1] = delta # Store gradient of cost with respect to biases of last layer
        nabla_w[-1] = np.dot(delta, activations[-2].transpose()) # Store gradient of cost with respect to weights of last layer
        # Iterate through each layer in reverse order, starting from second to last layer
        for l in range(2, self.num_layers):
            z = zs[-l] # Retrieve weighted input z for current layer
            delta = np.dot(self.weights[-l+1].transpose(), delta) * self.activation_der(z) # Compute gradient of cost with respect to weighted input z
            nabla_b[-l] = delta # Store gradient of cost with respect to biases of current layer
            nabla_w[-l] = np.dot(delta, activations[-l-1].transpose()) # Store gradient of cost with respect to weights of current layer

        # Sum columns of nabla_b matrices to get overall bias gradients for batch
        nabla_b = [np.sum(nabla_b[i], axis=1).reshape(nabla_b[i].shape[0], 1) for i in range(len(nabla_b))]

        # Get training performance for batch
        acc = np.mean(np.argmax(activations[-1], axis=0) == np.argmax(y, axis=0)) # For each column of activations[-1], get predicted label and compare to true label
        loss = self.loss(y, activations[-1]) # Compute loss
        
        return nabla_b, nabla_w, acc, loss

    # Model training using SGD
    # training_data is a list of tuples (x, y) representing the training inputs and the desired outputs
    def fit(self, training_data, epochs, batch_size, learning_rate, decay_strength, validation_size, shuffle_order, store_latent_vecs, verbose=2):
        
        # Take last portion of training data as validation data
        training_data = training_data[:-int(validation_size*len(training_data))]
        val_data = training_data[-int(validation_size*len(training_data)):]

        x_train = np.array([x for x, y in training_data])[:,:,0].T # Retrieve matrix of training examples where each column is a training example
        y_train = np.array([y for x, y in training_data])[:,:,0].T # Retrieve matrix of training labels where each column is the one-hot encoded vector

        ## Store baseline model characteristics (before training)
        # Evaluate model on training data
        train_acc, train_loss = self.evaluate(training_data)
        # Evaluate model on validation data
        val_acc, val_loss = self.evaluate(val_data)
        print(f"Baseline characteristics: Training accuracy: {train_acc:.4f}. Validation accuracy: {val_acc:.4f}. Training loss: {train_loss:.4f}. Validation loss: {val_loss:.4f}")
        # Update global history
        self.history['train']['acc'].append(train_acc)
        self.history['train']['loss'].append(train_loss)
        self.history['val']['acc'].append(val_acc)
        self.history['val']['loss'].append(val_loss)
        # Store latent vectors
        if store_latent_vecs:
            vecs = self.feedforward(x_train[:, :10000], layer_num = 2).T
            labs = np.argmax(y_train[:, :10000], axis=0)
            self.history['latent'].append(list(zip(vecs, labs))) # Store latent representation of training data before training
        # Store layer weights
        self.history['weights'].append(self.weights)
        # Store layer biases
        self.history['biases'].append(self.biases)

        # Iterate through each epoch
        print(f'Training on {len(training_data)} examples, validating on {len(val_data)} examples')
        for j in range(epochs):
            # Initialize epoch training accuracy and loss history
            train_acc = []
            train_loss = []
            # Get time
            start_time = time.time()

            # Adjust learning rate for current epoch based on decay strength
            learning_rate_adj = learning_rate/(1 + decay_strength * j)

            # Shuffle training data in preparation for SGD batching
            if shuffle_order is None:
                np.random.shuffle(training_data) # Shuffle training data randomly
            else:
                training_data = np.array(training_data, dtype=object)[shuffle_order[j]] # Shuffle training data according to order provided
            # Construct training batches for SGD
            batches = [training_data[k:k+batch_size] for k in range(0, len(training_data), batch_size)]
            # Iterate through all batches
            for batch in batches:
                x_batch = np.array([x for x, y in batch])[:,:,0].T # Retrieve matrix of training examples where each column is a training example
                y_batch = np.array([y for x, y in batch])[:,:,0].T # Retrieve matrix of training labels where each column is the one-hot encoded vector

                nabla_b_, nabla_w_, acc, loss = self.backprop(x_batch, y_batch) # Compute loss gradients for each layer for whole batch

                # Update epoch training accuracy and loss history for whole batch
                train_acc.append(acc)
                train_loss.append(loss)

                # Update weights and biases for each layer using average loss gradients
                self.weights = [w-(learning_rate_adj*nw) for w, nw in zip(self.weights, nabla_w_)] # Need to divide by batch size to get average gradient per example
                self.biases = [b-(learning_rate_adj*nb) for b, nb in zip(self.biases, nabla_b_)] # Need to divide by batch size to get average gradient per example
            
            # Compute epoch training accuracy and loss
            train_acc, train_loss = np.mean(train_acc), np.mean(train_loss)
            # Evaluate model on validation data
            val_acc, val_loss = self.evaluate(val_data)
            # Get time
            end_time = time.time()
            # Print epoch number (epochs start at 1)
            if verbose >= 2:
                print(f"Epoch {j+1} complete. Time taken: {end_time - start_time:.2f} seconds.")
            if verbose >= 1:
                print(f"Learning rate: {learning_rate_adj:.4f}. Training accuracy: {train_acc:.4f}. Validation accuracy: {val_acc:.4f}. Mean training loss: {train_loss:.4f}. Mean validation loss: {val_loss:.4f}. Mean weight gradients: {[np.around(np.mean(np.abs(nw)), 4) for nw in nabla_w_]}.")

            # Update global history
            self.history['train']['acc'].append(train_acc)
            self.history['train']['loss'].append(train_loss)
            self.history['val']['acc'].append(val_acc)
            self.history['val']['loss'].append(val_loss)
            # Store latent vectors
            if store_latent_vecs:
                vecs = self.feedforward(x_train[:, :10000], layer_num = 2).T.round(2)
                labs = np.argmax(y_train[:, :10000], axis=0)
                self.history['latent'].append(list(zip(vecs, labs))) # Store latent representation of training data before training
            # Store layer weights
            self.history['weights'].append(self.weights)
            # Store layer biases
            self.history['biases'].append(self.biases)

        print("Training complete.")

    def evaluate(self, test_data):
        x_test = np.array([x for x, y in test_data])[:,:,0].T # Retrieve matrix of training examples where each column is a training example
        y_test = np.array([y for x, y in test_data])[:,:,0].T # Retrieve matrix of training labels where each column is the one-hot encoded vector

        acc = np.mean(np.argmax(self.feedforward(x_test), axis=0) == np.argmax(y_test, axis=0))
        loss = self.loss(y_test, self.feedforward(x_test)) 

        return acc, loss

    # Get feedforward activations for given layer (default is last layer)
    # Equivalent to Tensorflow get_layer()
    def feedforward(self, x, layer_num = None):
        # Dynamically update activations for each layer while moving forward through the network, starting with shape (input_size, 1) and ending with shape (output_size, 1)
        if layer_num is None:
            layer_num = self.num_layers - 1
        activations = x
        current_layer = 1
        while current_layer <= layer_num:
            # If current layer is last layer, return softmax activations
            if current_layer == self.num_layers - 1:
                activations = self.softmax(np.dot(self.weights[current_layer - 1], activations) + self.biases[current_layer - 1])
            else:
                activations = self.activation(np.dot(self.weights[current_layer - 1], activations) + self.biases[current_layer - 1])
            current_layer += 1
        return activations

### Set global params

In [None]:
# Model architecture
sizes = [784, 30, 2, 10]
epochs = 99
batch_size = 1000
learning_rate = 0.01
decay_strength = 0
validation_size = 0.1

# Set seed for reproducibility
np.random.seed(1)

# Initialize weights array of shape (num_neurons_i, num_neurons_i-1) for each layer i except the input layer
weights = [np.random.randn(y, x) for y, x in zip(sizes[1:], sizes[:-1])]

# Initialize biases array of shape (num_neurons_i, 1) for each layer i except the input layer
biases = [np.random.randn(y, 1) for y in sizes[1:]]

# Initialize list of data indices for data shuffling per epoch
shuffle_order = [np.random.permutation(int(len(train_data)*(1 - validation_size))) for i in range(epochs)]

### Verify that model training is reproducible

In [None]:
# # Build & train first model
# model1 = Model(sizes, weights, biases)
# model1.fit(train_data, epochs = epochs, batch_size = batch_size, learning_rate = learning_rate, decay_strength = decay_strength, validation_size = validation_size, shuffle_order = shuffle_order, store_latent_vecs = False)
# test_acc, test_loss = model1.evaluate(test_data)
# print(f'Test accuracy: {test_acc}. Mean test loss: {test_loss}')

In [None]:
# # Build & train second model
# model2 = Model(sizes, weights, biases)
# model2.fit(train_data, epochs = epochs, batch_size = batch_size, learning_rate = learning_rate, decay_strength = decay_strength, validation_size = validation_size, shuffle_order = shuffle_order, store_latent_vecs = False)
# test_acc, test_loss = model2.evaluate(test_data)
# print(f'Test accuracy: {test_acc}. Mean test loss: {test_loss}')

In [None]:
# # Are model weights reproducible?
# print(f'Model weights were reproducible: {[np.array_equal(model1.weights[i], model2.weights[i]) for i in range(len(model1.weights))]}')

In [None]:
# # Inspect last layer activations
# last_layer = [model1.feedforward(x, layer_num = 3) for x, y in train_data[:1]]
# print(last_layer[0].sum())
# print(last_layer[0])
# print(train_data[0][1])

## PART 1. Visualize latent space over epochs

In [None]:
path = 'results/p1/model.pkl.gz'
if not os.path.exists(path):
    # Build & train model
    model = Model(sizes, weights, biases)
    model.fit(train_data, epochs = epochs, batch_size = batch_size, learning_rate = learning_rate, decay_strength = decay_strength, validation_size = validation_size, shuffle_order = shuffle_order, store_latent_vecs = True)
    test_acc, test_loss = model.evaluate(test_data)
    print(f'Test accuracy: {test_acc}. Mean test loss: {test_loss}')

    # Save model to file
    print("Saving model to file...")
    start = time.time()
    with gzip.open(path, 'wb') as f:
        pickle.dump(model, f)
    print(f"Model saved in {time.time() - start} seconds.")
else:
    # Read model from file
    with gzip.open(path, 'rb') as f:
        model = pickle.load(f)
    print(f'Model loaded from file.')

In [None]:
path = 'results/p1/history.png'
if not os.path.exists(path):
    # Plot model history
    fig, ax = plt.subplots(1, 2, figsize = (14, 5))

    # Plot accuracy history
    ax[0].plot(range(len(model.history['val']['acc'])), model.history['val']['acc'], label='Validation')
    ax[0].plot(range(len(model.history['train']['acc'])), model.history['train']['acc'], label='Train')
    ax[0].set_xticks(np.arange(0, len(model.history['val']['acc']), 10)) # Set xticks
    ax[0].set_ylabel('Accuracy') # Y axis label
    ax[0].set_xlabel('Epochs') # X axis label
    ax[0].legend() # Add legend

    # Plot loss history
    ax[1].plot(range(len(model.history['val']['loss'])), model.history['val']['loss'], label='Validation')
    ax[1].plot(range(len(model.history['train']['loss'])), model.history['train']['loss'], label='Train')
    ax[1].set_xticks(np.arange(0, len(model.history['val']['loss']), 10)) # Set xticks
    ax[1].set_ylabel('Loss') # Y axis label
    ax[1].set_xlabel('Epochs') # X axis label
    ax[1].legend() # Add legend

    # Save plot to file
    plt.savefig(path)
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.figure(figsize = (14, 5))
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')

In [None]:
path = 'results/p1/latent.png'
if not os.path.exists(path):
    # Retrieve latent vectors from model history and plot them
    nrows = math.ceil(epochs/10)
    ncols = 10
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    colors = sns.color_palette(['#e6194B','#f58231','#ffe119','#bfef45','#3cb44b','#42d4f4','#4363d8', '#000075', '#911eb4','#f032e6'])

    for epoch in range(0, len(model.history['latent'])):
        # Create dataframe
        df = []
        for index, element in enumerate(model.history['latent'][epoch][:10000]): # Limit to first 10000 examples
            vec, label = element
            df.append({'x': vec[0], 'y': vec[1], 'digit': label})
        df = pd.DataFrame(df)

        # Plot latent vectors in 2D, color-coded by digit
        # print(f'Plotting {len(df)} examples.')
        sns.scatterplot(data=df, x="x", y="y", hue="digit", palette=colors, ax=ax[epoch], legend=False)

        # Add title
        if epoch == 0:
            title = 'Before training'
        else:
            title = f'After epoch {epoch}'
        ax[epoch].set_title(title)

    # Create legend in last subplot
    for i in range(10):
        ax[-1].scatter([], [], color=colors[i], label=i)
    ax[-1].legend()

    # Save plot to file
    plt.savefig(path)
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.figure(figsize = (35, 35))
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

### Quantify cluster cohesion (inertia)

In [None]:
# Plot inertia of each cluster over epochs
path = 'results/p1/inertia.png'
if not os.path.exists(path):
    colors = sns.color_palette(['#e6194B','#f58231','#ffe119','#bfef45','#3cb44b','#42d4f4','#4363d8', '#000075', '#911eb4','#f032e6'])

    # Get list of cluster inertias for each epoch
    cluster_inertias = []
    for epoch in range(len(model.history['latent'])):
        cluster_inertias.append(get_cluster_inertias(model.history['latent'][epoch]))
    
    # Convert to numpy array and transpose
    cluster_inertias = np.array(cluster_inertias).T

    # Plot each row of cluster inertias in different colors
    for i in range(10):
        plt.plot(cluster_inertias[i], color=colors[i], label=i)
    plt.xlabel('Epoch')
    plt.ylabel('Cluster inertia')
    plt.title(f'train_acc: {model.history["train"]["acc"][-1]:.2f}')
    plt.legend()

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

### Quantify cluster separation

In [None]:
path = 'results/p1/cluster_sep.png'
if not os.path.exists(path):
    # Plot separation between clusters over epochs for all pairs of digits
    nrows = 2
    ncols = 5
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    colors = sns.color_palette(['#e6194B','#f58231','#ffe119','#bfef45','#3cb44b','#42d4f4','#4363d8', '#000075', '#911eb4','#f032e6'])

    for digit1 in range(10):
        dict_of_distances = {}
        for digit2 in range(10):
            distances = []
            for epoch in range(0, len(model.history['latent'])):
                # Get separation between digit1 and digit2 cluster centroids from adjacency matrix
                distances.append(get_adjacency_matrix(model.history['latent'][epoch])[digit1][digit2])
            dict_of_distances[digit2] = distances
        df = pd.DataFrame(dict_of_distances)

        # Plot separation between clusters over epochs for all pairs of digits
        sns.lineplot(data=df, ax=ax[digit1], legend=True)
        ax[digit1].set_title(f'Separation from digit {digit1} over epochs')
        ax[digit1].set_xlabel('Epoch')
        ax[digit1].set_ylabel('Separation')
        ax[digit1].set_xticks(np.arange(0, len(model.history['val']['loss']), 10)) # Set xticks
    
    # Add title
    title = 'Separation between clusters over epochs for all pairs of digits'
    fig.suptitle(title)

    # Save plot to file
    plt.savefig(path)
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();            

### Quantify cluster separation vs. training loss

In [None]:
path = 'results/p1/adjacency_mean_v_loss.png'
if not os.path.exists(path):
    # Mean of adjacency matrix vs. training loss
    # Calculate adjacency matrix mean for each epoch, including before training
    adjacency_matrix_means = []
    for epoch in range(0, len(model.history['latent'])):
        adjacency_matrix_means.append(np.mean(get_adjacency_matrix(model.history['latent'][epoch])))

    # Get list of training loss values
    train_losses = model.history['train']['loss']

    # Plot adjacency mean vs. training loss
    # Color by epoch
    plt.scatter(adjacency_matrix_means, model.history['train']['loss'], c=range(len(model.history['train']['loss'])))
    plt.xlabel('Adjacency mean')
    plt.ylabel('Training loss')
    plt.title('Adjacency mean vs. training loss')

    # Save plot to file
    plt.savefig(path)
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

### Quantify cLGG similarity vs. performance similarity

In [None]:
path = 'results/p1/spectral_dist_v_loss_delta.png'
if not os.path.exists(path):
    # Adjacency spectral distance vs. change in training loss for consecutive epochs
    # Calculate adjacency spectral distance between latent distributions of consecutive epochs (starting from epoch 1)
    spectral_distances = []
    for epoch in range(0, len(model.history['latent']) - 1):
        adjacency_matrix_1 = get_adjacency_matrix(model.history['latent'][epoch])
        adjacency_matrix_2 = get_adjacency_matrix(model.history['latent'][epoch + 1])
        spectral_distances.append(adjacency_spectral_distance(adjacency_matrix_1, adjacency_matrix_2))

    # Get change in training loss between consecutive epochs (starting from epoch 0)
    loss_deltas = []
    for epoch in range(0, len(model.history['train']['loss']) - 1):
        loss_deltas.append(np.abs(model.history['train']['loss'][epoch + 1] - model.history['train']['loss'][epoch]))

    # Plot spectral distance vs. change in training loss
    # Color by epoch
    plt.scatter(spectral_distances, loss_deltas, c=range(0, len(model.history['train']['loss']) - 1))
    plt.xlabel('Spectral distance')
    plt.ylabel('Change in training loss')
    plt.title('Spectral distance vs. change in training loss')

    # Save plot to file
    plt.savefig(path, bbox_inches='tight')
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

### Compare change in cLGG vs. change in model weights across epochs

In [None]:
path = 'results/p1/spectral_dist_v_weights_dist.png'
if not os.path.exists(path):
    # Adjacency spectral distance vs. Frobenius weights distance for consecutive epochs
    # Get Frobenius distance between bottleneck weight matrices of consecutive epochs
    weight_distances = []
    for i in range(0, len(model.history['weights']) - 1):
        weight_distances.append(np.linalg.norm(model.history['weights'][i + 1][-2] - model.history['weights'][i][-2])) # Bottleneck layer is second to last layer

    print(len(model.history['weights']), len(model.history['latent']))
    # Plot spectral distance vs. Frobenius weights distance
    plt.scatter(spectral_distances, weight_distances, c=range(0, len(model.history['train']['loss']) - 1))
    plt.xlabel('Spectral distance')
    plt.ylabel('Frobenius weights distance')
    plt.title('Spectral distance vs. Frobenius weights distance')

    # Save plot to file
    plt.savefig(path)
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

## Part 2. Full random search of initial weights

In [None]:
num_models = 100
path = 'results/p2'

# Set seed for reproducibility
np.random.seed(1)

# Initialize list of lists weights array of shape (num_neurons_i, num_neurons_i-1) for each layer i except the input layer
weights_list = [[np.random.randn(y, x) for y, x in zip(sizes[1:], sizes[:-1])] for _ in range(num_models)]

# Initialize biases array of shape (num_neurons_i, 1) for each layer i except the input layer
biases = [np.random.randn(y, 1) for y in sizes[1:]]

# Initialize list of data indices for data shuffling per epoch
shuffle_order = [np.random.permutation(int(len(train_data)*(1 - validation_size))) for i in range(epochs)]

# Train 100 models with different initial weights
models_list = []
for idx, i in enumerate(range(num_models)):
    if not os.path.exists(f'{path}/model_{idx}.pkl.gz'):
        weights = weights_list[idx]
        
        # Build & train model
        print(f'Training model {idx}...')
        model = Model(sizes, weights, biases)
        model.fit(train_data, epochs = epochs, batch_size = batch_size, learning_rate = learning_rate, decay_strength = decay_strength, validation_size = validation_size, shuffle_order = shuffle_order, store_latent_vecs = True, verbose=0)
        models_list.append(deepcopy(model)) # Deepcopy to avoid overwriting previous models

        # Save model to file
        print(f'Saving model {idx} to file...')
        start_time = time.time()
        with gzip.open(f'{path}/model_{idx}.pkl.gz', 'wb') as f:
            pickle.dump(model, f)
        print(f'Done. Saving took {time.time() - start_time:.2f} seconds.')
    else:
        # Open model from file
        print(f'Opening model {idx} from file...')
        start_time = time.time()
        with gzip.open(f'{path}/model_{idx}.pkl.gz', 'rb') as f:
            models_list.append(pickle.load(f))
        print(f'Done. Opening took {time.time() - start_time:.2f} seconds.')

In [None]:
path = 'results/p2/acc.png'
if not os.path.exists(path):
    # Plot histogram of final training accuracy
    plt.hist([model.history['train']['acc'][-1] for model in models_list], bins=10)
    plt.xlabel('Final training accuracy')
    plt.ylabel('Number of models')
    plt.title(f'Variation in training accuracy after {len(models_list[0].history["train"]["loss"]) - 1} epochs')
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p2/history.png'
if not os.path.exists(path):
    # Plot training history for each model
    nrows = 10
    ncols = 10
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    for i, model in enumerate(models_list):
        # Plot accuracy history
        ax[i].plot(range(len(model.history['val']['acc'])), model.history['val']['acc'], label='Validation')
        ax[i].plot(range(len(model.history['train']['acc'])), model.history['train']['acc'], label='Train')
        ax[i].set_xticks(np.arange(0, len(model.history['val']['acc']), 10)) # Set xticks
        ax[i].set_ylabel('Accuracy') # Y axis label
        ax[i].set_xlabel('Epochs') # X axis label
        ax[i].legend() # Add legend

        # Add title
        title = f'val_acc: {model.history["val"]["acc"][-1]:.2f}'
        ax[i].set_title(title)

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p2/latent.png'
if not os.path.exists(path):
    # Visualize final latent space for each model
    nrows = 10
    ncols = 10
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    colors = sns.color_palette(['#e6194B','#f58231','#ffe119','#bfef45','#3cb44b','#42d4f4','#4363d8', '#000075', '#911eb4','#f032e6'])

    for i, model in enumerate(models_list):
        # Get latent vecs for final epoch
        latent_vecs = model.history['latent'][-1]
        # Create dataframe
        df = []
        for index, element in enumerate(latent_vecs[:10000]): # Limit to first 10000 example
            vec, label = element
            df.append({'x': vec[0], 'y': vec[1], 'digit': label})
        df = pd.DataFrame(df)

        # Plot latent vectors in 2D, color-coded by digit
        # print(f'Plotting {len(df)} examples.')
        sns.scatterplot(data=df, x="x", y="y", hue="digit", palette=colors, ax=ax[i], legend=False)

        # Add title
        title = f'weight[2][0][0] = {model.history["weights"][0][2][0][0]:.3f}; val_acc: {model.history["val"]["acc"][-1]:.2f}'
        ax[i].set_title(title)

    # Create legend in last subplot
    for i in range(10):
        ax[-1].scatter([], [], color=colors[i], label=i)
    ax[-1].legend()

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p2/initweights_dist_v_cLGG_dist.png'
if not os.path.exists(path):
    # Plot Frobenius distance of first hidden layer initial weights versus spectral distance for each pair of models
    frobenius_dists = []
    spectral_dists = []
    for i, model1 in enumerate(models_list):
        for j, model2 in enumerate(models_list):
            if i != j:
                # Get first hidden layer initial weights for each model
                init_weights_i = model1.history["weights"][0][0]
                init_weights_j = model2.history["weights"][0][0]
                # Get Frobenius distance
                frobenius_dists.append(np.linalg.norm(init_weights_i - init_weights_j))
                # Get spectral distance of last epoch latent vectors
                adjacency_matrix_1 = get_adjacency_matrix(model1.history['latent'][-1])
                adjacency_matrix_2 = get_adjacency_matrix(model2.history['latent'][-1])
                spectral_dists.append(adjacency_spectral_distance(adjacency_matrix_1, adjacency_matrix_2))

    # Plot Frobenius distance vs. spectral distance
    plt.scatter(spectral_dists, frobenius_dists, s=2)
    plt.xlabel('Spectral distance')
    plt.ylabel('Frobenius distance')
    plt.title(f'Frobenius distance of first hidden layer initial weights vs. spectral distance after {len(model1.history["train"]["loss"]) - 1} epochs')

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p2/adjacency_mean_v_loss.png'
if not os.path.exists(path):
    # Plot mean adjacency versus final training loss for each model in models_list
    
    # Calculate mean adjacency for each model
    adjacency_means = []
    for model in models_list:
        adjacency_means.append(np.mean(get_adjacency_matrix(model.history['latent'][-1])))
    
    # Get final training loss for each model
    final_losses = []
    for model in models_list:
        final_losses.append(model.history['train']['loss'][-1])

    # Plot mean adjacency versus final training loss
    plt.scatter(adjacency_means, final_losses)
    plt.xlabel('Adjacency mean')
    plt.ylabel('Final training loss')
    plt.title('Mean adjacency versus final training loss')

    # Save plot to file
    plt.savefig(path, bbox_inches='tight')
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

### Quantify cLGG similarity vs. performance similarity

In [None]:
path = 'results/p2/cLGG_dist_v_loss_delta.png'
if not os.path.exists(path):
    # Plot spectral distance versus loss difference for each pair of models
    spectral_dists = []
    loss_diffs = []
    for i, model1 in enumerate(models_list):
        for j, model2 in enumerate(models_list):
            if i < j:
                # Get spectral distance of last epoch latent vectors
                adjacency_matrix_1 = get_adjacency_matrix(model1.history['latent'][-1])
                adjacency_matrix_2 = get_adjacency_matrix(model2.history['latent'][-1])
                spectral_dists.append(adjacency_spectral_distance(adjacency_matrix_1, adjacency_matrix_2))
                # Get absolute loss difference
                loss_diffs.append(abs(model1.history['train']['loss'][-1] - model2.history['train']['loss'][-1]))

    # Plot spectral distance vs. loss difference
    plt.scatter(spectral_dists, loss_diffs, s=2)
    plt.xlabel('Spectral distance')
    plt.ylabel('Loss difference')
    plt.title(f"Spectral distance vs. loss difference after {len(model1.history['train']['loss']) - 1} epochs")

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p2/initweights_dist_v_loss_diff.png'
if not os.path.exists(path):
    # Plot Frobenius distance of initial weights versus loss difference for each pair of models
    frobenius_dists = []
    loss_diffs = []
    for i, model1 in enumerate(models_list):
        for j, model2 in enumerate(models_list):
            if i < j:
                # Get first hidden layer initial weights for each model
                init_weights_i = model1.history["weights"][0][0]
                init_weights_j = model2.history["weights"][0][0]
                # Get Frobenius distance
                frobenius_dists.append(np.linalg.norm(init_weights_i - init_weights_j))
                # Get absolute loss difference
                loss_diffs.append(abs(model1.history['train']['loss'][-1] - model2.history['train']['loss'][-1]))

    # Plot Frobenius distance vs. loss difference
    plt.scatter(frobenius_dists, loss_diffs, s=2)
    plt.xlabel('Frobenius distance')
    plt.ylabel('Loss difference')
    plt.title(f"Frobenius distance vs. loss difference after {len(model1.history['train']['loss']) - 1} epochs")

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p2/heatmaps.png'
if not os.path.exists(path):
    # Plot heatmap of adjacency matrix for each model in models_list
    nrows = 10
    ncols = 10
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    for i, model in enumerate(models_list):
        # Get adjacency matrix
        adjacency_matrix = get_adjacency_matrix(model.history['latent'][-1])
        # Plot heatmap
        ax[i].imshow(adjacency_matrix, cmap='magma')
        ax[i].set_title(f'Last epoch training accuracy: {model.history["train"]["acc"][-1]:.2f}')
        
    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p2/pca.png'

# KMeans clustering of LGGs via adjacency matrices
# Plot PCA of adjacency matrices
if not os.path.exists(path):
    # Get adjacency matrices
    adjacency_matrices = []
    for model in models_list:
        adjacency_matrices.append(get_adjacency_matrix(model.history['latent'][-1]))

    # Convert to dataframe
    adj_df = []
    for mat in adjacency_matrices:
        adj_dict = {}
        for i in range(10):
            for j in range(i+1, 10):
                adj_dict[f'{i}_{j}'] = mat[i][j]
        adj_df.append(adj_dict)
    adj_df = pd.DataFrame(adj_df)

    # KMeans clustering
    fit = KMeans(n_clusters = 8, init = 'random', n_init=10, random_state=1).fit(adj_df)

    # Compute PCA
    fitted_pca = PCA().fit(adj_df)
    pca_df = pd.DataFrame(fitted_pca.transform(adj_df), columns=[f'PCA{i}' for i in range(len(adj_df.columns.values))])
    pca_df['label'] = fit.labels_

    # Plot PCA
    for cluster_id, cluster_df in pca_df.groupby(['label']):
        pca1 = cluster_df.iloc[:,0]
        pca2 = cluster_df.iloc[:,1]
        plt.scatter(pca1, pca2, label=cluster_id)

    plt.xlabel(f'PC1 ({np.round(fitted_pca.explained_variance_ratio_[0], 1)})')
    plt.ylabel(f'PC2 ({np.round(fitted_pca.explained_variance_ratio_[1], 1)})')
    plt.legend()

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
# Plot latent spaces for all models in each cluster

for cluster_id, cluster_df in pca_df.groupby(['label']):

    path = f'results/p2/latent_cLGGclust{cluster_id}'
    if not os.path.exists(path):

        cluster_models_idx = pca_df.loc[pca_df['label'] == cluster_id,:].index
        cluster_models = [models_list[idx] for idx in cluster_models_idx]

        # Visualize final latent space for each model
        ncols = 10
        nrows = math.ceil(len(cluster_models)/ncols)
        fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
        ax = ax.flatten()

        colors = sns.color_palette(['#e6194B','#f58231','#ffe119','#bfef45','#3cb44b','#42d4f4','#4363d8', '#000075', '#911eb4','#f032e6'])

        for i, model in enumerate(cluster_models):
            # Get latent vecs for final epoch
            latent_vecs = model.history['latent'][-1]
            # Create dataframe
            df = []
            for index, element in enumerate(latent_vecs[:10000]): # Limit to first 10000 example
                vec, label = element
                df.append({'x': vec[0], 'y': vec[1], 'digit': label})
            df = pd.DataFrame(df)

            # Plot latent vectors in 2D, color-coded by digit
            # print(f'Plotting {len(df)} examples.')
            sns.scatterplot(data=df, x="x", y="y", hue="digit", palette=colors, ax=ax[i], legend=False)

            # Add title
            title = f'Model {cluster_models_idx[i]}; val_acc: {model.history["val"]["acc"][-1]:.2f}'
            ax[i].set_title(title)

        # Create legend in last subplot
        for i in range(10):
            ax[-1].scatter([], [], color=colors[i], label=i)
        ax[-1].legend()

        # Save plot to file
        plt.savefig(path);
        plt.show();
    else:
        # Load plot from file
        print('Loading image from file.')
        plt.axes([0,0,1,1])
        plt.imshow(mpimg.imread(path))
        plt.axis('off')
        plt.show();

In [None]:
# Plot heatmap of adjacency matrix for all models in each cluster

for cluster_id, cluster_df in pca_df.groupby(['label']):

    path = f'results/p2/latent_cLGGclust{cluster_id}'
    if not os.path.exists(path): 

        cluster_models_idx = pca_df.loc[pca_df['label'] == cluster_id,:].index
        cluster_models = [models_list[idx] for idx in cluster_models_idx]

        ncols = 10
        nrows = math.ceil(len(cluster_models)/ncols)
        fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
        ax = ax.flatten()

        for i, model in enumerate(cluster_models):
            # Get adjacency matrix
            adjacency_matrix = get_adjacency_matrix(model.history['latent'][-1])
            # Plot heatmap
            ax[i].imshow(adjacency_matrix, cmap='magma')
            ax[i].set_title(f'Last epoch training accuracy: {model.history["train"]["acc"][-1]:.2f}')
            
        # Save plot to file
        plt.savefig(path);
        plt.show();
    else:
        # Load plot from file
        print('Loading image from file.')
        plt.axes([0,0,1,1])
        plt.imshow(mpimg.imread(path))
        plt.axis('off')
        plt.show();

In [None]:
path = 'results/p2/cluster_dist_hist.png'

# For every pair of clusters, plot histogram of distance between cluster centroids
if not os.path.exists(path):
    nrows = 5
    ncols = 9
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    # Get adjacency matrices for each model
    adjacency_matrices = []
    for model in models_list:
        adjacency_matrices.append(get_adjacency_matrix(model.history['latent'][-1]))

    i = 0
    for digit1 in range(10):
        for digit2 in range(digit1 + 1, 10):
            distances = []
            for j, model in enumerate(models_list):
                # Get separation between digit1 and digit2 cluster centroids from adjacency matrix
                distances.append(adjacency_matrices[j][digit1][digit2])

            # Plot histogram
            ax[i].hist(distances, bins=20)
            ax[i].set_title(f'{digit1} vs. {digit2}')
            ax[i].set_xlabel('Distance between cluster centroids')
            i += 1
    
    # Add title
    title = 'Cluster separation across randomly initialized models'
    fig.suptitle(title)

    # Save plot to file
    plt.savefig(path)
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();            

In [None]:
path = 'results/p2/inertia.png'
# Plot inertia of each cluster over epochs
if not os.path.exists(path):
    nrows = 10
    ncols = 10
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    colors = sns.color_palette(['#e6194B','#f58231','#ffe119','#bfef45','#3cb44b','#42d4f4','#4363d8', '#000075', '#911eb4','#f032e6'])

    for n, model in enumerate(models_list):
        # Get list of cluster inertias for each epoch
        cluster_inertias = []
        for epoch in range(len(model.history['latent'])):
            cluster_inertias.append(get_cluster_inertias(model.history['latent'][epoch]))
        
        # Convert to numpy array and transpose
        cluster_inertias = np.array(cluster_inertias).T

        # Plot each row of cluster inertias in different colors
        for i in range(10):
            ax[n].plot(cluster_inertias[i], color=colors[i], label=i)
        ax[n].set_xlabel('Epoch')
        ax[n].set_ylabel('Cluster inertia')
        ax[n].set_title(f'val_acc: {model.history["val"]["acc"][-1]:.2f}')
        ax[n].legend()

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

## Part 3. Linear traversal of weights hyperspace

In [None]:
num_models = 100
path = 'results/p3'

# Set seed for reproducibility
np.random.seed(1)

# Initialize weights array of shape (num_neurons_i, num_neurons_i-1) for each layer i except the input layer
weights = [np.random.randn(y, x) for y, x in zip(sizes[1:], sizes[:-1])]

# Initialize biases array of shape (num_neurons_i, 1) for each layer i except the input layer
biases = [np.random.randn(y, 1) for y in sizes[1:]]

# Initialize list of data indices for data shuffling per epoch
shuffle_order = [np.random.permutation(int(len(train_data)*(1 - validation_size))) for i in range(epochs)]

# Train models with different initial weights of first neuron for first pixel in first hidden layer
models_list = []
for idx, variable_weight in enumerate(np.linspace(-10, 10, num_models)):
    if not os.path.exists(f'{path}/model_{idx}.pkl.gz'):
        # Edit initial weight matrix
        new_weights = weights.copy()
        new_weights[2][0][0] = variable_weight
        
        # Build & train model
        print(f'Building model {idx} with initial weight[2][0][0]: {new_weights[2][0][0]:.4f}...')
        model = Model(sizes, weights=new_weights, biases=biases)
        model.fit(train_data, epochs = epochs, batch_size = batch_size, learning_rate = learning_rate, decay_strength = decay_strength, validation_size = validation_size, shuffle_order = shuffle_order, store_latent_vecs = True, verbose=0)
        models_list.append(deepcopy(model)) # Must deepcopy model to avoid overwriting previous models

        # Save model to file
        print(f'Saving model {idx} to file...')
        start_time = time.time()
        with gzip.open(f'{path}/model_{idx}.pkl.gz', 'wb') as f:
            pickle.dump(model, f)
        print(f'Done. Saving took {time.time() - start_time:.2f} seconds.')
    else:
    # Open model from file
        print(f'Opening model {idx} from file...')
        start_time = time.time()
        with gzip.open(f'{path}/model_{idx}.pkl.gz', 'rb') as f:
            models_list.append(pickle.load(f))
        print(f'Done. Opening took {time.time() - start_time:.2f} seconds.')

In [None]:
path = 'results/p3/acc.png'
if not os.path.exists(path):
    # Plot histogram of final training accuracy
    plt.hist([model.history['train']['acc'][-1] for model in models_list], bins=10)
    plt.xlabel('Final training accuracy')
    plt.ylabel('Number of models')
    plt.title(f'Variation in training accuracy after {len(models_list[0].history["train"]["loss"]) - 1} epochs')
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p3/history.png'
if not os.path.exists(path):
    # Plot training history for each model
    nrows = 10
    ncols = 10
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    for i, model in enumerate(models_list):
        # Plot accuracy history
        ax[i].plot(range(len(model.history['val']['acc'])), model.history['val']['acc'], label='Validation')
        ax[i].plot(range(len(model.history['train']['acc'])), model.history['train']['acc'], label='Train')
        ax[i].set_xticks(np.arange(0, len(model.history['val']['acc']), 10)) # Set xticks
        ax[i].set_ylabel('Accuracy') # Y axis label
        ax[i].set_xlabel('Epochs') # X axis label
        ax[i].legend() # Add legend

        # Add title
        title = f'Weight[2][0][0] = {model.history["weights"][0][2][0][0]:.3f}'
        ax[i].set_title(title)

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p3/latent.png'
if not os.path.exists(path):
    # Visualize final latent space for each model
    nrows = 10
    ncols = 10
    fig, ax = plt.subplots(nrows, ncols, tight_layout=True, figsize = (ncols * 5, nrows * 5))
    ax = ax.flatten()

    colors = sns.color_palette(['#e6194B','#f58231','#ffe119','#bfef45','#3cb44b','#42d4f4','#4363d8', '#000075', '#911eb4','#f032e6'])

    for i, model in enumerate(models_list):
        # Get latent vecs for final epoch
        latent_vecs = model.history['latent'][-1]
        # Create dataframe
        df = []
        for index, element in enumerate(latent_vecs[:10000]): # Limit to first 10000 example
            vec, label = element
            df.append({'x': vec[0], 'y': vec[1], 'digit': label})
        df = pd.DataFrame(df)

        # Plot latent vectors in 2D, color-coded by digit
        # print(f'Plotting {len(df)} examples.')
        sns.scatterplot(data=df, x="x", y="y", hue="digit", palette=colors, ax=ax[i], legend=False)

        # Add title
        title = f'weight[2][0][0] = {model.history["weights"][0][2][0][0]:.3f}; val_acc: {model.history["val"]["acc"][-1]:.2f}'
        ax[i].set_title(title)

    # Create legend in last subplot
    for i in range(10):
        ax[-1].scatter([], [], color=colors[i], label=i)
    ax[-1].legend()

    # Save plot to file
    plt.savefig(path);
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p3/cLGG_distance_v_weights_delta.png'
if not os.path.exists(path):
    # Plot latent space similarity (by adjacency spectral distance) versus difference in initial weights, for each pair of models in models_list
    spectral_distances = []
    weight_deltas = []
    for i in range(len(models_list)):
        for j in range(i + 1, len(models_list)):
            # Get adjacency spectral distances of last epoch latent vectors
            adjacency_matrix_1 = get_adjacency_matrix(models_list[i].history['latent'][-1])
            adjacency_matrix_2 = get_adjacency_matrix(models_list[j].history['latent'][-1])
            spectral_distances.append(adjacency_spectral_distance(adjacency_matrix_1, adjacency_matrix_2))

            # Get absolute value difference between initial weights of first neuron for first pixel in first hidden layer
            weight_difference = np.abs(models_list[i].history["weights"][0][2][0][0] - models_list[j].history["weights"][0][2][0][0])
            weight_deltas.append(weight_difference)

    # Plot spectral distance vs. change in training loss
    # Color by epoch
    plt.scatter(spectral_distances, weight_deltas, s=1)
    plt.xlabel('Spectral distance')
    plt.ylabel('Difference in initial weight of first neuron in bottleneck layer')
    plt.title('Spectral distance vs. difference in single initial weight')

    # Save plot to file
    plt.savefig(path, bbox_inches='tight')
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();

In [None]:
path = 'results/p3/adjacency_mean_v_loss.png'
if not os.path.exists(path):
    # Plot mean adjacency versus final training loss for each model in models_list
    
    # Calculate mean adjacency for each model
    adjacency_means = []
    for model in models_list:
        adjacency_means.append(np.mean(get_adjacency_matrix(model.history['latent'][-1])))
    
    # Get final training loss for each model
    final_losses = []
    for model in models_list:
        final_losses.append(model.history['train']['loss'][-1])

    # Plot mean adjacency versus final training loss
    plt.scatter(adjacency_means, final_losses)
    plt.xlabel('Adjacency mean')
    plt.ylabel('Final training loss')
    plt.title('Mean adjacency versus final training loss')

    # Save plot to file
    plt.savefig(path, bbox_inches='tight')
    plt.show();
else:
    # Load plot from file
    print('Loading image from file.')
    plt.axes([0,0,1,1])
    plt.imshow(mpimg.imread(path))
    plt.axis('off')
    plt.show();