In [None]:
#Lucian Irsigler 2621933, Prashan Rajaratnam 2436566, Banzile Nhlebela 2571291, Pramit Kanji 2551233
import os
import re
import random

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import (
    precision_score,
    recall_score,
    f1_score,
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay
)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import (
    Dataset,
    DataLoader,
    random_split,
    Subset,
    WeightedRandomSampler
)
from torch.nn.utils import clip_grad_norm_

import torchvision.transforms as transforms
from torchvision import datasets


# Hyperparameters
We have the following hyperparameters:
- num_epochs : Number of epochs to train on
- batch_size : Number of images to be in each batch
- image_size : square image size to resize images to
- lr : learning rate
- size : represents the TRAIN-VALIDATION-TEST split
- balanced: Whether to use a WeightedRandomSampler or not for the train data

In [None]:
# Hyperparameters
h = {
    "num_epochs": 10,
    "batch_size": 64,
    "image_size": 56,
    "lr": 0.001,
    "size":(0.6,0.2,0.2),
    "balanced":True
}

classes = ("NORMAL","PNEUMONIA")

# Preventing Data leakage
In the PNEUMONIA data, there exists files such as "personx_bacteria0","personx_bacteria1"...
These are multiple scans for the same person.

If left as is, there is a potential for the same person's scans to be in multiple data sets, eg) 1 file in test, another in train.

This gives an inflated evaluation metric. Hence we ensure that the files for a person only appears in one data set.

We do this as follows:
- Extract all the unique person's IDs from the PNEUMONIA files into a list
- Shuffle the IDs
- Split the list into the data split specified by the "size" hyperparameter
- Find the relevent files with that id, and add to test,validation or train set for PNEUMONIA
- Return the lists

Since we have a weird way of loading data, we have a custom wrapper to feed to a Dataloader. This class **CustomImageDataset**, simply takes in a list of images and associates a class to them based on the path of that image ("NORMAL" in path is class 0 and "PNEUMONIA" in path is class 1).

This CustomImageDataset allows us to split all the PNEUMONIA files as specified above, and to split the NORMAL images as usual.

Lastly, there is a Data Leakage check, that just checks that all the PNEUMONIA files, if one of a person's files is in a dataset, that all of them are there.

eg) if id x is found in test, it should NOT be found in train/validation. If found, then a data leakage has occured

In [None]:
def extract_patient_ids(filename):
    patient_id = filename.split('_')[0].replace("person", "")
    return patient_id


def split_file_names(h,folder="chest", seed=42):
    random.seed(seed)  # For reproducibility
    location = os.path.join(folder, "PNEUMONIA")

    # Get unique patient IDs
    pneumonia_patient_ids = list(set([extract_patient_ids(fn) for fn in os.listdir(location)]))
    total_patients = len(pneumonia_patient_ids)

    # Shuffle and split
    random.shuffle(pneumonia_patient_ids)
    train_cutoff = int(h["size"][0] * total_patients)
    val_cutoff = int((1-h["size"][1]) * total_patients)

    train_ids = set(pneumonia_patient_ids[:train_cutoff])
    val_ids = set(pneumonia_patient_ids[train_cutoff:val_cutoff])
    test_ids = set(pneumonia_patient_ids[val_cutoff:])

    # Initialize file path lists
    train_files, val_files, test_files = [], [], []

    for fn in os.listdir(location):
        patient_id = extract_patient_ids(fn)
        full_path = os.path.join(location, fn)

        if patient_id in train_ids:
            train_files.append(full_path)
        elif patient_id in val_ids:
            val_files.append(full_path)
        elif patient_id in test_ids:
            test_files.append(full_path)

    # print(f"Train: {len(train_files)}\nValidation: {len(val_files)}\nTest: {len(test_files)}")
    return train_files, val_files, test_files

