Import necessary libraries

In [None]:
# System
import pathlib
import glob
import os, os.path, shutil
# Data Exploration
import random
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline
import pandas as pd
import seaborn as sns
from PIL import Image
# Machine Learning
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
from sklearn.metrics import auc
#Deep Learning
import tensorflow as tf
import keras
from keras.utils import plot_model 
from keras import backend as K 
from keras import metrics
from keras.regularizers import l2,l1
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, InputLayer, Activation
from keras.preprocessing.image import img_to_array, ImageDataGenerator, array_to_img, load_img
from keras.metrics import AUC
from keras.optimizers import Adam, RMSprop
from keras.callbacks import EarlyStopping, ModelCheckpoint
import pydot
from dask import bag,  diagnostics
from mlxtend.plotting import plot_confusion_matrix

Split the dataset

In [None]:

# Source image directories and the root folder of the re-split dataset
data_1_NORMAL_dir = 'data_1/NORMAL/'
data_1_PNEUMONIA_dir = 'data_1/PNEUMONIA/'
new_dir = 'split/'
# Collect the JPEG file names of each class. os.listdir() returns entries in
# arbitrary order, so the names are sorted to make any index-based split of
# these lists reproducible across runs and machines.
images_NORMAL = sorted(
    file for file in os.listdir(data_1_NORMAL_dir) if file.endswith('.jpeg'))
images_PNEUMONIA = sorted(
    file for file in os.listdir(data_1_PNEUMONIA_dir) if file.endswith('.jpeg'))
# Report how many images there are in the NORMAL and PNEUMONIA directories
print('There are', len(images_NORMAL), 'NORMAL images')
print('There are', len(images_PNEUMONIA), 'PNEUMONIA images')
# Plot the number of images per class to identify imbalance. The counts come
# from the lists above (not hard-coded) so the chart always matches the data.
number_classes = {'NORMAL': len(images_NORMAL),
                  'PNEUMONIA': len(images_PNEUMONIA)}
plt.bar(list(number_classes.keys()), list(number_classes.values()), width = 0.5)
plt.title("Number of images by Class")
plt.xlabel("Class Name")
plt.ylabel("Number of Images")

# Redo Train-Val-Test Split
# Build a hierarchical file structure: split/<subset>/<class>

# 'train' subset under 'split' and its per-class subfolders
train_folder = os.path.join(new_dir, 'train')
train_NORMAL = os.path.join(train_folder, 'NORMAL')
train_PNEUMONIA = os.path.join(train_folder, 'PNEUMONIA')
#___________________________________________________________________________
# 'test' subset under 'split' and its per-class subfolders
test_folder = os.path.join(new_dir, 'test')
test_NORMAL = os.path.join(test_folder, 'NORMAL')
test_PNEUMONIA = os.path.join(test_folder, 'PNEUMONIA')
#___________________________________________________________________________
# 'validation' subset under 'split' and its per-class subfolders
val_folder = os.path.join(new_dir, 'validation')
val_NORMAL = os.path.join(val_folder, 'NORMAL')
val_PNEUMONIA = os.path.join(val_folder, 'PNEUMONIA')

# Use all the path strings to make the new directories. os.makedirs also
# creates the missing parents ('split' and each subset folder), and
# exist_ok=True lets this cell be re-run without raising FileExistsError.
for folder in (train_NORMAL, train_PNEUMONIA,
               test_NORMAL, test_PNEUMONIA,
               val_NORMAL, val_PNEUMONIA):
    os.makedirs(folder, exist_ok=True)

# Use a 70%/20%/10% split for train/validation/test
def _split_counts(n_images, train_frac=0.7, val_frac=0.2):
    """Return (n_train, n_val, n_test); the test subset takes the remainder."""
    n_train = round(n_images * train_frac)
    n_val = round(n_images * val_frac)
    return n_train, n_val, n_images - n_train - n_val


