Preprocessing steps:
1. Reads images and labels from the Kaggle dataset.
2. Creates a stratified subset (10% of the full dataset).
3. Splits that subset into train and test sets (90% / 10%).
4. Saves them as CSV files in the project.

In [None]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
# Root folder of the image dataset; process_directory() below uses each
# sub-directory name as the class label of the images it contains.
# NOTE(review): this path is spelled '../Dataset' while the CSV outputs and the
# sample filenames used later in the notebook are spelled '../dataset' — these
# are different directories on a case-sensitive filesystem; confirm the casing.
dataset_dir1 = '../Dataset/TB_Chest_Radiography_Database'

# Parallel accumulators filled by process_directory():
# image_paths[i] is the file path whose class label is labels[i].
image_paths = []
labels = []

#function to process the directory
def process_directory(directory, label_list, path_list):
    """Collect image paths and class labels from a class-per-folder tree.

    Every immediate sub-directory of ``directory`` is treated as one class;
    its name becomes the label of each ``.jpg``/``.png`` file found directly
    inside it. Results are appended in place to the two output lists, which
    stay index-aligned.

    Args:
        directory: Root folder containing one sub-folder per class.
        label_list: List that receives one label per discovered image.
        path_list: List that receives the matching image path.
    """
    image_extensions = ('.jpg', '.png')

    # os.listdir() returns entries in arbitrary, filesystem-dependent order.
    # Sorting makes the resulting row order (and therefore any seeded split
    # performed on it) reproducible across machines.
    for label in sorted(os.listdir(directory)):
        label_dir = os.path.join(directory, label)
        if not os.path.isdir(label_dir):
            # Stray files next to the class folders carry no label.
            continue
        for image_name in sorted(os.listdir(label_dir)):
            # Compare case-insensitively so files such as 'scan.PNG' are kept.
            if image_name.lower().endswith(image_extensions):
                path_list.append(os.path.join(label_dir, image_name))
                label_list.append(label)

# Walk the dataset folder and build a two-column table (path, label)
process_directory(dataset_dir1, labels, image_paths)
data = {'Image_Path': image_paths, 'Label': labels}
df = pd.DataFrame(data)
if df.empty:
    # Fail early with a readable message instead of an opaque split error
    raise ValueError(f"No .jpg/.png images found under {dataset_dir1!r}")

# Number of rows in the 10.00% subset of the total data
subset_size = int(0.1000 * len(df))

# train_test_split returns (train_part, test_part). With train_size=subset_size
# the FIRST part is the stratified 10% subset; the second part is the remaining
# 90%, which is discarded.
subset_df, _ = train_test_split(df, train_size=subset_size, stratify=df['Label'], random_state=42)

# Further split the subset into train (90%) and test (10%) sets
train_df, test_df = train_test_split(subset_df, test_size=0.1, stratify=subset_df['Label'], random_state=42)

# Hide the true labels of the test set. Work on an explicit copy so the
# assignment does not target a slice of subset_df (SettingWithCopyWarning).
test_df = test_df.copy()
test_df['Label'] = "Unknown"

# Save both splits as CSV files, creating the output folder if needed
train_csv_path = '../dataset/train_data.csv'
test_csv_path = '../dataset/test_data.csv'
for csv_path in (train_csv_path, test_csv_path):
    os.makedirs(os.path.dirname(csv_path), exist_ok=True)

train_df.to_csv(train_csv_path, index=False)
test_df.to_csv(test_csv_path, index=False)

print("CSV files for train and test data saved successfully!")

In [None]:
import os
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# Image preprocessing
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Pretrained models
from tensorflow.keras.applications import (
    Xception,
    DenseNet121,
    MobileNetV2,
    ResNet50V2,
    InceptionV3
)

# Layers, models, optimizers
from tensorflow.keras import layers, models, optimizers

# Splitting strategy
from sklearn.model_selection import StratifiedShuffleSplit

# Callbacks
from tensorflow.keras.callbacks import (
    EarlyStopping,
    ReduceLROnPlateau,
    ModelCheckpoint
)

# Additional layers
from tensorflow.keras.layers import (
    Dropout,
    GlobalAveragePooling2D
)

# Sequential model
from tensorflow.keras.models import Sequential

