In [33]:
import os
import glob
import torch
import numpy
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import torchvision.transforms as T
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.utils import draw_bounding_boxes
import random
from scipy.optimize import linear_sum_assignment
from torchvision import ops

In [14]:
def load_original_images(pair_dir):
    """
    Load the two original (un-annotated) frames stored in ``pair_dir``.

    All ``*.png`` files in the directory are listed in sorted order. When the
    directory holds more than two PNGs, files whose base name contains
    ``"annotation"`` or ``"BoundingBox"`` are treated as annotated renderings
    and dropped. The first two remaining files are frame 1 and frame 2.

    Args:
        pair_dir: path of a single pair directory.

    Returns:
        Tuple ``(img1, img2)`` of RGB ``PIL.Image`` objects.

    Raises:
        FileNotFoundError: if fewer than two usable PNG files are found.
    """
    images = sorted(glob.glob(os.path.join(pair_dir, "*.png")))
    if len(images) > 2:
        # Filter by name rather than by position, so an extra or missing
        # rendering cannot shift which files are picked as the frames.
        annotated_markers = ("annotation", "BoundingBox")
        images = [
            path for path in images
            if not any(marker in os.path.basename(path) for marker in annotated_markers)
        ]
    if len(images) < 2:
        raise FileNotFoundError(
            f"Expected at least 2 original PNG frames in {pair_dir}, found {len(images)}"
        )

    frames = []
    for path in images[:2]:
        # convert() returns an independent in-memory image, so the file handle
        # can be released right away instead of waiting for garbage collection.
        with Image.open(path) as handle:
            frames.append(handle.convert("RGB"))
    return frames[0], frames[1]

