<a href="https://colab.research.google.com/github/Artshouldterrify/btp/blob/main/CNN_Pneumonia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BTP Project - Pneumonia X-Ray Classification
***
This project classifies chest X-rays into two categories, pneumonia and normal, using an ensemble of deep convolutional neural networks (CNNs).

We use two datasets: one containing chest X-rays labelled 'Normal' or 'Pneumonia', and one containing chest X-ray masks for segmentation. We train a U-Net based segmentation model on the mask dataset, use it to preprocess the classification images, and apply data augmentation to them.

We then train several CNNs and combine them using an ensemble to form a classifier. We achieve a testing accuracy of 94.6% and a recall of 98.205%.



## Importing the data
***
The datasets are hosted on Kaggle, and are imported using the *kaggle* library.

**Note**: This code requires an uploaded kaggle.json file to work.

In [None]:
!pip install kaggle
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets download paultimothymooney/chest-xray-pneumonia

In [None]:
!unzip /content/chest-xray-pneumonia.zip

In [None]:
# installing a library we'll need that isn't available by default in the Colab environment.

!pip install tf-clahe

## Setting up data pipelines
***
To feed data to our models, we set up data pipelines using Keras preprocessing. A custom function creates a separate pipeline for each model, so the preprocessing can be tailored to each one.

In [None]:
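# required imports

import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import cv2
import tf_clahe  # the tf-clahe package is imported with an underscore
import sklearn.metrics as sk  # used later for precision, recall and F1
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay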

In [None]:
# loading the images
train_dir = "/content/chest_xray/train"
test_dir = "/content/chest_xray/test"

# pipeline generator
def datagen(func):
  train_gen = tf.keras.preprocessing.image.ImageDataGenerator(
              preprocessing_function=func,
              horizontal_flip = True,
              vertical_flip = True,
              rotation_range = 0.2,
              width_shift_range = 0.1,
              height_shift_range = 0.1,
              shear_range = 0.2,
              zoom_range = 0.2)

  test_gen = tf.keras.preprocessing.image.ImageDataGenerator(
              preprocessing_function=func)

  train_set = train_gen.flow_from_directory(train_dir, class_mode = "binary", batch_size = 32, target_size = (224, 224))
  test_set = test_gen.flow_from_directory(test_dir, class_mode = "binary", batch_size = 32, target_size = (224, 224), shuffle = False, seed = 10)
  return train_set, test_set
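
To make the role of `func` concrete: `tf.keras.applications.mobilenet_v2.preprocess_input`, which is passed to `datagen` later in this notebook, rescales pixel values from [0, 255] to [-1, 1]. The sketch below reproduces only that arithmetic in plain Python on a few made-up pixel values; `scale_to_unit_range` is a hypothetical helper for illustration, not a Keras function.

```python
def scale_to_unit_range(pixels):
    """Map pixel values in [0, 255] to [-1, 1] (MobileNetV2-style scaling)."""
    return [p / 127.5 - 1.0 for p in pixels]


# made-up pixel values: black, mid-grey, white
sample_pixels = [0, 127.5, 255]
scaled = scale_to_unit_range(sample_pixels)
print(scaled)
```

Other pre-trained models expect different input ranges, which is why the function is passed in as an argument instead of being hard-coded.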

## Visualizing the data
***

In [None]:
# loading the images into a Dataset object

batch_size, h, w = 32, 224, 224
train_ds = tf.keras.utils.image_dataset_from_directory("/content/chest_xray/train", image_size=(h, w), subset='training', validation_split=0.2, seed=12, batch_size=batch_size, labels="inferred", label_mode="int")
test_ds = tf.keras.utils.image_dataset_from_directory("/content/chest_xray/test", image_size=(h, w), seed=12, batch_size=batch_size, labels="inferred", label_mode="int")
val_ds = tf.keras.utils.image_dataset_from_directory("/content/chest_xray/train", image_size=(h, w), subset='validation', validation_split=0.2, seed=12, batch_size=batch_size, labels="inferred", label_mode="int")
class_names = train_ds.class_names
train_ds.class_names, test_ds.class_names, val_ds.class_names

In [None]:
# plotting 9 random images with their labels

plt.figure(figsize=(10,10))
for images, labels in train_ds.take(1):
  for i in range(9):
    ax = plt.subplot(3, 3, i+1)
    plt.imshow(images[i].numpy().astype("uint8"))
    plt.title(train_ds.class_names[labels[i]])
    plt.axis("off")

In [None]:
# checking the size of each image

for images, labels in train_ds.take(1):
  for i in range(1):
    print(images[i].numpy().shape)

## Defining the model
***

We train multiple models, each built on a different pre-trained network, and pick the best-performing ones for the ensemble. The training code is largely the same for each, and we point out the changes needed to swap in a different pre-trained model.

We first load the segmentation model that generates the masks. This preprocessing step is included as a layer placed ahead of the classifier itself.

In [None]:
# loading in the kaggle-trained segmentation model

unet_mask = tf.keras.models.load_model("/content/drive/MyDrive/newUnet (1).h5")
unet_mask.trainable = False
unet_mask.summary()

In [None]:
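# defining a custom layer to generate a mask

class maskLayer(tf.keras.layers.Layer):
  def __init__(self, **kwargs):
    super(maskLayer, self).__init__(**kwargs)
  def call(self, inputs):
    # the U-Net operates on single-channel images
    grayscale_image = tf.image.rgb_to_grayscale(inputs)
    # NOTE: the CLAHE-enhanced image is computed but not used below;
    # the U-Net receives the plain grayscale image
    img_clahe = tf_clahe.clahe(grayscale_image, clip_limit=10.0)
    mask = unet_mask(grayscale_image)
    # binarize the predicted mask at 0.5 and zero out pixels outside it
    mask = tf.where(mask>0.5, 1.0, 0.0)
    masked_output = inputs * mask
    return masked_output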
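
The thresholding and multiplication in `call` can be checked by hand on a tiny example. The following is a plain-Python sketch on a made-up 2x2 single-channel "image"; `apply_mask` is a hypothetical helper that mirrors the `tf.where(mask>0.5, 1.0, 0.0)` and `inputs * mask` steps, not the layer itself.

```python
def apply_mask(image, mask_probs, threshold=0.5):
    """Zero out pixels whose predicted mask probability is not above the threshold."""
    return [
        [pixel * (1.0 if prob > threshold else 0.0)
         for pixel, prob in zip(image_row, prob_row)]
        for image_row, prob_row in zip(image, mask_probs)
    ]


# made-up pixel intensities and mask probabilities
image = [[10, 20], [30, 40]]
mask_probs = [[0.9, 0.2], [0.5, 0.51]]
masked = apply_mask(image, mask_probs)
print(masked)
```

Note that a probability of exactly 0.5 is treated as background, because the comparison is strict.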

We test this mask layer on a random image taken from our dataset.

In [None]:
# testing the mask layer

transf = maskLayer()

plt.figure(figsize=(10,10))
for images, labels in train_ds.take(1):
  print(images[0].shape)
  ax = plt.subplot(2,1,1)
  im = transf(tf.reshape(images[0], (1,224,224,3))).numpy()
  plt.imshow(images[0].numpy().astype("uint8"))
  ax = plt.subplot(2,1,2)
  im = tf.reshape(im, (224,224,3))
  plt.imshow(im.numpy().astype("uint8"))

We're now ready to define the classifier itself. The example below is based on MobileNetV2; it can be switched to another architecture, for example ResNet50V2, by changing the model assigned to *base_model* to one from a different module of *tf.keras.applications*. As an example, the following code cell after it defines the ResNet50V2 variant.

In [None]:
# model

base_model = tf.keras.applications.mobilenet_v2.MobileNetV2(include_top=False, weights='imagenet', input_shape=(224,224,3))
base_model.trainable = False
inputs = tf.keras.Input(shape=(224,224,3))
x = maskLayer()(inputs)
x = base_model(x, training=False)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(128, activation="relu")(x)
x = tf.keras.layers.Dense(64, activation="relu")(x)
x = tf.keras.layers.Dense(32, activation="relu")(x)
outputs = tf.keras.layers.Dense(1, activation="sigmoid")(x)
model = tf.keras.Model(inputs, outputs)
model.summary()

model.compile(optimizer = tf.keras.optimizers.Adam(), loss = "binary_crossentropy", metrics = ["accuracy"])
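
As a rough check on `model.summary()`, the size of the trainable classification head can be worked out by hand: a dense layer with `n_in` inputs and `n_out` units has `n_in * n_out + n_out` parameters. The sketch below assumes the default-width MobileNetV2 backbone, whose pooled feature vector has 1280 entries; `dense_params` and `head_params` are hypothetical helpers for illustration.

```python
def dense_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out


def head_params(feature_dim, widths=(128, 64, 32, 1)):
    """Total parameters of the stacked Dense layers placed after global pooling."""
    total, n_in = 0, feature_dim
    for n_out in widths:
        total += dense_params(n_in, n_out)
        n_in = n_out
    return total


# assumption: 1280 pooled features for default-width MobileNetV2
mobilenet_head = head_params(1280)
print(mobilenet_head)
```

With the backbone frozen, these head parameters are the only ones updated during the first training phase.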

In [None]:
# model (ResNet50V2 variant)

base_model = tf.keras.applications.resnet_v2.ResNet50V2(include_top=False, weights='imagenet', input_shape=(224,224,3))
base_model.trainable = False
inputs = tf.keras.Input(shape=(224,224,3))
x = maskLayer()(inputs)
x = base_model(x, training=False)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(128, activation="relu")(x)
x = tf.keras.layers.Dense(64, activation="relu")(x)
x = tf.keras.layers.Dense(32, activation="relu")(x)
outputs = tf.keras.layers.Dense(1, activation="sigmoid")(x)
model = tf.keras.Model(inputs, outputs)
model.summary()

model.compile(optimizer = tf.keras.optimizers.Adam(), loss = "binary_crossentropy", metrics = ["accuracy"])

We now generate our training and testing data pipelines using the function defined previously. Note that the preprocessing function passed as an argument must correspond to the pre-trained model being used.

We additionally define a callback that stops training when the validation loss has not improved for a set number of epochs (five here) and restores the best weights seen.

In [None]:
# training and testing set
tr_set, te_set = datagen(tf.keras.applications.mobilenet_v2.preprocess_input)

# callback
early_stopping_callbacks = tf.keras.callbacks.EarlyStopping(patience = 5,
                                                            restore_best_weights = True,
                                                            verbose = 1)
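
The patience rule can be illustrated without training anything. The sketch below is a simplified plain-Python model of patience-based stopping on a made-up list of validation losses; it is not Keras's exact implementation, and `early_stopping_epoch` is a hypothetical helper.

```python
def early_stopping_epoch(val_losses, patience=5):
    """Return (epoch at which training stops, epoch with the best loss), 0-indexed."""
    best_loss = float("inf")
    best_epoch = -1
    wait = 0  # consecutive epochs without improvement
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    # patience never ran out: training used every epoch
    return len(val_losses) - 1, best_epoch


# made-up validation losses
losses = [0.9, 0.7, 0.6, 0.65, 0.62, 0.61, 0.63, 0.64, 0.5]
stopped_at, best_at = early_stopping_epoch(losses, patience=5)
print(stopped_at, best_at)
```

In this made-up run the final, lower loss is never reached because training halts first; `restore_best_weights = True` then rolls the model back to the best epoch seen so far.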

In [None]:
# training the model
history_mv2 = model.fit(tr_set, epochs = 50, validation_data = te_set, steps_per_epoch = 100,
                              callbacks = [early_stopping_callbacks])

In [None]:
# evaluating testing accuracy
model.evaluate(te_set)

In [None]:
# loss curve
l = np.concatenate([history_mv2.history['loss']])
acc = np.concatenate([history_mv2.history['accuracy']])
lv = np.concatenate([history_mv2.history['val_loss']])
accv = np.concatenate([history_mv2.history['val_accuracy']])

plt.plot(l, label="Loss")
plt.plot(acc, label="Accuracy")
plt.legend()
plt.show()
plt.plot(lv, label="Val_Loss")
plt.plot(accv, label="Val_Accuracy")
plt.legend()
plt.show()

We then fine-tune the model by unfreezing the base model while keeping its first 45 layers frozen, and retrain with a much lower learning rate.

In [None]:
# fine tune
base_model = model.layers[2]  # layers[0] is the InputLayer and layers[1] the maskLayer
base_model.trainable = True

fine_tune_at = 45

for layer in base_model.layers[:fine_tune_at]:
  layer.trainable = False
model.compile(loss = "binary_crossentropy", optimizer=tf.keras.optimizers.Adam(learning_rate=0.00001), metrics=["accuracy"])

history_mb_ft = model.fit(tr_set, epochs = 100, validation_data = te_set, steps_per_epoch = 100,
                              callbacks = [early_stopping_callbacks])
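
The slicing used for freezing can be sanity-checked on stand-in objects. This is a minimal sketch in which `FakeLayer` is a hypothetical substitute for a Keras layer and the layer count of 60 is made up; it only shows which indices stay trainable for a given `fine_tune_at`.

```python
from dataclasses import dataclass


@dataclass
class FakeLayer:
    """Stand-in for a Keras layer: only the attributes needed here."""
    name: str
    trainable: bool = True


def freeze_up_to(layers, fine_tune_at):
    """Freeze layers[0:fine_tune_at] and return how many layers remain trainable."""
    for layer in layers[:fine_tune_at]:
        layer.trainable = False
    return sum(layer.trainable for layer in layers)


# made-up backbone with 60 layers
layers = [FakeLayer(f"layer_{i}") for i in range(60)]
n_trainable = freeze_up_to(layers, fine_tune_at=45)
print(n_trainable)
```

Layers with indices 0 to 44 are frozen and index 45 is the first one that is fine-tuned.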

In [None]:
# loss curve for fine-tuning
l = np.concatenate([history_mb_ft.history['loss']])
acc = np.concatenate([history_mb_ft.history['accuracy']])
lv = np.concatenate([history_mb_ft.history['val_loss']])
accv = np.concatenate([history_mb_ft.history['val_accuracy']])

plt.plot(l, label="Loss")
plt.plot(acc, label="Accuracy")
plt.legend()
plt.show()
plt.plot(lv, label="Val_Loss")
plt.plot(accv, label="Val_Accuracy")
plt.legend()
plt.show()

In [None]:
# testing the final model (te_set applies the same preprocessing used during training)
model.evaluate(te_set)

We now generate a Confusion Matrix, calculating different metrics for our model.

In [None]:
# confusion matrix
preds = model.predict(te_set)
y_pred = np.where(preds <= 0.5, 0, 1).ravel()  # threshold the sigmoid outputs at 0.5

y_true = te_set.labels

# scikit-learn expects (y_true, y_pred): rows are true classes, columns are predictions
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(cm, display_labels=class_names)
fig, ax = plt.subplots(figsize=(10,10))
disp.plot(ax=ax)

In [None]:
# precision, recall, f1-score, balanced accuracy (scikit-learn expects y_true first)
p = sk.precision_score(y_true, y_pred, average="weighted")
r = sk.recall_score(y_true, y_pred, average="weighted")
f = sk.f1_score(y_true, y_pred, average="weighted")
b = sk.balanced_accuracy_score(y_true, y_pred)
p, r, f, b
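
For reference, these metrics can be derived directly from the four cells of a binary confusion matrix. The sketch below uses made-up counts, not results from this notebook, and computes the plain binary versions for the positive class; the cell above uses `average="weighted"`, which averages per-class scores by class frequency and so can give different numbers. `binary_metrics` is a hypothetical helper.

```python
def binary_metrics(tp, fp, fn, tn):
    """Positive-class metrics from confusion-matrix counts."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)          # sensitivity
    specificity = tn / (tn + fp)     # recall of the negative class
    f1 = 2 * precision * recall / (precision + recall)
    accuracy = (tp + tn) / (tp + fp + fn + tn)
    balanced_accuracy = (recall + specificity) / 2
    return {
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "accuracy": accuracy,
        "balanced_accuracy": balanced_accuracy,
    }


# made-up counts for illustration only
metrics = binary_metrics(tp=90, fp=10, fn=5, tn=45)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```

Recall matters most in this setting because a false negative corresponds to a missed pneumonia case.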

## Ensemble
***
We load the best models and combine them into an ensemble.

In [None]:
# loading the models
rv2 = tf.keras.models.load_model("/content/drive/MyDrive/rsnt_cs.h5")
mbv2 = tf.keras.models.load_model("/content/drive/MyDrive/mobilenet_model.h5")
eff = tf.keras.models.load_model("/content/drive/MyDrive/pretrain_eff (1).h5")
tr_set, te_set = datagen(tf.keras.applications.mobilenet_v2.preprocess_input)
tr_set_eff, te_set_eff = datagen(tf.keras.applications.efficientnet_v2.preprocess_input)

In [None]:
r_pred, e_pred, m_pred = rv2.predict(te_set), eff.predict(te_set_eff), mbv2.predict(te_set)

We combine the models' predictions using a weighted average. To find the best weights, we evaluated multiple weight combinations using the following code.

In [None]:
best_params = list()
best_score = 0.0
labels = te_set.labels
y_true = labels
A = np.arange(1, 10, 0.5)
for a in A:
  for b in A:
    for c in A:
      tot = (a*r_pred + b*e_pred + c*m_pred)/(a + b + c)
      y_pred = tf.where(tot<=0.5,0,1)
      score = sk.f1_score(y_true, y_pred)
      if score > best_score:
        best_score = score
        best_params = [a,b,c]
best_params, best_score
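
The same search can be sketched on toy data in plain Python. Everything below is made up for illustration: two hypothetical models, four samples, and the helpers `f1_binary` and `search_weights`. Because the weighted average is divided by the sum of the weights, only the ratios between weights matter, so for example (1, 1) and (2, 2) give identical predictions.

```python
from itertools import product


def f1_binary(y_true, y_pred):
    """F1 score for the positive class (label 1)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    return 0.0 if tp == 0 else 2 * tp / (2 * tp + fp + fn)


def search_weights(model_probs, y_true, candidates):
    """Try every weight combination and keep the first one with the highest F1."""
    best_weights, best_score = None, 0.0
    for weights in product(candidates, repeat=len(model_probs)):
        total = sum(weights)
        y_pred = []
        for i in range(len(y_true)):
            avg = sum(w * probs[i] for w, probs in zip(weights, model_probs)) / total
            y_pred.append(0 if avg <= 0.5 else 1)
        score = f1_binary(y_true, y_pred)
        if score > best_score:
            best_weights, best_score = weights, score
    return best_weights, best_score


# made-up labels and predicted probabilities
y_true = [1, 0, 1, 0]
good_model = [0.9, 0.2, 0.8, 0.1]
poor_model = [0.05, 0.9, 0.1, 0.95]
best_weights, best_score = search_weights([good_model, poor_model], y_true, [1, 2, 3])
print(best_weights, best_score)
```

With equal weights the confident but wrong model dominates; the search settles on a combination that gives the better model more influence.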

The final combination weights the ResNet, EfficientNet and MobileNet predictions 3, 1 and 2 respectively.

In [None]:
model_outputs = (3.0*r_pred + 1.0*e_pred + 2.0*m_pred)/(6.0)

Finally, we create a confusion matrix and calculate metrics for the ensemble.

In [None]:
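# confusion matrix
y_pred = np.where(model_outputs <= 0.5, 0, 1).ravel()  # threshold the averaged outputs at 0.5
y_true = te_set_eff.labels

# scikit-learn expects (y_true, y_pred): rows are true classes, columns are predictions
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(cm, display_labels=class_names)
fig, ax = plt.subplots(figsize=(10,10))
disp.plot(ax=ax)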

In [None]:
# precision, recall, f1-score, balanced accuracy (scikit-learn expects y_true first)
p = sk.precision_score(y_true, y_pred, average="weighted")
r = sk.recall_score(y_true, y_pred, average="weighted")
f = sk.f1_score(y_true, y_pred, average="weighted")
b = sk.balanced_accuracy_score(y_true, y_pred)
p, r, f, b