In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, Subset
from PIL import Image
import numpy as np
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from transformers import SwinModel, SwinConfig

# Custom Dataset Class
class CovidPneumoniaDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['covid', 'pneumonia']
        self.images = []
        self.labels = []
        
        for label, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            if os.path.exists(class_dir):
                for img_name in os.listdir(class_dir):
                    img_path = os.path.join(class_dir, img_name)
                    self.images.append(img_path)
                    self.labels.append(label)
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img_path = self.images[idx]
        label = self.labels[idx]
        
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
            
        return image, label

# Custom Model with Swin Tiny Transformer
class SwinBinaryClassifier(nn.Module):
    def __init__(self):
        super(SwinBinaryClassifier, self).__init__()
        self.swin = SwinModel.from_pretrained('microsoft/swin-tiny-patch4-window7-224')
        self.head = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 2)
        )
    
    def forward_features(self, x):
        return self.swin(x).last_hidden_state[:, 0, :]
    
    def forward(self, x):
        features = self.forward_features(x)
        return self.head(features)

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data transforms
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                        std=[0.229, 0.224, 0.225])
])

2025-04-19 12:12:44.481926: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1745064764.660214      31 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1745064764.710553      31 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
# Load full dataset
data_dir = "/kaggle/input/covid-pneumonia-lus-images/covid_pneumonia"  # Replace with your dataset path containing covid/ and pneumonia/ folders
full_dataset = CovidPneumoniaDataset(root_dir=data_dir, transform=transform)

In [3]:
import random
# Step 1: Count samples per class
labels = np.array(full_dataset.labels)  # Assuming labels are 0 (COVID) and 1 (pneumonia)
covid_indices = np.where(labels == 0)[0]  # Indices of COVID samples
pneumonia_indices = np.where(labels == 1)[0]  # Indices of pneumonia samples
n_covid = len(covid_indices)
n_pneumonia = len(pneumonia_indices)

print(f"Original: COVID samples = {n_covid}, Pneumonia samples = {n_pneumonia}")

# Step 2: Calculate target number of pneumonia samples for 10:1 imbalance
target_pneumonia = n_covid // 10  # Integer division to get ~1/10th of COVID samples

if target_pneumonia == 0:
    raise ValueError("Too few COVID samples to achieve 10:1 imbalance. Increase dataset size or adjust ratio.")

# Step 3: Undersample pneumonia to achieve 10:1 ratio
if n_pneumonia > target_pneumonia:
    # Randomly select target_pneumonia samples from pneumonia
    pneumonia_indices = random.sample(list(pneumonia_indices), target_pneumonia)
else:
    # If there are fewer pneumonia samples, use all of them and warn about imbalance
    print(f"Warning: Only {n_pneumonia} pneumonia samples available, less than {target_pneumonia} needed for 10:1 ratio.")
    target_pneumonia = n_pneumonia

# Step 4: Combine indices for the imbalanced dataset
selected_indices = list(covid_indices) + list(pneumonia_indices)
random.shuffle(selected_indices)  # Shuffle to avoid ordering bias
imbalanced_dataset = Subset(full_dataset, selected_indices)

# Update labels for the imbalanced dataset
imbalanced_labels = labels[selected_indices]
print(f"Imbalanced: COVID samples = {len(covid_indices)}, Pneumonia samples = {len(pneumonia_indices)} (Ratio ~ {len(covid_indices)/len(pneumonia_indices):.2f}:1)")

Original: COVID samples = 524, Pneumonia samples = 463
Imbalanced: COVID samples = 524, Pneumonia samples = 52 (Ratio ~ 10.08:1)


In [6]:
# Perform train-test split
train_idx, test_idx = train_test_split(
    range(len(imbalanced_dataset)),
    test_size=0.2,  # 20% for test set
    stratify=imbalanced_labels,  # Maintain 10:1 class distribution
    random_state=42
)

train_dataset = Subset(imbalanced_dataset, train_idx)
test_dataset = Subset(imbalanced_dataset, test_idx)

# Cross-validation parameters
n_splits = 5
num_epochs = 5
final_num_epochs = 10
batch_size = 16
kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)

# Training and validation loop for hyperparameter tuning
fold_val_accs = []
fold_val_f1s = []

