In [None]:
"""
Author: Amruth Karun M V
Date: 20-Oct-2021
"""

import os
import pandas as pd
import numpy as np
import zipfile
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input, Conv2D, MaxPooling2D, Activation,
    AveragePooling2D, Flatten, BatchNormalization,
    Dense, Dropout, ZeroPadding2D, Add)
from keras.layers.merge import concatenate

from sklearn import metrics
import matplotlib.pyplot as plt
%matplotlib inline

# Directory handed to flow_from_directory in load_data.
TRAIN_PATH = "../input/covid19/"
# Number of epochs requested from model.fit in train_model.
EPOCHS = 100
# Batch size configured on both data generators in load_data.
BATCH_SIZE = 128
# Learning rate given to the Adam optimizer in load_model.
LEARNING_RATE = 0.01
# Passed as target_size to both data generators in load_data.
INPUT_SIZE = (224, 224)

def load_data():
    """
    Builds the training and validation image generators.

    Both generators come from one ImageDataGenerator configured with
    validation_split=0.2, read from TRAIN_PATH and differ only in the
    subset they select.
    Arguments: None
    Returns: Tuple (train generator, validation generator)
    """

    datagen = keras.preprocessing.image.ImageDataGenerator(validation_split=0.2)

    # Settings common to both subsets.
    # NOTE(review): shuffling is disabled for the training subset too;
    # get_confusion_matrix pairs predictions with the generator's
    # `classes` attribute, which presumably relies on this fixed order --
    # confirm before enabling shuffling here.
    shared_options = dict(
        target_size=INPUT_SIZE,
        batch_size=BATCH_SIZE,
        shuffle=False,
        class_mode='categorical')

    # The training subset is created first, then the validation subset.
    train_generator, validation_generator = [
        datagen.flow_from_directory(TRAIN_PATH, subset=subset_name, **shared_options)
        for subset_name in ('training', 'validation')
    ]

    return train_generator, validation_generator


def identity_block(X, f, filters, stage, block):
    """
    Residual block whose shortcut is the unmodified input tensor.
    Arguments:
        X       -- input tensor
        f       -- kernel size of the middle conv layer on the main path
        filters -- three integers, the filter counts of the main-path conv layers
        stage   -- integer stage label, used to build layer names
        block   -- 'a','b'... block label, used to build layer names
    Returns: Output tensor for the block.
    """

    branch = str(stage) + block + '_branch'
    conv_prefix = 'res' + branch
    bn_prefix = 'bn' + branch
    n_first, n_middle, n_last = filters

    shortcut = X

    # (filters, kernel size, padding, layer-name suffix) per main-path stage
    main_path = (
        (n_first, (1, 1), 'valid', '2a'),
        (n_middle, (f, f), 'same', '2b'),
        (n_last, (1, 1), 'valid', '2c'),
    )
    final_position = len(main_path) - 1

    for position, (n_filters, kernel, pad, suffix) in enumerate(main_path):
        X = Conv2D(filters=n_filters, kernel_size=kernel, strides=(1, 1),
                   padding=pad, name=conv_prefix + suffix)(X)
        X = BatchNormalization(axis=3, name=bn_prefix + suffix)(X)
        # The last stage is activated only after the residual addition.
        if position != final_position:
            X = Activation('relu')(X)

    # Skip connection: add the untouched input back onto the main path.
    merged = Add()([X, shortcut])
    return Activation('relu')(merged)


def convolutional_block(X, f, filters, stage, block, strides=(2,2)):
    """
    Residual block whose shortcut passes through its own 1x1 conv layer.
    Arguments:
        X       -- input tensor
        f       -- kernel size of the middle conv layer on the main path
        filters -- three integers, the filter counts of the main-path conv layers
        stage   -- integer stage label, used to build layer names
        block   -- 'a','b'... block label, used to build layer names
        strides -- strides of the first main-path conv and of the shortcut conv
    Returns: Output tensor for the block.
    """

    branch = str(stage) + block + '_branch'
    conv_prefix = 'res' + branch
    bn_prefix = 'bn' + branch

    n_first, n_middle, n_last = filters

    shortcut = X

    # (filters, kernel size, strides, padding, layer-name suffix) per stage
    main_path = (
        (n_first, (1, 1), strides, 'valid', '2a'),
        (n_middle, (f, f), (1, 1), 'same', '2b'),
        (n_last, (1, 1), (1, 1), 'valid', '2c'),
    )
    final_position = len(main_path) - 1

    for position, (n_filters, kernel, step, pad, suffix) in enumerate(main_path):
        X = Conv2D(filters=n_filters, kernel_size=kernel, strides=step,
                   padding=pad, name=conv_prefix + suffix)(X)
        X = BatchNormalization(axis=3, name=bn_prefix + suffix)(X)
        # The last stage is activated only after the residual addition.
        if position != final_position:
            X = Activation('relu')(X)

    # Shortcut path: 1x1 conv with the block's strides and the same number
    # of filters as the last main-path conv, followed by batch norm.
    shortcut = Conv2D(filters=n_last, kernel_size=(1, 1), strides=strides,
                      padding='valid', name=conv_prefix + '1')(shortcut)
    shortcut = BatchNormalization(axis=3, name=bn_prefix + '1')(shortcut)

    merged = Add()([X, shortcut])
    return Activation('relu')(merged)


