<a href="https://colab.research.google.com/github/ayyucedemirbas/SSD_multimodal_sensor_fusion/blob/main/multimodal_sensor_fusion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!git clone https://github.com/aimotive/aimotive-dataset-loader.git

In [None]:
!pip install -r aimotive-dataset-loader/requirements.txt

In [None]:
!pip install -q transformers datasets accelerate

In [None]:
import sys
sys.path.append('aimotive-dataset-loader')
from typing import List, Dict, Tuple

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from torch.utils.data import Dataset, SequentialSampler, DataLoader

# Import dataset loader components
from typing import List, Dict, Tuple
from src.aimotive_dataset import AiMotiveDataset
from src.data_loader import DataItem
from src.loaders.camera_loader import CameraData
from src.loaders.lidar_loader import LidarData
from src.loaders.radar_loader import RadarData

In [None]:
# Maps an annotation's 'ObjectType' string to the integer class id used as the
# detection label. Each inner tuple lists every label that shares one class id;
# the position of the tuple is the id itself.
CATEGORY_MAPPING = {
    label: class_id
    for class_id, labels in enumerate((
        # 0
        ('CAR', 'Size_vehicle_m'),
        # 1
        ('TRUCK', 'BUS', 'TRUCK/BUS', 'TRAIN', 'Size_vehicle_xl', 'VAN', 'PICKUP'),
        # 2
        ('MOTORCYCLE', 'RIDER', 'BICYCLE', 'BIKE', 'Two_wheel_without_rider', 'Rider',
         'OTHER_RIDEABLE', 'OTHER-RIDEABLE'),
        # 3
        ('PEDESTRIAN', 'BABY_CARRIAGE'),
    ))
    for label in labels
}

In [None]:
def lidar_to_bev(point_cloud, x_range=(0, 70), y_range=(-40, 40), grid_size=(512, 512)):
    """
    Projects a LiDAR point cloud onto a bird's-eye-view (BEV) grid.

    Args:
        point_cloud: array-like of shape (N, >=4). Columns 0..3 are read as
            x, y, z and intensity respectively.
        x_range: (min, max) extent along x that is spread over the W grid columns.
        y_range: (min, max) extent along y that is spread over the H grid rows.
        grid_size: (H, W) of the output grid.

    Returns:
        torch.Tensor of shape (3, H, W), dtype float32:
          * channel 0 - point count per cell divided by the largest per-cell count,
          * channel 1 - highest z per cell (never below 0), divided by 3 and clipped to [0, 1],
          * channel 2 - mean intensity per cell, clipped to [0, 1].
        An empty point cloud yields an all-zero tensor.

    NOTE(review): points outside x_range / y_range are clamped into the border
    cells rather than discarded - confirm that this is the intended behaviour.
    """
    H, W = grid_size
    bev = np.zeros((3, H, W), dtype=np.float32)
    points = np.asarray(point_cloud)
    if points.shape[0] == 0:
        return torch.from_numpy(bev)

    x_min, x_max = x_range
    y_min, y_max = y_range
    x_bins = np.linspace(x_min, x_max, W + 1)
    y_bins = np.linspace(y_min, y_max, H + 1)

    # Cell index of every point; the clip keeps out-of-range points on the border.
    ix = np.clip(np.digitize(points[:, 0], bins=x_bins) - 1, 0, W - 1)
    iy = np.clip(np.digitize(points[:, 1], bins=y_bins) - 1, 0, H - 1)
    flat = iy * W + ix

    # Scatter-accumulate in NumPy instead of a per-point Python loop.
    counts = np.bincount(flat, minlength=H * W).reshape(H, W)
    intensity_sum = np.bincount(
        flat, weights=points[:, 3], minlength=H * W
    ).reshape(H, W)
    # bev[1] starts at 0, so cells whose points all have z < 0 stay at 0.
    np.maximum.at(bev[1], (iy, ix), points[:, 2].astype(np.float32))

    # The raw counts are kept separately so the mean intensity is sum / count;
    # dividing by the already-normalised density would scale it by the max count.
    occupied = counts > 0
    bev[0] = counts / counts.max()
    bev[1] = np.clip(bev[1] / 3.0, 0, 1)
    bev[2, occupied] = intensity_sum[occupied] / counts[occupied]
    bev[2] = np.clip(bev[2], 0, 1)
    return torch.from_numpy(bev)

