# Gradient-Weighted Class Activation Mapping (Grad-CAM) - CIFAR10

In [19]:
import numpy as np
import time
import os

from sklearn.preprocessing import OneHotEncoder
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.python.framework import ops
import warnings
import logging
warnings.filterwarnings("ignore")
tf.get_logger().setLevel(logging.ERROR)

import cv2
from skimage import io
from skimage.transform import resize

from matplotlib import pyplot as plt
%matplotlib inline

## Initialization

In [20]:
# Global settings shared by the data pipeline, the model and the training loop.
config = dict(
    dataset_name='cifar10',              # name handed to tfds.load
    width=32,                            # input image width in pixels
    height=32,                           # input image height in pixels
    num_channels=3,                      # colour channels per image
    num_classes=10,                      # size of the one-hot label / logits
    dropout_rate=[0.15, 0.1],            # rates fed for dropout5 and dropout7
    learning_rate=1e-4,                  # step size given to the Adam optimizer
    model_path='./grad_cam_model.ckpt',  # checkpoint written on improvement
    log_path='./grad_cam_log.csv',       # per-epoch loss/accuracy log
)

# Data Preparation

In [21]:
def normalize(x):
    """Scale pixel values by 1/255 and reshape to (N, height, width, channels).

    The target image dimensions are read from the global ``config``; the
    leading dimension is inferred from the amount of data in ``x``.
    """
    target_shape = (-1,
                    config['height'],
                    config['width'],
                    config['num_channels'])
    scaled = x / 255.0
    return scaled.reshape(target_shape)

In [22]:
def get_one_hot(label, num_classes=None):
    """Convert integer class labels to a dense one-hot matrix.

    The encoding is built directly with NumPy instead of fitting a new
    scikit-learn encoder on every batch, and an empty input yields an empty
    ``(0, num_classes)`` matrix rather than an error.

    Args:
        label: array-like of integer class indices; it is flattened first.
        num_classes: width of the encoding. Defaults to
            ``config['num_classes']`` when omitted.

    Returns:
        float64 array of shape ``(number_of_labels, num_classes)`` with a
        single 1.0 per row.

    Raises:
        ValueError: if any label lies outside ``[0, num_classes)``.
    """
    if num_classes is None:
        num_classes = config['num_classes']

    indices = np.asarray(label).reshape(-1)
    one_hot = np.zeros((indices.size, num_classes))
    if indices.size == 0:
        return one_hot

    indices = indices.astype(np.intp)
    # Reject unknown classes explicitly; plain fancy indexing would silently
    # wrap negative labels around to the last columns.
    if indices.min() < 0 or indices.max() >= num_classes:
        raise ValueError('labels must lie in [0, %d)' % num_classes)

    one_hot[np.arange(indices.size), indices] = 1.0
    return one_hot

In [23]:
def get_data(batch_size):
    """Load the configured dataset and build batched train/test pipelines.

    Returns:
        A 3-tuple ``((train_dataset, test_dataset), (train_size, test_size),
        label_names)`` where the sizes are the example counts reported by the
        dataset info and ``label_names`` are the class names of the 'label'
        feature.
    """
    splits, info = tfds.load(name=config['dataset_name'], with_info=True)
    autotune = tf.data.experimental.AUTOTUNE

    def build_pipeline(split_name):
        # repeat -> shuffle(1024) -> batch -> prefetch, identical for both splits
        pipeline = splits[split_name].repeat().shuffle(1024).batch(batch_size)
        return pipeline.prefetch(autotune)

    pipelines = (build_pipeline('train'), build_pipeline('test'))
    sizes = (info.splits['train'].num_examples,
             info.splits['test'].num_examples)
    class_names = info.features['label'].names

    return pipelines, sizes, class_names

# Build Model

In [24]:
def _conv_relu_pool(inputs, filters, kernel_size, index):
    # One 'same' conv -> ReLU -> 2x2 / stride-2 max-pool stage in NCHW layout.
    # Layers are named conv<index>, relu<index>, pool<index>.
    conv = tf.layers.conv2d(inputs, filters, kernel_size,
                            strides=[1, 1],
                            padding='same',
                            data_format='channels_first',
                            name='conv%d' % index)
    relu = tf.nn.relu(conv, name='relu%d' % index)
    pool = tf.layers.max_pooling2d(relu, [2, 2],
                                   strides=[2, 2],
                                   padding='valid',
                                   data_format='channels_first',
                                   name='pool%d' % index)
    return conv, pool


