In [None]:
# import system libs
import os
import time
import shutil
import pathlib
import itertools

# import data handling tools
import cv2
import numpy as np
import pandas as pd
import seaborn as sns
sns.set_style('darkgrid')
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder

# import data for deep learning
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Dense, Flatten
from keras.callbacks import LearningRateScheduler, ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from keras.utils import load_img, array_to_img, img_to_array
from tensorflow.keras.optimizers import Adam
import random
from tqdm.notebook import tqdm
from tensorflow.keras.models import load_model
from tensorflow.keras.models import Model

# Ignore Warnings
import warnings
warnings.filterwarnings("ignore")

print ('modules loaded')

In [None]:
def load_data(data_dir):
    classes = os.listdir(data_dir)
    images = []
    masks = []
    labels = []

    for class_name in classes:
        class_dir = os.path.join(data_dir, class_name)
        if os.path.isdir(class_dir):
            image_subdir = os.path.join(class_dir, 'images')
            mask_subdir = os.path.join(class_dir, 'masks')

            for filename in os.listdir(image_subdir):
                if filename.endswith(".png"):
                    # Load image
                    image_path = os.path.join(image_subdir, filename)
                    if not os.path.exists(image_path):
                        print(f"Image not found: {image_path}")
                        continue

                    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
                    image = cv2.resize(image, (128,128))
                    images.append(image)

                    # Load mask
                    mask_filename = filename
                    mask_path = os.path.join(mask_subdir, mask_filename)
                    if not os.path.exists(mask_path):
                        print(f"Mask not found: {mask_path}")
                        continue

                    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
                    mask = cv2.resize(mask, (128,128))
                    masks.append(mask)

                    # Assign label based on class name
                    labels.append(class_name)

    return np.array(images), np.array(masks), np.array(labels)

data_dir = "COVID-19_Radiography_Dataset"
images, masks, labels = load_data(data_dir)

In [None]:
print(len(images))
print(len(masks))
print(len(labels))

