# Import all the libraries needed

In [1]:
from keras.layers import Input, Lambda, Dense, Flatten
from keras.models import Model
from keras.preprocessing import image
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
import numpy as np
from glob import glob
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense, Activation, Rescaling
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras.applications import imagenet_utils
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.utils import class_weight
import itertools
import os
import shutil
import random
import matplotlib.pyplot as plt
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras import layers
import pandas as pd
import seaborn as sns



In [1]:
# Create the train, validation and test image generators

In [2]:
def initiateGenerator(path, batchSize):
    """Create the train/validation/test generators and preview a few training images.

    Parameters
    ----------
    path : str
        Dataset root; the ``train``, ``val`` and ``test`` sub-directories are
        read from it.
    batchSize : int
        Batch size used for the three generators and for the preview dataset.

    Returns
    -------
    tuple
        ``(noOfClasses, class_names, train_generator, validation_generator,
        test_generator)``.
    """
    base_path = path

    def _directory_flow(split, **flow_kwargs):
        # One generator per split: 224x224 images with one-hot labels.
        return ImageDataGenerator().flow_from_directory(
            base_path + "/" + split,
            target_size=(224, 224),
            batch_size=batchSize,
            class_mode='categorical',
            **flow_kwargs)

    print("\nTotal : ", end=" ")
    # This tf.data dataset is only used for the class names and the preview
    # below. It is created without image_size, so the "Image Shape" printed at
    # the end describes this preview dataset, not the 224x224 generator batches.
    train_dataset = tf.keras.preprocessing.image_dataset_from_directory(
        batch_size=batchSize, directory=base_path + "/" + "train")

    print("\nFor Training : ", end=" ")
    train_generator = _directory_flow("train")

    print("\nFor Val : ", end=" ")
    # shuffle=False keeps the file order stable for validation and test.
    validation_generator = _directory_flow("val", shuffle=False)
    print("\nFor Test : ", end=" ")

    test_generator = _directory_flow("test", shuffle=False)

    class_names = train_dataset.class_names
    noOfClasses = len(class_names)
    print("\nNo of Classes : ", noOfClasses)
    print("Classes : ", class_names)

    plt.figure(figsize=(10, 10))
    for images, labels in train_dataset.take(1):
        # The grid is 4x4, and the batch may hold fewer images than there are
        # classes, so cap the number of previewed images accordingly.
        preview_count = min(noOfClasses, int(images.shape[0]), 16)
        for i in range(preview_count):
            ax = plt.subplot(4, 4, i + 1)
            plt.imshow(images[i].numpy().astype("uint8"))
            plt.title(class_names[labels[i]])
            plt.axis("off")

    for image_batch, labels_batch in train_dataset:
        print("Image Shape : ", image_batch.shape)
        break

    return noOfClasses, class_names, train_generator, validation_generator, test_generator

# Create our model
- VGG-19
- Xception
- InceptionV3
- DenseNet201

In [4]:
def initiateVGG19(noOfClasses):
    """Build a classifier on top of a frozen, ImageNet-initialised VGG19.

    The input shape is derived from the notebook-level ``IMAGE_SIZE`` plus
    three colour channels. Pixels are rescaled by 1/255, passed through the
    headless backbone, flattened and classified by a softmax ``Dense`` layer
    with ``noOfClasses`` units.
    """
    rescale = layers.Rescaling(1./255, input_shape=IMAGE_SIZE + [3])
    backbone = tf.keras.applications.VGG19(
        weights="imagenet",
        include_top=False,
        input_shape=IMAGE_SIZE + [3],
    )

    # Freeze every backbone layer so that only the new head is trained.
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    classifier = keras.models.Sequential()
    for stage in (rescale, backbone, Flatten(), Dense(noOfClasses, activation='softmax')):
        classifier.add(stage)
    return classifier

def modelSummary(model):
    """Print the summary of ``model`` by delegating to its ``summary()`` method."""
    show_summary = model.summary
    show_summary()