This section loads the `train_data.csv` file containing the image paths and labels for the TB chest X-ray dataset.  
A **stratified train–validation split** is performed to maintain equal class distribution across both sets.

Next, data augmentation is applied to the training images to improve model robustness, while validation images are only rescaled.  
Finally, two data generators are created to read images from disk, preprocess them to **224×224**, and feed them into the model during training.

In [None]:
# Load the training table (image paths + labels) written by the preprocessing cell
df = pd.read_csv('../dataset/train_data.csv')

# Number of distinct labels; later used as the size of the softmax output layer
num_classes = len(df['Label'].unique())
print(num_classes)

# Stratified 90/10 train-validation split (a single split, fixed seed)
split = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=42)
train_index, val_index = next(split.split(df, df['Label']))

# Explicit copies so later in-place edits do not act on slices of df
train_df = df.iloc[train_index].copy()
val_df = df.iloc[val_index].copy()

# Data augmentation for training images; validation images are only rescaled
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)
val_datagen = ImageDataGenerator(rescale=1./255)

batch_size = 32
target_size = (224, 224)

train_generator = train_datagen.flow_from_dataframe(dataframe=train_df,
                                                    x_col='Image_Path',
                                                    y_col='Label',
                                                    target_size=target_size,
                                                    batch_size=batch_size,
                                                    class_mode='categorical')

# shuffle=False keeps the batches in dataframe order, so that
# validation_generator.classes lines up with the rows returned by
# model.predict(validation_generator) when the confusion matrix is built.
validation_generator = val_datagen.flow_from_dataframe(dataframe=val_df,
                                                        x_col='Image_Path',
                                                        y_col='Label',
                                                        target_size=target_size,
                                                        batch_size=batch_size,
                                                        class_mode='categorical',
                                                        shuffle=False)


Performing exploratory data analysis (EDA)

In [None]:
# Understand the dataset: first rows, dimensions, column info, summary statistics
print(train_df.head())
print(train_df.shape)
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() would emit an extra "None" line.
train_df.info()
print(train_df.describe())

In [None]:
# Bar chart: number of training images per label, most frequent label first
bar_fig, bar_ax = plt.subplots(figsize=(30, 10))
sns.countplot(
    data=train_df,
    x='Label',
    order=train_df['Label'].value_counts().index,
    ax=bar_ax,
)
bar_ax.set_title('Distribution of Labels')
bar_ax.set_xlabel('Label')
bar_ax.set_ylabel('Frequency')
plt.setp(bar_ax.get_xticklabels(), rotation=90)  # vertical tick labels are easier to read
plt.show()

# Pie chart: share of each label in the training split
pie_fig, pie_ax = plt.subplots(figsize=(15, 15))
train_df['Label'].value_counts().plot(kind='pie', autopct='%1.1f%%', ax=pie_ax)
pie_ax.set_title('Label Distribution')
pie_ax.set_ylabel('')  # suppress the automatic y-axis label
plt.show()

In [None]:
# How many distinct image files does the training split reference?
unique_paths = train_df['Image_Path'].nunique()
print(f"Number of unique image paths: {unique_paths}")

# Occurrence count of the 20 most frequent image paths, coloured by label
path_fig, path_ax = plt.subplots(figsize=(10, 6))
sns.countplot(
    data=train_df,
    y='Image_Path',
    hue='Label',
    order=train_df['Image_Path'].value_counts().index[:20],
    ax=path_ax,
)
path_ax.set_title('Top 20 Image Paths Distribution by Label')
path_ax.set_xlabel('Frequency')
path_ax.set_ylabel('Image Path')
plt.show()

# Cardinality of every object-typed (string) column
for column, unique_values in train_df.select_dtypes(include=['object']).nunique().items():
    print(f"Column {column} has {unique_values} unique values.")

# Heatmap of the null mask: any missing cell shows up as a contrasting stripe
missing_fig, missing_ax = plt.subplots(figsize=(12, 8))
sns.heatmap(train_df.isnull(), cbar=False, cmap='viridis', ax=missing_ax)
missing_ax.set_title('Heatmap of Missing Values')
plt.show()

# Number of rows per label
label_counts = train_df['Label'].value_counts()
print(label_counts)

In [None]:
# Data Cleaning
# NOTE: train_generator was already created from train_df in an earlier cell,
# so this cleaning only affects code that reads train_df from here on.

