<h1>Pneumonia detection using CNN+TensorFlow+Keras</h1>

In [None]:
#! nvidia-smi --query

In [None]:
# Mount Google Drive into the Colab runtime. Every path used below
# (config file, image folders, TensorBoard logs, checkpoint) lives under
# '/content/drive/My Drive/...', so this must run before anything else.
from google.colab import drive
drive.mount('/content/drive')

import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import yaml
import h5py
import tensorflow as tf
from tensorflow import keras
from tensorflow.python.keras import models, layers, regularizers, optimizers, callbacks, metrics
from tensorflow.python.keras.layers import Dense, Conv2D, MaxPooling2D, Flatten, Dropout, BatchNormalization
from tensorflow.python.keras.utils.np_utils import to_categorical
from tensorflow.keras.utils import plot_model
from tensorflow.python.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import confusion_matrix
%matplotlib inline

# Load the notebook's tunable parameters from YAML. The keys read later in
# this notebook are 'img_size', 'learning_rate', 'batch_size' and 'epochs'.
# The context manager closes the file handle as soon as parsing is done
# instead of leaving an open descriptor behind for the whole session.
with open('/content/drive/My Drive/X-Ray-pneumonia-with-CV/X-ray-parameters.yml', 'r') as config_file:
    config = yaml.safe_load(config_file)

<h2>Preprocessing the original images:</h2>

In [None]:
# Class names; the position in this list is the integer class index.
labels = ['NORMAL', 'PNEUMONIA']

def get_data(data_dir):
    """Load and resize every image found in ``data_dir/<label>/``.

    Parameters
    ----------
    data_dir : str
        Directory that contains one sub-directory per entry of ``labels``.

    Returns
    -------
    numpy.ndarray
        Object array of shape ``(n_images, 2)``. Column 0 holds the resized
        image as returned by OpenCV (channels in BGR order), column 1 holds
        the integer class index (the position of the label in ``labels``).
    """
    img_size = config.get('img_size')  # read once instead of twice per image
    data = []
    for class_num, label in enumerate(labels):
        path = os.path.join(data_dir, label)
        for img in os.listdir(path):
            img_arr = cv2.imread(os.path.join(path, img))
            if img_arr is None:
                # cv2.imread does not raise on unreadable / non-image files
                # (e.g. '.DS_Store'); it returns None, which would crash
                # cv2.resize below. Skip such entries.
                continue
            resized_arr = cv2.resize(img_arr, (img_size, img_size))  # reshape to the configured square size
            data.append([resized_arr, class_num])

    # Build the (n, 2) object array explicitly: np.array() on ragged
    # [image, int] pairs raises ValueError on recent NumPy releases.
    samples = np.empty((len(data), 2), dtype = object)
    for row, (image, class_num) in enumerate(data):
        samples[row, 0] = image
        samples[row, 1] = class_num
    return samples

In [None]:
# Read each split from Drive into an array of (image, class index) pairs.
# Note: the images come from cv2.imread, i.e. channels are in BGR order.
train = get_data('/content/drive/My Drive/X-Ray-pneumonia-with-CV/train')
val = get_data('/content/drive/My Drive/X-Ray-pneumonia-with-CV/val')
test = get_data('/content/drive/My Drive/X-Ray-pneumonia-with-CV/test')

# Report how many samples each split contains.
for caption, split in (('Train shape: ', train),
                       ('Val shape: ', val),
                       ('Test shape: ', test)):
    print(caption, split.shape)

Every image has been converted to a NumPy array of shape (224, 224, 3).

In [None]:
# Class balance of the training split: map every class index to its name
# (index 0 is 'NORMAL', anything else is 'PNEUMONIA') and count them.
train_label_names = ['NORMAL' if sample[1] == 0 else 'PNEUMONIA' for sample in train]

# The data must be passed as `x=`: the first positional parameter of
# countplot is `data` in current seaborn releases, so a bare positional
# list is no longer interpreted as the variable to count.
sns.countplot(x = train_label_names, linewidth = 1,
              edgecolor = sns.color_palette('dark', 1), palette = 'gnuplot2');

Let's take one image from each class and plot them side by side:

In [None]:
# Show one example per class. get_data() appends all 'NORMAL' images first
# and all 'PNEUMONIA' images afterwards, so the first sample is NORMAL and
# the last one is PNEUMONIA.
plt.rcParams.update({'font.size': 16})

fig, axes = plt.subplots(1, 2, figsize = (10, 10))

# Image and title are taken from the SAME sample. Previously the second
# panel displayed train[1223] but was titled with the label of train[-1],
# so the caption could contradict the picture.
for ax, (image, class_num) in zip(axes, (train[0], train[-1])):
    ax.imshow(image, cmap = 'gray')
    ax.set_title(labels[class_num])
