# Pneumonia Detection in Chest X-Ray Images

## Set up

In [None]:
!pip install eli5

In [None]:
import pathlib
import os
import warnings
# import eli5

import numpy as np
import pandas as pd
import tensorflow as tf
import seaborn as sns
import matplotlib.pyplot as plt
import keras_tuner as kt

from math import ceil
from matplotlib import gridspec
from tensorflow.keras.preprocessing import image_dataset_from_directory
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, roc_curve, roc_auc_score, plot_roc_curve, plot_confusion_matrix

%matplotlib inline

In [None]:
# def set_seed(seed):
#     np.random.seed(seed)
#     tf.random.set_seed(seed)
#     os.environ['PYTHONHASHSEED'] = str(seed)
#     os.environ['TF_DETERMINISTIC_OPS'] = '1'

# set_seed(190121)

plt.rc('figure', autolayout=True)
plt.rc('axes', labelweight='bold', labelsize='large',
       titleweight='bold', titlesize=18, titlepad=10)
plt.rc('image', cmap='magma')

warnings.filterwarnings("ignore")

# tf.config.experimental.enable_op_determinism()

train_path = '../input/chest-xray-pneumonia/chest_xray/train'
test_path = '../input/chest-xray-pneumonia/chest_xray/test'
valid_path = '../input/chest-xray-pneumonia/chest_xray/val'

## Exploratory Data Analysis

https://towardsdatascience.com/exploratory-data-analysis-ideas-for-image-classification-d3fc6bbfb2d2

### Image Visualization

In [None]:
normal_imgs = [x for x in os.listdir(f'{train_path}/NORMAL') if x.endswith('.jpeg')]
pneumo_imgs = [y for y in os.listdir(f'{train_path}/PNEUMONIA') if y.endswith('.jpeg')]

select_norm = np.random.choice(normal_imgs, 3, replace = False)
select_pneu = np.random.choice(pneumo_imgs, 3, replace = False)

fig = plt.figure(figsize = (8,6))
for i in range(6):
    if i < 3:
        fp = f'{train_path}/NORMAL/{select_norm[i]}'
        label = 'NORMAL'
    else:
        fp = f'{train_path}/PNEUMONIA/{select_pneu[i-3]}'
        label = 'PNEUMONIA'
    ax = fig.add_subplot(2, 3, i+1)
    
    fn = tf.keras.preprocessing.image.load_img(fp, target_size = (100,100), color_mode='grayscale')
    plt.imshow(fn, cmap='Greys_r')
    plt.title(label)
    plt.axis('off')
plt.show()

len(normal_imgs), len(pneumo_imgs)

### Images to Numpy Array

In [None]:
size = (64, 64)

def imgs2np(path, file_list, size = (64, 64)):
    
    for i in file_list:
        image_path = path + i
        image = tf.keras.preprocessing.image.load_img(image_path, target_size = size, color_mode='grayscale')
        image = tf.keras.preprocessing.image.img_to_array(image)
        img_ts = [image.ravel()]
        
        try:
            np_array = np.concatenate((np_array, img_ts))

        except NameError:
            np_array = img_ts    

    return np_array

np_normal_images = imgs2np(f'{train_path}/NORMAL/', normal_imgs)
np_pneumonia_images = imgs2np(f'{train_path}/PNEUMONIA/', pneumo_imgs)

## Images Averaging

In [None]:
mean_img_norm = np.mean(np_normal_images, axis=0).reshape(size)
mean_img_pneum = np.mean(np_pneumonia_images, axis=0).reshape(size)
std_img_norm = np.std(np_normal_images, axis=0).reshape(size)
std_img_pneum = np.std(np_pneumonia_images, axis=0).reshape(size)

ax = plt.subplot(2, 2, 1)
plt.imshow(mean_img_norm, cmap='Greys_r')
plt.title('Average Normal', fontsize=10)
plt.axis('off')

ax = plt.subplot(2, 2, 2)
plt.imshow(std_img_norm, cmap='Greys_r')
plt.title('Standard Deviation Normal', fontsize=10)
plt.axis('off')

ax = plt.subplot(2, 2, 3)
plt.imshow(mean_img_pneum, cmap='Greys_r')
plt.title('Average Pneumonia', fontsize=10)
plt.axis('off')

ax = plt.subplot(2, 2, 4)
plt.imshow(std_img_pneum, cmap='Greys_r')
plt.title('Standard Deviation Pneumonia', fontsize=10)
plt.axis('off')

plt.show()

## Contrast Between Images

In [None]:
contrast_mean = mean_img_norm - mean_img_pneum
plt.imshow(contrast_mean, cmap='bwr')
plt.title(f'Difference Between Normal and Pneumonia Average')
plt.axis('off')
plt.show()

