# Bachelor Thesis XAI 

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

%matplotlib inline

## Data preprocessing

### Data download

In [None]:
import tensorflow_datasets as tfds

In [None]:
(raw_train, raw_validation, raw_test), metadata = tfds.load(
    'cats_vs_dogs',
    split=['train[:80%]', 'train[80%:90%]', 'train[90%:]'],
    with_info=True,
    as_supervised=True,
)

The resulting `tf.data.Dataset` objects contain `(image, label)` pairs where the images have variable shape and 3 channels, and the label is a scalar.

In [None]:
print(raw_train)
print(raw_validation)
print(raw_test)

Show the first two images and labels from the training set:

In [None]:
get_label_name = metadata.features['label'].int2str

for image, label in raw_train.take(2):
  plt.figure()
  plt.imshow(image)
  plt.title(get_label_name(label))

### Format the Data

Use the `tf.image` module to format the images for the task.

Resize the images to a fixed input size, and rescale the input channels to a range of `[-1,1]`

<!-- TODO(markdaoust): fix the keras_applications preprocessing functions to work in tf2 -->

In [None]:
IMG_SIZE = 160 # All images will be resized to 160x160

def format_example(image, label):
  image = tf.cast(image, tf.float32)
  image = (image/127.5) - 1
  image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
  return image, label

Apply this function to each item in the dataset using the map method:

In [None]:
train = raw_train.map(format_example)
validation = raw_validation.map(format_example)
test = raw_test.map(format_example)

Now shuffle and batch the data.

In [None]:
BATCH_SIZE = 32
SHUFFLE_BUFFER_SIZE = 1000

In [None]:
train_batches = train.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)
validation_batches = validation.batch(BATCH_SIZE)
test_batches = test.batch(BATCH_SIZE)

Inspect a batch of data:

In [None]:
for image_batch, label_batch in train_batches.take(1):
   pass

image_batch.shape

## Create the base model from the pre-trained convnets
You will create the base model from the **MobileNet V2** model developed at Google. This is pre-trained on the ImageNet dataset, a large dataset consisting of 1.4M images and 1000 classes. ImageNet is a research training dataset with a wide variety of categories like `jackfruit` and `syringe`. This base of knowledge will help us classify cats and dogs from our specific dataset.

First, you need to pick which layer of MobileNet V2 you will use for feature extraction. The very last classification layer (on "top", as most diagrams of machine learning models go from bottom to top) is not very useful.  Instead, you will follow the common practice to depend on the very last layer before the flatten operation. This layer is called the "bottleneck layer". The bottleneck layer features retain more generality as compared to the final/top layer.

First, instantiate a MobileNet V2 model pre-loaded with weights trained on ImageNet. By specifying the **include_top=False** argument, you load a network that doesn't include the classification layers at the top, which is ideal for feature extraction.

In [None]:
IMG_SHAPE = (IMG_SIZE, IMG_SIZE, 3)

# Create the base model from the pre-trained model MobileNet V2
base_model = tf.keras.applications.MobileNetV2(input_shape=IMG_SHAPE,
                                               include_top=False,
                                               weights='imagenet')

This feature extractor converts each `160x160x3` image into a `5x5x1280` block of features. See what it does to the example batch of images:

In [None]:
feature_batch = base_model(image_batch)
print(feature_batch.shape)

## Feature extraction
In this step, you will freeze the convolutional base created from the previous step and to use as a feature extractor. Additionally, you add a classifier on top of it and train the top-level classifier.

### Freeze the convolutional base

It is important to freeze the convolutional base before you compile and train the model. Freezing (by setting layer.trainable = False) prevents the weights in a given layer from being updated during training. MobileNet V2 has many layers, so setting the entire model's trainable flag to False will freeze all the layers.

In [None]:
base_model.trainable = False

In [None]:
# Let's take a look at the base model architecture
base_model.summary()

### Add a classification head

To generate predictions from the block of features, average over the spatial `5x5` spatial locations, using a `tf.keras.layers.GlobalAveragePooling2D` layer to convert the features to  a single 1280-element vector per image.

In [None]:
global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
feature_batch_average = global_average_layer(feature_batch)
print(feature_batch_average.shape)

Apply a `tf.keras.layers.Dense` layer to convert these features into a single prediction per image. You don't need an activation function here because this prediction will be treated as a `logit`, or a raw prediction value.  Positive numbers predict class 1, negative numbers predict class 0.

