Dependencies

In [None]:
!pip3 install keras tensorflow-gpu cudatoolkit cudnn
!pip3 install matplotlib
!pip3 install numpy
!pip3 install pandas
!pip3 install scikit-learn
!pip3 install seaborn
!pip3 install 'tensorflow[and-cuda]'

Imports

In [43]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
import torch
import os
from sklearn.metrics import confusion_matrix, classification_report

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG16, ResNet50, DenseNet121, Xception

Init GPU

In [None]:
def setup_gpu():

    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print(f"Number of TensorFlow GPUs: {len(gpus)}")
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    return device
device = setup_gpu()

Data preprocessing

In [45]:
def create_data_generators(data_dir='chest_xray', img_size=(224, 224), batch_size=32):

    train_dataset = tf.keras.preprocessing.image_dataset_from_directory(
        f'{data_dir}/train',
        seed=123,
        image_size=img_size,
        batch_size=batch_size,
        label_mode='binary'
    )
    
    val_dataset = tf.keras.preprocessing.image_dataset_from_directory(
        f'{data_dir}/val',
        seed=123,
        image_size=img_size,
        batch_size=batch_size,
        label_mode='binary'
    )
    
    test_dataset = tf.keras.preprocessing.image_dataset_from_directory(
        f'{data_dir}/test',
        seed=123,
        image_size=img_size,
        batch_size=batch_size,
        label_mode='binary'
    )
    
    data_augmentation = tf.keras.Sequential([
        tf.keras.layers.RandomRotation(0.2),
        tf.keras.layers.RandomZoom(0.2),
        tf.keras.layers.RandomFlip("horizontal"),
        tf.keras.layers.RandomContrast(0.2)
    ])
    
    normalization_layer = tf.keras.layers.Rescaling(1./255)
    
    # Apply augmentation and normalization to training data
    train_dataset = train_dataset.map(
        lambda x, y: (data_augmentation(x), y)
    ).map(
        lambda x, y: (normalization_layer(x), y)
    )
    
    # Apply only normalization to validation and test data
    val_dataset = val_dataset.map(
        lambda x, y: (normalization_layer(x), y)
    )
    test_dataset = test_dataset.map(
        lambda x, y: (normalization_layer(x), y)
    )
    
    # Configure datasets for performance
    AUTOTUNE = tf.data.AUTOTUNE
    train_dataset = train_dataset.cache().prefetch(buffer_size=AUTOTUNE)
    val_dataset = val_dataset.cache().prefetch(buffer_size=AUTOTUNE)
    test_dataset = test_dataset.cache().prefetch(buffer_size=AUTOTUNE)
    
    return train_dataset, val_dataset, test_dataset


Data vizualization

In [46]:
directory = "chest_xray/train"

filepath =[]
label = []

folds = os.listdir(directory)

for fold in folds:
    f_path = os.path.join(directory , fold)
    
    imgs = os.listdir(f_path)
    
    for img in imgs:
        
        img_path = os.path.join(f_path , img)
        filepath.append(img_path)
        label.append(fold)
        
#Concat data paths with labels
file_path_series = pd.Series(filepath , name= 'filepath')
Label_path_series = pd.Series(label , name = 'label')
df_train = pd.concat([file_path_series ,Label_path_series ] , axis = 1)

In [None]:
count = df_train['label'].value_counts()

fig, axs = plt.subplots(1, 2, figsize=(12, 6), facecolor='white')

palette = sns.color_palette("viridis")
sns.set_palette(palette)
axs[0].pie(count, labels=count.index, autopct='%1.1f%%', startangle=140)
axs[0].set_title('Distribution of Categories')

sns.barplot(x=count.index, y=count.values, ax=axs[1], palette="viridis")
axs[1].set_title('Count of Categories')

plt.tight_layout()

plt.show()

In [48]:
directory = "chest_xray/test"

filepath =[]
label = []

folds = os.listdir(directory)

for fold in folds:
    f_path = os.path.join(directory , fold)
    
    imgs = os.listdir(f_path)
    
    for img in imgs:
        
        img_path = os.path.join(f_path , img)
        filepath.append(img_path)
        label.append(fold)
        
