In [1]:
import pandas as pd
import numpy as np
import math

In [None]:
import zipfile
with zipfile.ZipFile('Hamza_Custom_Data.zip', 'r') as zip_ref:
    zip_ref.extractall()

In [None]:
!pip install tqdm

In [2]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import RepeatedStratifiedKFold

In [3]:
import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D,Flatten,MaxPooling2D,Dropout,Dense,Activation,BatchNormalization

In [4]:
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import to_categorical
import tensorflow_hub as hub
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import ExponentialDecay

In [5]:
from tqdm import tqdm
from tabulate import tabulate

In [None]:
os.listdir('Hamza_Custom_Data/CT_Scan/Covid/CT_COVID')

In [8]:
# selecting random number of images from multiple folders and copying it to the destination folder

import os
import shutil
import glob
import random
# ctscan_covid_data = random.sample(glob.glob("Hamza_Custom_Data/CT_Scan/Covid/CT_COVID/*.png"), 330)
# ctscan_normal_data = random.sample(glob.glob("Hamza_Custom_Data/CT_Scan/Normal/CT_nonCOVID/*"), 330)

# ultrasound_covid_data = random.sample(glob.glob("Hamza_Custom_Data/ultrasound/Covid/*"), 330)
# ultrasound_normal_data = random.sample(glob.glob("Hamza_Custom_Data/ultrasound/Normal/*.jpg"), 330)

# xray_covid_data = random.sample(glob.glob("Hamza_Custom_Data/X_ray/Covid/*.png"), 330)
# xray_normal_data = random.sample(glob.glob("Hamza_Custom_Data/X_ray/Normal/*.png"), 330)

ctscan_covid_data = random.sample(glob.glob("Hamza_Custom_Data/CT_Scan/Covid/*.png"), 330)
ctscan_normal_data = random.sample(glob.glob("Hamza_Custom_Data/CT_Scan/Normal/*"), 330)   

ultrasound_covid_data = random.sample(glob.glob("Hamza_Custom_Data/Ultra_Sound/Covid/*"), 330)
ultrasound_normal_data = random.sample(glob.glob("Hamza_Custom_Data/Ultra_Sound/Normal/*.jpg"), 330)

xray_covid_data = random.sample(glob.glob("Hamza_Custom_Data/X_Ray/Covid/*.png"), 330)
xray_normal_data = random.sample(glob.glob("Hamza_Custom_Data/X_Ray/Normal/*.png"), 330)

covid_list=[ctscan_covid_data,ultrasound_covid_data,xray_covid_data]

normal_list = [ctscan_normal_data, ultrasound_normal_data,xray_normal_data]

if os.path.isdir('Data/Covid'):
    shutil.rmtree('Data/Covid')
    
if os.path.isdir('Data/Normal'):
    shutil.rmtree('Data/Normal')
    
# if os.path.isdir('/content/drive/MyDrive/Data/X_Ray'):
#     shutil.rmtree('/content/drive/MyDrive/Data/X_Ray')

for data in covid_list:
    dest = 'Data/Covid'
    if not os.path.isdir('Data/Covid'):
        os.makedirs(dest)
    for imgs in data:
        shutil.copy(imgs, dest)

for data in normal_list:
    dest = 'Data/Normal' 
    if not os.path.isdir('Data/Normal'):
          os.makedirs(dest)
    for imgs in data:
        shutil.copy(imgs, dest)

In [9]:
class Dataset:
    def __init__(self, data_root: str, *, test_size: float, img_size: int, seed: int = 0) -> None:
        self.label2index = {}
        self.index2label = {}
        
        # Discover the class label names.
        class_labels = os.listdir(data_root)
        self.nclasses = len(class_labels)
        X, y = [], []
        
        for label_index, label in enumerate(class_labels):
            # Load the images for this class label.
            self.label2index[label_index] = label
            self.index2label[label] = label_index
            
            img_names = os.listdir(os.path.join(data_root, label))
            for img_name in img_names:
                img_path = os.path.join(data_root, label, img_name)
                img = load_img(img_path, target_size=(img_size, img_size, 3))
                X.append(img_to_array(img))
                y.append(label_index)
        
        X = np.array(X)
        y = np.array(y)
        one_hot_y = to_categorical(y, num_classes=self.nclasses)
        
        # Make a stratified split.
        self.X, self.X_test, self.labels, self.labels_test, self.y, self.y_test = train_test_split(
            X, y, one_hot_y, test_size=test_size, random_state=seed, stratify=y)

