In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import shutil
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB3
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from google.colab import files

In [None]:
# DIP Preprocessing

def preprocess_image(image, clip_limit=2.0, tile_grid_size=(8, 8), blur_ksize=(5, 5)):
    """Return a contrast-enhanced, smoothed grayscale version of ``image``.

    The pipeline is: grayscale conversion -> CLAHE -> Gaussian blur.

    Args:
        image: Image array. A multi-channel array is converted with
            ``cv2.COLOR_BGR2GRAY`` (i.e. it is treated as BGR, the channel
            order produced by ``cv2.imread``). A single-channel array of
            shape (H, W) or (H, W, 1) is used as-is.
        clip_limit: ``clipLimit`` passed to ``cv2.createCLAHE``.
        tile_grid_size: ``tileGridSize`` passed to ``cv2.createCLAHE``.
        blur_ksize: Kernel size passed to ``cv2.GaussianBlur``.

    Returns:
        Single-channel image after CLAHE and Gaussian smoothing.

    Raises:
        ValueError: If ``image`` is None (typically a failed ``cv2.imread``).
    """
    if image is None:
        raise ValueError(
            "preprocess_image received None; check that the image file was read successfully"
        )

    if image.ndim == 2:
        # Already grayscale: cvtColor(BGR2GRAY) would reject a 2-D array.
        gray = image
    elif image.ndim == 3 and image.shape[2] == 1:
        # (H, W, 1) -> (H, W); make it contiguous for the OpenCV calls below.
        gray = np.ascontiguousarray(image[:, :, 0])
    else:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
    enhanced = clahe.apply(gray)
    # sigmaX=0 lets OpenCV derive the sigma from the kernel size.
    return cv2.GaussianBlur(enhanced, blur_ksize, 0)

def segment_tb_regions(image):
    """Produce a cleaned-up binary mask from a grayscale image.

    The image is binarised with an inverted Gaussian adaptive threshold
    (block size 11, constant 2) and then tidied with a 5x5 morphological
    closing (2 iterations) followed by an opening (1 iteration).

    Args:
        image: Single-channel image, e.g. the output of ``preprocess_image``.

    Returns:
        The binary mask after thresholding and morphological clean-up.
    """
    structuring_element = np.ones((5, 5), dtype=np.uint8)

    mask = cv2.adaptiveThreshold(
        image,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV,
        11,
        2,
    )

    # Closing first, then opening, each with its own iteration count.
    for operation, passes in ((cv2.MORPH_CLOSE, 2), (cv2.MORPH_OPEN, 1)):
        mask = cv2.morphologyEx(mask, operation, structuring_element, iterations=passes)

    return mask

def visualize_suspicious_regions(original, segmented, min_area=100):
    """Show ``original`` with green boxes around the large regions of ``segmented``.

    Args:
        original: Image to annotate, either BGR or single-channel. It is not
            modified; the boxes are drawn on a copy.
        segmented: Binary mask whose external contours are boxed, e.g. the
            output of ``segment_tb_regions``.
        min_area: Only contours with an area strictly greater than this value
            are drawn.

    Returns:
        None. The annotated image is displayed with matplotlib.
    """
    # Work on a private BGR canvas so the caller's array is never altered and
    # grayscale inputs can still receive coloured rectangles.
    if original.ndim == 2:
        canvas = cv2.cvtColor(original, cv2.COLOR_GRAY2BGR)
    else:
        canvas = original.copy()

    # findContours returns (contours, hierarchy) in OpenCV 4 but
    # (image, contours, hierarchy) in OpenCV 3; index -2 is the contour list in both.
    contours = cv2.findContours(segmented, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

    for contour in contours:
        if cv2.contourArea(contour) > min_area:
            x, y, w, h = cv2.boundingRect(contour)
            cv2.rectangle(canvas, (x, y), (x + w, y + h), (0, 255, 0), 2)

    # matplotlib expects RGB, OpenCV stores BGR.
    plt.imshow(cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB))
    plt.axis('off')
    plt.title("Suspicious Regions (DIP Visualization)")
    plt.show()

In [None]:
#dataset

