In [19]:
import os
import glob
import cv2
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc, precision_recall_curve, accuracy_score
from sklearn.model_selection import train_test_split, KFold
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.applications import MobileNet, DenseNet121, ResNet50, EfficientNetB0
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras.layers import Dense, GlobalAveragePooling2D, Dropout, BatchNormalization, Activation, Input, DepthwiseConv2D, Add, Conv2D
from keras.metrics import AUC, Precision, Recall
from keras.initializers import VarianceScaling
import warnings


In [25]:
# Notebook configuration cell: display options, RNG seeding for every library in
# use, and the global constants shared by the data/model helpers below.
import pandas as pd       
import matplotlib as mat
import matplotlib.pyplot as plt    
import numpy as np
import seaborn as sns
get_ipython().run_line_magic('matplotlib', 'inline')

# Show up to 100 characters per DataFrame cell (image paths are long).
pd.options.display.max_colwidth = 100

import random
import os
from numpy.random import seed
# Seed NumPy's legacy global RNG.
seed(42)

# Seed Python's built-in RNG.
random.seed(42)
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably only affects child processes — confirm.
os.environ['PYTHONHASHSEED'] = str(42)
# NOTE(review): TensorFlow is already imported by an earlier cell; confirm this
# flag is still honoured when set after import.
os.environ['TF_DETERMINISTIC_OPS'] = '1'

from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.metrics import accuracy_score

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import glob
import cv2

from tensorflow.random import set_seed
# Seed TensorFlow's global RNG.
set_seed(42)

import warnings
# Silences ALL warnings for the rest of the session, including deprecations.
warnings.filterwarnings('ignore')

# Side length (pixels) images are resized to; used as both width and height.
IMG_SIZE = 224
# Batch size used by the K-Fold training/validation iterators.
BATCH = 32
# Seed passed to KFold and to the training data iterators.
SEED = 42

In [21]:
# Paths
# Dataset root. Defaults to the original local location but can be overridden
# with the CHEST_XRAY_DIR environment variable so the notebook runs on other
# machines without editing this cell.
MAIN_PATH = os.environ.get("CHEST_XRAY_DIR", "D:/DataSet1/chest_xray")
TRAIN_PATH = os.path.join(MAIN_PATH, "train")
TEST_PATH = os.path.join(MAIN_PATH, "test")

# Class sub-directory names; the list position is the integer label assigned by
# get_training_data (PNEUMONIA -> 0, NORMAL -> 1).
labels = ['PNEUMONIA', 'NORMAL']



In [18]:
# Function to load and preprocess data
def get_training_data(data_dir):
    """Load every image under ``data_dir/<label>`` as a resized grayscale array.

    Args:
        data_dir: Directory containing one sub-directory per entry of the
            module-level ``labels`` list.

    Returns:
        An object-dtype NumPy array of shape ``(n_images, 2)`` where each row
        is ``[image, class_num]``; ``image`` is an ``IMG_SIZE x IMG_SIZE``
        grayscale array and ``class_num`` is the label's position in
        ``labels``. Files that cannot be read or resized are reported and
        skipped.

    Note:
        The integer labels follow the order of ``labels`` (PNEUMONIA=0,
        NORMAL=1), which is the opposite of the alphabetical mapping Keras'
        ``flow_from_dataframe`` derives from the 'Normal'/'Pneumonia' strings.
    """
    data = []
    for class_num, label in enumerate(labels):
        path = os.path.join(data_dir, label)
        for img in os.listdir(path):
            img_path = os.path.join(path, img)
            # cv2.imread signals failure by returning None rather than raising.
            img_arr = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            if img_arr is None:
                print(f"Skipping unreadable image: {img_path}")
                continue
            try:
                resized_arr = cv2.resize(img_arr, (IMG_SIZE, IMG_SIZE))
            except cv2.error as e:
                # Keep the original best-effort behaviour: report and move on.
                print(e)
                continue
            data.append([resized_arr, class_num])
    # Rows mix a 2-D array with an int, so the result is ragged; NumPy >= 1.24
    # raises ValueError for ragged input unless dtype=object is explicit.
    return np.array(data, dtype=object)