for fold, (train_idx_cv, val_idx_cv) in enumerate(kfold.split(range(len(train_dataset)))):
    print(f'\nFold {fold + 1}/{n_splits}')
    
    # Create train and validation subsets from training data
    train_subset = Subset(train_dataset, train_idx_cv)
    val_subset = Subset(train_dataset, val_idx_cv)
    
    train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)
    
    model = SwinBinaryClassifier().to(device)
    optimizer = optim.Adam(model.parameters(), lr=2e-5)
    # criterion = nn.CrossEntropyLoss()
    # Calculate class weights (inverse frequency or custom)
    class_counts = np.bincount(imbalanced_labels)
    class_weights = 1.0 / class_counts
    class_weights = class_weights / class_weights.sum()  # Normalize
    class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)
    
    # Update criterion in both CV and final training
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.1)
    
    # Training loop
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        train_preds, train_labels = [], []
        
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            train_preds.extend(predicted.cpu().numpy())
            train_labels.extend(labels.cpu().numpy())
        
        train_acc = accuracy_score(train_labels, train_preds) * 100
        avg_loss = running_loss / len(train_loader)
        
        # Validation
        model.eval()
        val_preds, val_labels = [], []
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                val_preds.extend(predicted.cpu().numpy())
                val_labels.extend(labels.cpu().numpy())
        
        val_acc = accuracy_score(val_labels, val_preds) * 100
        val_f1 = f1_score(val_labels, val_preds, average='binary')
        
        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'Train Loss: {avg_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Acc: {val_acc:.2f}%, Val F1: {val_f1:.4f}')
        
        scheduler.step()
    
    fold_val_accs.append(val_acc)
    fold_val_f1s.append(val_f1)

# Cross-validation results
print('\nCross-validation Results:')
print(f'Average Val Accuracy: {np.mean(fold_val_accs):.2f}% (±{np.std(fold_val_accs):.2f})')
print(f'Average Val F1 Score: {np.mean(fold_val_f1s):.4f} (±{np.std(fold_val_f1s):.4f})')

# Train final model on full training set
print('\nTraining final model on full training set...')
final_train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
final_model = SwinBinaryClassifier().to(device)
optimizer = optim.Adam(final_model.parameters(), lr=2e-5)
criterion = nn.CrossEntropyLoss()
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.1)

for epoch in range(final_num_epochs):
    final_model.train()
    running_loss = 0.0
    train_preds, train_labels = [], []
    
    for images, labels in final_train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = final_model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        train_preds.extend(predicted.cpu().numpy())
        train_labels.extend(labels.cpu().numpy())
    
    train_acc = accuracy_score(train_labels, train_preds) * 100
    avg_loss = running_loss / len(final_train_loader)
    print(f'Epoch {epoch+1}/{num_epochs}: Train Loss: {avg_loss:.4f}, Train Acc: {train_acc:.2f}%')
    scheduler.step()

# Evaluate on test set
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
final_model.eval()
test_preds, test_labels = [], []

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = final_model(images)
        _, predicted = torch.max(outputs, 1)
        test_preds.extend(predicted.cpu().numpy())
        test_labels.extend(labels.cpu().numpy())

test_acc = accuracy_score(test_labels, test_preds) * 100
test_f1 = f1_score(test_labels, test_preds, average='binary')
cm = confusion_matrix(test_labels, test_preds)

print('\nFinal Test Set Results:')
print(f'Test Accuracy: {test_acc:.2f}%')
print(f'Test F1 Score: {test_f1:.4f}')
print('Confusion Matrix:')
print(cm)
print("Training and evaluation complete!")


Fold 1/5
Epoch 1/5:
Train Loss: 0.5753, Train Acc: 69.29%
Val Acc: 91.30%, Val F1: 0.5556
Epoch 2/5:
Train Loss: 0.2888, Train Acc: 91.03%
Val Acc: 94.57%, Val F1: 0.6667
Epoch 3/5:
Train Loss: 0.2194, Train Acc: 92.12%
Val Acc: 94.57%, Val F1: 0.6667
Epoch 4/5:
Train Loss: 0.1819, Train Acc: 91.85%
Val Acc: 94.57%, Val F1: 0.6667
Epoch 5/5:
Train Loss: 0.1750, Train Acc: 91.03%
Val Acc: 94.57%, Val F1: 0.6667

Fold 2/5
Epoch 1/5:
Train Loss: 0.5393, Train Acc: 83.42%
Val Acc: 94.57%, Val F1: 0.7368
Epoch 2/5:
Train Loss: 0.3152, Train Acc: 92.12%
Val Acc: 88.04%, Val F1: 0.5926
Epoch 3/5:
Train Loss: 0.2752, Train Acc: 89.40%
Val Acc: 95.65%, Val F1: 0.8000
Epoch 4/5:
Train Loss: 0.2070, Train Acc: 94.29%
Val Acc: 94.57%, Val F1: 0.7619
Epoch 5/5:
Train Loss: 0.2003, Train Acc: 93.48%
Val Acc: 94.57%, Val F1: 0.7619

Fold 3/5
Epoch 1/5:
Train Loss: 0.5788, Train Acc: 89.13%
Val Acc: 93.48%, Val F1: 0.7500
Epoch 2/5:
Train Loss: 0.3290, Train Acc: 90.49%
Val Acc: 89.13%, Val F1: 0.666

In [7]:
from sklearn.metrics import confusion_matrix, classification_report
labels = ['covid', 'pneumonia']
class_report = classification_report(test_labels, test_preds, target_names=labels)
print(class_report)

              precision    recall  f1-score   support

       covid       0.92      1.00      0.96       106
   pneumonia       1.00      0.10      0.18        10

    accuracy                           0.92       116
   macro avg       0.96      0.55      0.57       116
weighted avg       0.93      0.92      0.89       116

