In [None]:
# Remove split folders left over from a previous run so the split can be rebuilt.
# `shutil` is imported locally because this cell sits before the notebook's import
# cell, and `ignore_errors=True` lets a fresh kernel (no folders yet) run it cleanly.
import shutil

for stale_dir in ('./train/', './val/', './test/'):
    shutil.rmtree(stale_dir, ignore_errors=True)

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import keras
import time
import keras.backend as K
import os
import shutil
import random

from keras.optimizers.schedules import InverseTimeDecay,ExponentialDecay
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
from tensorflow.keras.utils import Progbar

from keras.losses import CategoricalCrossentropy,BinaryCrossentropy
from keras.optimizers import Adam,RMSprop
from keras.applications.densenet import DenseNet121
from keras.applications import InceptionV3
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Model, Sequential
from keras.metrics import CategoricalAccuracy, Precision, Recall
from keras.layers import Dense, Conv2D, BatchNormalization, MaxPool2D, AveragePooling2D, Flatten, Input,GlobalAveragePooling2D
from keras.utils.vis_utils import plot_model
from keras.optimizers.schedules import InverseTimeDecay,ExponentialDecay

from sklearn.metrics import classification_report,confusion_matrix,ConfusionMatrixDisplay

# Splitting Data 
### Training (85%) : Validation (10%) : Testing (5%)

In [None]:
# Copy every class folder of the source dataset into ./train, ./val and ./test
# using an 85 / 10 / 5 split per class.
path = "../input/covid19-radiography-database/COVID-19 Radiography Database/"

classesPath = ['COVID', 'Viral Pneumonia', 'NORMAL']

val_ratio = 0.1
test_ratio = 0.05

# Dedicated seeded generator so each run assigns the same files to each split.
split_rng = np.random.RandomState(69)

for cls in classesPath:
    src = path + cls

    # Rebuild the destination folders from scratch: this keeps the cell re-runnable
    # and prevents files copied by an earlier run from lingering in another split.
    for split_dir in ('./train/', './val/', './test/'):
        shutil.rmtree(split_dir + cls, ignore_errors=True)
        os.makedirs(split_dir + cls)

    # os.listdir gives no ordering guarantee, so sort before the seeded shuffle;
    # otherwise the same seed could still yield a different split.
    allFileNames = sorted(os.listdir(src))
    split_rng.shuffle(allFileNames)

    # Cut points: first 85% -> train, next 10% -> validation, remainder -> test.
    train_FileNames, val_FileNames, test_FileNames = np.split(
        np.array(allFileNames),
        [int(len(allFileNames) * (1 - (val_ratio + test_ratio))),
         int(len(allFileNames) * (1 - test_ratio))])

    # Turn bare file names into paths relative to the notebook.
    train_FileNames = [src + '/' + name for name in train_FileNames.tolist()]
    val_FileNames = [src + '/' + name for name in val_FileNames.tolist()]
    test_FileNames = [src + '/' + name for name in test_FileNames.tolist()]

    print('Total ', str(cls), len(allFileNames))
    print('Training', len(train_FileNames))
    print('Validation', len(val_FileNames))
    print('Testing', len(test_FileNames))
    print("\n")

    for name in train_FileNames:
        shutil.copy(name, './train/' + cls)

    for name in val_FileNames:
        shutil.copy(name, './val/' + cls)

    for name in test_FileNames:
        shutil.copy(name, './test/' + cls)

# Loading the data with real-time image augmentation using the Keras ImageDataGenerator

In [None]:
# Locations of the split folders created by the data-splitting cell.
path_train = "./train"
path_val = "./val"
path_test = "./test"

# Shared loading configuration.
batch_size = 128
interpolation = "bicubic"
seed = 69
target_size = (100, 100)
epochs = 20

# `rescale` multiplies every pixel value; 1/255 maps 8-bit intensities (0..255)
# into [0, 1]. The previous factor of 1/99 left inputs in roughly [0, 2.6].
# Only the training generator applies augmentation (small rotations and zoom).
train_data_gen = ImageDataGenerator(rescale = 1./255,
                                    rotation_range = 12,
                                    zoom_range = 0.1)

# Validation and test images are only rescaled, never augmented.
test_data_gen = ImageDataGenerator(rescale = 1./255)

ds_train = train_data_gen.flow_from_directory(directory = path_train,
                                              color_mode = "rgb",
                                              batch_size = batch_size,
                                              target_size = target_size,
                                              shuffle = True,
                                              interpolation = interpolation,
                                              seed = seed)

