<a href="https://colab.research.google.com/github/TWOCHE/Data_Analytics_Tools/blob/master/comparing_CNN_Models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install opencv-contrib-python-headless
!pip install seaborn
!pip3 install mlnotify

In [None]:
import os
import shutil
from shutil import copy
from shutil import copytree, rmtree
import mlnotify

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set()

import cv2

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.inception_v3 import preprocess_input
from tensorflow.keras.applications import InceptionV3, VGG16, ResNet50
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report



In [None]:
# download dataset
!mkdir ./data

!wget --no-check-certificate \
    https://data.mendeley.com/public-files/datasets/rscbjbr9sj/files/f12eaf6d-6023-432f-acc9-80c9d7393433/file_downloaded \
    -O ./data/x-ray.zip

In [None]:
# Extract from archive

!unzip -q ./data/x-ray.zip -d ./data/

In [None]:
# Check the extracted dataset folder: list the top-level entries of ./data/chest_xray/
!ls ./data/chest_xray/

In [None]:
# Cleanup
print('Removing unneeded folders and files...')
! rm -rf ./data/chest_xray/.DS_Store
! rm -rf ./data/chest_xray/train/.DS_Store
! rm -rf ./data/chest_xray/test/.DS_Store
! rm ./data/x-ray.zip
shutil.rmtree('./data/__MACOSX/')

In [None]:
# Long listing of the dataset root after the cleanup
! ls -l ./data/chest_xray

# Long listing of the training split
! ls -l ./data/chest_xray/train

# Long listing of the test split
! ls -l ./data/chest_xray/test

In [None]:

dataset_dir = './data/chest_xray/'

train_dir = os.path.join(dataset_dir, 'train')
test_dir = os.path.join(dataset_dir, 'test')

# Class names are the sub-directory names of the training folder.
# They are sorted because os.listdir() returns entries in arbitrary order, whereas
# flow_from_directory() numbers the classes alphabetically; indexing class_names
# with a predicted class index is only valid when both orders agree.
# Plain files (e.g. a leftover .DS_Store) are not classes and are skipped.
class_names = sorted(
    entry for entry in os.listdir(train_dir)
    if os.path.isdir(os.path.join(train_dir, entry))
)

def print_class_info(directory, dataset_type):
    """Print the classes found in ``directory`` and the number of images in each.

    Args:
        directory: Path of a dataset split folder holding one sub-folder per class.
        dataset_type: Label used in the printed messages (e.g. 'train' or 'test').

    Raises:
        FileNotFoundError: If ``directory`` does not exist.
    """
    # Read the classes from the folder being described instead of relying on a
    # global list built from another split; sort for a stable, reproducible order.
    split_classes = sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )
    print(f'Number of x-ray image classes in {dataset_type}:', len(split_classes))
    print(f'{dataset_type} class names:', split_classes)
    for cls in split_classes:
        class_path = os.path.join(directory, cls)
        # Hidden entries such as .DS_Store are not images and must not be counted
        num_images = sum(1 for name in os.listdir(class_path) if not name.startswith('.'))
        print(f'{cls} : {num_images}')

# Report class names and per-class entry counts for the train and the test split
print_class_info(train_dir, 'train')
print("\n")  # visual separator between the two reports
print_class_info(test_dir, 'test')

In [None]:
# Peek at the first five file names inside each class folder of the training split
for class_folder in ('NORMAL', 'PNEUMONIA'):
    sample_files = os.listdir(os.path.join(train_dir, class_folder))
    print(sample_files[:5])

In [None]:
# Visualize one image from each class - ['NORMAL', 'PNEUMONIA']
npic = './data/chest_xray/train/NORMAL/NORMAL2-IM-0517-0001.jpeg'
ppic = './data/chest_xray/train/PNEUMONIA/person1265_virus_2156.jpeg'

