In [30]:
# Mount Google Drive at /content/drive so files kept there can be read through
# ordinary filesystem paths in the cells below.
# NOTE(review): `google.colab` is presumably only importable inside a Colab
# runtime - confirm before running this notebook elsewhere.
from google.colab import drive
drive.mount('/content/drive')

import tensorflow as tf
import numpy as np
import os
import cv2
import concurrent.futures
from keras.models import Sequential
from keras.layers import Dense, Conv2D, MaxPooling2D, Flatten, Dropout, BatchNormalization
from keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import classification_report, confusion_matrix
from keras.callbacks import ReduceLROnPlateau, EarlyStopping
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.metrics import Precision, Recall


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Class folder names. A sample's integer label is the position of its folder
# name in this list (NORMAL -> 0, PNEUMONIA -> 1).
classes = ['NORMAL', 'PNEUMONIA']

# Edge length, in pixels, of the square images produced by preprocessing.
img_size = 120

def process_image(image_path, class_num):
    """Read one image as grayscale and resize it to ``img_size`` x ``img_size``.

    Args:
        image_path: Path of the image file to read.
        class_num: Integer label, passed through unchanged.

    Returns:
        ``(resized_array, class_num)`` on success, or ``None`` when anything
        goes wrong. Failures are printed rather than raised so that one bad
        file does not abort a whole loading run.
    """
    try:
        pixels = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if pixels is not None:
            return cv2.resize(pixels, (img_size, img_size)), class_num
        # imread signals an unreadable file by returning None instead of raising.
        raise ValueError(f"Image not loaded properly: {image_path}")
    except Exception as err:
        print(err)
    return None

In [3]:
def get_data(data_dir):
    """Load and preprocess every file under ``data_dir/<class>/`` in parallel.

    Each entry of the module-level ``classes`` list names a sub-directory of
    ``data_dir``; every file in it is handed to ``process_image`` on a thread
    pool, labelled with the class's position in ``classes``.

    Args:
        data_dir: Directory containing one sub-directory per class.

    Returns:
        A list of ``(image_array, class_num)`` tuples in a deterministic
        order: classes in ``classes`` order, files sorted by name within each
        class. Files for which ``process_image`` returned ``None`` are left
        out.

    Raises:
        OSError: If a class sub-directory cannot be listed.
    """
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for class_num, class_name in enumerate(classes):
            class_dir = os.path.join(data_dir, class_name)
            # os.listdir returns entries in arbitrary order; sort them so
            # repeated runs see the samples in the same sequence.
            for file_name in sorted(os.listdir(class_dir)):
                futures.append(
                    executor.submit(
                        process_image,
                        os.path.join(class_dir, file_name),
                        class_num,
                    )
                )

        # Collect in submission order, not completion order: the work still
        # runs in parallel, but the output no longer depends on thread timing.
        results = [future.result() for future in futures]

    return [item for item in results if item is not None]

In [4]:
# Dataset root on the mounted Drive; it holds 'train', 'test' and 'val'
# sub-directories, each loaded with get_data.
data_dir = '/content/drive/My Drive/chest_xray'
train_data, test_data, val_data = (
    get_data(os.path.join(data_dir, split_name))
    for split_name in ('train', 'test', 'val')
)

In [5]:
def _to_model_arrays(samples, side):
    """Convert ``(image, label)`` pairs into arrays ready for the network.

    Args:
        samples: Non-empty list of ``(image_array, class_num)`` tuples, where
            every image is ``side`` x ``side``.
        side: Edge length of each square image, in pixels.

    Returns:
        ``(x, y)``: ``x`` is a float32 array of shape ``(n, side, side, 1)``
        holding the pixel values divided by 255, and ``y`` is the array of
        labels in the same order.

    Raises:
        ValueError: If ``samples`` is empty.
    """
    if not samples:
        # zip(*[]) would otherwise fail with an opaque unpacking error.
        raise ValueError("No images were loaded for this split; check the dataset path.")
    images, labels = zip(*samples)
    # float32 halves the memory of the default float64 result of the division.
    pixels = np.array(images, dtype=np.float32) / 255.0
    # Trailing axis of size 1 is the single grayscale channel.
    return pixels.reshape(-1, side, side, 1), np.array(labels)


# Same conversion for every split: separate features from labels, normalize,
# and add the channel axis.
x_train, y_train = _to_model_arrays(train_data, img_size)
x_test, y_test = _to_model_arrays(test_data, img_size)
x_val, y_val = _to_model_arrays(val_data, img_size)

In [14]:
# Augmentation generator for training batches: random shear, random zoom and
# random horizontal flips, with pixels outside the transformed image filled
# from the nearest edge.
# NOTE(review): horizontal_flip mirrors left/right anatomy - confirm this is an
# acceptable augmentation for chest X-rays.
train_datagen = ImageDataGenerator(
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)
# train_datagen.fit(x_train) is intentionally not called: fit() only computes
# the dataset statistics needed by featurewise_center,
# featurewise_std_normalization and zca_whitening. None of those options is
# enabled here, so the call would cost time and memory and change nothing.

In [25]:
# 'balanced' weights so the rarer class contributes proportionally more to the
# loss. Keras expects a {class_index: weight} mapping.
label_values = np.unique(y_train)
balanced_weights = compute_class_weight(
    class_weight='balanced',
    classes=label_values,
    y=y_train,
)
class_weights = {index: weight for index, weight in enumerate(balanced_weights)}

In [8]:
# CNN for binary classification of single-channel img_size x img_size images:
# four conv / batch-norm / max-pool stages (dropout after stages 2-4), then a
# dense head ending in one sigmoid unit.
model = Sequential([
    # Stage 1
    Conv2D(32, (3, 3), activation='relu', input_shape=(img_size, img_size, 1)),
    BatchNormalization(),
    MaxPooling2D((2, 2)),

    # Stage 2
    Conv2D(64, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Stage 3
    Conv2D(128, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Stage 4
    Conv2D(128, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Classifier head
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid'),
])

In [9]:
# Adam optimizer with binary cross-entropy loss; accuracy is the only metric
# reported during training and evaluation.
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

In [40]:
# Multiply the learning rate by 0.2 after 2 epochs without val_loss
# improvement, never going below 1e-4.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=2, min_lr=0.0001)

# Stop once val_loss has not improved for 3 epochs and roll back to the best
# weights seen. The patience here must exceed reduce_lr's patience: with the
# previous value of 1, training stopped on the first non-improving epoch, so
# the learning-rate reduction (which needs 2) never got a chance to run.
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

In [41]:
# Single source of truth for the batch size, so the generator and the step
# count below cannot drift apart.
batch_size = 64

# Train on augmented batches; validation uses the un-augmented val split.
history = model.fit(
    train_datagen.flow(x_train, y_train, batch_size=batch_size),
    # steps_per_epoch must be an integer. Ceiling division counts the final
    # partial batch, so one epoch covers every training sample exactly once.
    steps_per_epoch=(len(x_train) + batch_size - 1) // batch_size,
    epochs=5,
    validation_data=(x_val, y_val),
    class_weight=class_weights,
    callbacks=[early_stopping, reduce_lr]
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [42]:
# Evaluate once and reuse the result; the returned list is [loss, accuracy]
# for a model compiled with metrics=['accuracy']. Calling evaluate separately
# for each print would run the whole test set through the model twice.
test_scores = model.evaluate(x_test, y_test)
print("Loss: " , test_scores[0])
print("Accuracy:" , test_scores[1]*100 , "%")

Loss:  0.6416131854057312
Accuracy: 84.29487347602844 %