plt.show()

In [None]:
# Split every (image, class index) array into model inputs and targets.
def _to_xy(samples):
    """Return ``(x, y)`` for one split.

    ``x`` is the float32 image stack scaled from [0, 255] to [0, 1];
    ``y`` is the one-hot encoding of the class indices over ``labels``.
    """
    features = np.asarray([feature for feature, _ in samples], dtype = 'float32') / 255
    targets = to_categorical([label for _, label in samples], len(labels))
    return features, targets


x_train, y_train = _to_xy(train)
x_val, y_val = _to_xy(val)
x_test, y_test = _to_xy(test)

# Sanity check: image tensors first, then the matching label matrices.
for caption, array in (('Train dataset shape:', x_train),
                       ('Val dataset shape:', x_val),
                       ('Test dataset shape:', x_test)):
    print(caption, array.shape)

for caption, array in (('Train label shape:', y_train),
                       ('Val label shape:', y_val),
                       ('Test label shape:', y_test)):
    print(caption, array.shape)

<h2>Data augmentation:</h2>

In [None]:
# Random augmentations applied on the fly to each batch.
augmentation_options = dict(
    # Dataset-level / per-sample normalisation is switched off entirely:
    featurewise_center = False,            # do not subtract the dataset mean
    samplewise_center = False,             # do not subtract each sample's mean
    featurewise_std_normalization = False, # do not divide by the dataset std
    samplewise_std_normalization = False,  # do not divide by each sample's std
    zca_whitening = False,                 # no ZCA whitening (a PCA-like decorrelation for images)
    # Geometric transforms:
    rotation_range = 60,       # random rotation, range given in degrees
    zoom_range = 0.2,          # random zoom of up to 20 %
    width_shift_range = 0.1,   # random horizontal shift (fraction of width)
    height_shift_range = 0.1,  # random vertical shift (fraction of height)
    horizontal_flip = True,    # random left-right mirroring
    vertical_flip = False,     # upside-down flips are not wanted here
)
datagen = ImageDataGenerator(**augmentation_options)

# fit() computes the statistics needed by the featurewise_* / ZCA options.
# NOTE(review): all of those are disabled above, so this call looks
# redundant — confirm before removing.
datagen.fit(x_train)

<h2>Building the model:</h2>

In [None]:
# ResNet152 trained from scratch (weights = None) with a 2-unit classification head.
# NOTE(review): `pooling` only applies when include_top = False according to the
# Keras applications docs, so 'max' presumably has no effect here — confirm.
model = tf.keras.applications.resnet.ResNet152(
    include_top = True, weights = None, input_tensor = None,
    input_shape = (config.get('img_size'), config.get('img_size'), 3),
    pooling = 'max', classes = 2,
    classifier_activation = 'sigmoid')

# Since we have to diagnose pneumonia, a quite serious disease, we should monitor
# false negatives and minimize them: diagnosing a sick person as healthy is a far
# worse mistake than diagnosing a healthy person as sick.
# Therefore, Recall is our goal metric.
#
# Targets are one-hot over ['NORMAL', 'PNEUMONIA'], so Recall/Precision are
# restricted to column 1 (PNEUMONIA) via class_id. Without it both columns are
# pooled together and the value no longer reflects missed pneumonia cases.
# Metrics come from the public tf.keras namespace (same as the model) rather than
# the private tensorflow.python.keras copy, and Adam uses `learning_rate`
# because the `lr` alias is deprecated.
model.compile(loss = tf.keras.losses.BinaryCrossentropy(),
              optimizer = tf.keras.optimizers.Adam(learning_rate = config.get('learning_rate')),
              metrics = [tf.keras.metrics.BinaryAccuracy(),
                         tf.keras.metrics.Recall(class_id = 1),
                         tf.keras.metrics.Precision(class_id = 1)])
#model.summary()

In [None]:
#Plotting the model as a graph:
#plot_model(model, show_shapes = True, to_file = 'model_plot.png')

<h2>Callbacks:</h2>

In [None]:
# Training callbacks.

# Keep only the best model on Drive: the file is overwritten each time the
# monitored validation loss improves.
model_checkpoint = callbacks.ModelCheckpoint(
    filepath = '/content/drive/My Drive/X-Ray-pneumonia-with-CV/X-Ray-modelcheckpoint.h5',
    monitor = 'val_loss',
    mode = 'auto',          # direction (min/max) is inferred from the monitored name
    save_best_only = True)

# Write TensorBoard logs into the project folder on Drive.
tensorboard = callbacks.TensorBoard(
    log_dir = '/content/drive/My Drive/X-Ray-pneumonia-with-CV',
    histogram_freq = 1,     # histograms every epoch
    write_graph = True,
    embeddings_freq = 1)    # embedding layers (if any) visualised every epoch

