import os
import gc
import cv2
import numpy as np
import pandas as pd
from glob import glob
import tensorflow as tf
from tensorflow import keras
from sklearn.utils import shuffle
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models, regularizers, backend as K
from tensorflow.keras.preprocessing import image_dataset_from_directory
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score
# Reset Keras' global state and run a garbage-collection pass so that
# re-executing the notebook does not accumulate models from earlier runs.
K.clear_session()
gc.collect()

In [3]:
import zipfile

# Unpack the dataset archive into the current working directory, but only
# when the archive is actually present next to the notebook.
zip_path = "covid19-radiography-database.zip"
if os.path.exists(zip_path):
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall()

In [None]:
# Reproducibility
SEED = 42
# NOTE: hash randomisation is fixed when the interpreter starts, so setting
# PYTHONHASHSEED here only affects child processes, not this kernel.
os.environ["PYTHONHASHSEED"] = str(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# GPU memory safety (Jupyter)
# Memory growth has to be configured identically on every visible GPU and
# before any of them is initialised; re-running this cell in a live kernel
# therefore raises RuntimeError, which is reported instead of aborting.
gpus = tf.config.list_physical_devices("GPU")
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    print(f"Could not enable GPU memory growth: {err}")

K.clear_session()
gc.collect()

In [5]:
# Dataset root; each class is read from <data_dir>/<class>/images in the
# cells below.
data_dir = "COVID-19_Radiography_Dataset"
# Folder for saved models, created up front if it does not exist yet.
MODEL_DIR = "./saved_models"
os.makedirs(MODEL_DIR, exist_ok=True)
# The listing is discarded here (it is not the last expression of the cell),
# so this line only fails fast when the dataset folder is missing.
os.listdir(data_dir)
# Class names; a class's position in this list is used as its integer label.
cat = ['COVID', 'Lung_Opacity', 'Normal', 'Viral Pneumonia']

In [None]:
# Show the top-level contents of the dataset folder.
os.listdir(data_dir)

In [None]:
def is_image(filename):
    """Return True when *filename* ends with a known image extension.

    The comparison is case-insensitive, so ``"scan.PNG"`` is accepted.
    """
    image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.gif')
    # str.endswith accepts a tuple and checks all suffixes in one call.
    return filename.lower().endswith(image_extensions)

# Preview five randomly chosen images per class, one figure per class.
for category in cat:
    path = os.path.join(data_dir, category, 'images')
    images = list(filter(is_image, os.listdir(path)))

    if not images:
        print(f"No images found in {path}")
        continue

    fig, ax = plt.subplots(1, 5, figsize=(10, 4))
    fig.suptitle(f'{category}', fontsize=18)
    for axis in ax:
        # Sampling is with replacement, so the same image can appear twice.
        pick = np.random.randint(0, len(images))
        img_path = os.path.join(path, images[pick])
        # `img_array` is kept as a module-level name: a later cell reads it.
        img_array = cv2.imread(img_path)
        if img_array is not None:
            # OpenCV loads BGR; matplotlib expects RGB.
            axis.imshow(cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB))
            axis.axis('off')
        else:
            print(f"Error: Could not load image {img_path}")
    plt.show()

In [9]:
# Side length (pixels) that images are resized to; also used when the
# training array is reshaped.
IMG_SIZE = 128
# Resize check on a single image.
# NOTE(review): `img_array` is whatever the sample-preview loop last assigned
# (it can be None if that last load failed) — confirm this dependency on the
# previous cell is intended.
new_array = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))

In [10]:
training_data = []


def create_train_data():
    """Load, resize and label every image, appending to ``training_data``.

    For each class in ``cat`` the files under ``<data_dir>/<class>/images``
    are read with OpenCV, resized to ``IMG_SIZE`` x ``IMG_SIZE`` and appended
    to the module-level ``training_data`` list as ``[image, label]``, where
    ``label`` is the class's index in ``cat``.

    Files that OpenCV cannot decode or resize are skipped; the number of
    skipped files is printed at the end instead of being silently ignored.
    """
    skipped = 0
    for labels, category in enumerate(cat):
        path = os.path.join(data_dir, category, 'images')
        # Sorted so the sample order (and therefore the seeded train/test
        # split built from it) does not depend on filesystem listing order.
        for img in sorted(os.listdir(path)):
            # cv2.imread signals an unreadable file by returning None
            # rather than raising.
            img_array = cv2.imread(os.path.join(path, img))
            if img_array is None:
                skipped += 1
                continue
            try:
                new_array = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))
            except cv2.error as err:
                print(f"Skipping {os.path.join(path, img)}: {err}")
                skipped += 1
                continue
            training_data.append([new_array, labels])

    if skipped:
        print(f"Skipped {skipped} file(s) that could not be loaded")


create_train_data()

In [None]:
# Number of directory entries in each class's image folder.
file_counts = {
    category: len(os.listdir(os.path.join(data_dir, category, 'images')))
    for category in cat
}

