In [None]:
# Implementation of *Automated Diagnosis of Pneumonia from Classification of Chest X-Ray Images using EfficientNet*
# Reference: https://ieeexplore.ieee.org/abstract/document/9397055

import os
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB2
from tensorflow.keras.layers import RandomFlip,GlobalAveragePooling2D, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall
from sklearn.metrics import confusion_matrix, roc_curve, auc
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Load dataset: index every image file under
# <base_path>/{train,test,val}/{NORMAL,PNEUMONIA} into a DataFrame.
# dataset URL: https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia
#
# The default location is written as a raw string so the Windows backslashes stay
# literal ('\C' and '\c' are not valid escape sequences in a normal string).
# Set the CHEST_XRAY_DIR environment variable to use a copy stored elsewhere.
base_path = os.environ.get('CHEST_XRAY_DIR', r'G:\Chest X-Ray Images (Pneumonia)\chest_xray')
image_extensions = ('.jpeg', '.jpg', '.png')

data = []
for split_dir in ['train', 'test', 'val']:
    for class_dir in ['NORMAL', 'PNEUMONIA']:
        for dirname, _, filenames in os.walk(os.path.join(base_path, split_dir, class_dir)):
            for file in filenames:
                # Skip stray non-image files that would fail later when decoded.
                if not file.lower().endswith(image_extensions):
                    continue
                # The class label keeps its leading '/' because other cells
                # filter on the values '/NORMAL' and '/PNEUMONIA'.
                data.append({'dirname': dirname, 'filename': file, 'class': '/' + class_dir})

# Explicit columns keep the frame well-formed even when no file was found.
df = pd.DataFrame(data, columns=['dirname', 'filename', 'class'])
print("Dataset has " + str(len(df)) + " files.")

In [None]:
# Split dataset 60-20-20 (train / evaluation / test).
# Both splits are stratified on the class label so that each subset keeps the
# NORMAL:PNEUMONIA ratio of the data it was drawn from.
train_data, holdout_data = train_test_split(
    df, test_size=0.4, random_state=42, stratify=df['class'])
# Halve the 40% hold-out into the evaluation and test subsets.
eval_data, test_data = train_test_split(
    holdout_data, test_size=0.5, random_state=42, stratify=holdout_data['class'])

print("Training data size:", len(train_data))
print("Evaluation data size:", len(eval_data))
print("Test data size:", len(test_data))

In [None]:
# Show a sample of the training data: the first two images of each class.
for class_label, title in [('/NORMAL', 'NORMAL'), ('/PNEUMONIA', 'PNEUMONIA')]:
    sample_rows = train_data[train_data['class'] == class_label].head(2)
    for _, row in sample_rows.iterrows():
        img_path = row['dirname'] + '/' + row['filename']
        img = plt.imread(img_path)
        # 2-D (single-channel) arrays would otherwise be drawn with matplotlib's
        # default colour map; cmap is ignored for arrays that are already RGB(A).
        plt.imshow(img, cmap='gray')
        plt.title(title)
        plt.show()

In [None]:
# Pipeline configuration: mini-batch size and the image size as a 2-tuple
batch_size = 32
img_size = (128, 128)

In [None]:
# Build batched, prefetched tf.data pipelines from the split DataFrames.
# The splits are pandas DataFrames of file records, so they have to be turned
# into datasets of (image, label) batches before they can be prefetched.
AUTOTUNE = tf.data.AUTOTUNE


def make_dataset(frame, shuffle=False):
    """Turn a DataFrame of image records into a batched tf.data.Dataset.

    Args:
        frame: DataFrame with the columns 'dirname', 'filename' and 'class',
            where 'class' is '/NORMAL' or '/PNEUMONIA'.
        shuffle: when True, reshuffle the records on every pass (training only).

    Returns:
        A dataset yielding (images, labels) batches of size ``batch_size``.
        Images are float32 of shape (*img_size, 3) with pixel values left in
        the 0-255 range (the Keras EfficientNet models do their own input
        rescaling, per the Keras applications documentation); labels are
        float32 of shape (batch, 1) with 1.0 meaning PNEUMONIA.
    """
    paths = (frame['dirname'] + '/' + frame['filename']).astype(str).tolist()
    labels = (frame['class'] == '/PNEUMONIA').to_numpy(dtype='float32').reshape(-1, 1)

    def load_image(path, label):
        # channels=3 expands grayscale files to three channels;
        # expand_animations=False guarantees a rank-3 tensor for resize().
        image = tf.io.decode_image(tf.io.read_file(path), channels=3, expand_animations=False)
        return tf.image.resize(image, img_size), label

    dataset = tf.data.Dataset.from_tensor_slices((paths, labels))
    if shuffle:
        # Shuffle the cheap (path, label) records before any image is decoded.
        dataset = dataset.shuffle(max(len(frame), 1), seed=42)
    dataset = dataset.map(load_image, num_parallel_calls=AUTOTUNE)
    return dataset.batch(batch_size).prefetch(buffer_size=AUTOTUNE)


# NOTE: train_data and test_data are rebound from DataFrames to datasets here,
# so the split cell has to be re-run before this cell can be run again.
# Evaluation and test pipelines are left unshuffled so repeated passes yield
# the samples in the same order.
val_data = make_dataset(eval_data)
test_data = make_dataset(test_data)
train_data = make_dataset(train_data, shuffle=True)

