# License Plate Detector using MobilenetV3Small SSD

### Load Dataset

In [1]:
# !pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124

In [2]:
import os
import torch
import torch.utils.data
import torchvision
from PIL import Image
from pycocotools.coco import COCO

class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, root, annotation, transforms=None):
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.ids = list(sorted(self.coco.imgs.keys()))

    def __getitem__(self, index):
        # Own coco file
        coco = self.coco
        # Image ID
        img_id = self.ids[index]
        # List: get annotation id from coco
        ann_ids = coco.getAnnIds(imgIds=img_id)
        # Dictionary: target coco_annotation file for an image
        coco_annotation = coco.loadAnns(ann_ids)
        # path for input image
        path = coco.loadImgs(img_id)[0]['file_name']
        # open the input image
        img = Image.open(os.path.join(self.root, path))
        img = img.convert("RGB")

        # number of objects in the image
        num_objs = len(coco_annotation)

        # Bounding boxes for objects
        # In coco format, bbox = [xmin, ymin, width, height]
        # In pytorch, the input should be [xmin, ymin, xmax, ymax]
        boxes = []
        for i in range(num_objs):
            xmin = coco_annotation[i]['bbox'][0]
            ymin = coco_annotation[i]['bbox'][1]
            xmax = xmin + coco_annotation[i]['bbox'][2]
            ymax = ymin + coco_annotation[i]['bbox'][3]
            boxes.append([xmin, ymin, xmax, ymax])
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        # Labels (In my case, I only one class: target class or background)
        labels = torch.ones(num_objs, dtype=torch.int64)
        # Tensorise img_id
        img_id = torch.tensor(img_id)
        # Size of bbox (Rectangular)
        areas = []
        for i in range(num_objs):
            areas.append(coco_annotation[i]['area'])
        areas = torch.as_tensor(areas, dtype=torch.float32)
        # Iscrowd
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        if num_objs == 0:
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros((0,), dtype=torch.int64)
            areas = torch.zeros((0,), dtype=torch.float32)
            iscrowd = torch.zeros((0,), dtype=torch.int64)

        # Annotation is in dictionary format
        my_annotation = {}
        my_annotation["boxes"] = boxes
        my_annotation["labels"] = labels
        my_annotation["image_id"] = img_id
        my_annotation["area"] = areas
        my_annotation["iscrowd"] = iscrowd

        if self.transforms is not None:
            if not isinstance(img, Image.Image):
                img = Image.fromarray(img)
            img = self.transforms(img)

        return img, my_annotation

    def __len__(self):
        return len(self.ids)

In [3]:
from torchvision.transforms import v2 as T


def get_transform(train):
    transforms = []
    if train:
        transforms.append(T.RandomHorizontalFlip(0.5))
    transforms.append(T.ToDtype(torch.float, scale=True))
    transforms.append(torchvision.transforms.ToTensor())
    return T.Compose(transforms)

In [4]:
# path to your own data and coco file
import utils
train_data_dir = 'data/train'
train_coco = 'data/train/_annotations.coco.json'

test_data_dir = 'data/test'
test_coco = 'data/test/_annotations.coco.json'

valid_data_dir = 'data/valid'
valid_coco = 'data/valid/_annotations.coco.json'

# create own Dataset
train_ds = CustomDataset(root=train_data_dir,
                          annotation=train_coco,
                          transforms=get_transform(train=True)
                          )

test_ds = CustomDataset(root=test_data_dir,
                          annotation=test_coco,
                          transforms=get_transform(train=False)
                          )

valid_ds = CustomDataset(root=valid_data_dir,
                          annotation=valid_coco,
                          transforms=get_transform(train=False)
                          )

# collate_fn needs for batch
def collate_fn(batch):
    return tuple(zip(*batch))

# Batch size
train_batch_size = 8
test_batch_size = 8
valid_batch_size = 8

# own DataLoader
train_loader = torch.utils.data.DataLoader(train_ds,
                                          batch_size=train_batch_size,
                                          shuffle=True,
                                          collate_fn=utils.collate_fn)

test_loader = torch.utils.data.DataLoader(test_ds,
                                            batch_size=test_batch_size,
                                            shuffle=False,
                                            collate_fn=utils.collate_fn)

val_loader = torch.utils.data.DataLoader(valid_ds,
                                            batch_size=valid_batch_size,
                                            shuffle=False,
                                            collate_fn=utils.collate_fn)

loading annotations into memory...
Done (t=0.01s)
creating index...
index created!
loading annotations into memory...
Done (t=0.00s)
creating index...
index created!
loading annotations into memory...
Done (t=0.01s)
creating index...
index created!


### Baseline model