In [None]:
def plot_images_with_masks(images, masks, labels, num_images=10):
    # Select a random subset of indices
    indices = np.random.choice(len(images), num_images, replace=False)

    plt.figure(figsize=(15, 6 * num_images // 5))

    for i, idx in enumerate(indices, start=1):
        #Images
        plt.subplot(num_images, 6, 6 * i - 2)
        plt.imshow(images[idx])
        plt.axis('off')
        plt.title(f"Image - Label: {labels[idx]}")

        #Masks
        plt.subplot(num_images, 6, 6 * i - 1)
        plt.imshow(masks[idx], cmap='gray')
        plt.axis('off')
        plt.title("Mask")

        # Overlay mask on the image
        plt.subplot(num_images, 6, 6 * i)
        plt.imshow(images[idx])
        plt.imshow(masks[idx], cmap='jet', alpha=0.5)
        plt.axis('off')
        plt.title("Image with Mask")

    plt.tight_layout()
    plt.show()

plot_images_with_masks(images, masks, labels, num_images=10)

In [None]:
train_images, test_images, train_masks, test_masks = train_test_split(images, masks, test_size=0.2, random_state=42)
train_images = np.array(train_images).reshape(len(train_images),128,128)
test_images = np.array(test_images).reshape(len(test_images),128,128)
print(train_images.shape)
print(test_images.shape)
print(train_masks.shape)
print(test_masks.shape)

In [None]:
def conv_block(shape, out_ch, rate=1):
    x = tf.keras.layers.Conv2D(out_ch, 3, padding="same", dilation_rate=1)(shape)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation("relu")(x)
    return x

def RSU_L(shape, out_ch, M_ch, num_layers, rate=2):
    x = conv_block(shape, out_ch)
    inp_ch = x

    skip_features = []
    x = conv_block(x, M_ch)
    skip_features.append(x)

    for i in range(num_layers-2):
        x = tf.keras.layers.MaxPool2D((2, 2))(x)
        x = conv_block(x, M_ch)
        skip_features.append(x)

    x = conv_block(x, M_ch, rate=rate)
    skip_features.reverse()
    x = tf.keras.layers.Concatenate()([x, skip_features[0]])
    x = conv_block(x, M_ch)

    for i in range(num_layers-3):
        x = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")(x)
        x = tf.keras.layers.Concatenate()([x, skip_features[i+1]])
        x = conv_block(x, M_ch)

    x = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")(x)
    x = tf.keras.layers.Concatenate()([x, skip_features[-1]])
    x = conv_block(x, out_ch)

    x = tf.keras.layers.Add()([x, inp_ch])
    return x

def RSU_4F(shape, out_ch, M_ch):
    x0 = conv_block(shape, out_ch, rate=1)

    x1 = conv_block(x0, M_ch, rate=1)
    x2 = conv_block(x1, M_ch, rate=2)
    x3 = conv_block(x2, M_ch, rate=4)

    x4 = conv_block(x3, M_ch, rate=8)

    x = tf.keras.layers.Concatenate()([x4, x3])
    x = conv_block(x, M_ch, rate=4)

    x = tf.keras.layers.Concatenate()([x, x2])
    x = conv_block(x, M_ch, rate=2)

    x = tf.keras.layers.Concatenate()([x, x1])
    x = conv_block(x, out_ch, rate=1)

    x = tf.keras.layers.Add()([x, x0])
    return x

def u2net(shape, out_ch, M_ch, num_classes=1):
    inputs = tf.keras.layers.Input(shape)

    e1 = RSU_L(inputs, out_ch[0], M_ch[0], 7)
    p1 = tf.keras.layers.MaxPool2D((2, 2))(e1)

    e2 = RSU_L(p1, out_ch[1], M_ch[1], 6)
    p2 = tf.keras.layers.MaxPool2D((2, 2))(e2)

    e3 = RSU_L(p2, out_ch[2], M_ch[2], 5)
    p3 = tf.keras.layers.MaxPool2D((2, 2))(e3)

    e4 = RSU_L(p3, out_ch[3], M_ch[3], 4)
    p4 = tf.keras.layers.MaxPool2D((2, 2))(e4)

    e5 = RSU_4F(p4, out_ch[4], M_ch[4])
    p5 = tf.keras.layers.MaxPool2D((2, 2))(e5)

    b1 = RSU_4F(p5, out_ch[5], M_ch[5])
    b2 = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")(b1)

    d1 = tf.keras.layers.Concatenate()([b2, e5])
    d1 = RSU_4F(d1, out_ch[6], M_ch[6])
    u1 = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")(d1)

    d2 = tf.keras.layers.Concatenate()([u1, e4])
    d2 = RSU_L(d2, out_ch[7], M_ch[7], 4)
    u2 = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")(d2)

    d3 = tf.keras.layers.Concatenate()([u2, e3])
    d3 = RSU_L(d3, out_ch[8], M_ch[8], 5)
    u3 = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")(d3)

    d4 = tf.keras.layers.Concatenate()([u3, e2])
    d4 = RSU_L(d4, out_ch[9], M_ch[9], 6)
    u4 = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")(d4)

    d5 = tf.keras.layers.Concatenate()([u4, e1])
    d5 = RSU_L(d5, out_ch[10], M_ch[10], 7)


    y1 = tf.keras.layers.Conv2D(num_classes, 3, padding="same")(d5)

    y2 = tf.keras.layers.Conv2D(num_classes, 3, padding="same")(d4)
    y2 = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")(y2)

    y3 = tf.keras.layers.Conv2D(num_classes, 3, padding="same")(d3)
    y3 = tf.keras.layers.UpSampling2D(size=(4, 4), interpolation="bilinear")(y3)

    y4 = tf.keras.layers.Conv2D(num_classes, 3, padding="same")(d2)
    y4 = tf.keras.layers.UpSampling2D(size=(8, 8), interpolation="bilinear")(y4)

    y5 = tf.keras.layers.Conv2D(num_classes, 3, padding="same")(d1)
    y5 = tf.keras.layers.UpSampling2D(size=(16, 16), interpolation="bilinear")(y5)

    y6 = tf.keras.layers.Conv2D(num_classes, 3, padding="same")(b1)
    y6 = tf.keras.layers.UpSampling2D(size=(32, 32), interpolation="bilinear")(y6)

    y0 = tf.keras.layers.Concatenate()([y1, y2, y3, y4, y5, y6])
    y0 = tf.keras.layers.Conv2D(num_classes, 3, padding="same")(y0)

    y0 = tf.keras.layers.Activation("sigmoid")(y0)
    y1 = tf.keras.layers.Activation("sigmoid")(y1)
    y2 = tf.keras.layers.Activation("sigmoid")(y2)
    y3 = tf.keras.layers.Activation("sigmoid")(y3)
    y4 = tf.keras.layers.Activation("sigmoid")(y4)
    y5 = tf.keras.layers.Activation("sigmoid")(y5)
    y6 = tf.keras.layers.Activation("sigmoid")(y6)

    model = tf.keras.models.Model(inputs, outputs=y0, name="U2-Net")
    return model

def build_u2net(input_shape, num_classes=1):
    out_ch = [64, 128, 256, 512, 512, 512, 512, 256, 128, 64, 64]
    M_ch = [32, 32, 64, 128, 256, 256, 256, 128, 64, 32, 16]
    model = u2net(input_shape, out_ch, M_ch, num_classes=num_classes)
    return model

def build_u2net_lite(input_shape, num_classes=1):
    out_ch = [64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64]
    M_ch = [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16]
    input_shape_with_channels = input_shape + (1,)
    model = u2net(input_shape_with_channels, out_ch, M_ch, num_classes=num_classes)
    return model

u2net_model = build_u2net_lite((128,128))
#u2net_model.summary()

In [None]:
opt = Adam(learning_rate=1e-3)
u2net_model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
def get_callbacks():
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, verbose=1)
    checkpoint = ModelCheckpoint('ChestSegmentor.hdf5',verbose=1, save_best_only= True)
    early_stop = EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True)
    return [reduce_lr, checkpoint, early_stop]

