# Dissertation Project Code: Common Chest X-ray Classification and Localization with Deep Learning

This is another source code for my final project dissertation on "Chest X-ray Classification and Localization" by analysing (breaking down) each disease

Let's start by importing the libraries

In [None]:
import numpy as np
import pandas as pd
import os
import seaborn as sns
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

from glob import glob
from matplotlib.patches import Rectangle

import keras.backend as K
from keras.preprocessing.image import ImageDataGenerator, load_img, image
from sklearn.model_selection import train_test_split
from keras.models import Sequential, Model
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Dropout, Flatten, Dense, BatchNormalization
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.applications import VGG16, VGG19, MobileNet, MobileNetV2, InceptionResNetV2, InceptionV3, ResNet50, DenseNet121, DenseNet169, DenseNet201
from keras import regularizers, optimizers
from keras.applications.vgg16 import decode_predictions, preprocess_input
%matplotlib inline

In [None]:
print(os.listdir('../input/data')) # list the items in the directory

Don't forget to create the glob object and insert the full path to the directory of the image to a new column.

In [None]:
PATH = '../input/data/images*/images/*.png'
pattern = glob(PATH)
print("Total number of images: ", len(pattern))

In [None]:
# load the data
data = pd.read_csv('../input/data/Data_Entry_2017.csv')
data.head()

In [None]:
full_path = {os.path.basename(x): x for x in pattern}
data['full_path'] = data['Image Index'].map(full_path.get)

## Breaking Down The Diseases

To simplify the problem, let's take the 1 vs all approach

Let's take "Infiltration" for example. We can try with different diseases by commenting and uncommenting the corresponding codes below.

In [None]:
# data['is_infiltration'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Infiltration' in result else 'No')
# data['is_atelectasis'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Atelectasis' in result else 'No')
# data['is_cardiomegaly'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Cardiomegaly' in result else 'No')
# data['is_mass'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Mass' in result else 'No')
# data['is_nodule'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Nodule' in result else 'No')
# data['is_pneumonia'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Pneumonia' in result else 'No')
# data['is_consolidation'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Consolidation' in result else 'No')
# data['is_edema'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Edema' in result else 'No')
# data['is_fibrosis'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Fibrosis' in result else 'No')
# data['is_effusion'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Effusion' in result else 'No')
# data['is_pleural'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Pleural_Thickening' in result else 'No')
# data['is_hernia'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Hernia' in result else 'No')
# data['is_emphysema'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Emphysema' in result else 'No')
data['is_pneumothorax'] = data['Finding Labels'].map(lambda result: 'Yes' if 'Pneumothorax' in result else 'No')

In [None]:
data.head()

Instead of using the one-hot encoding method to create a label for the dataset, **we can just create a label if there is the word "Infiltration" in the "Finding Labels" column**<br>

Now the problem becomes a binary classification problem, instead of the multi classification problem which makes it easier for us

## Data Pre-processing and Augmentation

Split between train/test/val data

In [None]:
# Prepare the training and validation set
train_set, val_set = train_test_split(data, test_size=0.1, random_state=1993)
train_set, test_set = train_test_split(train_set, test_size=0.1, random_state=1993)

Same data augmentation as before

In [None]:
# creates image data generator to improve x-ray readings
IMG_SIZE = (224, 224)
train_datagen = ImageDataGenerator(rescale=1./255, # scales the image pixel value
                                   #samplewise_center=True,
                                   horizontal_flip=True, # allows horizontal flip
                                   vertical_flip=False, # we don't want the xrays to be upside-down
                                   height_shift_range=0.2,
                                   width_shift_range=0.2,
                                   rotation_range=20, # random rotations to 20 degrees
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   )

test_datagen = ImageDataGenerator(rescale=1./255) # just scale the pixel image, we don't want to preprocess the test set

I just found that Keras has built-in method called **.flow_from_dataframe** which is the same as **flow_from_dataframe** method from the previous notebook.<br><br>

Let's try it in this notebook

In [None]:
train_generator = train_datagen.flow_from_dataframe(dataframe=train_set,
                                                   directory=None,
                                                   batch_size=32,
                                                    x_col='full_path',
                                                    y_col='is_pneumothorax',
                                                    shuffle=True,
                                                    target_size=IMG_SIZE,
                                                    seed=111,
                                                   class_mode='binary')

val_X, val_Y = next(test_datagen.flow_from_dataframe(dataframe=val_set,
                                                   directory=None,
                                                   batch_size=256,
                                                   x_col='full_path',
                                                   y_col='is_pneumothorax',
                                                   shuffle=True,
                                                   target_size=IMG_SIZE,
                                                   seed=111,
                                                   class_mode='binary'))

