In [None]:
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from math import ceil
from torchvision import transforms
from PIL import Image
import dill
from torch.utils.data import TensorDataset, DataLoader
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection

**"DL_info.csv" and "images/" (contains all the folders with the dataset's images) should be in the same directory as the script before you run this**

Load and preprocess dataset

In [None]:
CSV_DF_PATH = 'DL_info.csv'
BASE_IMG_DIR = 'images'
GRID_SIZE = 16  # 16x16 grid divides images into 32x32 pixel cells
BOXES_PER_CELL = 1  # Number of bounding boxes per cell
NUM_COORDS = 4  # Coordinates for each box: (x, y, width, height)
IMAGE_SIZE = 512
# Number of outputs per grid cell (e.g., 2 boxes * (4 coords + 1 confidence))
OUTPUT_CHANNELS = BOXES_PER_CELL * (NUM_COORDS + 1)  # Just confidence, no class predictions here
GRID_WIDTH = IMAGE_SIZE / GRID_SIZE

# Returns a list with preprocessed dataset images and another for bounding box grids in YOLO format
def generate_dataset():
    # Image transformation pipeline
    transform = transforms.Compose([
        transforms.ToTensor(),  # Convert to PyTorch tensor (scales to [0, 1])
        transforms.Lambda(lambda x: x - 32768),  # Subtract 32768
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE))  # Resize to desired dimensions
    ])
    csv_df = pd.read_csv(CSV_DF_PATH)
    prev_entry_img_name = None
    bounding_boxes = []
    images = []
    boxes = []
    iterator = iter(csv_df.values)
    img_names = []
    for entry in iterator:
        # debug_var = False
        # For images with different dimensions
        x_rescale_factor = 1
        y_rescale_factor = 1
        img_name = entry[0]

        # Some images have multiple bounding boxes. Their entries in the csv are
        # side-by-side so we can check if the previous entry had the same image
        # and group their bounding boxes together if so
        if img_name != prev_entry_img_name:
            # Last bounding box in the image found; convert and store them in the tf dataset
            if len(bounding_boxes) != 0: 
                # Initiallize empty YOLO grid
                y_true = torch.zeros((GRID_SIZE, GRID_SIZE, OUTPUT_CHANNELS))
                # Convert bounding boxes to YOLO grid format
                for box in bounding_boxes:
                    # Get index of grid where bounding box lies
                    grid_idx_x = min(max(ceil(box[0] / GRID_WIDTH) - 1, 0), GRID_SIZE - 1)
                    grid_idx_y = min(max(ceil(box[1] / GRID_WIDTH) - 1, 0), GRID_SIZE - 1)
                    # Get coordinate of bounding box within grid
                    box_x = box[0] / GRID_WIDTH - grid_idx_x
                    box_y = box[1] / GRID_WIDTH - grid_idx_y
                    # Get relative widths to the image size
                    rw = box[2] / IMAGE_SIZE
                    rh = box[3] / IMAGE_SIZE
                    y_true[grid_idx_x, grid_idx_y, :] = torch.tensor([box_x, box_y, rw, rh, 1]) 
                # Convert grid to tensor and store in dataset list
                boxes.append(y_true)
                images.append(image)
                img_names.append(prev_entry_img_name)

                bounding_boxes = []

            # Load and decode the image
            img_path = os.path.join(BASE_IMG_DIR, img_name[:12], img_name[13:])
            # Make sure chosen image exists before loading
            while not os.path.exists(img_path):
                entry = next(iterator, None)
                if entry is None:
                    return images, boxes, img_names
                img_name = entry[0]
                img_path = os.path.join(BASE_IMG_DIR, img_name[:12], img_name[13:])
            with Image.open(img_path).convert("F") as im:
                original_size = im.size
                image = transform(im)
                # print(image)
            if original_size != (IMAGE_SIZE, IMAGE_SIZE):
                x_rescale_factor = IMAGE_SIZE / original_size[0]
                y_rescale_factor = IMAGE_SIZE / original_size[1]
                # Debug to track wrong dimension images
                # plt.imsave(img_name, image[0], vmin = -1200, vmax = 600, cmap = 'gray')
                # print(img_name)
                # debug_var = True

        # Format bounding boxes as list of floats and append to list
        bbox = [float(x) for x in entry[6].split(", ")]
        # if debug_var:
        #     print(bbox)
        # Convert to [x_center, y_center, width, height] format for YOLO models
        # Scale bounding box coordinates for images with different dimensions
        width = (bbox[2] - bbox[0])
        height = (bbox[3] - bbox[1])
        x_center = (bbox[0] + width / 2) * x_rescale_factor
        y_center = (bbox[1] + height / 2) * y_rescale_factor
        width *= x_rescale_factor
        height *= y_rescale_factor
        bounding_boxes.append([x_center, y_center, width, height])

        prev_entry_img_name = img_name
    return images, boxes, img_names

