In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [1]:
from tensorflow.keras.layers import Input
import os
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import datetime
from sklearn.metrics import mean_squared_error
from tqdm import tqdm
import time
import numpy as np
import tensorflow as tf
import cv2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, concatenate, Conv2DTranspose, BatchNormalization, MaxPooling2D, Dropout, ReLU, Reshape, \
 Flatten, Dense
from tensorflow.keras import initializers
import numpy as np
from random import shuffle
import pandas as pd 
from PIL import Image
import shutil
from skimage.filters import sobel
from skimage.transform import resize
from sklearn.model_selection import train_test_split
from keras import backend as K

In [2]:
def pure_pil_alpha_to_color_v2(image, color=(255, 255, 255)):
    """Alpha composite an RGBA Image with a specified color.

    Simpler, faster version than the solutions above.

    Source: http://stackoverflow.com/a/9459208/284318

    Keyword Arguments:
    image -- PIL RGBA Image object
    color -- Tuple r, g, b (default 255, 255, 255)

    """
    image.load()  # needed for split()
    background = Image.new('RGB', image.size, color)
    background.paste(image, mask=image.split()[3])  # 3 is the alpha channel
    return background


def convert(rgba):
    rgb = pure_pil_alpha_to_color_v2(rgba)
    return np.array(rgb)


In [3]:
# Data Specifications

IMG_HEIGHT = 224
IMG_WIDTH = 224
IMG_CHANNELS = 3
BATCH_SIZE = 10

In [4]:
# Functions to calculate the recall, precision, and F1-Scores
# Adopted from: https://datascience.stackexchange.com/questions/45165/how-to-get-accuracy-f1-precision-and-recall-for-a-keras-model

def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))


# Data Reader

In [6]:
# A Data Reader Class used in one of the experiment to read the images from folders

class ImageDataset(tf.data.Dataset):
    # This class was only used for the experiment in which we used image 
    # gradients as a part of the input images.

    OUTPUT_TYPES = (tf.float32, tf.float32)
    OUTPUT_SHAPES = ((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS), ())

    def _generator(use_images_paths,labels):
        for image_path,label in zip(use_images_paths,labels):
             
            image = Image.open(image_path.decode())
            
            if np.array(image).shape[-1] == 4:
                image = convert(image)
            
            image = np.array(image,dtype=np.float32)/255.

            # converting the images to grayscale and extracting the gradients
            gray = np.zeros((IMG_HEIGHT, IMG_WIDTH), dtype=np.float)
            gray += image.mean(axis=-1)
            gradient = sobel(gray)

            yield np.stack([gray,gray,gradient],axis=-1), label
    
    def __new__(cls, use_images_paths,labels):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(use_images_paths,labels)
        )


#Data Pipline

In [5]:
def scheduler(epoch):
    """
    This function is used to change the learning rate as the training progresses.

    :param epoch: an int variable showing the current epoch number.
    :return: the learning corresponding the value of the epoch
    """
    if epoch < 10:

        return 0.00001

    else:

        return 0.00001


def configure_for_performance(ds, BATCH_SIZE):
    """
    This function prepares the dataset and builds up data pipeline.
    Shuffling, Batching, and Prefetching is performed to enhance the training speed.
    :param ds: the dataset, which is a tf.data.Dataset
    :return: the modified dataset
    """

    # Depending on the amount of the available RAM you could use either just Batching
    # or all of instructions mentioned below.

    ds = ds.cache()
    ds = ds.shuffle(buffer_size=1000)
    ds = ds.batch(BATCH_SIZE)
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds


#Cost Function

In [6]:
# It is a customized training loop that we used initially, however, we switched 
# to default training loop in the final experiment.

class CustomFit(tf.keras.Model):
    def train_step(self, data):
        x, label = data

        with tf.GradientTape() as tape:
            logits = self(x, training=True)
            c = self.compiled_loss(label, logits, regularization_losses=self.losses)

        training_vars = self.trainable_variables
        gradients = tape.gradient(c, training_vars)

        # Step with optimizer
        self.optimizer.apply_gradients(zip(gradients, training_vars))

        self.compiled_metrics.update_state(label, logits)

        return {m.name: m.result() for m in self.metrics}




