<a href="https://colab.research.google.com/github/christineseng/Pneumonia-Predictor/blob/main/PneumoniaProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook assumes the dataset has been downloaded into a "pneumonia" folder that contains three subfolders ("test", "train", and "val"), each of which contains "NORMAL" and "PNEUMONIA" subfolders holding the respective images.

In [None]:
# Mount Google Drive into the Colab runtime so files stored on Drive can be read.
from google.colab import drive
drive.mount('/content/drive')
# Root folder of the dataset on the mounted Drive; later cells join the split
# folder names onto this path.
dataset_path = '/content/drive/MyDrive/pneumonia'

Mounted at /content/drive


In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torchvision import datasets, models
import timm
from collections import Counter
# Report the installed PyTorch build and whether CUDA is usable from this runtime.
print(f"PyTorch version: {torch.__version__}")
print(f"GPU available: {torch.cuda.is_available()}")

PyTorch version: 2.9.0+cu126
GPU available: True


In [None]:
# Copies the dataset splits from the mounted Drive to the local runtime disk,
# which speeds up the training loop compared with reading images from Drive.
import os
import shutil

# Define local destination
local_dest_root = '/content/data'

# List of folders to copy
folders_to_copy = ['test', 'train', 'val']

# Iterate and copy
for folder in folders_to_copy:
    # dataset_path is the Drive dataset root defined in the mount cell
    # ('/content/drive/MyDrive/pneumonia').
    src_path = os.path.join(dataset_path, folder)
    dest_path = os.path.join(local_dest_root, folder)

    # A destination that already exists is treated as a finished copy.
    if os.path.exists(dest_path):
        print(f"Directory '{folder}' already exists at {dest_path}. Skipping copy.")
        continue

    # Fail early with a clear message instead of an error from deep inside copytree.
    if not os.path.isdir(src_path):
        raise FileNotFoundError(f"Source folder not found: {src_path}")

    print(f"Copying '{folder}' from Drive to local runtime...")
    # Copy into a temporary sibling first and rename when done, so an interrupted
    # copy never leaves a half-filled folder that a later run would skip.
    partial_path = dest_path + '.partial'
    if os.path.exists(partial_path):
        shutil.rmtree(partial_path)
    shutil.copytree(src_path, partial_path)
    os.rename(partial_path, dest_path)
    print(f"Successfully copied '{folder}'.")

# Verify contents
if os.path.exists(local_dest_root):
    print(f"\nContents of '{local_dest_root}':")
    print(os.listdir(local_dest_root))
else:
    print("\nLocal directory was not created.")

Copying 'test' from Drive to local runtime...
Successfully copied 'test'.
Copying 'train' from Drive to local runtime...
Successfully copied 'train'.
Copying 'val' from Drive to local runtime...
Successfully copied 'val'.

Contents of '/content/data':
['train', 'test', 'val']


In [None]:
class CustomDataset(Dataset):
  """Dataset wrapper that delegates everything to a torchvision ImageFolder.

  The wrapped ImageFolder is kept in ``self.data``; length, item access and the
  class-name list are all forwarded to it.
  """

  def __init__(self, data_dir, transform=None):
    """Create the underlying ImageFolder for ``data_dir`` with the given transform."""
    self.data = ImageFolder(data_dir, transform=transform)

  def __len__(self):
    """Number of samples in the wrapped ImageFolder."""
    wrapped = self.data
    return len(wrapped)

  def __getitem__(self, idx):
    """Return the ``(image, label)`` pair stored at position ``idx``."""
    image, label = self.data[idx]
    return image, label

  @property
  def classes(self):
    """Class names as exposed by the wrapped ImageFolder."""
    wrapped = self.data
    return wrapped.classes

In [None]:
# Settings shared by the training and evaluation pipelines.
IMAGE_SIZE = (512, 512)
# Per-channel normalization statistics (these are the values commonly quoted
# for ImageNet-pretrained models).
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]


def _tensor_and_normalize():
  """Return fresh ToTensor + Normalize steps, the common tail of both pipelines."""
  return [
      transforms.ToTensor(),
      transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),
  ]


# Training pipeline: resize, random augmentation, then tensor conversion and normalization.
train_transform = transforms.Compose(
    [
        transforms.Resize(IMAGE_SIZE),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    ]
    + _tensor_and_normalize()
)