images, boxes, img_names = generate_dataset()
images_tensor = torch.stack(images)
boxes_tensor = torch.stack(boxes)
print(images_tensor.shape)
print(boxes_tensor.shape)

Perform train-test-validation spllit and initialize dataloaders for training

In [None]:
# Shuffle indices
shuffle_indices = torch.randperm(images_tensor.size(0))

# Shuffle features and labels
shuffled_features = images_tensor[shuffle_indices]
shuffled_labels = boxes_tensor[shuffle_indices]

# Split ratio (75% test, 15% test, 10% validation)
train_split = 0.75
test_split = 0.90
split_idx_train = int(images_tensor.size(0) * train_split)
split_idx_test = int(images_tensor.size(0) * test_split)

# Split the data
train_features, test_features, val_features = shuffled_features[:split_idx_train], shuffled_features[split_idx_train:split_idx_test], shuffled_features[split_idx_test:]
train_labels, test_labels, val_labels = shuffled_labels[:split_idx_train], shuffled_labels[split_idx_train:split_idx_test], shuffled_labels[split_idx_test:]

# Create TensorDataset for training, test, and validation sets
train_dataset = TensorDataset(train_features, train_labels)
test_dataset = TensorDataset(test_features, test_labels)
val_dataset = TensorDataset(val_features, val_labels)

# Create DataLoaders with batching
batch_size = 2
train_loader = DataLoader(train_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
# Converts YOLO grid to bounding boxes given a confidence threshold for bounding boxes to be considered
def grid_to_bounding_boxes(grid, confidence_threshold=0.9, keep_confidence_score=False):
    # Extract indices of bounding boxes
    indices = torch.nonzero(grid[:, :, 4] > confidence_threshold)
    bboxes = []
    for idx in indices:
        grid_x = idx[0]
        grid_y = idx[1]
        bbox = grid[grid_x, grid_y, :]

        x_center = ((bbox[0] + grid_x) * GRID_WIDTH).item()
        y_center = ((bbox[1]+ grid_y) * GRID_WIDTH).item()
        x_width = (bbox[2] * IMAGE_SIZE).item()
        y_width = (bbox[3] * IMAGE_SIZE).item()

        # box = [x_center, y_center, x_width, y_width]
        if keep_confidence_score:
            box = [x_center - x_width / 2, y_center - y_width / 2, x_width, y_width, bbox[4].item()]
        else:
            box = [x_center - x_width / 2, y_center - y_width / 2, x_width, y_width]

        bboxes.append(box)
    return bboxes
idx = -1

Visualize images in the dataset (can be run repeatedly to see different examples)

In [None]:
idx += 1

i = shuffle_indices[idx].item()
# i = idx  # Enable to view images in-order rather than shuffled

image = images[i][0]
img_name = img_names[i]
bbox = grid_to_bounding_boxes(boxes[i])

# Formats bounding boxes for drawing onto image plots
def create_boxes(bounding_boxes):
    boxes = []
    for box in bounding_boxes:
        boxes.append(Rectangle(box[0:2], box[2], box[3]))
    return boxes

# Display the image
ax1 = plt.subplot()
ax1.imshow(image, vmin=-1200, vmax=600, cmap='gray')
print(img_name)
# Display bounding boxes
ax1.add_collection(PatchCollection(create_boxes(bbox), alpha = 0.25, facecolor = 'red'));
# plt.imsave(img_names[idx], image, vmin=-1200, vmax=600, cmap='gray')

In [None]:
# Example of iterating through the train dataset
print(len(train_loader))
for images, bboxes in train_loader:
    print("Train batch image shape:", images.shape)
    print("Train batch bounding boxes shape:", bboxes.shape)
    break

Build the model

In [None]:
# Implemented after the YOLOv3 architecture diagram in https://arxiv.org/pdf/1804.02767
class YoloModel(torch.nn.Module):
    def __init__(self, grid_size=GRID_SIZE):
        super(YoloModel, self).__init__()

        self.transform = transform = transforms.Compose([
            transforms.ToTensor(),  # Convert to PyTorch tensor (scales to [0, 1])
            transforms.Lambda(lambda x: x - 32768),  # Subtract 32768
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE))  # Resize to desired dimensions
        ])

        self.grid_size = grid_size
        
        # Feature extraction layers
        self.feature_extractor = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),  # (batch, 16, 512, 512)
            nn.ReLU(),  
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),  # (batch, 64, 256, 256)
            nn.ReLU(), 
            ResidualBlock(in_channels=32, repetitions=1),   
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),  # (batch, 64, 128, 128)
            nn.ReLU(), 
            ResidualBlock(in_channels=64, repetitions=2),                   
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),  # (batch, 128, 64, 64)
            nn.ReLU(), 
            ResidualBlock(in_channels=128, repetitions=4),                   
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),  # (batch, 256, 32, 32)
            nn.ReLU(),  
            ResidualBlock(in_channels=256, repetitions=8),                   
            nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1),  # (batch, 512, 16, 16)
            nn.ReLU(), 
            ResidualBlock(in_channels=512, repetitions=8),                   
            nn.Conv2d(512, 1024, kernel_size=3, stride=2, padding=1),  # (batch, 1024, 8, 8)
            nn.ReLU(), 
            ResidualBlock(in_channels=1024, repetitions=4)            
        )
        
        # Prediction head
        self.predictor = nn.Sequential(
            nn.AvgPool2d(8),
            nn.Flatten(start_dim=1),  # Resize to (batch, 1024) for linear layer
            nn.Linear(1024, grid_size * grid_size * 5),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.feature_extractor(x)  # Extract features
        x = self.predictor(x)          # Predict grid-based output
        x = x.view((-1, GRID_SIZE, GRID_SIZE, 5))  # Reshape to grid
        return x
    
    def predict(self, img_path):
        with Image.open(img_path).convert("F") as im:
            image = self.transform(im)
        with torch.no_grad():
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            output = self(image.unsqueeze(0).to(device))
            return output.squeeze(0)

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, repetitions):
        super(ResidualBlock, self).__init__()
        layers = []
        between_channels = in_channels // 2
        for i in range(repetitions):
            layers += [nn.Conv2d(in_channels, between_channels, kernel_size=1, stride=1, padding=0),
                       nn.BatchNorm2d(between_channels),
                       nn.ReLU(),
                       nn.Conv2d(between_channels, in_channels, kernel_size=3, stride=1, padding=1),
                       nn.BatchNorm2d(in_channels),
                       nn.ReLU()]
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        residual = x  # Save the input for the skip connection
        for layer in self.layers:
            x = layer(x)
        return F.relu(x + residual) # Add the input (skip connection)
        

