In [None]:
!pip install timm

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torchvision import models
from torch.utils.data import DataLoader
from sklearn.metrics import confusion_matrix, classification_report, roc_auc_score, f1_score,accuracy_score, roc_curve
import matplotlib.pyplot as plt
import numpy as np
import os
import time
import copy
from tqdm import tqdm
import timm
import seaborn as sns

In [None]:
# Training pipeline: resize, mild random rotation, occasional brightness/contrast
# jitter, then a single-channel tensor normalised with mean 0.5 / std 0.5.
train_transformer = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomRotation(degrees=10),
        transforms.RandomApply(
            transforms=[transforms.ColorJitter(brightness=0.1, contrast=0.1)],
            p=0.3,
        ),
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)

# Validation/test pipeline: the same preprocessing without any random augmentation.
val_transformer = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)


In [None]:
import os
from PIL import Image

# Root folder that the integrity scan walks recursively.
dataset_root = '/content/Final_dataset'

# Lower-case file extensions (with leading dot) that the scan treats as images.
valid_exts = ['.jpg', '.jpeg', '.png', '.bmp']

# Filled by the scan with one (path, reason) tuple per rejected image.
bad_images = list()

def is_valid_image(path, expected_size=(224, 224), allowed_modes=('RGB', 'L')):
    """Check that ``path`` is a fully decodable image of the expected size and mode.

    Args:
        path: Location of the image file to inspect.
        expected_size: ``(width, height)`` the image must have.
        allowed_modes: PIL mode strings that are accepted.

    Returns:
        ``(True, None)`` when the image passes every check, otherwise
        ``(False, reason)`` where ``reason`` is a human-readable string.
        Any exception raised while opening or decoding is reported as
        ``"Unreadable: ..."`` instead of being propagated.
    """
    try:
        with Image.open(path) as img:
            if img.size != tuple(expected_size):
                return False, "Wrong size: " + str(img.size)
            if img.mode not in allowed_modes:
                return False, "Unsupported mode: " + img.mode
            # Image.open only parses the header; force a full decode so that
            # truncated or corrupt pixel data is caught here rather than in the
            # middle of a training epoch.
            img.load()
            return True, None
    except Exception as e:
        return False, f"Unreadable: {str(e)}"

# Walk the dataset tree and record every image file that fails validation.
for folder, _subfolders, filenames in os.walk(dataset_root):
    for filename in filenames:
        extension = os.path.splitext(filename)[1].lower()
        if extension not in valid_exts:
            # Not an image type the scan cares about.
            continue
        filepath = os.path.join(folder, filename)
        valid, reason = is_valid_image(filepath)
        if not valid:
            bad_images.append((filepath, reason))

# Summary followed by one line per rejected file.
print(f"\nScan complete. Total bad images: {len(bad_images)}")
for path, reason in bad_images:
    print(f"{path} — {reason}")

In [None]:
# Data Loader
batch_size = 16

# One ImageFolder per split; only the training split uses the augmenting transform.
train_data = datasets.ImageFolder(root='/content/Final_dataset/train', transform=train_transformer)
val_data = datasets.ImageFolder(root='/content/Final_dataset/val', transform=val_transformer)
test_data = datasets.ImageFolder(root='/content/Final_dataset/test', transform=val_transformer)

# Shuffle only while training; validation and test keep the dataset order.
train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)