for category, count in file_counts.items():
    print(f"{category}: {count} files")

In [None]:
# Turn the [image, label] pairs into arrays and split them 70/30.
if not training_data:
    raise ValueError("training_data is empty; no images were loaded")

X = np.array([features for features, _ in training_data]).reshape(-1, IMG_SIZE, IMG_SIZE, 3)
y = np.array([labels for _, labels in training_data])

# Stratify on the labels so both splits keep the same class proportions;
# the per-class file counts differ, and an unstratified split can skew the
# smaller classes.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)

print(f'X_train Length : {X_train.shape[0]}, X_train Image size : {X_train.shape[1:3]}, Channel Dimension : {X_train.shape[3]}')
print(f'X_test Length : {X_test.shape[0]}, X_test Image size : {X_test.shape[1:3]}, Channel Dimension : {X_test.shape[3]}')

In [13]:
def create_patches(images, patch_size):
    """Cut each image into non-overlapping square patches and flatten them.

    Args:
        images: 4-D batch ``(batch, height, width, channels)`` as accepted by
            ``tf.image.extract_patches``.
        patch_size: Side length of each square patch, in pixels. The stride
            equals the patch size, so patches do not overlap.

    Returns:
        A tensor of shape ``(batch, num_patches,
        patch_size * patch_size * channels)``. With ``'VALID'`` padding any
        rows/columns that do not fill a whole patch are dropped.
    """
    patches = tf.image.extract_patches(
        images=images,
        sizes=[1, patch_size, patch_size, 1],
        strides=[1, patch_size, patch_size, 1],
        rates=[1, 1, 1, 1],
        padding='VALID'
    )

    # extract_patches already flattens each patch into the last axis, so the
    # feature size is read from the tensor instead of assuming 3 channels,
    # and the batch size is taken dynamically so an unknown batch dimension
    # is handled as well.
    dims = tf.shape(patches)
    return tf.reshape(patches, [dims[0], -1, dims[3]])

# Build the patch sequences for both splits with the same patch size.
patch_size = 8
X_train_patches, X_test_patches = (
    create_patches(split, patch_size) for split in (X_train, X_test)
)


In [None]:
def _conv_block(x, filters):
    """Apply Conv(3x3, same, L2 0.01) -> BatchNorm -> ReLU -> MaxPool -> Dropout(0.3)."""
    x = layers.Conv2D(
        filters=filters,
        kernel_size=3,
        padding="same",
        kernel_regularizer=regularizers.l2(0.01),
    )(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)
    x = layers.MaxPooling2D()(x)
    return layers.Dropout(0.3)(x)


# --- CNN branch: whole image -> 256-d feature vector -----------------------
cnn_input = layers.Input(shape=(128, 128, 3))

x = layers.Rescaling(1.0 / 255)(cnn_input)
for filters in (32, 64, 128, 256, 512):
    x = _conv_block(x, filters)

x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation="relu", kernel_regularizer=regularizers.l2(0.02))(x)
cnn_branch = layers.Dropout(0.5)(x)

# --- RNN branch: sequence of flattened patches -> 128-d feature vector -----
# 256 patches of 192 values each: (128 / 8) ** 2 patches, 8 * 8 * 3 values.
rnn_input = layers.Input(shape=(256, 192))
# The patches are cut from the raw images, so they carry the same 0-255
# range as the CNN input; rescale them the same way before the LSTM.
# A dedicated name is used here so the label array `y` is not overwritten.
rnn_x = layers.Rescaling(1.0 / 255)(rnn_input)
rnn_x = layers.Bidirectional(layers.LSTM(64, activation='tanh'))(rnn_x)
rnn_branch = layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.01))(rnn_x)
rnn_branch = layers.Dropout(0.3)(rnn_branch)

# --- Fusion head: concatenate both branches, classify into 4 classes -------
merged = layers.Concatenate()([cnn_branch, rnn_branch])
merged = layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.01))(merged)
output = layers.Dense(4, activation='softmax')(merged)

combined_model = models.Model(inputs=[cnn_input, rnn_input], outputs=output)
combined_model.summary()

In [15]:
# Integer class labels are used, hence the sparse cross-entropy loss.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
combined_model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [16]:
EarlyStopping = tf.keras.callbacks.EarlyStopping
ReduceLROnPlateau = tf.keras.callbacks.ReduceLROnPlateau

# Halve the learning rate after 5 epochs without val_loss improvement,
# never going below 1e-5.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5)

# Stop after 5 stagnant epochs and roll back to the best weights.
# NOTE(review): the fit call in this file does not include `early_stop` in
# its callbacks list — confirm whether that is intentional.
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

# Keep only the checkpoint with the highest validation accuracy.
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    'best_model.keras',
    monitor='val_accuracy',
    save_best_only=True,
)

In [17]:
import time
Callback = tf.keras.callbacks.Callback