def model(x, dropout_rate, print_summary=True):
    """Build the CNN classifier under the 'model_cnn' variable scope.

    The network is five conv/ReLU/max-pool stages (32, 64, 128, 256, 256
    filters), a dropout, two dense+ReLU layers (512 and 128 units), a second
    dropout and a final dense layer with ``config['num_classes']`` outputs.

    Args:
        x: input images in NHWC layout; transposed to NCHW internally.
        dropout_rate: indexable pair of rates; element 0 is used after pool5,
            element 1 after relu7.
        print_summary: when true, print the static shape of each layer.

    Returns:
        ``([conv1, ..., conv5], logits, probabilities)`` where the conv
        tensors are the pre-activation convolution outputs in NCHW layout.
    """
    # (filters, kernel_size) for conv stages 1..5
    stage_specs = [(32, [5, 5]),
                   (64, [3, 3]),
                   (128, [3, 3]),
                   (256, [2, 2]),
                   (256, [2, 2])]
    convs, pools = [], []

    with tf.variable_scope('model_cnn', reuse=False):
        features = tf.transpose(x, [0, 3, 1, 2])  # NHWC to NCHW

        # blocks 1-5
        for index, (filters, kernel_size) in enumerate(stage_specs, start=1):
            conv, features = _conv_relu_pool(features, filters, kernel_size, index)
            convs.append(conv)
            pools.append(features)

        # tf.layers.dropout defaults to training=False, which makes the layer
        # an identity op. The rate itself is fed at run time (zeros are fed
        # for evaluation), so the layer must always be in training mode.
        dropout5 = tf.layers.dropout(features, dropout_rate[0],
                                     training=True, name='dropout5')

        # block 6
        channels, height, width = dropout5.get_shape().as_list()[1:]
        flatten6 = tf.reshape(dropout5, [-1, channels * height * width])
        fc6 = tf.layers.dense(flatten6, 512, name='fc6')
        relu6 = tf.nn.relu(fc6, name='relu6')

        # block 7
        fc7 = tf.layers.dense(relu6, 128, name='fc7')
        relu7 = tf.nn.relu(fc7, name='relu7')
        dropout7 = tf.layers.dropout(relu7, dropout_rate[1],
                                     training=True, name='dropout7')

        # block 8
        fc8 = tf.layers.dense(dropout7, config['num_classes'], name='fc8')
        output = tf.nn.softmax(fc8, name='output')

    if print_summary:
        shapes = []
        for conv, pool in zip(convs, pools):
            shapes.extend([conv.get_shape(), pool.get_shape()])
        shapes.extend([fc6.get_shape(), fc7.get_shape(), fc8.get_shape()])
        print('model summary:\n ' \
              'Conv1: %s\n Pool1: %s\n Conv2: %s\n Pool2: %s\n' \
              'Conv3: %s\n Pool3: %s\n Conv4: %s\n Pool4: %s\n' \
              'Conv5: %s\n Pool5: %s\n Fc6: %s\n'\
              'Fc7: %s\n Fc8: %s\n' % tuple(shapes))

    return convs, fc8, output

In [25]:
def loss_accuracy(prob, logits, labels):
    """Build the mean softmax cross-entropy loss and the accuracy tensor.

    Args:
        prob: class probabilities; their argmax is the predicted class.
        logits: unnormalised class scores fed to the cross-entropy op.
        labels: one-hot targets; their argmax is the true class.

    Returns:
        ``(loss, accuracy)`` scalar tensors, both averaged over the batch.
    """
    with tf.variable_scope('Loss_Acc', reuse=False):
        per_example_loss = tf.nn.softmax_cross_entropy_with_logits(
            labels=labels, logits=logits)
        mean_loss = tf.reduce_mean(per_example_loss)

        predicted_class = tf.argmax(prob, 1)
        true_class = tf.argmax(labels, 1)
        hits = tf.cast(tf.equal(predicted_class, true_class), tf.float32)
        accuracy = tf.reduce_mean(hits)

    return mean_loss, accuracy