In [5]:
def initiateXception(noOfClasses):
    """Build a classifier on top of a frozen, ImageNet-initialised Xception.

    The input shape is derived from the notebook-level ``IMAGE_SIZE`` plus
    three colour channels. Pixels are rescaled by 1/255, passed through the
    headless backbone, flattened and classified by a softmax ``Dense`` layer
    with ``noOfClasses`` units.
    """
    rescale = layers.Rescaling(1./255, input_shape=IMAGE_SIZE + [3])
    backbone = tf.keras.applications.Xception(
        weights="imagenet",
        include_top=False,
        input_shape=IMAGE_SIZE + [3],
    )

    # Freeze every backbone layer so that only the new head is trained.
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    classifier = keras.models.Sequential()
    for stage in (rescale, backbone, Flatten(), Dense(noOfClasses, activation='softmax')):
        classifier.add(stage)
    return classifier


In [7]:
def initiateInceptionV3(noOfClasses):
    """Build a classifier on top of a frozen, ImageNet-initialised InceptionV3.

    The input shape is derived from the notebook-level ``IMAGE_SIZE`` plus
    three colour channels. Pixels are rescaled by 1/255, passed through the
    headless backbone, flattened and classified by a softmax ``Dense`` layer
    with ``noOfClasses`` units.
    """
    rescale = layers.Rescaling(1./255, input_shape=IMAGE_SIZE + [3])
    backbone = tf.keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        input_shape=IMAGE_SIZE + [3],
    )

    # Freeze every backbone layer so that only the new head is trained.
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    classifier = keras.models.Sequential()
    for stage in (rescale, backbone, Flatten(), Dense(noOfClasses, activation='softmax')):
        classifier.add(stage)
    return classifier

In [8]:
def initiateDenseNet201(noOfClasses):
    """Build a classifier on top of a frozen, ImageNet-initialised DenseNet201.

    The input shape is derived from the notebook-level ``IMAGE_SIZE`` plus
    three colour channels. Pixels are rescaled by 1/255, passed through the
    headless backbone, flattened and classified by a softmax ``Dense`` layer
    with ``noOfClasses`` units.
    """
    rescale = layers.Rescaling(1./255, input_shape=IMAGE_SIZE + [3])
    backbone = tf.keras.applications.DenseNet201(
        weights="imagenet",
        include_top=False,
        input_shape=IMAGE_SIZE + [3],
    )

    # Freeze every backbone layer so that only the new head is trained.
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    classifier = keras.models.Sequential()
    for stage in (rescale, backbone, Flatten(), Dense(noOfClasses, activation='softmax')):
        classifier.add(stage)
    return classifier

# Initializations of
- Optimizer
- Loss function and metric
- Learning rate scheduler
- Checkpoint saving
- Early stopping

In [9]:
def initiateParams(className, model, lr, model_name):
    """Compile ``model`` and create the three training callbacks.

    The model is compiled with Adam at learning rate ``lr``, categorical
    cross-entropy loss and the accuracy metric. All callbacks watch
    ``val_accuracy`` in ``max`` mode.

    Returns
    -------
    tuple
        ``(model, annealer, early, checkpoint)`` where ``annealer`` is the
        ``ReduceLROnPlateau`` callback, ``early`` the ``EarlyStopping``
        callback and ``checkpoint`` the ``ModelCheckpoint`` callback.
    """
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )

    # Checkpoints are written under the className directory; the epoch and
    # val_accuracy placeholders are filled in by Keras at save time.
    checkpoint_pattern = "".join(
        [className, "/", className, model_name, "-{epoch:02d}-{val_accuracy:.3f}.h5"]
    )

    # Halve the learning rate after 2 epochs without improvement, down to 1e-5.
    lr_scheduler = ReduceLROnPlateau(
        monitor='val_accuracy', factor=0.5, patience=2, verbose=1, min_lr=1e-5, mode="max"
    )
    best_saver = ModelCheckpoint(
        checkpoint_pattern, verbose=2, save_best_only=True, monitor="val_accuracy", mode="max"
    )
    # Stop training after 3 epochs without improvement.
    stopper = EarlyStopping(monitor="val_accuracy", patience=3, verbose=1, mode="max")
    return model, lr_scheduler, stopper, best_saver

# Fit the model