In [10]:
# 660 * 0.7 = 462 
# X shape in 3 dimensions
# Y has 2 classes (Covid, Normal)
import os
import shutil
import glob
import random
data = Dataset("Data/", test_size=0.3, img_size=224)
print(data.X.shape, data.y.shape)

# Normal = Dataset("/content/drive/MyDrive/Data/", test_size=0.3, img_size=224)
# print(Normal.X.shape, Normal.y.shape)

(1386, 224, 224, 3) (1386, 2)


In [11]:
# feature extractor model resnet 101 v2
model = hub.KerasLayer("https://tfhub.dev/google/bit/m-r101x1/1", trainable=False)

In [13]:
covid_embedding = model(data.X)
covid_test_embedding = model(data.X_test)
print(covid_embedding.shape, covid_test_embedding.shape)

(1386, 1280) (594, 1280)


In [14]:
# feature extractor model mobinet v2
model_mobinet = hub.KerasLayer("https://tfhub.dev/google/tf2-preview/mobilenet_v2/feature_vector/4", trainable=False)

In [15]:
covid_embedding_mob = model_mobinet(data.X)
covid_test_embedding_mob = model_mobinet(data.X_test)
print(covid_embedding_mob.shape, covid_test_embedding_mob.shape)

(1386, 1280) (594, 1280)