In [26]:
def optimizer(loss, learning_rate):
    """Create the Adam training op for the variables of the CNN.

    Only trainable variables whose name starts with 'model_cnn' are updated,
    and the op is created under a control dependency on everything currently
    registered in the UPDATE_OPS collection.

    Returns:
        The op produced by ``AdamOptimizer.minimize``.
    """
    with tf.variable_scope('Optimizer', reuse=False):
        pending_updates = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        with tf.control_dependencies(pending_updates):
            cnn_variables = []
            for variable in tf.trainable_variables():
                if variable.name.startswith('model_cnn'):
                    cnn_variables.append(variable)

            adam = tf.train.AdamOptimizer(learning_rate)
            train_step = adam.minimize(loss, var_list=cnn_variables)

    return train_step

In [27]:
def get_placeholders_tensors(target_layer_index=-1):
    """Assemble the whole graph and return its placeholders and key tensors.

    Args:
        target_layer_index: index into the list of conv layers returned by
            ``model``; that layer is the one visualised by Grad-CAM.

    Returns:
        dict with the keys 'x', 'y', 'dropout_rate' (placeholders) and
        'optimizer', 'probs', 'loss', 'acc', 'grad_cam' (ops / tensors).
    """
    image_shape = [None,
                   config['height'],
                   config['width'],
                   config['num_channels']]
    label_shape = [None, config['num_classes']]

    x = tf.placeholder(tf.float32, shape=image_shape, name='x')
    y = tf.placeholder(tf.float32, shape=label_shape, name='label')
    dropout_rate = tf.placeholder(tf.float32, name='dropout_rate')

    conv_layers, logits, probs = model(x, dropout_rate, print_summary=True)
    loss, acc = loss_accuracy(probs, logits, y)
    train_op = optimizer(loss, config['learning_rate'])
    cam = Grad_CAM(conv_layers[target_layer_index], logits, x, y)

    return {
        'x': x,
        'y': y,
        'dropout_rate': dropout_rate,
        'optimizer': train_op,
        'probs': probs,
        'loss': loss,
        'acc': acc,
        'grad_cam': cam,
    }

# Grad-CAM and Visualisation

In [28]:
def Grad_CAM(conv_layer, logits, x, y):
    """Build the Grad-CAM heat-map tensor for one convolutional layer.

    Args:
        conv_layer: conv feature maps in NCHW layout (as produced by model).
        logits: class scores of the network.
        x: input placeholder (not used in the computation).
        y: one-hot selector of the class to explain.

    Returns:
        Tensor with one non-negative 2-D map per example: the ReLU of the
        channel-wise sum of the feature maps weighted by their importance.
    """
    with tf.variable_scope('Grad_CAM', reuse=False):
        # score of the selected class for every example
        class_score = tf.reduce_sum(tf.multiply(logits, y), axis=1)

        # tf.gradients returns one gradient per differentiated tensor; only
        # conv_layer is passed, so element 0 is d(class_score)/d(conv_layer)
        feature_grads = tf.gradients(class_score, conv_layer)[0]

        # importance of each feature map: gradient averaged over H and W
        channel_weights = tf.reduce_mean(feature_grads, axis=(2, 3))
        num_channels = channel_weights.get_shape().as_list()[1]
        broadcast_weights = tf.reshape(channel_weights, [-1, num_channels, 1, 1])

        weighted_maps = tf.multiply(broadcast_weights, conv_layer)
        heat_map = tf.nn.relu(tf.reduce_sum(weighted_maps, axis=1))

    return heat_map

In [29]:
def get_results_for_visualization(sess, placeholders_tensors, dataset, count):
    """Run the network on the first ``count`` images of one batch.

    Probabilities are computed first; the Grad-CAM maps are then evaluated
    for the *predicted* class of each image, with both dropout rates at 0.

    Returns:
        ``(images, grad_cam, probs, labels)``. ``grad_cam`` and ``probs``
        cover the first ``count`` examples, while ``images`` and ``labels``
        are the raw, un-normalised arrays of the whole batch.
    """
    batch = sess.run(dataset.make_one_shot_iterator().get_next())

    x = placeholders_tensors['x']
    y = placeholders_tensors['y']
    rate = placeholders_tensors['dropout_rate']

    inputs = np.array(normalize(batch['image'][:count]))
    true_one_hot = np.array(get_one_hot(batch['label'][:count]))

    # pass 1: class probabilities
    probs = sess.run(placeholders_tensors['probs'],
                     feed_dict={x: inputs, y: true_one_hot, rate: [0, 0]})

    # pass 2: heat-maps explaining the predicted class
    predicted_one_hot = np.array(get_one_hot(np.argmax(probs, 1)))
    grad_cam = sess.run(placeholders_tensors['grad_cam'],
                        feed_dict={x: inputs, y: predicted_one_hot, rate: [0, 0]})

    return np.array(batch['image']), grad_cam, probs, np.array(batch['label'])

