# Pneumonia Detection on Chest X-Rays with CNN (PyTorch)

_Complete PyTorch pipeline with training loop, accuracy/loss curves, confusion matrix, ROC, misclassified cases, and best model (.pth)._

**Task:** Binary classification (Normal vs Pneumonia)  
**Framework:** PyTorch  
**Dataset:** Chest X-Ray Images (Pneumonia) (Kermany et al., via Kaggle)


In [None]:
# Imports & Reproducibility

# General-purpose
import os
import random
import numpy as np
import pandas as pd
from tqdm import tqdm

# Visualization
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns

# Deep learning (PyTorch)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Evaluation (scikit-learn)
from sklearn.metrics import (
    classification_report, confusion_matrix, ConfusionMatrixDisplay,
    roc_curve, roc_auc_score
)

# Reproducibility
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# deterministic cuDNN for reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device:", device)

In [None]:
# Download and unzip Chest X-Ray Pneumonia dataset

# Install Kaggle API
# !pip install -q kaggle

# If running locally or in Colab, upload your kaggle.json here:
# from google.colab import files
# files.upload()  # Upload kaggle.json manually

# Or manually place kaggle.json in ~/.kaggle/
os.makedirs('/root/.kaggle', exist_ok=True)
# os.rename('kaggle.json', '/root/.kaggle/kaggle.json')  # Uncomment if using upload method
os.chmod('/root/.kaggle/kaggle.json', 600)

# Download and unzip the dataset
# !kaggle datasets download -d paultimothymooney/chest-xray-pneumonia
# !unzip -q chest-xray-pneumonia.zip -d ./pneumonia_data

# NOTE: For manual setup (e.g., GitHub), place the dataset manually in:
# ./pneumonia_data/chest_xray
# Dataset link: https://www.kaggle.com/paultimothymooney/chest-xray-pneumonia

In [None]:
# Visualize a sample image from each class

train_dir = '/content/pneumonia_data/chest_xray/train'
classes = ['NORMAL', 'PNEUMONIA']

plt.figure(figsize=(10, 5))
for i, label in enumerate(classes):
    class_dir = os.path.join(train_dir, label)
    image_file = random.choice(os.listdir(class_dir))
    image_path = os.path.join(class_dir, image_file)

    img = mpimg.imread(image_path)
    plt.subplot(1, 2, i+1)
    plt.imshow(img, cmap='gray')
    plt.title(label)
    plt.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# Data Preprocessing with augmentation + internal validation split

# Image parameters
img_size = (150, 150)
batch_size = 16

# Data augmentation transforms
train_transforms = transforms.Compose([
    transforms.Resize(img_size),
    transforms.Grayscale(num_output_channels=1),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=1, fill=0),
    transforms.RandomAffine(
        degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1), shear=10, fill=0
    ),
    transforms.ColorJitter(brightness=(0.99, 1.01)),
    transforms.ToTensor(),  # scales to [0,1]
])

val_test_transforms = transforms.Compose([
    transforms.Resize(img_size),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor()
])

# Load full training folder with augmentations
full_train = datasets.ImageFolder(root=os.path.join('/content/pneumonia_data', 'chest_xray', 'train'),
                                  transform=train_transforms)

# Split into train/val (80/20) with fixed seed
dataset_size = len(full_train)
train_size = int(0.8 * dataset_size)
val_size = dataset_size - train_size
generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = torch.utils.data.random_split(
    full_train, [train_size, val_size], generator=generator
)

# Apply validation transforms to the underlying dataset for val split
val_dataset.dataset.transform = val_test_transforms  # ensure no augmentation in val

# Test dataset
test_dataset = datasets.ImageFolder(root=os.path.join('/content/pneumonia_data', 'chest_xray', 'test'),
                                    transform=val_test_transforms)

# DataLoaders
num_workers = 2
pin_memory = torch.cuda.is_available()

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                          drop_last=False, num_workers=num_workers, pin_memory=pin_memory)
val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False,
                          drop_last=False, num_workers=num_workers, pin_memory=pin_memory)
test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False,
                          drop_last=False, num_workers=num_workers, pin_memory=pin_memory)

print("Class to idx mapping:", full_train.class_to_idx)  # expect {'NORMAL':0, 'PNEUMONIA':1}
print("Train samples:", len(train_dataset))
print("Validation samples:", len(val_dataset))
print("Test samples:", len(test_dataset))

In [None]:
# Define CNN model (BCEWithLogitsLoss-ready)
class PneumoniaCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.BatchNorm2d(128), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(128, 256, 3, padding=1), nn.BatchNorm2d(256), nn.ReLU(), nn.MaxPool2d(2),
        )
        self.classifier = nn.Sequential(
            nn.Dropout(0.8),
            nn.Flatten(),
            nn.Linear(256*9*9, 128), nn.ReLU(),
            nn.Dropout(0.8),
            nn.Linear(128, 1) # logits
        )
    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