In [None]:
# 1. CBAM Attention Module
class CBAMBlock(nn.Module):
    """Channel attention followed by spatial attention over a feature map.

    The channel gate is global average pooling fed through a 1x1-conv
    bottleneck (``channels // reduction`` hidden channels) and a sigmoid.
    The spatial gate is a ``kernel_size`` convolution plus sigmoid applied to
    the channel-wise mean and max of the channel-gated features.

    Args:
        channels: Number of channels of the incoming feature map.
        reduction: Bottleneck ratio used inside the channel gate.
        kernel_size: Kernel size of the spatial-gate convolution.
    """

    def __init__(self, channels, reduction=16, kernel_size=7):
        super().__init__()
        self.channel_attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, channels // reduction, 1, bias=False),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels // reduction, channels, 1, bias=False),
            nn.Sigmoid()
        )

        self.spatial_attention = nn.Sequential(
            # 2 input maps: channel-wise mean and channel-wise max.
            nn.Conv2d(2, 1, kernel_size, padding=kernel_size // 2, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` re-weighted first per channel, then per spatial location."""
        ca = self.channel_attention(x) * x
        avg_out = torch.mean(ca, dim=1, keepdim=True)
        max_out, _ = torch.max(ca, dim=1, keepdim=True)
        sa = self.spatial_attention(torch.cat([avg_out, max_out], dim=1)) * ca
        return sa


# 2. Residual block with CBAM attention
class CBAMResidualBlock(nn.Module):
    """Two 3x3 conv/BN layers with CBAM attention, dropout and a residual shortcut.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by the block.
        downsample: When True, the first convolution and the shortcut use stride 2.
        dropout: Probability of the dropout applied to the attended features
            before the shortcut is added.
    """

    def __init__(self, in_channels, out_channels, downsample=False, dropout=0.2):
        super().__init__()
        stride = 2 if downsample else 1

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.cbam = CBAMBlock(out_channels)
        self.silu = nn.SiLU()

        # Identity shortcut unless the shape changes, in which case a strided
        # 1x1 conv + BN projects the input to the output shape.
        self.downsample = nn.Sequential()
        if downsample or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply conv -> BN -> SiLU -> conv -> BN -> CBAM -> dropout, add the shortcut, then SiLU."""
        identity = self.downsample(x)
        out = self.silu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = self.cbam(out)
        out = self.dropout(out)
        out = out + identity
        return self.silu(out)


# Backward-compatible alias for the original class name. A later cell defines a
# different, plain `ResidualBlock`; ImprovedPneumoniaCNN therefore refers to
# CBAMResidualBlock explicitly so that rebinding the generic name cannot
# silently change its architecture.
ResidualBlock = CBAMResidualBlock


# 3. ImprovedPneumoniaCNN with CBAM attention, SiLU, and Dropout
class ImprovedPneumoniaCNN(nn.Module):
    """ResNet-style classifier for 1-channel images built from CBAM residual blocks.

    The network is a 7x7 stem, four stages of 3/4/6/3 ``CBAMResidualBlock``s
    (64 -> 128 -> 256 -> 512 channels), global average pooling, dropout and a
    linear head that emits a single raw logit per image.
    """

    def __init__(self):
        super().__init__()
        self.stem = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.SiLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.layer1 = self._make_layer(64, 64, num_blocks=3, downsample=False)
        self.layer2 = self._make_layer(64, 128, num_blocks=4, downsample=True)
        self.layer3 = self._make_layer(128, 256, num_blocks=6, downsample=True)
        self.layer4 = self._make_layer(256, 512, num_blocks=3, downsample=True)

        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(0.4)
        self.fc = nn.Linear(512, 1)

    def _make_layer(self, in_channels, out_channels, num_blocks, downsample):
        """Build one stage: a (possibly strided) first block plus ``num_blocks - 1`` more."""
        layers = [CBAMResidualBlock(in_channels, out_channels, downsample=downsample)]
        for _ in range(1, num_blocks):
            layers.append(CBAMResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of 1-channel images to logits of shape ``(batch, 1)``."""
        x = self.stem(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.global_avg_pool(x)
        x = torch.flatten(x, 1)
        x = self.dropout(x)
        x = self.fc(x)
        return x

In [None]:
class ResidualBlock(nn.Module):
    """Plain residual block: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, skip add, ReLU.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by the block.
        downsample: When True, the first convolution and the skip path use stride 2.
    """

    def __init__(self, in_channels, out_channels, downsample=False):
        super().__init__()
        stride = 2 if downsample else 1
        needs_projection = downsample or in_channels != out_channels

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        if needs_projection:
            # Shape changes: project the input with a 1x1 conv + BN.
            self.skip = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels),
            )
        else:
            # Same shape in and out: an empty Sequential acts as the identity.
            self.skip = nn.Sequential()

    def forward(self, x):
        """Return ReLU(residual branch + skip branch)."""
        shortcut = self.skip(x)
        hidden = self.conv1(x)
        hidden = F.relu(self.bn1(hidden))
        hidden = self.conv2(hidden)
        hidden = self.bn2(hidden)
        return F.relu(hidden + shortcut)


class DeepResNet(nn.Module):
    """ResNet-style network for 1-channel images with 3/4/6/6 residual blocks per stage.

    Args:
        num_classes: Accepted for interface compatibility but not used; the
            head always produces a single logit (``nn.Linear(512, 1)``).
    """

    def __init__(self, num_classes=2):
        super().__init__()

        # Stem: 7x7 strided convolution followed by a strided max-pool.
        self.layer0 = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )

        # (in_channels, out_channels, number of blocks, downsample in first block)
        stage_specs = [
            (64, 64, 3, False),
            (64, 128, 4, True),
            (128, 256, 6, True),
            (256, 512, 6, True),
        ]
        for index, (c_in, c_out, depth, halve) in enumerate(stage_specs, start=1):
            stage = self._make_layer(c_in, c_out, num_blocks=depth, downsample=halve)
            setattr(self, f"layer{index}", stage)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, 1)

    def _make_layer(self, in_channels, out_channels, num_blocks, downsample=False):
        """Stack ``num_blocks`` residual blocks; only the first may change shape."""
        first = ResidualBlock(in_channels, out_channels, downsample)
        rest = [ResidualBlock(out_channels, out_channels) for _ in range(1, num_blocks)]
        return nn.Sequential(first, *rest)

    def forward(self, x):
        """Map a batch of 1-channel images to logits of shape ``(batch, 1)``."""
        for stage in (self.layer0, self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        pooled = torch.flatten(self.avgpool(x), 1)
        return self.fc(pooled)

In [None]:
class EfficientNetB0(nn.Module):
    """timm EfficientNet-B0 adapted to 1-channel input and a single-logit head.

    Args:
        pretrained: Load timm's pretrained weights (default, as before). Pass
            False to skip the download when a checkpoint is loaded right after
            construction anyway.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        # Let timm build the 1-channel stem and 1-unit classifier itself. With
        # `in_chans=1` the pretrained stem filters are adapted to grayscale
        # input instead of being thrown away and re-initialised at random.
        # The module names (`conv_stem`, `classifier`) are still the ones timm
        # creates, so checkpoint keys stay under the same attribute names.
        self.backbone = timm.create_model(
            'efficientnet_b0',
            pretrained=pretrained,
            in_chans=1,
            num_classes=1,
        )

    def forward(self, x):
        """Return the backbone's raw logits for a batch of 1-channel images."""
        return self.backbone(x)

In [None]:
# Training Parameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ImprovedPneumoniaCNN().to(device)
# The models emit one raw logit per image; BCEWithLogitsLoss applies the sigmoid itself.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
# Not consumed in this cell; presumably read by a training loop elsewhere -- TODO confirm.
num_epochs = 30
patience = 5
# `verbose` is omitted on purpose: the argument is deprecated and newer PyTorch
# releases reject it. Log optimizer.param_groups[0]['lr'] after scheduler.step(...)
# to see when the learning rate is reduced.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=3)

In [None]:
def evaluate_model(model, dataloader, criterion, device='cuda'):
    """Evaluate a single-logit binary classifier, print its metrics and plot them.

    Args:
        model: Network returning one logit per sample.
        dataloader: Yields ``(images, labels)`` batches; ``dataloader.dataset``
            must support ``len()`` (used to average the loss).
        criterion: Loss called as ``criterion(logits, labels)`` with float
            labels of shape ``(batch, 1)``.
        device: Device the batches are moved to.

    Returns:
        Dict with keys ``loss``, ``auc``, ``f1``, ``accuracy``,
        ``confusion_matrix`` and ``report``.
    """
    class_names = ["Normal", "Pneumonia"]

    model.eval()
    loss_sum = 0.0
    targets, scores, decisions = [], [], []

    with torch.no_grad():
        for batch_images, batch_labels in dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device).float().unsqueeze(1)

            logits = model(batch_images)
            batch_loss = criterion(logits, batch_labels)
            # Weight by batch size so the final average is per sample.
            loss_sum += batch_loss.item() * batch_images.size(0)

            batch_probs = torch.sigmoid(logits)
            batch_preds = (batch_probs > 0.5).float()

            targets.extend(batch_labels.cpu().numpy())
            scores.extend(batch_probs.cpu().numpy())
            decisions.extend(batch_preds.cpu().numpy())

    # Collapse the per-sample (1,) rows into flat vectors.
    y_true = np.array(targets).flatten()
    y_pred_probs = np.array(scores).flatten()
    y_pred_labels = np.array(decisions).flatten()

    # Metrics
    metrics = {
        "loss": loss_sum / len(dataloader.dataset),
        "auc": roc_auc_score(y_true, y_pred_probs),
        "f1": f1_score(y_true, y_pred_labels),
        "accuracy": accuracy_score(y_true, y_pred_labels),
        "confusion_matrix": confusion_matrix(y_true, y_pred_labels),
        "report": classification_report(y_true, y_pred_labels, target_names=class_names),
    }

    # Print Metrics
    print("\nEvaluation Metrics:")
    print(f"Loss: {metrics['loss']:.4f}")
    print(f"AUC: {metrics['auc']:.4f}")
    print(f"F1-score: {metrics['f1']:.4f}")
    print(f"Accuracy: {metrics['accuracy']:.4f}")
    print("\nConfusion Matrix:\n", metrics["confusion_matrix"])
    print("\nClassification Report:\n", metrics["report"])

    # Visuals: confusion matrix on the left, ROC curve on the right.
    plt.figure(figsize=(14, 5))

    plt.subplot(1, 2, 1)
    sns.heatmap(
        metrics["confusion_matrix"],
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title(f"Confusion Matrix\nAccuracy: {metrics['accuracy']:.2f} | F1: {metrics['f1']:.2f}")

    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_pred_probs)
    plt.subplot(1, 2, 2)
    plt.plot(false_pos_rate, true_pos_rate, label=f"AUC = {metrics['auc']:.4f}", color='darkorange')
    # Diagonal = chance level.
    plt.plot([0, 1], [0, 1], linestyle='--', color='gray')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend()

    plt.tight_layout()
    plt.show()

    return metrics


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize all models
all_models = {
    "ImprovedPneumoniaCNN": ImprovedPneumoniaCNN().to(device),
    "DeepResNet": DeepResNet().to(device),
    "EfficientNetB0": EfficientNetB0().to(device),
}