In [30]:
def visualisation(sess, placeholders_tensors, dataset, labels, count):
    """Plot ``count`` images next to their Grad-CAM heat-maps.

    Each row shows four panels: the input image, the colour-mapped Grad-CAM,
    the heat-map overlaid on the image, and the text
    "<predicted label> / <ground-truth label>".

    Args:
        sess: session used to evaluate the network.
        placeholders_tensors: dict returned by ``get_placeholders_tensors``.
        dataset: dataset a batch is drawn from.
        labels: class names indexed by class id.
        count: number of rows (images) to plot.
    """
    images, grad_cams, probs, ground_truths = get_results_for_visualization(
        sess, placeholders_tensors, dataset, count)

    # squeeze=False keeps `axes` two-dimensional even when count == 1, so the
    # axes[row, col] indexing below always works.
    _, axes = plt.subplots(figsize=[8, 2 * count], nrows=count,
                           ncols=4, sharey=True, sharex=True, squeeze=False)

    for idx in range(count):
        # Normalize to [0, 1]. A map that is zero everywhere (possible after
        # the ReLU) is kept as zeros instead of dividing by zero, which would
        # give NaNs and an undefined uint8 conversion.
        peak = np.max(grad_cams[idx])
        if peak > 0:
            grad_cam = grad_cams[idx] / peak
        else:
            grad_cam = np.zeros_like(grad_cams[idx])

        grad_cam = resize(grad_cam, (config['height'], config['width']),
                          preserve_range=True, mode='constant')
        grad_cam_heatmap = cv2.applyColorMap(np.uint8(255.0 * grad_cam),
                                             cv2.COLORMAP_JET)
        # OpenCV colour maps are BGR; matplotlib expects RGB
        grad_cam_heatmap = cv2.cvtColor(grad_cam_heatmap, cv2.COLOR_BGR2RGB)

        image = normalize(images[idx]).reshape(config['height'],
                                               config['width'],
                                               config['num_channels'])
        max_prob_idx = np.argmax(probs[idx])
        ground_truth = ground_truths[idx]
        pred_truth_labels = labels[max_prob_idx] + " / " + labels[ground_truth]

        row = axes[idx]
        row[0].imshow(image)
        row[0].set_title('Input Image')
        row[0].axis('off')
        row[1].imshow(grad_cam_heatmap)
        row[1].set_title('Grad_CAM')
        row[1].axis('off')
        row[2].imshow(image)
        row[2].imshow(grad_cam_heatmap, alpha=0.5)
        row[2].set_title('Overlayed')
        row[2].axis('off')
        # fully transparent image: only gives the text panel the same extent
        row[3].imshow(np.ones_like(image), alpha=0.0)
        row[3].text(5, 16, pred_truth_labels, color='white', fontsize=15)
        row[3].set_title('Prediction / GroundTruth')
        row[3].axis('off')
    plt.tight_layout()
    plt.show()

# Train Model

In [31]:
def save_model_on_imporvemnet(file_path, sess, cv_acc, cv_accs):
    """Checkpoint the session when ``cv_acc`` beats every previous value.

    Args:
        file_path: checkpoint path handed to the saver.
        sess: session whose variables are saved.
        cv_acc: validation accuracy of the current epoch.
        cv_accs: validation accuracies of the earlier epochs.

    Returns:
        True when a checkpoint was written (no history yet, or a strict
        improvement), otherwise False.
    """
    improved = cv_accs == [] or cv_acc > np.max(cv_accs)
    if not improved:
        print('')
        return False

    # keep only the most recent (i.e. best so far) checkpoint
    tf.train.Saver(max_to_keep=1).save(sess, file_path)
    print('Model saved')
    return True

