In [3]:
import os
import math
import pandas as pd
import torch
from torchvision.io import read_image
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import kagglehub
from pathlib import Path
from torchvision import tv_tensors
from torchvision.tv_tensors import BoundingBoxFormat




try:
    from lxml import etree
    print("running with lxml.etree")
except ImportError:
    import xml.etree.ElementTree as etree
    print("running with Python's xml.etree.ElementTree")

# Dataset
# https://www.kaggle.com/datasets/karthika95/pedestrian-detection

# Download latest version
#path = kagglehub.dataset_download("karthika95/pedestrian-detection")

#print("Path to dataset files:", path)



running with lxml.etree


In [4]:
# get all unique labels

def get_all_unique_labels(annotations_file):
    annotations_dir = Path(annotations_file)
    labels = dict()
    for item in annotations_dir.iterdir():
        file_path = f"{annotations_dir}/{item.name}"
        if Path(file_path).is_file():  
            try:
                # Parse the XML from the file
                tree = etree.parse(file_path)
                # Get the root element
                root = tree.getroot()
    
                objects = root.findall("object")
                for obj in objects:
                    lbl = obj.find("name").text
                    lbl_count = labels.get(lbl, 0)
                    labels[lbl] = lbl_count + 1
    
            except etree.XMLSyntaxError as e:
                print(f"XML parsing error: {e}")
            except IOError as e:
                print(f"File error: {e}")
    
    return labels


train_annotations_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Train/Annotations"

get_all_unique_labels(train_annotations_dir)

{'person-like': 960, 'person': 1106}

In [14]:
## Dataset

'''
image: torchvision.tv_tensors.Image

target: a dict containing the following fields
    - boxes, torchvision.tv_tensors.BoundingBoxes of shape [N, 4]: the coordinates of the N bounding boxes in [x0, y0, x1, y1] format, ranging from 0 to W and 0 to H
    - labels, integer torch.Tensor of shape [N]: the label for each bounding box. 0 represents always the background class.
    - image_id, int: an image identifier. It should be unique between all the images in the dataset, and is used during evaluation
    - area, float torch.Tensor of shape [N]: the area of the bounding box. This is used during evaluation with the COCO metric, to separate the metric scores between small, medium and large boxes.
    - iscrowd, uint8 torch.Tensor of shape [N]: instances with iscrowd=True will be ignored during evaluation.

'''


class MyDataset(Dataset):
    # a dataset has to implement these 3 methods
    def __init__(self, annotations_file, img_dir, transforms=None):
        
        self.img_labels = self._get_labels(annotations_file)
        self.img_dir = img_dir
        self.transforms = transforms
    

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        
        n = self.img_labels[idx]["image_id"]
        num_bytes = math.ceil(n.bit_length() / 8)
        image_name = n.to_bytes(num_bytes, 'big').decode('utf-8').split("/")[1].split(".")[0]
        
        img_path = os.path.join(self.img_dir, image_name + ".jpg")
        image = read_image(img_path)
        label = self.img_labels[idx]
        image = tv_tensors.Image(image)

        if self.transforms is not None:
            image, label = self.transforms(image, label)
    
        return image, label

    def _get_labels(self, annotations_file):
        labels = []
        label_map = {"person": 1, "person-like": 2}
        # iterate over the directory
        annotations_dir = Path(annotations_file)
        
        # every image can have multiple bounding boxes
        for item in annotations_dir.iterdir():
            
            file_path = f"{annotations_dir}/{item.name}"
    
            if Path(file_path).is_file():  
                try:
                    # Parse the XML from the file
                    tree = etree.parse(file_path)
                    # Get the root element
                    root = tree.getroot()
        
                    # get width x height
                    size   = root.find("size")
                    width  = int(size.find("width").text)
                    height = int(size.find("height").text)
        
                    objects = root.findall("object")

                    bboxes = []
                    lbls   = []
                    areas  = []

                    for obj in objects:
                        # get the bounding box for each object
                        bnd_box_xml = obj.find("bndbox")
                        x1, y1, x2, y2 = float(bnd_box_xml.find("xmin").text), float(bnd_box_xml.find("ymin").text), float(bnd_box_xml.find("xmax").text), float(bnd_box_xml.find("ymax").text)
                        area = (x2 - x1) * (y2 - y1)
                        areas.append(area)
                        bboxes.append([x1, y1, x2, y2])
                        lbls.append(label_map[obj.find("name").text])
                
                    bboxes = tv_tensors.BoundingBoxes(
                        bboxes,
                        format=BoundingBoxFormat.XYXY,
                        canvas_size=(height, width)
                    )

                    image_label = {
                        "boxes": bboxes,
                        "labels": torch.tensor(lbls),
                        "image_id": int.from_bytes("/".join(file_path.split("/")[-2:]).encode('utf-8'), 'big'),
                        "area": torch.tensor(areas),
                        "iscrowd": torch.tensor([False for i in range(len(lbls))])
                    }

                    
                    labels.append(image_label)
            
                except etree.XMLSyntaxError as e:
                    print(f"XML parsing error: {e}")
                except IOError as e:
                    print(f"File error: {e}")
            
        return labels


