In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
#from tensorflow import keras
import keras
import matplotlib.pyplot as plt
from vis.visualization import visualize_saliency
from vis.visualization import get_num_filters
from vis.visualization import visualize_saliency_init
from vis.visualization import visualize_saliency_run
from vis.utils import utils
from tqdm import tqdm
from scipy.stats import describe
from scipy import ndimage
import os


# 1. Loading Data 

## Read dataset

In [None]:
data = tf.keras.datasets.mnist

In [None]:
(trainX, trainY),(testX, testY) = data.load_data()

In [None]:
trainX = trainX.reshape((trainX.shape[0], 28, 28, 1))
testX = testX.reshape((testX.shape[0], 28, 28, 1))

In [None]:
trainY = keras.utils.to_categorical(trainY, 10)
testY = keras.utils.to_categorical(testY, 10)

In [None]:
trainX = trainX.astype("float32") / 255.0
testX = testX.astype("float32") / 255.0

In [None]:
print(trainX.shape)

# Training

## Code for training

In [None]:
#Creates Sequential model using Keras
#Number of nodes is the same as number of features (different number of nodes were tried but it did not
#affect validation accuracy significantly)
lenet = keras.Sequential([
                            #Input layer:
                            keras.layers.Conv2D(20, 5, padding="same", input_shape=[28,28,1], use_bias=True),
                            #Hidden Layers:
                            keras.layers.Activation(activation="relu"),
                            keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),
                            keras.layers.Conv2D(50, 5, padding="same"),
                            keras.layers.Activation(activation="relu"),
                            keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),
                            keras.layers.Flatten(),
                            keras.layers.Dense(500),
                            keras.layers.Activation(activation="relu"),
                            keras.layers.Dense(10, name='vis',use_bias=True),
                            #Output layer
                            keras.layers.Activation(activation="softmax"),
                        ])

In [None]:
#lenets = [keras.models.clone_model(lenet),keras.models.clone_model(lenet),keras.models.clone_model(lenet),keras.models.clone_model(lenet),keras.models.clone_model(lenet),keras.models.clone_model(lenet)]

In [None]:
classifier_number = 5
lenets = [keras.models.clone_model(lenet)]
for i in range(1,classifier_number):
    lenets.append(keras.models.clone_model(lenet))

In [None]:
#Compiles sequential model
#Using learning rate 0.01
#Loss function will be categorical crossentropy
lenet.compile(
                optimizer=keras.optimizers.SGD(lr=0.01),
                loss = 'categorical_crossentropy',
                metrics = ['accuracy']
                )
#Trains network over a number of epochs and evaluates network agains validation data
#after each epoch
lenetEpochHistory = lenet.fit(trainX, trainY, epochs = 5, validation_data = (testX, testY))

In [None]:
#Compiles sequential model
#Using learning rate 0.01
#Loss function will be categorical crossentropy
for model in lenets:
    model.compile(
                    optimizer=keras.optimizers.SGD(lr=0.01),
                    loss = 'categorical_crossentropy',
                    metrics = ['accuracy']
                    )
#Trains network over a number of epochs and evaluates network agains validation data
#after each epoch
for model in lenets:
    model.fit(trainX, trainY, epochs = 5, validation_data = (testX, testY))

# General Evaluation

In [None]:
#Predicting Label:

print(np.argmax(lenet.predict(testX)[100]))
print(np.argmax(testY[100]))

In [None]:
#Get accuracy for lenet
sequentialLoss, sequentialAccuracy = lenet.evaluate(testX, testY)
print('Lenet accuracy: ', sequentialAccuracy)
print('Lenet loss: ', sequentialLoss)

plt.style.use('dark_background')
plt.plot(lenetEpochHistory.history['acc'])
plt.plot(lenetEpochHistory.history['val_acc'])
plt.title('Neural Network accuracy per epoch')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Training data', 'Validation data'])
plt.show()

plt.style.use('dark_background')
plt.plot(lenetEpochHistory.history['loss'])
plt.plot(lenetEpochHistory.history['val_loss'])
plt.title('Neural Network loss per epoch')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Training data', 'Validation data'])
plt.show()

# Saliency

In [None]:
#This line outputs the layer_idx that the saliency is meant to be extracted from. (Usually the output layer pre-activation)
utils.find_layer_idx(lenet, 'vis')

In [None]:
#Check if that layer is correct by checking if the number of nodes matches the number of outputs
get_num_filters(lenet.layers[9])

In [None]:
#Check if the saliency map is working for the singular network
input_idx = 1
fig, ax = plt.subplots(nrows=1, ncols=2)
saliency_map = visualize_saliency(model = lenet,layer_idx = 9, filter_indices = np.argmax(testY[input_idx]), seed_input = testX[input_idx])
ax[0].imshow(saliency_map.reshape(28,28),interpolation='nearest')
ax[1].imshow(testX[input_idx].reshape(28,28))
plt.show()

