In [1]:
import os
import sys
import glob
import math
import time
import torch
import numpy as np
import torchvision
import pandas as pd
from torch import utils
import matplotlib.pyplot as plt
from torchvision import tv_tensors
from torchvision.io import read_image
from torchvision.ops.boxes import box_iou
from torchvision.transforms import v2 as T
from torchvision.transforms.v2 import functional as F
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.utils import draw_bounding_boxes
from PIL import Image

In [2]:
# ✅ Convert (cx, cy, w, h) -> (xmin, ymin, xmax, ymax)
def convert2RCNNbox(bbox):
    """
    Turn a centre-format box into the corner format used for Faster R-CNN.

    bbox: indexable as [cx, cy, w, h] (documented by the caller as absolute pixels).
    Returns: a list [xmin, ymin, xmax, ymax].
    """
    center_x, center_y = bbox[0], bbox[1]
    half_width = bbox[2] / 2
    half_height = bbox[3] / 2

    # Corners are the centre shifted by half the extent in each direction.
    return [
        center_x - half_width,
        center_y - half_height,
        center_x + half_width,
        center_y + half_height,
    ]


# ✅ Convert (xmin, ymin, xmax, ymax) -> relative (cx, cy, w, h)
def convert2Outputbox(bbox, width, height):
    """
    Turn a corner-format box into a centre-format box normalised by image size.

    bbox: indexable as [xmin, ymin, xmax, ymax].
    width, height: image dimensions used as the normalisation divisors.
    Returns: the tuple (cx, cy, w, h), each divided by the matching image dimension.
    """
    box_w = bbox[2] - bbox[0]
    box_h = bbox[3] - bbox[1]

    # The centre is computed from the un-normalised extents, so it must come
    # before the extents themselves are scaled.
    center_x = (bbox[0] + box_w / 2) / width
    center_y = (bbox[1] + box_h / 2) / height

    box_w /= width
    box_h /= height

    return center_x, center_y, box_w, box_h