# Evaluation pipeline: same resize and normalization, no random augmentation.
test_transform = transforms.Compose(
    [transforms.Resize(IMAGE_SIZE)] + _tensor_and_normalize()
)


In [None]:
# Build an index -> class-name lookup by inverting ImageFolder's
# class-name -> index mapping for the training folder.
# NOTE: this reassigns dataset_path, which the earlier cells used for the Drive
# dataset root; re-running the copy cell after this one would read the new value.
dataset_path = '/content/data/train/'
_class_to_idx = ImageFolder(dataset_path).class_to_idx
target_to_class = {index: name for name, index in _class_to_idx.items()}
print(target_to_class)

{0: 'NORMAL', 1: 'PNEUMONIA'}


In [None]:
class CustomCNN(nn.Module):
  """Small VGG-style CNN: three conv/conv/max-pool stages plus a two-layer head.

  Args:
    num_classes: number of output scores produced by the final linear layer.
    input_size: height/width (in pixels) of the square input images. The three
      2x2 max-pools shrink each spatial side to ``input_size // 8``, which fixes
      the size of the first linear layer. Defaults to 512, matching the
      ``Resize((512, 512))`` used by the notebook's transforms.

  Raises:
    ValueError: if ``input_size`` is smaller than 8, i.e. nothing would be left
      after the three pooling steps.
  """

  def __init__(self, num_classes = 2, input_size = 512):
    super(CustomCNN, self).__init__()

    # Each MaxPool2d(2, 2) floors the spatial size to half; three of them give // 8.
    pooled_size = input_size // 8
    if pooled_size < 1:
      raise ValueError(f"input_size must be at least 8, got {input_size}")

    # feature extraction: learns patterns (edges, textures, shapes)
    self.features = nn.Sequential(
        # input: 3 channels (RGB), output: 32 feature maps
        nn.Conv2d(3, 32, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(32, 32, kernel_size = 3, padding = 1),
        nn.ReLU(),
        # reduces image height and width by half (256 x 256 for the default 512 input)
        nn.MaxPool2d(2,2),
        # increase depth to 64 filters
        nn.Conv2d(32, 64, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(64, 64, kernel_size = 3, padding = 1),
        nn.ReLU(),
        # reduces image height and width by half (128 x 128 for the default 512 input)
        nn.MaxPool2d(2,2),
        # increase depth to 128 filters
        nn.Conv2d(64, 128, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(128, 128, kernel_size = 3, padding = 1),
        nn.ReLU(),
        # final reduction before flattening (64 x 64 for the default 512 input)
        nn.MaxPool2d(2,2),
    )

    # classification: takes learned features and maps them to class labels
    self.classifier = nn.Sequential(
        # converts 3D feature map into 1D vector
        nn.Flatten(),
        # input features = width * height * channels from last layer
        nn.Linear(pooled_size * pooled_size * 128, 192),
        nn.ReLU(),
        # randomly disables 10% of neurons to prevent overfitting
        nn.Dropout(0.10),
        # final layer outputs one score per class (previously hard-coded to 2,
        # which silently ignored the num_classes argument)
        nn.Linear(192, num_classes)
    )

  def forward(self, x):
    """Run the feature extractor and then the classifier head on batch ``x``."""
    # extract features
    x = self.features(x)
    # categorize features
    x = self.classifier(x)
    return x

In [None]:
# Locations of the splits on the local runtime disk.
train_folder = '/content/data/train'
# Per the original note, the Kaggle dataset had these two swapped, so the
# 'test' folder is used for validation and the 'val' folder for testing.
valid_folder = '/content/data/test'
test_folder = '/content/data/val'

# Training images get the augmenting pipeline; validation images only the
# deterministic resize + normalization.
train_dataset = CustomDataset(data_dir=train_folder, transform=train_transform)
valid_dataset = CustomDataset(data_dir=valid_folder, transform=test_transform)
#test_dataset = CustomDataset(test_folder, transform=transform)

# Only the training batches are shuffled; validation order stays fixed.
_batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=_batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=_batch_size, shuffle=False)
#test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Count occurrences of each class in the training dataset.
# The labels are read from the wrapped ImageFolder's `targets` list rather than
# by iterating `train_dataset`, which would decode and augment every image just
# to discard it and keep the label.
class_counts = Counter(train_dataset.data.targets)
print("Class counts in training dataset:", class_counts)

# Calculate class weights
total_samples = sum(class_counts.values())
num_classes = len(class_counts)

# The weight for each class can be inversely proportional to its frequency.
# A common formula is: weight_i = total_samples / (num_classes * count_i)
class_weights = torch.tensor(
    [total_samples / (num_classes * class_counts[i]) for i in range(num_classes)],
    dtype=torch.float32,
)
print("Calculated class weights:", class_weights)


Class counts in training dataset: Counter({1: 3875, 0: 1341})
Calculated class weights: tensor([1.9448, 0.6730])


In [None]:
# training loop: trains CustomCNN with a class-weighted cross-entropy loss and
# saves the weights of the epoch with the best validation accuracy
# (ties are broken by lower validation loss).
num_epochs = 50
best_accuracy = 0.0
# Start at +inf so the tie-break on validation loss can never reject a real loss.
best_val_loss = float('inf')
save_path = 'pneumonia_model_weights.pth'

# Per-epoch history.
train_losses, valid_losses = [], []
val_accuracies = []

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CustomCNN(num_classes = 2)
model.to(device)

# All model parameters (feature extractor and classifier) are optimized.
optimizer = optim.Adam(model.parameters(), lr = 0.001)
# Class weights counteract the class imbalance in the training set.
criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))