In [10]:
def modelFit(model, annealer, early, checkpoint, epochs=1, class_weight=None,
             train_data=None, val_data=None):
    """Train ``model`` and return the object produced by ``model.fit``.

    Parameters
    ----------
    model
        Compiled model exposing ``fit``.
    annealer, early, checkpoint
        Callbacks passed to ``fit`` in this order.
    epochs : int
        Number of epochs to request.
    class_weight : dict or None
        Per-class weights forwarded to ``fit``.
    train_data, val_data
        Training and validation data. When left as ``None`` the notebook-level
        ``train_generator`` / ``validation_generator`` are used, which keeps
        existing calls working unchanged.

    Returns
    -------
    The value returned by ``model.fit``.
    """
    # Fall back to the notebook globals only when no data is passed explicitly.
    if train_data is None:
        train_data = train_generator
    if val_data is None:
        val_data = validation_generator

    history = model.fit(
        train_data,
        validation_data=val_data,
        epochs=epochs,
        callbacks=[annealer, early, checkpoint],
        steps_per_epoch=len(train_data),
        validation_steps=len(val_data),
        class_weight=class_weight,
    )

    return history


# Plot the training and validation accuracy and loss

In [11]:
def plotOutput(history, className, modelName, epochs):
    """Plot accuracy and loss curves for a training run and save the figure.

    ``history.history`` must contain the ``accuracy``, ``val_accuracy``,
    ``loss`` and ``val_loss`` series. The figure is written to
    ``<className>/<className>_<modelName>_graph.png`` and then shown.
    ``epochs`` is accepted but not used; the x-axis length comes from the
    recorded loss series.
    """
    record = history.history
    train_acc, valid_acc = record['accuracy'], record['val_accuracy']
    train_loss, valid_loss = record['loss'], record['val_loss']

    x_axis = range(len(train_loss))

    # (subplot slot, training series, validation series,
    #  training label, validation label, legend location, title)
    panels = (
        (1, train_acc, valid_acc,
         'Training Accuracy', 'Validation Accuracy',
         'lower right', 'Training and Validation Accuracy'),
        (2, train_loss, valid_loss,
         'Training Loss', 'Validation Loss',
         'upper right', 'Training and Validation Loss'),
    )

    plt.figure(figsize=(12, 12))
    for slot, train_curve, valid_curve, train_label, valid_label, legend_loc, heading in panels:
        plt.subplot(3, 2, slot)
        plt.plot(x_axis, train_curve, label=train_label)
        plt.plot(x_axis, valid_curve, label=valid_label)
        plt.legend(loc=legend_loc)
        plt.title(heading)
    plt.savefig(className + "/" + className + "_" + modelName + '_graph.png')
    plt.show()


# Evaluate and Save the final model

In [12]:
def evalModel(model):
    """Evaluate ``model`` on the notebook-level ``test_generator``.

    The second value returned by ``model.evaluate`` is scaled to a percentage,
    printed, and returned.
    """
    test_metrics = model.evaluate(test_generator)
    acc = test_metrics[1] * 100
    print(f'Accuracy on the Test Set = {acc:5.2f} %')
    return acc
    
def saveModel(model, className, model_name, acc):
    """Save ``model`` under the ``className`` directory and print a confirmation.

    The file name embeds the class name, the model name and ``acc`` formatted
    to three decimals.
    """
    target_path = "".join(
        [className, "/", className, " - ", model_name, f"_{acc:.3f}", "Final.h5"]
    )
    model.save(target_path)
    print(f"Final {model_name} model saved!")

## Calculate weighted recall, precision, f1 and accuracy
## Plot the confusion matrix
## Plot the classification report

