In [1]:
import os
import cv2
import torch
import torchvision
import numpy as np
from torch.utils.data import Dataset, DataLoader
import xml.etree.ElementTree as ET
from collections import defaultdict

# Convert a video into a tensor
def video_to_tensor(video_path, resize=None, frame_skip=1, return_orig_size = False):
    print(f"Loading video: {os.path.basename(video_path)}")
    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_count = 0
    orig_w = None
    Orig_h = None
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if orig_w is None:
            orig_h, orig_w = frame.shape[:2]
        if resize:
            frame = cv2.resize(frame, resize)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frames.append(frame)
        frame_count += 1
    cap.release()

    if not frames:
        raise ValueError(f"No frames read from {video_path}")

    frames = np.stack(frames)[::frame_skip]
    frames = torch.from_numpy(frames).float().permute(0, 3, 1, 2) / 255.0
    print(f"Loaded {frame_count} frames -> kept {frames.shape[0]} after skipping\n")
    
    if return_orig_size:
        return frames, (orig_w, orig_h)
    else:
        return frames  # shape: (T, 3, H, W)

# Parse CVAT XML annotation file
def parse_cvat_xml(xml_path, frame_skip=1, scale=None):
    tree = ET.parse(xml_path)
    root = tree.getroot()
    annotations = defaultdict(list)

    if scale is None:
        sx = sy = 1.0
    else:
        sx, sy = scale

    for track in root.findall("track"):
        label = track.attrib["label"]
        for box in track.findall("box"):
            frame = int(box.attrib["frame"])
            outside = int(box.attrib["outside"])
            if outside != 0:
                continue

            xtl = float(box.attrib["xtl"])
            ytl = float(box.attrib["ytl"])
            xbr = float(box.attrib["xbr"])
            ybr = float(box.attrib["ybr"])

            #scale bbox into resized frame coordinates if scale != 1
            xtl *= sx
            xbr *= sx
            ytl *= sy
            ybr *= sy

            moving_attr = None
            for attr in box.findall("attribute"):
                if attr.get("name", "").lower() == "moving":
                    moving_attr = attr
                    break


            moving_flag = 1 if moving_attr.text.lower() == "true" else 0
            
            #frame skip alignment
            if frame % frame_skip == 0:
                adjusted_frame = frame // frame_skip
                annotations[adjusted_frame].append({
                    "label": label,
                    "bbox": [xtl, ytl, xbr, ybr],
                    "moving": moving_flag})

    return annotations

# Dataset class that loads videos + XML annotations together
class BaseballVideoDataset(Dataset):
    def __init__(self, video_dir, xml_dir, resize=(1280, 720), frame_skip=1, scale_boxes=True):
        self.video_dir = video_dir
        self.xml_dir = xml_dir
        self.resize = resize
        self.frame_skip = frame_skip
        self.scale_boxes = scale_boxes
        self.video_tensors = {}
        self.skipped_videos = []
        self.index_map = []

        # Match videos with their annotation XMLs by filename stem
        self.samples = []
        for vid_name in os.listdir(video_dir):
            if vid_name.lower().endswith((".mp4", ".mov", ".avi")):
                stem = os.path.splitext(vid_name)[0]
                xml_path = os.path.join(xml_dir, f"{stem}.xml")
                if os.path.exists(xml_path):
                    self.samples.append((os.path.join(video_dir, vid_name), xml_path))
                else:
                    print(f"No XML found for {vid_name}")
        print(f"\n Found {len(self.samples)} videos with matching XMLs in {video_dir}\n")

        print("Preloading videos and indexing frames...\n")

        # Loop through all matched video/XML pairs
        for vid_idx, (video_path, xml_path) in enumerate(self.samples, start=1):
            try:
                video_tensor, (orig_w, orig_h) = video_to_tensor(video_path, resize=self.resize, frame_skip=self.frame_skip, return_orig_size = True)
                annotations = parse_cvat_xml(xml_path, frame_skip=self.frame_skip)
                self.video_tensors[video_path] = video_tensor
            
            #compute scale factors for bboxes
                if self.resize is not None and self.scale_boxes:
                    new_w, new_h = self.resize
                    sx = new_w/float(orig_w)
                    sy = new_h/float(orig_h)
                    scale = (sx, sy)
                else:
                    scale = None

                #parse annotations in scaled coordinates
                annotations = parse_cvat_xml(xml_path, frame_skip = self.frame_skip, scale = scale)
                self.video_tensors[video_path] = video_tensor

            except Exception as e:
                print(f"Skipping {os.path.basename(video_path)}: {e}")
                self.skipped_videos.append((video_path, str(e)))
                continue

            # Build frame-by-frame index map
            for frame_idx, ann_list in annotations.items():
                if len(ann_list) == 0:
                    continue
                if frame_idx >= len(video_tensor):
                    print(f"Frame {frame_idx} out of range for {os.path.basename(video_path)} "
                          f"(video has {len(video_tensor)} frames) — skipping")
                    continue

                boxes = torch.tensor([a["bbox"] for a in ann_list], dtype=torch.float32)
                moving = torch.tensor([a["moving"] for a in ann_list], dtype=torch.int64)

                self.index_map.append((video_path, frame_idx, {"boxes": boxes,"moving": moving}))

            print(f"   [{vid_idx}/{len(self.samples)}] Loaded {os.path.basename(video_path)} "
                  f"({len(video_tensor)} frames, {len(annotations)} annotated)\n")

        print(f"Finished indexing {len(self.index_map)} annotated frames "
              f"from {len(self.video_tensors)} videos.\n")

    def __len__(self):
        return len(self.index_map)

    def __getitem__(self, idx):
        video_path, frame_idx, target = self.index_map[idx]

        video_tensor = self.video_tensors[video_path]
        frame_tensor = video_tensor[frame_idx]  # (3, H, W)
        
        #convert flags to int, and configure labels for model interpretation
        moving_flags = target["moving"].to(torch.int64)
        labels = moving_flags + 1

        target_out = {
            "boxes": target["boxes"],
            "moving": moving_flags,
            "labels": labels,
            "video": os.path.basename(video_path)}

        return frame_tensor, target_out


