Based on 

Convolutional Neural Network (CNN)

https://www.tensorflow.org/tutorials/images/cnn

What does a CNN see?

https://www.kaggle.com/code/aakashnain/what-does-a-cnn-see/notebook

Grad-CAM class activation visualization

https://keras.io/examples/vision/grad_cam/

In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
cd "drive/MyDrive/Doutorado/Disciplinas/[2022.2] [PUC-Rio] Visão Computacional - Professor: Marcelo Gattass/Trabalhos/Trabalho Final/Code/What does a CNN see?/"

/content/drive/MyDrive/Doutorado/Disciplinas/[2022.2] [PUC-Rio] Visão Computacional - Professor: Marcelo Gattass/Trabalhos/Trabalho Final/Code/What does a CNN see?


In [3]:
!pwd

/content/drive/MyDrive/Doutorado/Disciplinas/[2022.2] [PUC-Rio] Visão Computacional - Professor: Marcelo Gattass/Trabalhos/Trabalho Final/Code/What does a CNN see?


In [4]:
data_folder = './data/'

In [5]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import os
from pathlib import Path
import imgaug as aug
import imgaug.augmenters as iaa
import tensorflow as tf
from tensorflow.keras import datasets, layers, models
from keras.models import Sequential, Model, load_model
from keras.optimizers import Adam, SGD, RMSprop
from keras.utils import to_categorical
from keras import backend as K
from keras.callbacks import ModelCheckpoint, Callback, EarlyStopping

tf.compat.v1.disable_eager_execution() # daniel

In [6]:
# Set the seed for hash based operations in python
os.environ['PYTHONHASHSEED'] = '0'

seed=1234

# Set the numpy seed
np.random.seed(seed)

# Set the random seed in tensorflow at graph level
# tf.set_random_seed(seed) # obsolete
tf.random.set_seed(seed)

# Make the augmentation sequence deterministic
aug.seed(seed)

In [7]:
# As usual, define some paths first to make life simpler
training_data = Path(data_folder + '/training/') 
validation_data = Path(data_folder + '/validation/') 
labels_path = Path(data_folder + '/monkey_labels.txt')

In [8]:
labels_info = []

# Read the file
lines = labels_path.read_text().strip().splitlines()[1:]
for line in lines:
    line = line.split(',')
    line = [x.strip(' \n\t\r') for x in line]
    line[3], line[4] = int(line[3]), int(line[4])
    line = tuple(line)
    labels_info.append(line)
    
# Convert the data into a pandas dataframe
labels_info = pd.DataFrame(labels_info, columns=['Label', 'Latin Name', 'Common Name', 
                                                 'Train Images', 'Validation Images'], index=None)
# Sneak peek 
labels_info.head(10)

Unnamed: 0,Label,Latin Name,Common Name,Train Images,Validation Images
0,n0,alouatta_palliata,mantled_howler,131,26
1,n1,erythrocebus_patas,patas_monkey,139,28
2,n2,cacajao_calvus,bald_uakari,137,27
3,n3,macaca_fuscata,japanese_macaque,152,30
4,n4,cebuella_pygmea,pygmy_marmoset,131,26
5,n5,cebus_capucinus,white_headed_capuchin,141,28
6,n6,mico_argentatus,silvery_marmoset,132,26
7,n7,saimiri_sciureus,common_squirrel_monkey,142,28
8,n8,aotus_nigriceps,black_headed_night_monkey,133,27
9,n9,trachypithecus_johnii,nilgiri_langur,132,26


In [9]:
# Create a dictionary to map the labels to integers
labels_dict= {'n0':0, 'n1':1, 'n2':2, 'n3':3, 'n4':4, 'n5':5, 'n6':6, 'n7':7, 'n8':8, 'n9':9}

# map labels to common names
names_dict = dict(zip(labels_dict.values(), labels_info["Common Name"]))
print(names_dict)

{0: 'mantled_howler', 1: 'patas_monkey', 2: 'bald_uakari', 3: 'japanese_macaque', 4: 'pygmy_marmoset', 5: 'white_headed_capuchin', 6: 'silvery_marmoset', 7: 'common_squirrel_monkey', 8: 'black_headed_night_monkey', 9: 'nilgiri_langur'}


