In [50]:
!pip install motmetrics
!pip install deep-sort-realtime
!git clone https://github.com/abewley/sort.git
!pip install filterpy


fatal: destination path 'sort' already exists and is not an empty directory.


<h3>Imports</h3>

In [51]:
import matplotlib
matplotlib.use('Agg')  # non-interactive backend suitable for headless environments

import os
import zipfile
import gdown
import torch
import torchvision
import numpy as np
import pandas as pd
import motmetrics as mm
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from deep_sort_realtime.deepsort_tracker import DeepSort
from tqdm import tqdm
from PIL import Image
import sys

# Fix sort.py backend issue
with open('/content/sort/sort.py', 'r') as file:
    code = file.read()

# Replace TkAgg with Agg
fixed_code = code.replace('TkAgg', 'Agg')

with open('/content/sort/sort.py', 'w') as file:
    file.write(fixed_code)

print("Backend issue fixed!")

sys.path.append('/content/sort')


Backend issue fixed!


<h3>Configuration & Device Setup</h3>

In [52]:
# CONFIGURATION
CONFIG = {
    "dataset_url": "https://drive.google.com/uc?id=1yvOwbPks7dFzMX2z4JoUQlwdEfNYQd7-",
    "dataset_zip": "/content/MOT15.zip",
    "dataset_path": "/content/MOT15",
    "tracking": {"iou_threshold": 0.3, "max_age": 30},
    "training": {"epochs": 1, "batch_size": 8, "learning_rate": 0.0001},
}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


Using device: cuda


<h3>Dataset Download & Extraction Functions</h3>

In [53]:
# DOWNLOAD & EXTRACT DATASET
def download_dataset():
    if not os.path.exists(CONFIG["dataset_zip"]):
        print("Downloading MOT15 dataset from Google Drive...")
        gdown.download(CONFIG["dataset_url"], CONFIG["dataset_zip"], quiet=False)
    else:
        print("Dataset already downloaded.")

def extract_dataset():
    if not os.path.exists(CONFIG["dataset_path"]):
        print("Extracting dataset...")
        with zipfile.ZipFile(CONFIG["dataset_zip"], 'r') as zip_ref:
            zip_ref.extractall("/content/")
        print(f"Dataset extracted to {CONFIG['dataset_path']}")
    else:
        print("Dataset already extracted.")


<h3>Data Augmentation Function</h3>

