In [None]:
# Install into the running kernel's environment (%pip, not a bare shell pip) and
# pin the version this notebook was last executed with so re-runs are reproducible.
%pip install -q pytorch_lightning==2.5.1.post0

Collecting pytorch_lightning
  Downloading pytorch_lightning-2.5.1.post0-py3-none-any.whl.metadata (20 kB)
Collecting torchmetrics>=0.7.0 (from pytorch_lightning)
  Downloading torchmetrics-1.7.1-py3-none-any.whl.metadata (21 kB)
Collecting lightning-utilities>=0.10.0 (from pytorch_lightning)
  Downloading lightning_utilities-0.14.3-py3-none-any.whl.metadata (5.6 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=2.1.0->pytorch_lightning)
  Dow

In [None]:
# torchmetrics is also pulled in by pytorch_lightning; pinning it here keeps the
# metric API used below stable across re-runs.
%pip install -q torchmetrics==1.7.1



In [None]:
"""
Pneumonia Classification with ResNet18
=====================================
This script builds and trains a ResNet18-based model to classify chest X-rays for pneumonia detection.
"""

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
import torchmetrics
from PIL import Image
from tqdm.auto import tqdm
from torchvision.datasets import ImageFolder

# Report the library versions this run is using
for library_label, library in (("PyTorch", torch), ("Torchvision", torchvision)):
    print(f"{library_label} version:", library.__version__)

# Seed the torch and NumPy global RNGs for reproducibility
SEED = 42
for seed_rng in (torch.manual_seed, np.random.seed):
    seed_rng(SEED)

# Prefer the first CUDA device when one is available, otherwise use the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Root folder that will hold the train/val image tree
data_dir = './data/chest_xray'
os.makedirs(data_dir, exist_ok=True)

# For this example, we'll use a synthetic dataset with random noise
# In a real scenario, you would use real X-ray images
def _synthetic_normal_image():
    """Return one 224x224 uint8 'normal' image: Gaussian noise around mid-gray."""
    img = np.random.normal(0.5, 0.1, (224, 224))
    return (np.clip(img, 0, 1) * 255).astype(np.uint8)


def _synthetic_pneumonia_image():
    """Return one 224x224 uint8 'pneumonia' image: darker noise plus 3-7 bright discs."""
    img = np.random.normal(0.4, 0.1, (224, 224))

    # Open grids of row/column indices; broadcasting them yields a full-image mask
    rows, cols = np.ogrid[:224, :224]

    num_patches = np.random.randint(3, 8)
    for _ in range(num_patches):
        # Draw order (x, y, size, intensity) is kept so seeded runs give the same images
        x = np.random.randint(20, 200)
        y = np.random.randint(20, 200)
        size = np.random.randint(20, 60)
        intensity = np.random.uniform(0.7, 0.9)  # Brighter patch

        # Every pixel strictly inside the circle of radius `size` centred on (x, y)
        img[(cols - x) ** 2 + (rows - y) ** 2 < size ** 2] = intensity

    return (np.clip(img, 0, 1) * 255).astype(np.uint8)


def _write_images(directory, prefix, count, make_image):
    """Save `count` images produced by `make_image()` as `<prefix>_<i>.png` in `directory`."""
    for i in range(count):
        Image.fromarray(make_image()).save(os.path.join(directory, f'{prefix}_{i}.png'))