In [None]:
hsitory = u2net_model.fit(train_images, train_masks, validation_split = 0.2, batch_size = 16, epochs = 100, callbacks=get_callbacks())

In [None]:
u2net_model.load_weights("ChestSegmentor.hdf5")
def generate_masks(model, test_images):
    masks = model.predict(test_images)
    return masks

def plot_testimages_with_masks(images, masks, num_images=10):
    # Select a random subset of indices
    indices = np.random.choice(len(images), num_images, replace=False)

    plt.figure(figsize = (15, 6 * num_images // 5))

    for i, idx in enumerate(indices, start=1):
        #Images
        plt.subplot(num_images, 6, 6 * i - 2)
        plt.imshow(images[idx])
        plt.axis('off')
        plt.title("Image")

        #Masks
        plt.subplot(num_images, 6, 6 * i - 1)
        plt.imshow(masks[idx], cmap='gray')
        plt.axis('off')
        plt.title("Mask")

        # Overlay mask on the image
        plt.subplot(num_images, 6, 6 * i)
        plt.imshow(images[idx])
        plt.imshow(masks[idx], cmap='jet', alpha=0.5)
        plt.axis('off')
        plt.title("Image with Mask")

    plt.tight_layout()
    plt.show()

generated_masks = generate_masks(u2net_model, test_images)
plot_testimages_with_masks(test_images, generated_masks, num_images=10)

In [None]:
le = LabelEncoder()
encoded_labels = le.fit_transform(labels)
encoded_labels = to_categorical(encoded_labels, num_classes=4)
train_images, test_images, train_labels, test_labels = train_test_split(images, encoded_labels, test_size=0.2, random_state=42)
train_images = np.array(train_images).reshape(len(train_images),128,128)
test_images = np.array(test_images).reshape(len(test_images),128,128)
print(train_images.shape)
print(test_images.shape)
print(train_labels.shape)
print(test_labels.shape)

Regularization and Hyperparameter Tuning

In [None]:
# ... (existing code)

def build_classification_model_with_regularization(base_model, dropout_rate=0.5):
    u2net_base = Model(inputs=base_model.input, outputs=base_model.output)

    for layer in u2net_base.layers:
        layer.trainable = False

    flat_layer = Flatten()(u2net_base.output)
    dropout_layer = Dropout(dropout_rate)(flat_layer)  # Add dropout for regularization
    dense_layer = Dense(256, activation='relu')(dropout_layer)
    output_layer = Dense(4, activation='softmax')(dense_layer)

    classification_model = Model(inputs=base_model.input, outputs=output_layer)
    return classification_model

# Adjust hyperparameters
learning_rate = 1e-4
batch_size = 32
epochs = 50

# Build and compile the classification model with regularization
classification_model = build_classification_model_with_regularization(u2net_model, dropout_rate=0.5)
opt = Adam(learning_rate=learning_rate)
classification_model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

# Implement learning rate scheduling
def lr_schedule(epoch):
    if epoch < 20:
        return learning_rate
    else:
        return learning_rate * tf.math.exp(0.1 * (20 - epoch))

lr_scheduler = LearningRateScheduler(lr_schedule)

# Save the best model weights during training
checkpoint_filepath = 'classification_model.h5'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True
)

# Train the model with regularization and learning rate scheduling
history = classification_model.fit(
    train_images,
    train_labels,
    validation_split=0.2,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[model_checkpoint_callback, lr_scheduler]
)

# Evaluate the model
evaluation_result = classification_model.evaluate(test_images, test_labels)

accuracy = evaluation_result[1]
print(f'Model Accuracy on Test Data: {accuracy}')
