<a href="https://colab.research.google.com/github/10710arnav/Noesis/blob/main/Aryan%20Basnet%2C%20Arnav%20Maharjan%20and%20Ashila%20A%20M%20Ardiyansyah/01_dataset1_HIC_pneumonia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **NOTE ON RUNTIME AND OUTPUTS**

# Google Colab occasionally disconnected or reset the runtime during long training sessions (for example after crashes or memory interruptions). When this happened, some outputs that had previously been displayed in the notebook were no longer visible. All results were nevertheless saved and logged correctly: each model's complete metrics and metadata were stored as JSON files in my Google Drive folder:

# [https://drive.google.com/drive/folders/1ejlJaZhHEBm-1khLBJ--mbG2pg5TZHoJ?usp=sharing](https://drive.google.com/drive/folders/1ejlJaZhHEBm-1khLBJ--mbG2pg5TZHoJ?usp=sharing)

# These JSON files contain the full and reliable outputs for all models across all datasets, even if certain notebook outputs were lost due to runtime resets.

# ==============================
# SETUP: Freeze all package versions
# ==============================
To reproduce the results, install the exact package versions used in these notebooks (for example with `pip install <package>==<version>`). This applies to the packages that come pre-installed in Colab as well.

The packages and versions used are:

- numpy==1.25.2
- pandas==2.1.1
- matplotlib==3.8.0
- seaborn==0.12.2
- scikit-learn==1.3.2
- tensorflow==2.15.0
- keras==2.15.0
- scipy==1.11.2
- opencv-python==4.9.0.73
- Pillow==10.0.1
- h5py==3.9.0
- google-colab==2.0.0

In [None]:
# ==========================================
# CHEST X-RAY CLASSIFICATION - DATASET 1
# ==========================================
# stating metadata for tracking dataset identity

# STEP 1: METADATA
# constants identifying this dataset; the name and income level are also
# written into each model's results file further down
DATASET_NAME = "dataset1_chest_xray_pneumonia"
COUNTRY_INCOME_LEVEL = "HIC"  # High Income Country
DATASET_SOURCE = "https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia"
NUM_CLASSES = 2  # binary task: Healthy vs Pneumonia

# echoing the dataset identity into the cell output for traceability
print(
    f"Dataset: {DATASET_NAME}",
    f"Income Level: {COUNTRY_INCOME_LEVEL}",
    "Classes: Healthy, Pneumonia (No TB in this dataset)",
    sep="\n",
)

# STEP 2: MOUNT DRIVE
# mounting Google Drive at /content/drive; google.colab is only importable
# inside a Colab runtime
from google.colab import drive
drive.mount('/content/drive')

# creating directory for storing results
# NOTE: the leading "!" makes this an IPython shell command, so the line only
# works when run as a notebook cell; "-p" keeps it from failing when the
# folder already exists
!mkdir -p /content/drive/MyDrive/xray_research_results

# STEP 3: IMPORT DATASET
# importing required packages
import kagglehub
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import f1_score, confusion_matrix
import json
import time
import gc
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# fetching the dataset through kagglehub; the returned value is used below as
# the root directory of the downloaded files
print("Downloading dataset...")
path = kagglehub.dataset_download("paultimothymooney/chest-xray-pneumonia")
print("Path to dataset files:", path)

# printing an indented directory tree of the download, listing at most the
# first five files of every folder so the output stays readable
print("\nDataset structure:")
for folder, _subfolders, filenames in os.walk(path):
    # nesting depth = number of path separators below the dataset root
    depth = folder.replace(path, '').count(os.sep)
    print(f"{'  ' * depth}{os.path.basename(folder)}/")

    file_indent = '  ' * (depth + 1)
    for name in filenames[:5]:
        print(f"{file_indent}{name}")

    remaining = len(filenames) - 5
    if remaining > 0:
        print(f"{file_indent}... and {remaining} more files")

# STEP 4: SETUP DATA PREPROCESSING
# defining preprocessing hyperparameters
IMG_SIZE = (224, 224)
BATCH_SIZE = 32
EPOCHS = 20
VALIDATION_SPLIT = 0.15  # fraction of the train folder held out for validation

# defining class names and mapping
CLASS_NAMES = ['NORMAL', 'PNEUMONIA']
CLASS_MAPPING = {
    'NORMAL': 0,
    'PNEUMONIA': 1
}