In [None]:
# Get optimisers for each network's output node to speed up saliency processing
optimisers = []
classifiers = lenets
for i in range(0,len(classifiers)):
    classifier_optmisers = []
    for j in range(0,10):
        opt = visualize_saliency_init(classifiers[i],9,filter_indices=j)
        classifier_optmisers.append(opt)
    optimisers.append(classifier_optmisers)
        

In [None]:
#Function to calculate ensemble outputs (for series of inputs) using mean of outputs
def get_ensenmble_outputs(classifiers,classifier_inputs):
    predictions = np.zeros((np.size(testX, axis=0), np.size(testY, axis=1)))
    for classifier in classifiers:
        predictions = predictions + classifier.predict(classifier_inputs)
    prediction_average = predictions / classifier_number
    outputs = np.apply_along_axis(np.argmax, axis=1, arr=prediction_average)
    return(outputs)
        

In [None]:
#Function to calculate ensemble output (for one input) using mean of outputs
def get_ensemble_output(classifiers,classifier_input):
    predictions = np.zeros(np.size(testY, axis=1))
    for classifier in classifiers:
        predictions = predictions + classifier.predict(np.expand_dims(classifier_input,axis=0))
    prediction_average = predictions / classifier_number
    output = np.argmax(prediction_average)
    return(output)
        

In [None]:
get_ensemble_output(lenets,testX[100])

In [None]:
# Function to generate multiple saliency maps for each input

# If doing this over multiple inputs at a time, it is faster to go over multiple inputs
# with the same classifier and then doing the same for the different classifiers. This is 
# because switching optimisers is computationally expensive.

def generate_saliency_maps_for_one_input(classifiers,classifier_input,optimisers,visualised_layer):
    output_node = get_ensemble_output(classifiers,classifier_input)
    saliency_maps = np.zeros((len(classifiers),classifier_input.shape[0],classifier_input.shape[1]))
    for i in range(0,len(classifiers)):
        saliency_maps[i] = visualize_saliency_run(model = classifiers[i],layer_idx = visualised_layer, opt = optimisers[i][output_node], seed_input = classifier_input)
    return(saliency_maps)
    

In [None]:
#Function to visualise the multiple saliency maps
def visualize_saliency_maps(classifier_input,saliency_maps):
    fig, ax = plt.subplots(nrows=1, ncols=len(saliency_maps)+1, figsize = (15,15))
    i = 1
    for s_map in saliency_maps:
        ax[i].imshow(s_map)
        i = i+1
    ax[0].imshow(classifier_input)
    plt.show()

In [None]:
# Compute difference of saliency maps
def generate_uncertainty_map(saliency_maps):
    return(np.std(saliency_maps,axis=0)/np.average(saliency_maps,axis=0))

In [None]:
# Wrapper function to arrive at uncertainty output using classifiers and input
def calculate_uncertainty(classifiers,classifier_input,optimisers,visualised_layer):
    
    saliency_maps = generate_saliency_maps_for_one_input(classifiers = classifiers,
                                  classifier_input = classifier_input,
                                  optimisers = optimisers,
                                  visualised_layer = visualised_layer)
    
    uncertainty_map = generate_uncertainty_map(saliency_maps)
    
    return(np.average(uncertainty_map))


In [None]:
# Compute difference of saliency maps
def calculate_uncertainty_with_maps(saliency_maps):
    return(np.mean(np.std(saliency_maps,axis=0)/np.average(saliency_maps,axis=0)))

In [None]:
def generate_saliency_maps_for_multiple_inputs(classifier,classifier_inputs,classifier_outputs,
                                               classifier_optimisers,visualised_layer):
    
    saliency_maps = []
    for input_idx in tqdm(range(0,np.size(classifier_inputs,axis=0))):
        classifier_input = classifier_inputs[input_idx]
        output_node = classifier_outputs[input_idx]
        saliency_maps.append(visualize_saliency_run(model = classifier,
                                                    layer_idx = 9, 
                                                    opt = classifier_optimisers[output_node],   
                                                    seed_input = classifier_input))
    return(saliency_maps)


In [None]:
def generate_ensemble_saliency_maps_for_multiple_inputs(classifiers,
                                                        classifier_inputs,classifier_outputs,
                                                        optimisers,visualised_layer):
    saliency_maps = []
    for classifier_idx in range(0,len(classifiers)):
        saliency_maps.append(generate_saliency_maps_for_multiple_inputs(
                                classifiers[classifier_idx],
                                classifier_inputs,
                                classifier_outputs,
                                optimisers[classifier_idx],
                                9))
    return(saliency_maps)
    

# MNIST