In [13]:
from sklearn.metrics import recall_score,precision_score,f1_score, accuracy_score
def callPlot(model, modelName, className, classes):
    """Score ``model`` on the test set and plot its confusion matrix and report.

    Predictions are made on the notebook-level ``test_generator``. Weighted
    recall, precision and F1 plus accuracy are printed. Both heatmaps are saved
    as PNG files under the ``className`` directory and then shown.

    Parameters
    ----------
    model
        Trained model exposing ``predict``.
    modelName : str
        Used in the plot titles and output file names.
    className : str
        Output directory and file-name prefix.
    classes : sequence of str
        Class names, ordered by integer label.
    """
    y_true = test_generator.classes
    print("True : ", (y_true))

    y_pred = np.argmax(model.predict(test_generator), axis=1)
    print("Predicted : ", (y_pred))

    recall = recall_score(y_true, y_pred, average='weighted')
    p = precision_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')
    acc = accuracy_score(y_true, y_pred)
    print(f"ACCURACY={acc}")
    print(f"RECALL={recall}")
    print(f"precision={p}")
    print(f"F1 Score={f1}")

    # Pin the label set to every known class, so the matrix and the report keep
    # one row per class even if a class never occurs in y_true or y_pred.
    label_ids = list(range(len(classes)))

    conf_mat = confusion_matrix(y_true, y_pred, labels=label_ids)
    conf_df = pd.DataFrame(conf_mat, index=classes, columns=classes)
    plt.figure(figsize=(10, 8))
    plt.title(f"{modelName}_{className}_{acc:.3f}")
    sns.heatmap(conf_df, annot=True, fmt="g")
    plt.savefig(className + "/" + className + "_" + modelName + f"{acc:.3f}_confusionMatrix.png")
    plt.show()

    print(classification_report(y_true, y_pred))
    # target_names labels the rows with the class names directly.
    report = classification_report(
        y_true, y_pred, labels=label_ids, target_names=list(classes), output_dict=True
    )
    # "accuracy" is a scalar entry, not a per-class row; drop it before tabulating.
    report.pop("accuracy", None)
    df = pd.DataFrame(report).transpose()
    plt.figure(figsize=(10, 8))
    plt.title(f"{modelName}_{className}_{acc:.3f}")
    sns.heatmap(df, annot=True)
    plt.savefig(className + "/" + className + "_" + modelName + f"_{acc:.3f}_classificationReport.png")
    plt.show()

    

# Ensemble model
- Sum the predictions of the chosen models, each weighted by its test accuracy
- Plot the confusion matrix
- Plot the classification report
- Normalization of the final prediction is carried out in the predict.py file so that the user receives a probability between 0 and 1

In [14]:
def Ensemble(c, classes):
    """Score the accuracy-weighted ensemble of the models stored in ``part[c]``.

    Each model in ``part[c]['models']`` predicts on the notebook-level
    ``test_generator``. Its output is multiplied by the stored ``acc`` value
    and the weighted outputs are summed; the arg-max of the sum is the ensemble
    prediction. Weighted recall, precision and F1 plus accuracy are printed.
    Both heatmaps are saved as PNG files under the ``c`` directory and shown.

    Parameters
    ----------
    c : str
        Key into ``part``; also used as output directory and file-name prefix.
    classes : sequence of str
        Class names, ordered by integer label.

    Raises
    ------
    ValueError
        If ``part[c]['models']`` is empty.
    """
    # Use the key passed in rather than a notebook global for titles and paths.
    className = c

    y_true = test_generator.classes
    print("True : ", (y_true))

    weighted_sum = None
    for entry in part[c]['models'].values():
        weighted = entry['model'].predict(test_generator) * entry["acc"]
        weighted_sum = weighted if weighted_sum is None else weighted_sum + weighted
    if weighted_sum is None:
        raise ValueError(f"No trained models registered under part[{c!r}]['models']")

    y_pred = np.argmax(weighted_sum, axis=1)
    print("Predicted : ", (y_pred))

    recall = recall_score(y_true, y_pred, average='weighted')
    p = precision_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')
    acc = accuracy_score(y_true, y_pred)
    print(f"ACCURACY={acc}")
    print(f"RECALL={recall}")
    print(f"precision={p}")
    print(f"F1 Score={f1}")

    # Pin the label set to every known class, so the matrix and the report keep
    # one row per class even if a class never occurs in y_true or y_pred.
    label_ids = list(range(len(classes)))

    conf_mat = confusion_matrix(y_true, y_pred, labels=label_ids)
    conf_df = pd.DataFrame(conf_mat, index=classes, columns=classes)
    plt.figure(figsize=(10, 8))
    plt.title(f"Ensemble_{className}_{acc:.3f}")
    sns.heatmap(conf_df, annot=True, fmt="g")
    plt.savefig(className + "/" + className + "_" + f"Ensemble_{acc:.3f}_confusionMatrix.png")
    plt.show()

    print(classification_report(y_true, y_pred))
    # target_names labels the rows with the class names directly.
    report = classification_report(
        y_true, y_pred, labels=label_ids, target_names=list(classes), output_dict=True
    )
    # "accuracy" is a scalar entry, not a per-class row; drop it before tabulating.
    report.pop("accuracy", None)
    df = pd.DataFrame(report).transpose()
    plt.figure(figsize=(10, 8))
    plt.title(f"Ensemble_{className}_{acc:.3f}")
    sns.heatmap(df, annot=True)
    plt.savefig(className + "/" + className + "_" + f"Ensemble_{acc:.3f}_classificationReport.png")
    plt.show()