In [None]:
class CustomImageDataset(Dataset):
    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform
        self.label_map = {"NORMAL": 0, "PNEUMONIA": 1}
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        path = self.image_paths[idx]
        
        # Infer label from file path
        if "NORMAL" in path:
            label = self.label_map["NORMAL"]
        elif "PNEUMONIA" in path:
            label = self.label_map["PNEUMONIA"]
        else:
            raise ValueError(f"Unknown class in file path: {path}")
        
        # Load image
        try:
            image = Image.open(path).convert("RGB")
        except Exception as e:
            print(f"Error loading image {path}: {e}")
            # Return a blank image as fallback
            image = Image.new('RGB', (224, 224), color='black')
        
        # Apply transforms
        if self.transform:
            image = self.transform(image)
        else:
            image = transforms.ToTensor()(image)
        
        return image, label
    
    def get_indices(self):
        """Return all valid indices for this dataset"""
        return list(range(len(self.image_paths)))
    
    def get_image_label_pairs(self):
        """Return list of tuples (img_path, class_label) for all images"""
        pairs = []
        for path in self.image_paths:
            if "NORMAL" in path:
                label = self.label_map["NORMAL"]
            elif "PNEUMONIA" in path:
                label = self.label_map["PNEUMONIA"]
            else:
                raise ValueError(f"Unknown class in file path: {path}")
            pairs.append((path, label))
        return pairs


In [None]:
def checkDataLeakage(trainLoader, valLoader, testLoader) -> bool:
    location = os.path.join("chest", "PNEUMONIA")
    pneumonia_files = os.listdir(location)
    pneumonia_patient_ids = list(set([extract_patient_ids(fn) for fn in pneumonia_files]))
    total_patients = len(pneumonia_patient_ids)
    print(f"Total unique PNEUMONIA patients: {total_patients}")

    leaked = False

    for patient_id in pneumonia_patient_ids:
        pattern = re.compile(rf'person{patient_id}(?!\d)') 

        train_files = [path for path in trainLoader.dataset.image_paths if pattern.search(path)]
        val_files = [path for path in valLoader.dataset.image_paths if pattern.search(path)]
        test_files = [path for path in testLoader.dataset.image_paths if pattern.search(path)]

        if sum(map(bool, [train_files, val_files, test_files])) > 1:
            print(f"\nData leakage detected for patient ID: {patient_id}")
            print(f"  → In train: {len(train_files)} file(s)")
            for f in train_files:
                print(f"    - {f}")
            print(f"  → In val: {len(val_files)} file(s)")
            for f in val_files:
                print(f"    - {f}")
            print(f"  → In test: {len(test_files)} file(s)")
            for f in test_files:
                print(f"    - {f}")
            leaked = True

    if not leaked:
        print("No data leak present")
    return leaked

# Data loading and split
This section includes the files for:
- Creating a WeightedRandomSampler
- loading the data from file using pytorch's ```ImageFolder``` object
- Splitting the data into the train-validation-test data sets
- Creating the ```DataLoader``` for each data set

In [None]:
def create_weighted_sampler(dataset):
    targets = [label for _,label in dataset]
    class_counts = np.bincount(targets)
    class_weights = 1.0 / class_counts
    class_weights = class_weights / class_weights.sum()
    weights = [class_weights[label] for label in targets]
    sampler = WeightedRandomSampler(weights,len(weights))
    return sampler

In [None]:
def load_data(folder="chest"):
    transform = transforms.Compose([
        transforms.Resize((h["image_size"],h["image_size"])),
        transforms.ToTensor()
    ])

    full_dataset = datasets.ImageFolder(folder,transform=transform)

    return full_dataset


def splitData(dataset,h):
    if (sum(h["size"])!=1.0):
        raise ValueError("Need to equal 1")
    
    train_size = int(h["size"][0]*len(dataset))
    val_size = int(h["size"][1]*len(dataset))
    test_size = len(dataset)-val_size-train_size

    train_data,val_data,test_data = random_split(dataset,[train_size,val_size,test_size])

    return train_data,val_data,test_data


In [None]:
def getNormalIndices(dataset):
    normal_indices = [i for i, (path, label) in enumerate(dataset.imgs) if "NORMAL" in path]
    return normal_indices


def splitNormalData(dataset,indices):
    normal_dataset = Subset(dataset, indices)
    train_data, val_data, test_data = splitData(normal_dataset, h)
    return train_data,val_data,test_data


def getNormalFiles(dataset,trainData,valData,testData):
    train_files = [dataset.imgs[idx][0] for idx in trainData.indices]
    val_files = [dataset.imgs[idx][0] for idx in valData.indices]
    test_files = [dataset.imgs[idx][0] for idx in testData.indices]
    return train_files,val_files,test_files