In [None]:
MNIST_ensemble_predicted_outputs = get_ensenmble_outputs(lenets, testX)

In [None]:
# Generate saliency maps examples
input_idx = 6
maps = generate_saliency_maps_for_one_input(classifiers = lenets,
                                            classifier_input = testX[input_idx],
                                            optimisers = optimisers,
                                            visualised_layer = 9)

In [None]:
# Visualise saliency maps examples
visualize_saliency_maps(classifier_input = testX[input_idx].reshape(28,28),
                        saliency_maps = maps)

In [None]:
for mapx in maps:
    print(describe(mapx,axis=None))

In [None]:
# Visualise uncertainty map example
uncertainty_map = generate_uncertainty_map(maps)
plt.imshow(uncertainty_map)
plt.show()

In [None]:
# Compute the average difference value for each pixel (uncertainty)
np.average(uncertainty_map)

In [None]:
# Generating example using wrapper function
input_idx = 4
uncertainty = calculate_uncertainty(classifiers = lenets,
                              classifier_input = testX[input_idx],
                              optimisers = optimisers,
                              visualised_layer = 9)
print(uncertainty)

generate_saliency_maps_for_multiple_inputs(lenets[0],testX,ensemble_predicted_outputs,optimisers[0],9)

In [None]:
MNIST_saliency_maps = generate_ensemble_saliency_maps_for_multiple_inputs(lenets,testX,MNIST_ensemble_predicted_outputs,optimisers,9)

In [None]:
MNIST_saliency_maps = np.swapaxes(MNIST_saliency_maps,0,1)

In [None]:
visualize_saliency_maps(testX[1].reshape(28,28),MNIST_saliency_maps[1])

In [None]:
MNIST_uncertanties = np.zeros(np.size(MNIST_saliency_maps,axis=0))
for i in range(0,np.size(MNIST_saliency_maps, axis=0)):
    MNIST_uncertanties[i] = calculate_uncertainty_with_maps(MNIST_saliency_maps[i])

In [None]:
MNIST_uncertanties[6]

In [None]:
plt.hist(MNIST_uncertanties)
plt.show()

# Not-MNIST

In [None]:
folder = './data/notMNIST_small/A'

In [None]:
image_size = 28  # Pixel width and height.
pixel_depth = 255.0  # Number of levels per pixel.

In [None]:
def load_letter(folder, min_num_images):
  """Load the data for a single letter label."""
  image_files = os.listdir(folder)
  dataset = np.ndarray(shape=(len(image_files), image_size, image_size),
                         dtype=np.float32)
  print(folder)
  num_images = 0
  for image in image_files:
    image_file = os.path.join(folder, image)
    try:
      image_data = (ndimage.imread(image_file).astype(float) - 
                    pixel_depth / 2) / pixel_depth
      if image_data.shape != (image_size, image_size):
        raise Exception('Unexpected image shape: %s' % str(image_data.shape))
      dataset[num_images, :, :] = image_data
      num_images = num_images + 1
    except IOError as e:
      print('Could not read:', image_file, ':', e, '- it\'s ok, skipping.')
    
  dataset = dataset[0:num_images, :, :]
  if num_images < min_num_images:
    raise Exception('Many fewer images than expected: %d < %d' %
                    (num_images, min_num_images))
    
  print('Full dataset tensor:', dataset.shape)
  print('Mean:', np.mean(dataset))
  print('Standard deviation:', np.std(dataset))
  return dataset

In [None]:
letter = load_letter(folder,1)

In [None]:
data = tf.keras.datasets.mnist

In [None]:
(trainX, trainY),(testX, testY) = data.load_data()

In [None]:
trainX = trainX.reshape((trainX.shape[0], 28, 28, 1))
testX = testX.reshape((testX.shape[0], 28, 28, 1))

In [None]:
trainY = keras.utils.to_categorical(trainY, 10)
testY = keras.utils.to_categorical(testY, 10)

In [None]:
trainX = trainX.astype("float32") / 255.0
testX = testX.astype("float32") / 255.0

In [None]:
print(trainX.shape)

# Misc.

In [None]:
#This prints out the pre-activation outputs for the output layer (for curiosity)
intermediate_layer_model = keras.Model(inputs=lenet.input,
                                 outputs=lenet.get_layer("vis").output)
intermediate_output = intermediate_layer_model.predict(testX)[input_idx]
print(max(intermediate_output))
print(lenet.predict(testX)[input_idx])

In [None]:
# This is what happens if we don't normalise the pixels' standard deviation
plt.imshow(np.std(maps,axis=0))
plt.show()

In [None]:
#Output images that were misclassified by the singular classifier
idx = 0
for i in lenet.predict(testX):
    if (np.argmax(i) != np.argmax(testY[idx])):
        print(idx)
    idx = idx + 1