def radar_to_map(point_cloud, x_range=(0, 70), y_range=(-40, 40), grid_size=(512, 512)):
    """
    Converts a radar point cloud into a 2D grid map.

    Args:
        point_cloud: array-like of shape (N, >=4). Columns 0 and 1 are read as
            x and y; column 3 is read as the velocity value.
        x_range: (min, max) extent along x that is spread over the W grid columns.
        y_range: (min, max) extent along y that is spread over the H grid rows.
        grid_size: (H, W) of the output grid.

    Returns:
        torch.Tensor of shape (2, H, W), dtype float32:
          * channel 0 - largest planar range sqrt(x^2 + y^2) per cell, divided by
            100 and clipped to [0, 1],
          * channel 1 - mean velocity per cell, clipped to [-1, 1] and shifted to
            [0, 1]; cells without points therefore hold 0.5.
        An empty point cloud yields an all-zero tensor.

    NOTE(review): points outside x_range / y_range are clamped into the border
    cells rather than discarded - confirm that this is the intended behaviour.
    """
    H, W = grid_size
    radar_map = np.zeros((2, H, W), dtype=np.float32)
    points = np.asarray(point_cloud)
    if points.shape[0] == 0:
        return torch.from_numpy(radar_map)

    x_min, x_max = x_range
    y_min, y_max = y_range
    x_bins = np.linspace(x_min, x_max, W + 1)
    y_bins = np.linspace(y_min, y_max, H + 1)

    xs = points[:, 0]
    ys = points[:, 1]
    ranges = np.sqrt(xs ** 2 + ys ** 2)

    ix = np.clip(np.digitize(xs, bins=x_bins) - 1, 0, W - 1)
    iy = np.clip(np.digitize(ys, bins=y_bins) - 1, 0, H - 1)
    flat = iy * W + ix

    # One vectorised pass replaces the two per-point Python loops.
    counts = np.bincount(flat, minlength=H * W).reshape(H, W)
    velocity_sum = np.bincount(
        flat, weights=points[:, 3], minlength=H * W
    ).reshape(H, W)
    np.maximum.at(radar_map[0], (iy, ix), ranges.astype(np.float32))

    radar_map[0] = np.clip(radar_map[0] / 100.0, 0, 1)

    occupied = counts > 0
    radar_map[1, occupied] = velocity_sum[occupied] / counts[occupied]
    # Map [-1, 1] onto [0, 1].
    radar_map[1] = (np.clip(radar_map[1], -1, 1) + 1) / 2.0
    return torch.from_numpy(radar_map)