In [32]:
def log_loss_acc(file_path, epoch, train_loss, train_acc,
                 cv_loss, cv_acc, log_mode = 'a'):
    """Append one epoch's train / cv loss and accuracy to a CSV log.

    ``log_mode`` is only honoured for epoch 0; every later epoch is appended.
    When the file is opened in 'w' mode a header row is written first.
    """
    header = 'epoch, train_loss, train_acc, cv_loss, cv_acc\n'
    values = (epoch, train_loss, train_acc, cv_loss, cv_acc)

    if epoch == 0:
        mode = log_mode
    else:
        mode = 'a'

    with open(file_path, mode) as log_file:
        if mode == 'w':
            log_file.write(header)
        log_file.write('%d, %f, %f, %f, %f\n' % values)

In [33]:
def train_per_epoch(sess, dataset, placeholders_tensors, epoch, train_batches_count):
    """Run one training epoch and return its mean loss and accuracy.

    Args:
        sess: session holding the model variables.
        dataset: training dataset; a fresh one-shot iterator is created.
        placeholders_tensors: dict returned by ``get_placeholders_tensors``.
        epoch: epoch number, used only in the progress output.
        train_batches_count: number of batches to consume.

    Returns:
        ``(train_loss, train_acc)`` averaged over the processed batches.
    """
    tmp_loss, tmp_acc = [], []
    t_total = 0

    iterator = dataset.make_one_shot_iterator()
    next_element = iterator.get_next()

    # The update op, loss and accuracy are fetched in a single run call so
    # each batch costs one forward/backward pass instead of one training
    # step followed by two extra forward passes.
    fetches = [placeholders_tensors['optimizer'],
               placeholders_tensors['loss'],
               placeholders_tensors['acc']]

    for iteration in range(train_batches_count):
        t_start = time.time()
        batch = sess.run(next_element)
        feed_dictionary = {placeholders_tensors['x']: np.array(normalize(batch['image'])),
                           placeholders_tensors['y']: np.array(get_one_hot(batch['label'])),
                           placeholders_tensors['dropout_rate']: config['dropout_rate']}

        _, train_loss, train_acc = sess.run(fetches, feed_dict=feed_dictionary)
        tmp_loss.append(train_loss)
        tmp_acc.append(train_acc)
        t_total += (time.time() - t_start)
        # blank the previous status line before overwriting it in place
        print(' '*60, end = '\r')
        print('epoch: %d, time: %f | train_loss: %f | acc: %f' %(epoch, t_total, train_loss,
                                                                 train_acc), end = '\r')
    train_loss = np.mean(tmp_loss)
    train_acc = np.mean(tmp_acc)
    print(' '*60, end = '\r')
    print('epoch: %d, time: %f | train_loss: %f | acc: %f\n' %(epoch, t_total, train_loss,
                                                               train_acc), end = '\r')
    return train_loss, train_acc

In [34]:
def cv_per_epoch(sess, dataset, placeholders_tensors, epoch, cv_batches_count):
    """Evaluate the model for one epoch and return mean loss and accuracy.

    Both dropout rates are fed as 0 during evaluation.

    Args:
        sess: session holding the model variables.
        dataset: validation dataset; a fresh one-shot iterator is created.
        placeholders_tensors: dict returned by ``get_placeholders_tensors``.
        epoch: epoch number (not used in the computation or the output).
        cv_batches_count: number of batches to consume.

    Returns:
        ``(cv_loss, cv_acc)`` averaged over the processed batches.
    """
    tmp_loss, tmp_acc = [], []
    t_total = 0

    iterator = dataset.make_one_shot_iterator()
    next_element = iterator.get_next()

    # loss and accuracy come from the same forward pass, so fetch them
    # together instead of evaluating the network twice per batch
    fetches = [placeholders_tensors['loss'], placeholders_tensors['acc']]

    for iteration in range(cv_batches_count):
        t_start = time.time()
        batch = sess.run(next_element)
        cv_feed_dictionary = {placeholders_tensors['x']: np.array(normalize(batch['image'])),
                              placeholders_tensors['y']: np.array(get_one_hot(batch['label'])),
                              placeholders_tensors['dropout_rate']: [0, 0]}

        cv_loss, cv_acc = sess.run(fetches, feed_dict=cv_feed_dictionary)

        tmp_loss.append(cv_loss)
        tmp_acc.append(cv_acc)
        t_total += (time.time() - t_start)
        # blank the previous status line before overwriting it in place
        print(' '*60, end = '\r')
        print('          cv_time: %f | cv_loss: %f | cv_acc: %f' %(t_total, cv_loss,
                                                                   cv_acc), end = '\r')
    cv_loss = np.mean(tmp_loss)
    cv_acc = np.mean(tmp_acc)
    print(' '*60, end = '\r')
    print('          cv_time: %f | cv_loss: %f | cv_acc: %f\n' %(t_total, cv_loss,
                                                                 cv_acc), end = '\r')
    return cv_loss, cv_acc