## Eigenimages

In [None]:
def eigenimages(full_mat, title, n_comp = 0.7, size = (64, 64)):
    # fit PCA to describe n_comp * variability in the class
    pca = PCA(n_components = n_comp, whiten = True)
    pca.fit(full_mat)
    print('Number of PC: ', pca.n_components_)
    return pca
  
def plot_pca(pca, size = (64, 64)):
    # plot eigenimages in a grid
    n = pca.n_components_
    fig = plt.figure(figsize=(8, 8))
    r = int(n**.5)
    c = ceil(n/ r)
    for i in range(n):
        ax = fig.add_subplot(r, c, i + 1, xticks = [], yticks = [])
        ax.imshow(pca.components_[i].reshape(size), 
                  cmap='Greys_r')
    plt.axis('off')
    plt.show()
    
plot_pca(eigenimages(np_normal_images, 'NORMAL'))
plt.title('Normal patients')
plot_pca(eigenimages(np_pneumonia_images, 'PNEUMONIA'))
plt.title('Pneumonia patients')

## Loading datasets as TensorFlow objects

In [None]:
ds_train = image_dataset_from_directory(
    train_path,
    labels='inferred',
    label_mode='binary',
    image_size=(224,224),
    batch_size=32,
    shuffle=True,
)

ds_test = image_dataset_from_directory(
    test_path,
    labels='inferred',
    label_mode='binary',
    image_size=(224,224),
    batch_size=32,
    shuffle=False,
)

ds_valid = image_dataset_from_directory(
    valid_path,
    labels='inferred',
    label_mode='binary',
    image_size=(224,224),
    batch_size=32,
    shuffle=False,
)

In [None]:
def convert_to_float(image, label):
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    return image, label

In [None]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

ds_train = ds_train.map(convert_to_float).cache().prefetch(buffer_size=AUTOTUNE)
ds_test = ds_test.map(convert_to_float).cache().prefetch(buffer_size=AUTOTUNE)
ds_valid = ds_valid.map(convert_to_float).cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.BinaryCrossentropy()
metrics = [tf.keras.metrics.BinaryAccuracy(name='binary_accuracy'),
           tf.keras.metrics.TruePositives(name='TP'),
           tf.keras.metrics.FalsePositives(name='FP'),
           tf.keras.metrics.TrueNegatives(name='TN'),
           tf.keras.metrics.FalseNegatives(name='FN'),
           tf.keras.metrics.Precision(name='P'),
           tf.keras.metrics.Recall(name='R'),
           tf.keras.metrics.AUC(name='AUC'),
           tf.keras.metrics.AUC(name='PR', curve='PR')
          ]

In [None]:
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=10, restore_best_weights=True)

regularizer = tf.keras.regularizers.L2()

## Custom Convolutional Neural Network

In [None]:
model = tf.keras.models.Sequential([
    tf.keras.layers.Rescaling(1./255, input_shape=(224, 224, 3)),
    tf.keras.layers.RandomFlip('horizontal'),
    tf.keras.layers.Conv2D(filters=64, kernel_size=3, activation='relu'),
    tf.keras.layers.MaxPool2D(),
    tf.keras.layers.Conv2D(filters=32, kernel_size=3, activation='relu'),
    tf.keras.layers.MaxPool2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(16, activation='relu', kernel_regularizer=regularizer),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
history = model.fit(ds_train,
                    validation_data=ds_test,
                    epochs=50,
#                   callbacks=early_stopping
                    )

In [None]:
model.summary()

In [None]:
history_frame = pd.DataFrame(history.history)
history_frame.loc[:, ['loss', 'val_loss']].plot()
history_frame.loc[:, ['binary_accuracy', 'val_binary_accuracy']].plot()

## Transfer learning

In [None]:
pretrained_base = tf.keras.applications.vgg16.VGG16()
pretrained_base.trainable = False

In [None]:
model_t = tf.keras.models.Sequential([
    tf.keras.layers.Rescaling(1./255, input_shape=(224, 224, 3)),
    tf.keras.layers.RandomFlip('horizontal'),
    pretrained_base,
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(16, activation='relu', kernel_regularizer=regularizer),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

In [None]:
model_t.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
history_t = model_t.fit(ds_train,
                        validation_data=ds_test,
                        epochs=50,
#                       callbacks=[early_stopping]
                       )

In [None]:
history_frame_t = pd.DataFrame(history_t.history)
history_frame_t.loc[:, ['loss', 'val_loss']].plot()
history_frame_t.loc[:, ['binary_accuracy', 'val_binary_accuracy']].plot()

In [None]:
# predictions = model.predict(ds_valid, verbose=True)