def load_model(num_classes=3):
    """
    Creates and compiles a keras ResNet-50 style model.

    The input shape is derived from INPUT_SIZE (plus 3 channels) so the
    model always matches the target_size used by the data generators.
    The model summary is printed as a side effect.
    Arguments:
        num_classes -- number of units in the softmax output layer
                       (defaults to 3, the value previously hard-coded)
    Returns: Compiled ResNet-50 Model
    """

    input_layer = Input(shape=tuple(INPUT_SIZE) + (3,))

    # Stem: padding, 7x7 strided conv, batch norm, relu, max pooling.
    X = ZeroPadding2D((3, 3))(input_layer)

    X = Conv2D(64, (7, 7), strides=(2, 2), name='conv1')(X)
    X = BatchNormalization(axis=3, name='bn_conv1')(X)
    X = Activation('relu')(X)
    X = MaxPooling2D((3, 3), strides=(2, 2))(X)

    # Each residual stage is one convolutional block ('a') followed by
    # identity blocks labelled with the given letters:
    # (stage, filters, strides of the convolutional block, identity labels)
    stages = (
        (2, [64, 64, 256], (1, 1), 'bc'),
        (3, [128, 128, 512], (2, 2), 'bcd'),
        (4, [256, 256, 1024], (2, 2), 'bcdef'),
        (5, [512, 512, 2048], (2, 2), 'bc'),
    )
    for stage, filters, strides, identity_labels in stages:
        X = convolutional_block(X, f=3, filters=filters, stage=stage,
                                block='a', strides=strides)
        for label in identity_labels:
            X = identity_block(X, 3, filters, stage=stage, block=label)

    X = AveragePooling2D(pool_size=(2, 2), padding='same')(X)
    X = Dropout(0.4)(X)

    # Define fully connected layers and output
    X = Flatten()(X)
    X = Dense(units=512, activation="relu")(X)
    X = Dense(units=256, activation="relu")(X)
    X = Dense(units=num_classes, activation="softmax")(X)

    model = Model(inputs=input_layer, outputs=X, name='ResNet50')
    model.summary()

    opt = Adam(learning_rate=LEARNING_RATE)
    model.compile(loss=keras.losses.categorical_crossentropy, optimizer=opt, metrics=['accuracy'])

    return model
    

def plot_curves(history):
    """
    Plots the loss curves and then the accuracy curves for the
    training and validation datasets, one figure each
    Arguments:
        history -- training history
    Returns: None
    """

    recorded = history.history

    # (metric key in the history dict, figure title)
    figures = (
        ('loss', 'Training Loss VS Validation Loss'),
        ('accuracy', 'Training Accuracy VS Validation Accuracy'),
    )

    for metric, title in figures:
        # Training curve in blue, validation curve in red.
        plt.plot(recorded[metric], color='b', label="Training " + metric)
        plt.plot(recorded['val_' + metric], color='r', label="Validation " + metric)
        plt.title(title)
        plt.legend()
        plt.show()
    

def get_confusion_matrix(model, data_generator):
    """
    Calculates the accuracy and displays the
    confusion matrix for the input data
    Arguments:
        model           -- trained model
        data_generator  -- input data generator; its `classes` attribute is
                           used as the ground truth, so it must yield samples
                           in that same order (i.e. without shuffling)
    Returns: None
    """

    # No batch_size here: the generator already produces batches, and Keras
    # documents that batch_size must not be specified for generator inputs.
    predictions = model.predict(data_generator)
    y_pred = np.argmax(predictions, axis=1)
    y_true = data_generator.classes

    # Prefer the generator's own name -> index mapping so the displayed
    # labels are guaranteed to follow the index order of the predictions;
    # fall back to the historical labels when the mapping is unavailable.
    class_indices = getattr(data_generator, 'class_indices', None)
    if class_indices:
        class_names = sorted(class_indices, key=class_indices.get)
    else:
        class_names = ['COVID', 'Normal', 'Pneumonia']

    print("Score =", model.evaluate(data_generator))
    print("Accuracy  = ", metrics.accuracy_score(y_true, y_pred))

    # Pin the label set so the matrix always has one row/column per class
    # name, even if some class is absent from both y_true and y_pred.
    cm = metrics.confusion_matrix(y_true, y_pred,
                                  labels=np.arange(len(class_names)))
    metrics.ConfusionMatrixDisplay(cm, display_labels=class_names).plot(cmap=plt.cm.Blues,
                                                                       xticks_rotation='vertical')
    plt.show()

    
def train_model(train_generator, val_generator):
    """
    Trains ResNet-50 model and saves the
    trained weights to an H5 file.
    Arguments:
        train_generator   -- train data generator
        val_generator     -- validation data generator
    Returns: Trained model
    """

    # Loads the model
    model = load_model()

    # Stop once validation accuracy has not improved for 20 epochs.
    # NOTE(review): restore_best_weights is left at its default, so the
    # weights saved below are those of the last epoch run, not necessarily
    # the best one -- confirm this is intended.
    earlystop = keras.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', verbose=1, patience=20)
    callbacks = [earlystop]

    # batch_size is not passed: the generators already yield batches and
    # Keras documents that it must not be specified for generator inputs.
    # steps_per_epoch / validation_steps are left for Keras to infer from
    # the generator length; the previous `samples // BATCH_SIZE` skipped
    # the trailing partial batch every epoch and evaluated to 0 whenever a
    # subset held fewer than BATCH_SIZE samples.
    history = model.fit(
        train_generator,
        epochs=EPOCHS,
        validation_data=val_generator,
        callbacks=callbacks)

    plot_curves(history)
    model.save_weights("model_resnet50.h5")
    print("Model saved successfully!")

    return model


In [None]:
# Build both data generators, then train the model on them.
train_generator, val_generator = load_data()
model = train_model(train_generator, val_generator)

# Report score, accuracy and confusion matrix on the training subset.
print("Confusion matrix for train data:")
get_confusion_matrix(model, train_generator)

# Report the same metrics on the validation subset.
print("Confusion matrix for val/test data:")
get_confusion_matrix(model, val_generator)