In [2]:
#define the model, adjustable number of classes, not pretrained
def get_model(num_classes):
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained = True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)
    return model

def train_loop(model, dataloader, optimizer, device):
    #set model to training mode
    model.train()
    total_loss = 0.0
    for batch_idx, (images, targets) in enumerate(dataloader):
            images = [img.to(device) for img in images]
            for t in targets:
                t["boxes"] = t["boxes"].to(device)
                t["labels"] = t["labels"].to(device)

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            #clear previous gradients
            optimizer.zero_grad()
            #compute new gradients via backpropagation
            losses.backward()
            #update model weights
            optimizer.step()

            total_loss += losses.item()
            if batch_idx % 5 == 0:
                print(f"Batch {batch_idx}/{len(dataloader)} | Loss: {losses.item():.4f}")

    avg_loss = total_loss/len(dataloader)
    print(f"Average Training LossL: {avg_loss:.4f}")
    return avg_loss

#evaluate the model
@torch.no_grad() #prevent gradient updates
def test_loop(model, dataloader, device):
    model.train()
    total_loss = 0.0
    for images, targets in dataloader:
        images = [img.to(device) for img in images]
        for t in targets:
            t["boxes"] = t["boxes"].to(device)
            t["labels"] = t["labels"].to(device)

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        total_loss += losses.item()

    avg_loss = total_loss / len(dataloader)
    print(f"Validation Loss: {avg_loss:.4f}")
    return avg_loss

#Function to measure model accuracy
@torch.no_grad()
def accuracy_loop(model, dataloader, device, score_thresh=0.5):
    model.eval()
    num_frames = 0
    num_correct = 0

    for images, targets in dataloader:
        images = [img.to(device) for img in images]
        # move labels to device just in case
        for t in targets:
            t["labels"] = t["labels"].to(device)

        predictions = model(images)

        for pred, tgt in zip(predictions, targets):
            # Ground truth: does this frame contain any moving object?
            gt_any_moving = (tgt["labels"] == 2).any().item()

            # Predictions: keep only boxes above a confidence threshold
            scores = pred["scores"].to(device)
            labels = pred["labels"].to(device)
            keep = scores >= score_thresh
            pred_labels = labels[keep]

            pred_any_moving = (pred_labels == 2).any().item()

            num_frames += 1
            if bool(gt_any_moving) == bool(pred_any_moving):
                num_correct += 1

    acc = num_correct / num_frames if num_frames > 0 else 0.0
    print(f"Frame-level moving/not-moving accuracy: {acc*100:.2f}% "
          f"(threshold={score_thresh})")
    return acc

def train_detector(train_dataset, val_dataset, num_classes=2, epochs=5, lr=1e-4, batch_size=4):
    #use gpu if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

    model = get_model(num_classes).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}\n----------------------------")
        train_loss = train_loop(model, train_loader, optimizer, device)
        val_loss = test_loop(model, val_loader, device)
        val_acc  = accuracy_loop(model, val_loader, device, score_thresh=0.5)
        print(f"Summary: train_loss={train_loss:.4f}, " f"val_loss={val_loss:.4f}, val_acc={val_acc*100:.2f}%")

    return model