def create_synthetic_dataset(data_dir, num_normal=100, num_pneumonia=100):
    """Create a synthetic dataset of 'chest X-rays' for demonstration.

    Writes grayscale PNGs into ``<data_dir>/{train,val}/{NORMAL,PNEUMONIA}``.
    Each class is split 80% train / 20% val; the validation share is the
    remainder after the training share, so exactly ``num_normal`` and
    ``num_pneumonia`` images are written for any count.

    Args:
        data_dir: Root directory of the image tree (created if missing).
        num_normal: Total number of 'normal' images to generate.
        num_pneumonia: Total number of 'pneumonia' images to generate.

    Returns:
        Tuple ``(mean_val, std_val)``: fixed approximate pixel statistics
        (0.5, 0.2) intended for normalisation; they are not measured from the
        generated images.
    """
    n_train_normal = int(num_normal * 0.8)
    n_train_pneumonia = int(num_pneumonia * 0.8)

    # (split, class folder, file prefix, image count, generator).
    # The order matches the original generation order so that a seeded NumPy
    # RNG produces the same images as before.
    plan = [
        ('train', 'NORMAL', 'normal', n_train_normal, _synthetic_normal_image),
        ('val', 'NORMAL', 'normal', num_normal - n_train_normal, _synthetic_normal_image),
        ('train', 'PNEUMONIA', 'pneumonia', n_train_pneumonia, _synthetic_pneumonia_image),
        ('val', 'PNEUMONIA', 'pneumonia', num_pneumonia - n_train_pneumonia, _synthetic_pneumonia_image),
    ]

    # Create the whole directory tree before writing any image
    for split, class_dir, _, _, _ in plan:
        os.makedirs(os.path.join(data_dir, split, class_dir), exist_ok=True)

    for split, class_dir, prefix, count, make_image in plan:
        _write_images(os.path.join(data_dir, split, class_dir), prefix, count, make_image)

    print(f"Created synthetic dataset with {num_normal} normal and {num_pneumonia} pneumonia X-rays")

    # Dataset statistics (hard-coded approximations for this synthetic data)
    mean_val = 0.5
    std_val = 0.2

    return mean_val, std_val

# Generate the image tree on disk and pick up the normalisation constants
print("Creating synthetic chest X-ray dataset...")
mean_val, std_val = create_synthetic_dataset(data_dir)

# Geometric augmentation applied to training images only
augmentation_steps = [
    transforms.RandomAffine(degrees=(-5, 5), translate=(0, 0.05), scale=(0.9, 1.1)),
    transforms.RandomResizedCrop((224, 224), scale=(0.8, 1.0)),
]

# Training pipeline: single channel -> tensor -> normalise -> augment
train_transforms = transforms.Compose(
    [
        transforms.Grayscale(),
        transforms.ToTensor(),
        transforms.Normalize(mean_val, std_val),
    ]
    + augmentation_steps
)

# Validation pipeline: same preprocessing, no augmentation
val_transforms = transforms.Compose(
    [
        transforms.Grayscale(),
        transforms.ToTensor(),
        transforms.Normalize(mean_val, std_val),
    ]
)

# Folder-per-class datasets for each split
train_dataset = ImageFolder(os.path.join(data_dir, 'train'), transform=train_transforms)
val_dataset = ImageFolder(os.path.join(data_dir, 'val'), transform=val_transforms)

# Report how many samples each split holds
for split_label, split_dataset in (("Training", train_dataset), ("Validation", val_dataset)):
    print(f"{split_label} samples: {len(split_dataset)}")

# Per-class sample counts (index = class id assigned by ImageFolder)
train_counts = np.bincount(train_dataset.targets)
val_counts = np.bincount(val_dataset.targets)
for split_label, split_counts in (("Training", train_counts), ("Validation", val_counts)):
    print(f"{split_label} class distribution: {split_counts}")

# Batch the datasets; only the training split is shuffled
batch_size = 32
shared_loader_options = dict(batch_size=batch_size, num_workers=2)

train_loader = DataLoader(train_dataset, shuffle=True, **shared_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **shared_loader_options)