class AiMotiveSSD_Dataset(Dataset):
    """
    Torch dataset that wraps AiMotiveDataset and turns each frame into tensors.

    Every sample is a tuple (fused_sensor, camera_images, annotations):
      * fused_sensor  - LiDAR BEV map (3 channels) concatenated with the front
                        and back radar maps (2 channels each) along dim 0,
      * camera_images - the four camera views stacked along a new leading dim,
      * annotations   - one row per annotated object, see get_targets.
    """

    def __init__(self, root_dir: str, train: bool = True, grid_size: Tuple[int,int]=(512,512)):
        # The underlying loader is given the split name 'train' or 'val'.
        self.dataset = AiMotiveDataset(root_dir, 'train' if train else 'val')
        self.grid_size = grid_size
        # Convert to a tensor first, then resize every view to 224x224.
        self.camera_transform = T.Compose([T.ToTensor(), T.Resize((224, 224))])

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        frame = self.dataset.data_loader[self.dataset.dataset_index[index]]

        # Channel order of the fused map: LiDAR (3), front radar (2), back radar (2).
        sensor_maps = [
            self.prepare_lidar_data(frame.lidar_data),
            self.prepare_radar_data(frame.radar_data.front_radar),
            self.prepare_radar_data(frame.radar_data.back_radar),
        ]
        fused_sensor = torch.cat(sensor_maps, dim=0)

        camera_images = self.prepare_camera_data(frame.camera_data)
        annotations = self.get_targets(frame.annotations.objects, CATEGORY_MAPPING)
        return fused_sensor, camera_images, annotations

    def prepare_lidar_data(self, lidar_data: LidarData) -> torch.Tensor:
        """Rasterises the top LiDAR point cloud with lidar_to_bev on this dataset's grid."""
        return lidar_to_bev(lidar_data.top_lidar.point_cloud, grid_size=self.grid_size)

    def prepare_radar_data(self, radar_sensor) -> torch.Tensor:
        """Rasterises one radar sensor's point cloud with radar_to_map on this dataset's grid."""
        return radar_to_map(radar_sensor.point_cloud, grid_size=self.grid_size)

    def prepare_camera_data(self, camera_data: CameraData) -> torch.Tensor:
        """
        Applies the camera transform to the front, back, left and right images
        (in that order) and stacks the results along a new dim 0.
        """
        view_names = ('front_camera', 'back_camera', 'left_camera', 'right_camera')
        views = [
            self.camera_transform(getattr(camera_data, view_name).image)
            for view_name in view_names
        ]
        return torch.stack(views, dim=0)

    def get_targets(self, annotations: List[Dict], category_mapping: Dict[str, int]):
        """
        Builds the target tensor for one frame.

        Each row is [class, x, y, l, w, q_z, vel_x, vel_y], read from the
        annotation's bounding-box origin, extent, orientation quaternion Z
        component and relative velocity. The Z components of origin, extent and
        velocity are read but not used.

        Returns a float tensor of shape (num_objects, 8). When there are no
        annotations a single all-zero row of shape (1, 8) is returned.
        NOTE(review): that placeholder row carries class value 0 - confirm the
        consumers treat it as "no object".
        """
        def read_xyz(obj, prefix):
            return [obj[f'{prefix} {axis}'] for axis in ['X', 'Y', 'Z']]

        rows = []
        for obj in annotations:
            x, y, _ = read_xyz(obj, 'BoundingBox3D Origin')
            length, width, _ = read_xyz(obj, 'BoundingBox3D Extent')
            vel_x, vel_y, _ = read_xyz(obj, 'Relative Velocity')
            # Only one quaternion component is kept for the orientation.
            q_z = obj['BoundingBox3D Orientation Quat Z']
            class_id = category_mapping[obj['ObjectType']]
            rows.append(
                torch.tensor([class_id, x, y, length, width, q_z, vel_x, vel_y],
                             dtype=torch.float)
            )

        if not rows:
            return torch.zeros((1,8))
        return torch.vstack(rows)

In [None]:
class SSDDetector(nn.Module):
    """
    Single-feature-map SSD-style detector.

    A two-stage conv/ReLU/max-pool backbone reduces the input resolution by a
    factor of 4; two 3x3 convolution heads then predict, for every feature-map
    cell and every default box, 4 localisation values and num_classes scores.
    """

    def __init__(self, in_channels=7, num_classes=4, num_default=4):
        """
        in_channels: Number of input channels (7 from fused BEV+Radar maps)
        num_classes: Number of object classes
        num_default: Number of default boxes per feature map cell.
        """
        super().__init__()
        self.num_default = num_default

        backbone_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        self.base = nn.Sequential(*backbone_layers)

        # Both heads keep the spatial size of the backbone output
        # (128x128 for a 512x512 input).
        self.loc_head = nn.Conv2d(64, 4 * self.num_default, kernel_size=3, padding=1)
        self.cls_head = nn.Conv2d(64, num_classes * self.num_default, kernel_size=3, padding=1)

    @staticmethod
    def _per_box(head_output, batch_size, values_per_box):
        # (B, C, H, W) -> (B, H, W, C) -> (B, H*W*boxes_per_cell, values_per_box)
        channels_last = head_output.permute(0, 2, 3, 1).contiguous()
        return channels_last.view(batch_size, -1, values_per_box)

    def forward(self, x):
        """
        Returns (loc, conf) with shapes (B, num_boxes, 4) and
        (B, num_boxes, num_classes), where num_boxes = H/4 * W/4 * num_default.
        """
        batch_size = x.size(0)
        feature_map = self.base(x)
        # Recover the class count from the head width.
        scores_per_box = self.cls_head.out_channels // self.num_default

        loc = self._per_box(self.loc_head(feature_map), batch_size, 4)
        conf = self._per_box(self.cls_head(feature_map), batch_size, scores_per_box)
        return loc, conf