In [None]:
prediction_layer = tf.keras.layers.Dense(1)
prediction_batch = prediction_layer(feature_batch_average)
print(prediction_batch.shape)

Now stack the feature extractor, and these two layers using a `tf.keras.Sequential` model:

In [None]:
from tensorflow.keras.layers import Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
    
model = tf.keras.Sequential([
  base_model,
  global_average_layer,
  Flatten(),
        Dropout(0.50),
        Dense(1024, activation='relu'),
        Dropout(0.20),        
        Dense(512, activation='relu'),
        Dropout(0.10),         
        Dense(1, activation='sigmoid')    
])

### Compile the model

You must compile the model before training it.  Since there are two classes, use a binary cross-entropy loss with `from_logits=True` since the model provides a linear output.

In [None]:
base_learning_rate = 0.0001
model.compile(optimizer=tf.keras.optimizers.RMSprop(lr=base_learning_rate),
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=['accuracy'])

In [None]:
model.summary()

The 2.5M parameters in MobileNet are frozen, but there are 1.2K _trainable_ parameters in the Dense layer.  These are divided between two `tf.Variable` objects, the weights and biases.

In [None]:
len(model.trainable_variables)

### Train the model

After training for 10 epochs, you should see ~96% accuracy.


In [None]:
initial_epochs = 1
validation_steps=20

loss0,accuracy0 = model.evaluate(validation_batches, steps = validation_steps)

In [None]:
print("initial loss: {:.2f}".format(loss0))
print("initial accuracy: {:.2f}".format(accuracy0))

In [None]:
history = model.fit(train_batches,
                    epochs=initial_epochs,
                    validation_data=validation_batches)

In [None]:
tf.saved_model.save(model, "model/")

In [None]:
loaded = tf.saved_model.load("model")

### Learning curves

Let's take a look at the learning curves of the training and validation accuracy/loss when using the MobileNet V2 base model as a fixed feature extractor.

In [None]:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.ylim([min(plt.ylim()),1])
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.ylim([0,1.0])
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()

Note: If you are wondering why the validation metrics are clearly better than the training metrics, the main factor is because layers like `tf.keras.layers.BatchNormalization` and `tf.keras.layers.Dropout` affect accuracy during training. They are turned off when calculating validation loss.

To a lesser extent, it is also because training metrics report the average for an epoch, while validation metrics are evaluated after the epoch, so validation metrics see a model that has trained slightly longer.

## Predict images

In [None]:
import keras

IMAGE_PATH_1 = 'cat1.jpg'
IMAGE_PATH_2 = 'cat2.jpg'
IMAGE_PATH_3 = 'dog.1.jpg'
IMAGE_PATH_4 = 'dog.2.jpg'

img_1 = keras.preprocessing.image.load_img(IMAGE_PATH_1,target_size=(160, 160))
img_1 = keras.preprocessing.image.img_to_array(img_1)
img_2 = keras.preprocessing.image.load_img(IMAGE_PATH_2, target_size=(160, 160))
img_2 = keras.preprocessing.image.img_to_array(img_2)
img_3 = keras.preprocessing.image.load_img(IMAGE_PATH_3, target_size=(160, 160))
img_3 = keras.preprocessing.image.img_to_array(img_3)
img_4 = keras.preprocessing.image.load_img(IMAGE_PATH_4, target_size=(160, 160))
img_4 = keras.preprocessing.image.img_to_array(img_4)

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

to_predict = np.array([img_1, img_2, img_3, img_4])
fig, ax = plt.subplots(1, 4, figsize=(18, 10))
ax[0].imshow(to_predict[0]/255.)
ax[1].imshow(to_predict[1]/255.)
ax[2].imshow(to_predict[2]/255.)
ax[3].imshow(to_predict[3]/255.)

In [None]:
loaded = tf.saved_model.load("model")
print(list(loaded.signatures.keys()))  # ["serving_default"]