# label encoding shared by every generator below
CLASS_MODE = 'categorical' if NUM_CLASSES > 2 else 'binary'

# creating training generator: rescaling to [0, 1] plus light augmentation
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=15,
    zoom_range=0.1,
    brightness_range=[0.9, 1.1],
    horizontal_flip=True,
    validation_split=VALIDATION_SPLIT
)

# creating validation generator WITHOUT augmentation.
# Drawing the validation subset from train_datagen would randomly rotate,
# zoom, brighten and flip the validation images, which makes val_loss noisy
# and therefore distorts early stopping / best-weight selection.
# The same validation_split value is used so that, per the Keras
# validation_split/subset behaviour, both generators partition the train
# folder identically and the two subsets stay disjoint.
val_datagen = ImageDataGenerator(
    rescale=1./255,
    validation_split=VALIDATION_SPLIT
)

# creating test generator without augmentation
test_datagen = ImageDataGenerator(
    rescale=1./255
)

# defining dataset directory paths
train_dir = os.path.join(path, 'chest_xray', 'train')
test_dir = os.path.join(path, 'chest_xray', 'test')
val_dir = os.path.join(path, 'chest_xray', 'val')

# checking dataset folder availability
print("\nChecking directories:")
print(f"Train dir exists: {os.path.exists(train_dir)}")
print(f"Test dir exists: {os.path.exists(test_dir)}")
print(f"Val dir exists: {os.path.exists(val_dir)}")

# creating training generator (augmented, shuffled)
train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    class_mode=CLASS_MODE,
    subset='training',
    shuffle=True
)

# creating validation generator (held-out part of the train folder,
# un-augmented, fixed order)
validation_generator = val_datagen.flow_from_directory(
    train_dir,
    target_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    class_mode=CLASS_MODE,
    subset='validation',
    shuffle=False
)

# creating test generator; shuffle stays off so predictions line up with
# test_generator.classes during evaluation
test_generator = test_datagen.flow_from_directory(
    test_dir,
    target_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    class_mode=CLASS_MODE,
    shuffle=False
)

# printing sample counts and discovered classes
print(f"\nTraining samples: {train_generator.samples}")
print(f"Validation samples: {validation_generator.samples}")
print(f"Test samples: {test_generator.samples}")
print(f"Classes found: {train_generator.class_indices}")

# STEP 5: DEFINE MODEL ARCHITECTURES

# defining simple baseline CNN
def create_baseline_cnn(input_shape=(224, 224, 3), num_classes=2):
    """Build the small from-scratch CNN used as the baseline.

    The network stacks three 3x3 convolutions (32, 64 and 64 filters, with
    2x2 max pooling after the first two), then Flatten -> Dense(64) ->
    Dropout(0.5) -> output layer.

    Args:
        input_shape: input shape given to the first Conv2D layer.
        num_classes: 2 yields a single sigmoid unit; any other value yields
            ``num_classes`` softmax units.

    Returns:
        The uncompiled ``keras.Sequential`` model.
    """
    # choosing the output head from the number of classes
    if num_classes == 2:
        head_units, head_activation = 1, 'sigmoid'
    else:
        head_units, head_activation = num_classes, 'softmax'

    model = keras.Sequential()

    # convolutional feature extractor
    model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))

    # dense classifier
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(head_units, activation=head_activation))
    return model