def _copy_images(file_names, src_dir, dst_dir):
    """Copy every file named in file_names from src_dir into dst_dir."""
    for name in file_names:
        shutil.copyfile(os.path.join(src_dir, name),
                        os.path.join(dst_dir, name))


# Derive the subset sizes from the actual number of images instead of
# hard-coding slice indices that are only valid for one dataset size.
n_train_NORMAL, n_val_NORMAL, n_test_NORMAL = _split_counts(len(images_NORMAL))
n_train_PNEUMONIA, n_val_PNEUMONIA, n_test_PNEUMONIA = _split_counts(
    len(images_PNEUMONIA))

print('Number of images to train')
print('# train_NORMAL: ', n_train_NORMAL)
print('# train_PNEUMONIA: ', n_train_PNEUMONIA)
print('________________________________________________')
print('Number of images to validation')
print('# val_NORMAL: ', n_val_NORMAL)
print('# val_PNEUMONIA: ', n_val_PNEUMONIA)
print('________________________________________________')
print('Number of images to test')
print('# test_NORMAL: ', n_test_NORMAL)
print('# test_PNEUMONIA: ', n_test_PNEUMONIA)

# NORMAL: consecutive slices for train / validation / test
val_end_NORMAL = n_train_NORMAL + n_val_NORMAL
_copy_images(images_NORMAL[:n_train_NORMAL], data_1_NORMAL_dir, train_NORMAL)
_copy_images(images_NORMAL[n_train_NORMAL:val_end_NORMAL],
             data_1_NORMAL_dir, val_NORMAL)
_copy_images(images_NORMAL[val_end_NORMAL:], data_1_NORMAL_dir, test_NORMAL)

# PNEUMONIA: consecutive slices for train / validation / test
val_end_PNEUMONIA = n_train_PNEUMONIA + n_val_PNEUMONIA
_copy_images(images_PNEUMONIA[:n_train_PNEUMONIA],
             data_1_PNEUMONIA_dir, train_PNEUMONIA)
_copy_images(images_PNEUMONIA[n_train_PNEUMONIA:val_end_PNEUMONIA],
             data_1_PNEUMONIA_dir, val_PNEUMONIA)
_copy_images(images_PNEUMONIA[val_end_PNEUMONIA:],
             data_1_PNEUMONIA_dir, test_PNEUMONIA)

Image Preprocessing

In [None]:
# Build data generator function

def make_data_generator(
    train_datagen, train_dir,
    val_datagen, val_dir,
    test_datagen, test_dir,
    *,
    target_size=(64, 64),
    batch_size=128,
    color_mode='grayscale',
    class_mode='binary',
    seed=42):
    """Create the train, validation and test directory iterators.

    Each ``*_datagen`` must provide a ``flow_from_directory`` method; it is
    called once with the matching directory and the shared options below.

    Args:
        train_datagen, val_datagen, test_datagen: Data generator objects.
        train_dir, val_dir, test_dir: Directories to read each subset from.
        target_size: Size passed to ``flow_from_directory``.
        batch_size: Batch size passed to ``flow_from_directory``.
        color_mode: Colour mode passed to ``flow_from_directory``.
        class_mode: Class mode passed to ``flow_from_directory``.
        seed: Seed passed to ``flow_from_directory``.

    Returns:
        A list ``[train_generator, val_generator, test_generator]``.
    """
    # Only the training iterator is shuffled. The decision is tied to the
    # subset's role rather than to a path comparison, so validation/test data
    # keep their order even if they happen to share the training directory.
    sources = (
        (train_datagen, train_dir, True),
        (val_datagen, val_dir, False),
        (test_datagen, test_dir, False),
    )
    return [
        datagen.flow_from_directory(directory=directory,
                                    target_size=target_size,
                                    batch_size=batch_size,
                                    color_mode=color_mode,
                                    class_mode=class_mode,
                                    shuffle=shuffle,
                                    seed=seed)
        for datagen, directory, shuffle in sources
    ]
  
  # Load the images