In [10]:
# Creating a dataframe for the training dataset
train_df = []
for folder in os.listdir(training_data):
    # Define the path to the images
    imgs_path = training_data / folder
    
    # Get the list of all the images stored in that directory
    imgs = sorted(imgs_path.glob('*.jpg'))
    
    # Store each image path and corresponding label 
    for img_name in imgs:
        train_df.append((str(img_name), labels_dict[folder]))


train_df = pd.DataFrame(train_df, columns=['image', 'label'], index=None)
# shuffle the dataset 
train_df = train_df.sample(frac=1.).reset_index(drop=True)

####################################################################################################

# Creating dataframe for validation data in a similar fashion
valid_df = []
for folder in os.listdir(validation_data):
    imgs_path = validation_data / folder
    imgs = sorted(imgs_path.glob('*.jpg'))
    for img_name in imgs:
        valid_df.append((str(img_name), labels_dict[folder]))

        
valid_df = pd.DataFrame(valid_df, columns=['image', 'label'], index=None)
# shuffle the dataset 
valid_df = valid_df.sample(frac=1.).reset_index(drop=True)

####################################################################################################

# How many samples do we have in our training and validation data?
print("Number of traininng samples: ", len(train_df))
print("Number of validation samples: ", len(valid_df))

# sneak peek of the training and validation dataframes
print("\n",train_df.head(), "\n")
print("=================================================================\n")
print("\n", valid_df.head())

Number of traininng samples:  1096
Number of validation samples:  272

                         image  label
0  data/training/n2/n2117.jpg      2
1  data/training/n7/n7028.jpg      7
2  data/training/n0/n0155.jpg      0
3  data/training/n7/n7064.jpg      7
4  data/training/n2/n2133.jpg      2 



                           image  label
0   data/validation/n7/n718.jpg      7
1  data/validation/n6/n6013.jpg      6
2  data/validation/n7/n7012.jpg      7
3   data/validation/n5/n512.jpg      5
4   data/validation/n5/n509.jpg      5


In [11]:
# some constants(not truly though!) 

# dimensions to consider for the images
img_rows, img_cols, img_channels = 224,224,3

# batch size for training  
batch_size=8

# total number of classes in the dataset
nb_classes=10

In [12]:
# Augmentation sequence 
seq = iaa.OneOf([
    iaa.Fliplr(), # horizontal flips
    iaa.Affine(rotate=20), # roatation
    iaa.Multiply((1.2, 1.5))]) #random brightness

In [13]:
def data_generator(data, batch_size, is_validation_data=False):
    # Get total number of samples in the data
    n = len(data)
    nb_batches = int(np.ceil(n/batch_size))

    # Get a numpy array of all the indices of the input data
    indices = np.arange(n)
    
    # Define two numpy arrays for containing batch data and labels
    batch_data = np.zeros((batch_size, img_rows, img_cols, img_channels), dtype=np.float32)
    batch_labels = np.zeros((batch_size, nb_classes), dtype=np.float32)
    
    while True:
        if not is_validation_data:
            # shuffle indices for the training data
            np.random.shuffle(indices)
            
        for i in range(nb_batches):
            # get the next batch 
            next_batch_indices = indices[i*batch_size:(i+1)*batch_size]
            
            # process the next batch
            for j, idx in enumerate(next_batch_indices):
                img = cv2.imread(data.iloc[idx]["image"])
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                label = data.iloc[idx]["label"]
                
                if not is_validation_data:
                    img = seq.augment_image(img)
                
                img = cv2.resize(img, (img_rows, img_cols)).astype(np.float32)
                batch_data[j] = img
                batch_labels[j] = to_categorical(label,num_classes=nb_classes)
            
            # batch_data = preprocess_input(batch_data) # daniel
            yield batch_data, batch_labels

In [14]:
#training data generator 
train_data_gen = data_generator(train_df, batch_size)

# validation data generator 
valid_data_gen = data_generator(valid_df, batch_size, is_validation_data=True)

In [15]:
# (train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()

# # Normalize pixel values to be between 0 and 1
# train_images, test_images = train_images / 255.0, test_images / 255.0

In [16]:
# class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
#                'dog', 'frog', 'horse', 'ship', 'truck']