In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# load a model pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# replace the classifier with a new one, that has
# num_classes which is user-defined
num_classes = 2  # 1 class (person) + background
# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features
# replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [6]:
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

# load a pre-trained model for classification and return
# only the features
backbone = torchvision.models.mobilenet_v3_small(weights="DEFAULT").features
# ``FasterRCNN`` needs to know the number of
# output channels in a backbone. For mobilenet_v2, it's 1280
# so we need to add it here
backbone.out_channels = 576

# let's make the RPN generate 5 x 3 anchors per spatial
# location, with 5 different sizes and 3 different aspect
# ratios. We have a Tuple[Tuple[int]] because each feature
# map could potentially have different sizes and
# aspect ratios
anchor_generator = AnchorGenerator(
    sizes=((32, 64, 128, 256, 512),),
    aspect_ratios=((0.5, 1.0, 2.0),)
)

# let's define what are the feature maps that we will
# use to perform the region of interest cropping, as well as
# the size of the crop after rescaling.
# if your backbone returns a Tensor, featmap_names is expected to
# be [0]. More generally, the backbone should return an
# ``OrderedDict[Tensor]``, and in ``featmap_names`` you can choose which
# feature maps to use.
roi_pooler = torchvision.ops.MultiScaleRoIAlign(
    featmap_names=['0'],
    output_size=7,
    sampling_ratio=2
)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# put the pieces together inside a Faster-RCNN model
model = FasterRCNN(
    backbone,
    num_classes=2,
    rpn_anchor_generator=anchor_generator,
    box_roi_pool=roi_pooler
).to(device)

In [7]:
from torchmetrics.detection.mean_ap import MeanAveragePrecision

def getIoU(bbox, gt):
    x1, y1, w1, h1 = bbox
    x2, y2, w2, h2 = gt
    xA = max(x1, x2)
    yA = max(y1, y2)
    xB = min(x1+w1, x2+w2)
    yB = min(y1+h1, y2+h2)
    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
    boxAArea = w1 * h1
    boxBArea = w2 * h2
    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou

# mean average precision
def get_mAP(pred_boxes, pred_labels, pred_scores, gt_boxes, gt_labels):
    # get mAP
    pred = [{'boxes': pred_boxes, 'labels': pred_labels, 'scores': pred_scores}]
    gt = [{'boxes': gt_boxes, 'labels': gt_labels}]
    map_metric = MeanAveragePrecision(iou_thresholds=[0.5], class_metrics=True)
    map_metric.update(pred, gt)
    mAP = map_metric.compute()
    return mAP['map']


def validate(model, data_loader, device):
    model.eval()
    average_IoU = 0

    mAP = 0
    with torch.no_grad():
        for images, targets in data_loader:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            pred = model(images)
            pred_boxes = pred[0]['boxes'].cpu()
            pred_labels = pred[0]['labels'].cpu()
            pred_scores = pred[0]['scores'].cpu()
            
            gt_boxes = targets[0]['boxes'].cpu()
            gt_labels = targets[0]['labels'].cpu()
            # get mAP
            if(len(pred_boxes) == 0):
                continue
            mAP += get_mAP(pred_boxes, pred_labels, pred_scores, gt_boxes, gt_labels)
            pred_boxes = pred[0]['boxes'].cpu().numpy()
            pred_labels = pred[0]['labels'].cpu().numpy()
            pred_scores = pred[0]['scores'].cpu().numpy()

            gt_boxes = targets[0]['boxes'].cpu().numpy()
            gt_labels = targets[0]['labels'].cpu().numpy()

            # get IoU
            if(len(pred_boxes) == 0):
                continue
            iou = getIoU(pred_boxes[0], gt_boxes[0])
            average_IoU += iou

    average_IoU /= len(data_loader)
    mAP /= len(data_loader)

    return average_IoU, mAP

In [8]:
from engine import train_one_epoch, evaluate

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1
)

# let's train it just for 2 epochs
num_epochs = 20
max_IoU = 0
for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, train_loader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # evaluate on the test dataset
    IoU, mAP = validate(model, val_loader, device=device)
    print(f"IoU: {IoU}, mAP: {mAP}", )

    if IoU > max_IoU:
        max_IoU = IoU
        model_name = f"FasterRCNN_MobileNetV3_small_{max_IoU:.3f}.pth"
        torch.save(model.state_dict(), model_name)

    # evaluate(model, val_loader, device=device)

print("That's it!")

  with torch.cuda.amp.autocast(enabled=scaler is not None):