# Corresponding weight file paths
model_weights = {
    "ImprovedPneumoniaCNN": "/content/ImprovedPneumoniaCNN.pth",
    "DeepResNet": "/content/DeepResNet.pth",
    "EfficientNetB0": "/content/EfficientNetB0.pth",
}

# Store results
model_results = {}

# The loop variable is deliberately NOT called `model`: that global is the
# network the optimizer and scheduler were created for, and rebinding it here
# would leave later cells pointing at the wrong network.
for name, candidate in all_models.items():
    print(f"\n🔍 Evaluating {name}...")

    # Load saved weights (strict by default: any key mismatch raises).
    candidate.load_state_dict(torch.load(model_weights[name], map_location=device))
    candidate.eval()

    # Evaluate
    results = evaluate_model(candidate, test_loader, criterion, device=device)
    model_results[name] = results


In [None]:
def evaluate_ensemble(y_true, y_probs, y_preds):
    """Print ensemble metrics and draw the confusion matrix and ROC curve.

    Args:
        y_true: Ground-truth binary labels.
        y_probs: Predicted probabilities for the positive class.
        y_preds: Hard 0/1 predictions.

    Returns:
        None; results are printed and plotted.
    """
    class_names = ["Normal", "Pneumonia"]

    print("\nEnsemble Evaluation:")
    print(f"AUC: {roc_auc_score(y_true, y_probs):.4f}")
    print(f"F1 Score: {f1_score(y_true, y_preds):.4f}")
    print(f"Accuracy: {accuracy_score(y_true, y_preds):.4f}")

    conf_mat = confusion_matrix(y_true, y_preds)
    print("\nConfusion Matrix:\n", conf_mat)

    print("\nClassification Report:\n")
    print(classification_report(y_true, y_preds, target_names=class_names))

    # Figure 1: confusion matrix heatmap.
    plt.figure(figsize=(5, 4))
    heatmap_options = dict(
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    sns.heatmap(conf_mat, **heatmap_options)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.tight_layout()
    plt.show()

    # Figure 2: ROC curve with the chance diagonal.
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_probs)
    plt.figure(figsize=(6, 5))
    curve_label = f"AUC = {roc_auc_score(y_true, y_probs):.4f}"
    plt.plot(false_pos_rate, true_pos_rate, color='darkorange', label=curve_label)
    plt.plot([0, 1], [0, 1], color='navy', linestyle='--')
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve - Ensemble")
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.tight_layout()
    plt.show()