# defining transfer learning constructor
def create_transfer_model(base_model_name, input_shape=(224, 224, 3), num_classes=2,
                          inputs_rescaled=True):
    """Build a frozen ImageNet backbone with a small trainable classification head.

    The backbone's own input preprocessing is built into the model. The data
    generators in this notebook rescale pixels to [0, 1], but the pretrained
    networks expect other ranges (per the Keras Applications documentation:
    raw [0, 255] for EfficientNet, [-1, 1] for MobileNetV2 / Xception /
    InceptionV3, BGR with ImageNet mean subtraction for ResNet50). Feeding
    [0, 1] pixels directly gives the frozen backbone inputs it was never
    trained on.

    Args:
        base_model_name: one of 'MobileNetV2', 'EfficientNetB0', 'ResNet50',
            'Xception', 'InceptionV3'.
        input_shape: shape of one input image.
        num_classes: 2 yields a single sigmoid unit; any other value yields
            ``num_classes`` softmax units.
        inputs_rescaled: True (default) when the incoming pixels are already
            scaled to [0, 1]; they are multiplied back to [0, 255] before the
            backbone-specific preprocessing. Pass False when feeding raw
            [0, 255] pixels.

    Returns:
        The uncompiled ``keras.Model``.

    Raises:
        ValueError: if ``base_model_name`` is not a supported backbone.
    """
    # backbone name -> (constructor, preprocessing module) in tf.keras.applications
    backbones = {
        'MobileNetV2': ('MobileNetV2', 'mobilenet_v2'),
        'EfficientNetB0': ('EfficientNetB0', 'efficientnet'),
        'ResNet50': ('ResNet50', 'resnet50'),
        'Xception': ('Xception', 'xception'),
        'InceptionV3': ('InceptionV3', 'inception_v3'),
    }
    if base_model_name not in backbones:
        raise ValueError(f"Unknown model: {base_model_name}")
    constructor_name, preprocess_module_name = backbones[base_model_name]

    # loading chosen pretrained backbone
    applications = tf.keras.applications
    base = getattr(applications, constructor_name)(
        input_shape=input_shape, include_top=False, weights='imagenet')
    preprocess_input = getattr(applications, preprocess_module_name).preprocess_input

    # freezing pretrained layers
    base.trainable = False

    # converting the pixels into the range this backbone was trained with
    inputs = keras.Input(shape=input_shape)
    x = inputs
    if inputs_rescaled:
        x = layers.Rescaling(255.0)(x)  # [0, 1] -> [0, 255]
    x = preprocess_input(x)

    # adding classification head; training=False keeps the frozen backbone
    # in inference mode
    x = base(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    if num_classes == 2:
        outputs = layers.Dense(1, activation='sigmoid')(x)
    else:
        outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = keras.Model(inputs, outputs)
    return model

# STEP 6: TRAINING AND EVALUATION FUNCTION

# defining routine for training and evaluating models
def train_and_evaluate(model, model_name, train_gen, val_gen, test_gen):
    """Compile, train and test one model, then save its metrics as JSON.

    Reads the module-level settings NUM_CLASSES, EPOCHS, DATASET_NAME,
    COUNTRY_INCOME_LEVEL and CLASS_NAMES.

    Args:
        model: uncompiled Keras model; it is compiled here with Adam and a
            cross-entropy loss chosen from NUM_CLASSES.
        model_name: label used in the printed output, the results dict and
            the results file name.
        train_gen: generator used for fitting.
        val_gen: generator used as validation data (drives early stopping).
        test_gen: generator used for the final evaluation; its ``classes``
            attribute is taken as ground truth, so it must not shuffle.

    Returns:
        dict with the dataset/model identity, per-class and weighted F1,
        confusion matrix, test accuracy and loss, training time in minutes,
        sample counts, parameter count and number of epochs actually run.
        The same dict is written to the results folder on Google Drive.
    """
    print(f"\n{'='*50}")
    print(f"Training {model_name}")
    print(f"{'='*50}")

    is_binary = NUM_CLASSES == 2

    # compiling model
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy' if is_binary else 'categorical_crossentropy',
        metrics=['accuracy']
    )

    # configuring early stopping: stop after 5 epochs without val_loss
    # improvement and roll back to the best weights seen
    early_stop = keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=5, restore_best_weights=True
    )

    # training model and tracking wall-clock time in minutes
    start_time = time.time()
    history = model.fit(
        train_gen,
        epochs=EPOCHS,
        validation_data=val_gen,
        callbacks=[early_stop],
        verbose=1
    )
    training_time = (time.time() - start_time) / 60

    # evaluating on test set
    print("\nEvaluating on test set...")
    test_loss, test_acc = model.evaluate(test_gen, verbose=0)

    # generating hard class predictions
    predictions = model.predict(test_gen)
    if is_binary:
        # single sigmoid output: threshold at 0.5
        y_pred = (predictions > 0.5).astype(int).flatten()
    else:
        y_pred = np.argmax(predictions, axis=1)
    y_true = test_gen.classes

    # computing F1 metrics; passing the label list explicitly keeps the
    # per-class scores aligned with CLASS_NAMES even if a class never
    # appears in the predictions
    labels = list(range(NUM_CLASSES))
    f1_per_class = f1_score(y_true, y_pred, average=None, labels=labels).tolist()
    f1_weighted = f1_score(y_true, y_pred, average='weighted')

    # computing confusion matrix (always NUM_CLASSES x NUM_CLASSES)
    cm = confusion_matrix(y_true, y_pred, labels=labels)

    # preparing results for export
    results = {
        'dataset_name': DATASET_NAME,
        'country_income': COUNTRY_INCOME_LEVEL,
        'model_name': model_name,
        'num_classes': NUM_CLASSES,
        'class_names': CLASS_NAMES,
        'f1_per_class': f1_per_class,
        'f1_weighted': float(f1_weighted),
        'confusion_matrix': cm.tolist(),
        'training_time_minutes': float(training_time),
        'num_images_train': train_gen.samples,
        'num_images_val': val_gen.samples,
        'num_images_test': test_gen.samples,
        'num_parameters': int(model.count_params()),
        'test_accuracy': float(test_acc),
        'test_loss': float(test_loss),
        'epochs_trained': len(history.history['loss'])
    }

    # printing results summary
    print(f"\n{model_name} Results:")
    print(f"F1 Scores per class: {f1_per_class}")
    print(f"Weighted F1 Score: {f1_weighted:.4f}")
    print(f"Training time: {training_time:.2f} minutes")
    print(f"Parameters: {model.count_params():,}")

    # saving results to JSON; the folder is (re)created first so a finished
    # training run cannot be lost to a missing directory
    results_dir = '/content/drive/MyDrive/xray_research_results'
    os.makedirs(results_dir, exist_ok=True)
    filename = f'{results_dir}/{DATASET_NAME}_{COUNTRY_INCOME_LEVEL}_{model_name}_results.json'
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2)
    print(f"Results saved to: {filename}")

    # clearing model from memory; `del` only drops this function's reference,
    # so the caller's variable keeps the model alive until it is rebound
    del model
    tf.keras.backend.clear_session()
    gc.collect()

    return results