# Report missing values per column, then forward-fill them.
# DataFrame.ffill() replaces the deprecated fillna(method='ffill'); rebinding
# the name instead of using inplace=True avoids modifying a slice of df.
print(train_df.isnull().sum())
train_df = train_df.ffill()

# Report fully duplicated rows, then drop them (first occurrence is kept)
print(train_df.duplicated().sum())
train_df = train_df.drop_duplicates()

This function builds a transfer learning–based classification model for chest X-ray images.  
A pretrained CNN (e.g., DenseNet121, MobileNetV2, ResNet50V2) is passed as `base_model` and **kept frozen** during initial training to preserve its learned features.

On top of the base model, a custom classification head is added:

- **GlobalAveragePooling2D** to convert feature maps into a single feature vector  
- **Dense(512)** with ReLU activation  
- **Dropout(0.5)** to reduce overfitting  
- **Dense(256)** with ReLU  
- **Dropout(0.5)**  
- **Dense(num_classes)** with softmax for final class probabilities  

This architecture helps the model learn high-level TB-related patterns while avoiding overfitting on a relatively small dataset.

In [None]:
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import GlobalAveragePooling2D, Dropout
from tensorflow.keras.models import Sequential

def create_model(base_model, num_classes):
    """Stack a classification head on top of a frozen pretrained backbone.

    Args:
        base_model: Pretrained convolutional backbone (built without its top).
        num_classes: Number of output classes for the softmax layer.

    Returns:
        An uncompiled ``Sequential`` model: backbone -> global average
        pooling -> Dense(512) -> Dropout -> Dense(256) -> Dropout -> softmax.
    """
    # The backbone's weights are not updated during training
    base_model.trainable = False

    classifier = Sequential()
    classifier.add(base_model)
    # Collapse each feature map to a single value
    classifier.add(GlobalAveragePooling2D())
    # Two fully connected stages, each followed by 50% dropout
    classifier.add(Dense(512, activation='relu'))
    classifier.add(Dropout(0.5))
    classifier.add(Dense(256, activation='relu'))
    classifier.add(Dropout(0.5))
    # One probability per class
    classifier.add(Dense(num_classes, activation='softmax'))
    return classifier


This function handles the complete pipeline for training and evaluating a transfer learning model

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras import optimizers
import matplotlib.pyplot as plt
import numpy as np