plt.figure(1, figsize=(12, 6))
for position, (pic_path, title) in enumerate(
        ((npic, "NORMAL x-ray"), (ppic, "PNEUMONIA x-ray")), start=1):
    picture = cv2.imread(pic_path)
    # cv2.imread signals an unreadable/missing file by returning None, not by raising
    if picture is None:
        raise FileNotFoundError(f"Could not read image: {pic_path}")

    plt.subplot(1, 2, position)
    # OpenCV decodes into BGR channel order while matplotlib expects RGB
    plt.imshow(cv2.cvtColor(picture, cv2.COLOR_BGR2RGB))
    plt.title(title)
    plt.xticks([])
    plt.yticks([])

plt.show()

In [None]:
# Hard-coded image counts per class, and their sum
COUNT_NORMAL = 1349
COUNT_PNEUMONIA = 3884
TRAIN_IMG_COUNT = COUNT_NORMAL + COUNT_PNEUMONIA

# Natural log of the PNEUMONIA : NORMAL ratio, kept as a one-element array
initial_bias = np.log([COUNT_PNEUMONIA / COUNT_NORMAL])
print("Initial bias: {:.5f}".format(initial_bias[0]))

# Each class weight is inversely proportional to the class count,
# scaled by half of the total number of images
weight_for_0 = (1 / COUNT_NORMAL) * (TRAIN_IMG_COUNT) / 2.0
weight_for_1 = (1 / COUNT_PNEUMONIA) * (TRAIN_IMG_COUNT) / 2.0
class_weight = {0: weight_for_0, 1: weight_for_1}

print("Weight for class 0: {:.2f}".format(class_weight[0]))
print("Weight for class 1: {:.2f}".format(class_weight[1]))

In [None]:
# Parameters
# NOTE(review): input_size, size_inner and droprate are assigned again in the
# training cell further down; learning_rate and n_epochs are not read by any
# visible cell — confirm which values are meant to be authoritative.

input_size = 299        # images are resized to (input_size, input_size) by the data generators
batch_size = 32         # number of images per batch produced by the data generators
learning_rate = 0.0005
size_inner = 128
droprate = 0.2
n_epochs = 20

In [None]:
# Data generators
# NOTE(review): preprocess_input is the InceptionV3 variant; VGG16 and ResNet50 ship
# their own preprocess_input functions — confirm this is intended for all models.

VALIDATION_SPLIT = 0.2  # fraction of the training folder held out for validation

# Training images are augmented on the fly
train_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    shear_range=10,
    zoom_range=0.1,
    horizontal_flip=True,
    validation_split=VALIDATION_SPLIT
)

# Validation images get the same preprocessing but NO augmentation, so validation
# metrics are measured on unmodified images. The split fraction must equal the one
# of train_datagen so that both generators partition the folder identically.
val_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    validation_split=VALIDATION_SPLIT
)

test_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input
)

# Flow training images in batches using train_datagen generator
train_ds = train_datagen.flow_from_directory(
        train_dir,
        target_size=(input_size, input_size),
        batch_size=batch_size,
        subset='training'
)

# Flow val images in batches using val_datagen generator
val_ds = val_datagen.flow_from_directory(
        train_dir,
        target_size=(input_size, input_size),
        batch_size=batch_size,
        subset='validation'
)

# Flow test images in batches using test_datagen generator.
# shuffle=False keeps the batch order aligned with test_ds.filenames / test_ds.classes.
test_ds = test_datagen.flow_from_directory(
        test_dir,
        target_size=(input_size, input_size),
        batch_size=batch_size,
        shuffle=False
)

In [None]:
# Inspect a single batch produced by the training generator
train_batch = train_ds[0]
images, labels = train_batch
print(images.shape)
print(labels.shape)

# Keep the first image of the batch around and display it
first_image = images[0]
first_image

In [None]:
# First three rows and columns of channel 0 of the image taken from the training batch
first_image[:3, :3, 0]

In [None]:
# Take one (images, labels) batch from the training generator, the same way the
# test batch is taken from test_ds.
# list(train_ds) must not be used here: the Keras directory iterator yields batches
# endlessly, so materialising it never finishes, and it would not unpack into two names.
train_batch = train_ds[0]
train_images, train_labels = train_batch

In [None]:
# Show the shape of the training image array unpacked above
print(train_images.shape)

In [None]:
# First batch of the test generator; test_ds is created with shuffle=False,
# so index 0 is the first batch in directory order
test_batch = test_ds[0]
test_images, test_labels = test_batch