In [None]:
def combineFiles(normalFiles,pneumoniaFiles):
    if (len(normalFiles))!=len(pneumoniaFiles):
        raise ValueError("normalFiles and pneumoniaFiles must match in size")

    finalTestFiles = normalFiles[0]+pneumoniaFiles[0]
    finalValFiles = normalFiles[1]+pneumoniaFiles[1]
    finalTrainFiles = normalFiles[2]+pneumoniaFiles[2]

    return finalTestFiles,finalValFiles,finalTrainFiles


def createDataLoaders(train_data,val_data,test_data,batch_size,balanced=False):
    if balanced:
        sampler = create_weighted_sampler(train_data)
        train_loader = torch.utils.data.DataLoader(train_data, 
                    batch_size=h["batch_size"], 
                    sampler=sampler, 
                    num_workers=4)
    else: 
        train_loader = DataLoader(train_data,batch_size=batch_size,shuffle=True)

    val_loader = DataLoader(val_data,batch_size=batch_size,shuffle=True)
    test_loader = DataLoader(test_data,batch_size=batch_size,shuffle=True)

    return train_loader,val_loader,test_loader

In [None]:
# Creates the data loaders
def load_pipeline(h):
    dataset = load_data()

    #NORMAL data stuff
    normal_indices = getNormalIndices(dataset)
    train_data, val_data, test_data = splitNormalData(dataset,normal_indices)
    n_train_files,n_val_files,n_test_files = getNormalFiles(dataset,train_data,val_data,test_data)

    #PNEUMONIA data stuff
    p_train_files, p_val_files, p_test_files = split_file_names(h)

    train_files,val_files,test_files = combineFiles([n_train_files,n_val_files,n_test_files],
                                                    [p_train_files,p_val_files,p_test_files])
    
    # Create custom datasets
    transform = transforms.Compose([
            transforms.Resize((h["image_size"],h["image_size"])),
            transforms.ToTensor()
        ])

    train_dataset = CustomImageDataset(train_files, transform=transform)
    val_dataset = CustomImageDataset(val_files, transform=transform)
    test_dataset = CustomImageDataset(test_files, transform=transform)

    #Create data loaders
    train_loader,val_loader,test_loader=createDataLoaders(train_dataset,val_dataset,
                                                    test_dataset,h["batch_size"],h["balanced"])
    
    
    return train_loader,val_loader,test_loader

In [None]:
train_loader,val_loader,test_loader=load_pipeline(h)

In [None]:
output = checkDataLeakage(train_loader,val_loader,test_loader)

# Other
This section has an optional way to just show 4 images from the train_loader. This was just for visual purposes

In [None]:
def imshow(img, title=None):
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    if title:
        plt.title(title)
    plt.axis('off')
    plt.show()


def showImages(train_loader):
    images, labels = next(iter(train_loader))
    for i in range(4):
        imshow(images[i], title=f"Label: {labels[i].item()}")

# Initial data analysis

Observations:
- The classes are biased. There are 4273 PNEUMONIA images and 1583 NORMAL images. The dataset is thus 73% PNEUMONIA and 27% NORMAL.

We visualize the difference in classes(per data set) using graphs below, and we also calculate their mean/std per data set

In [None]:
def vis_class_dist(loader,dataset_name):
    class_counts = np.zeros(2)
    for _,y in loader.dataset.get_image_label_pairs():
        class_counts[y]+=1
    
    print([(classes[i],class_counts[i]) for i in range(len(classes))])
    plt.bar(["Normal","Pneumonia"],class_counts)
    plt.xlabel("Classes")
    plt.ylabel("Count")
    plt.title(f"Class distribtion in {dataset_name} data")
    plt.show()


def calculate_stats(dataset):
    mean = 0.0
    std = 0.0

    for x,_ in dataset:
        mean+=x.mean()
        std+=x.std()

    mean /= len(dataset)
    std /= len(dataset)

    return mean.item(),std.item()