#Concat data paths with labels
file_path_series = pd.Series(filepath , name= 'filepath')
Label_path_series = pd.Series(label , name = 'label')
df_test = pd.concat([file_path_series ,Label_path_series ] , axis = 1)

In [None]:
count = df_test['label'].value_counts()

fig, axs = plt.subplots(1, 2, figsize=(12, 6), facecolor='white')

palette = sns.color_palette("viridis")
sns.set_palette(palette)
axs[0].pie(count, labels=count.index, autopct='%1.1f%%', startangle=140)
axs[0].set_title('Distribution of Categories')

sns.barplot(x=count.index, y=count.values, ax=axs[1], palette="viridis")
axs[1].set_title('Count of Categories')

plt.tight_layout()

plt.show()

In [50]:
def visualize_images(path, num_images=5):
    
    image_filenames = os.listdir(path)
    
    num_images = min(num_images, len(image_filenames))
    
    fig, axes = plt.subplots(1, num_images, figsize=(15, 3),facecolor='white')
    
    for i, image_filename in enumerate(image_filenames[:num_images]):
        image_path = os.path.join(path, image_filename)
        image = mpimg.imread(image_path)
        
        axes[i].imshow(image)
        axes[i].axis('off')
        axes[i].set_title(image_filename)
    
    plt.tight_layout()
    plt.show()

In [None]:
path_to_visualize = "chest_xray/train/NORMAL"
visualize_images(path_to_visualize, num_images=5)

In [None]:
path_to_visualize = "chest_xray/train/PNEUMONIA"
visualize_images(path_to_visualize, num_images=5)

Model Architectures

In [53]:
def create_xception_model():
    base_model = Xception(weights='imagenet', include_top=False, pooling='avg', input_shape=(224, 224, 3))
    base_model.trainable = False
    model = tf.keras.Sequential([
        base_model,
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.45),
        tf.keras.layers.Dense(220, activation='relu'),
        tf.keras.layers.Dropout(0.25),
        tf.keras.layers.Dense(60,activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])
    return model

