## Load MNIST and load a batch

In [7]:
import torch
import torchvision

batch_size = 64

train_loader = torch.utils.data.DataLoader(
  torchvision.datasets.MNIST('/files/', train=True, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ])),
  batch_size=batch_size, shuffle=True)

test_loader = torch.utils.data.DataLoader(
  torchvision.datasets.MNIST('/files/', train=False, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ])),
  batch_size=batch_size, shuffle=True)

examples = enumerate(test_loader)
batch_idx, (example_data, example_targets) = next(examples)

## Declare and load classifier

In [8]:
#https://nextjournal.com/gkoehler/pytorch-mnist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class MNISTClassifier(nn.Module):
    def __init__(self):
        super(MNISTClassifier, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)

network = MNISTClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") 

import numpy as np

from google.colab import drive
drive.mount('/content/drive', force_remount=True)

PATH_DIR = '/content/drive/MyDrive/XAI-Anna-Carlos/'
PATH_PRETRAINED = PATH_DIR + 'mnist-classifier.pth'

import os.path

if os.path.isfile(PATH_PRETRAINED):
  network.load_state_dict(torch.load(PATH_PRETRAINED))
  network.to(device)
else:
  raise Exception('ERROR: Could not find model at ',PATH_PRETRAINED)

Mounted at /content/drive


## Define functions to generate the plot
These are not needed to generate random data. However, data is stored along with the corresponding plots.
Also, these functions are needed when generating data using the genetic algorithm.

In [9]:
def measure_curve(curve):
  return np.mean(curve)

def measure_curve_with_inverse(curve, inverse_curve):
  return np.mean(curve-inverse_curve)

####################################
# Additional definitions of Q.
####################################

def measure_auc(values: np.array, dx: int = 1):
    return np.trapz(np.array(curve), dx=dx)

def get_masked_images(original_image, alternative_image, ranking_image, selection_levels):
  '''
  Generates as many masked images as selection levels are provided
  Inputs are torch tensors already on device
  '''
  # Reshape selection_levels to be able to broadcast the selection levels and get
  # as many masks as selection levels are provided
  new_shape = (selection_levels.shape[0], 1, 1, 1) # Same shape but with a trailing 1
  selection_levels = torch.reshape(selection_levels, new_shape)
  # Compute all masks in batch
  masks = torch.le(ranking_image,selection_levels)
  # Compute masked images from masks and original and alternative images
  images_masked = (original_image*masks) + (alternative_image*torch.logical_not(masks))
  return images_masked

def get_random_ranking_image(dimensions):
  num_elems = 1
  for d in dimensions:
    num_elems *= d
  image = np.random.permutation(num_elems).reshape(dimensions)/num_elems
  return torch.from_numpy(image)

def get_class_logits_for_masked_images(original_image, alternative_image, ranking_image, selection_levels, model, class_num):
  with torch.no_grad():
    # Send everything to device and work there
    image = original_image.to(device)
    alternative = alternative_image.to(device)
    ranking = ranking_image.to(device)
    levels = selection_levels.to(device)
    images = get_masked_images(image, alternative, ranking, levels)
    logits = model(images).to('cpu').numpy()
  return logits[:,class_num],np.equal(np.argmax(logits, axis=1),class_num)

'''def save_explanation_exploratory_plot(image, curve, is_hit, output_label, filename='unnamed'):
  # Plot and save the figures
  fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(14, 5))
  axes[0].imshow(image[0], cmap='gray', interpolation='none')
  axes[0].title.set_text(output_label)
  axes[0].axis("off")
  for i in range(1, len(curve)):
    axes[1].plot([i-1,i],curve[i-1:i+1], lw=5 if is_hit[i] else 1, color='b')
  fig.savefig(f'{filename}.png')
  plt.show()'''

def get_explanation_exploratory_curve(image, ranking_image, num_samples, output_label, model):
  alternative = torch.from_numpy(np.full(image.shape,  0, dtype=np.float32)) #ZEROED-OUT

  # Selection levels
  selection_levels = torch.from_numpy(np.linspace(0, 1, num_samples))

  # Increasing order
  class_logit,is_hit = get_class_logits_for_masked_images(image, alternative, ranking_image, selection_levels, model, output_label)

  # Compute the numerical value for the measure
  #measure = measure_curves(class_logit)

  return class_logit,is_hit

## Generation parameters

In [10]:
from tqdm import tqdm

index = 22
num_curves = 500000
num_samples = 20

image = example_data[index]
label = example_targets[index]

# Random generation

In [5]:
# These will be the generated attribution distributions
rankings = np.zeros((num_curves, *tuple(image.shape)))
# These will be the corresponding output vs selection level plots
plots = np.zeros((num_curves, num_samples))
# These will be the corresponding output vs selection level plots with the inverse rankings
inverse_plots = np.zeros((num_curves, num_samples))
# These will indicate whether for a given selection level, the examined output is the highest activated
hit_plots = np.zeros((num_curves, num_samples), dtype=bool)
# These will indicate whether for a given selection level, the examined output is the highest activated (for the inverse rankings)
inverse_hit_plots = np.zeros((num_curves, num_samples), dtype=bool)