def generate_default_boxes(feature_map_size=(128,128), stride=4):
    """
    Builds the default (anchor) boxes for one feature map.

    Every cell gets 4 boxes, one per (scale, aspect ratio) pair. Box centres sit
    at the cell centres, (index + 0.5) * stride; widths and heights are relative
    to a 512-pixel image side. Boxes are (cx, cy, w, h) in pixel coordinates.

    Returns a tensor of shape (fm_h * fm_w * 4, 4), ordered row by row, then
    column by column, then by (scale, aspect ratio) pair.
    """
    fm_h, fm_w = feature_map_size

    # The box shapes do not depend on the cell, so they are computed once.
    box_shapes = [
        (s * 512 * np.sqrt(ar), s * 512 / np.sqrt(ar))
        for s, ar in zip([0.1, 0.2, 0.2, 0.3], [1.0, 2.0, 0.5, 1.0])
    ]

    boxes = [
        [(col + 0.5) * stride, (row + 0.5) * stride, box_w, box_h]
        for row in range(fm_h)
        for col in range(fm_w)
        for box_w, box_h in box_shapes
    ]
    return torch.tensor(boxes)

def convert_gt_boxes(gt, image_size=(512,512), x_range=(0,70), y_range=(-40,40)):
    """
    Convert ground truth boxes from physical coordinates to BEV pixel coordinates.

    Args:
        gt: Tensor of shape (num_objects, 8) with fields
            [cat, x, y, l, w, q_z, vel_x, vel_y].
        image_size: (H, W) of the BEV image in pixels.
        x_range: (min, max) physical extent mapped onto the image width.
        y_range: (min, max) physical extent mapped onto the image height.

    Returns:
      - boxes: Tensor of shape (num_objects, 4) in (cx, cy, w, h) pixel coordinates.
        The boxes are axis-aligned; q_z is not used.
      - labels: Tensor of shape (num_objects,), dtype long.
    """
    img_h, img_w = image_size
    x_min, x_max = x_range
    y_min, y_max = y_range
    # Use the ranges that were passed in, so a non-default grid extent is scaled correctly.
    x_span = x_max - x_min
    y_span = y_max - y_min

    pixel_x = (gt[:, 1] - x_min) / x_span * img_w
    pixel_y = (gt[:, 2] - y_min) / y_span * img_h
    pixel_w = gt[:, 3] / x_span * img_w
    pixel_h = gt[:, 4] / y_span * img_h

    boxes = torch.stack([pixel_x, pixel_y, pixel_w, pixel_h], dim=1)
    labels = gt[:, 0].long()
    return boxes, labels

def compute_iou(boxes1, boxes2):
    """
    Pairwise intersection-over-union between two sets of boxes.

    boxes1: Tensor of shape (N, 4) in (cx, cy, w, h) format.
    boxes2: Tensor of shape (M, 4) in (cx, cy, w, h) format.
    Returns a tensor of shape (N, M) where entry [i, j] is the IoU of
    boxes1[i] and boxes2[j].
    """
    def corners(boxes):
        # (cx, cy, w, h) -> (x1, y1, x2, y2)
        half_w = boxes[:, 2] / 2
        half_h = boxes[:, 3] / 2
        return (boxes[:, 0] - half_w, boxes[:, 1] - half_h,
                boxes[:, 0] + half_w, boxes[:, 1] + half_h)

    ax1, ay1, ax2, ay2 = corners(boxes1)
    bx1, by1, bx2, by2 = corners(boxes2)

    # Broadcast (N, 1) against (1, M) to get every pair; non-overlapping pairs
    # have a negative extent, which is clamped to 0.
    overlap_w = torch.min(ax2[:, None], bx2[None, :]) - torch.max(ax1[:, None], bx1[None, :])
    overlap_h = torch.min(ay2[:, None], by2[None, :]) - torch.max(ay1[:, None], by1[None, :])
    intersection = overlap_w.clamp(min=0) * overlap_h.clamp(min=0)

    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)
    union = area_a[:, None] + area_b[None, :] - intersection
    return intersection / union