In [37]:
def train_model(batch_size, epochs, resume, conv_layer_vis_index = -1):
    """Train the CNN, checkpointing and visualising on every improvement.

    Args:
        batch_size: number of examples per batch.
        epochs: number of epochs to run in this call.
        resume: when true, restore the checkpoint at ``config['model_path']``
            and continue from the history stored in ``config['log_path']``;
            otherwise initialise the variables from scratch.
        conv_layer_vis_index: index of the conv layer visualised by Grad-CAM.

    Returns:
        ``(train_losses, cv_losses, train_accs, cv_accs)`` lists with one
        entry per epoch, including the epochs loaded from the log on resume.
    """
    init_epoch = 0
    train_losses, cv_losses = [], []
    train_accs, cv_accs = [], []
    ops.reset_default_graph()
    placeholders_tensors = get_placeholders_tensors(target_layer_index = conv_layer_vis_index)

    with tf.Session() as sess:
        if resume:
            print('loading weights....')
            saver = tf.train.Saver()
            saver.restore(sess, (config['model_path']))  # to load the best saved model
            # load saved losses and accuracies so that less accurate model
            # won't be saved after resume
            # atleast_1d: a log holding a single epoch is parsed as a 0-d
            # record, which cannot be turned into a list column by column
            tmp = np.atleast_1d(np.genfromtxt(config['log_path'],
                                              delimiter = ',', names = True))
            train_losses = list(tmp['train_loss'])
            train_accs = list(tmp['train_acc'])
            cv_losses = list(tmp['cv_loss'])
            cv_accs = list(tmp['cv_acc'])
            init_epoch  = len(train_losses)
            del tmp
        else:
            print('initializing weights....')
            init_op = tf.group(tf.local_variables_initializer(), tf.global_variables_initializer())
            sess.run(init_op)

        print('training....')
        dataset, dataset_count, labels = get_data(batch_size)
        # ceiling division: enough batches to cover every example once,
        # without an extra batch when the size divides evenly
        train_batch_count = (dataset_count[0] + batch_size - 1) // batch_size
        cv_batch_count = (dataset_count[1] + batch_size - 1) // batch_size
        for epoch in range(init_epoch, init_epoch + epochs):
            # training
            train_loss, train_acc = train_per_epoch(sess, dataset[0],
                                                    placeholders_tensors, epoch, train_batch_count)
            train_losses.append(train_loss)
            train_accs.append(train_acc)

            # cross-validation
            cv_loss, cv_acc = cv_per_epoch(sess, dataset[1],
                                           placeholders_tensors, epoch, cv_batch_count)
            # save model (compared against the history *before* this epoch)
            is_saved = save_model_on_imporvemnet(config['model_path'], sess, cv_acc, cv_accs)
            cv_losses.append(cv_loss)
            cv_accs.append(cv_acc)
            # log results
            log_loss_acc(config['log_path'], epoch, train_loss, train_acc, cv_loss, cv_acc,
                         log_mode = ('a' if resume else 'w'))
            # visualization
            if is_saved:
                visualisation(sess, placeholders_tensors, dataset[1], labels, 4)
        return train_losses, cv_losses, train_accs, cv_accs

In [38]:
# Train from scratch for 300 epochs with batches of 256 and visualise the
# last convolutional layer; keeps the returned loss / accuracy histories.
loss_acc = train_model(batch_size=256,
                       epochs=300,
                       resume=False,
                       conv_layer_vis_index=-1)

AttributeError: ignored