# plt.figure(figsize=(10,10))
# for i in range(25):
#     plt.subplot(5,5,i+1)
#     plt.xticks([])
#     plt.yticks([])
#     plt.grid(False)
#     plt.imshow(train_images[i])
#     # The CIFAR labels happen to be arrays, 
#     # which is why you need the extra index
#     plt.xlabel(class_names[train_labels[i][0]])
# plt.show()

In [17]:
# # https://www.tensorflow.org/api_docs/python/tf/keras/layers/Conv2D
# # tf.keras.layers.Conv2D( filters, kernel_size, ...)

# model = models.Sequential()

# model.add(layers.Conv2D(32, (2, 2), activation='relu', input_shape=(32, 32, 3), padding='same'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(64, (2, 2), activation='relu', padding='same'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(64, (2, 2), activation='relu', padding='same'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(128, (2, 2), activation='relu', padding='same'))
# model.add(layers.MaxPooling2D((2, 2)))

# model.add(layers.Flatten())
# model.add(layers.Dense(512, activation='relu'))
# model.add(layers.Dense(10))

# model.compile(optimizer='adam',
#               loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
#               metrics=['accuracy'])

# model.summary()

In [18]:
# https://www.tensorflow.org/api_docs/python/tf/keras/layers/Conv2D
# tf.keras.layers.Conv2D( filters, kernel_size, ...)

model = models.Sequential()

model.add( layers.InputLayer(input_shape=(224, 224, 3)) )
model.add( layers.Conv2D(32, (2, 2), 
                        padding='same',
                        activation='relu') )
model.add( layers.MaxPooling2D((2, 2)) )

model.add( layers.Conv2D(64, (2, 2), 
                        padding='same',
                        activation='relu') )
model.add( layers.MaxPooling2D((2, 2)) )

model.add( layers.Conv2D(128, (2, 2), 
                        padding='same',
                        activation='relu') )
model.add( layers.MaxPooling2D((2, 2)) )

model.add( layers.Conv2D(256, (2, 2), 
                        padding='same',
                        activation='relu') )
model.add( layers.MaxPooling2D((2, 2)) )

model.add( layers.Flatten() )
model.add( layers.Dense(50176, activation='relu') )
model.add( layers.Dense(10) )