#Model Definition

In [6]:
inputs = Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))

# Pre-trained Backbone
mbnet = tf.keras.applications.InceptionV3(input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS), include_top=False, weights='imagenet',
    input_tensor=inputs)

# Freezing the weights
for layer in mbnet.layers:
    layer.trainable = False

# Feature Extraction
features = mbnet(inputs)
flat_features = tf.keras.layers.Flatten()(features)

# Adding the additional 2 FC layers 

l1 = Dense(4096,'relu')(flat_features)
l1 = tf.keras.layers.BatchNormalization()(l1)
l
1 = Dense(4096,'relu')(l1)
l1 = tf.keras.layers.BatchNormalization()(l1)
dp = tf.keras.layers.Dropout(0.4)(l1)

output = Dense(3)(dp)

model = tf.keras.Model(inputs, output)


model.summary()
model.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True), metrics=["accuracy",f1_m,precision_m, recall_m])


Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 224, 224, 3)]     0         
_________________________________________________________________
inception_v3 (Functional)    (None, 5, 5, 2048)        21802784  
_________________________________________________________________
flatten (Flatten)            (None, 51200)             0         
_________________________________________________________________
dense (Dense)                (None, 4096)              209719296 
_________________________________________________________________
batch_normalization_94 (Batc (None, 4096)              16384     
_________________________________________________________________
dense_1 (Dense)              (None, 4096)              16781312  
_________________________________________________________________
batch_normalization_95 (Batc (None, 4096)              16384 

#Data Configuration

In [7]:
AUTOTUNE = tf.data.AUTOTUNE

# A switch between train and test phases.

is_train = True

# path to save the model weights and parameters
if is_train:
    checkpoint_path = "drive/MyDrive/SoftwareAI/baseline_weights/InceptionV3-{epoch:04d}.ckpt"
else:
    checkpoint_path = "drive/MyDrive/SoftwareAI/baseline_weights/InceptionV3-0020.ckpt"

checkpoint_dir = os.path.dirname(checkpoint_path)

# Path to save the logs according to the time
log_dir = "drive/MyDrive/SoftwareAI/logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

#Preparing Training Set

In [13]:
# Loading data files from the Google Drive and creating a dataframe from names and paths

data_dir = 'drive/MyDrive/SoftwareAI/dataset2/train/'
data_dirs = []
for sub_folder in os.listdir(data_dir):
    for name in os.listdir(data_dir + sub_folder):
        data_dirs.append([sub_folder, data_dir+sub_folder+'/'+name])

data_frame = pd.DataFrame(data=data_dirs,columns=['label','dir'])
data_frame

Unnamed: 0,label,dir
0,covid,drive/MyDrive/SoftwareAI/dataset2/train/covid/...
1,covid,drive/MyDrive/SoftwareAI/dataset2/train/covid/...
2,covid,drive/MyDrive/SoftwareAI/dataset2/train/covid/...
3,covid,drive/MyDrive/SoftwareAI/dataset2/train/covid/...
4,covid,drive/MyDrive/SoftwareAI/dataset2/train/covid/...
...,...,...
6754,pneumonia,drive/MyDrive/SoftwareAI/dataset2/train/pneumo...
6755,pneumonia,drive/MyDrive/SoftwareAI/dataset2/train/pneumo...
6756,pneumonia,drive/MyDrive/SoftwareAI/dataset2/train/pneumo...
6757,pneumonia,drive/MyDrive/SoftwareAI/dataset2/train/pneumo...


In [None]:
# Splitting the training from testing data
image_train,image_test,label_train,label_test = train_test_split(data_frame['dir'].values,data_frame['label'].values,stratify=data_frame['label'].values,test_size=0.2)

In [None]:
# Transfering the training data into separate covid, normal and pneumonia folders

os.makedirs('drive/MyDrive/SoftwareAI/dataset3/train/covid',exist_ok=True)
os.makedirs('drive/MyDrive/SoftwareAI/dataset3/train/normal',exist_ok=True)
os.makedirs('drive/MyDrive/SoftwareAI/dataset3/train/pneumonia',exist_ok=True)

for name in image_train:
    if 'pneumonia' in name:
        shutil.copy(name,'drive/MyDrive/SoftwareAI/dataset3/train/pneumonia')
    if 'covid' in name:
        shutil.copy(name,'drive/MyDrive/SoftwareAI/dataset3/train/covid')
    if 'normal' in name:
        shutil.copy(name,'drive/MyDrive/SoftwareAI/dataset3/train/normal')

In [None]:
# Transfering the test data into separate covid, normal and pneumonia folders
os.makedirs('drive/MyDrive/SoftwareAI/dataset3/test/covid',exist_ok=True)
os.makedirs('drive/MyDrive/SoftwareAI/dataset3/test/normal',exist_ok=True)
os.makedirs('drive/MyDrive/SoftwareAI/dataset3/test/pneumonia',exist_ok=True)

for name in image_test:
    if 'pneumonia' in name:
        shutil.copy(name,'drive/MyDrive/SoftwareAI/dataset3/test/pneumonia')
    if 'covid' in name:
        shutil.copy(name,'drive/MyDrive/SoftwareAI/dataset3/test/covid')
    if 'normal' in name:
        shutil.copy(name,'drive/MyDrive/SoftwareAI/dataset3/test/normal')

In [12]:
data_dir = 'drive/MyDrive/SoftwareAI/dataset2/train/'

# Creating separate dataset generators upon the folders 
train_datagen = ImageDataGenerator(rescale=1./255, rotation_range=30, fill_mode='nearest')
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

# Creating the datasets for training, validation and testing

train_ds = train_datagen.flow_from_directory(
    directory='drive/MyDrive/SoftwareAI/dataset3/train/',
    target_size=(224, 224),
    color_mode="rgb",
    batch_size=10,
    class_mode="categorical",
    shuffle=True,
    seed=42)

test_ds = val_datagen.flow_from_directory(
    directory='drive/MyDrive/SoftwareAI/dataset3/test/',
    target_size=(224, 224),
    color_mode="rgb",
    batch_size=10,
    class_mode="categorical",
    shuffle=True,
    seed=42)

val_ds = val_datagen.flow_from_directory(
    directory='drive/MyDrive/SoftwareAI/dataset2/val/',
    target_size=(224, 224),
    color_mode="rgb",
    batch_size=10,
    class_mode="categorical",
    shuffle=True,
    seed=42)


Found 5380 images belonging to 3 classes.
{'covid': 0, 'normal': 1, 'pneumonia': 2}
Found 1343 images belonging to 3 classes.
Found 179 images belonging to 3 classes.


In [None]:
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    verbose=1,
    save_weights_only=True,
    save_freq='epoch')

# Callback to save the logs on tensorboard

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, update_freq=200)