# STEP 7: TRAIN ALL MODELS

# collecting one results dict per trained model
all_results = []

# the from-scratch baseline CNN goes first
model = create_baseline_cnn(num_classes=NUM_CLASSES)
results = train_and_evaluate(model, 'BaselineCNN', train_generator, validation_generator, test_generator)
all_results.append(results)

# pretrained backbones, trained one after another in this order.
# 'Xception' and 'InceptionV3' are optional extras that create_transfer_model
# also supports; they were left out because the four models here were more
# than sufficient for a project of this scale.
for backbone_name in ('MobileNetV2', 'EfficientNetB0', 'ResNet50'):
    model = create_transfer_model(backbone_name, num_classes=NUM_CLASSES)
    results = train_and_evaluate(model, backbone_name, train_generator, validation_generator, test_generator)
    all_results.append(results)

# STEP 8: SUMMARY
# printing a banner followed by the headline numbers of every trained model
banner = "=" * 60
print("\n" + banner)
print("TRAINING COMPLETE - SUMMARY")
print(banner)

for entry in all_results:
    print(
        f"\n{entry['model_name']}:",
        f"  Weighted F1: {entry['f1_weighted']:.4f}",
        f"  Training Time: {entry['training_time_minutes']:.2f} min",
        f"  Parameters: {entry['num_parameters']:,}",
        sep="\n",
    )

print("\nAll results saved to: /content/drive/MyDrive/xray_research_results/")

Dataset: dataset1_chest_xray_pneumonia
Income Level: HIC
Classes: Healthy, Pneumonia (No TB in this dataset)
Mounted at /content/drive
Downloading dataset...
Using Colab cache for faster access to the 'chest-xray-pneumonia' dataset.
Path to dataset files: /kaggle/input/chest-xray-pneumonia

Dataset structure:
chest-xray-pneumonia/
  chest_xray/
    chest_xray/
      .DS_Store
      val/
        .DS_Store
        PNEUMONIA/
          person1947_bacteria_4876.jpeg
          person1946_bacteria_4875.jpeg
          person1952_bacteria_4883.jpeg
          person1954_bacteria_4886.jpeg
          person1951_bacteria_4882.jpeg
          ... and 4 more files
        NORMAL/
          NORMAL2-IM-1431-0001.jpeg
          NORMAL2-IM-1440-0001.jpeg
          NORMAL2-IM-1442-0001.jpeg
          NORMAL2-IM-1427-0001.jpeg
          NORMAL2-IM-1430-0001.jpeg
          ... and 4 more files
      test/
        .DS_Store
        PNEUMONIA/
          person1676_virus_2892.jpeg
          person1650_virus_28