# Define the PyTorch Lightning model
class XrayClassifier(pl.LightningModule):
    """Binary image classifier built on an ImageNet-pretrained ResNet18.

    The network takes single-channel images and produces one logit per sample;
    targets equal to 1 are the positive class of the BCE-with-logits loss.

    Args:
        class_weight: Weight of the positive class in the loss (``pos_weight``).
            Any real number is accepted, including NumPy scalars.
    """

    def __init__(self, class_weight=1.0):
        super().__init__()

        # Load the pre-trained ResNet18
        self.model = torchvision.models.resnet18(weights='IMAGENET1K_V1')

        # Replace the first layer so it accepts grayscale images. The new layer is
        # freshly initialised; the pretrained filters of this layer are discarded.
        self.model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

        # Replace the final layer for binary classification (single logit)
        self.model.fc = nn.Linear(512, 1)

        # Build pos_weight as float32 explicitly: a NumPy float64 scalar would
        # otherwise create a float64 buffer that does not match the float32 logits.
        self.criterion = nn.BCEWithLogitsLoss(
            pos_weight=torch.tensor([float(class_weight)], dtype=torch.float32)
        )
        self.train_accuracy = torchmetrics.Accuracy(task="binary")
        self.val_accuracy = torchmetrics.Accuracy(task="binary")

        # F1 score and AUROC on the validation set
        self.val_f1 = torchmetrics.F1Score(task="binary")
        self.val_auroc = torchmetrics.AUROC(task="binary")

    def forward(self, x):
        """Return the raw logits of the wrapped ResNet for the batch ``x``."""
        return self.model(x)

    def _shared_step(self, batch):
        """Run the model on ``batch`` and return ``(loss, probabilities, int targets)``."""
        x, y = batch
        y = y.float()
        logits = self(x).squeeze(1)
        loss = self.criterion(logits, y)
        return loss, torch.sigmoid(logits), y.int()

    def training_step(self, batch, batch_idx):
        """Compute the training loss and log the loss and batch accuracy."""
        loss, probs, targets = self._shared_step(batch)

        self.log("train_loss", loss)
        # Calling the metric returns the batch value and also accumulates epoch state
        self.log("train_acc_step", self.train_accuracy(probs, targets))

        return loss

    def on_train_epoch_end(self):
        """Log the accuracy accumulated over the epoch, then clear the metric state."""
        self.log("train_acc_epoch", self.train_accuracy.compute())
        # compute() is logged as a plain value, so the metric must be reset by hand;
        # otherwise the next epoch's value also includes this epoch's samples.
        self.train_accuracy.reset()

    def validation_step(self, batch, batch_idx):
        """Compute the validation loss and log loss, accuracy, F1 and AUROC for the batch."""
        loss, probs, targets = self._shared_step(batch)

        self.log("val_loss", loss)
        self.log("val_acc_step", self.val_accuracy(probs, targets))
        self.log("val_f1", self.val_f1(probs, targets))
        self.log("val_auroc", self.val_auroc(probs, targets))

        return loss

    def on_validation_epoch_end(self):
        """Log the validation metrics accumulated over this pass, then reset them."""
        self.log("val_acc_epoch", self.val_accuracy.compute())
        self.log("val_f1_epoch", self.val_f1.compute())
        self.log("val_auroc_epoch", self.val_auroc.compute())
        # Without a reset the values would mix in the sanity-check batches and
        # every earlier epoch, and the checkpoint monitor would see a running average.
        for metric in (self.val_accuracy, self.val_f1, self.val_auroc):
            metric.reset()

    def configure_optimizers(self):
        """Return Adam plus a plateau scheduler that halves the LR when ``val_acc_epoch`` stalls."""
        optimizer = optim.Adam(self.parameters(), lr=1e-4)
        # `verbose` is omitted: the argument is deprecated in recent PyTorch releases.
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='max',
            factor=0.5,
            patience=2
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "monitor": "val_acc_epoch",
                "interval": "epoch"
            }
        }

# Create the model - compute class weight based on dataset imbalance.
# Cast to a plain Python float (np.bincount yields NumPy scalars) and fall back
# to 1.0 when there is no positive sample to divide by.
if len(train_counts) > 1 and train_counts[1] > 0:
    weight = float(train_counts[0] / train_counts[1])
else:
    weight = 1.0
model = XrayClassifier(class_weight=weight)
print(f"Using class weight: {weight}")

# Keep the three checkpoints with the highest epoch-level validation accuracy
checkpoint_callback = ModelCheckpoint(
    monitor='val_acc_epoch',
    save_top_k=3,
    mode='max',
    filename='pneumonia-{epoch:02d}-{val_acc_epoch:.3f}'
)