In [3]:
class VehicleDetection(torch.utils.data.Dataset):
    """Labelled detection dataset: ``<root>/images/*.jpeg`` plus ``<root>/labels.txt``.

    Each non-blank line of ``labels.txt`` is whitespace separated. The first
    field is a per-image key; the remaining fields are one annotation, read
    later as ``class cx cy w h``. Lines sharing a key are grouped in order of
    first appearance.

    NOTE(review): images (sorted by path) and label groups (file order) are
    paired purely by position. This assumes every image has at least one
    annotation and that ``labels.txt`` lists images in sorted-filename order.
    Confirm against the data set.
    """

    def __init__(self, root, transforms):
        """
        root: data set directory, with or without a trailing separator.
        transforms: callable applied as ``transforms(img, target)``, or None.
        """
        import warnings

        self.root = root
        self.transforms = transforms

        # glob returns paths that already contain `root`; they are stored
        # as-is and opened directly in __getitem__.
        self.imgs = sorted(glob.glob(os.path.join(root, "images", "*.jpeg")))

        with open(os.path.join(root, "labels.txt")) as f:
            # Skip blank lines so they cannot raise IndexError below.
            rows = [line.split() for line in f.read().splitlines() if line.strip()]

        labels_map = {}
        for row in rows:
            key, annotation = row[0], " ".join(row[1:])
            if key in labels_map:
                labels_map[key] += "," + annotation
            else:
                labels_map[key] = annotation
        self.labels = list(labels_map.values())

        if len(self.labels) != len(self.imgs):
            # Positional pairing cannot be right when the counts differ.
            warnings.warn(
                f"{len(self.imgs)} images but {len(self.labels)} labelled image keys "
                f"under {root!r}; images and labels are paired by position and may be misaligned."
            )

    def __getitem__(self, idx):
        """Return ``(image, target)`` for position ``idx``.

        ``target`` holds ``labels`` (int64 tensor), ``boxes`` (XYXY
        ``BoundingBoxes``) and ``image_id`` (the integer ``idx``).
        """
        # self.imgs already holds complete paths; joining `root` on again
        # would break for a relative `root`.
        img = tv_tensors.Image(read_image(self.imgs[idx]))

        annotations = [entry.split() for entry in self.labels[idx].split(",")]
        classes = np.array([a[0] for a in annotations], dtype=np.int64)
        boxes = [convert2RCNNbox(np.array(a[1:], dtype=np.float32)) for a in annotations]

        target = {
            "labels": torch.from_numpy(classes),
            "boxes": tv_tensors.BoundingBoxes(boxes, format="XYXY", canvas_size=F.get_size(img)),
            "image_id": idx,
        }

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Number of images found under ``<root>/images``."""
        return len(self.imgs)

    @staticmethod
    def collate_fn(batch):
        """Transpose a list of ``(img, target)`` pairs into ``(imgs, targets)`` tuples."""
        im, target = zip(*batch)
        return im, target

In [4]:
class VehicleDetectionTest(torch.utils.data.Dataset):
    """Unlabelled detection dataset over ``<root>/images/*.jpeg``.

    Items are ``(image, target)`` where ``target`` only carries ``image_id``,
    the position of the image in the sorted path list.
    """

    def __init__(self, root, transforms):
        """
        root: data set directory, with or without a trailing separator.
        transforms: callable applied as ``transforms(img, target)``, or None.
        """
        self.root = root
        self.transforms = transforms
        # glob returns paths that already contain `root`; they are stored
        # as-is and opened directly in __getitem__.
        self.imgs = sorted(glob.glob(os.path.join(root, "images", "*.jpeg")))

    def __getitem__(self, idx):
        """Return ``(image, {"image_id": idx})`` for position ``idx``."""
        # self.imgs already holds complete paths; joining `root` on again
        # would break for a relative `root`.
        img = tv_tensors.Image(read_image(self.imgs[idx]))

        target = {"image_id": idx}

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Number of images found under ``<root>/images``."""
        return len(self.imgs)

    @staticmethod
    def collate_fn(batch):
        """Transpose a list of ``(img, target)`` pairs into ``(imgs, targets)`` tuples."""
        im, target = zip(*batch)
        return im, target

In [5]:
def get_transform():
    """Build the shared image pipeline: cast to scaled float, then unwrap to a plain tensor."""
    return T.Compose(
        [
            T.ToDtype(torch.float, scale=True),
            T.ToPureTensor(),
        ]
    )

In [6]:
def train_one_epoch(model, dataloader, optimizer, device, epoch):
    """Run one optimisation pass over ``dataloader``.

    model: called as ``model(images, targets)`` and expected to return a dict
        of loss tensors.
    dataloader: yields ``(images, targets)`` batches.
    optimizer: stepped once per batch.
    device: where images and tensor-valued target entries are moved.
    epoch: only used in the progress messages.

    Returns ``(loss_classes, total_loss)``: the per-key and overall loss
    values summed (not averaged) over all batches.
    """
    model.train()
    total_loss = 0
    loss_classes = {}

    for iteration, (images, targets) in enumerate(dataloader):
        optimizer.zero_grad()

        images = [img.to(device) for img in images]
        moved_targets = []
        for tgt in targets:
            # Non-tensor entries (e.g. an integer image id) are passed through.
            moved_targets.append(
                {
                    name: value.to(device) if isinstance(value, torch.Tensor) else value
                    for name, value in tgt.items()
                }
            )

        loss_dict = model(images, moved_targets)
        losses = sum(loss_dict.values())

        for name, component in loss_dict.items():
            loss_classes[name] = loss_classes.get(name, 0) + component.item()

        batch_loss = losses.item()
        total_loss += batch_loss

        losses.backward()
        optimizer.step()

        if iteration % 20 == 0:
            print(f"✅ Epoch {epoch} | Iteration {iteration} | Batch Loss: {batch_loss}")

    print(f"✅ Epoch {epoch} Completed | Total Loss: {total_loss}")
    return loss_classes, total_loss


In [7]:
def calculate_validation_loss(model, dataloader, device):
    """Sum the model's loss terms over ``dataloader`` without updating weights.

    model: called as ``model(images, targets)``; must return a dict of loss
        tensors when in training mode.
    dataloader: yields ``(images, targets)`` batches.
    device: where images and tensor-valued target entries are moved.

    Returns ``(loss_classes, total_loss)``: the per-key and overall loss
    values summed (not averaged) over all batches.

    torchvision detection models only return the loss dict in training mode;
    in eval mode they return detections. The model is therefore switched to
    training mode for the duration of the call and restored afterwards, so the
    result no longer depends on the mode the caller left the model in.
    Gradients are disabled throughout.
    """
    total_loss = 0
    loss_classes = {}

    was_training = model.training
    model.train()
    try:
        with torch.no_grad():
            for images, targets in dataloader:
                images = [image.to(device) for image in images]
                targets = [
                    {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()}
                    for t in targets
                ]

                loss_dict = model(images, targets)

                for key, loss in loss_dict.items():
                    loss_classes[key] = loss_classes.get(key, 0) + loss.item()

                total_loss += sum(loss_dict.values()).item()
    finally:
        # Hand the model back in the mode it arrived in.
        model.train(was_training)

    return loss_classes, total_loss


In [8]:
def evaluate(model, dataloader, device):
    """Run inference over ``dataloader`` and collect the outputs per image.

    model: switched to eval mode and called as ``model(images)``; expected to
        return one dict of tensors per image.
    dataloader: yields ``(images, targets)`` batches; each target must carry
        an ``image_id`` entry.
    device: where the images are moved before the forward pass.

    Returns a dict mapping each ``image_id`` to that image's output dict, with
    all tensors moved to the CPU. The accumulated forward-plus-transfer time
    is printed.
    """
    model.eval()
    predictions = {}
    total_inference_time = 0

    for images, targets in dataloader:
        batch_on_device = [image.to(device) for image in images]

        started = time.time()

        # No gradients are needed for inference; this also keeps memory down.
        with torch.no_grad():
            raw_outputs = model(batch_on_device)

        for target, detections in zip(targets, raw_outputs):
            predictions[target["image_id"]] = {
                name: value.to("cpu") for name, value in detections.items()
            }

        total_inference_time += time.time() - started

    print(f'Inference time: {total_inference_time}')

    return predictions

In [9]:
def nms(bboxes: torch.Tensor, scores: torch.Tensor, iou_threshold: float) -> torch.Tensor:
    """Greedy non-maximum suppression.

    bboxes: (N, 4) boxes in the corner format accepted by ``box_iou``.
    scores: (N,) confidence per box.
    iou_threshold: a box is dropped when its IoU with an already kept,
        higher-scoring box is strictly greater than this value.

    Returns the indices (into ``bboxes``) of the kept boxes, ordered by
    descending score.
    """
    order = torch.argsort(-scores)
    num_boxes = bboxes.shape[0]
    # `keep` is indexed by rank (position in `order`), not by original index.
    keep = torch.ones(num_boxes, dtype=torch.bool, device=bboxes.device)

    for i in range(num_boxes - 1):
        if not keep[i]:
            continue
        # IoU of the current box against every lower-ranked box, flattened to
        # 1-D so positions map directly onto keep[i + 1:]. The previous
        # version used the 2-D output of torch.nonzero as an index, which
        # also cleared keep[i + 1] whenever any overlap was found.
        iou = box_iou(bboxes[order[i]][None, ...], bboxes[order[i + 1:]])[0]
        keep[i + 1:] &= ~(iou > iou_threshold)

    return order[keep.to(order.device)]

In [10]:
# Data set root. Later cells build paths by appending "train/", "val/" and
# "test/" directly to this string, so the trailing slash is required.
root = "/WAVE/projects/CSEN-342-Wi25/data/pr2/"
# Checkpoints, the loss log and the test predictions are written here by
# string concatenation, so the trailing slash is required here as well.
output_dir = "/WAVE/users2/unix/ssonpole/pr2/"  # Or any directory where you can save models

In [11]:
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from PIL import Image

# Run on the GPU when one is visible, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Faster R-CNN with a ResNet-50 FPN backbone and the library's default weights.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
    weights="DEFAULT",
    trainable_backbone_layers=3,
)

# Swap the box-predictor head for one sized to this task:
# 3 vehicle classes (1-3) + 1 background (0).
num_classes = 4
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

model.to(device)

# Data locations.
root = "/WAVE/projects/CSEN-342-Wi25/data/pr2/"
output_dir = "/WAVE/users2/unix/ssonpole/pr2/"

# One dataset per split; all of them share the same transform pipeline.
dataset = VehicleDetection(root + "train/", get_transform())
dataset_val = VehicleDetection(root + "val/", get_transform())
dataset_test = VehicleDetectionTest(root + "test/", get_transform())

# Only the training loader shuffles; every loader keeps samples as tuples
# through the shared collate function.
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=4,
    shuffle=True,
    num_workers=2,
    collate_fn=VehicleDetection.collate_fn,
)

data_loader_val = torch.utils.data.DataLoader(
    dataset_val,
    batch_size=2,
    shuffle=False,
    num_workers=2,
    collate_fn=VehicleDetection.collate_fn,
)

data_loader_test = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=1,
    shuffle=False,
    num_workers=2,
    collate_fn=VehicleDetection.collate_fn,
)

# Optimise only the parameters that are left trainable.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# Multiply the learning rate by 0.1 after every 3 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [None]:
# ✅ Training Loop
num_epochs = 3
best_validation_loss = float("inf")
best_model_path = None  # checkpoint with the lowest validation loss so far
validation_loss = []
training_loss = []
training_loss_dict = []
validation_loss_dict = []

# Create the output directory up front, so a missing directory fails now
# rather than after the first full epoch when the checkpoint is written.
os.makedirs(output_dir, exist_ok=True)

for epoch in range(num_epochs):
    # ✅ Train
    loss_dict, loss = train_one_epoch(model, data_loader, optimizer, device, epoch)
    training_loss.append(loss)
    training_loss_dict.append(loss_dict)

    # ✅ Step Scheduler (once per epoch)
    lr_scheduler.step()

    # ✅ Validation
    loss_dict, loss = calculate_validation_loss(model, data_loader_val, device)
    validation_loss.append(loss)
    validation_loss_dict.append(loss_dict)

    # ✅ Save Best Model
    if loss < best_validation_loss:
        best_validation_loss = loss
        best_model_path = os.path.join(output_dir, f"best_{epoch}.pt")
        print("✅ Saving the best model")
        torch.save(model.state_dict(), best_model_path)

    print(f"✅ Epoch {epoch} | Validation Loss: {loss}")

# ✅ Save Loss Logs
with open(os.path.join(output_dir, 'loss_info.txt'), 'w') as f:
    for i, (tloss, vloss) in enumerate(zip(training_loss, validation_loss)):
        f.write(f"Epoch {i}: Training Loss: {tloss} | Validation Loss: {vloss}\n")

# ✅ Test Predictions
# Predict with the best checkpoint, not with whatever the last epoch left in
# memory. If no epoch ever improved (e.g. NaN losses) the current weights are
# used unchanged.
if best_model_path is not None:
    model.load_state_dict(torch.load(best_model_path, map_location=device, weights_only=True))

predictions = evaluate(model, data_loader_test, device)
print(f"✅ Predictions made on {len(predictions)} images")

# ✅ Save Test Predictions
with open(os.path.join(output_dir, "test.txt"), 'w') as f:
    for img_id, values in predictions.items():
        # NOTE(review): assumes test images are named 00001.jpeg, 00002.jpeg, ...
        # so that dataset position `img_id` maps to file number `img_id + 1`.
        img_name = str(img_id + 1).zfill(5) + ".jpeg"

        # Only the image size is needed here; PIL reports (width, height).
        with Image.open(os.path.join(root, "test", "images", img_name)) as img:
            w, h = img.size

        # The custom nms() helper is not applied; the model's detections are
        # written as returned.
        for label, bbox, score in zip(values['labels'], values['boxes'], values['scores']):
            box = convert2Outputbox(bbox, w, h)
            f.write(f"{img_id+1} {label} {box[0]} {box[1]} {box[2]} {box[3]} {score}\n")

print("✅ Test predictions saved successfully!")

✅ Epoch 0 | Iteration 0 | Batch Loss: 2.1353402137756348
✅ Epoch 0 | Iteration 20 | Batch Loss: 0.30748146772384644
✅ Epoch 0 | Iteration 40 | Batch Loss: 0.15795676410198212
✅ Epoch 0 | Iteration 60 | Batch Loss: 0.5511859059333801
✅ Epoch 0 | Iteration 80 | Batch Loss: 0.18908871710300446
✅ Epoch 0 | Iteration 100 | Batch Loss: 0.5248900055885315
✅ Epoch 0 | Iteration 120 | Batch Loss: 0.2920415997505188
✅ Epoch 0 | Iteration 140 | Batch Loss: 0.5767536163330078
✅ Epoch 0 | Iteration 160 | Batch Loss: 0.3664097487926483
✅ Epoch 0 | Iteration 180 | Batch Loss: 0.4992969036102295
✅ Epoch 0 | Iteration 200 | Batch Loss: 0.1560649424791336
✅ Epoch 0 | Iteration 220 | Batch Loss: 0.4665796458721161
✅ Epoch 0 | Iteration 240 | Batch Loss: 0.3982652425765991
✅ Epoch 0 | Iteration 260 | Batch Loss: 0.5172609090805054
✅ Epoch 0 | Iteration 280 | Batch Loss: 0.37000057101249695
✅ Epoch 0 | Iteration 300 | Batch Loss: 0.6724282503128052
✅ Epoch 0 | Iteration 320 | Batch Loss: 0.4599401950836181