In [5]:

# Function to create dataframes for training and testing
def create_dataframes():
    """Build the train and test DataFrames from the image folders.

    Collects every ``*.jpeg`` file under the ``NORMAL`` and ``PNEUMONIA``
    sub-directories of ``TRAIN_PATH`` and ``TEST_PATH``.

    Returns:
        Tuple ``(df_train, df_test)``. Each DataFrame has an ``image`` column
        (file path) and a ``class`` column ('Normal' or 'Pneumonia'), with all
        Normal rows first.
    """
    def _list_images(root, class_dir):
        # glob returns files in arbitrary, filesystem-dependent order; sorting
        # makes the row order (and therefore the seeded K-Fold splits)
        # reproducible across machines.
        return sorted(glob.glob(os.path.join(root, f"{class_dir}/*.jpeg")))

    def _build_frame(root):
        normal = _list_images(root, "NORMAL")
        pneumonia = _list_images(root, "PNEUMONIA")
        return pd.DataFrame({
            'image': normal + pneumonia,
            'class': ['Normal'] * len(normal) + ['Pneumonia'] * len(pneumonia)
        })

    df_train = _build_frame(TRAIN_PATH)
    df_test = _build_frame(TEST_PATH)

    return df_train, df_test


In [6]:
# Function to plot data distribution
def plot_data_distribution(df, title):
    """Show the class balance of ``df`` as a bar chart and a pie chart.

    Args:
        df: DataFrame with a ``class`` column holding the class name per row.
        title: Title used for both figures.
    """
    # Classes in order of first appearance; passed explicitly so the bars and
    # their tick labels always come from the data instead of being hard-coded.
    class_order = list(df['class'].unique())

    plt.figure(figsize=(6, 4))
    ax = sns.countplot(x='class', data=df, order=class_order, palette="mako")
    plt.xlabel("Class", fontsize=12)
    plt.ylabel("# of Samples", fontsize=12)
    ax.tick_params(axis='x', labelsize=11)
    plt.title(title)
    for p in ax.patches:
        # Anchor the count just above the centre of its bar. A fixed absolute
        # offset would push the text far off bars for small datasets.
        ax.annotate(
            f"{int(p.get_height())}",
            (p.get_x() + p.get_width() / 2, p.get_height()),
            ha='center',
            va='bottom',
            fontsize=13,
        )
    # Leave headroom so the annotation above the tallest bar is not clipped.
    ax.margins(y=0.1)
    plt.show()

    # value_counts sorts by descending frequency; the legend is built from the
    # same index so each wedge is always labelled with its own class.
    counts = df['class'].value_counts()
    plt.figure(figsize=(7, 5))
    counts.plot(
        kind='pie',
        labels=[''] * len(counts),
        autopct='%1.1f%%',
        colors=['darkcyan', 'blue'],
        explode=[0 if i == 0 else 0.05 for i in range(len(counts))],
        textprops={"fontsize": 15},
    )
    plt.legend(labels=list(counts.index))
    plt.title(title)
    plt.show()


In [7]:
# Function to create image data generators
def create_data_generators(df_train, df_test):
    """Create the augmenting/plain image generators and the test iterator.

    Args:
        df_train: Training DataFrame (accepted for symmetry; not used here).
        df_test: Test DataFrame with ``image`` (path) and ``class`` columns.

    Returns:
        Tuple ``(train_datagen, val_datagen, ds_test)``: the augmenting
        generator, the rescale-only generator, and an unshuffled
        one-image-per-batch iterator over ``df_test``.
    """
    # Rescale-only generator, shared by validation and test data.
    plain_generator = ImageDataGenerator(rescale=1./255)

    # Test iterator: fixed order, one image per batch.
    test_flow_options = {
        'x_col': 'image',
        'y_col': 'class',
        'target_size': (IMG_SIZE, IMG_SIZE),
        'class_mode': 'binary',
        'batch_size': 1,
        'shuffle': False,
    }
    test_iterator = plain_generator.flow_from_dataframe(df_test, **test_flow_options)

    # Training generator: rescale plus random geometric augmentation.
    augmentation_options = {
        'rescale': 1./255,
        'rotation_range': 30,
        'width_shift_range': 0.15,
        'height_shift_range': 0.15,
        'shear_range': 0.15,
        'zoom_range': 0.15,
        'horizontal_flip': True,
        'vertical_flip': True,
        'fill_mode': 'nearest',
    }
    augmenting_generator = ImageDataGenerator(**augmentation_options)

    return augmenting_generator, plain_generator, test_iterator