In [None]:
class MovingObjectDataset(Dataset):
    """
    Detection dataset built from pairs of frames.

    Each item is the absolute per-pixel difference of the two original frames
    found in one ``Pair_*`` directory, together with the bounding boxes and
    labels parsed from every ``*.txt`` annotation file in that directory.
    """

    def __init__(self, root_dir, transforms=None):
        """
        Args:
            root_dir: directory containing all the Pair_* subdirectories.
            transforms: optional transformations to be applied on images.
        """
        self.root_dir = root_dir
        self.transforms = transforms
        # Sorted so that an index always refers to the same pair directory.
        self.pairs = sorted(
            d for d in glob.glob(os.path.join(root_dir, "Pair_*")) if os.path.isdir(d)
        )

    def __len__(self):
        """Return the number of pair directories found under ``root_dir``."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """
        Build the difference image and detection target for pair ``idx``.

        Returns:
            Tuple ``(diff_image, target)`` where ``diff_image`` is the tensor
            ``|frame2 - frame1|`` and ``target`` is a dict with ``"boxes"``
            (float32, ``[xmin, ymin, xmax, ymax]`` rows) and ``"labels"``
            (int64).
        """
        pair_dir = self.pairs[idx]

        ann_files = sorted(glob.glob(os.path.join(pair_dir, "*.txt")))

        # Load images
        img1, img2 = load_original_images(pair_dir)

        to_tensor = T.ToTensor()
        img1_tensor = to_tensor(img1)
        img2_tensor = to_tensor(img2)

        # The absolute difference highlights what changed between the frames.
        diff_image = torch.abs(img2_tensor - img1_tensor)

        boxes, labels = self._load_annotations(ann_files)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels

        # NOTE(review): transforms only receive the image; a geometric transform
        # would leave the boxes out of sync with it.
        if self.transforms:
            diff_image = self.transforms(diff_image)

        return diff_image, target

    def _load_annotations(self, ann_files):
        """
        Parse annotation files into box and label tensors.

        Each usable line has at least 8 whitespace-separated fields; fields
        3-6 are ``x y w h`` and field 7 is the category (the third field is
        the frame number, which is not needed here). Shorter lines are
        skipped, as are boxes with non-positive width or height, because
        torchvision detection models reject such degenerate boxes when
        targets are supplied.

        Args:
            ann_files: iterable of annotation file paths.

        Returns:
            Tuple ``(boxes, labels)``: float32 tensor of shape ``(N, 4)`` in
            ``[xmin, ymin, xmax, ymax]`` order and int64 tensor of shape
            ``(N,)``. Labels are ``category + 1`` because 0 is reserved for
            background. ``N`` may be 0.
        """
        boxes_all = []
        labels_all = []

        for ann_file in ann_files:
            with open(ann_file, 'r') as f:
                for line in f:
                    parts = line.split()
                    if len(parts) < 8:
                        continue
                    x, y, w, h = (float(value) for value in parts[3:7])
                    category = int(parts[7])

                    if w <= 0 or h <= 0:
                        # Degenerate annotation: nothing to detect.
                        continue

                    boxes_all.append([x, y, x + w, y + h])
                    labels_all.append(category + 1)  # 0 is reserved for background

        if len(boxes_all) == 0:
            # Keep the (0, 4) shape so images without objects remain valid targets.
            boxes_all = torch.zeros((0, 4), dtype=torch.float32)
            labels_all = torch.zeros((0,), dtype=torch.int64)
        else:
            boxes_all = torch.tensor(boxes_all, dtype=torch.float32)
            labels_all = torch.tensor(labels_all, dtype=torch.int64)

        return boxes_all, labels_all

In [3]:
def get_model(num_classes, device):
    """
    Build a Faster R-CNN (ResNet-50 FPN) detector with a fresh box-prediction head.

    Args:
        num_classes: number of output classes of the new head, background included.
        device: device the model is moved to; pass ``None`` to leave it where
            it was created.

    Returns:
        The model with its ``roi_heads.box_predictor`` replaced.
    """
    # Start from the pre-trained detector.
    # NOTE(review): recent torchvision releases deprecate ``pretrained=True``
    # in favour of ``weights=...``; kept as-is for compatibility with the
    # installed version.
    model = fasterrcnn_resnet50_fpn(pretrained=True)

    # Replace the classification/regression head so it predicts ``num_classes``.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    # Honour the ``device`` argument, which was previously accepted but ignored.
    if device is not None:
        model.to(device)
    return model

def train_model(model, train_loader, device, num_epochs=10, lr=0.005):
    """
    Train a detection model with SGD and a step learning-rate schedule.

    The model is called as ``model(images, targets)`` and must return a dict
    of loss tensors; their sum is the quantity that is optimised.

    Args:
        model: module to train (moved to ``device``).
        train_loader: iterable yielding ``(images, targets)`` batches, where
            ``images`` is a sequence of tensors and ``targets`` a sequence of
            dicts of tensors.
        device: device used for the model and every batch.
        num_epochs: number of passes over ``train_loader``.
        lr: initial learning rate (multiplied by 0.1 every 3 epochs).

    Returns:
        The trained model (the same object that was passed in).
    """
    model.to(device)
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=lr, momentum=0.9, weight_decay=0.0005)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = 0  # counted explicitly: works for empty or length-less loaders
        for images, targets in train_loader:
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            losses = sum(loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            total_loss += losses.item()
            num_batches += 1

        lr_scheduler.step()
        # max(..., 1) avoids a ZeroDivisionError when the loader yields no batches.
        mean_loss = total_loss / max(num_batches, 1)
        print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {mean_loss:.4f}")

    return model

In [4]:
# some preprocessing
root_dir = "data"  # path to data directory
dataset = MovingObjectDataset(root_dir=root_dir, transforms=None)

# Split dataset into train and test: 70% train, 30% test.
# The generator is seeded so the split is identical after a kernel restart;
# otherwise weights reloaded from disk could be evaluated on pairs that were
# part of their training split.
SPLIT_SEED = 42
dataset_size = len(dataset)
train_size = int(0.7 * dataset_size)
test_size = dataset_size - train_size
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

def collate_fn(batch):
    """
    Transpose a batch of ``(image, target)`` samples into ``[images, targets]``.

    Images and targets stay as per-sample tuples instead of being stacked into
    a single tensor, since each target holds a different number of boxes.
    """
    return list(zip(*batch))

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
# shuffle=False: the evaluation code maps loader positions back to subset indices.
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

# number of classes: Background (0) + Unknown/Generic(1) + Person(2) + Car(3) 
# + Other Vehicle(4) + Other Object(5) + Bike(6)
num_classes = 7

# Fall back to the CPU so the notebook still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Destination of the trained weights.
# NOTE(review): this is an absolute path at the filesystem root, while the
# evaluation functions write under the relative "results/" directory; confirm
# that this location is intended.
weights_path = "/results/model/model_weights.pth"
# Create the folder before training so a path or permission problem surfaces
# immediately instead of after the whole training run.
os.makedirs(os.path.dirname(weights_path), exist_ok=True)

# training the model
model = get_model(num_classes,device)
model = train_model(model, train_loader, device, num_epochs=10, lr=0.005)

# Save the trained model parameters
torch.save(model.state_dict(), weights_path)
print(f"Model weights saved to {weights_path}")



before: ['data\\Pair_S_050203_09_001960_002083_0530_0660\\S_050203_09_001960_002083_0530.BoundingBox.png', 'data\\Pair_S_050203_09_001960_002083_0530_0660\\S_050203_09_001960_002083_0530.png', 'data\\Pair_S_050203_09_001960_002083_0530_0660\\S_050203_09_001960_002083_0660.BoundingBox.png', 'data\\Pair_S_050203_09_001960_002083_0530_0660\\S_050203_09_001960_002083_0660.png']
after: ['data\\Pair_S_050203_09_001960_002083_0530_0660\\S_050203_09_001960_002083_0530.png', 'data\\Pair_S_050203_09_001960_002083_0530_0660\\S_050203_09_001960_002083_0660.png']
before: ['data\\Pair_S_050202_09_001642_001712_1600_1650\\S_050202_09_001642_001712_1600.BoundingBox.png', 'data\\Pair_S_050202_09_001642_001712_1600_1650\\S_050202_09_001642_001712_1600.png', 'data\\Pair_S_050202_09_001642_001712_1600_1650\\S_050202_09_001642_001712_1650.BoundingBox.png', 'data\\Pair_S_050202_09_001642_001712_1600_1650\\S_050202_09_001642_001712_1650.png']
after: ['data\\Pair_S_050202_09_001642_001712_1600_1650\\S_050202_

In [51]:
# Evaluate the model
def compute_iou(box1, box2):
    """
    Compute the Intersection over Union (IoU) of two bounding boxes.

    Args:
        box1: box as ``[xmin, ymin, xmax, ymax]``; any indexable of numbers or
            0-dim tensors (e.g. a list or a 1-D tensor).
        box2: second box, same format.

    Returns:
        The IoU as a plain Python ``float``, for list and tensor inputs
        alike. ``0.0`` is returned when the boxes do not overlap or the union
        area is not positive.
    """
    # Corners of the intersection rectangle.
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    inter_w = x2 - x1
    inter_h = y2 - y1
    if inter_w <= 0 or inter_h <= 0:
        # Disjoint or merely touching boxes share no area.
        return 0.0

    inter_area = inter_w * inter_h
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - inter_area

    if union_area <= 0:
        return 0.0
    # float() unwraps 0-dim tensors so callers always get the same type.
    return float(inter_area / union_area)

def get_pair_info(dataset, subset, idx):
    """
    Resolve a position inside a subset back to its pair directory.

    Args:
        dataset: full dataset exposing a ``pairs`` sequence.
        subset: split of ``dataset`` exposing an ``indices`` sequence.
        idx: position of the sample within ``subset``.

    Returns:
        Tuple ``(pair_dir, original_idx)``: the pair directory and the index
        of the sample in the full dataset.
    """
    source_index = subset.indices[idx]
    return dataset.pairs[source_index], source_index


# Display names keyed by label id, starting at 1.
category_names = dict(
    enumerate(
        ("Unknown", "Person", "Car", "Other Vehicle", "Other Object", "Bike"),
        start=1,
    )
)

In [50]:
def evaluate_model(model, test_loader, device, iou_threshold=0.5, dataset=None, subset=None,
                   nms_threshold=0.3, results_root="results"):
    """
    Evaluate a detector with greedy IoU matching and optionally save its outputs.

    Predictions first go through a class-agnostic NMS pass (in addition to
    whatever post-processing the model applies itself). Each remaining
    prediction is then compared with all ground-truth boxes of its sample; it
    is a true positive when its best-overlapping ground-truth box has
    IoU > ``iou_threshold`` and that box was not claimed by an earlier
    prediction.

    Printed metrics, aggregated over the whole loader:
        - Accuracy: matched boxes whose label is also correct / ground-truth boxes.
        - Precision: TP / (TP + FP).
        - Recall: TP / (TP + FN).

    When both ``dataset`` and ``subset`` are given, every sample that has at
    least one ground-truth or predicted box gets a folder
    ``<results_root>/<iou_threshold>/<pair name>/`` with ``bbox.txt`` and
    ``frame1.png`` / ``frame2.png`` / ``diff.png`` showing the predicted boxes
    in red. Without them only the metrics are computed.

    Args:
        model: detection model; ``model(images)`` must return one dict per
            image with ``"boxes"``, ``"labels"`` and ``"scores"``.
        test_loader: iterable of ``(images, targets)`` batches. It must not be
            shuffled when outputs are saved, because samples are mapped to
            ``subset`` by their running position.
        device: device used for inference.
        iou_threshold: minimum (exclusive) IoU for a match.
        dataset: full dataset exposing ``pairs`` (optional).
        subset: evaluated split exposing ``indices`` (optional).
        nms_threshold: IoU threshold of the extra NMS pass.
        results_root: root folder for saved outputs.

    Returns:
        None. Metrics are printed.
    """
    model.eval()
    model.to(device)

    save_outputs = dataset is not None and subset is not None
    results_dir = f"{results_root}/{iou_threshold}"
    if save_outputs:
        os.makedirs(results_dir, exist_ok=True)
        to_tensor = T.ToTensor()
        to_pil = T.ToPILImage()

    total_samples = 0     # total number of ground truth boxes (for computing accuracy)
    total_correct = 0     # matched ground truth boxes whose label is also right

    total_tp = 0
    total_fp = 0
    total_fn = 0

    sample_idx = 0        # running position of the sample within the loader

    with torch.no_grad():
        for images, targets in test_loader:
            images = [img.to(device) for img in images]
            preds = model(images)  # predictions from model

            for image, pred, target in zip(images, preds, targets):
                current_idx = sample_idx
                sample_idx += 1

                frame_number = target.get("frame_number", -1)

                # NMS runs where the predictions live; everything is moved to
                # the CPU afterwards so the per-box Python loops below do not
                # trigger a device sync on every element.
                keep = ops.nms(pred["boxes"], pred["scores"], nms_threshold)
                pred_boxes = pred["boxes"][keep].cpu()
                pred_labels = pred["labels"][keep].cpu()
                gt_boxes = target["boxes"].cpu()
                gt_labels = target["labels"].cpu()

                # If both are empty, skip
                if len(gt_boxes) == 0 and len(pred_boxes) == 0:
                    continue

                # Greedy matching of predictions (in NMS order) to ground truth.
                used_gt = set()
                correct = 0
                matched_count = 0
                if len(gt_boxes) > 0:
                    for pb, pl in zip(pred_boxes, pred_labels):
                        ious = [compute_iou(pb, gb) for gb in gt_boxes]
                        best_idx = max(range(len(ious)), key=ious.__getitem__)
                        if ious[best_idx] > iou_threshold and best_idx not in used_gt:
                            matched_count += 1
                            used_gt.add(best_idx)
                            if pl.item() == gt_labels[best_idx].item():
                                correct += 1

                # Update totals
                total_tp += matched_count
                total_fp += len(pred_boxes) - matched_count
                total_fn += len(gt_boxes) - matched_count

                total_correct += correct
                total_samples += len(gt_boxes)

                if not save_outputs:
                    continue

                # Save the results
                pair_dir, _ = get_pair_info(dataset, subset, current_idx)
                img1, img2 = load_original_images(pair_dir)
                pair_out_dir = os.path.join(results_dir, os.path.basename(pair_dir))
                os.makedirs(pair_out_dir, exist_ok=True)

                # write bbox.txt: "0 0 <frame> <x> <y> <w> <h> <category>"
                with open(os.path.join(pair_out_dir, "bbox.txt"), 'w') as f:
                    for pb, pl in zip(pred_boxes, pred_labels):
                        xmin, ymin, xmax, ymax = pb.tolist()
                        w = xmax - xmin
                        h = ymax - ymin
                        category = int(pl.item()) -1  # undo the +1 background shift
                        f.write(f"0 0 {frame_number} {xmin} {ymin} {w} {h} {category}\n")

                # Visualize predictions on both frames and on the difference image.
                colors = ["red"] * len(pred_boxes)  # all boxes red
                canvases = {
                    "frame1.png": to_tensor(img1) * 255,
                    "frame2.png": to_tensor(img2) * 255,
                    "diff.png": image.cpu() * 255,
                }
                for file_name, canvas in canvases.items():
                    drawn = draw_bounding_boxes(
                        canvas.type(torch.uint8), boxes=pred_boxes, colors=colors, width=2
                    )
                    to_pil(drawn).save(os.path.join(pair_out_dir, file_name))

    if total_samples == 0:
        print("No ground truth boxes found in the test set.")
        return

    # Compute metrics (the epsilon guards against empty denominators)
    accuracy = total_correct / total_samples
    precision = total_tp / (total_tp + total_fp + 1e-6)
    recall = total_tp / (total_tp + total_fn + 1e-6)

    print(f"Evaluation - IoU Threshold: {iou_threshold}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")

In [None]:
def evaluate_model_bi(model, test_loader, device, iou_threshold=0.5, dataset=None, subset=None,
                      score_threshold=0.65, results_root="results"):
    """
    Evaluate a detector with one-to-one (Hungarian) matching and optionally save its outputs.

    Predictions scoring below ``score_threshold`` are discarded. The remaining
    ones are assigned one-to-one to ground-truth boxes by minimising the total
    cost ``1 - IoU``; an assigned pair is a true positive when its
    IoU >= ``iou_threshold``.

    Printed metrics, aggregated over the whole loader:
        - Accuracy: matched boxes whose label is also correct / ground-truth boxes.
        - Precision: TP / (TP + FP).
        - Recall: TP / (TP + FN).

    When both ``dataset`` and ``subset`` are given, every sample that has at
    least one ground-truth or predicted box gets a folder
    ``<results_root>/<iou_threshold>/<pair name>/`` with ``bbox.txt`` and
    ``frame1.png`` / ``frame2.png`` / ``diff.png`` showing the labelled
    predictions in red. This includes samples with no predictions or no
    ground truth, so a stale ``bbox.txt`` from an earlier run is overwritten.
    Without ``dataset``/``subset`` only the metrics are computed.

    Args:
        model: detection model; ``model(images)`` must return one dict per
            image with ``"boxes"``, ``"labels"`` and ``"scores"``.
        test_loader: iterable of ``(images, targets)`` batches. It must not be
            shuffled when outputs are saved, because samples are mapped to
            ``subset`` by their running position.
        device: device used for inference.
        iou_threshold: minimum (inclusive) IoU for a match.
        dataset: full dataset exposing ``pairs`` (optional).
        subset: evaluated split exposing ``indices`` (optional).
        score_threshold: minimum confidence for a prediction to be kept.
        results_root: root folder for saved outputs.

    Returns:
        None. Metrics are printed.
    """
    model.eval()
    model.to(device)

    save_outputs = dataset is not None and subset is not None
    results_dir = f"{results_root}/{iou_threshold}"
    if save_outputs:
        os.makedirs(results_dir, exist_ok=True)
        to_tensor = T.ToTensor()
        to_pil = T.ToPILImage()

    total_samples = 0     # total number of ground truth boxes
    total_correct = 0     # total matched ground truth boxes with correct category

    total_tp = 0
    total_fp = 0
    total_fn = 0

    sample_idx = 0        # running position of the sample within the loader

    with torch.no_grad():
        for images, targets in test_loader:
            images = [img.to(device) for img in images]
            preds = model(images)  # predictions from model

            for image, pred, target in zip(images, preds, targets):
                current_idx = sample_idx
                sample_idx += 1

                frame_number = target.get("frame_number", -1)

                # Keep confident predictions only; work on the CPU from here on.
                confident = (pred["scores"] >= score_threshold).cpu()
                pred_boxes = pred["boxes"].cpu()[confident]
                pred_labels = pred["labels"].cpu()[confident]
                gt_boxes = target["boxes"].cpu()
                gt_labels = target["labels"].cpu()

                num_gt = len(gt_boxes)
                num_pred = len(pred_boxes)

                # If no GT and no predictions, continue
                if num_gt == 0 and num_pred == 0:
                    continue

                matched_count = 0
                correct = 0
                # With no GT every prediction is a false positive and with no
                # predictions every GT box is missed; the counters below cover
                # both cases, so the assignment is only solved when needed.
                if num_gt > 0 and num_pred > 0:
                    iou_matrix = numpy.array(
                        [[float(compute_iou(gb, pb)) for pb in pred_boxes] for gb in gt_boxes]
                    )

                    # Hungarian algorithm works on cost, not IoU, so use cost = 1 - IoU
                    gt_indices, pred_indices = linear_sum_assignment(1 - iou_matrix)

                    for g_idx, p_idx in zip(gt_indices, pred_indices):
                        if iou_matrix[g_idx, p_idx] >= iou_threshold:
                            matched_count += 1
                            # Check label correctness
                            if gt_labels[g_idx].item() == pred_labels[p_idx].item():
                                correct += 1

                total_tp += matched_count
                total_fp += num_pred - matched_count
                total_fn += num_gt - matched_count

                total_correct += correct
                total_samples += num_gt

                if not save_outputs:
                    continue

                # Save results
                pair_dir, _ = get_pair_info(dataset, subset, current_idx)
                img1, img2 = load_original_images(pair_dir)
                pair_out_dir = os.path.join(results_dir, os.path.basename(pair_dir))
                os.makedirs(pair_out_dir, exist_ok=True)

                # bbox.txt: "0 0 <frame> <x> <y> <w> <h> <category>"
                with open(os.path.join(pair_out_dir, "bbox.txt"), 'w') as f:
                    for pb, pl in zip(pred_boxes, pred_labels):
                        xmin, ymin, xmax, ymax = pb.tolist()
                        w = xmax - xmin
                        h = ymax - ymin
                        category = int(pl.item()) - 1  # undo the +1 background shift
                        f.write(f"0 0 {frame_number} {xmin} {ymin} {w} {h} {category}\n")

                # Visualize predictions on both frames and on the difference image.
                colors = ["red"] * len(pred_boxes)
                labels_for_display = [category_names.get(lbl.item(), "N/A") for lbl in pred_labels]
                canvases = {
                    "frame1.png": to_tensor(img1) * 255,
                    "frame2.png": to_tensor(img2) * 255,
                    "diff.png": image.cpu() * 255,
                }
                for file_name, canvas in canvases.items():
                    drawn = draw_bounding_boxes(
                        canvas.type(torch.uint8),
                        boxes=pred_boxes,
                        colors=colors,
                        width=2,
                        labels=labels_for_display,
                    )
                    to_pil(drawn).save(os.path.join(pair_out_dir, file_name))

    if total_samples == 0:
        print("No ground truth boxes found in the test set.")
        return

    # The epsilon guards against empty denominators.
    accuracy = total_correct / total_samples
    precision = total_tp / (total_tp + total_fp + 1e-6)
    recall = total_tp / (total_tp + total_fn + 1e-6)

    print(f"Evaluation - IoU Threshold: {iou_threshold}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")

In [49]:
# Score the trained detector on the held-out split using one-to-one matching.
evaluate_model_bi(
    model=model,
    test_loader=test_loader,
    device=device,
    iou_threshold=0.5,
    dataset=dataset,
    subset=test_dataset,
)

before: ['data\\Pair_S_050203_07_001288_001531_2910_2940\\S_050203_07_001288_001531_2910.BoundingBox.png', 'data\\Pair_S_050203_07_001288_001531_2910_2940\\S_050203_07_001288_001531_2910.png', 'data\\Pair_S_050203_07_001288_001531_2910_2940\\S_050203_07_001288_001531_2940.BoundingBox.png', 'data\\Pair_S_050203_07_001288_001531_2910_2940\\S_050203_07_001288_001531_2940.png']
after: ['data\\Pair_S_050203_07_001288_001531_2910_2940\\S_050203_07_001288_001531_2910.png', 'data\\Pair_S_050203_07_001288_001531_2910_2940\\S_050203_07_001288_001531_2940.png']
before: ['data\\Pair_S_050203_07_001288_001531_3030_3120\\S_050203_07_001288_001531_3030.BoundingBox.png', 'data\\Pair_S_050203_07_001288_001531_3030_3120\\S_050203_07_001288_001531_3030.png', 'data\\Pair_S_050203_07_001288_001531_3030_3120\\S_050203_07_001288_001531_3120.BoundingBox.png', 'data\\Pair_S_050203_07_001288_001531_3030_3120\\S_050203_07_001288_001531_3120.png']
after: ['data\\Pair_S_050203_07_001288_001531_3030_3120\\S_050203_