optimizer = RMSprop(0.001)
model.compile(optimizer = optimizer, 
              loss = 'categorical_crossentropy',
              metrics = ['accuracy'])

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 224, 224, 32)      416       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 112, 112, 32)     0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 112, 112, 64)      8256      
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 56, 56, 64)       0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 56, 56, 128)       32896     
                                                                 
 max_pooling2d_2 (MaxPooling  (None, 28, 28, 128)      0

In [19]:
# always user earlystopping
# the restore_best_weights parameter load the weights of the best iteration once the training finishes
es = EarlyStopping(patience=10, restore_best_weights=True)

# checkpoint to save model
chkpt = ModelCheckpoint(filepath="model1", save_best_only=True)

# number of training and validation steps for training and validation
nb_train_steps = int(np.ceil(len(train_df)/batch_size))
nb_valid_steps = int(np.ceil(len(valid_df)/batch_size))

In [20]:
epochs = 1

In [None]:
%%time

history = model.fit(train_data_gen, 
                              epochs = epochs, 
                              steps_per_epoch = nb_train_steps, 
                              validation_data = valid_data_gen, 
                              validation_steps = nb_valid_steps,
                              callbacks = [es,chkpt])

In [22]:
print(train_acc)

NameError: ignored

In [None]:
# let's plot the loss and accuracy 

# get the training and validation accuracy from the history object
train_acc = history.history['accuracy']
valid_acc = history.history['val_accuracy']

# get the loss
train_loss = history.history['loss']
valid_loss = history.history['val_loss']

# get the number of entries
xvalues = np.arange(len(train_acc))

# visualize
f,ax = plt.subplots(1,2, figsize=(10,5))
ax[0].plot(xvalues, train_loss)
ax[0].plot(xvalues, valid_loss)
ax[0].set_title("Loss curve")
ax[0].set_xlabel("Epoch")
ax[0].set_ylabel("loss")
ax[0].legend(['train', 'validation'])

ax[1].plot(xvalues, train_acc)
ax[1].plot(xvalues, valid_acc)
ax[1].set_title("Accuracy")
ax[1].set_xlabel("Epoch")
ax[1].set_ylabel("accuracy")
ax[1].legend(['train', 'validation'])

plt.show()

In [None]:
# What is the final loss and accuracy on our validation data?
valid_loss, valid_acc = model.evaluate_generator(valid_data_gen, steps=nb_valid_steps)
print(f"Final validation accuracy: {valid_acc*100:.2f}%")

In [None]:
# select all the layers for which you want to visualize the outputs and store it in a list
outputs = [layer.output for layer in model.layers[0:-3]]

# Define a new model that generates the above output
vis_model = Model(model.input, outputs)

# check if we have all the layers we require for visualization 
vis_model.summary()

In [None]:
# vis_model.predict( test_images )

In [None]:
# outputs[-1].name

In [None]:
# store the layer names we are interested in
layer_names = []
for layer in outputs:
    layer_names.append(layer.name.split("/")[0])

    
print("Layers going to be used for visualization: ")
print(layer_names)

In [None]:
# img_rows = train_images[16].shape[0]
# img_rows

In [None]:
# img_cols = train_images[16].shape[1]
# img_cols

In [None]:
# img_channels = train_images[16].shape[2]
# img_channels

In [None]:
# test_images[16]

In [None]:
# test_labels[16][0]

In [None]:
# model.predict(test_images)

In [None]:
# pred_labels = model.predict(test_images)
# pred_labels

In [None]:
# pred_labels[0]

In [None]:
# pred_label = np.argmax(pred_labels[0], axis=-1)
# pred_label

In [None]:
# model1 = Model(model.input, model.outputs)
# model1.summary()

In [None]:
# test_images[16].shape

In [None]:
# sample_image_processed = np.expand_dims(test_images[16], axis=0)
# sample_image_processed.shape

In [None]:
# pred_labels_0 = model.predict(sample_image_processed)
# pred_labels_0

In [None]:
# pred_label_0 = np.argmax(pred_labels_0, axis=-1)[0]
# pred_label_0

In [None]:
# test_labels[16][0]

In [None]:
# class_names[pred_label_0]

In [None]:
# class_names[test_labels[16][0]]

In [None]:
idx = 0

In [None]:
# select the sample and read the corresponding image and label
sample_image = cv2.imread(valid_df.iloc[idx]['image'])
sample_image = cv2.cvtColor(sample_image, cv2.COLOR_BGR2RGB)
sample_image = cv2.resize(sample_image, (img_rows, img_cols))
sample_label = valid_df.iloc[idx]["label"]

# pre-process the image
sample_image_processed = np.expand_dims(sample_image, axis=0)
# sample_image_processed = preprocess_input(sample_image_processed)

In [None]:
# get the label predicted by our original model
pred_label = np.argmax(model.predict(sample_image_processed), axis=-1)[0]
pred_label

In [None]:
def get_CAM(image, predicted_label, model, layer_name):

    # choose the last conv layer in your model
    conv_layer = model.get_layer( layer_name )

    # https://keras.io/examples/vision/grad_cam/
    # First, we create a model that maps the input image to the activations
    # of the last conv layer as well as the output predictions
    grad_model = tf.keras.models.Model(
        [model.inputs], 
        [conv_layer.output, model.output]
    )

    # Then, we compute the gradient of the top predicted class for our input image
    # with respect to the activations of the last conv layer
    with tf.GradientTape() as tape:
        conv_layer_output, preds = grad_model(image)
        class_channel = preds[:, tf.argmax(preds[0])]

    # This is the gradient of the output neuron (top predicted or chosen)
    # with regard to the output feature map of the last conv layer
    grads = tape.gradient(class_channel, conv_layer_output)

    # This is a vector where each entry is the mean intensity of the gradient
    # over a specific feature map channel
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # We multiply each channel in the feature map array
    # by "how important this channel is" with regard to the top predicted class
    # then sum all the channels to obtain the heatmap class activation
    conv_layer_output = conv_layer_output[0]
    heatmap = conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # For visualization purpose, we will also normalize the heatmap between 0 & 1
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()

    # # https://www.kaggle.com/code/aakashnain/what-does-a-cnn-see/notebook
    # # we want the activations for the predicted label
    # predicted_output = model.output[:, predicted_label]
    
    # # get the gradients wrt to the last conv layer
    # grads = K.gradients(predicted_output, conv_layer.output)[0]
    
    # # take mean gradient per feature map
    # grads = K.mean(grads, axis=(0,1,2)) # GAP - Global Average Pooling
    
    # # Define a function that generates the values for the output and gradients
    # evaluation_function = K.function([model.input], [grads, conv_layer.output[0]])
    
    # # get the values
    # grads_values, conv_ouput_values = evaluation_function([image])
    
    # # CAM - Class Activation Map
    # # iterate over each feature map in yout conv output and multiply
    # # the gradient values with the conv output values. This gives an 
    # # indication of "how important a feature is"
    # # for i in range(512): # we have 512 features in our last conv layer
    # for i in range(256): # we have 512 features in our last conv layer
    #     conv_ouput_values[:,:,i] *= grads_values[i]
    
    # # create a heatmap
    # heatmap = np.mean(conv_ouput_values, axis=-1)
    
    # # remove negative values
    # heatmap = np.maximum(heatmap, 0)
    
    # # normalize
    # heatmap /= heatmap.max()
    
    # return heatmap

In [None]:
# get_CAM(image, predicted_label, model, layer_name)
get_CAM(
    sample_image_processed, 
    pred_label, 
    model, 
    layer_names[-1])

In [None]:
# # get the heatmap for class activation map(CAM)
# heatmap = get_CAM(sample_image_processed, 
#                   pred_label_0, 
#                   model, 
#                   layer_names[-1])
# heatmap = cv2.resize(heatmap, (test_images[16].shape[0], test_images[16].shape[1]))
# heatmap = heatmap *255
# heatmap = np.clip(heatmap, 0, 255).astype(np.uint8)
# heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
# super_imposed_image = heatmap * 0.5 + test_images[16]
# super_imposed_image = np.clip(super_imposed_image, 0,255).astype(np.uint8)

In [None]:
# f,ax = plt.subplots(2,2, figsize=(15,8))
# ax[0,0].imshow(test_images[16])
# ax[0,0].set_title(f"True label: {class_names[test_labels[16][0]]} \n Predicted label: {class_names[pred_label_0]}")
# ax[0,0].axis('off')

# ax[0,1].imshow(heatmap)
# ax[0,1].set_title("Class Activation Map")
# ax[0,1].axis('off')

# ax[1,0].imshow(super_imposed_image)
# ax[1,0].set_title("Activation map superimposed")
# ax[1,0].axis('off')
# plt.show()

In [None]:
def visualize_layer(layer):
  
  # get the heatmap for class activation map(CAM)
  heatmap = get_CAM(sample_image_processed, 
                    pred_label_0, 
                    model, 
                    layer)
  heatmap = cv2.resize(
      heatmap, 
      (test_images[16].shape[0], test_images[16].shape[1]))
  heatmap = heatmap *255
  heatmap = np.clip(heatmap, 0, 255).astype(np.uint8)
  heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
  super_imposed_image = heatmap * 0.5 + test_images[16]
  super_imposed_image = np.clip(super_imposed_image, 0, 255).astype(np.uint8)

  # Ploting
  fig, axes = plt.subplots( 1, 3, figsize=( 30, 10 ) )
  axes[0].set_title( f'True label: {class_names[test_labels[16][0]]} \n Predicted label: {class_names[pred_label_0]}' )
  axes[0].axis('off')
  axes[0].imshow( test_images[16] )
  axes[1].set_title( f'Class Activation Map - Layer: {layer}' )
  axes[1].axis('off')
  axes[1].imshow( heatmap )
  axes[2].set_title( f'Activation Map Superimposed - Layer: {layer}' )
  axes[2].axis('off')
  axes[2].imshow( super_imposed_image )
  plt.show()

  # # Plot just CAM of the layer
  # plt.figure( figsize=(2, 2) )
  # plt.title( f'Class Activation Map - Layer: {layer}' )
  # plt.imshow( heatmap )
  # plt.show()

  print()

In [None]:
# layer_names_reversed = layer_names[::-1] #reversing using list slicing
# for layer in layer_names_reversed:
for layer in layer_names:

  if 'pooling' not in layer:
    visualize_layer(layer)

In [None]:
# for index, test_img in enumerate( test_images ):

#   if index > 20:
#     break
  
#   # Plot just CAM of the layer
#   plt.figure( figsize=(1, 1) )
#   plt.title( f'index: {index}' )
#   plt.imshow( test_img )
#   plt.show()