In [None]:
def visualize_model_decisions(shap_values, x, labels=None, figsize=(20, 30)):
    
    colors = []
    for l in np.linspace(1, 0, 100):
        colors.append((30./255, 136./255, 229./255,l))
    for l in np.linspace(0, 1, 100):
        colors.append((255./255, 13./255, 87./255,l))
    red_transparent_blue = LinearSegmentedColormap.from_list("red_transparent_blue", colors)

    multi_output = True
    if type(shap_values) != list:
        multi_output = False
        shap_values = [shap_values]

    # make sure labels
    if labels is not None:
        assert labels.shape[0] == shap_values[0].shape[0], "Labels must have same row count as shap_values arrays!"
        if multi_output:
            assert labels.shape[1] == len(shap_values), "Labels must have a column for each output in shap_values!"
        else:
            assert len(labels.shape) == 1, "Labels must be a vector for single output shap_values."

    # plot our explanations
    fig_size = figsize
    fig, axes = plt.subplots(nrows=x.shape[0], ncols=len(shap_values) + 1, figsize=fig_size)
    if len(axes.shape) == 1:
        axes = axes.reshape(1,axes.size)
    for row in range(x.shape[0]):
        x_curr = x[row].copy()

        # make sure
        if len(x_curr.shape) == 3 and x_curr.shape[2] == 1:
            x_curr = x_curr.reshape(x_curr.shape[:2])
        if x_curr.max() > 1:
            x_curr /= 255.
        
        axes[row,0].imshow(x_curr)
        axes[row,0].axis('off')
        
        # get a grayscale version of the image
        if len(x_curr.shape) == 3 and x_curr.shape[2] == 3:
            x_curr_gray = (0.2989 * x_curr[:,:,0] + 0.5870 * x_curr[:,:,1] + 0.1140 * x_curr[:,:,2]) # rgb to gray
        else:
            x_curr_gray = x_curr

        if len(shap_values[0][row].shape) == 2:
            abs_vals = np.stack([np.abs(shap_values[i]) for i in range(len(shap_values))], 0).flatten()
        else:
            abs_vals = np.stack([np.abs(shap_values[i].sum(-1)) for i in range(len(shap_values))], 0).flatten()
        max_val = np.nanpercentile(abs_vals, 99.9)
        for i in range(len(shap_values)):
            if labels is not None:
                axes[row,i+1].set_title(labels[row,i])
            sv = shap_values[i][row] if len(shap_values[i][row].shape) == 2 else shap_values[i][row].sum(-1)
            axes[row,i+1].imshow(x_curr_gray, cmap=plt.get_cmap('gray'), alpha=0.15, extent=(-1, sv.shape[0], sv.shape[1], -1))
            im = axes[row,i+1].imshow(sv, cmap=red_transparent_blue, vmin=-max_val, vmax=max_val)
            axes[row,i+1].axis('off')
        
    cb = fig.colorbar(im, ax=np.ravel(axes).tolist(), label="SHAP value", orientation="horizontal", aspect=fig_size[0]/0.2)
    cb.outline.set_visible(False)

In [None]:
infer = loaded.signatures["serving_default"]
print(infer.structured_outputs)
 
get_label_name = metadata.features['label'].int2str
 
for image, label in raw_test.take(3):  
  img,label=format_example(image,label)
  img=tf.expand_dims(img,axis=0)
  result = infer(tf.constant(img))['dense_3']
  if(result[0][0]<.50):
    result="cat"
  else:
    result="dog"
  plt.figure()
  plt.imshow(image)
  plt.title((result,label.numpy()))

In [None]:
for image, label in raw_test.take(1000):  
    img,label=format_example(image,label)
    img=tf.expand_dims(img,axis=0)
    result = infer(tf.constant(img))['dense_3']
    if(np.argmax(model.predict(img))<.50):
        result="cat"
    else:
        result="dog"
    plt.figure()
    plt.imshow(image)
    plt.title((result,label.numpy()))

to_predict = np.array([img])

## Make model transparent

In [None]:
model.layers[0].input

In [None]:
layer_input = model.layers[0].input
layer_output = model.layers[-1].output

In [None]:
import shap



# select a set of background examples to take an expectation over
background = img
# explain predictions of the model on four images
e = shap.DeepExplainer(model, background)
# ...or pass tensors directly
#e = shap.DeepExplainer((model.layers[0].input, model.layers[-1].output), background)
shap_values = e.shap_values(to_predict)

# plot the feature attributions
shap.image_plot(shap_values, -to_predict)

In [None]:
import shap

def map2layer(x, layer):
    feed_dict = dict(zip([layer_input], [x]))
    return K.get_session().run(model.layers[layer].input, feed_dict)

In [None]:
e = shap.GradientExplainer((model.layers[7].input, model.layers[-1].output), 
                            map2layer(train_batches, 7))


shap_values, indexes = e.shap_values(map2layer(to_predict, 7), ranked_outputs=2)

#index_names = np.vectorize(lambda x: class_names[str(x)][1])(indexes)
#index_names