In [None]:
import torch
import os
import xml.etree.ElementTree as ET
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torch.optim as optim
from torchvision.models.detection import ssdlite320_mobilenet_v3_large
from torch.utils.data import random_split
from torchvision.ops import box_iou
import matplotlib.pyplot as plt



class ObjectDetectionDataset(Dataset):
    """Pascal-VOC style detection dataset (one XML annotation per image).

    Each item is a pair ``(image, target)`` where ``target`` is a dict with the
    keys ``boxes``, ``labels``, ``image_id``, ``area`` and ``iscrowd``.

    Labels are 1-based: index 0 is reserved for the background class, matching
    the ``len(class_names) + 1`` class count used when building the model.
    """

    def __init__(self, images_dir, annotations_dir, transform=None):
        """
        Custom dataset for object detection.
        Args:
        - images_dir (str): Path to the images directory.
        - annotations_dir (str): Path to the annotations directory.
        - transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.images_dir = images_dir
        self.annotations_dir = annotations_dir
        self.transform = transform

        # Sorted list of image files. Sub-directories and hidden entries
        # (e.g. ``.ipynb_checkpoints``) are skipped because they are not images.
        self.imgs = sorted(
            name for name in os.listdir(images_dir)
            if not name.startswith('.')
            and os.path.isfile(os.path.join(images_dir, name))
        )

        # Define class names (change this according to your dataset)
        self.class_names = ['crosswalk', 'speedlimit', 'stop', 'trafficlight']

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.imgs)

    def __getitem__(self, idx):
        """Load image ``idx`` and its annotation.

        Returns:
            tuple: ``(img, target)``; ``img`` is the RGB image after the
            optional transform, ``target`` is the annotation dictionary.
        """
        # Load image; the context manager releases the file handle right away.
        img_path = os.path.join(self.images_dir, self.imgs[idx])
        with Image.open(img_path) as handle:
            img = handle.convert("RGB")

        # Load annotation (same base name as the image, ``.xml`` extension)
        annotation_file = os.path.splitext(self.imgs[idx])[0] + '.xml'
        annotation_path = os.path.join(self.annotations_dir, annotation_file)
        boxes, labels = self.parse_annotation(annotation_path)

        # Convert everything to a torch.Tensor. The reshape keeps ``boxes``
        # two-dimensional, shape (0, 4), when the image has no known objects,
        # so the column slicing below does not fail.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((len(boxes),), dtype=torch.int64)

        # Annotation dictionary
        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": image_id,
            "area": area,
            "iscrowd": iscrowd,
        }

        # The transform is applied to the image only, never to the boxes.
        if self.transform:
            img = self.transform(img)

        return img, target

    def parse_annotation(self, annotation_path):
        """
        Parse the XML file and extract bounding boxes and labels.

        Returns:
            tuple: ``(boxes, labels)`` where ``boxes`` is a list of
            ``[xmin, ymin, xmax, ymax]`` integer lists and ``labels`` is a list
            of 1-based class indices (0 is left free for background).
            Objects with an unknown class name or a zero-area box are skipped.
        """
        root = ET.parse(annotation_path).getroot()

        boxes = []
        labels = []
        for obj in root.iter('object'):
            label = obj.find('name').text
            if label not in self.class_names:
                continue  # Skip unknown labels

            xmlbox = obj.find('bndbox')
            # Go through float() so coordinates written as "12.0" still parse.
            xmin, ymin, xmax, ymax = (
                int(float(xmlbox.find(tag).text))
                for tag in ('xmin', 'ymin', 'xmax', 'ymax')
            )
            if xmax <= xmin or ymax <= ymin:
                # Degenerate boxes make the SSD forward pass assert; drop them.
                continue

            boxes.append([xmin, ymin, xmax, ymax])
            # +1 because label 0 means "background" for torchvision detectors.
            labels.append(self.class_names.index(label) + 1)

        return boxes, labels


In [None]:
def get_ssd_model(num_classes):
    """Build an SSDlite320-MobileNetV3 detector with a head for ``num_classes``.

    The network is loaded with COCO pre-trained weights and its classification
    head is then rebuilt so that it predicts ``num_classes`` scores per anchor.
    Merely assigning ``classification_head.num_classes`` does not resize the
    convolution layers, so the head has to be replaced as a whole.

    Args:
        num_classes (int): Number of classes including the background class.

    Returns:
        The model with a freshly initialised classification head; the
        backbone and the box-regression head keep their pre-trained weights.
    """
    # Local imports: only needed here, for rebuilding the head.
    from functools import partial
    from torchvision.models.detection import _utils as det_utils
    from torchvision.models.detection.ssdlite import SSDLiteClassificationHead

    # Load an SSD model pre-trained on COCO
    model = ssdlite320_mobilenet_v3_large(pretrained=True)

    # Channel count of every backbone feature map (probed with a 320x320 dummy
    # input) and the number of anchors predicted at each feature-map location.
    in_channels = det_utils.retrieve_out_channels(model.backbone, (320, 320))
    num_anchors = model.anchor_generator.num_anchors_per_location()

    # Same normalisation layer settings torchvision uses when building SSDlite.
    norm_layer = partial(torch.nn.BatchNorm2d, eps=0.001, momentum=0.03)

    # Replace the classification head with one sized for our classes.
    model.head.classification_head = SSDLiteClassificationHead(
        in_channels, num_anchors, num_classes, norm_layer
    )

    return model


In [None]:
# Transformations for the images.
# Only convert the PIL image to a float tensor in the 0-1 range: torchvision
# detection models resize and normalise their inputs internally, so applying
# ImageNet mean/std here as well would normalise every image twice.
transform = transforms.Compose([
    transforms.ToTensor(),
])

# Creating the dataset instance
images_dir = 'images'
annotations_dir = 'annotations'

dataset = ObjectDetectionDataset(images_dir, annotations_dir, transform=transform)

In [None]:
# Splitting the dataset into train and validation sets (80 % / 20 %).
# A seeded generator keeps the split identical across kernel restarts, so
# validation numbers from different runs are comparable.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    dataset, [train_size, val_size], generator=split_generator
)


def collate_detection_batch(batch):
    """Turn a list of ``(image, target)`` pairs into ``(images, targets)`` tuples.

    Detection samples carry a different number of boxes each, so they cannot be
    stacked into one tensor the way the default collate function would.
    """
    return tuple(zip(*batch))


# Define training and validation data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_detection_batch)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_detection_batch)

# Load the modified model
num_classes = len(dataset.class_names) + 1  # Number of classes + background
model = get_ssd_model(num_classes)

# Choose the appropriate device (GPU if available, else CPU)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Move our model to the chosen device. Assigning the result keeps the cell
# from printing the whole model structure as its output.
model = model.to(device)

In [None]:
# Optimiser: SGD with momentum and L2 weight decay over every model parameter.
# The hyper-parameters are gathered in one dict so they are easy to tune.
sgd_settings = dict(lr=0.05, momentum=0.9, weight_decay=0.0005)
optimizer = optim.SGD(model.parameters(), **sgd_settings)

# Function to train the model for one epoch
def train_one_epoch(model, optimizer, data_loader, device, epoch):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: Model that, in training mode, returns a dict of losses when
            called as ``model(images, targets)``.
        optimizer: Optimizer that updates the model parameters.
        data_loader: Iterable yielding ``(images, targets)`` batches, where
            ``images`` is a sequence of tensors and ``targets`` a sequence of
            dicts of tensors.
        device: Device the batch is moved to before the forward pass.
        epoch (int): Zero-based epoch index, used only for the log line.

    Returns:
        float: Mean of the summed loss over all batches, or ``nan`` when the
        loader yields no batch at all.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0

    for images, targets in data_loader:
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # In training mode the model returns its individual loss terms.
        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()

        # Gradient clipping: cap the global gradient norm at 1.0.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        optimizer.step()

        running_loss += losses.item()
        num_batches += 1

    # Counting batches here avoids a ZeroDivisionError on an empty loader.
    mean_loss = running_loss / num_batches if num_batches else float('nan')
    print(f"Epoch [{epoch+1}], Loss: {mean_loss}")
    return mean_loss

# Function for evaluating the model
def evaluate(model, data_loader, device, iou_threshold=0.5, score_threshold=0.5):
    """Compute detection precision, recall and F1 over ``data_loader``.

    Detections scoring below ``score_threshold`` are discarded. The rest are
    matched greedily, highest score first, to ground-truth boxes of the same
    class. A detection is a true positive when its best still-unmatched
    ground-truth box overlaps it with IoU >= ``iou_threshold``; each
    ground-truth box can be matched at most once, so duplicate detections of
    the same object count as false positives.

    Args:
        model: Model that, in eval mode, returns one dict per image with the
            keys ``boxes``, ``labels`` and ``scores``.
        data_loader: Iterable yielding ``(images, targets)`` batches.
        device: Device the images are moved to before inference.
        iou_threshold (float): Minimum IoU for a detection to match a box.
        score_threshold (float): Minimum confidence for a detection to count.

    Returns:
        dict: ``{"precision": float, "recall": float, "f1": float}``; a metric
        is 0.0 whenever its denominator is zero.
    """
    model.eval()
    tp, fp, fn = 0, 0, 0

    with torch.no_grad():
        for images, targets in data_loader:
            images = [img.to(device) for img in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                # Predictions that are confident enough to count
                keep = output['scores'] >= score_threshold
                pred_boxes = output['boxes'][keep].to('cpu')
                pred_labels = output['labels'][keep].to('cpu')
                pred_scores = output['scores'][keep].to('cpu')

                # Ground truths
                gt_boxes = target['boxes'].to('cpu')
                gt_labels = target['labels'].to('cpu')
                num_gt = len(gt_boxes)

                if len(pred_boxes) == 0:
                    fn += num_gt
                    continue
                if num_gt == 0:
                    # Nothing to find in this image: every detection is wrong.
                    fp += len(pred_boxes)
                    continue

                ious = box_iou(pred_boxes, gt_boxes)
                # Only pairs with the same class may match.
                same_class = pred_labels[:, None] == gt_labels[None, :]
                ious = torch.where(same_class, ious, torch.zeros_like(ious))

                matched = torch.zeros(num_gt, dtype=torch.bool)
                for pred_idx in torch.argsort(pred_scores, descending=True).tolist():
                    candidates = ious[pred_idx].clone()
                    candidates[matched] = -1.0  # already claimed by a better detection
                    best_iou, best_gt = candidates.max(dim=0)
                    if best_iou >= iou_threshold:
                        matched[best_gt] = True
                        tp += 1
                    else:
                        fp += 1

                fn += num_gt - int(matched.sum())

    precision = tp / (tp + fp) if tp + fp > 0 else 0.0
    recall = tp / (tp + fn) if tp + fn > 0 else 0.0
    # Guarded: precision + recall is zero when nothing was detected correctly.
    f1 = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0

    print("Evaluation Results:")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 score: {f1:.4f}")

    return {"precision": precision, "recall": recall, "f1": f1}

In [None]:
# Training schedule: every epoch is one optimisation pass over the training
# loader followed by one evaluation pass over the validation loader.
num_epochs = 50
for epoch in range(num_epochs):
    train_one_epoch(
        model=model,
        optimizer=optimizer,
        data_loader=train_loader,
        device=device,
        epoch=epoch,
    )
    evaluate(model=model, data_loader=val_loader, device=device)

In [None]:
# Persist the trained weights. Only the state dict is written, so the model
# has to be rebuilt with the same architecture before loading it back.
model_save_path = 'SSDlite_model_50epoch.pth'
trained_weights = model.state_dict()
torch.save(trained_weights, model_save_path)