In [None]:
def visualize_classes(trainLoader,valLoader,testLoader):
    vis_class_dist(trainLoader,"Train")
    mean,std = calculate_stats(trainLoader)
    print(f"Train dataset mean and std:{mean},{std}")

    vis_class_dist(valLoader,"Validation")
    mean1,std1 = calculate_stats(valLoader)
    print(f"Validation dataset mean and std:{mean1},{std1}")

    vis_class_dist(testLoader,"Test")
    mean2,std2 = calculate_stats(testLoader)
    print(f"Test dataset mean and std:{mean2},{std2}")

    return [mean,mean1,mean2],[std,std1,std2]

In [None]:
visualize_classes(train_loader,val_loader,test_loader)

# Data Augmentation

In [None]:
def augment_data(image_size,dataset:torch.utils.data.dataloader.DataLoader,means,std):
    data_tranforms = transforms.Compose([
        transforms.Resize([image_size,image_size]),
        transforms.ToTensor(),
        transforms.Normalize(mean=means,std=std)
    ])

    #TODO change some numbers here
    data_tranforms_train = transforms.Compose([
        transforms.RandomRotation(20),
        transforms.RandomHorizontalFlip(0.5),
        transforms.RandomResizedCrop(size=[image_size,image_size],scale=(0.8,1.0)),
        transforms.ColorJitter(brightness=0.1,contrast=0.1,saturation=0.1,hue=0.1),
        transforms.Resize(size=[image_size,image_size]),
        transforms.ToTensor(),
        transforms.Normalize(mean=means,std=std)
    ])

In [None]:
# means = [mean,mean1,mean2]
# stds = [std,std1,std2]

# CNN Architecture Overview

This Convolutional Neural Network (CNN) is designed for classifying NORMAL vs PNEUMONIA on RGB images of size 52×52

###  Architecture Details

- **Input:**  
  `3 × 52 × 52` RGB image

1. **Conv Layer 1**  
   - Channels: `3 → 16`  
   - Kernel Size: `3 × 3`, Padding: `1`  
   - Components:  
     - Batch Normalization  
     - ReLU Activation  
     - MaxPooling: `2 × 2`  
     - Dropout2D: `p = 0.5`

2. **Conv Layer 2**  
   - Channels: `16 → 32`  
   - Kernel Size: `3 × 3`, Padding: `1`  
   - Components:  
     - Batch Normalization  
     - ReLU Activation  
     - MaxPooling: `2 × 2`  
     - Dropout2D: `p = 0.5`

3. **Conv Layer 3**  
   - Channels: `32 → 64`  
   - Kernel Size: `3 × 3`, Padding: `1`  
   - Components:  
     - Batch Normalization  
     - ReLU Activation  
     - MaxPooling: `2 × 2`  
     - Dropout2D: `p = 0.5`

4. **AdaptiveAvgPool2d**  
   - Output Size: `7 × 7`  
   - Ensures a fixed-size output regardless of input image size variations.
 
5. **Fully Connected Layer 1**  
   - Input Features: `64 × 7 × 7 = 3136`  
   - Output Features: `512`  
   - Components:  
     - ReLU Activation  
     - Dropout: `p = 0.5`

6. **Fully Connected Layer 2**  
   - Input Features: `512`  
   - Output Features: `2` (one for each class)  

7. **Output layer**
   - Final output: `2` logits (e.g., for NORMAL and PNEUMONIA)

The criterion is ```nn.CrossEntropyLoss```. The criterion has different importances for each class(since the classes are unbalanced), which ensures that the NORMAL class can be fairly tested as well.

The optimizer is ```Adam```, due to its stability in convergence, and adaptive learning rates. SGD was another contender, but Adam was choosen in the end