# Set the folder path and hyperparameter

In [15]:
# Dataset root passed to initiateGenerator.
mpath = r'/kaggle/input/chest-xray-pneumoniacovid19tuberculosis'

# Name of the task; used as the key into `part` and as output directory/prefix.
className = "lung"
c = className

# Per-task registry: trained models with their test accuracy, plus class info.
part = {c: {'models': {}, 'no_of_classes': 0, "ClassNames": None}}

# Input resolution; the model builders append the channel dimension.
IMAGE_SIZE = [224, 224]
img_height, img_width = 224, 224

# Placeholder; overwritten by the value returned from initiateGenerator.
noOfClasses = 0

# Training hyperparameters.
gEpochs = 30
lr = 0.001
batchSize = 32

# Calculate class weight to deal with class imbalance and train the models

In [None]:
className = c
noOfClasses, class_names, train_generator, validation_generator, test_generator = initiateGenerator(mpath, batchSize=batchSize)
part[c]['ClassNames'] = class_names
part[c]['no_of_classes'] = noOfClasses

# Balanced class weights to counter class imbalance in the training set.
# The result is stored as `class_weights` so that the imported sklearn
# `class_weight` module is not overwritten and this cell can be re-run.
train_labels = train_generator.classes
present_classes = np.unique(train_labels)
balanced_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=present_classes,
    y=train_labels)
# Key each weight by its actual class id rather than by position.
class_weights = dict(zip(present_classes.tolist(), balanced_weights.tolist()))

# Model name -> builder. Every model goes through the same
# compile / fit / plot / evaluate / save / report pipeline.
model_builders = {
    "VGG-19": initiateVGG19,
    "Xception": initiateXception,
    "InceptionV3": initiateInceptionV3,
    "DenseNet201": initiateDenseNet201,
}

for model_name, build_model in model_builders.items():
    print("######################################################")
    print(f"RESULTS FOR {model_name}")
    curModel = build_model(noOfClasses)
    curModel, annealer, early, checkpoint = initiateParams(className, curModel, lr, model_name)
    curHistory = modelFit(curModel, annealer, early, checkpoint, epochs=gEpochs, class_weight=class_weights)
    plotOutput(curHistory, className, model_name, gEpochs)
    acc = evalModel(curModel)
    saveModel(curModel, className, model_name, acc)
    # Register the model with its test accuracy; Ensemble uses acc as the weight.
    part[c]['models'][model_name] = {"model": curModel, 'acc': acc}
    callPlot(curModel, model_name, className, class_names)

# Keep the per-model variable names available for any later cells.
curVGG19 = part[c]['models']["VGG-19"]["model"]
curXception = part[c]['models']["Xception"]["model"]
curInceptionV3 = part[c]['models']["InceptionV3"]["model"]
curDenseNet201 = part[c]['models']["DenseNet201"]["model"]

print("######################################################")
print("RESULTS FOR ENSEMBLE")
Ensemble(c, class_names)

    
    


Total :  Found 6326 files belonging to 4 classes.

For Training :  Found 6326 images belonging to 4 classes.

For Val :  Found 38 images belonging to 4 classes.
Found 771 images belonging to 4 classes.

No of Classes :  4
Classes :  ['COVID19', 'NORMAL', 'PNEUMONIA', 'TURBERCULOSIS']
Image Shape :  (32, 256, 256, 3)
######################################################
RESULTS FORVGG-19
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg19/vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5
Epoch 1/30
Epoch 1: val_accuracy improved from -inf to 0.89474, saving model to lung/lungVGG-19-01-0.895.h5
Epoch 2/30
Epoch 2: val_accuracy improved from 0.89474 to 0.94737, saving model to lung/lungVGG-19-02-0.947.h5
Epoch 3/30
 43/198 [=====>........................] - ETA: 59s - loss: 0.0801 - accuracy: 0.9709