ds_val = test_data_gen.flow_from_directory( directory = path_val,
                                            color_mode = "rgb",
                                            batch_size = batch_size,
                                            target_size = target_size,
                                            shuffle = True,
                                            interpolation = interpolation,
                                            seed = seed)

# The test generator keeps the directory order (shuffle = False) and uses a
# smaller batch size than training/validation.
ds_test = test_data_gen.flow_from_directory(directory = path_test,
                                            color_mode = "rgb",
                                            batch_size = 16,
                                            target_size = target_size,
                                            shuffle = False,
                                            interpolation = interpolation,
                                            seed = seed)

# Creating the custom training loop

In [None]:
@tf.function
def train_one_step(x_batch, y_batch, metric, model, loss_function, optimizer=None):
    """Run one gradient-descent update on a single batch and return its loss.

    Args:
        x_batch: batch of model inputs.
        y_batch: targets for `x_batch`, in the form `loss_function` and `metric` accept.
        metric: metric object, updated in place with `(y_batch, predictions)`.
        model: model called with `training=True`; its `trainable_weights` are updated.
        loss_function: callable invoked as `loss_function(y_batch, predictions)`.
        optimizer: object whose `apply_gradients` performs the weight update.
            When omitted, the notebook-level variable named `optimizer` is used,
            which is what the original five-argument form relied on implicitly.

    Returns:
        The loss value computed for this batch.
    """
    # Resolve the optimizer explicitly instead of silently reading a global;
    # the fallback keeps existing five-argument callers working unchanged.
    step_optimizer = optimizer if optimizer is not None else globals()["optimizer"]

    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
        y_hat = model(x_batch, training = True)
        loss = loss_function(y_batch, y_hat)

    # Gradient of the loss with respect to every trainable weight, then the update.
    gradients = tape.gradient(loss, model.trainable_weights)
    step_optimizer.apply_gradients(zip(gradients, model.trainable_weights))

    metric.update_state(y_batch, y_hat)
    return loss

@tf.function
def val_one_step(x_batch, y_batch, metric, model, loss_function):
    """Score one batch without updating any weights.

    The model is called with `training=False`, `metric` is updated in place
    with the targets and predictions, and the batch loss is returned.
    """
    predictions = model(x_batch, training = False)
    metric.update_state(y_batch, predictions)
    return loss_function(y_batch, predictions)

In [None]:
def train(model, optimizer, metric, metric_val, epochs):
    """Train `model` on `ds_train`, validate on `ds_val`, and keep the best weights.

    After every epoch the summed validation loss is compared with the best one
    seen so far; on improvement the weights are remembered and also written to
    "<model.name>.h5". When training ends the best weights are restored.

    Args:
        model: model to train; modified in place.
        optimizer: accepted for interface compatibility but not referenced here;
            `train_one_step` applies the notebook-level `optimizer` variable.
        metric: metric updated during training batches, reset after each epoch.
        metric_val: metric updated during validation batches, reset after each epoch.
        epochs: number of passes over the training generator.

    Returns:
        Tuple `(losses, accuracies, losses_val, accuracies_val)`, in the order
        the notebook's callers unpack it. Each list gets one entry per batch:
        the running loss sum of the current epoch, and the running metric value.
    """
    loss_function = CategoricalCrossentropy()

    losses = []
    accuracies = []
    losses_val = []
    accuracies_val = []

    # One epoch is one full pass over each generator. Taking the batch counts from
    # the generators replaces sample counts that were hard-coded for an older split.
    train_steps = len(ds_train)
    val_steps = len(ds_val)

    # Seed the "best" state with the current weights so the final restore is
    # always defined, even if no epoch ever improves (e.g. the loss is NaN).
    best_loss = float("inf")
    best_weights = model.get_weights()

    for epoch in range(epochs):
        print("\nepoch {}/{}".format(epoch + 1, epochs))
        loss_train = 0
        for batch_num, (x_batch_train, y_batch_train) in enumerate(ds_train):
            loss_train += train_one_step(x_batch_train, y_batch_train, metric, model, loss_function)
            accuracy = metric.result().numpy()
            accuracies.append(accuracy)
            losses.append(loss_train)
            # Leave the loop explicitly after one pass over the data.
            if batch_num + 1 >= train_steps:
                break

        print("\nepoch {}/{}".format(epoch + 1, epochs))
        loss_val = 0
        for val_batch_num, (x_batch_val, y_batch_val) in enumerate(ds_val):
            loss_val += val_one_step(x_batch_val, y_batch_val, metric_val, model, loss_function)
            accuracy_val = metric_val.result().numpy()
            accuracies_val.append(accuracy_val)
            losses_val.append(loss_val)
            if val_batch_num + 1 >= val_steps:
                break

        # Checkpoint whenever the summed validation loss improves.
        if best_loss > loss_val:
            best_weights = model.get_weights()
            model.save_weights(model.name+".h5")
            best_loss = loss_val

        print(metric.name + " over epoch " + str(epoch + 1) + " = " + str(accuracy) + " and " + metric_val.name + " = " + str(accuracy_val))
        print("loss over epoch " + str(epoch + 1) + " = " + str(loss_train.numpy()) + " and val_loss = " + str(loss_val.numpy()))

        # Metrics accumulate across batches; clear them before the next epoch.
        metric.reset_states()
        metric_val.reset_states()

    model.set_weights(best_weights)
    return losses, accuracies, losses_val, accuracies_val