In [3]:
#train and save trained model
if __name__ == "__main__":
    video_folder = r"C:\Users\Glen\Documents\School\BusForecasting\Final Project Folder\Raw Videos"
    xml_folder   = r"C:\Users\Glen\Documents\School\BusForecasting\Final Project Folder\Annotations"

    #Collate function for object detection
    def collate_fn(batch):
        batch = [b for b in batch if b is not None]
        frames = [b[0] for b in batch]
        targets = [b[1] for b in batch]
        return frames, targets

    #Load full dataset (videos + XMLs)
    full_dataset = BaseballVideoDataset(video_folder, xml_folder, resize=(1280, 720), frame_skip=1, scale_boxes=True)

    #print a quick summary
    print(f"\nDataset contains {len(full_dataset)} annotated frames across videos.")

    #Split train/test
    n = len(full_dataset)
    split = int(0.8 * n)
    train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [split, n - split])

    #Train the model
    trained_model = train_detector(
        train_dataset,
        val_dataset,
        num_classes=3,
        epochs=5,
        lr=1e-4,
        batch_size=4)

    #Save trained model
    torch.save(trained_model.state_dict(), "fasterrcnn_moving_detector_2.0.pth")
    print("\nModel saved as 'fasterrcnn_moving_detector_2.pth'.")

No XML found for IMG_0078.mov
No XML found for IMG_0084.mov

 Found 76 videos with matching XMLs in C:\Users\Glen\Documents\School\BusForecasting\Final Project Folder\Raw Videos

Preloading videos and indexing frames...

Loading video: dusty_1.mov
Loaded 76 frames -> kept 76 after skipping

   [1/76] Loaded dusty_1.mov (76 frames, 76 annotated)

Loading video: IMG_0030.mov
Loaded 48 frames -> kept 48 after skipping

   [2/76] Loaded IMG_0030.mov (48 frames, 3 annotated)

Loading video: IMG_0031.mov
Loaded 50 frames -> kept 50 after skipping

   [3/76] Loaded IMG_0031.mov (50 frames, 2 annotated)

Loading video: IMG_0032.mov
Loaded 51 frames -> kept 51 after skipping

   [4/76] Loaded IMG_0032.mov (51 frames, 3 annotated)

Loading video: IMG_0033.mov
Loaded 50 frames -> kept 50 after skipping

   [5/76] Loaded IMG_0033.mov (50 frames, 3 annotated)

Loading video: IMG_0034.mov
Loaded 52 frames -> kept 52 after skipping

   [6/76] Loaded IMG_0034.mov (52 frames, 3 annotated)

Loading vide




Epoch 1/5
----------------------------
Batch 0/658 | Loss: 2.2397
Batch 5/658 | Loss: 0.3585
Batch 10/658 | Loss: 0.6314
Batch 15/658 | Loss: 0.7239
Batch 20/658 | Loss: 0.5621
Batch 25/658 | Loss: 0.5229
Batch 30/658 | Loss: 0.7185
Batch 35/658 | Loss: 0.5190
Batch 40/658 | Loss: 0.5764
Batch 45/658 | Loss: 0.4711
Batch 50/658 | Loss: 0.4112
Batch 55/658 | Loss: 0.3708
Batch 60/658 | Loss: 0.3296
Batch 65/658 | Loss: 0.5586
Batch 70/658 | Loss: 0.3927
Batch 75/658 | Loss: 0.4574
Batch 80/658 | Loss: 0.6720
Batch 85/658 | Loss: 0.5343
Batch 90/658 | Loss: 0.4572
Batch 95/658 | Loss: 0.6087
Batch 100/658 | Loss: 0.4775
Batch 105/658 | Loss: 0.1884
Batch 110/658 | Loss: 0.1264
Batch 115/658 | Loss: 0.4279
Batch 120/658 | Loss: 0.4838
Batch 125/658 | Loss: 0.3431
Batch 130/658 | Loss: 0.5232
Batch 135/658 | Loss: 0.3587
Batch 140/658 | Loss: 0.8137
Batch 145/658 | Loss: 0.3415
Batch 150/658 | Loss: 0.1669
Batch 155/658 | Loss: 0.2869
Batch 160/658 | Loss: 0.6010
Batch 165/658 | Loss: 0.2

In [None]:
#link to .pth file in one drive, was too large to upload to github
#https://uofnebraska-my.sharepoint.com/:u:/g/personal/51628718_nebraska_edu/IQCJ0x1cdpvdQJXmvP5vf1XLAcN6AUHbCMoGgel_l8_oEMM?e=qqaj9l

#import script for trained model
def load_trained_model(weights_path, num_classes=3, device=None):

    # Select device
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Loading model on device: {device}")

    # Recreate model architecture
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=False)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)

    # Load saved weights
    state_dict = torch.load(weights_path, map_location=device)
    model.load_state_dict(state_dict)

    # Move model to device and set eval mode
    model.to(device)
    model.eval()

    print(f"Model loaded successfully from '{weights_path}'")
    return model