# Only the training images are augmented (zoom, shear, horizontal flip);
# validation and test images are just rescaled to [0, 1].
train_datagen = ImageDataGenerator(rescale = 1.0/255.0,
                                   zoom_range = 0.2,
                                   shear_range = 0.2,
                                   horizontal_flip = True)
val_datagen = ImageDataGenerator(rescale=1.0/255.0)
test_datagen = ImageDataGenerator(rescale=1.0/255.0)

# Read each subset from the folders created by the train/validation/test
# split; these names were previously used without ever being defined.
train_dir = train_folder
validation_dir = val_folder
test_dir = test_folder

train_generator, val_generator, test_generator = make_data_generator(
    train_datagen, train_dir,
    val_datagen, validation_dir,
    test_datagen, test_dir)

Modeling

In [None]:
# Design the model
    
def build_model_1():
    """Build and compile the baseline CNN for 64x64 single-channel images.

    The network stacks three Conv2D(32, 3x3, relu) + MaxPooling2D(2x2) stages,
    flattens the result, and ends with a 64-unit relu Dense layer followed by
    a single sigmoid output unit. It is compiled with the 'adam' optimizer,
    binary cross-entropy loss and the 'acc' metric.

    Returns:
        The compiled Sequential model.
    """
    model = Sequential()
    model.add(InputLayer(input_shape=(64, 64, 1)))

    # Three identical convolution + pooling stages
    for _ in range(3):
        model.add(Conv2D(32, (3, 3), activation='relu'))
        model.add(MaxPooling2D((2, 2)))

    # Classification head
    model.add(Flatten())
    model.add(Dense(64, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))

    model.compile(loss='binary_crossentropy',
                  optimizer='adam',
                  metrics=['acc'])
    return model

cnn = build_model_1()

# Train the model
# Model.fit accepts generators directly; fit_generator is deprecated in its
# favour. cnn_model holds the value returned by training, whose `.history`
# mapping is what the plotting helpers read.
cnn_model = cnn.fit(train_generator,
                    epochs = 10,
                    steps_per_epoch = len(train_generator),
                    validation_data = val_generator,
                    validation_steps = len(val_generator),
                    verbose = 1)

Evaluate the Model Performance

In [None]:
# Define function for plotting the training and validation loss curves
def train_validation_loss(cnn_model):
    """Plot the training and validation loss against the epoch index.

    Args:
        cnn_model: Object whose ``history`` mapping contains the 'loss' and
            'val_loss' series.
    """
    history = cnn_model.history
    curves = (
        ('training loss', history['loss']),
        ('validation loss', history['val_loss']),
    )

    plt.figure(figsize=(8, 5))
    plt.title("Training vs. Validation Loss")
    for label, values in curves:
        plt.plot(values, label=label)
    plt.xlabel("Number of Epochs", size=14)
    plt.legend()

In [None]:
# Define function for plotting the training and validation accuracy curves
def train_validation_acc(cnn_model):
    """Plot the training and validation accuracy against the epoch index.

    Args:
        cnn_model: Object whose ``history`` mapping contains the 'acc' and
            'val_acc' series.
    """
    history = cnn_model.history
    curves = (
        ('training acc', history['acc']),
        ('validation acc', history['val_acc']),
    )

    plt.figure(figsize=(8, 5))
    plt.title("Training vs. Validation Acc")
    for label, values in curves:
        plt.plot(values, label=label)
    plt.xlabel("Number of Epochs", size=14)
    plt.legend()