for i in tqdm(range(num_curves)):
  ranking = get_random_ranking_image(image.shape)
  rankings[i] = ranking
  curve,is_hit=get_explanation_exploratory_curve(image, ranking, num_samples, label, network)
  inverse_curve,inverse_is_hit=get_explanation_exploratory_curve(image, 1-ranking, num_samples, label, network)
  plots[i] = curve
  hit_plots[i] = is_hit
  inverse_plots[i] = inverse_curve
  inverse_hit_plots[i] = inverse_is_hit

100%|██████████| 500000/500000 [17:47<00:00, 468.44it/s]


### Computation of metrics for the random data just generated

In [6]:
measures = np.zeros(num_curves)
measures_with_inverse = np.zeros(num_curves)
measures_auc = np.zeros(num_curves)
measures_auc_with_inverse = np.zeros(num_curves)
for i in tqdm(range(num_curves)):
  measures[i] = measure_curve(plots[i])
  measures_with_inverse[i] = measure_curve_with_inverse(plots[i], inverse_plots[i])
  measures_auc[i] = measure_auc(plots[i])
  measures_auc_with_inverse[i] = measure_auc(inverse_plots[i])

100%|██████████| 500000/500000 [00:29<00:00, 17139.71it/s]


## Save to Google Drive

In [11]:
np.savez(PATH_DIR + 'random_generated_auc--.npz', image, label, rankings, plots, inverse_plots, hit_plots, inverse_hit_plots, measures, measures_with_inverse, measures_auc, measures_auc_with_inverse)

# Generation with genetic programming
The algorithm is as follows:


*   Generate an initial population of random rankings
*   For a given number of steps, do:
  *   Compute Q measure for current population
  *   Take top `num_saved` elements according to Q measure. Replace the rest of the population with "children" of randomly selected "parents" from the top `num_saved`

First we declare the function that, given two "parent" $p1$ and $p2$ rankings, generates a "child" $c$ that is a mixture of their "parents". In this case, $c_i$ will be a random element of the set $ \left(p1_{\left[0,i\right]} \bigcup p2_{\left[0,i\right]}\right) \setminus c_{\left[0,i-1\right]} $.

Then we declare a function that trims a population to its top `num_saved` performers and repopulates the rest by crossing them at random.

In [12]:
import random

def spawn_rankings(r1, r2):
  # Generates a new ranking that's a mixture of r1 and r2
  new_r = np.zeros(len(r1), dtype=int)
  # The next position of new_r will be sampled from a pool of candidates
  candidates = []
  # We don't want the same element more than once so we keep track of the ones we use
  used = np.zeros(len(r1),bool)
  for i in range(len(new_r)):
    if not used[r1[i]]:
      used[r1[i]] = True
      candidates.append(r1[i])
    if not used[r2[i]]:
      used[r2[i]] = True
      candidates.append(r2[i])
    #print(candidates)
    selected = candidates.pop(random.randint(0,len(candidates)-1))
    #print('Selected',selected)
    #print(candidates)
    new_r[i] = selected
    #print('New_r',new_r,'\n','-'*20)
  return new_r

# TEST
from numpy.random import default_rng
rng = default_rng()
r1 = rng.permutation(8)
r2 = rng.permutation(8)
print('R1',r1)
print('R2',r2)
r_spawn = spawn_rankings(r1, r2)
print(r_spawn)

def repopulate(population, ranking, num_saved):
  for i in range(num_saved, population.shape[0]):
    parent_indices = rng.permutation(num_saved)[:2]
    parent1 = population[ranking[parent_indices[0]]]
    parent2 = population[ranking[parent_indices[1]]]
    population[ranking[i]] = spawn_rankings(parent1, parent1)

R1 [1 6 4 3 2 0 5 7]
R2 [1 5 7 4 6 2 3 0]
[1 5 6 3 4 2 0 7]


In [13]:
# We also need a helper function to transform a linear ranking into an attribution map to be used to compute the output vs selection level plot.
def get_ranking_image_from_linear_ranking(ranking, desired_shape):
  # We want an image with the same size of the original, but were for each pixel
  # we store its position in the ranking (0 - first element, 1 - last element)
  ranking_image = np.zeros(desired_shape)
  num_features = ranking.size
  for i in range(num_features):
    pos_x = ranking[i] // desired_shape[2]
    pos_y = ranking[i] % desired_shape[2]
    ranking_image[0, pos_x, pos_y] = i/num_features
  return torch.from_numpy(ranking_image)

# TEST
print(get_ranking_image_from_linear_ranking(r_spawn,(1,2,4)))

tensor([[[0.7500, 0.0000, 0.6250, 0.3750],
         [0.5000, 0.1250, 0.2500, 0.8750]]], dtype=torch.float64)


In [14]:
# Actual generation

In [15]:
num_iterations = 10
fraction_stored = 0.5
pop_size = num_curves // int(num_iterations*fraction_stored)
num_saved = 20
num_stored = int(pop_size*fraction_stored)