In [54]:
# DATA AUGMENTATION
def apply_augmentations(image):
    transform = transforms.Compose([
        transforms.Resize((640, 640)),
        transforms.RandomCrop(600),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 2.0)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    return transform(image)


<h3>MOT15Dataset Class</h3>

In [55]:
# MOT15 DATASET CLASS
class MOT15Dataset(Dataset):
    def __init__(self, root_dir, mode="train", transform=None):
        self.root_dir = os.path.join(root_dir, mode)
        self.transform = transform
        self.data = []
        for seq in os.listdir(self.root_dir):
            img_dir = os.path.join(self.root_dir, seq, "img1")
            gt_path = os.path.join(self.root_dir, seq, "gt/gt.txt")
            if os.path.exists(gt_path):
                gt_df = pd.read_csv(gt_path, header=None)
                gt_df.columns = ["frame", "track_id", "x", "y", "w", "h", "conf", "class", "visibility", "unused"]
                for img_name in sorted(os.listdir(img_dir)):
                    frame_id = int(img_name.split('.')[0])
                    frame_gt = gt_df[gt_df["frame"] == frame_id]
                    boxes_df = frame_gt[["x", "y", "w", "h"]].copy()
                    boxes_df = pd.DataFrame({
                        'x1': boxes_df['x'],
                        'y1': boxes_df['y'],
                        'x2': boxes_df['x'] + boxes_df['w'],
                        'y2': boxes_df['y'] + boxes_df['h']
                    })
                    boxes = boxes_df[['x1', 'y1', 'x2', 'y2']].values
                    labels = np.ones(len(boxes))
                    self.data.append((os.path.join(img_dir, img_name), boxes, labels))
            else:
              # If no ground truth is available, load the image with empty boxes and labels.
              for img_name in sorted(os.listdir(img_dir)):
                  self.data.append((os.path.join(img_dir, img_name), np.empty((0, 4)), np.empty((0,))))


    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path, boxes, labels = self.data[idx]
        image = Image.open(img_path).convert("RGB")
        target = {"boxes": torch.tensor(boxes, dtype=torch.float32), "labels": torch.tensor(labels, dtype=torch.int64)}
        if self.transform:
            image = self.transform(image)
        return image, target


<h3>Object Detector Class</h3>

In [56]:
# OBJECT DETECTOR CLASS
class ObjectDetector:
    def __init__(self, num_classes=2):
        self.model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
        in_features = self.model.roi_heads.box_predictor.cls_score.in_features
        self.model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
        self.model.to(device)
        self.model.train()

    def detect_objects(self, images):
        img_tensors = [
            img.to(device) if isinstance(img, torch.Tensor) else apply_augmentations(img).to(device)
            for img in images
        ]
        with torch.no_grad():
            predictions = self.model(img_tensors)
        return predictions



<h3>Adaptive Tracker Class</h3>

In [57]:
# ADAPTIVE TRACKER CLASS
# Make sure that Sort is imported from your fixed sort.py file.
from sort import Sort

class AdaptiveTracker:
    def __init__(self):
        self.sort_tracker = Sort()
        # self.deep_sort = DeepSort(max_age=30, n_init=3, max_cosine_distance=0.2)
        self.previous_tracks = {}

    def track_objects(self, raw_detections, frame):
        if len(raw_detections) > 0:
            sort_dets = np.array([d[0] + [d[1]] for d in raw_detections])
            sort_tracked = self.sort_tracker.update(sort_dets)
        else:
            sort_tracked = np.empty((0, 5))

        # For a simpler pipeline, use only SORT results:
        consistent_tracks = []
        for track in sort_tracked:
            track_id = int(track[4])
            bbox = track[:4].tolist()
            # Here you can add consistency checks if needed
            consistent_tracks.append({'track_id': track_id, 'bbox': bbox})

        return sort_tracked, consistent_tracks





<h3>Training & Evaluation Functions</h3>

In [58]:
# TRAINING FUNCTION
def train_faster_rcnn(model, train_loader, epochs=10, lr=0.0001):
    optimizer = torch.optim.Adam(model.model.parameters(), lr=lr)
    model.model.train()
    for epoch in range(epochs):
        epoch_loss = 0
        for images, targets in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            images = [img.to(device) for img in images]
            targets = [{"boxes": t["boxes"].to(device), "labels": t["labels"].to(device)} for t in targets]
            optimizer.zero_grad()
            loss_dict = model.model(images, targets)
            loss = sum(loss for loss in loss_dict.values())
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {epoch_loss:.4f}")

# PERFORMANCE EVALUATION FUNCTION
def evaluate_performance(detections, dataset):
    acc = mm.MOTAccumulator(auto_id=True)
    for idx, det in enumerate(detections):
        print(f"Frame {idx}: Detected track IDs: {det['track_id']}, Bounding boxes: {det['bboxes']}")
        print("Detected track IDs:", det["track_id"])
        print("Detected bboxes:", det["bboxes"])
        gt_boxes = dataset[idx][1]["boxes"].numpy()
        gt_ids = np.arange(len(gt_boxes))
        det_boxes = np.array(det["bboxes"])
        det_ids = det["track_id"]
        distances = mm.distances.iou_matrix(gt_boxes, det_boxes, max_iou=0.3)
        acc.update(gt_ids, det_ids, distances)

    mh = mm.metrics.create()
    summary = mh.compute(acc, metrics=['mota', 'motp', 'idf1', 'num_switches'], name='Overall')
    print(summary)

def collate_fn(batch):
    return tuple(zip(*batch))


<h3>Training Execution</h3>

In [59]:
# ------------------------------
# Training Cell
# ------------------------------

# Download and extract dataset
download_dataset()
extract_dataset()

# Create datasets and dataloaders (using training augmentation)
train_dataset = MOT15Dataset(CONFIG["dataset_path"], mode="train", transform=apply_augmentations)
test_dataset = MOT15Dataset(CONFIG["dataset_path"], mode="test", transform=apply_augmentations)

train_loader = DataLoader(train_dataset, batch_size=CONFIG["training"]["batch_size"],
                          shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=CONFIG["training"]["batch_size"],
                         shuffle=False, collate_fn=collate_fn)

# Initialize and train detector
detector = ObjectDetector(num_classes=2)
train_faster_rcnn(detector, train_loader, epochs=CONFIG["training"]["epochs"],
                  lr=CONFIG["training"]["learning_rate"])

# Save the trained model checkpoint
torch.save(detector.model.state_dict(), "/content/fasterrcnn_checkpoint.pth")
print("Model checkpoint saved!")



Dataset already downloaded.
Dataset already extracted.


Epoch 1/1: 100%|██████████| 688/688 [25:05<00:00,  2.19s/it]


Epoch 1, Loss: 4235.9048
Model checkpoint saved!


In [None]:
from torch.utils.data import random_split, DataLoader

# Split the training dataset (assuming 'train_dataset' is already defined)
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_subset, val_subset = random_split(train_dataset, [train_size, val_size])

print(f"Train subset size: {len(train_subset)}")
print(f"Validation subset size: {len(val_subset)}")

# Create a DataLoader for the validation subset
val_loader = DataLoader(val_subset, batch_size=CONFIG["training"]["batch_size"],
                        shuffle=False, collate_fn=collate_fn)

# Switch model to evaluation mode (if not already)
detector.model.eval()

all_val_detections = []
frame_counter = 0

for images, _ in tqdm(val_loader, desc="Running Validation"):
    # Get predictions for the current batch (list of dicts, one per image)
    predictions = detector.detect_objects(images)

    for pred in predictions:
        if len(pred["boxes"]) > 0:
            boxes = pred["boxes"].cpu().numpy()
            scores = pred["scores"].cpu().numpy().reshape(-1, 1)
            dets_array = np.hstack((boxes, scores))
            # Format each detection as: ([x1,y1,x2,y2], score, dummy_class)
            detections_list = [(d[:4].tolist(), float(d[4]), 0) for d in dets_array]
        else:
            detections_list = []

        # If you're using a tracker, pass the corresponding image frame if needed.
        # Here, we'll assume your tracker can work with the current image frame.
        # For simplicity, if you're not using the tracker, you can skip this.
        _, consistent_tracks = tracker.track_objects(detections_list, images[0])

        # Save the tracker output for this frame
        all_val_detections.append({
            "track_id": [t.track_id for t in consistent_tracks] if isinstance(consistent_tracks[0], object) else [t["track_id"] for t in consistent_tracks],
            "bboxes": [t.to_tlbr() for t in consistent_tracks] if hasattr(consistent_tracks[0], "to_tlbr") else [t["bbox"] for t in consistent_tracks]
        })
        frame_counter += 1


<h3>Evaluation Cell</h3>

In [61]:
# ------------------------------
# Evaluation Cell
# ------------------------------

import torch
import numpy as np
from torch.utils.data import DataLoader
from tqdm import tqdm

# Load the trained model checkpoint for evaluation
detector = ObjectDetector(num_classes=2)
detector.model.load_state_dict(torch.load("/content/fasterrcnn_checkpoint.pth"))
detector.model.eval()  # Set model to evaluation mode
print("Model loaded for evaluation!")

# Re-create the test dataset (using the same transform as training)
test_dataset = MOT15Dataset(CONFIG["dataset_path"], mode="test", transform=apply_augmentations)
print("Test dataset size:", len(test_dataset))

# Debug: print a few samples from the test dataset
for idx in range(5):
    image, target = test_dataset[idx]
    print(f"Frame {idx}:")

    # Check the image type and size
    if isinstance(image, torch.Tensor):
        # Convert tensor to numpy (assuming image is in CxHxW format)
        img_np = image.detach().cpu().permute(1, 2, 0).numpy()
        print(" - Image shape (HxWxC):", img_np.shape)
    else:
        print(" - Image size:", image.size)

    # Print ground truth boxes
    if "boxes" in target:
        print(" - Ground truth boxes:", target["boxes"])
    else:
        print(" - No ground truth boxes found.")

    print("-" * 40)


# # Run detection on the test dataset
# tracker = AdaptiveTracker()
# all_detections = []
# frame_counter = 0

# for images, _ in tqdm(test_loader, desc="Running Detection"):
#     # Get predictions for the current batch (list of dicts, one per image)
#     predictions = detector.detect_objects(images)

#     current_frame = np.array(images[0])

#     # Process each image individually
#     for pred in predictions:
#         if len(pred["boxes"]) > 0:
#             # Convert boxes and scores into a numpy array (N,5)
#             boxes = pred["boxes"].cpu().numpy()         # shape (N,4)
#             scores = pred["scores"].cpu().numpy().reshape(-1, 1)  # shape (N,1)
#             dets_array = np.hstack((boxes, scores))       # shape (N,5)
#             # Convert each detection to expected format: ([x1,y1,x2,y2], score, dummy_class)
#             detections_list = [(d[:4].tolist(), float(d[4]), 0) for d in dets_array]
#         else:
#             detections_list = []

#         # Pass detections for the current image to the tracker (using frame_counter as frame id)
#         _, consistent_tracks = tracker.track_objects(detections_list, current_frame)

#         # Save the tracker output for this frame
#         all_detections.append({
#             "track_id": [t["track_id"] for t in consistent_tracks],
#             "bboxes": [t["bbox"] for t in consistent_tracks]
#         })

#         frame_counter += 1

# # Evaluate performance using the detections from this run
# evaluate_performance(all_detections, test_dataset)
# print("Evaluation completed!")



  detector.model.load_state_dict(torch.load("/content/fasterrcnn_checkpoint.pth"))


Model loaded for evaluation!
Test dataset size: 5783
Frame 0:
 - Image shape (HxWxC): (600, 600, 3)
 - Ground truth boxes: tensor([], size=(0, 4))
----------------------------------------
Frame 1:
 - Image shape (HxWxC): (600, 600, 3)
 - Ground truth boxes: tensor([], size=(0, 4))
----------------------------------------
Frame 2:
 - Image shape (HxWxC): (600, 600, 3)
 - Ground truth boxes: tensor([], size=(0, 4))
----------------------------------------
Frame 3:
 - Image shape (HxWxC): (600, 600, 3)
 - Ground truth boxes: tensor([], size=(0, 4))
----------------------------------------
Frame 4:
 - Image shape (HxWxC): (600, 600, 3)
 - Ground truth boxes: tensor([], size=(0, 4))
----------------------------------------