def prepare_data(dataset_path, output_path="data", seed=42):
    """Copy the raw dataset into ``train``/``val``/``test`` folders per class.

    Each class folder is split independently: 30% of its files are held out
    and that hold-out is divided evenly into validation and test sets. Files
    are copied (not moved) to ``{output_path}/{split}/{class_name}/``.

    Args:
        dataset_path: Directory containing the ``Normal Chest X-rays`` and
            ``TB Chest X-rays`` sub-directories.
        output_path: Root directory that receives the split folders.
        seed: ``random_state`` used for both splits.

    Raises:
        FileNotFoundError: If a class directory does not exist (from ``os.listdir``).
        ValueError: If a class directory contains no files.
    """
    # Output class name -> directory name used by the raw dataset.
    class_dirs = {'Normal': 'Normal Chest X-rays', 'Tuberculosis': 'TB Chest X-rays'}
    splits = ('train', 'val', 'test')

    for split in splits:
        for class_name in class_dirs:
            os.makedirs(os.path.join(output_path, split, class_name), exist_ok=True)

    for class_name, dir_name in class_dirs.items():
        target_dir = os.path.join(dataset_path, dir_name)
        print(f"Attempting to list directory: {target_dir}")

        # os.listdir returns entries in arbitrary order, so sort them: with a
        # fixed seed the split is then identical on every run, and re-running
        # overwrites the same files instead of leaking a file into a second split.
        # Sub-directories are skipped because shutil.copy2 cannot copy them.
        files_list = sorted(
            entry for entry in os.listdir(target_dir)
            if os.path.isfile(os.path.join(target_dir, entry))
        )
        if not files_list:
            raise ValueError(f"No files found in {target_dir}")

        train, temp = train_test_split(files_list, test_size=0.3, random_state=seed)
        val, test = train_test_split(temp, test_size=0.5, random_state=seed)

        # NOTE(review): files left in output_path by an earlier run that used a
        # different split are not removed here - clear the folder first if needed.
        for file_list, split in zip((train, val, test), splits):
            for file in file_list:
                shutil.copy2(os.path.join(target_dir, file),
                             os.path.join(output_path, split, class_name, file))

In [None]:
# model

def create_model(input_shape=(224, 224, 3), unit_range_inputs=True):
    """Build and compile a frozen EfficientNetB3 binary classifier.

    Keras' EfficientNet backbones include their own input preprocessing and
    are documented to expect pixel values in the 0-255 range. Feeding them
    images already divided by 255 squashes the input into a tiny range and
    cripples the pretrained features, so by default the model multiplies its
    input by 255 before the backbone.

    Args:
        input_shape: (height, width, channels) of the input images.
        unit_range_inputs: True when images arrive scaled to [0, 1] (e.g. via
            ``rescale=1./255``); the model then restores the 0-255 range
            internally. Pass False when feeding raw 0-255 pixels.

    Returns:
        A compiled ``tf.keras.Sequential`` model with a single sigmoid output,
        Adam optimizer, binary cross-entropy loss and accuracy metric.
    """
    base = EfficientNetB3(weights='imagenet', include_top=False, input_shape=input_shape)
    base.trainable = False  # train only the classification head

    stack = [layers.Input(shape=input_shape)]
    if unit_range_inputs:
        # Undo the external 1/255 scaling so the backbone sees 0-255 pixels.
        stack.append(layers.Rescaling(255.0))
    stack.extend([
        base,
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.3),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(1, activation='sigmoid'),
    ])

    model = tf.keras.Sequential(stack)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model


In [None]:
#data generation

def setup_data(data_dir='data', target_size=(224, 224), batch_size=16, seed=None):
    """Create the train/validation/test directory iterators.

    Training images are augmented (rotation, zoom, horizontal flip);
    validation and test images are only rescaled. All pixel values are
    multiplied by 1/255.

    Args:
        data_dir: Root folder containing ``train``, ``val`` and ``test``
            sub-folders, each with one sub-folder per class.
        target_size: (height, width) every image is resized to.
        batch_size: Number of images per batch.
        seed: Optional seed for shuffling and augmentation.

    Returns:
        Tuple ``(train_data, val_data, test_data)`` of binary-label iterators.
    """
    # NOTE(review): Keras EfficientNet backbones document a 0-255 input range;
    # confirm the model used downstream accounts for this 0-1 rescaling.
    train_gen = ImageDataGenerator(rescale=1./255,
                                   rotation_range=15,
                                   zoom_range=0.1,
                                   horizontal_flip=True)
    eval_gen = ImageDataGenerator(rescale=1./255)

    def _flow(generator, split, shuffle):
        """Build one iterator over ``data_dir/split``."""
        return generator.flow_from_directory(
            os.path.join(data_dir, split),
            target_size=target_size,
            batch_size=batch_size,
            class_mode='binary',
            shuffle=shuffle,
            seed=seed,
        )

    train_data = _flow(train_gen, 'train', shuffle=True)
    # Keep evaluation order fixed so predictions line up with `.classes`
    # and `.filenames` when computing metrics beyond model.evaluate().
    val_data = _flow(eval_gen, 'val', shuffle=False)
    test_data = _flow(eval_gen, 'test', shuffle=False)
    return train_data, val_data, test_data