In [None]:
def evaluate(model, metric):
    """Evaluate `model` on one pass over `ds_test`.

    Args:
        model: trained model; `model.predict` is called on every test batch.
        metric: metric updated with each batch's targets and predictions and
            reset before returning (its value is not part of the result).

    Returns:
        Tuple `(report, confusion_mat)`: the text produced by
        `classification_report` and the array produced by `confusion_matrix`,
        both computed on arg-max class indices.
    """
    # Start at the first test batch even if an earlier pass was interrupted.
    ds_test.reset()
    # Take the batch count from the generator instead of a hard-coded sample
    # count and batch size, so a different split does not break this function.
    test_steps = len(ds_test)

    batch_predictions = []
    batch_targets = []
    for batch_num, (x_batch_test, y_batch_test) in enumerate(ds_test):
        # Single forward pass per batch, reused for the metric and the report.
        y_hat = model.predict(x_batch_test)
        metric.update_state(y_batch_test, y_hat)
        batch_predictions.append(y_hat)
        batch_targets.append(y_batch_test)
        if batch_num + 1 >= test_steps:
            break

    # Stack the per-batch arrays once, then reduce each row to its class index.
    y_pred = np.argmax(np.concatenate(batch_predictions, axis=0), axis=1)
    y_test = np.argmax(np.concatenate(batch_targets, axis=0), axis=1)

    report = classification_report(y_true=y_test, y_pred=y_pred)
    metric.reset_states()
    confusion_mat = confusion_matrix(y_pred = y_pred, y_true = y_test)
    return report, confusion_mat


# 1.1. Shallow-Fully Connected Network Model

In [None]:
# Sigmoid and tanh activations were also tried, but their accuracy was not good
# enough; ReLU gave the best results.
# Fully connected baseline: flattened 100x100 RGB image -> four hidden layers -> 3-way softmax.
model_fc_3 = Sequential(
    name="ShallowFullyConnected",
    layers=[
        Flatten(input_shape=(100, 100, 3)),
        Dense(units=128, activation='relu'),
        Dense(units=256, activation='relu'),
        Dense(units=128, activation='relu'),
        Dense(units=64, activation='relu'),
        Dense(units=3, activation='softmax'),
    ],
)

In [None]:
# Fresh optimizer and metrics for this model, then 50 epochs of training.
optimizer = Adam()
metric = CategoricalAccuracy(name="Accuracy")
metric_val = CategoricalAccuracy(name="Val_Accuracy")
losses, accuracy, losses_val, accuracies_val = train(
    model=model_fc_3,
    optimizer=optimizer,
    metric=metric,
    metric_val=metric_val,
    epochs=50,
)

In [None]:
# Test-set classification report and confusion matrix for the shallow dense model.
report, confusion_mat = evaluate(model=model_fc_3, metric=metric)

print(report)
ConfusionMatrixDisplay(confusion_matrix=confusion_mat).plot()

# 1.2. Deep-Fully Connected Network Model

In [None]:
# Deeper fully connected network: flattened 100x100 RGB image -> nine hidden
# ReLU layers -> 3-way softmax.
model_fc_10 = Sequential(
    name="DeepFullyConnected",
    layers=[
        Flatten(input_shape=(100, 100, 3)),
        Dense(units=512, activation='relu'),
        Dense(units=512, activation='relu'),
        Dense(units=1024, activation='relu'),
        Dense(units=1024, activation='relu'),
        Dense(units=1024, activation='relu'),
        Dense(units=512, activation='relu'),
        Dense(units=512, activation='relu'),
        Dense(units=256, activation='relu'),
        Dense(units=128, activation='relu'),
        Dense(units=3, activation='softmax'),
    ],
)