In [15]:
# instantiate training dataset
train_annotations_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Train/Annotations"
train_img_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Train/JPEGImages"
training_dataset = MyDataset(train_annotations_dir, train_img_dir)

In [16]:
# instantiate val dataset
val_annotations_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Val/Annotations"
val_img_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Val/JPEGImages"
val_dataset = MyDataset(val_annotations_dir, val_img_dir)

In [17]:
print(training_dataset[0])

(Image([[[118, 118, 118,  ..., 164, 164, 164],
        [118, 118, 119,  ..., 165, 165, 164],
        [119, 119, 119,  ..., 166, 166, 166],
        ...,
        [ 82,  75,  90,  ...,  21,  22,  25],
        [ 75,  60,  93,  ...,  22,  24,  28],
        [ 71,  56,  99,  ...,  24,  26,  28]],

       [[146, 146, 146,  ..., 179, 179, 179],
        [146, 146, 147,  ..., 180, 180, 179],
        [147, 147, 147,  ..., 181, 181, 181],
        ...,
        [ 76,  62,  67,  ...,  23,  24,  26],
        [ 69,  47,  70,  ...,  24,  26,  29],
        [ 65,  43,  76,  ...,  26,  28,  29]],

       [[194, 194, 194,  ..., 210, 210, 210],
        [194, 194, 195,  ..., 211, 211, 210],
        [195, 195, 195,  ..., 212, 212, 212],
        ...,
        [ 44,  30,  33,  ...,   9,  10,  12],
        [ 37,  15,  36,  ...,  10,  12,  15],
        [ 33,  11,  42,  ...,  12,  14,  15]]], dtype=torch.uint8, ), {'boxes': BoundingBoxes([[ 35.,  21., 598., 435.]], format=BoundingBoxFormat.XYXY, canvas_size=(436, 653

In [113]:
len(training_dataset)

944

In [114]:
len(val_dataset)

160

In [250]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# load a model pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# replace the classifier with a new one, that has
# num_classes which is user-defined
num_classes = 3  # 1 class (person) + background
# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features
# replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /Users/Dylan/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth


100%|████████████████████████████████████████| 160M/160M [00:05<00:00, 30.0MB/s]


In [None]:
os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/engine.py")
os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/utils.py")
os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_utils.py")
os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_eval.py")
os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/transforms.py")

In [6]:
from torchvision.transforms import v2 as T


def get_transform(train):
    transforms = []
    if train:
        transforms.append(T.RandomHorizontalFlip(0.5))
    transforms.append(T.ToDtype(torch.float, scale=True))
    transforms.append(T.ToPureTensor())
    return T.Compose(transforms)

In [261]:
import utils



model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
dataset = MyDataset(train_annotations_dir, train_img_dir, get_transform(train=True))
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=utils.collate_fn
)

# For Training
images, targets = next(iter(data_loader))
images = list(image for image in images)
targets = [{k: v for k, v in t.items()} for t in targets]
output = model(images, targets)  # Returns losses and detections
print(output)

# For inference
model.eval()
x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
predictions = model(x)  # Returns predictions

print(predictions[0])

{'loss_classifier': tensor(0.2559, grad_fn=<NllLossBackward0>), 'loss_box_reg': tensor(0.0960, grad_fn=<DivBackward0>), 'loss_objectness': tensor(0.0026, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>), 'loss_rpn_box_reg': tensor(0.0040, grad_fn=<DivBackward0>)}
{'boxes': tensor([], size=(0, 4), grad_fn=<StackBackward0>), 'labels': tensor([], dtype=torch.int64), 'scores': tensor([], grad_fn=<IndexBackward0>)}


In [None]:
import utils
import torchvision
from engine import train_one_epoch, evaluate

# train on the accelerator or on the CPU, if an accelerator is not available
device = torch.accelerator.current_accelerator() if torch.accelerator.is_available() else torch.device('cpu')

train_annotations_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Train/Annotations"
train_img_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Train/JPEGImages"

val_annotations_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Val/Annotations"
val_img_dir = "/Users/Dylan/Documents/ml/pedestrian_tracking/dataset/Val/JPEGImages"

# our dataset has two classes only - background and person
num_classes = 3
# use our dataset and defined transformations
dataset = MyDataset(train_annotations_dir, train_img_dir, get_transform(train=True))
dataset_test = MyDataset(val_annotations_dir, val_img_dir, get_transform(train=False))

# split the dataset in train and test set
indices = torch.randperm(len(dataset)).tolist()
dataset = torch.utils.data.Subset(dataset, indices[:-50])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])

# define training and validation data loaders
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=utils.collate_fn
)

data_loader_test = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)


model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# move model to the right device
model.to(device)

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1
)

# let's train it just for 2 epochs
num_epochs = 2

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # evaluate on the test dataset
    evaluate(model, data_loader_test, device=device)

print("That's it!")

Epoch: [0]  [  0/447]  eta: 1 day, 8:11:12  lr: 0.000016  loss: 0.5127 (0.5127)  loss_classifier: 0.2816 (0.2816)  loss_box_reg: 0.1736 (0.1736)  loss_objectness: 0.0071 (0.0071)  loss_rpn_box_reg: 0.0504 (0.0504)  time: 259.2225  data: 0.0058