In [22]:
# Function to create MobileNet model
def create_mobilenet_model():
    """Build and compile a binary classifier on a frozen ImageNet MobileNet.

    The head is GlobalAveragePooling -> Dense(1024, relu) -> Dropout(0.5) ->
    Dense(1, sigmoid). The model summary is printed as a side effect.

    Returns:
        The compiled Keras ``Model`` (loss: binary cross-entropy; metrics:
        accuracy, AUC, precision, recall).
    """
    base_model = MobileNet(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    # Freeze the backbone so only the new classification head is trained.
    base_model.trainable = False

    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dropout(0.5)(x)
    predictions = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=base_model.input, outputs=predictions)
    # `learning_rate` is the supported keyword; the old `lr` alias is ignored
    # (falling back to the default rate) or rejected by current Keras releases.
    model.compile(optimizer=Adam(learning_rate=0.00005), loss='binary_crossentropy', metrics=['accuracy', AUC(), Precision(), Recall()])
    model.summary()

    return model


In [26]:
# Function to create DenseNet model
def create_densenet_model():
    """Build and compile a binary classifier on a frozen ImageNet DenseNet121.

    The head is GlobalAveragePooling -> Dense(1024, relu) -> Dropout(0.5) ->
    Dense(1, sigmoid). The model summary is printed as a side effect.

    Returns:
        The compiled Keras ``Model`` (loss: binary cross-entropy; metrics:
        accuracy, AUC, precision, recall).
    """
    base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    # Freeze the backbone so only the new classification head is trained.
    base_model.trainable = False

    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dropout(0.5)(x)
    predictions = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=base_model.input, outputs=predictions)
    # `learning_rate` is the supported keyword; the old `lr` alias is ignored
    # (falling back to the default rate) or rejected by current Keras releases.
    model.compile(optimizer=Adam(learning_rate=0.00005), loss='binary_crossentropy', metrics=['accuracy', AUC(), Precision(), Recall()])
    model.summary()

    return model


In [10]:
# Function to create ResNet model
def create_resnet_model():
    """Build and compile a binary classifier on a frozen ImageNet ResNet50.

    The head is GlobalAveragePooling -> Dense(1024, relu) -> Dropout(0.5) ->
    Dense(1, sigmoid). The model summary is printed as a side effect.

    Returns:
        The compiled Keras ``Model`` (loss: binary cross-entropy; metrics:
        accuracy, AUC, precision, recall).
    """
    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    # Freeze the backbone so only the new classification head is trained.
    base_model.trainable = False

    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dropout(0.5)(x)
    predictions = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=base_model.input, outputs=predictions)
    # `learning_rate` is the supported keyword; the old `lr` alias is ignored
    # (falling back to the default rate) or rejected by current Keras releases.
    model.compile(optimizer=Adam(learning_rate=0.00005), loss='binary_crossentropy', metrics=['accuracy', AUC(), Precision(), Recall()])
    model.summary()

    return model