test_X, test_Y = next(test_datagen.flow_from_dataframe(dataframe=test_set,
                                                   directory=None,
                                                   batch_size=1024,
                                                   x_col='full_path',
                                                   y_col='is_pneumothorax',
                                                   shuffle=True,
                                                   target_size=IMG_SIZE,
                                                   seed=111,
                                                   class_mode='binary'))

Don't forget to **change class_mode into binary** instead of sparse or categorical, since now we are dealing with the binary classification problem

## Building The Model

In [None]:
base_choice = 'VGG16'
# VGG16
if base_choice.upper() == 'VGG16':
    base_model = VGG16(include_top=False, input_shape=(224,224,3), weights='imagenet')
# VGG19
elif base_choice.upper() == 'VGG19':   
    base_model = VGG19(include_top=False, input_shape=(224,224,3))
# MobileNet
elif base_choice.upper() == 'MOBILE':
    base_model = MobileNet(include_top=False, input_shape=(224,224,3))
# MobileNetV2
elif base_choice.upper() == 'MOBILEV2':
    base_model = MobileNetV2(include_top=False, input_shape=(224,224,3))
# InceptionResNetV2
elif base_choice.upper() == 'INCEPTIONV2':
    base_model = InceptionResNetV2(include_top=False, input_shape=(224,224,3))
# InceptionV3
elif base_choice.upper() == 'INCEPTIONV3':
    base_model = InceptionV3(include_top=False, input_shape=(224,224,3))
# ResNet50
elif base_choice.upper() == 'RESNET50':
    base_model = ResNet50(include_top=False, input_shape=(224,224,3))
# DenseNet 121
elif base_choice.upper() == 'DENSE121':
    base_model = DenseNet121(include_top=False, input_shape=(224,224,3))
# DenseNet 169
elif base_choice.upper() == 'DENSE169':
    base_model = DenseNet169(include_top=False, input_shape=(224,224,3))
# DenseNet 201
elif base_choice.upper() == 'DENSE201':
    base_model = DenseNet201(include_top=False, input_shape=(224,224,3))
    
print("Base pre-trained model:", base_choice)
base_model.summary()

In [None]:
# freeze the base model
for layer in base_model.layers:
    layer.trainable = False
        
# adds our own dense layers
output = base_model.output
output = Flatten()(output)
output = Dense(512, activation='relu', kernel_initializer='glorot_uniform', bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01))(output)
last_output = Dense(1, activation='sigmoid')(output)
# construct final model
final_model = Model(base_model.input, last_output)
# compile the model
# opt = optimizers.Adam(lr=1e-3, decay=1e-5)
final_model.compile(optimizer='adagrad', loss='binary_crossentropy', metrics=['binary_accuracy'])
final_model.summary()

In [None]:
#saves the best model for each epoch
# WEIGHT_PATH = 'checkpoint.{epoch:02d}-{val_loss:.2f}.hdf5'
# checkpoint = ModelCheckpoint(filepath=WEIGHT_PATH, monitor='val_loss', verbose=1, save_best_only=True)

In [None]:
fitted_model = final_model.fit_generator(generator=train_generator, steps_per_epoch=len(train_generator)//32, epochs=5, validation_data=(val_X, val_Y), validation_steps=len(val_X)//256)

In [None]:
final_model.save('weights.'+base_choice+ '-' + 'pneumothorax' + '.hdf5')

In [None]:
pred_Y = final_model.predict(test_X, batch_size = 64, verbose = True)

In [None]:
final_model.evaluate(test_X, test_Y, batch_size=128)

As you can see our ROC is only consisted of one curve, which is the disease we broke down at the beginning of this notebook. 

In [None]:
from sklearn.metrics import roc_curve, auc
fig, c_ax = plt.subplots(1,1, figsize = (9, 9))
# for (idx, c_label) in enumerate(diseases):
fpr, tpr, thresholds = roc_curve(test_Y.astype(int), pred_Y)
c_ax.plot(fpr, tpr, label = '%s (AUC:%0.2f)'  % ('Infiltration', auc(fpr, tpr)))
c_ax.legend()
c_ax.set_xlabel('False Positive Rate')
c_ax.set_ylabel('True Positive Rate')

In [None]:
print(fitted_model.history.keys())

In [None]:
plt.plot(fitted_model.history['loss'])
plt.plot(fitted_model.history['val_loss'])
plt.title('Model Loss vs Validation Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train', 'validation'], loc='upper right')
plt.show()