# Create trainer
trainer = pl.Trainer(
    max_epochs=10,
    callbacks=[checkpoint_callback],
    # Never ask for a logging interval longer than one epoch, otherwise no
    # step-level training logs are written for small datasets.
    log_every_n_steps=max(1, min(10, len(train_loader))),
    accelerator='auto',  # Use GPU if available
)

# Train the model
print("Starting training...")
trainer.fit(model, train_loader, val_loader)

# Print best model path
print(f"Best model path: {checkpoint_callback.best_model_path}")
print(f"Best validation accuracy: {checkpoint_callback.best_model_score:.4f}")

# Restore the best checkpoint and switch it to inference mode on the chosen device
best_model = XrayClassifier.load_from_checkpoint(checkpoint_callback.best_model_path)
best_model.eval()
best_model.to(device)

# Run the validation set through the model, keeping (probabilities, labels) per batch
batch_outputs = []
with torch.no_grad():
    for images, targets in tqdm(val_loader, desc="Evaluating"):
        batch_logits = best_model(images.to(device)).squeeze(1)
        batch_outputs.append((torch.sigmoid(batch_logits).cpu().numpy(), targets.numpy()))

# Flatten the per-batch results into one array of probabilities and one of labels
val_preds = np.array([p for batch_probs, _ in batch_outputs for p in batch_probs])
val_labels = np.array([t for _, batch_targets in batch_outputs for t in batch_targets])

# Convert probabilities to binary predictions (threshold 0.5)
val_pred_binary = (val_preds > 0.5).astype(int)

# Calculate metrics
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc

classes = ['Normal', 'Pneumonia']
# Fix the label set so the matrix is always 2x2 and the rows/columns line up with
# `classes`, even if one class never appears among the labels or predictions.
class_ids = list(range(len(classes)))

# Confusion matrix
cm = confusion_matrix(val_labels, val_pred_binary, labels=class_ids)
fig, ax = plt.subplots(figsize=(8, 6))
heatmap = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
ax.set_title('Confusion Matrix')
fig.colorbar(heatmap, ax=ax)
tick_marks = np.arange(len(classes))
ax.set_xticks(tick_marks)
ax.set_xticklabels(classes, rotation=45)
ax.set_yticks(tick_marks)
ax.set_yticklabels(classes)

# Use white text on dark cells and black text on light cells
thresh = cm.max() / 2.
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        ax.text(j, i, str(cm[i, j]),
                horizontalalignment="center",
                color="white" if cm[i, j] > thresh else "black")

ax.set_ylabel('True label')
ax.set_xlabel('Predicted label')
# Lay out only after the axis labels exist so they are included in the layout
fig.tight_layout()
fig.savefig('confusion_matrix.png')
plt.close(fig)

# Classification report
print("\nClassification Report:")
print(classification_report(val_labels, val_pred_binary, labels=class_ids, target_names=classes))

# ROC curve and the area under it, from the predicted probabilities
fpr, tpr, _ = roc_curve(val_labels, val_preds)
roc_auc = auc(fpr, tpr)

roc_fig, roc_ax = plt.subplots(figsize=(8, 6))
roc_ax.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
# Diagonal reference line of a random classifier
roc_ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
roc_ax.set_xlim([0.0, 1.0])
roc_ax.set_ylim([0.0, 1.05])
roc_ax.set_xlabel('False Positive Rate')
roc_ax.set_ylabel('True Positive Rate')
roc_ax.set_title('Receiver Operating Characteristic')
roc_ax.legend(loc="lower right")
roc_ax.grid(alpha=0.3)
roc_fig.savefig('roc_curve.png')
plt.close(roc_fig)

# Show eight randomly chosen validation images with the model's verdict
gallery_fig, gallery_axes = plt.subplots(2, 4, figsize=(15, 10))
indices = np.random.choice(len(val_dataset), 8, replace=False)