def match_anchors(default_boxes, gt_boxes, gt_labels, iou_threshold=0.5):
    """
    Assigns a ground-truth box to every default box whose best IoU reaches the threshold.

    Args:
        default_boxes: Tensor (N_default, 4) in (cx, cy, w, h).
        gt_boxes: Tensor (M, 4) in (cx, cy, w, h); M may be 0.
        gt_labels: Tensor (M,) of class ids.
        iou_threshold: minimum IoU for a default box to count as matched.

    Returns:
        loc_targets: Tensor (N_default, 4). For matched boxes, the offsets
            ((gx - dx) / dw, (gy - dy) / dh, log(gw / dw), log(gh / dh));
            zeros for unmatched boxes.
        cls_targets: Tensor (N_default,), dtype long. The matched label for
            matched boxes, 0 for unmatched boxes.

    NOTE(review): 0 is the fill value for unmatched boxes, and CATEGORY_MAPPING
    in this file also assigns 0 to 'CAR' - confirm whether a dedicated
    background class is intended.
    """
    num_defaults = default_boxes.size(0)
    cls_targets = torch.zeros(num_defaults, dtype=torch.long, device=default_boxes.device)
    loc_targets = torch.zeros(num_defaults, 4, device=default_boxes.device)

    # Without ground truth there is nothing to match; max() over an empty
    # dimension would raise, so return the all-unmatched targets directly.
    if gt_boxes.size(0) == 0:
        return loc_targets, cls_targets

    ious = compute_iou(default_boxes, gt_boxes)  # (N_default, M)
    best_gt_iou, best_gt_idx = ious.max(dim=1)   # best IoU and gt index per default box
    pos_idx = best_gt_iou >= iou_threshold
    if pos_idx.any():
        matched_gt = best_gt_idx[pos_idx]
        assigned_gt_boxes = gt_boxes[matched_gt]
        assigned_defaults = default_boxes[pos_idx]
        offsets = torch.zeros_like(assigned_defaults)
        offsets[:, 0] = (assigned_gt_boxes[:, 0] - assigned_defaults[:, 0]) / assigned_defaults[:, 2]
        offsets[:, 1] = (assigned_gt_boxes[:, 1] - assigned_defaults[:, 1]) / assigned_defaults[:, 3]
        offsets[:, 2] = torch.log(assigned_gt_boxes[:, 2] / assigned_defaults[:, 2])
        offsets[:, 3] = torch.log(assigned_gt_boxes[:, 3] / assigned_defaults[:, 3])
        loc_targets[pos_idx] = offsets
        cls_targets[pos_idx] = gt_labels[matched_gt]
    return loc_targets, cls_targets

# Run on the first CUDA device when one is available, otherwise on the CPU.
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print("Using device:", device)

Using device: cuda:0


In [None]:
import numpy as np

In [None]:
# Dataset root (a Kaggle input path).
root_directory = '/kaggle/input/aimotive-multimodal-dataset'

# Training split, rasterised onto a 512x512 grid.
train_dataset = AiMotiveSSD_Dataset(
    root_dir=root_directory,
    train=True,
    grid_size=(512, 512),
)

# Samples are visited in dataset order (no shuffling).
train_sampler = SequentialSampler(train_dataset)

def custom_collate_fn(batch):
    """
    Collates dataset samples of the form (fused_sensor, camera_images, annotations).

    The fused sensor maps and the camera images are stacked along a new batch
    dimension. The annotation tensors are returned as a plain list, one entry
    per sample, without stacking.
    """
    fused_list, camera_list, target_list = [], [], []
    for sample in batch:
        fused_list.append(sample[0])
        camera_list.append(sample[1])
        target_list.append(sample[2])

    fused_sensors = torch.stack(fused_list, dim=0)
    camera_images = torch.stack(camera_list, dim=0)  # (B, 4, C, H, W)
    return fused_sensors, camera_images, target_list