In [None]:
# Fresh optimizer and metrics for this model, then 50 epochs of training.
optimizer = Adam()
metric = CategoricalAccuracy(name="Accuracy")
metric_val = CategoricalAccuracy(name="Val_Accuracy")
losses, accuracy, losses_val, accuracies_val = train(
    model=model_fc_10,
    optimizer=optimizer,
    metric=metric,
    metric_val=metric_val,
    epochs=50,
)

In [None]:
# Test-set classification report and confusion matrix for the deep dense model.
report, confusion_mat = evaluate(model=model_fc_10, metric=metric)

print(report)
ConfusionMatrixDisplay(confusion_matrix=confusion_mat).plot()

# 2.1. Shallow-Convolutional Network Model

In [None]:
# Small convolutional network: two tanh conv + average-pooling stages followed by
# two tanh dense layers and a 3-way softmax.
model_LeNet5 = Sequential(
    name="ShallowConvolutionalNetwork",
    layers=[
        Conv2D(filters=6, kernel_size=(5, 5), strides=1, activation="tanh",
               input_shape=(100, 100, 3)),
        AveragePooling2D(pool_size=(2, 2), strides=2),
        Conv2D(filters=16, kernel_size=(5, 5), strides=1, activation="tanh"),
        AveragePooling2D(pool_size=(2, 2), strides=2),
        Flatten(),
        Dense(units=120, activation="tanh"),
        Dense(units=84, activation="tanh"),
        Dense(units=3, activation="softmax"),
    ],
)

In [None]:
# Fresh optimizer and metrics for this model, then 50 epochs of training.
optimizer = Adam()
metric = CategoricalAccuracy(name="Accuracy")
metric_val = CategoricalAccuracy(name="Val_Accuracy")
losses, accuracy, losses_val, accuracies_val = train(
    model=model_LeNet5,
    optimizer=optimizer,
    metric=metric,
    metric_val=metric_val,
    epochs=50,
)

In [None]:
# Test-set classification report and confusion matrix for the shallow conv model.
report, confusion_mat = evaluate(model=model_LeNet5, metric=metric)

print(report)
ConfusionMatrixDisplay(confusion_matrix=confusion_mat).plot()

# 2.2. Deep-Convolutional Network Model

In [None]:
# Deeper convolutional network: four ReLU conv + max-pooling stages followed by
# two 4096-unit dense layers and a 3-way softmax.
model_VGG8 = Sequential(
    name="DeepCovolutionalNetwork",
    layers=[
        Conv2D(filters=64, kernel_size=(3, 3), strides=1, activation="relu",
               input_shape=(100, 100, 3)),
        MaxPool2D(pool_size=(3, 3), strides=2),
        Conv2D(filters=256, kernel_size=(3, 3), strides=1, activation="relu"),
        MaxPool2D(pool_size=(3, 3), strides=2),
        Conv2D(filters=512, kernel_size=(3, 3), strides=1, activation="relu"),
        MaxPool2D(pool_size=(3, 3), strides=2),
        Conv2D(filters=512, kernel_size=(3, 3), strides=1, activation="relu"),
        MaxPool2D(pool_size=(3, 3), strides=2),
        Flatten(),
        Dense(units=4096, activation="relu"),
        Dense(units=4096, activation="relu"),
        Dense(units=3, activation="softmax"),
    ],
)

In [None]:
# Fresh optimizer and metrics for this model, then 50 epochs of training.
optimizer = Adam()
metric = CategoricalAccuracy(name="Accuracy")
metric_val = CategoricalAccuracy(name="Val_Accuracy")
losses, accuracy, losses_val, accuracies_val = train(
    model=model_VGG8,
    optimizer=optimizer,
    metric=metric,
    metric_val=metric_val,
    epochs=50,
)

In [None]:
# Test-set classification report and confusion matrix for the deep conv model.
report, confusion_mat = evaluate(model=model_VGG8, metric=metric)

print(report)
ConfusionMatrixDisplay(confusion_matrix=confusion_mat).plot()

# 3.1. ResNet50 Model

In [None]:
# ImageNet-initialised ResNet50 without its classification top, for 100x100 RGB
# inputs; its weights are frozen so only layers added later are trained.
model_resnet50 = ResNet50(
    weights="imagenet",
    include_top=False,
    input_shape=(100, 100, 3),
)
model_resnet50.trainable = False