In [None]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN,self).__init__()
        self.conv1 = nn.Conv2d(3,16,3,padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        
        self.conv2 = nn.Conv2d(16,32,3,padding=1)
        self.bn2 = nn.BatchNorm2d(32)

        self.conv3 = nn.Conv2d(32,64,3,padding=1)
        self.bn3 = nn.BatchNorm2d(64)

        self.pool =  nn.MaxPool2d(2,2)
        self.dropout = nn.Dropout(0.5)
        self.dropout2d = nn.Dropout2d(0.3)

        self.adaptive_pool = nn.AdaptiveAvgPool2d((7, 7))

        self.fc1 = nn.Linear(64*7*7,512)
        self.fc2 = nn.Linear(512,2)

    def forward(self,x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.dropout2d(x)
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.dropout2d(x)
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.dropout2d(x)

        x = self.adaptive_pool(x)

        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)

        
        x = self.fc2(x)
        # x = self.dropout(x)

        # x = self.fc3(x)
        return x
        # return torch.argmax(x, dim=1)

In [None]:
train_loss_history = []
val_loss_history = []

In [None]:
model = CNN()

#do weight importance stuff here
targets = [label for _, label in train_loader.dataset.get_image_label_pairs()]
class_counts = np.bincount(targets)
class_weights = 1.0 / class_counts
class_weights = class_weights / class_weights.sum()  # Normalize (optional but cleaner)
class_weights_tensor = torch.FloatTensor(class_weights)


# optimizer = torch.optim.SGD(
#     model.parameters(), 
#     lr=h["lr"],
#     momentum=0.9,
#     weight_decay=1e-4
# )

criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
optimizer = torch.optim.Adam(model.parameters(), lr=h["lr"], weight_decay=1e-4)

# Training the model

The model is trained on a number of epochs(specified by the hyperparameter).
Additionally, a validation loss is calculated after every epoch.

After training, the train and validation loss per epoch is plotted against the number of epochs.

In [None]:
def validation_loss(model,val_loader):
    loss_function  = torch.nn.functional.cross_entropy
    total_loss = 0.0
    model.eval()

    with torch.no_grad():
        for inputs,labels in val_loader:
            outputs = model(inputs)
            loss = loss_function(outputs,labels)
            total_loss+=loss
    
    return total_loss/len(val_loader)

def plot_val_test_loss(trainLossHistory,valLossHistory):
    temp_train = [i.item() for i in trainLossHistory]
    temp_val = [i.item() for i in valLossHistory]
    plt.plot(temp_train,label="Train loss")
    plt.plot(temp_val,label="Validation loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, h):
    n_total_steps = len(train_loader)
    train_loss_history = []
    val_loss_history = []

    for epoch in range(h["num_epochs"]):
        model.train()
        running_loss = 0.0

        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{h['num_epochs']}", 
                            leave=False, unit="batch")

        for i, (images, labels) in enumerate(progress_bar):
            outputs = model(images)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()

            # Gradient clipping to avoid exploding gradients
            clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            running_loss += loss.item()

            if (i + 1) % 10 == 0:
                print(f"Epoch [{epoch+1}/{h['num_epochs']}], Step [{i+1}/{n_total_steps}], Loss: {loss.item():.4f}")

        avg_train_loss = running_loss / len(train_loader)
        train_loss_history.append(avg_train_loss)

        val_loss = validation_loss(model, val_loader)
        val_loss_history.append(val_loss)

        print(f"Epoch [{epoch+1}/{h['num_epochs']}], Validation Loss: {val_loss:.4f}")

    print("Finished Training")
    return train_loss_history, val_loss_history

In [None]:
train_loss_history, val_loss_history = train_model(model,train_loader,val_loader,criterion,optimizer,h)

# Evaluating the model

The model is evaluated on the data set aside for testing.

The follow metrics are captured/outputed:
- Accuracy
- F1 score
- Precision
- Recall

These metrics are also done for each class.

Lastly, a confusion matrix is outputted to get a visual of how well the model performed

In [None]:
def evaluate_model(model, test_loader, classes):
    model.eval()
    true_labels = []
    predicted_labels = []
    correct = 0
    total = 0

    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(test_loader):
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)

            true_labels.extend(labels.cpu().numpy())
            predicted_labels.extend(predicted.cpu().numpy())

            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total
    precision = precision_score(true_labels, predicted_labels)
    recall = recall_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels)
    class_report = classification_report(true_labels, predicted_labels, target_names=classes)

    print(f"Accuracy on the test set: {accuracy:.2%}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1 Score:  {f1:.4f}")
    print("\nClassification Report:")
    print(class_report)

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "report": class_report,
        "true_labels": true_labels,
        "predicted_labels": predicted_labels
    }


In [None]:
metrics = evaluate_model(model,test_loader,classes)

In [None]:
cm = confusion_matrix(metrics["true_labels"],metrics["predicted_labels"])

disp = ConfusionMatrixDisplay(confusion_matrix=cm,display_labels=["NORMAL","PNEUMONIA"])

disp.plot()