for panel, idx in zip(gallery_axes.flat, indices):
    img, label = val_dataset[idx]

    # Probability of the positive class for this single image
    with torch.no_grad():
        prob = torch.sigmoid(best_model(img.unsqueeze(0).to(device))).item()

    if prob > 0.5:
        predicted = "Pneumonia"
    else:
        predicted = "Normal"
    if label == 1:
        true_label = "Pneumonia"
    else:
        true_label = "Normal"

    # Green title for a correct prediction, red for a wrong one
    title_color = "green" if predicted == true_label else "red"

    panel.imshow(img[0], cmap='bone')
    panel.set_title(f"Pred: {predicted} ({prob:.2f})\nTrue: {true_label}", color=title_color)
    panel.axis('off')

gallery_fig.tight_layout()
gallery_fig.savefig('predictions.png')
plt.close(gallery_fig)

# Implement Grad-CAM for explainability
class GradCAM:
    """Grad-CAM heatmaps for a model that emits a single logit per sample.

    Hooks on ``target_layer`` capture its output during the forward pass and the
    gradient of the explained score with respect to that output during the
    backward pass. Call :meth:`remove_hooks` when the instance is no longer needed.

    Args:
        model: Module to explain; its output is flattened to one logit per sample.
        target_layer: Sub-module of ``model`` whose 4-D output is visualised.
    """

    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None

        # Keep the handles so the hooks can be detached again
        self._hook_handles = [
            target_layer.register_forward_hook(self.save_activation),
            target_layer.register_full_backward_hook(self.save_gradient),
        ]

    def remove_hooks(self):
        """Detach the forward/backward hooks from the target layer."""
        for handle in self._hook_handles:
            handle.remove()
        self._hook_handles = []

    def save_activation(self, module, input, output):
        """Forward hook: remember the target layer's output."""
        self.activations = output.detach()

    def save_gradient(self, module, grad_input, grad_output):
        """Backward hook: remember the gradient w.r.t. the target layer's output."""
        self.gradients = grad_output[0].detach()

    def __call__(self, x, class_idx=None):
        """Compute one normalised heatmap per input sample.

        Args:
            x: Input batch accepted by the model.
            class_idx: Class to explain. ``None`` explains each sample's
                predicted class (sigmoid > 0.5). A scalar or a per-sample
                sequence/tensor selects the class explicitly: non-zero means the
                positive class (score = logit), zero the negative class
                (score = -logit).

        Returns:
            Float32 tensor of shape ``(batch, height, width)`` at the target
            layer's spatial resolution, with values in [0, 1]. A map is all
            zeros when no location contributes positively.

        Raises:
            RuntimeError: If the hooks captured nothing (e.g. they were removed
                or the target layer did not take part in the forward pass).
        """
        # Follow the model's own device instead of relying on a global variable
        first_param = next(self.model.parameters(), None)
        if first_param is not None:
            x = x.to(first_param.device)

        self.model.zero_grad()
        self.gradients = None

        # Gradients are required even if the caller is inside torch.no_grad()
        with torch.enable_grad():
            logits = self.model(x).reshape(-1)

            if class_idx is None:
                explain_positive = torch.sigmoid(logits.detach()) > 0.5
            else:
                explain_positive = torch.as_tensor(class_idx, device=logits.device).reshape(-1) != 0
                explain_positive = explain_positive.expand_as(logits)

            # +1 explains the positive class, -1 the negative one. Summing gives a
            # scalar so a whole batch can be back-propagated at once.
            sign = explain_positive.to(logits.dtype) * 2 - 1
            (sign * logits).sum().backward()

        if self.gradients is None or self.activations is None:
            raise RuntimeError("GradCAM hooks did not capture activations/gradients")

        # Global average pooling of the gradients gives one weight per channel
        weights = self.gradients.mean(dim=(2, 3), keepdim=True)

        # Weighted sum of the activation maps, then ReLU
        cam = torch.relu((weights * self.activations).sum(dim=1)).float()

        # Scale each map to a maximum of 1; all-zero maps are left untouched
        peak = cam.amax(dim=(1, 2), keepdim=True)
        cam = cam / torch.where(peak > 0, peak, torch.ones_like(peak))

        return cam