num_features = image.detach().numpy().size
rankings = None
population = np.zeros((pop_size, num_features), dtype=int)
print('Generating initial population...')
for i in tqdm(range(pop_size)):
  population[i] = rng.permutation(num_features)

image_shape = image.detach().numpy().shape
for i in range(num_iterations):
  iteration_qs = []
  for j in tqdm(range(pop_size)):
    ranking_image = get_ranking_image_from_linear_ranking(population[j], image_shape)
    curve,is_hit = get_explanation_exploratory_curve(image, ranking_image, num_samples, label, network)
    inverse_curve,inverse_is_hit = get_explanation_exploratory_curve(image, 1-ranking_image, num_samples, label, network)
    measure = measure_curve(curve)#measure_curve_with_inverse(curve, inverse_curve)
    iteration_qs.append((measure, j))
  iteration_qs.sort()
  avg_q = np.mean(list(map(lambda x:x[0], iteration_qs)))
  print(f'{i}/{num_iterations} - Avg. Q {avg_q}')
  repopulate(population, list(map(lambda x:x[1], iteration_qs)), num_saved)
  # The population of curves is saved after each iteration
  #np.savez(PATH_DIR + f'genetic_generated-inv-{i}--.npz', image, population, avg_q)
  if rankings is None: # First iteration
    rankings = population[:num_stored]
  else:
    rankings = np.vstack((rankings, population[:num_stored]))

Generating initial population...


100%|██████████| 100000/100000 [00:01<00:00, 57722.96it/s]
100%|██████████| 100000/100000 [04:37<00:00, 360.50it/s]


0/10 - Avg. Q -1.6195298433303833


100%|██████████| 100000/100000 [04:36<00:00, 361.40it/s]


1/10 - Avg. Q -3.2683277130126953


100%|██████████| 100000/100000 [04:36<00:00, 361.62it/s]


2/10 - Avg. Q -3.593883752822876


100%|██████████| 100000/100000 [04:36<00:00, 361.93it/s]


3/10 - Avg. Q -3.652021884918213


100%|██████████| 100000/100000 [04:37<00:00, 360.92it/s]


4/10 - Avg. Q -3.6526994705200195


100%|██████████| 100000/100000 [04:36<00:00, 361.83it/s]


5/10 - Avg. Q -3.652451276779175


100%|██████████| 100000/100000 [04:36<00:00, 361.75it/s]


6/10 - Avg. Q -3.6527371406555176


100%|██████████| 100000/100000 [04:36<00:00, 361.50it/s]


7/10 - Avg. Q -3.6530253887176514


100%|██████████| 100000/100000 [04:36<00:00, 361.33it/s]


8/10 - Avg. Q -3.6533360481262207


100%|██████████| 100000/100000 [04:36<00:00, 361.35it/s]


9/10 - Avg. Q -3.6528217792510986


In [16]:
from tqdm import tqdm

num_curves = rankings.shape[0]
num_samples = 20

# These will be the corresponding output vs selection level plots
plots = np.zeros((num_curves, num_samples))
# These will be the corresponding output vs selection level plots with the inverse rankings
inverse_plots = np.zeros((num_curves, num_samples))
# These will indicate whether for a given selection level, the examined output is the highest activated
hit_plots = np.zeros((num_curves, num_samples), dtype=bool)
# These will indicate whether for a given selection level, the examined output is the highest activated (for the inverse rankings)
inverse_hit_plots = np.zeros((num_curves, num_samples), dtype=bool)

image_tensor = image#torch.from_numpy(image)

for i in tqdm(range(num_curves)):
  ranking = get_ranking_image_from_linear_ranking(rankings[i], image.shape)
  curve,is_hit=get_explanation_exploratory_curve(image_tensor, ranking, num_samples, label, network)
  inverse_curve,inverse_is_hit=get_explanation_exploratory_curve(image_tensor, 1-ranking, num_samples, label, network)
  plots[i] = curve
  hit_plots[i] = is_hit
  inverse_plots[i] = inverse_curve
  inverse_hit_plots[i] = inverse_is_hit

100%|██████████| 500000/500000 [22:59<00:00, 362.58it/s]


In [None]:
measures = np.zeros(num_curves)
measures_with_inverse = np.zeros(num_curves)
measures_auc = np.zeros(num_curves)
measures_auc_with_inverse = np.zeros(num_curves)
for i in tqdm(range(num_curves)):
  measures[i] = measure_curve(plots[i])
  measures_with_inverse[i] = measure_curve_with_inverse(plots[i], inverse_plots[i])
  measures_auc[i] = measure_auc(plots[i])
  measures_auc_with_inverse[i] = measure_auc(inverse_plots[i])

  7%|▋         | 34586/500000 [00:02<00:27, 17027.01it/s]

In [None]:
np.savez(PATH_DIR + 'genetic_generated_auc--.npz', image, label, rankings, plots, inverse_plots, hit_plots, inverse_hit_plots, measures, measures_with_inverse, measures_auc, measures_auc_with_inverse)