Set up the loss function

In [None]:
class SquareRootDistanceLoss(nn.Module):
    def __init__(self):
        super(SquareRootDistanceLoss, self).__init__()

    def forward(self, predictions, targets):
        """
        Args:
            predictions: Tensor of predicted values (e.g., probabilities or weights)
            targets: Tensor of ground truth values (same shape as predictions)
        Returns:
            loss: The calculated Square Root Distance Loss
        """
        # Ensure all values are non-negative (to prevent NaNs in sqrt)
        predictions = torch.clamp(predictions, min=0.0)
        targets = torch.clamp(targets, min=0.0)

        # Compute the square root of predictions and targets
        sqrt_preds = torch.sqrt(predictions)
        sqrt_targets = torch.sqrt(targets)

        # Compute the squared difference
        loss = ((sqrt_preds - sqrt_targets) ** 2).sum()
        return loss

class YOLOLoss(nn.Module):
    def __init__(self, lambda_coord=1, lambda_noobj=0.1, confidence_threshold=0.5):
        super(YOLOLoss, self).__init__()
        self.ct = confidence_threshold    # Confidence required to make a positive prediction
        self.lambda_coord = lambda_coord  # Weight for coordinate loss
        self.lambda_noobj = lambda_noobj  # Weight for no-object loss
        self.mse_loss = nn.MSELoss(reduction='sum')  # For coordinates and confidence
        self.bce_loss = nn.BCEWithLogitsLoss(reduction='sum')  # For confidence scores
        self.sqrd_loss = SquareRootDistanceLoss()

    def forward(self, predictions, targets):
        """
        Args:
            predictions: (batch_size, S, S, B*5 + C) - Predicted tensor
            targets: (batch_size, S, S, B*5 + C) - Ground truth tensor

        Components:
            S: Grid size
            B: Number of bounding boxes per grid cell
            C: Number of classes
        """
        obj_mask = targets[..., 4] >= self.ct  # Object mask (confidence >= ct)
        noobj_mask = targets[..., 4] < self.ct  # No-object mask (confidence < ct)

        # Coordinate Loss
        coord_loss = self.mse_loss(
            predictions[obj_mask][..., :2], targets[obj_mask][..., :2]
        )

        width_loss = self.sqrd_loss(
            predictions[obj_mask][..., 2], targets[obj_mask][..., 2]
        )

        height_loss = self.sqrd_loss(
            predictions[obj_mask][..., 3], targets[obj_mask][..., 3]
        )

        # Confidence Loss (Object)
        obj_conf_loss = self.mse_loss(
            predictions[obj_mask][..., 4], targets[obj_mask][..., 4]
        )

        # Confidence Loss (No Object)
        noobj_conf_loss = self.mse_loss(
            predictions[noobj_mask][..., 4], targets[noobj_mask][..., 4]
        )

        # Total Loss
        total_loss = (
            self.lambda_coord * (coord_loss + width_loss + height_loss)
            + obj_conf_loss
            + self.lambda_noobj * noobj_conf_loss
        )

        return total_loss

