<h1>What This Notebook Does</h1>

<p>
This notebook performs <strong>end-to-end training and testing of a DenseNet-121 deep learning model for Chest X-Ray Pneumonia Classification</strong>.
</p>

<p>The workflow includes:</p>

<ul>
  <li>Custom train/validation split from the dataset</li>
  <li>Extensive data augmentation for better generalization</li>
  <li>Transfer learning with fine-tuning of the final DenseNet block</li>
  <li>MixUp regularization to reduce overfitting</li>
  <li>One-Cycle Learning Rate Policy for efficient convergence</li>
  <li>Early stopping based on validation AUC</li>
</ul>

<p>
During training, the <strong>best model is saved automatically based on highest validation AUC</strong>.
</p>

<p>
At the end of the notebook, a complete evaluation on the test set is performed, including:
</p>

<ul>
  <li>AUC</li>
  <li>F1-score</li>
  <li>Classification report</li>
  <li>Confusion matrix using the optimal threshold derived from validation performance</li>
</ul>


# Importing Necessary Libraries + Device Setup





In [None]:
import os, shutil, numpy as np, torch, torch.nn as nn, torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from sklearn.metrics import roc_auc_score, f1_score

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)


# Google Drive + Unzip Dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

!unzip "/content/drive/MyDrive/X_ray_images/trust.zip" -d "/content/"


# Directory Setup + Train/Validation Split

---
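Since the original dataset had only about 48 images in its testing and validation set folders, I added more images to the new_test and val sections for better interpretation. In the cell below, the new validation set is created by holding out 15% of each class from the original training folder.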


In [None]:
# Folders shipped inside the dataset archive.
orig_train = "/content/chest_xray/train"
test_dir = "/content/chest_xray/test"

# Destination folders for the custom train/validation split.
new_train = "/content/chest_xray/new_train"
new_val = "/content/chest_xray/new_val"

# makedirs creates the split folder itself along with each class sub-folder.
for d in [new_train, new_val]:
    os.makedirs(os.path.join(d, "NORMAL"), exist_ok=True)
    os.makedirs(os.path.join(d, "PNEUMONIA"), exist_ok=True)


def list_class_files(class_dir):
    """Return the full paths of the regular files in ``class_dir``, sorted by name.

    ``os.listdir`` returns entries in arbitrary order, so sorting is required
    for the seeded ``train_test_split`` below to produce the same split on
    every run and machine. Sub-directories are skipped because ``shutil.copy``
    cannot copy them.
    """
    return [
        os.path.join(class_dir, name)
        for name in sorted(os.listdir(class_dir))
        if os.path.isfile(os.path.join(class_dir, name))
    ]


normal_imgs = list_class_files(os.path.join(orig_train, "NORMAL"))
pneu_imgs = list_class_files(os.path.join(orig_train, "PNEUMONIA"))

# Split each class separately so both splits keep the original class ratio;
# 15% of each class is held out for validation.
train_norm, val_norm = train_test_split(normal_imgs, test_size=0.15, random_state=42)
train_pneu, val_pneu = train_test_split(pneu_imgs, test_size=0.15, random_state=42)


def copy_list(files, dest):
    """Copy every path in ``files`` into the directory ``dest``."""
    for f in files:
        shutil.copy(f, dest)


for files, split_dir, class_name in [
    (train_norm, new_train, "NORMAL"),
    (val_norm, new_val, "NORMAL"),
    (train_pneu, new_train, "PNEUMONIA"),
    (val_pneu, new_val, "PNEUMONIA"),
]:
    copy_list(files, os.path.join(split_dir, class_name))

print("New train/val split complete.")


# Image Transforms

In [None]:
IMG_SIZE = 224


def _shape_steps():
    """Fresh steps that replicate grayscale to 3 channels and resize to IMG_SIZE x IMG_SIZE."""
    return [
        transforms.Grayscale(num_output_channels=3),
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
    ]


def _tensor_steps():
    """Fresh steps that convert to a tensor and normalize with fixed per-channel mean/std."""
    return [
        transforms.ToTensor(),
        transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
    ]


# Random augmentations, applied to training images only.
_augment_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.RandomAffine(10, translate=(0.02, 0.02)),
]

# Training pipeline: shape -> augment -> tensor/normalize.
train_tfms = transforms.Compose(_shape_steps() + _augment_steps + _tensor_steps())

# Validation/test pipeline: same preprocessing, no randomness.
test_tfms = transforms.Compose(_shape_steps() + _tensor_steps())


# Datasets + Dataloaders

In [None]:
BATCH_SIZE = 32


def _make_loader(dataset, shuffle):
    """Wrap ``dataset`` in a single-process DataLoader with the notebook's batch size."""
    return DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=shuffle, num_workers=0)


# One ImageFolder per split; only the training split gets the augmenting transforms.
train_ds, val_ds, test_ds = (
    datasets.ImageFolder(root, transform=tfms)
    for root, tfms in (
        (new_train, train_tfms),
        (new_val, test_tfms),
        (test_dir, test_tfms),
    )
)

# Only the training loader shuffles; validation/test keep a fixed order.
train_loader = _make_loader(train_ds, shuffle=True)
val_loader = _make_loader(val_ds, shuffle=False)
test_loader = _make_loader(test_ds, shuffle=False)


# Model Setup - DenseNet-121

In [None]:
# Pretrained DenseNet-121 with its classifier replaced by a single-logit head.
model = models.densenet121(weights="IMAGENET1K_V1")
numf = model.classifier.in_features
model.classifier = nn.Linear(numf, 1)
model = model.to(device)

# Only parameters whose name contains one of these keys are fine-tuned;
# every other parameter is frozen.
TRAINABLE_KEYS = ("denseblock4", "norm5", "classifier")
for name, param in model.named_parameters():
    param.requires_grad = any(key in name for key in TRAINABLE_KEYS)

# Binary cross-entropy on raw logits (the sigmoid is applied inside the loss).
criterion = nn.BCEWithLogitsLoss()

# Hand the optimizer only the parameters left trainable above.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.Adam(trainable_params, lr=1e-4, weight_decay=1e-4)

# Placeholder; the training cell assigns the actual scheduler.
scheduler = None


# Mixup + Training Functions

In [None]:
def mixup_data(x, y, alpha=0.2):
    """Blend each sample in a batch with a randomly chosen partner (MixUp).

    Args:
        x: Batch of inputs; the first dimension is the batch dimension.
        y: Batch of targets, indexable along its first dimension and located
            on the same device as ``x``.
        alpha: Parameter of the symmetric Beta(alpha, alpha) distribution the
            mixing weight is drawn from. ``alpha <= 0`` disables mixing.

    Returns:
        ``(mixed_x, y_a, y_b, lam)`` where ``mixed_x = lam * x + (1 - lam) * x[idx]``,
        ``y_a`` is ``y`` unchanged, ``y_b`` is ``y[idx]`` for the same
        permutation ``idx``, and ``lam`` is the mixing weight as a float.
    """
    if alpha > 0:
        lam = float(np.random.beta(alpha, alpha))
    else:
        # np.random.beta rejects non-positive parameters; lam = 1 leaves the batch unmixed.
        lam = 1.0

    # Build the permutation on the batch's own device rather than a global one,
    # so the function works for CPU and GPU tensors alike.
    idx = torch.randperm(x.size(0), device=x.device)
    mixed_x = lam * x + (1 - lam) * x[idx]
    return mixed_x, y, y[idx], lam

def train_epoch():
    """Run one MixUp training pass over ``train_loader`` and return the mean batch loss.

    Uses the notebook-level ``model``, ``train_loader``, ``criterion``,
    ``optimizer``, ``scheduler`` and ``device``. The scheduler, when set, is
    stepped once per batch.

    Returns:
        Sum of the per-batch losses divided by the number of batches.
    """
    # NOTE(review): train() also puts BatchNorm layers of the frozen blocks in
    # training mode, so their running statistics still update even though the
    # weights have requires_grad=False.
    model.train()
    total_loss = 0
    for imgs, labels in tqdm(train_loader):
        imgs = imgs.to(device)
        # BCEWithLogitsLoss needs float targets shaped like the (batch, 1) logits.
        labels = labels.float().unsqueeze(1).to(device)

        xmix, ya, yb, lam = mixup_data(imgs, labels)

        optimizer.zero_grad()
        out = model(xmix)

        # MixUp loss: weight the loss against each partner's label by its mixing share.
        loss = lam * criterion(out, ya) + (1-lam) * criterion(out, yb)
        loss.backward()

        optimizer.step()
        # `scheduler` starts out as None until the training cell assigns it;
        # without one, training simply continues at the optimizer's own rate.
        if scheduler is not None:
            scheduler.step()

        total_loss += loss.item()

    return total_loss / len(train_loader)

def evaluate(loader):
    """Score the notebook-level ``model`` on ``loader``.

    Args:
        loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        ``(auc, f1)``: ROC AUC of the sigmoid probabilities, and the F1 score
        of those probabilities thresholded at 0.5.
    """
    model.eval()
    scores = []
    targets = []

    with torch.no_grad():
        for batch_imgs, batch_labels in loader:
            logits = model(batch_imgs.to(device))
            scores.extend(torch.sigmoid(logits).cpu().numpy())
            targets.extend(batch_labels.numpy())

    # Each extended element is a length-1 row; flatten to one probability per sample.
    scores = np.array(scores).flatten()
    targets = np.array(targets)

    hard_preds = (scores > 0.5).astype(int)
    return roc_auc_score(targets, scores), f1_score(targets, hard_preds)


# Training Loop & Saving Best Model

In [None]:
save_dir = "/content/saved_models"
os.makedirs(save_dir, exist_ok=True)

best_auc = 0
# Stop after `patience` consecutive epochs without a new best validation AUC;
# `es` counts the epochs since the last improvement.
patience, es = 5, 0
EPOCHS = 25

# train_epoch() steps the scheduler once per batch, so the one-cycle schedule
# is sized as steps_per_epoch * epochs.
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=1e-4,
    steps_per_epoch=len(train_loader),
    epochs=EPOCHS
)

best_path = f"{save_dir}/best_model.pth"

print("\nStarting training...\n")
for epoch in range(1, EPOCHS+1):
    print(f"\nEpoch {epoch}/{EPOCHS}")

    train_loss = train_epoch()
    val_auc, val_f1 = evaluate(val_loader)

    print(f"Train Loss: {train_loss:.4f}")
    print(f"Val AUC: {val_auc:.4f} | Val F1: {val_f1:.4f}")

    if val_auc > best_auc:
        # New best validation AUC: checkpoint the weights and reset the counter.
        best_auc = val_auc
        torch.save(model.state_dict(), best_path)
        print(f"Saved BEST model → {best_path}")
        es = 0
    else:
        es += 1
        print(f"No improvement ({es}/{patience})")

    if es >= patience:
        print("Early stopping.")
        break


# Save Final Model

In [None]:
# Persist the weights as they stand after the last executed epoch. These may
# differ from the best-AUC checkpoint already written to `best_path`.
final_path = os.path.join(save_dir, "final_model.pth")
torch.save(model.state_dict(), final_path)

print("\nTraining complete.")
for label, path in (("Best model:", best_path), ("Final model:", final_path)):
    print(label, path)


# Test Set Evaluation

In [None]:

# BLOCK: TEST SET EVALUATION

import matplotlib.pyplot as plt
def get_preds(model, loader):
    """Collect sigmoid probabilities and ground-truth labels for every batch in ``loader``.

    Args:
        model: Module producing one logit per sample; it is switched to eval mode.
        loader: Iterable yielding ``(images, labels)`` batches of tensors.

    Returns:
        ``(probs, trues)``: a flat NumPy array of predicted probabilities and a
        NumPy array of the corresponding labels, in loader order.
    """
    model.eval()

    # Run inputs on the device that holds the model's weights rather than
    # relying on a notebook-level `device` variable. A parameter-free model
    # leaves the inputs where they are.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    probs = []
    trues = []

    with torch.no_grad():
        for imgs, labels in loader:
            if target_device is not None:
                imgs = imgs.to(target_device)
            outputs = model(imgs)
            p = torch.sigmoid(outputs).cpu().numpy()

            probs.extend(p)
            # .cpu() is a no-op for CPU labels and keeps .numpy() valid otherwise.
            trues.extend(labels.cpu().numpy())

    return np.array(probs).flatten(), np.array(trues)


print("\n Running TEST evaluation...")

# After training, `model` holds the weights of the last executed epoch, which
# early stopping leaves several epochs past the best one. Restore the
# checkpoint with the highest validation AUC so that is the model being tested.
# Later cells that reuse `model` will see these weights too.
if os.path.exists(best_path):
    model.load_state_dict(torch.load(best_path, map_location=device))
else:
    print(f"WARNING: {best_path} not found; evaluating the model's current weights.")

test_probs, test_labels = get_preds(model, test_loader)

test_auc = roc_auc_score(test_labels, test_probs)

# F1 at the default 0.5 cut-off.
bin_preds = (test_probs > 0.5).astype(int)
test_f1 = f1_score(test_labels, bin_preds)

print("TEST SET RESULTS")
print(f"Test ROC AUC: {test_auc:.4f}")
print(f"Test F1 Score (0.5 threshold): {test_f1:.4f}")


# Finding the best threshold on the validation set by running a threshold sweep

In [None]:

print("\n Running threshold sweep on VALIDATION SET...")

val_probs, val_labels = get_preds(model, val_loader)

# 400 evenly spaced candidate cut-offs covering [0, 1].
thresholds = np.linspace(0, 1, 400)

# Validation F1 at every candidate cut-off.
f1_per_threshold = [
    f1_score(val_labels, (val_probs > th).astype(int))
    for th in thresholds
]

# Keep the first cut-off that reaches the highest F1 (strict '>' means ties
# resolve to the lowest threshold).
best_f1, best_thresh = 0, 0
for th, candidate_f1 in zip(thresholds, f1_per_threshold):
    if candidate_f1 > best_f1:
        best_f1, best_thresh = candidate_f1, th

print("\n Best Threshold =", round(best_thresh, 4))
print(" Best F1 on VAL =", round(best_f1, 4))


# Evaluation Metrics

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
# Binarize the test probabilities with the cut-off chosen on the validation set.
test_preds_best = (test_probs > best_thresh).astype(int)
class_names = ["NORMAL", "PNEUMONIA"]

print("\nCONFUSION MATRIX (TEST SET)")
cm = confusion_matrix(test_labels, test_preds_best)
print(cm)

print("\n CLASSIFICATION REPORT")
report = classification_report(test_labels, test_preds_best, target_names=class_names)
print(report)

# Heat-map of the confusion matrix: rows are true classes, columns are predictions.
fig, ax = plt.subplots(figsize=(6, 5))
heatmap = ax.imshow(cm, cmap="Blues")
ax.set_title("Confusion Matrix (Test Set)")
fig.colorbar(heatmap, ax=ax)
ax.set_xticks([0, 1])
ax.set_xticklabels(class_names)
ax.set_yticks([0, 1])
ax.set_yticklabels(class_names)
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
plt.show()