In [None]:
# Data augmentation: random rotation, zoom and translation, applied in that order
augmentation_layers = [
    tf.keras.layers.RandomRotation(0.2),
    tf.keras.layers.RandomZoom(0.2),
    tf.keras.layers.RandomTranslation(0.1, 0.1),
]
data_augmentation = tf.keras.Sequential(augmentation_layers)

In [None]:
# Load EfficientNetB2 with ImageNet weights and without its classification top.
# The input shape is derived from img_size so the backbone always matches the
# configured image size instead of repeating the numbers by hand.
model = EfficientNetB2(weights='imagenet', include_top=False, input_shape=(*img_size, 3))

# Freeze the layers of the base model so only layers stacked on top are trained
model.trainable = False

In [None]:
# Add custom layers on top of the base model.
# The input shape follows img_size so it cannot drift from the configured size.
inputs = tf.keras.Input(shape=(*img_size, 3))
x = data_augmentation(inputs)
# training=False runs the base model in inference mode inside this graph.
x = model(x, training=False)
x = GlobalAveragePooling2D()(x)
# Two dense layers, each followed by dropout.
x = Dense(128, activation='relu')(x)
x = Dropout(0.3)(x)
x = Dense(64, activation='relu')(x)
x = Dropout(0.2)(x)
# Single sigmoid unit: one value in [0, 1] per image.
outputs = Dense(1, activation='sigmoid')(x)

In [None]:
# Assemble the end-to-end classifier from the input tensor to the sigmoid output.
# NOTE: this rebinds `model`, which until now referred to the EfficientNetB2
# base network used when the layer graph was built.
model = Model(inputs=inputs, outputs=outputs)

# Adam optimizer, binary cross-entropy loss, and accuracy / precision / recall
# as the reported metrics.
optimizer = Adam(learning_rate=0.001)
tracked_metrics = ['accuracy', Precision(name='precision'), Recall(name='recall')]
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=tracked_metrics)

# Show the layer-by-layer summary
model.summary()

In [None]:
# Train the model on the training data and validate on the validation data.
# validation_data is val_data, the pipeline prepared from the evaluation split;
# eval_data is the raw split and is not the object prepared for fit().
history = model.fit(
    train_data,
    epochs=10,  # Upper bound; early stopping may end training sooner
    validation_data=val_data,
    callbacks=[
        # Stop after 3 epochs without improvement and roll back to the best weights.
        tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True),
        # Keep only the best model seen so far on disk.
        tf.keras.callbacks.ModelCheckpoint('model_best.keras', save_best_only=True)
    ]
)

In [None]:
# Evaluate the model on the test set.
# The four values follow the compile() order: loss, accuracy, precision, recall.
test_loss, test_accuracy, test_precision, test_recall = model.evaluate(test_data)

# F1 is the harmonic mean of precision and recall. When both are zero (the model
# predicted no positives correctly) the ratio is undefined, so report 0.0
# instead of raising ZeroDivisionError.
precision_recall_sum = test_precision + test_recall
if precision_recall_sum > 0:
    test_f1_score = 2 * (test_precision * test_recall) / precision_recall_sum
else:
    test_f1_score = 0.0

# Print the evaluation metrics
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")
print(f"Test Precision: {test_precision * 100:.2f}%")
print(f"Test Recall: {test_recall * 100:.2f}%")
print(f"Test F1-Score: {test_f1_score:.2f}")

In [None]:
# Plot training vs. validation accuracy over epochs.
# This cell draws a single panel, so it uses a single-axes figure; a 1x2 layout
# would leave the right half empty once plt.show() closes the figure.
fig, ax = plt.subplots(figsize=(7, 5))
ax.plot(history.history['accuracy'], label='Train Accuracy')
ax.plot(history.history['val_accuracy'], label='Validation Accuracy')
ax.set_title('Model Accuracy')
ax.set_xlabel('Epochs')
ax.set_ylabel('Accuracy')
ax.legend()
plt.show()

In [None]:
# Plot training vs. validation loss over epochs.
# A fresh single-axes figure is created here: requesting panel 2 of a 1x2 grid
# in a new cell would draw into the right half of an otherwise empty figure.
fig, ax = plt.subplots(figsize=(7, 5))
ax.plot(history.history['loss'], label='Train Loss')
ax.plot(history.history['val_loss'], label='Validation Loss')
ax.set_title('Model Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

In [None]:
# Confusion matrix on the test set.
# The test pipeline is named test_data. Ground-truth labels are gathered by
# iterating the same pipeline that is passed to predict(); this relies on
# test_data yielding its batches in the same order on every pass.
y_true = np.concatenate([y for x, y in test_data], axis=0).ravel().astype("int32")
# Threshold the sigmoid outputs at 0.5 and flatten (N, 1) -> (N,).
predictions = (model.predict(test_data) > 0.5).astype("int32").ravel()
# labels=[0, 1] keeps the matrix 2x2 even if one class never occurs, so it
# always lines up with the tick labels below.
cm = confusion_matrix(y_true, predictions, labels=[0, 1])

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Negative', 'Positive'], yticklabels=['Negative', 'Positive'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

In [None]:
# ROC curve on the test set.
# The test pipeline is named test_data. Labels and scores come from two passes
# over it, which relies on test_data yielding batches in the same order each time.
roc_labels = np.concatenate([y for x, y in test_data], axis=0).ravel()
# Flatten the (N, 1) sigmoid outputs to the 1-D score vector roc_curve expects.
roc_scores = model.predict(test_data).ravel()
fpr, tpr, _ = roc_curve(roc_labels, roc_scores)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
# Diagonal reference line: the ROC of a random classifier.
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()