In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
import glob
import keras
import tensorflow as tf
from keras.applications import ResNet152V2, ResNet50V2
from keras.optimizers import Adam
from keras.layers import Dense, GlobalAveragePooling2D, Flatten, Dropout, Input
from keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.utils import shuffle

Generate Data From chest_xray Folder:
* Here we split the data in a way that we have bacteria, virus and normal images. in the proper way- some of each and not in a total randomize way
* TODO: Need to chek why the depth of the image is 3 and not 1 as in regular cnn network

In [None]:
# IMG_SIZE = 150
IMG_DEPTH = 3
IMG_SIZE = 224
BATCH = 16
SEED = 42

normal_dataset = glob.glob('/content/drive/MyDrive/Deep Learning Project/chest_xray/NORMAL/*.jpeg')
pneumonia_dataset = glob.glob('/content/drive/MyDrive/Deep Learning Project/chest_xray/PNEUMONIA/*.jpeg')
virus_dataset = list(filter(lambda x: 'virus' in x, pneumonia_dataset))
bacterial_dataset = list(filter(lambda x: 'bacteria' in x, pneumonia_dataset))


Generate Train, Validation and Test Sets:

In [None]:
def split_data(dataSet, testSize, valSize):
    train, test= train_test_split(dataSet, test_size=testSize, random_state=SEED, shuffle=True)
    train, val= train_test_split(train, test_size=valSize, random_state=SEED, shuffle=True)
    return train, test, val


train_normal, test_normal, val_normal = split_data(normal_dataset, 0.15, 0.038)
train_bacterial, test_bacterial, val_bacterial = split_data(bacterial_dataset, 0.05, 0.0095)
train_virus, test_virus, val_virus = split_data(virus_dataset, 0.075, 0.019)



train = [x for x in train_normal]
train.extend([x for x in train_bacterial])
train.extend([x for x in train_virus])

test = [x for x in test_normal]
test.extend([x for x in test_bacterial])
test.extend([x for x in test_virus])

val = [x for x in val_normal]
val.extend([x for x in val_bacterial])
val.extend([x for x in val_virus])

df_train = pd.DataFrame(np.concatenate([['Normal']*(len(train_normal)) , ['Pneumonia']*(len(train_bacterial) + len(train_virus))]), columns = ['class'])
df_train['image'] = [x for x in train]

df_val = pd.DataFrame(np.concatenate([['Normal']*(len(val_normal)) , ['Pneumonia']*(len(val_bacterial) + len(val_virus))]), columns = ['class'])
df_val['image'] = [x for x in val]

df_test = pd.DataFrame(np.concatenate([['Normal']*len(test_normal) , ['Pneumonia']*(len(test_bacterial) + len(test_virus))]), columns = ['class'])
df_test['image'] = [x for x in test]



Perform Data Augmentation on Each Set:
* TODO: Need to check why we are doing data augmentation on all 3 sets
  and not just on the training set as in the regular cnn network.

In [None]:

# With data augmentation to prevent overfitting and handling the imbalance in dataset
# Because the dataset is small we "increase" the dataset by change of images parameters.
# In this way we increase our dataset and prevent overfitting.

train_datagen = ImageDataGenerator(rescale=1/255.,
                                  zoom_range = 0.1,
                                  #rotation_range = 0.1,
                                  width_shift_range = 0.1,
                                  height_shift_range = 0.1)

test_datagen = ImageDataGenerator(rescale=1/255.)

ds_train = train_datagen.flow_from_dataframe(df_train,
                                             #directory=train_path, #dataframe contains the full paths
                                             x_col = 'image',
                                             y_col = 'class',
                                             target_size = (IMG_SIZE, IMG_SIZE),
                                             class_mode = 'binary',
                                             batch_size = BATCH,
                                             seed = SEED)

ds_val = test_datagen.flow_from_dataframe(df_val,
                                            #directory=train_path,
                                            x_col = 'image',
                                            y_col = 'class',
                                            target_size = (IMG_SIZE, IMG_SIZE),
                                            class_mode = 'binary',
                                            batch_size = BATCH,
                                            seed = SEED)

ds_test = test_datagen.flow_from_dataframe(df_test,
                                            #directory=test_path,
                                            x_col = 'image',
                                            y_col = 'class',
                                            target_size = (IMG_SIZE, IMG_SIZE),
                                            class_mode = 'binary',
                                            batch_size = 1,
                                            shuffle = False)

Create and Freeze the Base Model:

In [None]:
# --------- create base model ----------#
base_model = ResNet50V2(
    weights='imagenet',
    input_shape=(IMG_SIZE, IMG_SIZE, IMG_DEPTH),
    include_top=False)

# --------- FREEZE base model ----------#
base_model.trainable = False