Epoch: [0]  [  0/151]  eta: 0:01:55  lr: 0.000038  loss: 1.6154 (1.6154)  loss_classifier: 0.7832 (0.7832)  loss_box_reg: 0.0230 (0.0230)  loss_objectness: 0.6975 (0.6975)  loss_rpn_box_reg: 0.1117 (0.1117)  time: 0.7617  data: 0.0334  max mem: 3897
Epoch: [0]  [ 10/151]  eta: 0:01:05  lr: 0.000371  loss: 1.5194 (1.4516)  loss_classifier: 0.6613 (0.6135)  loss_box_reg: 0.0248 (0.0290)  loss_objectness: 0.6957 (0.6950)  loss_rpn_box_reg: 0.1128 (0.1141)  time: 0.4624  data: 0.0499  max mem: 5944
Epoch: [0]  [ 20/151]  eta: 0:00:56  lr: 0.000704  loss: 1.1354 (1.2266)  loss_classifier: 0.2777 (0.3986)  loss_box_reg: 0.0319 (0.0399)  loss_objectness: 0.6857 (0.6840)  loss_rpn_box_reg: 0.0854 (0.1041)  time: 0.4170  data: 0.0472  max mem: 5944
Epoch: [0]  [ 30/151]  eta: 0:00:50  lr: 0.001037  loss: 0.9410 (1.1362)  loss_classifier: 0.1536 (0.3312)  loss_box_reg: 0.0600 (0.0491)  loss_objectness: 0.6506 (0.6638)  loss_rpn_box_reg: 0.0688 (0.0921)  time: 0.3962  data: 0.0475  max mem: 5944


KeyboardInterrupt: 

In [24]:
from PIL import Image, ImageDraw 

# predict one image
image, target = test_loader.dataset[0]
test_model = FasterRCNN(
    backbone,
    num_classes=2,
    rpn_anchor_generator=anchor_generator,
    box_roi_pool=roi_pooler
).to(device)
test_model.load_state_dict(torch.load('FasterRCNN_MobileNetV3_small_0.86.pth'))
test_model.eval()
with torch.no_grad():
    prediction = test_model([image.to(device)])[0]

    mAP = get_mAP(
        prediction["boxes"].cpu(), 
        prediction["labels"].cpu(), 
        prediction["scores"].cpu(),
          target["boxes"], 
          target["labels"]
          )
    print(mAP)
    # get the predicted bounding boxes
    pred_boxes = prediction["boxes"].cpu().numpy()
    # get the predicted labels
    pred_labels = prediction["labels"].cpu().numpy()
    # get the predicted scores
    pred_scores = prediction["scores"].cpu().numpy()
    # get the ground truth bounding boxes
    gt_boxes = target["boxes"].numpy()
    # get the ground truth labels
    gt_labels = target["labels"].numpy()
    # get the ground truth areas
    gt_areas = target["area"].numpy()
    # get the ground truth iscrowd
    gt_iscrowd = target["iscrowd"].numpy()
    # get the ground truth image_id
    gt_image_id = target["image_id"].numpy()
    # get the IoU
    iou = getIoU(pred_boxes[0], gt_boxes[0])
    img = Image.fromarray(image.mul(255).permute(1, 2, 0).byte().numpy())
    #show bbox
    draw = ImageDraw.Draw(img)
    draw.rectangle([(pred_boxes[0][0], pred_boxes[0][1]), (pred_boxes[0][2], pred_boxes[0][3])], outline ="red", width=3)
    draw.rectangle([(gt_boxes[0][0], gt_boxes[0][1]), (gt_boxes[0][2], gt_boxes[0][3])], outline ="green", width=3)
    img.show()

    print(iou)
    print(pred_scores)

  test_model.load_state_dict(torch.load('FasterRCNN_MobileNetV3_small_0.86.pth'))


tensor(1.)
0.7093819
[0.29928216 0.15594748 0.13304232 0.12764111 0.07659749]


In [10]:
# evaluate model
IoU, mAP = validate(test_model, test_loader, device=device)

print(IoU, mAP)

0.8106864 tensor(0.8118)


In [None]:
#save model
torch.save(model.state_dict(), 'model.pth')

In [None]:
from engine import evaluate
# evaluate(model, val_loader, device=device)

creating index...
index created!


AssertionError: Results do not correspond to current coco set

In [None]:
# from torchvision.transforms import v2 as T

# def get_transform(train):
#     transforms = []
#     if train:
#         transforms.append(T.RandomHorizontalFlip(0.5))
#     transforms.append(T.ToDtype(torch.float, scale=True))
#     transforms.append(T.ToPureTensor())
#     return T.Compose(transforms)

### Training

In [None]:
# optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4)
# scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
# criterion = torch.nn.CrossEntropyLoss

In [None]:
# import math
# import sys
# import time