for epoch in range(num_epochs):
  # training phase
  model.train()
  running_loss = 0.0
  for batch_idx, (images, labels) in enumerate(train_loader):
    # lightweight progress indicator: print every 10th batch index
    if batch_idx % 10 == 0:
       print(batch_idx)

    images, labels = images.to(device), labels.to(device)

    optimizer.zero_grad()
    # contains model outputs
    outputs = model(images)

    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

    # loss.item() is a batch average; scale by batch size so the epoch figure
    # below is a per-sample average even when the last batch is smaller
    running_loss += loss.item() * labels.size(0)
  train_loss = running_loss / len(train_loader.dataset)
  train_losses.append(train_loss)

  # validation phase
  model.eval()
  running_loss = 0.0
  correct = 0
  total = 0

  with torch.no_grad():
    for images, labels in valid_loader:
      images, labels = images.to(device), labels.to(device)

      outputs = model(images)
      loss = criterion(outputs, labels)

      running_loss += loss.item() * labels.size(0)

      # predicted class = index of the highest score in each row
      _, predicted = torch.max(outputs, 1)
      correct += (predicted == labels).sum().item()
      total += labels.size(0)
  val_loss = running_loss / len(valid_loader.dataset)
  valid_losses.append(val_loss)

  val_accuracy = 100 * correct / total
  if (val_accuracy > best_accuracy) or (val_accuracy == best_accuracy and val_loss < best_val_loss):
    print(f"New best accuracy: {val_accuracy:.2f}%.")
    best_accuracy = val_accuracy
    best_val_loss = val_loss
    # save the best weights so far
    torch.save(model.state_dict(), save_path)

  val_accuracies.append(val_accuracy)

  print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
New best accuracy: 79.65%.
Epoch 1/50, Train Loss: 0.4907, Val Loss: 0.5519, Val Accuracy: 79.65%
0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
New best accuracy: 79.81%.
Epoch 2/50, Train Loss: 0.2734, Val Loss: 0.7006, Val Accuracy: 79.81%
0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
New best accuracy: 85.42%.
Epoch 3/50, Train Loss: 0.2349, Val Loss: 0.3508, Val Accuracy: 85.42%
0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
New best accuracy: 86.06%.
Epoch 4/50, Train Loss: 0.2195, Val Loss: 0.3244, Val Accuracy: 86.06%
0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
Epoch 5/50, Train Loss: 0.2036, Val Loss: 0.5621, Val Accuracy: 83.81%
0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
Epoch 6/50, Train Loss: 0.1845, Val Loss: 0.4841, Val Accuracy: 84.94%
0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
Epoch 7/50, Train Loss: 0.1860, Val Loss: 0.4752, Val Accuracy: 85.