In [11]:
# Function to create EfficientNet model
def create_efficientnet_model():
    """Build and compile a binary classifier on a frozen ImageNet EfficientNetB0.

    The head is GlobalAveragePooling -> Dense(1024, relu) -> Dropout(0.5) ->
    Dense(1, sigmoid). The model summary is printed as a side effect.

    Returns:
        The compiled Keras ``Model`` (loss: binary cross-entropy; metrics:
        accuracy, AUC, precision, recall).
    """
    # NOTE(review): Keras' EfficientNet applications are documented as doing
    # their own input normalisation and expecting pixel values in [0, 255];
    # verify that feeding inputs already rescaled by 1/255 is intended.
    base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    # Freeze the backbone so only the new classification head is trained.
    base_model.trainable = False

    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dropout(0.5)(x)
    predictions = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=base_model.input, outputs=predictions)
    # `learning_rate` is the supported keyword; the old `lr` alias is ignored
    # (falling back to the default rate) or rejected by current Keras releases.
    model.compile(optimizer=Adam(learning_rate=0.00005), loss='binary_crossentropy', metrics=['accuracy', AUC(), Precision(), Recall()])
    model.summary()

    return model


In [12]:
# Function to create GhostNet model
def create_ghostnet_model():
    """Build and compile a small GhostNet-style binary classifier.

    Architecture: a strided 3x3 stem convolution, one ghost-style block, global
    average pooling and a single sigmoid unit. The model summary is printed as
    a side effect.

    Returns:
        The compiled Keras ``Model`` (optimizer: 'adam'; loss: binary
        cross-entropy; metrics: accuracy, AUC, precision, recall).
    """
    def _bn_relu(tensor):
        # Batch normalisation followed by a ReLU activation.
        normalised = BatchNormalization()(tensor)
        return Activation('relu')(normalised)

    def ghost_module(inputs, in_channels, out_channels, kernel_size, stride):
        # `in_channels` is accepted but not used by the block.
        # Primary branch: regular convolution.
        primary = Conv2D(out_channels, kernel_size, strides=stride, padding='same')(inputs)
        primary = _bn_relu(primary)

        # Secondary branch: depthwise convolution, then a 1x1 projection.
        secondary = DepthwiseConv2D(kernel_size, strides=(1, 1), padding='same')(primary)
        secondary = _bn_relu(secondary)
        secondary = Conv2D(out_channels, kernel_size=(1, 1), strides=(1, 1), padding='same')(secondary)
        secondary = _bn_relu(secondary)

        # The two branches are merged by element-wise addition.
        return Add()([primary, secondary])

    image_input = Input(shape=(IMG_SIZE, IMG_SIZE, 3))

    stem = Conv2D(16, (3, 3), strides=(2, 2), padding='same')(image_input)
    stem = _bn_relu(stem)

    features = ghost_module(stem, 16, 16, (3, 3), (1, 1))

    pooled = GlobalAveragePooling2D()(features)
    probability = Dense(1, activation='sigmoid')(pooled)

    model = Model(image_input, probability)
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy', AUC(), Precision(), Recall()],
    )
    model.summary()

    return model


