# Pneumonia Detecion with ResNet50

## Dataset

In [10]:
import torch
# data handling
from torch.utils.data import DataLoader, Dataset
import pandas as pd
from PIL import Image

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Custom Dataset
class ChestXrayDataset(Dataset):
    def __init__(self, df:pd.DataFrame, transform=None):
        self.df = df
        self.transform = transform
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        img_path = self.df.iloc[idx]['path']
        label = self.df.iloc[idx]['label']
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, label
    


In [2]:
# Organize data paths and labels
import os
import pandas as pd
from sklearn.model_selection import train_test_split

def resolve_data_path(data_dir: str):
    normal_dir = os.path.join(data_dir, 'NORMAL')
    pneumonia_dir = os.path.join(data_dir, 'PNEUMONIA')

    normal_paths = [os.path.join(data_dir, 'NORMAL', f) for f in os.listdir(normal_dir) if f.endswith(('.jpeg', '.jpg', '.png'))]
    pneumonia_paths = [os.path.join(data_dir, 'PNEUMONIA', f) for f in os.listdir(pneumonia_dir) if f.endswith(('.jpeg', '.jpg', '.png'))]
    pathes = normal_paths + pneumonia_paths
    labels = [0]*len(normal_paths) + [1]*len(pneumonia_paths)
    return pd.DataFrame({'path': pathes, 'label': labels})

# Base dir
train_df = resolve_data_path("datasets/all_data")
test_df = resolve_data_path("datasets/test")

train_df, val_df = train_test_split(train_df, test_size=0.2, stratify=train_df['label'], random_state=42)

# compute class weights
class_counts = train_df['label'].value_counts().to_dict()
total_samples = len(train_df)
class_weights = {cls: total_samples/count for cls, count in class_counts.items()}
weights = [class_weights[label] for label in train_df['label']]
# ouptput class weights
print("Class Weights:", class_weights)

print(train_df.head(20))

Class Weights: {1: 1.3473921442369607, 0: 3.8785912882298423}
                                                   path  label
3566  datasets/all_data/PNEUMONIA/person1483_bacteri...      1
2866  datasets/all_data/PNEUMONIA/person591_bacteria...      1
2681  datasets/all_data/PNEUMONIA/person361_virus_74...      1
1199  datasets/all_data/NORMAL/NORMAL2-IM-1438-0001....      0
4619  datasets/all_data/PNEUMONIA/person450_bacteria...      1
1008         datasets/all_data/NORMAL/IM-0349-0001.jpeg      0
1295  datasets/all_data/NORMAL/NORMAL2-IM-1261-0001....      0
2688  datasets/all_data/PNEUMONIA/person994_virus_16...      1
3142  datasets/all_data/PNEUMONIA/person1098_bacteri...      1
82    datasets/all_data/NORMAL/NORMAL2-IM-0898-0001....      0
4042  datasets/all_data/PNEUMONIA/person67_bacteria_...      1
5037  datasets/all_data/PNEUMONIA/person886_virus_15...      1
1980  datasets/all_data/PNEUMONIA/person1617_bacteri...      1
4688  datasets/all_data/PNEUMONIA/person333_bacteria... 

In [3]:
# compute the mean and std of images in the training set
import cv2
import tqdm
import numpy as np

def compute_mean_std(df: pd.DataFrame):
    sum_ = 0.0
    sum_squared = 0.0
    num_pixels = 0

    for idx in tqdm.tqdm(range(len(df))[:10]):
        img_path = df.iloc[idx]['path']
        image = cv2.imread(img_path)
        image = image.astype('float32') / 255.0  # Normalize to [0, 1]
        # compute sum and sum of squares for each channel
        sum_ += image.sum(axis=(0, 1))
        sum_squared += (image ** 2).sum(axis=(0, 1))
        num_pixels += image.shape[0] * image.shape[1]

    mean = sum_ / num_pixels
    std = np.sqrt((sum_squared / num_pixels) - (mean ** 2))
    return mean, std
mean, std = compute_mean_std(train_df)
print(f"Mean: {mean}, Std: {std}")

100%|██████████| 10/10 [00:00<00:00, 29.60it/s]

Mean: [0.47240412 0.47240412 0.47240412], Std: [0.25500405 0.25500405 0.25500405]





In [13]:
# Define transforms, dataloaders
from torchvision import transforms

train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    # Normalize with ImageNet stats, beacause resnet is pretrained on ImageNet
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

## Model Training With Cross Validation