# Create Grad-CAM visualizations
# Get the last convolutional layer in ResNet18
target_layer = best_model.model.layer4[-1].conv2

# Initialize Grad-CAM
grad_cam = GradCAM(best_model, target_layer)

# Visualize some examples with Grad-CAM: originals on the top row, overlays below
plt.figure(figsize=(15, 10))
indices = np.random.choice(len(val_dataset), 4, replace=False)

for i, idx in enumerate(indices):
    img, label = val_dataset[idx]
    img_tensor = img.unsqueeze(0).to(device)

    # Get model prediction
    with torch.no_grad():
        prob = torch.sigmoid(best_model(img_tensor)).item()

    # Get Grad-CAM (runs its own forward/backward pass with gradients enabled)
    cam = grad_cam(img_tensor)
    cam = cam[0].cpu().numpy()

    # Original image
    plt.subplot(2, 4, i+1)
    plt.imshow(img[0].cpu(), cmap='bone')
    true_label = "Pneumonia" if label == 1 else "Normal"
    plt.title(f"Original: {true_label}\nPred: {prob:.2f}")
    plt.axis('off')

    # CAM overlay. The CAM has the target layer's (much lower) resolution, so it is
    # stretched over the image's pixel extent; without `extent` the second imshow
    # rescales the axes to the CAM grid and the overlay no longer matches the X-ray.
    height, width = img.shape[-2:]
    plt.subplot(2, 4, i+5)
    plt.imshow(img[0].cpu(), cmap='bone')
    plt.imshow(
        cam,
        cmap='jet',
        alpha=0.5,
        extent=(-0.5, width - 0.5, height - 0.5, -0.5),
        interpolation='bilinear',
    )
    plt.title("Grad-CAM Visualization")
    plt.axis('off')

plt.tight_layout()
plt.savefig('gradcam.png')
plt.close()

print("Analysis complete!")


PyTorch version: 2.6.0+cu124
Torchvision version: 0.21.0+cu124
Using device: cpu
Creating synthetic chest X-ray dataset...
Created synthetic dataset with 100 normal and 100 pneumonia X-rays
Training samples: 160
Validation samples: 40
Training class distribution: [80 80]
Validation class distribution: [20 20]


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:01<00:00, 36.3MB/s]
INFO:pytorch_lightning.utilities.rank_zero:GPU available: False, used: False
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


Using class weight: 1.0
Starting training...


INFO:pytorch_lightning.callbacks.model_summary:
  | Name           | Type              | Params | Mode 
-------------------------------------------------------------
0 | model          | ResNet            | 11.2 M | train
1 | criterion      | BCEWithLogitsLoss | 0      | train
2 | train_accuracy | BinaryAccuracy    | 0      | train
3 | val_accuracy   | BinaryAccuracy    | 0      | train
4 | val_f1         | BinaryF1Score     | 0      | train
5 | val_auroc      | BinaryAUROC       | 0      | train
-------------------------------------------------------------
11.2 M    Trainable params
0         Non-trainable params
11.2 M    Total params
44.683    Total estimated model params size (MB)
73        Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

/usr/local/lib/python3.11/dist-packages/pytorch_lightning/loops/fit_loop.py:310: The number of training batches (5) is smaller than the logging interval Trainer(log_every_n_steps=10). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=10` reached.


Best model path: /content/lightning_logs/version_0/checkpoints/pneumonia-epoch=09-val_acc_epoch=0.859.ckpt
Best validation accuracy: 0.8591


Evaluating:   0%|          | 0/2 [00:00<?, ?it/s]


Classification Report:
              precision    recall  f1-score   support

      Normal       1.00      1.00      1.00        20
   Pneumonia       1.00      1.00      1.00        20

    accuracy                           1.00        40
   macro avg       1.00      1.00      1.00        40
weighted avg       1.00      1.00      1.00        40

Analysis complete!