model = PneumoniaCNN()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
# print(model)

In [None]:
# Loss, Optimizer, Scheduler setup
learning_rate = 0.00025
# Use BCEWithLogitsLoss (more stable than BCE + Sigmoid inside model)
pos_weight = torch.tensor([0.77], device=device) # Adjusted to 0.77 to fine-tune balance
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.005)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.4, patience=2, min_lr=1e-6)
print('Optimizer/Loss ready.')

In [None]:
# Training loop with early stopping & checkpoint

checkpoint_path = 'best_model_pytorch.pth'
patience = 4
best_val_acc = 0.0
epochs_no_improve = 0
num_epochs = 20
history = {'accuracy': [], 'val_accuracy': [], 'loss': [], 'val_loss': []}

for epoch in range(1, num_epochs+1):
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device).float().unsqueeze(1)
        optimizer.zero_grad()
        logits = model(inputs)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        preds = (torch.sigmoid(logits) >= 0.5).float()
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    train_loss = running_loss / len(train_loader)
    train_acc = 100.0 * correct / total

    model.eval()
    vloss, vcorrect, vtotal = 0.0, 0, 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device).float().unsqueeze(1)
            logits = model(inputs)
            loss = criterion(logits, labels)
            vloss += loss.item()
            preds = (torch.sigmoid(logits) >= 0.5).float()
            vcorrect += (preds == labels).sum().item()
            vtotal += labels.size(0)

    val_loss = vloss / len(val_loader)
    val_acc = 100.0 * vcorrect / vtotal

    history['accuracy'].append(train_acc)
    history['val_accuracy'].append(val_acc)
    history['loss'].append(train_loss)
    history['val_loss'].append(val_loss)

    scheduler.step(val_loss)
    current_lr = optimizer.param_groups[0]['lr']
    print(f"Learning rate now: {current_lr:.6f}")

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        epochs_no_improve = 0
        torch.save({'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'val_acc': val_acc}, checkpoint_path)
        print(f"Epoch {epoch}: val_acc improved to {val_acc:.2f}%. Saved {checkpoint_path}.")
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print(f"Early stopping at epoch {epoch} (best val_acc={best_val_acc:.2f}%).")
            break

    print(f"Epoch [{epoch}/{num_epochs}] "
          f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% "
          f"| Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%")

print(f"Training finished. Best Val Acc: {best_val_acc:.2f}%")

# files.download(checkpoint_path)


In [None]:
# Plot training and validation accuracy and loss

acc = history['accuracy']
val_acc = history['val_accuracy']
loss = history['loss']
val_loss = history['val_loss']

epochs_range = range(len(acc))

plt.figure(figsize=(14, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Train Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.title('Training vs Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Train Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.title('Training vs Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='upper right')

plt.tight_layout()
plt.show()

In [None]:
# Test evaluation

model.eval()

y_true, y_pred, y_prob = [], [], []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        logits = model(inputs)
        probs = torch.sigmoid(logits)
        preds = (probs >= 0.5).float()
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(preds.cpu().numpy().reshape(-1))
        y_prob.extend(probs.cpu().numpy().reshape(-1))

y_true = np.array(y_true)
y_pred = np.array(y_pred)
y_prob = np.array(y_prob)

print('Classification Report:')
print(classification_report(y_true, y_pred, target_names=['NORMAL', 'PNEUMONIA']))

cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['NORMAL', 'PNEUMONIA'])
disp.plot(cmap='Blues')
plt.title('Confusion Matrix'); plt.show()

In [None]:
# ROC & AUC

fpr, tpr, thresholds = roc_curve(y_true, y_prob)
roc_auc = roc_auc_score(y_true, y_prob)

plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0,1],[0,1], lw=2, linestyle='--')
plt.xlim([0.0, 1.0]); plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate'); plt.ylabel('True Positive Rate')
plt.title('ROC Curve'); plt.legend(loc='lower right'); plt.grid(True)
plt.show()
print('AUC =', roc_auc)

In [None]:
# Visualize misclassified test images

mis_idx = np.where(y_true != y_pred)[0]

plt.figure(figsize=(12, 6))
max_show = min(6, len(mis_idx))
for i, idx in enumerate(mis_idx[:max_show]):
    img_path = test_dataset.samples[idx][0]
    img = mpimg.imread(img_path)

    plt.subplot(2, 3, i+1)
    plt.imshow(img, cmap='gray')
    true_label = 'NORMAL' if y_true[idx] == 0 else 'PNEUMONIA'
    pred_label = 'NORMAL' if y_pred[idx] == 0 else 'PNEUMONIA'
    prob = y_prob[idx]
    plt.title(f'True: {true_label}\nPred: {pred_label} (p={prob:.2f})', color='red')
    plt.axis('off')

plt.tight_layout()
plt.show()