In [None]:
# New classification head on the ResNet50 output:
# global average pooling -> 512 -> 256 -> 128 -> 3-way softmax.
x = GlobalAveragePooling2D()(model_resnet50.output)
x = Dense(units=512, activation='relu')(x)
x = Dense(units=256, activation='relu')(x)
x = Dense(units=128, activation='relu')(x)
y_hat = Dense(units=3, activation='softmax')(x)

# The name is rebound from the backbone to the full model, so re-running only
# this cell would stack another head on top; re-create the backbone first.
model_resnet50 = Model(name="ResNet50", inputs=model_resnet50.input, outputs=y_hat)

In [None]:
# Fresh optimizer and metrics for this model, then 50 epochs of training.
optimizer = Adam()
metric = CategoricalAccuracy(name="Accuracy")
metric_val = CategoricalAccuracy(name="Val_Accuracy")
losses, accuracy, losses_val, accuracies_val = train(
    model=model_resnet50,
    optimizer=optimizer,
    metric=metric,
    metric_val=metric_val,
    epochs=50,
)

In [None]:
# Test-set classification report and confusion matrix for the ResNet50 model.
report, confusion_mat = evaluate(model=model_resnet50, metric=metric)

print(report)
ConfusionMatrixDisplay(confusion_matrix=confusion_mat).plot()

# 3.2. DenseNet121 Model

In [None]:
# ImageNet-initialised DenseNet121 without its classification top, for 100x100
# RGB inputs; its weights are frozen.
model_DenseNet121 = DenseNet121(
    weights="imagenet",
    include_top=False,
    input_shape=(100, 100, 3),
)
model_DenseNet121.trainable = False

# New classification head: global average pooling -> 512 -> 128 -> 3-way softmax.
x = GlobalAveragePooling2D()(model_DenseNet121.output)
x = Dense(units=512, activation='relu')(x)
x = Dense(units=128, activation='relu')(x)
y_hat = Dense(units=3, activation='softmax')(x)

# Rebind the name to the full model (backbone plus head).
model_DenseNet121 = Model(name="DenseNet121", inputs=model_DenseNet121.input, outputs=y_hat)

In [None]:
# Fresh optimizer and metrics for this model, then 50 epochs of training.
optimizer = Adam()
metric = CategoricalAccuracy(name="Accuracy")
metric_val = CategoricalAccuracy(name="Val_Accuracy")

losses, accuracy, losses_val, accuracies_val = train(
    model=model_DenseNet121,
    optimizer=optimizer,
    metric=metric,
    metric_val=metric_val,
    epochs=50,
)

In [None]:

# Test-set classification report and confusion matrix for the DenseNet121 model.
report, confusion_mat = evaluate(model=model_DenseNet121, metric=metric)

print(report)
ConfusionMatrixDisplay(confusion_matrix=confusion_mat).plot()

# 3.3. InceptionV3 Model

In [None]:
# ImageNet-initialised InceptionV3 without its classification top, for 100x100
# RGB inputs; its weights are frozen.
model_InceptionV3 = InceptionV3(
    weights="imagenet",
    include_top=False,
    input_shape=(100, 100, 3),
)
model_InceptionV3.trainable = False

# New classification head: global average pooling -> 512 -> 128 -> 3-way softmax.
x = GlobalAveragePooling2D()(model_InceptionV3.output)
x = Dense(units=512, activation='relu')(x)
x = Dense(units=128, activation='relu')(x)
y_hat = Dense(units=3, activation='softmax')(x)

# Rebind the name to the full model (backbone plus head).
model_InceptionV3 = Model(name="InceptionV3", inputs=model_InceptionV3.input, outputs=y_hat)

In [None]:
# Fresh optimizer and metrics for this model, then 50 epochs of training.
optimizer = Adam()
metric = CategoricalAccuracy(name="Accuracy")
metric_val = CategoricalAccuracy(name="Val_Accuracy")

losses, accuracy, losses_val, accuracies_val = train(
    model=model_InceptionV3,
    optimizer=optimizer,
    metric=metric,
    metric_val=metric_val,
    epochs=50,
)

In [None]:
# Test-set classification report and confusion matrix for the InceptionV3 model.
report, confusion_mat = evaluate(model=model_InceptionV3, metric=metric)

print(report)
ConfusionMatrixDisplay(confusion_matrix=confusion_mat).plot()