def train_and_evaluate(model, train_data, val_data, model_name, epochs=10):
    """Compile, train and evaluate a model, then plot accuracy and a confusion matrix.

    Args:
        model: Uncompiled Keras model to train.
        train_data: Batch generator yielding training images and labels.
        val_data: Batch generator yielding validation images and labels. It
            must support ``len()``, integer indexing returning ``(x, y)``
            batches, and expose ``class_indices``.
        model_name: Used in the checkpoint file name, printed messages and
            plot titles.
        epochs: Maximum number of training epochs.

    Returns:
        The ``History`` object returned by ``model.fit``.

    Side effects:
        Writes ``{model_name}_best_model.h5`` to the working directory and
        shows two matplotlib figures.
    """
    checkpoint_path = f'{model_name}_best_model.h5'

    # Compile the model
    model.compile(
        optimizer=optimizers.Adam(learning_rate=0.0001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Callbacks: stop when val_loss stalls, lower the learning rate on
    # plateaus, and keep the checkpoint with the lowest val_loss.
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6)
    checkpoint = ModelCheckpoint(filepath=checkpoint_path,
                                monitor='val_loss',
                                save_best_only=True)

    # Train the model
    history = model.fit(
        train_data,
        validation_data=val_data,
        steps_per_epoch=len(train_data),
        epochs=epochs,
        callbacks=[early_stopping, reduce_lr, checkpoint]
    )

    # Load the best model weights
    model.load_weights(checkpoint_path)

    # Evaluate on validation data
    val_loss, val_accuracy = model.evaluate(val_data)
    print(f'{model_name} Validation Accuracy: {val_accuracy:.4f}')

    # Plot accuracy curves on a fresh figure
    plt.figure()
    plt.plot(history.history['accuracy'], label='accuracy')
    plt.plot(history.history['val_accuracy'], label='val_accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.ylim([0, 1])
    plt.legend(loc='lower right')
    plt.title(f'{model_name} Accuracy')
    plt.show()

    # Confusion matrix.
    # Predictions and true labels are taken from the SAME batch, so they stay
    # paired even when the generator shuffles its samples; comparing
    # val_data.classes (dataframe order) with predictions from a shuffled
    # generator would produce a meaningless matrix.
    # Assumes one-hot labels (class_mode='categorical'), consistent with the
    # categorical_crossentropy loss used above.
    y_true = []
    y_pred = []
    for batch_index in range(len(val_data)):
        batch_images, batch_labels = val_data[batch_index]
        batch_probs = model.predict(batch_images, verbose=0)
        y_pred.extend(np.argmax(batch_probs, axis=1))
        y_true.extend(np.argmax(batch_labels, axis=1))

    # Class names ordered by their index; passing every index as `labels`
    # keeps the matrix shape equal to the number of display labels even if a
    # class never occurs in the validation data.
    class_names = sorted(val_data.class_indices, key=val_data.class_indices.get)
    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                                display_labels=class_names)
    disp.plot(cmap=plt.cm.Blues)
    plt.title(f'{model_name} Confusion Matrix')
    plt.show()

    return history


This function trains multiple pretrained CNN architectures (e.g., Xception, DenseNet121, MobileNetV2) using the `create_model()` and `train_and_evaluate()` functions.

In [None]:
def model_training(base_models):
    """Train, evaluate and save one classifier per pretrained backbone.

    Args:
        base_models: Iterable of ``(base_model, input_shape, model_name)``
            tuples, where ``input_shape`` is ``(height, width, channels)``.

    Returns:
        Dict mapping each ``model_name`` to the training history returned by
        ``train_and_evaluate``.

    Side effects:
        Creates ``saved_models/`` if needed and writes
        ``saved_models/{model_name}_saved.h5`` for every backbone.
    """
    # Directory where final (non-best) models are saved
    save_dir = 'saved_models'
    os.makedirs(save_dir, exist_ok=True)

    histories = {}

    # Train and evaluate each base model
    for base_model, input_shape, model_name in base_models:
        model = create_model(base_model, num_classes)

        # Build generators at the resolution this backbone was created with.
        # The notebook-level generators are fixed at 224x224 and cannot feed
        # backbones declared with a 299x299 input (InceptionV3, Xception).
        image_size = tuple(input_shape[:2])
        train_data = train_datagen.flow_from_dataframe(dataframe=train_df,
                                                       x_col='Image_Path',
                                                       y_col='Label',
                                                       target_size=image_size,
                                                       batch_size=batch_size,
                                                       class_mode='categorical')
        # Validation batches stay in dataframe order (no shuffling)
        val_data = val_datagen.flow_from_dataframe(dataframe=val_df,
                                                   x_col='Image_Path',
                                                   y_col='Label',
                                                   target_size=image_size,
                                                   batch_size=batch_size,
                                                   class_mode='categorical',
                                                   shuffle=False)

        print(f'Training {model_name}...')
        history = train_and_evaluate(model, train_data, val_data, model_name, epochs=10)
        histories[model_name] = history

        # Save the final model (in addition to the best-checkpoint file)
        model.save(os.path.join(save_dir, f'{model_name}_saved.h5'))
        print(f'Saved {model_name} model to {save_dir}/{model_name}_saved.h5')

    return histories


Loading the base models 

In [None]:
# MobileNetV2 backbone: ImageNet weights, classification top removed
mobilenet_input_shape = (224, 224, 3)
mobilenet_backbone = MobileNetV2(
    input_shape=mobilenet_input_shape,
    include_top=False,
    weights='imagenet',
)

# One (backbone, input shape, display name) entry per model to train
base_models1 = [(mobilenet_backbone, mobilenet_input_shape, 'MobileNetV2')]

# Train, evaluate and save the selected backbone
model_training(base_models1)

# Train InceptionV3

In [None]:
# InceptionV3 backbone: ImageNet weights, classification top removed
inception_input_shape = (299, 299, 3)
inception_backbone = InceptionV3(
    input_shape=inception_input_shape,
    include_top=False,
    weights='imagenet',
)

# (backbone, input shape, display name)
base_models2 = [(inception_backbone, inception_input_shape, 'InceptionV3')]

model_training(base_models2)

# Train Xception model

In [None]:

# Xception backbone: ImageNet weights, classification top removed
xception_input_shape = (299, 299, 3)
xception_backbone = Xception(
    input_shape=xception_input_shape,
    include_top=False,
    weights='imagenet',
)

# (backbone, input shape, display name)
base_models3 = [(xception_backbone, xception_input_shape, 'Xception')]

model_training(base_models3)


# Train ResNet50V2 model

In [None]:
# ResNet50V2 backbone: ImageNet weights, classification top removed
resnet_input_shape = (224, 224, 3)
resnet_backbone = ResNet50V2(
    input_shape=resnet_input_shape,
    include_top=False,
    weights='imagenet',
)

# (backbone, input shape, display name)
base_models4 = [(resnet_backbone, resnet_input_shape, 'ResNet50V2')]

model_training(base_models4)

# Train DenseNet121

In [None]:
# DenseNet121 backbone: ImageNet weights, classification top removed
densenet_input_shape = (224, 224, 3)
densenet_backbone = DenseNet121(
    input_shape=densenet_input_shape,
    include_top=False,
    weights='imagenet',
)

# (backbone, input shape, display name)
base_models5 = [(densenet_backbone, densenet_input_shape, 'DenseNet121')]

model_training(base_models5)

Testing unlabelled data 

In [None]:
import os
from tqdm import tqdm
import numpy as np
import pandas as pd

from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image


# Number of images sent to the model per prediction call
batch_size = 32

# Folder that holds the models saved after training
model_dir = 'saved_models'

# Table of test image paths written by the preprocessing cell
test_df = pd.read_csv('../dataset/test_data.csv')

# Reverse lookup (class index -> class name) taken from the training generator
class_indices_inverse = {
    class_index: class_name
    for class_name, class_index in train_generator.class_indices.items()
}

# Function to process images in batches
def process_images_in_batches(image_paths, model, target_size, batch_size):
    """Predict a class index for every image path, one batch at a time.

    Args:
        image_paths: Sequence of image file paths.
        model: Object with a ``predict`` method returning per-class scores.
        target_size: ``(height, width)`` each image is resized to on load.
        batch_size: Maximum number of images per ``predict`` call.

    Returns:
        List with the argmax class index of each image, in input order.
    """
    total = len(image_paths)
    # Ceiling division: the last batch may be smaller than batch_size
    batch_count = (total + batch_size - 1) // batch_size

    predicted_indices = []
    for batch_no in range(batch_count):
        # Slicing past the end of the sequence is clamped automatically
        chunk = image_paths[batch_no * batch_size:(batch_no + 1) * batch_size]

        # Load, convert to an array and scale pixel values by 1/255
        pixels = [
            image.img_to_array(image.load_img(path, target_size=target_size)) / 255.0
            for path in chunk
        ]

        scores = model.predict(np.array(pixels))
        predicted_indices.extend(np.argmax(scores, axis=1))

    return predicted_indices

# The list of test images is the same for every model, so build it once
image_paths = test_df['Image_Path'].tolist()

# Iterate over each model file in the saved_models directory (sorted so the
# processing order does not depend on the filesystem)
for model_file in sorted(os.listdir(model_dir)):
    if not model_file.endswith('.h5'):
        continue

    # Load the model
    model_path = os.path.join(model_dir, model_file)
    model = load_model(model_path)

    # Prefer the spatial size recorded in the model itself.
    # Assumes a single image input shaped (batch, height, width, channels),
    # as produced by the backbones in this notebook; if the size is not
    # fixed, fall back to the file-name convention.
    target_size = tuple(model.input_shape[1:3])
    if None in target_size:
        if "Xception_saved" in model_file or "InceptionV3_saved" in model_file:
            target_size = (299, 299)
        else:
            target_size = (224, 224)

    # Process images in batches and make predictions
    prediction = process_images_in_batches(image_paths, model, target_size, batch_size)

    # Map predicted class indices to class labels
    prediction_labels = [class_indices_inverse[label] for label in prediction]

    # Create a DataFrame with predictions
    predicted_df = pd.DataFrame({
        'Image_Path': test_df['Image_Path'],
        'Label': prediction_labels,
    })

    # Save predictions to CSV in saved_models/. splitext removes only the
    # final '.h5', so model names that contain dots are not truncated.
    model_stem = os.path.splitext(model_file)[0]
    csv_path = os.path.join(model_dir, f'predicted_{model_stem}.csv')
    predicted_df.to_csv(csv_path, header=True, index=False)

    print(f"Predictions saved to {csv_path}")


Image label prediction and visualization (sample images are displayed together with their predicted labels)

In [None]:
import csv
import os
import matplotlib.pyplot as plt

from tensorflow.keras.preprocessing import image


# Sample images (two from the Normal folder, two from the Tuberculosis folder)
# that are displayed together with each model's predicted label.
# NOTE(review): a prediction is only found for a sample if its path also
# appears in the 'Image_Path' column of the prediction CSVs — confirm these
# files are part of the test split and that the path spelling matches.
filenames = [
    '../dataset/TB_Chest_Radiography_Database/Normal/Normal-1157.png',
    '../dataset/TB_Chest_Radiography_Database/Tuberculosis/Tuberculosis-499.png',
    '../dataset/TB_Chest_Radiography_Database/Normal/Normal-1922.png',
    '../dataset/TB_Chest_Radiography_Database/Tuberculosis/Tuberculosis-539.png'
]

def load_predicted_labels(csv_file_path):
    """Read a predictions CSV into a ``{image_path: label}`` dictionary.

    Args:
        csv_file_path: Path to a CSV file with a header row containing the
            columns ``Image_Path`` and ``Label``.

    Returns:
        Dict mapping each ``Image_Path`` value to its ``Label`` value. If a
        path occurs more than once, the last row wins.

    Raises:
        KeyError: If a required column is missing from the file.
    """
    # The csv module expects files opened with newline='' so that line endings
    # and quoted fields are parsed correctly; the encoding is stated
    # explicitly instead of relying on the platform default.
    with open(csv_file_path, 'r', newline='', encoding='utf-8') as csvfile:
        return {row['Image_Path']: row['Label'] for row in csv.DictReader(csvfile)}

def _normalized_path_key(path):
    """Return a lookup key that ignores letter case, separator style and redundant segments."""
    return os.path.normcase(os.path.normpath(path.replace('\\', '/'))).lower()


def visualize_predictions(filenames, predicted_labels):
    """Show each image with its predicted label as the figure title.

    Args:
        filenames: Iterable of image file paths to display.
        predicted_labels: Dict mapping image paths to predicted label names.

    An exact path match is tried first. If it fails, the lookup is retried
    with normalized paths, because the stored paths may differ from
    ``filenames`` only in spelling (e.g. ``../Dataset`` vs ``../dataset``, or
    ``\\`` vs ``/`` separators). Images without any match are titled
    "Unknown" and reported on stdout.
    """
    # Built once per call: normalized path -> label
    normalized_labels = {
        _normalized_path_key(path): label
        for path, label in predicted_labels.items()
    }

    for filename in filenames:
        img = image.load_img(filename, target_size=(224, 224))
        img_array = image.img_to_array(img)
        img_processed = img_array / 255.0  # Scale pixel values by 1/255 for display

        if filename in predicted_labels:
            predicted_class_name = predicted_labels[filename]
        else:
            predicted_class_name = normalized_labels.get(_normalized_path_key(filename))
            if predicted_class_name is None:
                print(f"Filename not found: {filename}")
                predicted_class_name = "Unknown"

        plt.figure(figsize=(2, 2))
        plt.imshow(img_processed.astype("float32"))  # Display the processed image
        plt.title(f"Prediction - {predicted_class_name}", size=12, color='red')
        plt.axis('off')
        plt.show()

# Folder where the inference cell wrote its predicted_*.csv files
csv_dir = 'saved_models'

# Show the sample images once per prediction file found in that folder
for csv_file in os.listdir(csv_dir):
    is_prediction_file = csv_file.startswith('predicted_') and csv_file.endswith('.csv')
    if not is_prediction_file:
        continue

    # Text before the first dot of the file name identifies the model
    model_name = csv_file.split('.')[0]

    # Read this model's path -> label mapping
    predicted_csv_file = os.path.join(csv_dir, csv_file)
    predicted_labels = load_predicted_labels(predicted_csv_file)

    # Plot every sample image with the label this model predicted
    print(f"Predictions using: {model_name}")
    visualize_predictions(filenames, predicted_labels)