In [None]:
# training
def train_model(model, train_data, val_data, epochs=20, class_weight=None):
    """Fit ``model`` with early stopping, LR reduction and class weighting.

    Args:
        model: Compiled Keras model.
        train_data: Training data passed to ``model.fit``.
        val_data: Validation data passed to ``model.fit``.
        epochs: Maximum number of epochs.
        class_weight: Optional ``{class_index: weight}`` mapping. When omitted,
            weights are derived from ``train_data.classes`` so that the largest
            class gets weight 1.0 and smaller classes proportionally more; if
            the labels are unavailable the original fixed weights
            ``{0: 4.85, 1: 1.0}`` are used.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    if class_weight is None:
        class_weight = _derive_class_weight(train_data)
    print(f"Using class weights: {class_weight}")

    callbacks = [
        # Stop when val_loss has not improved for 5 epochs and roll back to the best weights.
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5,
                                         restore_best_weights=True),
        # Halve the learning rate after 3 epochs without val_loss improvement.
        tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3),
    ]
    history = model.fit(train_data, epochs=epochs, validation_data=val_data,
                        callbacks=callbacks, class_weight=class_weight)
    return history


def _derive_class_weight(train_data):
    """Return inverse-frequency class weights from ``train_data.classes``."""
    labels = getattr(train_data, 'classes', None)
    if labels is None or len(labels) == 0:
        # Labels not exposed by this data source: keep the original constants.
        return {0: 4.85, 1: 1.0}
    counts = np.bincount(np.asarray(labels, dtype=int))
    largest = counts.max()
    return {int(index): float(largest / count)
            for index, count in enumerate(counts) if count > 0}

In [None]:
# upload image for prediction

def predict_uploaded_image(model, target_size=(224, 224)):
    """Upload image(s) in Colab, show the DIP visualization and print a prediction.

    For every uploaded file the image is read with OpenCV, the segmented
    regions are displayed, and the model's sigmoid output is reported as
    "TB Positive" (> 0.5) or "TB Negative" with a confidence percentage.

    Args:
        model: Trained Keras model with a single sigmoid output. Assumes the
            output is the probability of the class with index 1 - confirm
            against the training iterator's ``class_indices``.
        target_size: (width, height) passed to ``cv2.resize``; must match the
            model's input size.

    Returns:
        None. Results are printed and plotted.
    """
    uploaded = files.upload()
    for filename in uploaded.keys():
        img = cv2.imread(filename)
        if img is None:
            # cv2.imread signals failure by returning None rather than raising.
            print(f"Skipping {filename}: could not be read as an image")
            continue

        preprocessed = preprocess_image(img)
        segmented = segment_tb_regions(preprocessed)
        visualize_suspicious_regions(img.copy(), segmented)

        # OpenCV loads BGR, whereas Keras' flow_from_directory loads RGB by
        # default; convert so inference sees the channel order used in training.
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img_array = cv2.resize(rgb, target_size).astype(np.float32) / 255.0
        img_array = np.expand_dims(img_array, axis=0)  # add batch dimension

        pred = float(model.predict(img_array)[0][0])
        if pred > 0.5:
            result, confidence = "TB Positive", pred * 100
        else:
            result, confidence = "TB Negative", (1 - pred) * 100
        print(f"Model Prediction: {result} (Confidence: {confidence:.1f}%)")

In [None]:
# execution
# Location of the extracted dataset on Colab (the class folders live in this nested directory).
dataset_path = "/content/TB_Chest_Radiography_Database/Dataset of Tuberculosis Chest X-rays Images"

# Split the raw dataset into data/train, data/val and data/test
prepare_data(dataset_path=dataset_path)

# Build the image iterators
train_data, val_data, test_data = setup_data()

# Create and train the model
model = create_model()
history = train_model(model, train_data, val_data)

# Report performance on the held-out test split, which is not used for
# training, early stopping or learning-rate scheduling.
test_loss, test_accuracy = model.evaluate(test_data)
print(f"Test loss: {test_loss:.4f} | Test accuracy: {test_accuracy:.4f}")

# Upload image(s) and print a prediction for each
predict_uploaded_image(model)