# import torch
# import torchvision.models.detection.mask_rcnn
# import utils
# from coco_eval import CocoEvaluator
# from coco_utils import get_coco_api_from_dataset


# def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq, scaler=None):
#     model.train()
#     metric_logger = utils.MetricLogger(delimiter="  ")
#     metric_logger.add_meter("lr", utils.SmoothedValue(window_size=1, fmt="{value:.6f}"))
#     header = f"Epoch: [{epoch}]"

#     lr_scheduler = None
#     if epoch == 0:
#         warmup_factor = 1.0 / 1000
#         warmup_iters = min(1000, len(data_loader) - 1)

#         lr_scheduler = torch.optim.lr_scheduler.LinearLR(
#             optimizer, start_factor=warmup_factor, total_iters=warmup_iters
#         )

#     for images, targets in metric_logger.log_every(data_loader, print_freq, header):
#         images = list(image.to(device) for image in images)
#         targets = [{k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()} for t in targets]
#         with torch.cuda.amp.autocast(enabled=scaler is not None):
#             loss_dict = model(images, targets)
#             losses = sum(loss for loss in loss_dict.values())

#         # reduce losses over all GPUs for logging purposes
#         loss_dict_reduced = utils.reduce_dict(loss_dict)
#         losses_reduced = sum(loss for loss in loss_dict_reduced.values())

#         loss_value = losses_reduced.item()

#         if not math.isfinite(loss_value):
#             print(f"Loss is {loss_value}, stopping training")
#             print(loss_dict_reduced)
#             sys.exit(1)

#         optimizer.zero_grad()
#         if scaler is not None:
#             scaler.scale(losses).backward()
#             scaler.step(optimizer)
#             scaler.update()
#         else:
#             losses.backward()
#             optimizer.step()

#         if lr_scheduler is not None:
#             lr_scheduler.step()

#         metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
#         metric_logger.update(lr=optimizer.param_groups[0]["lr"])

#     return metric_logger


# def _get_iou_types(model):
#     model_without_ddp = model
#     if isinstance(model, torch.nn.parallel.DistributedDataParallel):
#         model_without_ddp = model.module
#     iou_types = ["bbox"]
#     if isinstance(model_without_ddp, torchvision.models.detection.MaskRCNN):
#         iou_types.append("segm")
#     if isinstance(model_without_ddp, torchvision.models.detection.KeypointRCNN):
#         iou_types.append("keypoints")
#     return iou_types


# @torch.inference_mode()
# def evaluate(model, data_loader, device):
#     n_threads = torch.get_num_threads()
#     # FIXME remove this and make paste_masks_in_image run on the GPU
#     torch.set_num_threads(1)
#     cpu_device = torch.device("cpu")
#     model.eval()
#     metric_logger = utils.MetricLogger(delimiter="  ")
#     header = "Test:"

#     coco = get_coco_api_from_dataset(data_loader.dataset)
#     iou_types = _get_iou_types(model)
#     coco_evaluator = CocoEvaluator(coco, iou_types)

#     for images, targets in metric_logger.log_every(data_loader, 100, header):
#         images = list(img.to(device) for img in images)

#         if torch.cuda.is_available():
#             torch.cuda.synchronize()
#         model_time = time.time()
#         outputs = model(images)

#         outputs = [{k: v.to(cpu_device) for k, v in t.items()} for t in outputs]
#         model_time = time.time() - model_time

#         res = {target["image_id"]: output for target, output in zip(targets, outputs)}
#         evaluator_time = time.time()
#         coco_evaluator.update(res)
#         evaluator_time = time.time() - evaluator_time
#         metric_logger.update(model_time=model_time, evaluator_time=evaluator_time)

#     # gather the stats from all processes
#     metric_logger.synchronize_between_processes()
#     print("Averaged stats:", metric_logger)
#     coco_evaluator.synchronize_between_processes()

#     # accumulate predictions from all images
#     coco_evaluator.accumulate()
#     coco_evaluator.summarize()
#     torch.set_num_threads(n_threads)
#     return coco_evaluator

In [None]:
# from datetime import datetime
# num_epochs = 10
# max_mAP = 0
# model_name = ''
# date = datetime.now().strftime("%Y%m%d")

# for epoch in range(num_epochs):
#     train_loss = train_one_epoch(model, criterion, optimizer, train_loader, epoch)
#     mAP = evaluate(model, val_loader)
#     scheduler.step()  # Update learning rate

#     if mAP > max_mAP:
#         max_mAP = mAP
#         model_name = f"last_MobileNetSSD{date}_{mAP:.3f}.pt"
#         torch.save(model.state_dict(), model_name)
    