# Batches of 4 samples in sampler order, loaded by 4 worker processes; an
# incomplete final batch is dropped.
train_loader = DataLoader(
    train_dataset,
    batch_size=4,
    sampler=train_sampler,
    collate_fn=custom_collate_fn,
    num_workers=4,
    pin_memory=False,
    drop_last=True,
)

# Detector over the 7-channel fused LiDAR/radar map, placed on the selected device.
model = SSDDetector(in_channels=7, num_classes=4, num_default=4).to(device)

# Smooth-L1 for box offsets, cross-entropy for class scores.
criterion_loc = nn.SmoothL1Loss()
criterion_cls = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Default boxes for the 128x128 feature map (stride 4), kept on the same
# device as the model.
default_boxes = generate_default_boxes(feature_map_size=(128, 128), stride=4).to(device)

In [None]:
# Training loop: for every batch, build per-sample matching targets from the
# ground truth, run the detector on the fused sensor map and optimise the sum
# of the localisation and classification losses.
num_epochs = 10
model.train()

for epoch in range(num_epochs):
    for step, (fused_sensor, camera_images, targets) in enumerate(train_loader):
        # Only the fused LiDAR/radar map is fed to the model; camera_images is
        # not consumed below, so it is left on the host rather than copied to
        # the device on every step.
        fused_sensor = fused_sensor.to(device)

        batch_loc_targets = []
        batch_cls_targets = []
        for gt in targets:
            gt = gt.float()  # shape: (num_objects, 8)
            gt_boxes, gt_labels = convert_gt_boxes(gt, image_size=(512,512), x_range=(0,70), y_range=(-40,40))
            gt_boxes = gt_boxes.to(device)
            gt_labels = gt_labels.to(device)
            loc_t, cls_t = match_anchors(default_boxes, gt_boxes, gt_labels, iou_threshold=0.5)
            batch_loc_targets.append(loc_t)
            batch_cls_targets.append(cls_t)
        batch_loc_targets = torch.stack(batch_loc_targets, dim=0)  # (B, num_default, 4)
        batch_cls_targets = torch.stack(batch_cls_targets, dim=0)  # (B, num_default)

        optimizer.zero_grad()
        # loc_preds: (B, num_default, 4), conf_preds: (B, num_default, num_classes)
        loc_preds, conf_preds = model(fused_sensor)
        # NOTE(review): the localisation loss is averaged over every default
        # box, including unmatched ones whose target is all zeros - confirm
        # this is intended rather than restricting it to matched boxes.
        loss_loc = criterion_loc(loc_preds, batch_loc_targets)
        # Cross-entropy expects (num_samples, num_classes) scores and (num_samples,) labels.
        loss_cls = criterion_cls(conf_preds.view(-1, conf_preds.size(-1)), batch_cls_targets.view(-1))
        loss = loss_loc + loss_cls
        loss.backward()
        optimizer.step()

        if step % 5 == 0:
            print(f"Epoch [{epoch+1}], Step [{step}], Loss: {loss.item():.4f}")


Epoch [1], Step [0], Loss: 1.3765
Epoch [1], Step [5], Loss: 1.3409
Epoch [1], Step [10], Loss: 1.3111
Epoch [1], Step [15], Loss: 1.2693
Epoch [1], Step [20], Loss: 1.2154
Epoch [1], Step [25], Loss: 1.1543
Epoch [1], Step [30], Loss: 1.0768
Epoch [1], Step [35], Loss: 1.0365
Epoch [1], Step [40], Loss: 0.9640
Epoch [1], Step [45], Loss: 0.8412
Epoch [1], Step [50], Loss: 0.7319
Epoch [1], Step [55], Loss: 0.5929
Epoch [1], Step [60], Loss: 0.4531
Epoch [1], Step [65], Loss: 0.3334
Epoch [1], Step [70], Loss: 0.2392
Epoch [1], Step [75], Loss: 0.1335
Epoch [1], Step [80], Loss: 0.0942


In [None]:
!rm /kaggle/working/multimodal_sensor_fusion.pth

rm: cannot remove '/kaggle/working/multimodal_sensor_fusion.pth': No such file or directory


In [None]:
# Save only the model's parameters (its state_dict), not the full module
# object; the path is relative to the current working directory.
torch.save(model.state_dict(), 'multimodal_sensor_fusion.pth')