In [None]:
from mlxtend.plotting import plot_confusion_matrix
def cm_plot_1(cnn):
    """Plot the row-normalized confusion matrix of `cnn` on the test set.

    Reads the module-level `test_generator` for both the true labels
    (`.classes`) and the images to predict on.

    Args:
        cnn: Trained model whose `predict` output has one probability column.
    """
    y_true = test_generator.classes
    # Model.predict accepts generators; predict_generator is deprecated.
    probabilities = cnn.predict(test_generator, steps = len(test_generator))
    # Threshold the single probability column at 0.5 to get 0/1 predictions
    y_pred = (probabilities > 0.5).T[0].astype(int)
    # labels=[0, 1] keeps the matrix 2x2 (matching the two tick labels below)
    # even when one class is missing from the labels and predictions.
    cm = confusion_matrix(y_true, y_pred, labels = [0, 1], normalize = 'true')
    plot_confusion_matrix(cm, figsize = (12,8), hide_ticks = True, cmap = plt.cm.Blues)
    plt.title("Confusion Matrix", fontsize = 22)
    plt.xticks(range(2), ['Normal','Pneumonia'], fontsize = 16)
    plt.yticks(range(2), ['Normal','Pneumonia'], fontsize = 16)

In [None]:
def ROC_curve_AUC_score(cnn):
    """Plot the ROC curve of `cnn` on the test set, with its AUC in the legend.

    Reads the module-level `test_generator` for both the true labels
    (`.classes`) and the images to predict on.

    Args:
        cnn: Trained model whose `predict` output has one probability column.
    """
    plt.figure(figsize=(10, 8))
    y_true = test_generator.classes
    # Model.predict accepts generators; predict_generator is deprecated.
    probabilities = cnn.predict(test_generator, steps = len(test_generator))
    y_pred_prob = probabilities.T[0]

    fpr, tpr, _ = roc_curve(y_true, y_pred_prob)
    # Named auc_score so it does not shadow the imported sklearn `auc` function
    auc_score = roc_auc_score(y_true, y_pred_prob)
    plt.title('ROC Curve')
    plt.plot([0, 1], [0, 1], 'k--', label = "Random (AUC = 50%)")
    plt.plot(fpr, tpr, label='CNN (AUC = {:.2f}%)'.format(auc_score*100))
    plt.xlabel('False Positive Rate', size=14)
    plt.ylabel('True Positive Rate', size=14)
    plt.legend(loc='best')

In [None]:
def Summary_Stats(cnn):
    """Return a one-line summary of test-set classification metrics.

    Reads the module-level `test_generator` for both the true labels
    (`.classes`) and the images to predict on, thresholds the predicted
    probabilities at 0.5 and derives the metrics from the confusion matrix.

    Args:
        cnn: Trained model whose `predict` output has one probability column.

    Returns:
        A string reporting accuracy, precision, recall, specificity and F1
        score as percentages. A metric whose denominator is zero is reported
        as NaN.
    """
    def _ratio(numerator, denominator):
        # An undefined ratio stays NaN, without a divide-by-zero warning
        return numerator / denominator if denominator else float('nan')

    y_true = test_generator.classes
    # Model.predict accepts generators; predict_generator is deprecated.
    probabilities = cnn.predict(test_generator, steps = len(test_generator))
    y_pred = (probabilities > 0.5).T[0].astype(int)
    # labels=[0, 1] guarantees a 2x2 matrix, so the four-way unpacking below
    # cannot fail when only one class occurs in the labels and predictions.
    cm = confusion_matrix(y_true, y_pred, labels = [0, 1])
    # ravel() flattens the 2x2 matrix row by row:
    # cm[0, 0], cm[0, 1], cm[1, 0], cm[1, 1]
    TN, FP, FN, TP = cm.ravel()
    accuracy = _ratio(TP + TN, np.sum(cm))
    precision = _ratio(TP, TP + FP)
    recall = _ratio(TP, TP + FN)
    specificity = _ratio(TN, TN + FP)
    f1 = _ratio(2*precision*recall, precision + recall)
    stats_summary = '[Summary Statistics]\nAccuracy = {:.2%} | Precision = {:.2%} | Recall = {:.2%} | Specificity = {:.2%} | F1 Score = {:.2%}'.format(accuracy, precision, recall, specificity, f1)
    return stats_summary