# Callback to adjust the learning rate according to the training epoch

lr_callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

# Starting the training on GPU
with tf.device('/gpu:0'):
    model.load_weights('drive/MyDrive/SoftwareAI/baseline_weights/InceptionV3-0007.ckpt')
    model.fit(train_ds, epochs=20, verbose=1, validation_data=val_ds,initial_epoch=7,
                callbacks=[cp_callback,tensorboard_callback, lr_callback])


Epoch 8/20

Epoch 00008: saving model to drive/MyDrive/SoftwareAI/baseline_weights/InceptionV3-0008.ckpt


In [10]:
# Loading the trained model
model.load_weights('drive/MyDrive/SoftwareAI/baseline_weights/InceptionV3-0004.ckpt')

# Evaluating the model on the validation and test sets according to the metrics
model.evaluate(val_ds)
model.evaluate(test_ds)




[0.20467253029346466,
 0.9352196455001831,
 0.8563085794448853,
 0.8334306478500366,
 0.8888892531394958]

In [113]:
def backprop_receptive_field(image_dir, image_name, weight_path, save_directory, use_trained=False,
                             use_max_activation=False):
    """
        CNN Receptive Field Computation Using Backprop with TensorFlow
        Adopted from: https://learnopencv.com/cnn-receptive-field-computation-using-backprop-with-tensorflow/
    :param image_dir: path to the image
    :param image_name: name of the image
    :param weight_path: path to saved weights
    :param save_directory: path to saved the results
    :param use_trained: boolean, whether to use trained model or random
    :param use_max_activation: a parameter to whether use maximum activation score 
    """

    inputs = Input((224, 224, 3))


    # The backbone definition
    mbnet = tf.keras.applications.InceptionV3(input_shape=(224, 224, 3), include_top=False,
                                              weights='imagenet',
                                              input_tensor=inputs)
    # freezing the backbone
    for layer in mbnet.layers:
        layer.trainable = False

    features = mbnet(inputs)
    flat_features = tf.keras.layers.Flatten()(features)
    l1 = Dense(4096, 'relu')(flat_features)
    l1 = tf.keras.layers.BatchNormalization()(l1)
    l1 = Dense(4096, 'relu')(l1)
    l1 = tf.keras.layers.BatchNormalization()(l1)
    dp = tf.keras.layers.Dropout(0.4)(l1)
    output = Dense(3)(dp)
    model = tf.keras.Model(inputs, features)

    # Loading the weights
    if use_trained:
        model.load_weights(weight_path)
        print('model weights loaded.')

    new_image = plt.imread(image_dir)
    if len(new_image.shape) == 4:
        new_image = convert(image_dir)
    elif len(new_image.shape) == 2:
        image = plt.imread(image_dir)
        new_image = np.zeros(list(image.shape)[:2]+[3],dtype=np.float32)
        new_image[:, :, 0] = image
        new_image[:, :, 1] = image
        new_image[:, :, 2] = image
    new_image = new_image.astype(np.uint8)
    image = Image.fromarray(new_image).resize((224, 224))
    image = np.array(image, dtype=np.float32) / 255.
    image = tf.expand_dims(image, 0)

    pred = model.predict(image)
    preds = tf.transpose(pred, perm=[0, 3, 1, 2])
    preds = tf.nn.softmax(preds, axis=1)
    pred = tf.math.reduce_max(preds, axis=1)
    class_idx = tf.math.argmax(preds, axis=1)
    row_max = tf.math.reduce_max(pred, axis=1)
    row_idx = tf.math.argmax(pred, axis=1)
    col_idx = tf.math.argmax(row_max, axis=1)
    predicted_class = tf.gather_nd(class_idx, (0, tf.gather_nd(row_idx, (0, col_idx[0])), col_idx[0]))
    score_map = tf.expand_dims(preds[0, predicted_class, :, :], 0).numpy()

    input = tf.ones_like(image)
    out = model.predict(image)

    receptive_field_mask = tf.Variable(tf.zeros_like(out))

    if not use_max_activation:
        receptive_field_mask[:, :, :, predicted_class].assign(score_map)
    else:
        scoremap_max_row_values = tf.math.reduce_max(score_map, axis=1)
        max_row_id = tf.math.argmax(score_map, axis=1)
        max_col_id = tf.math.argmax(scoremap_max_row_values, axis=1).numpy()[0]
        max_row_id = max_row_id[0, max_col_id].numpy()
        # update grad
        receptive_field_mask = tf.tensor_scatter_nd_update(
            receptive_field_mask,
            [(0, max_row_id, max_col_id, 0)], [1],
        )
    grads = []
    with tf.GradientTape() as tf_gradient_tape:

        tf_gradient_tape.watch(input)
        # get the predictions
        preds = model(input)
        # apply the mask
        pseudo_loss = preds * receptive_field_mask
        pseudo_loss = K.mean(pseudo_loss)
        # get gradient
        grad = tf_gradient_tape.gradient(pseudo_loss, input)
        grad = tf.transpose(grad, perm=[0, 3, 1, 2])
        grads.append(grad)

    # The grads contains the heatmap that we need 
    return grads