**TODO:** check why we need to split the validation set from the training set in advance instead of using validation split.

the following comment is not working:
we use dataaugmantion on all of the training set (including the validatio set because we use validation split in model/fit method). maybe we should check what is the difference in the results if we split the validation set before we train and use data augmentation only on the actual training set.

Get the Pre-Trained Model:

In [None]:
# --------- get pretrained model ----------#
keras.backend.clear_session()

#Input shape = [width, height, color channels]
inputs = Input(shape=(IMG_SIZE, IMG_SIZE, IMG_DEPTH))

x = base_model(inputs, training=False)

# Head
x = GlobalAveragePooling2D()(x)
x = Dense(128, activation='relu')(x)
x = Dropout(0.2)(x)
# x = layers.Dense(64, activation='relu')(x)
# x = layers.Dropout(0.2)(x)

#Final Layer (Output)
output = Dense(1, activation='sigmoid')(x)

model = keras.Model(inputs=[inputs], outputs=output)

Compile and Summerize The Model:

In [None]:
model.compile(loss='binary_crossentropy',
               optimizer = Adam(learning_rate=0.001), metrics='accuracy')

model.summary()

Train the Model:

In [None]:
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    min_delta=1e-7,
    restore_best_weights=True,
)

learning_rate_reduction = ReduceLROnPlateau(
    monitor='val_loss',
    factor = 0.2,
    patience = 2,
    min_delt = 1e-7,
    cooldown = 0,
    verbose = 1
)

Epochs = 20

history = model.fit(ds_train,
          batch_size=BATCH, epochs=Epochs,
          validation_data=ds_val,
          # callbacks=[early_stop, learning_rate_reduction],
          steps_per_epoch=(len(df_train)/BATCH),
          validation_steps=(len(df_val)/BATCH))

Evaluate the Accuracy and Loss:

In [None]:
test_loss, test_accuracy = model.evaluate(ds_test, verbose=0)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)

train_acc = history.history['accuracy']
train_loss = history.history['loss']
val_acc = history.history['val_accuracy']
val_loss = history.history['val_loss']
epochs = range(1, len(train_acc) + 1)

Plot Graphs of the Training and Validation Accuracy and Loss vs Epochs:

In [None]:
accuracy_fig = plt.figure()
plt.plot(epochs, train_acc, 'b-', label='Training Accuracy')
plt.plot(epochs, val_acc, 'r-', label='Validation Accuracy')
plt.title('Training & Validation Accuracy vs. Epochs')
plt.legend()
plt.xlabel("Epochs")
plt.ylabel("Accuracy")

loss_fig = plt.figure()
plt.plot(epochs, train_loss, 'b-', label='Training Loss')
plt.plot(epochs, val_loss, 'r-', label='Validation Loss')
plt.title('Training & Validation Loss vs. Epochs')
plt.legend()
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.show()

Precision vs. Recall Graph with F1-Score Points Marked on the Plot:

In [None]:
num_label = {'Normal': 0, 'Pneumonia' : 1}
y_test = df_test['class'].copy().map(num_label).astype('int')
ds_test.reset()
y_scores = model.predict(ds_test, steps=len(ds_test), verbose=0)  # X_test is the input data for testing

# y_true is the true binary labels
# y_scores is the predicted scores for each sample
# Define thresholds
thresholds = np.arange(0.1, 0.95, 0.05)

precision = []  # Initialize an empty list for precision scores
recall = []  # Initialize an empty list for recall scores

for t in thresholds:
    y_pred = y_scores > t
    p = precision_score(y_test, y_pred)
    r = recall_score(y_test, y_pred)
    precision.append(p)
    recall.append(r)
    print(f'Threshold: {t:.2f}, Precision: {p:.2f}, Recall: {r:.2f}')

f_scores = [f1_score(y_test, y_scores > t) for t in thresholds]

# plot the precision-recall curve
plt.plot(recall, precision)

# plot the F-score points per threshold on the precision-recall curve
plt.scatter(recall, precision, c=f_scores, cmap='viridis')
plt.colorbar(label='F-score')

# add text annotations with the F-score values
for r, p, f in zip(recall, precision, f_scores):
    plt.annotate(f'f1={f:.3f}', (r, p), xytext=(4, 2), textcoords='offset points', fontsize=6, rotation=20)

plt.title('Precision-Recall curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
# # Set the number of digits after the decimal point on the x-axis
# ax = plt.gca()
# ax.xaxis.set_major_formatter(ticker.FormatStrFormatter('%.3f'))
plt.show()

# Find the maximum F1-score and its corresponding threshold
max_f1_score = max(f_scores)
max_threshold = thresholds[f_scores.index(max_f1_score)]

print(f"Maximum F1-score: {max_f1_score:.3f}")
print(f"Corresponding threshold: {max_threshold:.2f}")