# Divide the learning rate by 10 when validation accuracy has not improved
# for 2 epochs, never going below 1e-6.
learning_rate_reduction = callbacks.ReduceLROnPlateau(
    monitor = 'val_binary_accuracy',
    factor = 0.1,
    patience = 2,
    verbose = 1,            # print a message on every reduction
    min_lr = 0.000001)

Callbacks = [learning_rate_reduction, tensorboard, model_checkpoint]

In [None]:
# Train on augmented batches, validate on the untouched validation images.
#
# * The batch size has to be given to datagen.flow(): with a generator as
#   input, the batch size is defined by the generator, so the configured
#   value never took effect when passed to fit() (flow() silently used its
#   default of 32).
# * Shuffling is likewise controlled by flow(); fit() ignores `shuffle`
#   for generator input.
# * Validation uses the raw arrays: randomly augmenting validation images
#   makes the monitored val_* metrics (which drive ReduceLROnPlateau and
#   ModelCheckpoint) noisy and not comparable with the test evaluation.
batch_size = config.get('batch_size', 32)

history = model.fit(datagen.flow(x_train, y_train,
                                 batch_size = batch_size,
                                 shuffle = True),
                    epochs = config.get('epochs'),
                    validation_data = (x_val, y_val),
                    validation_batch_size = batch_size,
                    callbacks = Callbacks)

In [None]:
# evaluate() returns [loss, binary_accuracy, recall, precision], i.e. the loss
# followed by the metrics in the order they were passed to compile().
# Recall is therefore index 2 (index 3 is precision). Evaluate once and reuse
# the result instead of running the whole test set twice.
test_scores = model.evaluate(x_test, y_test)
print('Accuracy of the model is: {:.2f}'.format(test_scores[1]))
print('Recall of the model is: {:.2f}'.format(test_scores[2]))

In [None]:
# Loss and accuracy curves (training vs. validation), side by side.
# A single figure with two axes replaces the former pair of 55-inch-wide
# figures that each filled only one half of a 1x2 grid.
history_dict = history.history
loss_values = history_dict['loss']
val_loss_values = history_dict['val_loss']
acc_values = history_dict['binary_accuracy']
val_acc_values = history_dict['val_binary_accuracy']
epochs = range(1, len(acc_values) + 1)  # 1-based epoch numbers for the x axis

fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize = (16, 6))

loss_ax.plot(epochs, loss_values, 'ro', label = 'Training loss')
loss_ax.plot(epochs, val_loss_values, 'b', label = 'Validation loss')
loss_ax.set_title('Training and validation loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

acc_ax.plot(epochs, acc_values, 'ro', label = 'Training_acc')
acc_ax.plot(epochs, val_acc_values, 'b', label = 'Validation acc')
acc_ax.set_title('Training and validation accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# Reload the best checkpoint written by ModelCheckpoint during training.
# The public tf.keras loader is used so that the model is restored by the same
# Keras implementation that built it (the model comes from tf.keras.applications),
# rather than by the private tensorflow.python.keras copy.
trained_model = tf.keras.models.load_model('/content/drive/My Drive/X-Ray-pneumonia-with-CV/X-Ray-modelcheckpoint.h5')

In [None]:
# evaluate() returns [loss, binary_accuracy, recall, precision], i.e. the loss
# followed by the metrics in the order they were passed to compile().
# Recall is therefore index 2 (index 3 is precision). Evaluate once and reuse
# the result instead of running the whole test set twice.
checkpoint_scores = trained_model.evaluate(x_test, y_test)
print('Accuracy of the model is: {:.2f}'.format(checkpoint_scores[1]))
print('Recall of the model is: {:.2f}'.format(checkpoint_scores[2]))

In [None]:
# Plotting confusion matrix for the reloaded checkpoint on the test split:
from mlxtend.plotting import plot_confusion_matrix

plt.rcParams.update({'font.size': 18})

# Turn per-class scores and one-hot targets back into class indices.
pred = trained_model.predict(x_test)
pred = np.argmax(pred, axis = 1)
y_true = np.argmax(y_test, axis = 1)

# Rows are true classes, columns are predicted classes (sklearn convention).
cm = confusion_matrix(y_true, pred)
fig, ax = plot_confusion_matrix(conf_mat = cm, figsize = (7, 7))
ax.set_title('Confusion matrix:')  # was `plt.itle(...)`, which raises AttributeError
plt.show()

<h4>Conclusion: 
I attempted to train ResNet152 ('out of the box') for 100 epochs. Training took about 2 hours 30 minutes. 
I obtained a validation recall of 97 % and a test recall of 88 %. Despite the augmentation and the large number of epochs, this gap, together with the confusion matrix, indicates overfitting.

Therefore, I have decided to revise my model and architecture and take a different approach.</h4>