In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers

from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Activation, Dropout, Flatten, Dense, BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.optimizers import SGD, Adam, RMSprop
import kerastuner as kt

import os
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from PIL import Image
import time
%matplotlib inline


# What is Pneumonia?

Pneumonia is an infection that inflames the air sacs in one or both lungs, which may then fill with fluid. It can be bacterial, viral, or fungal. On chest X-rays, pneumonia typically appears as white spots in the lungs, called infiltrates.

![Pneumonia Image](https://medlineplus.gov/ency/images/ency/fullsize/19680.jpg)
![Pneumonia illustration (Mayo Clinic)](https://www.mayoclinic.org/-/media/kcms/gbs/patient-consumer/images/2013/08/26/10/01/ds00135_im00621_pnuesmal_gif.png)

# About this Dataset

The dataset is organized into 3 folders (train, test, val) and contains subfolders for each image category (Pneumonia/Normal). There are 5,863 X-Ray images (JPEG) and 2 categories (Pneumonia/Normal).

Chest X-ray images (anterior-posterior) were selected from retrospective cohorts of pediatric patients of one to five years old from Guangzhou Women and Children’s Medical Center, Guangzhou. All chest X-ray imaging was performed as part of patients’ routine clinical care.

# Load and Process Data 

In [None]:
# Root folder of the dataset and the three split folders beneath it.
main_dir = "../input/chest-xray-pneumonia/chest_xray/"
train_data_dir, validation_data_dir, test_data_dir = (
    main_dir + split for split in ("train/", "val/", "test/")
)

# Sanity check: show what the dataset root contains.
print("Working Directory Contents:", os.listdir(main_dir))

In [None]:
# Per-class sub-folders of the training split.
train_n, train_p = (
    train_data_dir + label + '/' for label in ('NORMAL', 'PNEUMONIA')
)

In [None]:
# Report how many files each training class folder holds.
for message, folder in (
    ("Number of 'NORMAL' images in training set:", train_n),
    ("Number of 'PNEUMONIA' images in training set", train_p),
):
    print(message, len(os.listdir(folder)))

In [None]:
# Side length, in pixels, that every image is resized to (square input).
img_height = img_width = 175

In [None]:
# Training-time pipeline: pixel values are scaled to [0, 1] and each image is
# randomly rotated, sheared, zoomed, shifted and mirrored.
train_datagen = ImageDataGenerator(
    rescale=1. / 255,
    rotation_range=30,        # random rotation, in degrees
    shear_range=0.2,
    zoom_range=0.2,           # random zoom
    width_shift_range=0.1,    # horizontal shift, as a fraction of total width
    height_shift_range=0.1,   # vertical shift, as a fraction of total height
    horizontal_flip=True,
)

# Stream labelled batches from the class sub-folders of the training split.
train_generator = train_datagen.flow_from_directory(
    directory=train_data_dir,
    target_size=(img_height, img_width),  # every image is resized to this
    class_mode='binary',                  # one 0/1 label per image
    batch_size=64,
)


In [None]:
# Validation images are only rescaled -- no augmentation is applied.
validation_datagen = ImageDataGenerator(rescale=1. / 255)

validation_generator = validation_datagen.flow_from_directory(
    directory=validation_data_dir,
    target_size=(img_height, img_width),  # same size as the training images
    class_mode='binary',                  # one 0/1 label per image
    batch_size=64,
    shuffle=False,                        # shuffling is turned off
)

In [None]:
# Test images are only rescaled -- no augmentation is applied.
test_datagen = ImageDataGenerator(rescale=1. / 255)

test_generator = test_datagen.flow_from_directory(
    test_data_dir,
    target_size=(img_height, img_width),
    batch_size=64,
    class_mode='binary',
    # Shuffling is disabled, as for the validation generator, so batches come
    # in a fixed file order and predictions can be aligned with
    # `test_generator.classes` / `test_generator.filenames`.
    shuffle=False,
)

In [None]:
# Pull one batch of (augmented) images and labels from the training generator.
images, labels = next(train_generator)

# Class names from the generator; the title lookup below assumes the keys are
# ordered by the integer index assigned to each class.
class_labels = list(train_generator.class_indices.keys())

# Grid layout: 4 columns, showing only the first quarter of the batch
# (at least one image) to keep the figure small.
num_columns = 4
num_shown = max(1, len(images) // 4)
# Ceiling division over the images actually drawn, so the figure height
# matches the grid instead of reserving rows for the whole batch.
num_rows = (num_shown + num_columns - 1) // num_columns

plt.figure(figsize=(15, 5 * num_rows))
for i in range(num_shown):
    plt.subplot(num_rows, num_columns, i + 1)
    plt.imshow(images[i])  # pixel values are already in [0, 1] after rescaling
    plt.title(class_labels[int(labels[i])])  # class name for this image
    plt.axis('on')  # keep the pixel axes visible
plt.tight_layout()  # avoid overlapping titles and ticks
plt.show()

# Class Weight

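Addressing class imbalance: the two classes are not equally represented in the training set, so each class $c$ is given a loss weight inversely proportional to its frequency, $w_c = \dfrac{N}{2\,N_c}$, where $N$ is the total number of training images and $N_c$ the number of images in class $c$.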

In [None]:
# Count the files in each training class folder, then add the two counts.
normal_samples = len(os.listdir(train_n))
pneumonia_samples = len(os.listdir(train_p))
total_samples = normal_samples + pneumonia_samples

In [None]:
# Per-class weight = total / (2 * class count); the rarer class gets the
# larger weight.
weight_normal, weight_pneumonia = (
    total_samples / (2 * count) for count in (normal_samples, pneumonia_samples)
)

In [None]:
# Class index -> loss weight.
# NOTE(review): assumes the generators assign index 0 to NORMAL and 1 to
# PNEUMONIA -- confirm against train_generator.class_indices.
class_weight = dict(enumerate((weight_normal, weight_pneumonia)))
class_weight

In [None]:
# Bar chart of how many training images each class has.
class_frequency = {'NORMAL': normal_samples, 'PNEUMONIA': pneumonia_samples}
classes = class_frequency.keys()
freq = class_frequency.values()

fig, ax = plt.subplots(figsize=(5, 5))
ax.bar(classes, freq, color='pink', width=0.3)
ax.set_xlabel("Classes")
ax.set_ylabel("Frequency")
ax.set_title("Class Distribution in Training Data")
plt.show()

# Bayesian Optimizer

In [None]:

def build_model(hp):
    """Build and compile one candidate CNN for the hyperparameter search.

    The network is three Conv2D -> BatchNormalization -> MaxPooling2D blocks,
    followed by Flatten, one Dense layer, Dropout and a single sigmoid output
    unit. It is compiled with RMSprop and binary cross-entropy.

    Hyperparameters registered on ``hp``:
        filters1..3    -- Int, 32 to 64 in steps of 32
        kernel_size1..3 -- Choice of 3 or 5
        dense_units    -- Int, 32 to 64 in steps of 32
        dropout_rate   -- Float, 0.1 to 0.8 in steps of 0.1
        learning_rate  -- Float, 1e-4 to 1e-2, log-sampled

    The input shape is taken from the notebook-level ``img_height`` and
    ``img_width`` so it always matches the size the generators resize to.

    No model summary is printed here: this function runs once per trial, so a
    summary would be repeated for every candidate. Call ``.summary()`` on the
    selected model instead.

    Args:
        hp: hyperparameter container supplied by the tuner.

    Returns:
        The compiled model.
    """
    model = Sequential()

    # Three structurally identical convolutional blocks; only the first one
    # declares the input shape.
    for block in (1, 2, 3):
        conv_kwargs = dict(
            filters=hp.Int(f'filters{block}', min_value=32, max_value=64, step=32),
            kernel_size=hp.Choice(f'kernel_size{block}', values=[3, 5]),
            activation='relu',
            padding='same',
        )
        if block == 1:
            conv_kwargs['input_shape'] = (img_height, img_width, 3)
        model.add(Conv2D(**conv_kwargs))
        model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())

    # Fully connected head with tunable width and dropout.
    model.add(Dense(
        units=hp.Int('dense_units', min_value=32, max_value=64, step=32),
        activation='relu'
    ))
    model.add(Dropout(
        rate=hp.Float('dropout_rate', min_value=0.1, max_value=0.8, step=0.1)
    ))

    # Single sigmoid unit for the binary label.
    model.add(Dense(1))
    model.add(Activation('sigmoid'))

    model.compile(
        optimizer=RMSprop(
            hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='log')
        ),
        loss='binary_crossentropy',
        metrics=['accuracy']  # reported as `val_accuracy` on the validation data
    )

    return model

In [None]:
# Bayesian search over the hyperparameters declared in `build_model`,
# ranking candidates by validation accuracy.
tuner = kt.BayesianOptimization(
    hypermodel=build_model,
    objective='val_accuracy',
    max_trials=10,                       # number of candidate configurations
    executions_per_trial=1,              # each candidate is trained once
    directory='pneumonia_classifer_BO',  # where the tuner writes its files
    overwrite=True,
)


In [None]:
# Run the search: each trial is trained for 10 epochs on the training
# generator with the class weights applied, and scored on the validation
# generator.
tuner.search(
    train_generator,
    epochs=10,
    validation_data=validation_generator,
    class_weight=class_weight,
)
best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]

# Report the winning value of every hyperparameter.
print("Best Hyperparameters:")
for label, hp_name in (
    ("Filters1", 'filters1'),
    ("Kernel Size1", 'kernel_size1'),
    ("Filters2", 'filters2'),
    ("Kernel Size2", 'kernel_size2'),
    ("Filters3", 'filters3'),
    ("Kernel Size3", 'kernel_size3'),
    ("Dense Units", 'dense_units'),
    ("Dropout Rate", 'dropout_rate'),
    ("Learning Rate", 'learning_rate'),
):
    print(f"{label}: {best_hyperparameters.get(hp_name)}")

# Fetch the top-ranked model and show its architecture.
best_model = tuner.get_best_models(num_models=1)[0]
best_model.summary()

In [None]:
# NOTE(review): this is the same call as the one at the end of the search cell
# above, so `best_model` is simply assigned again here -- this cell looks
# redundant.
best_model = tuner.get_best_models(num_models=1)[0]

# Compile Model

I used the hyperparameters found by the Bayesian optimizer as a starting point and tweaked them further to improve performance.

In [None]:
 

# Final CNN: five Conv2D blocks with batch normalization, dropout and 2x2 max
# pooling, then a regularized dense layer and a single sigmoid output.
# Every convolution uses stride 1 with 'same' padding; every pooling layer
# uses a 2x2 window with stride 2.
cnn_model = Sequential([
    # Block 1 -- declares the input as (height, width, channels), in the same
    # order as the generators' target_size=(img_height, img_width).
    Conv2D(32, (3, 3), activation='relu', strides=1, padding='same',
           input_shape=(img_height, img_width, 3)),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2), strides=2),

    # Block 2 -- the only block where dropout comes before batch normalization.
    Conv2D(32, (3, 3), activation='relu', strides=1, padding='same'),
    Dropout(0.1),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2), strides=2),

    # Block 3 -- the only 5x5 kernel.
    Conv2D(64, (5, 5), activation='relu', strides=1, padding='same'),
    BatchNormalization(),
    Dropout(0.3),
    MaxPooling2D(pool_size=(2, 2), strides=2),

    # Block 4
    Conv2D(128, (3, 3), activation='relu', strides=1, padding='same'),
    BatchNormalization(),
    Dropout(0.3),
    MaxPooling2D(pool_size=(2, 2), strides=2),

    # Block 5
    Conv2D(256, (3, 3), activation='relu', strides=1, padding='same'),
    BatchNormalization(),
    Dropout(0.2),
    MaxPooling2D(pool_size=(2, 2), strides=2),

    # Classifier head: L2-regularized dense layer, dropout, sigmoid output.
    Flatten(),
    Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.1)),
    Dropout(0.2),
    Dense(1),
    Activation('sigmoid'),
])

cnn_model.summary()

In [None]:
# Optimizer for the final model: RMSprop with a learning rate of 1e-3.
RMSprop_optimizer = RMSprop(learning_rate=1e-3)

In [None]:
# Metrics tracked during training and evaluation. The accuracy metric keeps
# its default name, which is what the callbacks monitor as
# 'val_binary_accuracy'.
metrics = [
    tf.keras.metrics.BinaryAccuracy(name="binary_accuracy"),
    tf.keras.metrics.Precision(name="precision"),
    tf.keras.metrics.Recall(name="recall"),
]

In [None]:
# Both callbacks watch validation accuracy, which should be maximised, so the
# direction is stated explicitly on each rather than inferred from the name.
callbacks = [
    EarlyStopping(
        monitor='val_binary_accuracy',
        mode='max',
        patience=10,                 # epochs without improvement before stopping
        restore_best_weights=True,   # roll back to the best epoch's weights
    ),
    ReduceLROnPlateau(
        monitor='val_binary_accuracy',
        mode='max',
        factor=0.4,                  # new_lr = lr * factor
        patience=2,                  # epochs without improvement before reducing
        min_lr=1e-6,                 # lower bound on the learning rate
        verbose=1,
    ),
]


In [None]:
# Attach the optimizer, loss and tracked metrics to the final model.
cnn_model.compile(
    optimizer=RMSprop_optimizer,
    loss='binary_crossentropy',
    metrics=metrics,
)

In [None]:
# Train for up to 30 epochs with class weighting; the callbacks may stop
# training early or lower the learning rate.
history = cnn_model.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=30,
    callbacks=callbacks,
    class_weight=class_weight,
)

In [None]:
# Evaluate on the held-out test split; the result is a dict keyed by metric name.
scores = cnn_model.evaluate(
    test_generator,
    return_dict=True,
)

print(scores)

# Visualizing Model Performance

In [None]:
def draw_learning_curve(history, keys=('binary_accuracy', 'loss')):
    """Plot training and validation curves for each metric in ``keys``.

    One panel is drawn per metric, side by side, with the per-epoch training
    value and the matching ``'val_' + key`` value on the same axes.

    Args:
        history: object exposing ``.epoch`` (x values) and ``.history`` (a
            mapping from metric name to per-epoch values), such as the value
            returned by ``cnn_model.fit``.
        keys: metric names to plot. For every ``key``, both ``key`` and
            ``'val_' + key`` must be present in ``history.history``.

    Raises:
        KeyError: if a requested metric, or its ``val_`` counterpart, is
            missing from ``history.history``.
    """
    keys = list(keys)
    if not keys:
        return  # nothing to plot

    # One column per metric, so any number of keys fits on the grid.
    # squeeze=False keeps `axes` two-dimensional even for a single metric.
    fig, axes = plt.subplots(1, len(keys), figsize=(12, 6), squeeze=False)
    for ax, key in zip(axes[0], keys):
        sns.lineplot(x=history.epoch, y=history.history[key],
                     ax=ax, label='train')
        # The second curve comes from the validation data passed to fit(),
        # not from the test split, so it is labelled accordingly.
        sns.lineplot(x=history.epoch, y=history.history['val_' + key],
                     ax=ax, label='validation')
        ax.set_title('Learning Curve')
        ax.set_ylabel(key.title())
        ax.set_xlabel('Epoch')
        ax.legend(loc='best')
    plt.show()

In [None]:
# Plot the accuracy and loss curves recorded in `history` by the training run.
draw_learning_curve(history)