Initialize the model

In [None]:
# Check if gpu support is enabled. If not, see https://pytorch.org/get-started/locally/ to install pytorch with CUDA support
torch.cuda.is_available()

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = YoloModel(grid_size=GRID_SIZE).to(device)
criterion = YOLOLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

Train the model (Skip if you have a pre-trained model) (can be executed multiple times to train more)

In [None]:
# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    running_loss = 0.0

    model.train()  # Set model to training mode
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        # Move data to device
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate loss
        running_loss += loss.item()

        # Print progress
        # if (batch_idx + 1) % 50 == 0:  # Print every 50 batches
        #     print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    # Get validation loss
    model.eval()
    running_val_loss = 0
    for batch_idx, (inputs, targets) in enumerate(val_loader):
        # Move data to device
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Accumulate loss
        running_val_loss += loss.item()

    # Print epoch summary
    print(f"Epoch [{epoch+1}/{num_epochs}] Average Loss: {running_loss / len(train_loader):.4f}, Validation Loss: {running_val_loss / len(val_loader):.4f}")
idx = -1

Save model to file

In [None]:
with open("model.pickle", 'wb') as file:
    dill.dump(model, file)

Load model from file

In [None]:
with open("model.pickle", 'rb') as file:
    model = dill.load(file)
idx = -1
model.to(device);

Testing the model

In [None]:
model.eval()
def get_acc(data_loader, min_conf=0.5):
    total_acc = 0
    for batch_idx, (inputs, targets) in enumerate(data_loader):
        # Move data to device
        inputs, targets = inputs.to(device), targets.to(device)

        outputs = model(inputs)

        obj_mask_t = (targets[..., 4] >= min_conf).nonzero()
        obj_mask_y = (outputs[..., 4] >= min_conf).nonzero()

        # Measure accuracy as % of positive predictions which match between the labels and predictions
        total = torch.cat((obj_mask_t, obj_mask_y))
        union = total.unique(dim=0)
        len_intersect = len(total) - len(union)
        acc = len_intersect / len(total)

        total_acc += acc
    return total_acc / (batch_idx + 1)

print(f"Average accuracy on training set: {get_acc(train_loader):.4f}")
print(f"Average accuracy on testing set: {get_acc(test_loader):.4f}")

Visualize model outputs (can be run repeatedly to see different examples)

In [None]:
# idx = 3
idx += 1

image, label_bbox = train_dataset[idx]

# Plot two images for comparison
figs, axes = plt.subplots(1, 2)
ax1 = axes[0]
ax2 = axes[1]
ax1.imshow(image[0], vmin=-1200, vmax=600, cmap='gray')
ax2.imshow(image[0], vmin=-1200, vmax=600, cmap='gray')
grid = model(image[np.newaxis, :].to(device))[0, :]
# Print image name and maximum confidence score within the grids
print(f"{img_name} - Max confidence: {grid[:, :, 4].max().item():.3f}")

alpha_confidence = False  # Displays bounding box transparency by confidence score
# alpha_confidence = True
# 0.5 is the min confidence threshold for bounding boxes to be visualized
bbox = grid_to_bounding_boxes(grid, 0.5, alpha_confidence)
# Print bounding box dimensions (x_center, y_center, width, height)
print("Bounding boxes:", bbox)

# Plot bounding boxes
if alpha_confidence:
    print([x[4] for x in bbox])
    ax1.add_collection(PatchCollection(create_boxes(bbox), alpha = [x[4] for x in bbox], facecolor = 'red'))
else:
    ax1.add_collection(PatchCollection(create_boxes(bbox), alpha = 0.25, facecolor = 'red'))
ax2.add_collection(PatchCollection(create_boxes(grid_to_bounding_boxes(label_bbox)), alpha = 0.25, facecolor = 'red'));