In [None]:

# Build and compile a transfer-learning CNN
def create_and_evaluate_cnn_model(model_name, input_size=299, learning_rate=0.001, size_inner=512, droprate=0.2, num_classes=2):
    """Create a compiled classifier on top of a frozen ImageNet backbone.

    Despite its name, this function only builds and compiles the model; it does
    not train or evaluate it.

    Args:
        model_name: One of "InceptionV3", "VGG16" or "ResNet50".
        input_size: Side length of the square RGB input images.
        learning_rate: Learning rate handed to the Adam optimizer.
        size_inner: Number of units of the hidden dense layer.
        droprate: Dropout rate applied after the hidden dense layer.
        num_classes: Number of output units (raw logits, no activation).

    Returns:
        A compiled ``keras.Model`` using categorical cross-entropy on logits
        and reporting accuracy.

    Raises:
        ValueError: If ``model_name`` is not a supported backbone.
    """
    backbones = {
        "InceptionV3": InceptionV3,
        "VGG16": VGG16,
        "ResNet50": ResNet50
    }

    # Guard clause: reject unknown backbone names up front
    if model_name not in backbones:
        raise ValueError(f"Invalid model name. Supported models: {', '.join(backbones)}")

    image_shape = (input_size, input_size, 3)

    # Pre-trained convolutional base without its classification top, kept frozen
    backbone = backbones[model_name](
        weights='imagenet',
        include_top=False,
        input_shape=image_shape
    )
    backbone.trainable = False

    # Classification head: pooled features -> dense -> dropout -> logits
    inputs = keras.Input(shape=image_shape)
    x = backbone(inputs, training=False)  # run the base in inference mode
    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dense(size_inner, activation='relu')(x)
    x = keras.layers.Dropout(droprate)(x)
    outputs = keras.layers.Dense(num_classes)(x)

    model = keras.Model(inputs, outputs)

    # The last layer emits logits, hence from_logits=True
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss=keras.losses.CategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

    return model

In [None]:
# Function to train and evaluate a model
def train_and_evaluate_model(model_name, train_data, test_data, epochs=10, batch_size=32,
                             input_size=299, learning_rate=0.001, size_inner=512,
                             droprate=0.2, return_model=False):
    """Train one transfer-learning model and score it on test data.

    Args:
        model_name: Backbone name accepted by ``create_and_evaluate_cnn_model``.
        train_data: Either an ``(images, labels)`` pair of arrays, or a Keras batch
            generator (e.g. the result of ``flow_from_directory``).
        test_data: Either an ``(images, labels)`` pair with one-hot labels, or an
            unshuffled directory generator exposing ``reset()`` and ``classes``.
        epochs: Number of training epochs.
        batch_size: Batch size for array input; ignored for generators, which
            define their own batch size.
        input_size, learning_rate, size_inner, droprate: Forwarded to
            ``create_and_evaluate_cnn_model``.
        return_model: When True, the trained model is appended to the result.

    Returns:
        ``(accuracy, confusion, report)``, or
        ``(accuracy, confusion, report, model)`` when ``return_model`` is True.
    """
    # Create and compile the model with the requested hyperparameters
    model = create_and_evaluate_cnn_model(
        model_name,
        input_size=input_size,
        learning_rate=learning_rate,
        size_inner=size_inner,
        droprate=droprate
    )

    # Train the model
    if isinstance(train_data, (tuple, list)):
        # Arrays: images AND labels must both be handed to fit()
        fit_images, fit_labels = train_data
        model.fit(fit_images, fit_labels, epochs=epochs, batch_size=batch_size)
    else:
        # Generators yield (images, labels) batches themselves; Keras rejects an
        # explicit batch_size for this kind of input
        model.fit(train_data, epochs=epochs)

    # Evaluate the model on test data
    if isinstance(test_data, (tuple, list)):
        eval_images, eval_labels = test_data
        predictions = model.predict(eval_images)
        true_labels = np.argmax(eval_labels, axis=1)  # one-hot -> class index
    else:
        # Start from the first batch so predictions line up with .classes;
        # this requires the generator to have been created with shuffle=False
        test_data.reset()
        predictions = model.predict(test_data)
        true_labels = test_data.classes
    predicted_labels = np.argmax(predictions, axis=1)

    accuracy = accuracy_score(true_labels, predicted_labels)
    confusion = confusion_matrix(true_labels, predicted_labels)
    report = classification_report(true_labels, predicted_labels)

    if return_model:
        return accuracy, confusion, report, model
    return accuracy, confusion, report