In [13]:
# Function to train models using K-Fold Cross-Validation
def train_model_kfold(model, df_train, train_datagen, val_datagen, epochs=5, k=10, callbacks=None):
    """Train ``model`` over ``k`` shuffled folds of ``df_train``.

    Args:
        model: Compiled Keras model; it is trained in place.
        df_train: DataFrame with ``image`` (path) and ``class`` columns.
        train_datagen: ``ImageDataGenerator`` used for each training split.
        val_datagen: ``ImageDataGenerator`` used for each validation split.
        epochs: Epochs per fold.
        k: Number of folds.
        callbacks: Optional list of Keras callbacks passed to ``model.fit``.
            When ``None``, an ``EarlyStopping`` (val_loss, patience 10, restore
            best weights) and a ``ReduceLROnPlateau`` (val_loss, factor 0.2,
            patience 2) are created, so the function does not depend on
            module-level callback variables existing.

    Returns:
        List with one entry per fold: the value returned by ``model.evaluate``
        on that fold's validation split. The per-metric mean across folds is
        printed.

    Note:
        NOTE(review): the same model instance keeps training from fold to fold
        without a weight reset, so later folds validate on images the model has
        already been trained on. The fold scores are therefore optimistic and
        not independent estimates.
    """
    if callbacks is None:
        callbacks = [
            EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=2, verbose=1, min_delta=1e-4, cooldown=0, min_lr=1e-7),
        ]

    # Options shared by the training and validation iterators.
    flow_options = dict(
        x_col='image',
        y_col='class',
        target_size=(IMG_SIZE, IMG_SIZE),
        class_mode='binary',
        batch_size=BATCH,
    )

    fold_performance = []
    kf = KFold(n_splits=k, random_state=SEED, shuffle=True)

    for fold, (train_index, val_index) in enumerate(kf.split(df_train)):
        print(f"Training fold {fold + 1}/{k}")

        train_fold = df_train.iloc[train_index]
        val_fold = df_train.iloc[val_index]

        ds_train_fold = train_datagen.flow_from_dataframe(
            train_fold,
            seed=SEED,
            **flow_options
        )

        # Validation data is only scored, so a fixed order is sufficient.
        ds_val_fold = val_datagen.flow_from_dataframe(
            val_fold,
            shuffle=False,
            **flow_options
        )

        model.fit(
            ds_train_fold,
            validation_data=ds_val_fold,
            epochs=epochs,
            callbacks=callbacks
        )

        scores = model.evaluate(ds_val_fold, verbose=0)
        fold_performance.append(scores)

    average_performance = np.mean(fold_performance, axis=0)
    print("Average performance across all folds:", average_performance)

    return fold_performance