In [6]:
from sklearn.metrics import recall_score, f1_score, roc_auc_score

def train_epoch(model, dataloader, criterion, optimizer):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / total
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

def eval_epoch(model, dataloader, criterion):
    model.eval()
    
    all_preds, all_trues, all_probs = [], [], []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            # q: why [:, 1]?
            # a: get the probabilities for the positive class
            probs = torch.softmax(outputs, dim=1)[:, 1]
            preds = (probs >= 0.5).float()

            all_probs.extend(probs.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())
            all_trues.extend(labels.cpu().numpy())



    val_recall = recall_score(all_trues, all_preds, pos_label=1)
    val_specificity = recall_score(all_trues, all_preds, pos_label=0)
    val_f1 = f1_score(all_trues, all_preds, average='weighted')
    val_auc = roc_auc_score(all_trues, all_probs)
    return val_recall, val_specificity, val_f1, val_auc

In [None]:
from itertools import product
from torchvision import transforms, models
from torchvision.models import ResNet50_Weights, ResNet101_Weights, ResNet152_Weights

# hyperparameters
learning_rates = [1e-4, 3e-5]
batch_sizes = [32, 64]
num_epochs = 5

# Results dict, to check performance of different hyperparameter combinations
results = {}

for lr, bs in product(learning_rates, batch_sizes):
    print(f"Training with lr={lr}, batch_size={bs}")
    
    # Create dataloaders
    train_dataset = ChestXrayDataset(train_df, transform=train_transform)
    val_dataset = ChestXrayDataset(val_df, transform=val_test_transform)
    
    train_loader = DataLoader(train_dataset, batch_size=bs, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=bs, shuffle=False, num_workers=4)
    
    # Model, criterion, optimizer
    model = models.resnet50(weights=ResNet50_Weights.DEFAULT)
    model.fc = torch.nn.Linear(model.fc.in_features, 2)  # binary classification
    model = model.to(device)
    
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    
    best_val_acc = 0.0
    
    for epoch in range(num_epochs):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
        val_recall, val_specificity, val_f1, val_auc = eval_epoch(model, val_loader, criterion)
        
        val_acc = (val_recall + val_specificity) / 2
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Val F1: {val_f1:.4f}, Val AUC: {val_auc:.4f}")
        
        results[(lr, bs)] = {'val_recall': val_recall, 'val_specificity': val_specificity, 'val_f1': val_f1, 'val_auc': val_auc}

# Best (by val weighted F1 for balance)
best_params = max(results, key=lambda x: results[x]['val_f1'])
print(f"Best: LR={best_params[0]}, BS={best_params[1]}, Val F1={results[best_params]['val_f1']:.4f}, Recall={results[best_params]['val_recall']:.4f}, Specificity={results[best_params]['val_specificity']:.4f}")
        


Training with lr=0.0001, batch_size=32
Epoch 1/5, Train Loss: 0.1810, Train Acc: 0.9317, Val Acc: 0.9681, Val F1: 0.9743, Val AUC: 0.9941
Epoch 2/5, Train Loss: 0.0809, Train Acc: 0.9737, Val Acc: 0.9629, Val F1: 0.9515, Val AUC: 0.9969
Epoch 3/5, Train Loss: 0.0597, Train Acc: 0.9780, Val Acc: 0.9812, Val F1: 0.9847, Val AUC: 0.9982
Epoch 4/5, Train Loss: 0.0544, Train Acc: 0.9816, Val Acc: 0.9750, Val F1: 0.9809, Val AUC: 0.9983
Epoch 5/5, Train Loss: 0.0381, Train Acc: 0.9857, Val Acc: 0.9684, Val F1: 0.9817, Val AUC: 0.9992
Training with lr=0.0001, batch_size=64
Epoch 1/5, Train Loss: 0.2056, Train Acc: 0.9290, Val Acc: 0.9424, Val F1: 0.9661, Val AUC: 0.9956
Epoch 2/5, Train Loss: 0.0739, Train Acc: 0.9742, Val Acc: 0.9499, Val F1: 0.9719, Val AUC: 0.9981
Epoch 3/5, Train Loss: 0.0564, Train Acc: 0.9795, Val Acc: 0.9407, Val F1: 0.9669, Val AUC: 0.9978
Epoch 4/5, Train Loss: 0.0468, Train Acc: 0.9830, Val Acc: 0.9610, Val F1: 0.9778, Val AUC: 0.9974
Epoch 5/5, Train Loss: 0.0358, 