In [None]:
plt.plot(fitted_model.history['binary_accuracy'])
plt.plot(fitted_model.history['val_binary_accuracy'])
plt.title('Training Accuracy vs validation Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['train', 'validation'], loc='upper right')
plt.show()

In this directory, there is also a data for the correct localisation from several images. It is in the BBox_List_2017.csv

In [None]:
box_data = pd.read_csv('../input/data/BBox_List_2017.csv')

In [None]:
len(box_data)

Remember that this is only consists of 984 data, not the whole 112,120 data that we used to train and test the model

In [None]:
box_data['Finding Label'].unique()

Also, there are only 8 diseases available in this dataset

Let's focus on Infiltration first

In [None]:
infiltrate = box_data[box_data['Finding Label'] == 'Infiltrate']

In [None]:
infiltrate.head()

There are 4 columns (x, y, w, and h) that we can use to draw a rectangle to locate where the disease is in the image.

In [None]:
for i in range(len(data)):
    if data['Image Index'].iloc[i] == '00025787_027.png':
        print(data['full_path'].iloc[i])
        break

Let's test with a random x-ray image which has the Infiltration disease

In [None]:
PATH_IMG = '../input/data/images_011/images/00025787_027.png'

img = load_img(PATH_IMG, target_size=(224,224))
x = image.img_to_array(img)
x = np.expand_dims(x, axis=0)
x = preprocess_input(x)

Make a prediction with our model on that image

In [None]:
pred = final_model.predict(x) # predict with the model we just trained
pred = list(pred.flatten())
argmax = np.argmax(pred[0])
output = final_model.output[:, argmax]

In [None]:
pred

## Localisation with Grad-CAM

Grad-CAM is similar with the CAM algorithm. However, it implements gradient-weighted class activation in the process.

> Gradient-weighted Class Activation Mapping (Grad-CAM), uses the gradients of any target concept (say logits for ‘dog’ or even a caption), flowing into the final convolutional layer to produce a coarse localization map highlighting the important regions in the image for predicting the concept. (R. Ramprasaath et al., 2019).

The general algorithm looks like this:

1. It extracts the final convolutional layer from the model like CAM does
2. The next step is slightly different from CAM. Instead of just taking the logits (np.argmax) from the class, it also extracts the gradients of that class.
3. Next, take the mean from it. Why mean? This method does not need us to add another pooling layer e.g. GlobalAveragePooling2D.
4. Construct a temporary model with the input from final_model and output from the last convolutional layer output

In [None]:
# take the last convolutional layer
last_conv_layer = final_model.get_layer('block5_conv3')
# extract the gradients from the last convolutional layer against the class logits (np.argmax)
grads = K.gradients(output, last_conv_layer.output)[0]
# print(grads.shape)
# take the mean of the gradients, leaving us with the channel dimension --> global average pooling
pooled_grads = K.mean(grads, axis=(0,1,2))
# define a temporary model with pre-trained model as its input and the last convolutional layer as its output
i = K.function([final_model.input], [pooled_grads, last_conv_layer.output[0]])
pooled_grads_value, conv_layer_output_value = i([x])

5. Finally, compute the matrix multiplication from both of them.

In [None]:
# do the matrix multiplication to obtain weight between the last convolutional layer and its gradient
for l in range(512):
    conv_layer_output_value[:, :, l] *= pooled_grads_value[l]

Don't forget to normalize those values into 0-1 range

In [None]:
# average the weighted feature map along the channel dimension which resulting in a heatmap
heatmap = np.mean(conv_layer_output_value, axis=-1)
# normalize the heatmap
heatmap = np.maximum(heatmap, 0)
heatmap /= np.max(heatmap)
# display the matrix values with matshow
plt.matshow(heatmap) 
plt.show()

We can get a glimpse of where the the disease occured (if any) just by seeing the plot above.

In [None]:
pred[0]

Now, let's apply this heatmap to our test image

In [None]:
img = cv2.imread(PATH_IMG)
heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))

# multiply heatmap by 255 to convert it back to RGB
heatmap = np.uint8(255 * heatmap)
heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)

hif = .5 # intensity (transparency) value

superimposed_img = heatmap * hif + img # paste the heatmap * intensity to the original image
output = 'output.jpeg'
cv2.imwrite(output, superimposed_img)
img=mpimg.imread(output)
plt.imshow(img)
plt.axis('off')
plt.title('Result:' + str('Normal' if pred[0] < 0.5 else 'Infiltration') + '|' + 'Confidence:' + str(pred[0] * 100))

And we are done! :)

We can check if our localisation is the same with the dataset.

In [None]:
# get x,y,w,h from the row of the test image
row = box_data[box_data['Image Index'] == '00025787_027.png']

In [None]:
row.iloc[1]

In [None]:
row['Bbox [x'].iloc[1]

In [None]:
# draw a rectangle around the chest x-ray
plt.imshow(cv2.imread(PATH_IMG))
rect = Rectangle((row['Bbox [x'].iloc[1],row['y'].iloc[1]), 
                  row['w'].iloc[1], row['h]'].iloc[1], 
                  fill=False, color='red')
plt.axes().add_patch(rect)
plt.show()