In [29]:
def make_model(nclasses: int):
    model = Sequential()
    model.add(Conv2D(filters = 16, kernel_size = (3,3), padding='same', input_shape=(2,2,320), activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(24, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(32, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(48, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(64, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Flatten())
    model.add(Dense(128,activation="relu"))
    model.add(Dropout(0.3))
    model.add(Dense(128,activation="relu"))
    model.add(Dense(nclasses, activation="sigmoid"))
    #model.summary()
    return model

In [30]:
def evaluate_model(nclasses, X, y, X_test, y_test, *, epochs: int, batch_size: int, learning_rate: float, 
                   **model_params) -> tuple:
    
    # Math to compute the learning rate schedule. We will divide our
    # learning rate by a factor of 10 every 30% of the optimizer's
    # total steps.
    steps_per_epoch = math.ceil(len(X) / batch_size)
    third_of_total_steps = math.floor(epochs * steps_per_epoch / 3)
    
    # Make and compile the model.
    #model = model_maker(nclasses, **model_params)
    model = make_model(nclasses)
    model.compile(
        optimizer=Adam(
            learning_rate=ExponentialDecay(
                learning_rate,
                decay_steps=third_of_total_steps,
                decay_rate=0.1,
                staircase=True
            )
        ),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )
    
    # Train the model on the training set and evaluate it on the test set.
    history = model.fit(X, y, batch_size=batch_size, epochs=epochs, verbose=0)
    _, train_acc = model.evaluate(X, y, batch_size=batch_size, verbose=0)
    _, test_acc = model.evaluate(X_test, y_test, batch_size=batch_size, verbose=0)
    return model, history, train_acc, test_acc

In [31]:
def cv_evaluate_model(
    X, y, labels, *, nfolds: int, nrepeats: int, epochs: int, batch_size: int,
    learning_rate: float, model_maker, verbose: bool = True, seed: int = 0,
    **model_params
) -> dict:
    """
    Performs `nfolds` cross-validated training and evaluation of a
    model hyperparameter configuration. Returns a dictionary of
    statistics about the outcome of the cross-validated experiment.
    """
    _, nclasses = y.shape
    train_accs, test_accs = [], []
    
    # Train and evaluate the model for each fold.
    for train_index, test_index in tqdm(
        RepeatedStratifiedKFold(
            n_splits=nfolds, n_repeats=nrepeats, random_state=seed
        ).split(X, labels),
        total=nfolds*nrepeats, disable=not verbose
    ):
        
        # Select the data for this fold.
        X_train_fold = tf.gather(X, train_index) 
        y_train_fold = tf.gather(y, train_index)
        X_test_fold = tf.gather(X, test_index)
        y_test_fold = tf.gather(y, test_index)
        
        # Train and evaluate the model.
        _, _, train_acc, test_acc = evaluate_model(
            nclasses,
            X_train_fold,
            y_train_fold,
            X_test_fold,
            y_test_fold,
            epochs=epochs,
            batch_size=batch_size,
            learning_rate=learning_rate,
            model_maker=model_maker,
            **model_params
        )
        train_accs.append(train_acc)
        test_accs.append(test_acc)
    
    # Aggregate.
    results = {
        "train_mean": np.mean(train_accs),
        "train_std": np.std(train_accs),
        "test_mean": np.mean(test_accs),
        "test_std": np.std(test_accs)
    }
    
    # Report.
    if verbose:
        print(
            tabulate(
                [
                    ["Train", results["train_mean"], results["train_std"]],
                    ["Test", results["test_mean"], results["test_std"]]
                ],
                headers=["Set", "Accuracy", "Standard Deviation"]
            )
        )
    
    return results

In [32]:
X_list = [covid_embedding]
y_list = [data.y]
data_labels = [data.labels]

In [33]:
covid_embedding.shape

TensorShape([1386, 1280])

In [34]:
# Chaning shape for CNN2D
# for 2048 - > [-1,2,2,512]
# for 1280 - > [-1,2,2,320]

covid_embedding2d = tf.reshape(covid_embedding, [-1,2,2,320])
print(covid_embedding2d.shape)

covid_embedding2d_mob = tf.reshape(covid_embedding_mob, [-1,2,2,320])
print(covid_embedding2d_mob.shape)

X_list = [covid_embedding2d]
X_list_mob = [covid_embedding2d_mob]

(1386, 2, 2, 320)
(1386, 2, 2, 320)


### Model Results 

In [35]:
n_class = y_list[0].shape[1]
model_cnn = make_model(n_class)
model_cnn.summary()

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_10 (Conv2D)           (None, 2, 2, 16)          46096     
_________________________________________________________________
batch_normalization_10 (Batc (None, 2, 2, 16)          64        
_________________________________________________________________
max_pooling2d_10 (MaxPooling (None, 2, 2, 16)          0         
_________________________________________________________________
conv2d_11 (Conv2D)           (None, 2, 2, 24)          3480      
_________________________________________________________________
batch_normalization_11 (Batc (None, 2, 2, 24)          96        
_________________________________________________________________
max_pooling2d_11 (MaxPooling (None, 2, 2, 24)          0         
_________________________________________________________________
conv2d_12 (Conv2D)           (None, 2, 2, 32)         

### Results with Resnet 101 v2

In [36]:
model_evaluate_params = {
    "X": X_list[0],
    "y": y_list[0],
    "labels": data_labels[0],
    "nfolds": 10,
    "nrepeats": 3,
    "model_maker": make_model,
    "epochs": 200,
    "batch_size": 32,
    "verbose": False,
    "learning_rate": 3e-3 #0.003
}

In [37]:
_ = cv_evaluate_model(
    **{
        **model_evaluate_params,
        "verbose": True
    }
)

  3%|██▋                                                                             | 1/30 [03:24<1:39:04, 205.00s/it]


KeyboardInterrupt: 

### Results with Mobinet v2

In [38]:
model_evaluate_params_mob = {
    "X": X_list_mob[0],
    "y": y_list[0],
    "labels": data_labels[0],
    "nfolds": 10,
    "nrepeats": 3,
    "model_maker": make_model,
    "epochs": 200,
    "batch_size": 32,
    "verbose": False,
    "learning_rate": 3e-3 #0.003
}

In [39]:
_ = cv_evaluate_model(
    **{
        **model_evaluate_params_mob,
        "verbose": True
    }
)

100%|███████████████████████████████████████████████████████████████████████████████| 30/30 [1:00:55<00:00, 121.86s/it]

Set      Accuracy    Standard Deviation
-----  ----------  --------------------
Train    0.99984            0.000599645
Test     0.839582           0.0335461





### Graph Method for Resnet 101 v2

In [None]:
X_value = X_list[0]
y_value = y_list[0]
labels = data_labels[0]
train_accs, test_accs = [], []

nclasses = y_value.shape[1]
nfolds = 10
nrepeats = 3
epochs = 200
batch_size = 32
learning_rate = 3e-3 #0.003

# Train and evaluate the model for each fold.
for train_index, test_index in tqdm(
    RepeatedStratifiedKFold(n_splits = nfolds, n_repeats = nrepeats, random_state = 0).split(X, labels), \
    total = nfolds*nrepeats, disable = not True #(verbose - True)
):

    # Select the data for this fold.
    X_train = tf.gather(X_value, train_index) 
    y_train = tf.gather(y_value, train_index)
    X_test = tf.gather(X_value, test_index)
    y_test = tf.gather(y_value, test_index)
    
    print(X_train.shape)
    print(X_test.shape)
    print(y_train.shape)
    print(y_test.shape)
    
    #Define Model
    model = Sequential()
    model.add(Conv2D(filters = 16, kernel_size = (3,3), padding='same', input_shape=(2,2,320), activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(24, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(32, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(48, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(64, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Flatten())
    model.add(Dense(128,activation="relu"))
    model.add(Dropout(0.3))
    model.add(Dense(128,activation="relu"))
    model.add(Dense(nclasses, activation="sigmoid"))

    #Learning Rate
    steps_per_epoch = math.ceil(len(X_train) / batch_size)
    third_of_total_steps = math.floor(epochs * steps_per_epoch / 3)
    
    # Train and evaluate the model.
    model.compile(
        optimizer=Adam(
            learning_rate=ExponentialDecay(
                learning_rate,
                decay_steps=third_of_total_steps,
                decay_rate=0.1,
                staircase=True
            )
        ),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )
    
    # Train the model on the training set and evaluate it on the test set.
    history = (model.fit(X_train, y_train, batch_size = batch_size, epochs = epochs, verbose=1, \
                         validation_data=(X_test, y_test)))
    train_loss, train_acc = model.evaluate(X_train, y_train, batch_size = batch_size, verbose=0)
    test_loss, test_acc = model.evaluate(X_test, y_test, batch_size = batch_size, verbose=0)
    
    train_accs.append(train_acc)
    test_accs.append(test_acc)

# Aggregate.
results = {
    "Train_Acc": np.mean(train_accs),
    "Train_std": np.std(train_accs),
    "Test_Acc": np.mean(test_accs),
    "Test_std": np.std(test_accs)
}

# Report.
if verbose:
    print(
        tabulate(
            [
                ["Train", results["Train_Acc"], results["Train_std"]],
                ["Test", results["Test_Acc"], results["Test_std"]]
            ],
            headers=["Set", "Accuracy", "Standard Deviation"]
        )
    )

In [None]:
# ploting graph by using values of last epoch for graphs
plt.style.use('ggplot')

def plot_history(history):
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    x = range(1, len(acc) + 1)

    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(x, acc, 'b', label='Training acc')
    plt.plot(x, val_acc, 'r', label='Validation acc')
    plt.title('Training and validation accuracy')
    plt.ylabel('Accuracy value (%)')
    plt.xlabel('No. epoch')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(x, loss, 'b', label='Training loss')
    plt.plot(x, val_loss, 'r', label='Validation loss')
    plt.title('Training and validation loss')
    plt.ylabel('Loss value')
    plt.xlabel('No. epoch')
    plt.legend()

In [None]:
plot_history(history)

In [None]:
#All in one Graph (loss and accuracy)
plt.style.use('default')
plt.figure(figsize=(7, 3))
plt.plot(history.history["accuracy"])
plt.plot(history.history["val_accuracy"])
plt.plot(history.history["loss"]) 
plt.plot(history.history["val_loss"]) 
plt.title("Model Evaluation")
plt.ylabel("Value (%)")
plt.xlabel("No. of Epochs")
plt.legend(["Training Accuracy","Validation Accuracy","Training loss","Validation loss"])
plt.show()

In [None]:
#These confussion matrix are not correct
y_pred = model.predict(X_test)
y_pred = np.argmax(y_pred, axis=1)
y_test1 = np.argmax(y_test, axis=1)

print(accuracy_score(y_test1, y_pred))
print(confusion_matrix(y_test,y_pred))
print(classification_report(y_test,y_pred))

### Graph Method for Mobinet

In [None]:
X_value = X_list_mob[0]
y_value = y_list[0]
labels = data_labels[0]
train_accs, test_accs = [], []

nclasses = y_value.shape[1]
nfolds = 10
nrepeats = 3
epochs = 200
batch_size = 32
learning_rate = 3e-3 #0.003

# Train and evaluate the model for each fold.
for train_index, test_index in tqdm(
    RepeatedStratifiedKFold(n_splits = nfolds, n_repeats = nrepeats, random_state = 0).split(X, labels), \
    total = nfolds*nrepeats, disable = not True #(verbose - True)
):

    # Select the data for this fold.
    X_train = tf.gather(X_value, train_index) 
    y_train = tf.gather(y_value, train_index)
    X_test = tf.gather(X_value, test_index)
    y_test = tf.gather(y_value, test_index)
    
    print(X_train.shape)
    print(X_test.shape)
    print(y_train.shape)
    print(y_test.shape)
    
    #Define Model
    model = Sequential()
    model.add(Conv2D(filters = 16, kernel_size = (3,3), padding='same', input_shape=(2,2,320), activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(24, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(32, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(48, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Conv2D(64, (3,3), padding='same', activation="relu"))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(1,1))
    model.add(Flatten())
    model.add(Dense(128,activation="relu"))
    model.add(Dropout(0.3))
    model.add(Dense(128,activation="relu"))
    model.add(Dense(nclasses, activation="sigmoid"))

    #Learning Rate
    steps_per_epoch = math.ceil(len(X_train) / batch_size)
    third_of_total_steps = math.floor(epochs * steps_per_epoch / 3)
    
    # Train and evaluate the model.
    model.compile(
        optimizer=Adam(
            learning_rate=ExponentialDecay(
                learning_rate,
                decay_steps=third_of_total_steps,
                decay_rate=0.1,
                staircase=True
            )
        ),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )
    
    # Train the model on the training set and evaluate it on the test set.
    history = (model.fit(X_train, y_train, batch_size = batch_size, epochs = epochs, verbose=1, \
                         validation_data=(X_test, y_test)))
    train_loss, train_acc = model.evaluate(X_train, y_train, batch_size = batch_size, verbose=0)
    test_loss, test_acc = model.evaluate(X_test, y_test, batch_size = batch_size, verbose=0)
    
    train_accs.append(train_acc)
    test_accs.append(test_acc)

# Aggregate.
results = {
    "Train_Acc": np.mean(train_accs),
    "Train_std": np.std(train_accs),
    "Test_Acc": np.mean(test_accs),
    "Test_std": np.std(test_accs)
}

# Report.
if verbose:
    print(
        tabulate(
            [
                ["Train", results["Train_Acc"], results["Train_std"]],
                ["Test", results["Test_Acc"], results["Test_std"]]
            ],
            headers=["Set", "Accuracy", "Standard Deviation"]
        )
    )

In [None]:
# ploting graph by using values of last epoch for graphs
plt.style.use('ggplot')

def plot_history(history):
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    x = range(1, len(acc) + 1)

    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(x, acc, 'b', label='Training acc')
    plt.plot(x, val_acc, 'r', label='Validation acc')
    plt.title('Training and validation accuracy')
    plt.ylabel('Accuracy value (%)')
    plt.xlabel('No. epoch')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(x, loss, 'b', label='Training loss')
    plt.plot(x, val_loss, 'r', label='Validation loss')
    plt.title('Training and validation loss')
    plt.ylabel('Loss value')
    plt.xlabel('No. epoch')
    plt.legend()

In [None]:
plot_history(history)

In [None]:
#All in one Graph (loss and accuracy)
plt.style.use('default')
plt.figure(figsize=(7, 3))
plt.plot(history.history["accuracy"])
plt.plot(history.history["val_accuracy"])
plt.plot(history.history["loss"]) 
plt.plot(history.history["val_loss"]) 
plt.title("Model Evaluation")
plt.ylabel("Value (%)")
plt.xlabel("No. of Epochs")
plt.legend(["Training Accuracy","Validation Accuracy","Training loss","Validation loss"])
plt.show()

In [None]:
#These confussion matrix are not correct
y_pred = model.predict(X_test)
y_pred = np.argmax(y_pred, axis=1)
y_test1 = np.argmax(y_test, axis=1)

print(accuracy_score(y_test1, y_pred))
print(confusion_matrix(y_test,y_pred))
print(classification_report(y_test,y_pred))