# Specify your model names, input data, and hyperparameters
model_names = ["InceptionV3", "VGG16", "ResNet50"]
input_size = 299
lr_schedule = 0.001
size_inner = 512
droprate = 0.2

# Create an empty dictionary to store the models and their results
models = {}

# Iterate through each model
for model_name in model_names:
    # Train on the full training generator and evaluate on the full (unshuffled)
    # test generator, keeping the fitted model for the prediction cells below
    accuracy, confusion, report, trained_model = train_and_evaluate_model(
        model_name,
        train_ds,
        test_ds,
        epochs=10,
        batch_size=32,
        input_size=input_size,
        learning_rate=lr_schedule,
        size_inner=size_inner,
        droprate=droprate,
        return_model=True
    )

    models[model_name] = {
        "model": trained_model,  # the fitted Keras model, not just its name
        "accuracy": accuracy,
        "confusion_matrix": confusion.tolist(),  # Convert the NumPy array to a list
        "classification_report": report
    }

# Now 'models' contains the evaluated models and their results



In [None]:
# Display the dictionary of per-model results collected by the training loop
models

In [None]:
# Checkpoint callback: write "xray_model.h5" only when val_accuracy reaches a new maximum
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    "xray_model.h5",
    monitor='val_accuracy',
    mode='max',
    save_best_only=True
)

# Early stopping to prevent overfitting: stop after 2 epochs without a val_loss
# improvement and roll back to the best weights seen
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=2,
    verbose=2,
    mode='auto',
    restore_best_weights=True
)

In [None]:
# Exponential learning rate decay: multiply the rate by 0.96 every 100000 steps,
# in discrete jumps (staircase)
initial_learning_rate = 0.015
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=100000,
    decay_rate=0.96,
    staircase=True,
)

In [None]:
# predict on test data
predictions=[]
for model_name in model_names:
  test_ds.reset()
  preds = model_name.predict(test_ds, verbose=2)
  predicted_class_indices=np.argmax(preds,axis=1)
  labels = (train_ds.class_indices)
  labels = dict((v,k) for k,v in labels.items())
  predictions = [labels[k] for k in predicted_class_indices]

In [None]:
filenames=test_ds.filenames
results=pd.DataFrame({"Filename":filenames,
                      "Predictions":predictions})

results[:-3]

In [None]:
# Classify one sample image with every trained model.
# The image is loaded and preprocessed once; it does not depend on the model.
path = './data/chest_xray/test/PNEUMONIA/person104_bacteria_492.jpeg'
img = load_img(path, target_size=(299, 299))
x = tf.keras.preprocessing.image.img_to_array(img)
X = np.array([x])  # Convert a single image to a batch.
X = preprocess_input(X)  # Use the preprocessing function for InceptionV3

# Class index -> class name, taken from the generator so that it matches the
# index order the models were trained with
index_to_class = {index: name for name, index in train_ds.class_indices.items()}

for model_name in model_names:
    # model_names only holds the names; the fitted model lives in the models dict
    model = models[model_name]["model"]
    if not hasattr(model, "predict"):
        raise TypeError(
            f"models[{model_name!r}]['model'] is a {type(model).__name__}, not a trained model; "
            "the fitted Keras model must be stored there before predicting"
        )

    logits = model.predict(X)[0].flatten()
    # The models output raw logits; softmax turns them into probabilities.
    # (Min-max scaling of two logits would always produce exactly 0.0 and 1.0.)
    exp_logits = np.exp(logits - np.max(logits))  # shift for numerical stability
    prediction = exp_logits / exp_logits.sum()
    print({index_to_class[i]: float(prediction[i]) for i in range(len(prediction))})
    print("Prediction completed for ", model_name)