class TimeHistory(Callback):
    """Keras callback that records how long each training epoch takes.

    After (or during) training, ``epoch_times`` holds one duration in
    seconds per completed epoch of the most recent ``fit`` call.
    """

    def __init__(self):
        super().__init__()
        # Defined up front so the attribute exists even before training.
        self.epoch_times = []
        self.start_time = None

    def on_train_begin(self, logs=None):
        # Start from an empty list on every fit call.
        self.epoch_times = []

    def on_epoch_begin(self, epoch, logs=None):
        # perf_counter is monotonic, so system clock adjustments cannot
        # produce negative or distorted durations.
        self.start_time = time.perf_counter()

    def on_epoch_end(self, epoch, logs=None):
        self.epoch_times.append(time.perf_counter() - self.start_time)


In [None]:
# Train the two-input model: full images feed the CNN branch, patch
# sequences feed the RNN branch.
# NOTE(review): validation uses the X_test/y_test split, which is also the
# split evaluated later, and the checkpoint is selected on val_accuracy —
# confirm whether a separate validation split is wanted.
time_callback = TimeHistory()

history = combined_model.fit(
    [X_train, X_train_patches],
    y_train,
    validation_data=([X_test, X_test_patches], y_test),
    epochs=100,
    batch_size=64,
    callbacks=[model_checkpoint, reduce_lr, time_callback],
)

In [None]:
# Re-wrap the existing input/output tensors in a new Model object and print
# its layer summary.
combined_model = models.Model([cnn_input, rnn_input], output)
combined_model.summary()

In [None]:
# Restore the weights written by the checkpoint callback, then compile so
# the model can be evaluated.
combined_model.load_weights("best_model.keras")
combined_model.compile(
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
    optimizer=tf.keras.optimizers.Adam(),
)

In [None]:
# Loss and accuracy of the restored model on the training split.
train_loss, train_acc = combined_model.evaluate(x=[X_train, X_train_patches], y=y_train, verbose=1)
print("Train Accuracy:", train_acc)

# Same metrics on the held-out split.
test_loss, test_acc = combined_model.evaluate(x=[X_test, X_test_patches], y=y_test, verbose=1)
print("Test Accuracy:", test_acc)


In [None]:
from sklearn.metrics import classification_report
import numpy as np

# Class probabilities for the held-out split, then the most likely class.
y_pred = combined_model.predict([X_test, X_test_patches])
y_pred_classes = y_pred.argmax(axis=1)

# Per-class precision / recall / F1, three decimals.
report = classification_report(y_test, y_pred_classes, digits=3)
print(report)

In [None]:
# Spot-check a few predictions, print the per-class report, then plot the
# training curves.
pred = np.argmax(combined_model.predict([X_test, X_test_patches]), axis=-1)

# Printed explicitly: a bare expression in the middle of a cell shows nothing.
print("First 5 true labels:     ", y_test[:5])
print("First 5 predicted labels:", pred[:5])

print(classification_report(y_test, pred, digits=3))

plt.figure(figsize=(12, 6))

# Left panel: accuracy per epoch.
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy', color='blue')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy', color='green')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

# Right panel: loss per epoch, filling the second slot of the 1x2 grid that
# was previously left empty.
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss', color='blue')
plt.plot(history.history['val_loss'], label='Validation Loss', color='green')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Training-time summary, based on the epochs that were actually timed rather
# than on the epoch count requested from fit.
num_epochs = len(time_callback.epoch_times)
total_time = np.sum(time_callback.epoch_times)
# Guard the division so an empty timing list reports 0 instead of NaN.
avg_time_per_epoch = total_time / num_epochs if num_epochs else 0.0
print(f"Average training time per epoch: {avg_time_per_epoch:.2f} s/epoch")
print(f"Total training time for {num_epochs} epochs: {total_time:.2f} seconds")

In [None]:
# One-vs-rest ROC AUC on the held-out split, macro- and weighted-averaged.
from sklearn.metrics import roc_auc_score
from tensorflow.keras.utils import to_categorical

# roc_auc_score is given one-hot targets and the predicted probabilities.
y_test_onehot = to_categorical(y_test, num_classes=4)
y_pred_probs = combined_model.predict([X_test, X_test_patches])

auc_macro, auc_weighted = (
    roc_auc_score(y_test_onehot, y_pred_probs, multi_class="ovr", average=averaging)
    for averaging in ("macro", "weighted")
)

print("Macro AUC:", auc_macro)
print("Weighted AUC:", auc_weighted)

In [None]:
# Confusion matrix of the held-out split: printed as an array and plotted.
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Predicted class = index of the highest probability in each row.
y_pred = combined_model.predict([X_test, X_test_patches]).argmax(axis=1)

cm = confusion_matrix(y_test, y_pred)
print(cm)

# Axis ticks are labelled with the class names from `cat`.
disp = ConfusionMatrixDisplay(cm, display_labels=cat)
disp.plot(xticks_rotation=45, cmap='Blues')
plt.title("Confusion Matrix")
plt.show()