In [None]:
def ensemble_predict(models, dataloader, device='cuda'):
    """Average the sigmoid outputs of several single-logit models over a dataloader.

    Args:
        models: Sequence of networks; each is switched to eval mode and must
            return one logit per sample.
        dataloader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        Tuple of flat NumPy arrays ``(y_true, mean_probabilities, predictions)``
        where predictions are the mean probabilities thresholded at 0.5.
    """
    for member in models:
        member.eval()

    targets, mean_probs, decisions = [], [], []

    with torch.no_grad():
        for batch_images, batch_labels in dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device).float().unsqueeze(1)

            # One probability tensor per ensemble member.
            member_probs = [torch.sigmoid(member(batch_images)) for member in models]

            # Soft voting: mean over the member axis, then threshold.
            batch_mean = torch.stack(member_probs).mean(0)
            batch_decisions = (batch_mean > 0.5).float()

            targets.extend(batch_labels.cpu().numpy())
            mean_probs.extend(batch_mean.cpu().numpy())
            decisions.extend(batch_decisions.cpu().numpy())

    return (
        np.array(targets).flatten(),
        np.array(mean_probs).flatten(),
        np.array(decisions).flatten(),
    )


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate models
# NOTE: this rebinds the name `models`, hiding the `torchvision.models` module
# imported at the top of the notebook from every cell that runs afterwards.
models = [
    DeepResNet().to(device),
    ImprovedPneumoniaCNN().to(device),
    EfficientNetB0().to(device),
]

# Load weights (same order as `models`)
weights = [
    "DeepResNet.pth",
    "ImprovedPneumoniaCNN.pth",
    "EfficientNetB0.pth",
]

# The loop variable is not called `model`, so the global `model` that the
# optimizer and scheduler were created for is left untouched.
for member, path in zip(models, weights):
    # Strict loading (the default): a checkpoint whose keys do not match the
    # architecture must fail loudly. With strict=False the mismatched layers
    # silently keep their random initialisation and the ensemble metrics
    # below would be meaningless.
    member.load_state_dict(torch.load(path, map_location=device))
    member.eval()

# Predict using ensemble
y_true, y_probs, y_preds = ensemble_predict(models, test_loader, device=device)

# Evaluate and visualize
evaluate_ensemble(y_true, y_probs, y_preds)