In [14]:
# Function to evaluate model on the test dataset
def evaluate_model(model, ds_test, df_test):
    """Evaluate ``model`` on the test iterator and plot diagnostic charts.

    Prints the test loss/accuracy and a classification report, then shows the
    confusion matrix, ROC curve and precision-recall curve.

    Args:
        model: Trained Keras model with a single sigmoid output.
        ds_test: Unshuffled Keras data iterator over the test set (must expose
            ``classes`` and ``reset()``).
        df_test: Test DataFrame. Kept for interface compatibility; the number
            of steps is taken from ``ds_test`` itself, because the iterator may
            hold fewer samples than the DataFrame if files were rejected.
    """
    # Imported locally: the notebook's import cell does not bring this name in.
    from sklearn.metrics import ConfusionMatrixDisplay

    # One pass over the iterator, whatever its batch size.
    n_steps = len(ds_test)

    ds_test.reset()
    score = model.evaluate(ds_test, steps=n_steps, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    # Restart from the first sample so predictions line up with ds_test.classes.
    ds_test.reset()
    # Flatten the (n, 1) sigmoid output to the 1-D shape sklearn expects.
    predictions = np.asarray(model.predict(ds_test, steps=n_steps, verbose=1)).ravel()
    predicted_classes = np.where(predictions > 0.5, 1, 0)
    true_classes = np.asarray(ds_test.classes)

    report = classification_report(true_classes, predicted_classes, target_names=['Normal', 'Pneumonia'])
    print(report)

    cm = confusion_matrix(true_classes, predicted_classes)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Normal', 'Pneumonia'])
    disp.plot(cmap=plt.cm.Blues)
    plt.show()

    fpr, tpr, _ = roc_curve(true_classes, predictions)
    roc_auc = auc(fpr, tpr)
    plt.figure()
    plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
    # Diagonal reference line for a random classifier.
    plt.plot([0, 1], [0, 1], color='grey', linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc='lower right')
    plt.show()

    precision, recall, _ = precision_recall_curve(true_classes, predictions)
    plt.figure()
    plt.plot(recall, precision, color='green', lw=2)
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.show()


In [None]:
# Function to implement soft voting
def soft_voting(models, ds_test, df_test):
    """Evaluate a soft-voting ensemble on the test iterator.

    Averages the predicted probabilities of all ``models``, thresholds the
    mean at 0.5, prints a classification report and shows the confusion
    matrix, ROC curve and precision-recall curve.

    Args:
        models: Non-empty sequence of trained Keras models, each with a single
            sigmoid output.
        ds_test: Unshuffled Keras data iterator over the test set (must expose
            ``classes`` and ``reset()``).
        df_test: Test DataFrame. Kept for interface compatibility; the number
            of steps is taken from ``ds_test`` itself.

    Raises:
        ValueError: If ``models`` is empty.
    """
    if len(models) == 0:
        raise ValueError("soft_voting requires at least one model")

    # One pass over the iterator, whatever its batch size.
    n_steps = len(ds_test)

    # Get predictions from each model
    predictions = []
    for model in models:
        # Restart from the first sample so every model's output lines up with
        # ds_test.classes.
        ds_test.reset()
        # Flatten the (n, 1) sigmoid output to 1-D.
        predictions.append(np.asarray(model.predict(ds_test, steps=n_steps, verbose=1)).ravel())

    # Average predictions
    avg_predictions = np.mean(predictions, axis=0)
    predicted_classes = np.where(avg_predictions > 0.5, 1, 0)
    true_classes = np.asarray(ds_test.classes)

    # Classification report
    report = classification_report(true_classes, predicted_classes, target_names=['Normal', 'Pneumonia'])
    print(report)

    # Confusion matrix
    cm = confusion_matrix(true_classes, predicted_classes)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Normal', 'Pneumonia'], yticklabels=['Normal', 'Pneumonia'])
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')
    plt.show()

    # ROC Curve
    fpr, tpr, _ = roc_curve(true_classes, avg_predictions)
    roc_auc = auc(fpr, tpr)
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
    # Diagonal reference line for a random classifier.
    plt.plot([0, 1], [0, 1], color='gray', lw=2, linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc='lower right')
    plt.show()

    # Precision-Recall Curve
    precision, recall, _ = precision_recall_curve(true_classes, avg_predictions)
    plt.figure(figsize=(8, 6))
    plt.plot(recall, precision, color='green', lw=2)
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.show()

In [None]:
# Main script
if __name__ == "__main__":
    # Build the train/test tables and visualise their class balance.
    df_train, df_test = create_dataframes()
    plot_data_distribution(df_train, "Training Data Distribution")
    plot_data_distribution(df_test, "Test Data Distribution")

    train_datagen, val_datagen, ds_test = create_data_generators(df_train, df_test)

    # Training callbacks, defined at module level.
    # NOTE(review): train_model_kfold is called below with its default of
    # 5 epochs per fit, so an early-stopping patience of 10 cannot trigger.
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=2, verbose=1, min_delta=1e-4, cooldown=0, min_lr=1e-7)

    # Train and evaluate MobileNet model
    mobilenet_model = create_mobilenet_model()
    train_model_kfold(mobilenet_model, df_train, train_datagen, val_datagen)
    evaluate_model(mobilenet_model, ds_test, df_test)

    # Train and evaluate DenseNet model
    densenet_model = create_densenet_model()
    train_model_kfold(densenet_model, df_train, train_datagen, val_datagen)
    evaluate_model(densenet_model, ds_test, df_test)

    # Train and evaluate ResNet model
    resnet_model = create_resnet_model()
    train_model_kfold(resnet_model, df_train, train_datagen, val_datagen)
    evaluate_model(resnet_model, ds_test, df_test)

    # Train and evaluate EfficientNet model
    efficientnet_model = create_efficientnet_model()
    train_model_kfold(efficientnet_model, df_train, train_datagen, val_datagen)
    evaluate_model(efficientnet_model, ds_test, df_test)

    # Train and evaluate GhostNet model
    ghostnet_model = create_ghostnet_model()
    train_model_kfold(ghostnet_model, df_train, train_datagen, val_datagen)
    evaluate_model(ghostnet_model, ds_test, df_test)

    # Implement soft voting
    # Named `ensemble_models` rather than `models` so the `models` module
    # imported from tensorflow.keras is not overwritten at module level.
    ensemble_models = [mobilenet_model, densenet_model, resnet_model, efficientnet_model, ghostnet_model]
    soft_voting(ensemble_models, ds_test, df_test)