In [54]:
def create_custom_cnn():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(224, 224, 3)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(64, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(128, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    return model

def create_transfer_learning_model(base_model_name='VGG16'):

    if base_model_name == 'VGG16':
        base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    elif base_model_name == 'ResNet50':
        base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    elif base_model_name == 'DenseNet121':
        base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
        for layer in base_model.layers[-20:]:
            layer.trainable = True
    elif base_model_name == 'CNN':
        base_model = create_custom_cnn()
        return base_model
    elif base_model_name == 'Xception':
        base_model = create_xception_model()
        return base_model
    if base_model_name == 'VGG16' or base_model_name == 'ResNet50' or base_model_name == 'CNN':
        model = tf.keras.Sequential([
            base_model,
            tf.keras.layers.GlobalAveragePooling2D(),
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
    elif base_model_name == 'DenseNet121':
        model = tf.keras.Sequential([
            base_model,
            tf.keras.layers.GlobalAveragePooling2D(),
            tf.keras.layers.Dense(512, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
    return model


Training

In [55]:
def load_saved_model(model_name):
    """Load a saved model"""
    try:
        return tf.keras.models.load_model(f'best_{model_name}.keras')
    except (OSError, IOError) as e:
        print("Model file not found, continuining with training...")
    return None

def evaluate_model(model, test_dataset):
    """Evaluate a model on test data"""
    test_loss, test_accuracy = model.evaluate(test_dataset)
    print(f"\nTest accuracy: {test_accuracy:.4f}")
    print(f"Test loss: {test_loss:.4f}")
    
    return test_loss, test_accuracy

In [56]:
def predict_image(image_path):
    # Load and preprocess the image
    img = image.load_img(image_path, target_size=(224, 224))
    img_array = image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    img_array = img_array / 255.0  # Normalize pixel values
    
    # Make prediction
    prediction = model.predict(img_array)
    
    # For binary classification
    result = "Pneumonia" if prediction[0] > 0.5 else "Normal"
    confidence = prediction[0] if prediction[0] > 0.5 else 1 - prediction[0]
    
    return result, confidence

In [57]:
def train_and_evaluate(model, model_name, continue_training=False):

    model_path = f'best_{model_name}.h5'

    if not continue_training and os.path.exists(model_path):
        print(f"Loading existing model: {model_path}")
        model = tf.keras.models.load_model(model_path)
    else:
        print("Creating new model...")
        model = create_transfer_learning_model(model_name)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss='binary_crossentropy',
        metrics=['accuracy', tf.keras.metrics.Recall(), tf.keras.metrics.F1Score()]
    )


    callbacks = [
        tf.keras.callbacks.EarlyStopping(
        patience=5,  # Increased patience
        restore_best_weights=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.2,
            patience=3,
            min_lr=0.00001
        ),
        tf.keras.callbacks.ModelCheckpoint(
            f'best_{model_name}.keras',
            save_best_only=True,
            monitor='val_f1_score'
        )
    ]

    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=20,
        callbacks=callbacks
    )
    print(history)
    return history

Model Comparison

In [58]:
def plot_results(results):
    best_epoch = results['val_accuracy'].index(max(results['val_accuracy'])) + 1

    plt.style.use('seaborn-v0_8-darkgrid')

    fig, axs = plt.subplots(1, 2, figsize=(16, 5))

    # Plot training and validation accuracy
    axs[0].plot(results['accuracy'], label='Training Accuracy', color='blue')
    axs[0].plot(results['val_accuracy'], label='Validation Accuracy', color='red')
    axs[0].scatter(best_epoch - 1, results['val_accuracy'][best_epoch - 1], color='green', label=f'Best Epoch: {best_epoch}')
    axs[0].set_xlabel('Epoch')
    axs[0].set_ylabel('Accuracy')
    axs[0].set_title('Training and Validation Accuracy')
    axs[0].legend()

    # Plot training and validation loss
    axs[1].plot(results['loss'], label='Training Loss', color='blue')
    axs[1].plot(results['val_loss'], label='Validation Loss', color='red')
    axs[1].scatter(best_epoch - 1, results['val_loss'][best_epoch - 1], color='green',label=f'Best Epoch: {best_epoch}')
    axs[1].set_xlabel('Epoch')
    axs[1].set_ylabel('Loss')
    axs[1].set_title('Training and Validation Loss')
    axs[1].legend()

    plt.tight_layout()
    plt.show()


In [None]:
import shutil
import os

def create_balanced_test_set(source_dir, dest_dir, n_samples=20):
    os.makedirs(os.path.join(dest_dir, 'normal'), exist_ok=True)
    os.makedirs(os.path.join(dest_dir, 'pneumonia'), exist_ok=True)
    
    for class_name in ['NORMAL', 'PNEUMONIA']:
        source_files = os.listdir(os.path.join(source_dir, class_name))[:n_samples]
        for file_name in source_files:
            shutil.copy(
                os.path.join(source_dir, class_name, file_name),
                os.path.join(dest_dir, class_name, file_name)
            )
create_balanced_test_set('chest_xray/test/','chest_xray/balanced-test/')

test_dir = 'chest_xray/balanced-test/'
test_ds = tf.keras.utils.image_dataset_from_directory(
    test_dir, 
    seed=123,
    image_size=(224, 224),
    batch_size=40,
    shuffle=True,
    validation_split=None,
    class_names=['normal', 'pneumonia'])
class_labels = ['normal', 'pneumonia']
def plot_images_with_predictions(model, dataset, class_labels, num_images=40):
    images = []
    labels = []
    
    for img, lbl in dataset.take(1):
        images = img.numpy()
        labels = lbl.numpy()
    
    predictions = model.predict(dataset)
    predictions = (predictions > 0.5).astype(int)
    
    plt.figure(figsize=(15, 10))
    for i in range(min(num_images, len(images))):
        plt.subplot(8, 5, i + 1)
        plt.imshow(images[i].astype("uint8"))
        true_class = class_labels[int(labels[i])]
        pred_class = class_labels[int(predictions[i])]
        plt.title(f'True: {true_class}\nPred: {pred_class}')
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()


cnn_best = load_saved_model('VGG16')
plot_images_with_predictions(cnn_best, test_ds, class_labels)
cnn_best = load_saved_model('CNN')
plot_images_with_predictions(cnn_best, test_ds, class_labels)
cnn_best = load_saved_model('ResNet50')
plot_images_with_predictions(cnn_best, test_ds, class_labels)
cnn_best = load_saved_model('DenseNet121')
plot_images_with_predictions(cnn_best, test_ds, class_labels)
cnn_best = load_saved_model('Xception')
plot_images_with_predictions(cnn_best, test_ds, class_labels)


In [65]:
from tensorflow.keras import mixed_precision

def compare_models():
    models = {
        'CNN': create_custom_cnn(),
        'VGG16': create_transfer_learning_model('VGG16'),
        'ResNet50': create_transfer_learning_model('ResNet50'),
        'DenseNet121': create_transfer_learning_model('DenseNet121'),
        'Xception': create_xception_model()
    }
    
    results = {}
    for name, model in models.items():
        print(f"Training {name}...")
        
        if name == "DenseNet121":
            policy = mixed_precision.Policy('mixed_float16')
            mixed_precision.set_global_policy(policy)

        history = train_and_evaluate(model, name)

        if history.history != None:
            results = history.history
            plot_results(results)
            results = None
        
        tf.keras.backend.clear_session()
        with torch.no_grad():
            torch.cuda.empty_cache()
    



Comparison

In [64]:
def plot_performance_comparison(metrics):
    # Bar plot for Recall and F1 Score
    plt.figure(figsize=(12, 6))
    
    models = list(metrics.keys())
    recalls = [m['recall'] for m in metrics.values()]
    f1_scores = [m['f1'] for m in metrics.values()]
    
    x = np.arange(len(models))
    width = 0.35
    
    plt.bar(x - width/2, recalls, width, label='Recall')
    plt.bar(x + width/2, f1_scores, width, label='F1 Score')
    
    plt.xlabel('Models')
    plt.ylabel('Score')
    plt.title('Model Performance Comparison')
    plt.xticks(x, models, rotation=45)
    plt.legend()
    plt.tight_layout()
    plt.show()

In [63]:
def plot_confusion_matrices(metrics):
    fig, axes = plt.subplots(2, 2, figsize=(15, 15))
    fig.suptitle('Confusion Matrices for Different Models')
    
    for (name, metric), ax in zip(metrics.items(), axes.flat):
        sns.heatmap(metric['confusion_matrix'], 
                   annot=True, 
                   fmt='d',
                   cmap='Blues',
                   ax=ax)
        ax.set_title(name)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('True')
    
    plt.tight_layout()
    plt.show()

def create_performance_table(metrics):
    data = {
        'Model': [],
        'Recall': [],
        'F1 Score': []
    }
    
    for name, metric in metrics.items():
        data['Model'].append(name)
        data['Recall'].append(f"{metric['recall']:.4f}")
        data['F1 Score'].append(f"{metric['f1']:.4f}")
    
    return pd.DataFrame(data)

In [62]:
from sklearn.metrics import confusion_matrix, recall_score, f1_score

def load_and_evaluate_models(test_ds):
    metrics = {}
    
    model_names = ['ResNet50', 'VGG16', 'DenseNet121', 'CNN', 'Xception']
    
    for name in model_names:
        model = load_saved_model(name)
        
        y_pred = model.predict(test_ds)
        y_pred = (y_pred > 0.5).astype(int)
        
        y_true = np.concatenate([y for x, y in test_ds], axis=0)
        
        metrics[name] = {
            'recall': recall_score(y_true, y_pred),
            'f1': f1_score(y_true, y_pred),
            'confusion_matrix': confusion_matrix(y_true, y_pred)
        }

    return metrics

Main Program

In [None]:
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print("Running code on GPU:", gpus)
else:
    print("Running code on CPU:", tf.config.list_physical_devices('CPU'))

train_ds, val_ds, test_ds = create_data_generators()

compare_models()
metrics = load_and_evaluate_models(test_ds)

plot_performance_comparison(metrics)
plot_confusion_matrices(metrics)

performance_table = create_performance_table(metrics)
print("\nModel Performance Metrics:")
print(performance_table)


