In [None]:
# Dataset Abstraction|
import json
with open('/kaggle/input/speedspn/speed/train.json') as f:
    train_metadata = json.load(f)
    print(train_metadata)

In [None]:
import os
import json
from PIL import Image
import matplotlib.pyplot as plt
import torch
from torch import nn,optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms.functional as T
from torchinfo import summary
from scipy.io import loadmat
from scipy.spatial.transform import Rotation as R
from torch.amp import autocast
from torch.cuda.amp import GradScaler
import numpy as np
import tqdm
from torchvision import transforms, models, ops
from torchvision.models.detection.anchor_utils import AnchorGenerator
from torchvision.models.detection.rpn import RegionProposalNetwork, RPNHead
from torchvision.models.detection.roi_heads import RoIHeads
from torchvision.models.detection.faster_rcnn import TwoMLPHead, FastRCNNPredictor
from torchvision.models.detection.transform import GeneralizedRCNNTransform

In [None]:
base_image_path_train = '/kaggle/input/speedspn/speed/images/trainval'
base_image_path_val   = '/kaggle/input/speedspn/speed/images/trainval'

train_json_path = '/kaggle/input/speedspn/speed/train.json'
val_json_path   = '/kaggle/input/speedspn/speed/val.json'


In [None]:
class CustomTransform:
    def __init__(self):
        self.transform_preprocess = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
        ])
        self.normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        
    def __call__(self, image, bbox=None, keypts=None):
        orig_image = transforms.ToTensor()(image)
        preprocess_image = self.transform_preprocess(image)
        
        return preprocess_image, orig_image, keypts

transform = CustomTransform()


In [None]:
import torchvision.transforms as T

# === Define image transformations ===
transform = T.Compose([
    T.Resize((224, 224)),         # or another size depending on your model
    T.ToTensor(),
    T.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # update if your model expects different stats
])

In [None]:
import numpy as np
from scipy.io import loadmat
import json
from scipy.spatial.transform import Rotation as R
import torch.nn.functional as F
import torch

def get_discrete_classes(m):
    """
    Computes m uniformly distributed random rotations parametrized as unit quaternions.
    Args:
        m (int): Number of random rotations to generate
    Returns:
        numpy.ndarray: Matrix of shape (m, 4) containing unit quaternions
    """
    x0 = np.random.rand(m)
    x1 = np.random.rand(m)
    x2 = np.random.rand(m)

    theta1 = 2 * np.pi * x1
    theta2 = 2 * np.pi * x2

    s1 = np.sin(theta1)
    s2 = np.sin(theta2)
    c1 = np.cos(theta1)
    c2 = np.cos(theta2)

    r1 = np.sqrt(1 - x0)
    r2 = np.sqrt(x0)

    quats = np.column_stack([s1 * r1, c1 * r1, s2 * r2, c2 * r2])
    return quats


def _get_quat_bins(qPose, qClass, numNeighbors):
    q = R.from_quat(qPose[[1, 2, 3, 0]])
    qClass = R.from_quat(qClass[:, [1, 2, 3, 0]])

    qDiff = q.inv() * qClass
    qDiff = qDiff.as_quat()

    angleVec = 2 * np.arccos(np.abs(qDiff[:, -1]))

    sortIdx = np.argsort(angleVec)
    nClasses = sortIdx[:numNeighbors]
    nAngles = angleVec[nClasses]

    nWeights = 1.0 - nAngles / np.pi**2
    nWeights = nWeights / np.sum(nWeights)

    return nClasses, nWeights


def softmax_cross_entropy_with_logits(logits, target, reduction='mean'):
    loss = -torch.sum(target.detach() * F.log_softmax(logits, dim=1), dim=1)
    if reduction == 'mean':
        return loss.mean()
    elif reduction == 'sum':
        return loss.sum()
    else:
        return loss


def load_tango_3d_keypoints(mat_dir):
    vertices = loadmat(mat_dir)['tango3Dpoints']
    corners3D = np.transpose(np.array(vertices, dtype=np.float32))
    return corners3D


def load_camera_intrinsics(camera_json):
    with open(camera_json) as f:
        cam = json.load(f)
    cameraMatrix = np.array(cam['cameraMatrix'], dtype=np.float32)
    distCoeffs = np.array(cam['distCoeffs'], dtype=np.float32)
    return cameraMatrix, distCoeffs


def quat2dcm(q):
    q = q / np.linalg.norm(q)
    q0, q1, q2, q3 = q

    dcm = np.zeros((3, 3))
    dcm[0, 0] = 2 * q0 ** 2 - 1 + 2 * q1 ** 2
    dcm[1, 1] = 2 * q0 ** 2 - 1 + 2 * q2 ** 2
    dcm[2, 2] = 2 * q0 ** 2 - 1 + 2 * q3 ** 2

    dcm[0, 1] = 2 * q1 * q2 + 2 * q0 * q3
    dcm[0, 2] = 2 * q1 * q3 - 2 * q0 * q2
    dcm[1, 0] = 2 * q1 * q2 - 2 * q0 * q3
    dcm[1, 2] = 2 * q2 * q3 + 2 * q0 * q1
    dcm[2, 0] = 2 * q1 * q3 + 2 * q0 * q2
    dcm[2, 1] = 2 * q2 * q3 - 2 * q0 * q1

    return dcm


def project_keypoints(q_vbs2tango, r_Vo2To_vbs, cameraMatrix, distCoeffs, keypoints):
    if keypoints.shape[0] != 3:
        keypoints = np.transpose(keypoints)

    keypoints = np.vstack((keypoints, np.ones((1, keypoints.shape[1]))))
    pose_mat = np.hstack((np.transpose(quat2dcm(q_vbs2tango)), np.expand_dims(r_Vo2To_vbs, 1)))
    xyz = np.dot(pose_mat, keypoints)

    x0, y0 = xyz[0, :] / xyz[2, :], xyz[1, :] / xyz[2, :]

    r2 = x0 * x0 + y0 * y0
    cdist = 1 + distCoeffs[0] * r2 + distCoeffs[1] * r2 * r2 + distCoeffs[4] * r2 * r2 * r2
    x = x0 * cdist + distCoeffs[2] * 2 * x0 * y0 + distCoeffs[3] * (r2 + 2 * x0 * x0)
    y = y0 * cdist + distCoeffs[2] * (r2 + 2 * y0 * y0) + distCoeffs[3] * 2 * x0 * y0

    points2D = np.vstack((
        cameraMatrix[0, 0] * x + cameraMatrix[0, 2],
        cameraMatrix[1, 1] * y + cameraMatrix[1, 2]
    ))

    return points2D


In [None]:
import os
import json
import torch
import numpy as np
from torch.utils.data import Dataset
from PIL import Image
from scipy.io import loadmat
import tqdm

# === Assume these functions are defined ===
# _get_quat_bins, project_keypoints, load_tango_3d_keypoints, load_camera_intrinsics

class Speed(Dataset):
    def __init__(self, images_dir, json_dir, transform=None):
        self.images_dir = images_dir
        self.json_dir = json_dir
        self.transform = transform

        self.num_classes = 5000
        self.num_neighbors = 25

        self.imagesList = []
        self.yClassesList = []
        self.yWeightsList = []
        self.bboxList = []
        self.keyptsList = []

        # === Load auxiliary data ===
        attClassesMAT = loadmat('/kaggle/input/attitue/attitudeClasses (1).mat')['qClass']
        keypts3d = load_tango_3d_keypoints('/kaggle/input/tangojson/tangoPoints.mat')
        cameraMatrix, distCoeffs = load_camera_intrinsics('/kaggle/input/camerajson/camera.json')

        # === Load JSON annotations ===
        with open(self.json_dir, 'r') as f:
            annotations = json.load(f)

        # Prepare lookup with corrected filenames
        lookup = {
            item['filename'].lower(): item  # No replace() needed now!
            for item in annotations
        }

        # === Process each image file ===
        for root, _, files in os.walk(self.images_dir):
            for filename in files:
                fname = filename.lower()
                ann = lookup.get(fname)
                if ann is None:
                    continue

                full_path = os.path.join(root, filename)
                self.imagesList.append(full_path)

                q_vbs2tango = np.array(ann["q_vbs2tango"], dtype=np.float32)
                r_Vo2To_vbs = np.array(ann["r_Vo2To_vbs_true"], dtype=np.float32)

                attClasses, attWeights = _get_quat_bins(q_vbs2tango, attClassesMAT, self.num_neighbors)

                yClasses = np.zeros(self.num_classes, dtype=np.float32)
                yClasses[attClasses] = 1. / self.num_neighbors

                yWeights = np.zeros(self.num_classes, dtype=np.float32)
                yWeights[attClasses] = attWeights

                self.yClassesList.append(torch.from_numpy(yClasses))
                self.yWeightsList.append(torch.from_numpy(yWeights))

                # Project keypoints and compute bounding box
                keypts2d = project_keypoints(q_vbs2tango, r_Vo2To_vbs, cameraMatrix, distCoeffs, keypts3d)

                xmin = np.min(keypts2d[0])
                xmax = np.max(keypts2d[0])
                ymin = np.min(keypts2d[1])
                ymax = np.max(keypts2d[1])

                # Add margin
                margin_x = 0.05 * (xmax - xmin)
                margin_y = 0.05 * (ymax - ymin)

                bbox = [
                    max(0, xmin - margin_x),
                    xmax + margin_x,
                    max(0, ymin - margin_y),
                    ymax + margin_y
                ]

                self.bboxList.append(torch.tensor(bbox, dtype=torch.float32))
                self.keyptsList.append(torch.tensor(keypts2d, dtype=torch.float32))

    def __getitem__(self, idx):
        image_path = self.imagesList[idx]
        image = Image.open(image_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return {
            'image': image,
            'filename': os.path.basename(image_path),
            'y_class': self.yClassesList[idx],
            'y_weight': self.yWeightsList[idx],
            'bbox': self.bboxList[idx],
            'keypoints': self.keyptsList[idx]
        }

    def __len__(self):
        return len(self.imagesList)


In [None]:
train_dataset = Speed(images_dir=base_image_path_train, json_dir=train_json_path, transform=transform)
print("✅ Length of train_dataset:", len(train_dataset))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2, pin_memory=True)



In [None]:
from torch.utils.data import DataLoader

# === Create dataset instances ===
train_dataset = Speed(images_dir=base_image_path_train, json_dir=train_json_path, transform=transform)
val_dataset   = Speed(images_dir=base_image_path_val,   json_dir=val_json_path,   transform=transform)

# === Create DataLoaders ===
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=32,         # change as needed
    shuffle=True,
    num_workers=2,         # set to 0 for Kaggle or increase for speed in local
    pin_memory=True
)

val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)

In [None]:
class DetectionWrapperDataset(Dataset):
    def __init__(self, base_dataset):
        self.base_dataset = base_dataset

    def __len__(self):
        return len(self.base_dataset)

    def __getitem__(self, idx):
        sample = self.base_dataset[idx]
        image = sample['image']
        bbox = sample['bbox']

        box = torch.tensor([[bbox[0], bbox[2], bbox[1], bbox[3]]], dtype=torch.float32)  # [x_min, y_min, x_max, y_max]
        label = torch.tensor([1], dtype=torch.int64)  # class 1 = object present

        target = {'boxes': box, 'labels': label}

        return image, target

In [None]:
def detection_collate_fn(batch):
    images = [item[0] for item in batch]
    targets = [item[1] for item in batch]
    return images, targets


In [None]:
import torch.nn as nn
from torchvision import models, ops
from torchvision.models.detection.rpn import RegionProposalNetwork, RPNHead
from torchvision.models.detection.roi_heads import RoIHeads
from torchvision.models.detection.anchor_utils import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, TwoMLPHead
from torchvision.models.detection.transform import GeneralizedRCNNTransform

class MobileNetRPNRoI(nn.Module):
    def __init__(self, num_classes=2, freeze_backbone=True):
        super().__init__()
        
        # === Load MobileNetV2 ===
        mobilenet = models.mobilenet_v2(weights=None)
        self.backbone = mobilenet.features
        self.out_channels = 1280
        self.backbone.out_channels = self.out_channels

        # === Freeze backbone if requested ===
        if freeze_backbone:
            for param in self.backbone.parameters():
                param.requires_grad = False

        # === Image transformer ===
        self.transform = GeneralizedRCNNTransform(
            min_size=224,
            max_size=224,
            image_mean=[0.485, 0.456, 0.406],
            image_std=[0.229, 0.224, 0.225]
        )

        # === RPN: Region Proposal Network ===
        anchor_generator = AnchorGenerator(
            sizes=((32, 64, 128),),
            aspect_ratios=((0.5, 1.0, 2.0),)
        )

        rpn_head = RPNHead(
            in_channels=self.out_channels,
            num_anchors=anchor_generator.num_anchors_per_location()[0]
        )

        self.rpn = RegionProposalNetwork(
            anchor_generator=anchor_generator,
            head=rpn_head,
            fg_iou_thresh=0.7,
            bg_iou_thresh=0.3,
            batch_size_per_image=256,
            positive_fraction=0.5,
            pre_nms_top_n={'training': 2000, 'testing': 1000},
            post_nms_top_n={'training': 2000, 'testing': 300},
            nms_thresh=0.7
        )

        # === RoI Heads ===
        roi_pool = ops.MultiScaleRoIAlign(
            featmap_names=['0'],
            output_size=7,
            sampling_ratio=2
        )

        box_head = TwoMLPHead(
            in_channels=self.out_channels * 7 * 7,
            representation_size=1024
        )

        box_predictor = FastRCNNPredictor(in_channels=1024, num_classes=num_classes)

        self.roi_heads = RoIHeads(
            box_roi_pool=roi_pool,
            box_head=box_head,
            box_predictor=box_predictor,
            fg_iou_thresh=0.5,
            bg_iou_thresh=0.5,
            batch_size_per_image=128,
            positive_fraction=0.25,
            bbox_reg_weights=None,
            score_thresh=0.05,
            nms_thresh=0.5,
            detections_per_img=100
        )

    def forward(self, images, targets=None):
        # Apply transform
        images, targets = self.transform(images, targets)

        # Extract features
        features = self.backbone(images.tensors)  # shape: [B, 1280, H, W]
        features = {"0": features}

        # RPN proposals
        proposals, rpn_losses = self.rpn(images, features, targets)

        # RoI heads (bbox refinement/classification)
        detections, roi_losses = self.roi_heads(features, proposals, images.image_sizes, targets)

        # Aggregate losses
        losses = {}
        losses.update(rpn_losses)
        losses.update(roi_losses)

        if self.training:
            return losses
        return detections


In [None]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Instantiate the model with frozen backbone
model = MobileNetRPNRoI(num_classes=2, freeze_backbone=True).to(device)

# Optimizer only updates parameters that require gradients
optimizer = torch.optim.Adam(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=1e-4
)

# Wrap datasets
train_wrapped = DetectionWrapperDataset(train_dataset)
val_wrapped = DetectionWrapperDataset(val_dataset)

# DataLoaders
train_loader = DataLoader(train_wrapped, batch_size=4, shuffle=True, collate_fn=detection_collate_fn)
val_loader = DataLoader(val_wrapped, batch_size=4, shuffle=False, collate_fn=detection_collate_fn)

# Training loop
for epoch in range(75):
    model.train()
    epoch_loss = 0.0

    for images, targets in tqdm.tqdm(train_loader):
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        epoch_loss += losses.item()

    print(f"[Epoch {epoch + 1}] Loss: {epoch_loss:.4f}")


In [None]:
torch.save(model.state_dict(), "mobilenet_rpn_roi.pth")

In [22]:
def compute_position_spn(q_vbs2tango, bbox, corners3D, cameraMatrix, distCoeffs=np.zeros((1,5))):
    ''' Compute position vector for SPN model
    Arguments:
        q_vbs2tango: (4,) numpy.ndarray - predicted unit quaternion (scalar-first)
        bbox:        (4,) numpy.ndarray - bounding box [xmin, xmax, ymin, ymax] (pix)
        ...
    Returns:
        r_Vo2To_vbs: (3,) numpy.ndarray - predicted position vector (m)
    '''
    maxModelLength = 1.246 # [m] for Tango

    # Bounding box decomposition
    xmin, ymin, width, height = bbox[0], bbox[2], bbox[1]-bbox[0], bbox[3]-bbox[2]

    # Initial position guess based on similar triangles
    boxSize   = np.sqrt(width**2 + height**2)
    boxCenter = np.array([xmin + width/2.0, ymin + height/2.0])
    offsetPx  = np.array([boxCenter[0] - cameraMatrix[0,2],
                          boxCenter[1] - cameraMatrix[1,2]])
    az = np.arctan(offsetPx[0]/cameraMatrix[0,0]) # [rad]
    el = np.arctan(offsetPx[1]/cameraMatrix[1,1])
    range_wge = cameraMatrix[0,0] * maxModelLength / boxSize # [m]
    Ry = R.from_euler('y', -az).as_matrix()
    Rx = R.from_euler('x', -el).as_matrix()
    r_Vo2To_vbs = Ry @ Rx @ np.reshape(np.array([0, 0, range_wge]), (3,1))

    # NEWTON's METHOD
    maxIter = 50
    tolerance = 5e-10
    iter = 0
    dx = 1 + 1e-15

    # Initialize betas
    beta_old = np.squeeze(r_Vo2To_vbs)

    while dx > tolerance and iter <= maxIter:
        # Compute extreme reprojected points in VBS frame
        r_Vo2X_vbs = _compute_extremal_points(q_vbs2tango, beta_old, corners3D, cameraMatrix) # [4 x 3]

        # Compute update to beta
        r = _calc_residuals(r_Vo2X_vbs, cameraMatrix, distCoeffs, beta_old, bbox)
        J = _calc_jacobian(r_Vo2X_vbs, cameraMatrix, distCoeffs, beta_old)
        beta_new = beta_old - np.squeeze(np.linalg.inv(np.transpose(J) @ J) @ np.transpose(J) @ np.reshape(r, (4,1)))

        # Compute change between new and oldbeta
        dx = np.linalg.norm(beta_new - beta_old)

        # Updates
        iter = iter + 1
        beta_old = beta_new

    r_Vo2To_vbs = beta_new

    return r_Vo2To_vbs

def _compute_extremal_points(q_vbs2tango, r_Vo2To_vbs, tangoPoints, cameraMatrix):
    ''' Compute the extremal points of the Tango model given orientation and position estimates '''
    reprImagePoints = project_keypoints(q_vbs2tango, r_Vo2To_vbs,
                                cameraMatrix, np.zeros((5,)), tangoPoints)
    idx1 = np.argmin(reprImagePoints[0]) # xmin
    idx2 = np.argmin(reprImagePoints[1]) # ymin
    idx3 = np.argmax(reprImagePoints[0]) # xmax
    idx4 = np.argmax(reprImagePoints[1]) # ymax

    if tangoPoints.shape[0] != 3:
        tangoPoints = np.transpose(tangoPoints)
    tangoPoints_vbs = np.transpose(quat2dcm(q_vbs2tango)) @ tangoPoints

    r_Vo2X_vbs = np.zeros((4, 3))
    r_Vo2X_vbs[0] = tangoPoints_vbs[:,idx1] # left-most point
    r_Vo2X_vbs[1] = tangoPoints_vbs[:,idx3] # right-most point
    r_Vo2X_vbs[2] = tangoPoints_vbs[:,idx2] # top-most point
    r_Vo2X_vbs[3] = tangoPoints_vbs[:,idx4] # bottom-most point

    return r_Vo2X_vbs

def _calc_residuals(r_Vo2X_vbs, cameraMatrix, distCoeffs, r_Vo2To_vbs, bbox):
    ''' Compute residuals of projected extremal points against the bounding box '''
    Tx, Ty, Tz = r_Vo2To_vbs
    Bx1, Bx2, By1, By2 = bbox

    xs, ys = [], []
    for ii in range(4):
        # Project
        Rx, Ry, Rz = r_Vo2X_vbs[ii]
        x0 = (Rx + Tx) / (Rz + Tz)
        y0 = (Ry + Ty) / (Rz + Tz)

        # Distortion
        r2 = x0*x0 + y0*y0
        cdist = 1 + distCoeffs[0]*r2 + distCoeffs[1]*r2*r2 + distCoeffs[4]*r2*r2*r2
        x  = x0*cdist + distCoeffs[2]*2*x0*y0 + distCoeffs[3]*(r2 + 2*x0*x0)
        y  = y0*cdist + distCoeffs[2]*(r2 + 2*y0*y0) + distCoeffs[3]*2*x0*y0

        # Apply camera
        xs.append(cameraMatrix[0,0]*x + cameraMatrix[0,2])
        ys.append(cameraMatrix[1,1]*y + cameraMatrix[1,2])

    # Residuals
    r1 = xs[0] - Bx1
    r2 = xs[1] - Bx2
    r3 = ys[2] - By1
    r4 = ys[3] - By2

    return np.array([r1, r2, r3, r4])

def _calc_jacobian(r_Vo2X_vbs, cameraMatrix, distCoeffs, r_Vo2To_vbs):
    ''' Compute jacobian of the residuals.
        Camera distortion coefficients are neglected at the moment.
    '''
    fx, fy = cameraMatrix[0,0], cameraMatrix[1,1]
    Tx, Ty, Tz = r_Vo2To_vbs
    Rx_left, Rz_left = r_Vo2X_vbs[0,0], r_Vo2X_vbs[0,2]
    Rx_right, Rz_right = r_Vo2X_vbs[1,0], r_Vo2X_vbs[1,2]
    Ry_top, Rz_top = r_Vo2X_vbs[2,1], r_Vo2X_vbs[2,2]
    Ry_bot, Rz_bot = r_Vo2X_vbs[3,1], r_Vo2X_vbs[3,2]

    # Left-most image feature
    dr1db1 = fx / (Rz_left + Tz)
    dr1db2 = 0
    dr1db3 = -fx * (Rx_left + Tx) / (Rz_left + Tz)**2

    # Right-most iamge feature
    dr2db1 = fx / (Rz_right + Tz)
    dr2db2 = 0
    dr2db3 = -fx * (Rx_right + Tx) / (Rz_right + Tz)**2

    # Top-most image feature
    dr3db1 = 0
    dr3db2 = fy / (Rz_top + Tz)
    dr3db3 = -fy * (Ry_top + Ty) / (Rz_top + Tz)**2

    # Bottom-most image feature
    dr4db1 = 0
    dr4db2 = fy / (Rz_bot + Tz)
    dr4db3 = -fy * (Ry_bot + Ty) / (Rz_bot + Tz)**2

    # Jacobian
    J = np.array([[dr1db1, dr1db2, dr1db3],
                  [dr2db1, dr2db2, dr2db3],
                  [dr3db1, dr3db2, dr3db3],
                  [dr4db1, dr4db2, dr4db3]], dtype=np.float32)

    return J

In [23]:
def valid_spn(epoch, model, data_loader, cameraMatrix, distCoeffs, corners3D, writer, device, qClass):
    ''' Minimal SPN Validation: only Rotation & Translation error '''

    model.eval()
    total_q_error = 0.0
    total_t_error = 0.0
    total_samples = 0

    for idx, (images, bbox, q_gt, t_gt) in enumerate(data_loader):
        B = images.shape[0]
        total_samples += B

        with torch.no_grad():
            _, weights = model(images.to(device))
            topWeights, topClasses = torch.topk(weights, 5, dim=1)  # hardcoded top-5
            topWeights = torch.softmax(topWeights, dim=1)

        for b in range(B):
            qs_pr = qClass[topClasses[b].cpu()]        # [5, 4]
            q_weights = topWeights[b].cpu()            # [5]
            q_pr = weighted_mean_quaternion(qs_pr, q_weights)
            q_pr /= np.linalg.norm(q_pr)

            t_pr = compute_position_spn(q_pr, bbox[b].numpy(), corners3D, cameraMatrix, distCoeffs)

            q_gt_i = q_gt[b].numpy()
            t_gt_i = t_gt[b].numpy()

            err_q = error_orientation(q_pr, q_gt_i)
            err_t = error_translation(t_pr, t_gt_i)

            total_q_error += err_q
            total_t_error += err_t

    # Final average
    avg_q_error = total_q_error / total_samples
    avg_t_error = total_t_error / total_samples

    print(f"\n Validation Epoch {epoch}:")
    print(f" Rotation Error (deg)   : {avg_q_error:.2f}")
    print(f" Translation Error (m)  : {avg_t_error:.4f}")

    return {
        'eR': avg_q_error,
        'eT': avg_t_error
    }

In [24]:
def weighted_mean_quaternion(qs, weights=None):
    ''' Compute weighted mean of N unit quaternions.
    Arguments:
        qs: (N, 4) or (4, N) numpy.ndarray - unit quaternions (scalar-first)
    Returns:
        q: (4,) numpy.ndarray - weighted mean unit quaternion (scalar-first)
    '''
    # Size check
    if qs.shape[1] != 4:
        qs = np.transpose(qs) # (N,4)

    # Scipy uses scalar-last convention
    qs = qs[:,[1,2,3,0]]

    # Weights?
    if weights is None:
        weights = np.ones((qs.shape[0],), dtype=np.float32)

    # Quaternions to rotation matrices
    Rs = R.from_quat(qs)

    # Weighted average
    q = Rs.mean(weights).as_quat() # (4,)

    # Back to scalar-first convention
    q = q[[3,0,1,2]]

    return q

In [25]:
def error_orientation(q_pr, q_gt):
    # q must be [qvec, qcos]
    q_pr = np.reshape(q_pr, (4,))
    q_gt = np.reshape(q_gt, (4,))

    qdot = np.abs(np.dot(q_pr, q_gt))
    qdot = np.minimum(qdot, 1.0)
    return np.rad2deg(2*np.arccos(qdot)) # [deg

In [26]:
def error_translation(t_pr, t_gt):
    t_pr = np.reshape(t_pr, (3,))
    t_gt = np.reshape(t_gt, (3,))

    return np.sqrt(np.sum(np.square(t_gt - t_pr)))

In [27]:
import os
import json
import torch
import tqdm
import numpy as np
from torch.utils.data import Dataset
from PIL import Image
from scipy.io import loadmat

def load_tango_3d_keypoints(mat_dir):
    vertices = loadmat(mat_dir)['tango3Dpoints']
    return np.transpose(vertices.astype(np.float32))

def load_camera_intrinsics(camera_json):
    with open(camera_json) as f:
        cam = json.load(f)
    cameraMatrix = np.array(cam['cameraMatrix'], dtype=np.float32)
    distCoeffs = np.array(cam['distCoeffs'], dtype=np.float32)
    return cameraMatrix, distCoeffs

class Speed(Dataset):
    def __init__(self, images_dir, json_dir, transform=None):
        self.images_dir = images_dir
        self.json_dir = json_dir
        self.transform = transform

        self.num_classes = 5000
        self.num_neighbors = 5

        self.imagesList = []
        self.yClassesList = []
        self.yWeightsList = []
        self.bboxList = []
        self.keyptsList = []
        self.qList = []
        self.tList = []

        attClassesMAT = loadmat('/kaggle/input/atlitude/attitudeClasses.mat')['qClass']
        keypts3d = load_tango_3d_keypoints('/kaggle/input/tangopoint/tangoPoints.mat')
        cameraMatrix, distCoeffs = load_camera_intrinsics('/kaggle/input/speed/speedplusv2/camera.json')

        with open(self.json_dir, 'r') as f:
            annotations = json.load(f)
        lookup = {item['filename']: item for item in annotations}

        for filename in tqdm.tqdm(os.listdir(self.images_dir)):
            if filename not in lookup:
                continue

            self.imagesList.append(os.path.join(self.images_dir, filename))
            ann = lookup[filename]

            q_vbs2tango = np.array(ann["q_vbs2tango"], dtype=np.float32)
            r_Vo2To_vbs = np.array(ann['r_Vo2To_vbs_true'], dtype=np.float32)

            attClasses, attWeights = _get_quat_bins(q_vbs2tango, attClassesMAT, self.num_neighbors)

            yClasses = np.zeros(self.num_classes, dtype=np.float32)
            yClasses[attClasses] = 1. / self.num_neighbors
            yWeights = np.zeros(self.num_classes, dtype=np.float32)
            yWeights[attClasses] = attWeights

            self.yClassesList.append(torch.from_numpy(yClasses))
            self.yWeightsList.append(torch.from_numpy(yWeights))

            keypts2d = np.random.rand(2, 11) * 100  # placeholder if project_keypoints not defined
            xmin, xmax = np.min(keypts2d[0]), np.max(keypts2d[0])
            ymin, ymax = np.min(keypts2d[1]), np.max(keypts2d[1])
            width, height = xmax - xmin, ymax - ymin
            bbox = [max(0, xmin - 0.05 * width), xmax + 0.05 * width,
                    max(0, ymin - 0.05 * height), ymax + 0.05 * height]

            self.bboxList.append(bbox)
            self.keyptsList.append(torch.tensor(keypts2d, dtype=torch.float32))
            self.qList.append(q_vbs2tango)
            self.tList.append(r_Vo2To_vbs)

    def __getitem__(self, idx):
        image_path = self.imagesList[idx]
        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)

        return {
            'image': image,
            'filename': os.path.basename(image_path),
            'y_class': self.yClassesList[idx],
            'y_weight': self.yWeightsList[idx],
            'bbox': torch.tensor(self.bboxList[idx], dtype=torch.float32),
            'keypoints': self.keyptsList[idx],
            'quat_gt': torch.tensor(self.qList[idx], dtype=torch.float32),
            'trans_gt': torch.tensor(self.tList[idx], dtype=torch.float32)
        }

    def __len__(self):
        return len(self.imagesList)

In [28]:
from torchvision import transforms
from torch.utils.data import DataLoader
from scipy.io import loadmat

# === Load Data
qClass = loadmat('/kaggle/input/atlitude/attitudeClasses.mat')['qClass']
corners3D = load_tango_3d_keypoints('/kaggle/input/tangopoint/tangoPoints.mat')
cameraMatrix, distCoeffs = load_camera_intrinsics('/kaggle/input/speed/speedplusv2/camera.json')

val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

val_dataset = Speed(
    images_dir='/kaggle/input/speedsplit/speed/images/trainval',
    json_dir='/kaggle/input/speedsplit/speed/val.json',
    transform=val_transform
)

val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=2)

# === Validation Function
def valid_spn(epoch, model, data_loader, cameraMatrix, distCoeffs, corners3D, writer, device, qClass):
    model.eval()
    total_q_error, total_t_error, total_samples = 0, 0, 0

    for batch in data_loader:
        images = batch['image'].to(device)
        bbox = batch['bbox']
        q_gt = batch['quat_gt']
        t_gt = batch['trans_gt']
        B = images.shape[0]
        total_samples += B

        with torch.no_grad():
            _, weights = model(images)
            topWeights, topClasses = torch.topk(weights, 5, dim=1)
            topWeights = torch.softmax(topWeights, dim=1)

        for b in range(B):
            qs_pr = qClass[topClasses[b].cpu()]      # [5, 4]
            q_weights = topWeights[b].cpu()          # [5]
            q_pr = weighted_mean_quaternion(qs_pr, q_weights)
            q_pr = q_pr / np.linalg.norm(q_pr)
            t_pr = compute_position_spn(q_pr, bbox[b].numpy(), corners3D, cameraMatrix, distCoeffs)
            err_q = error_orientation(q_pr, q_gt[b].numpy())
            err_t = error_translation(t_pr, t_gt[b].numpy())
            total_q_error += err_q
            total_t_error += err_t

    avg_q = total_q_error / total_samples
    avg_t = total_t_error / total_samples
    print(f"\nEpoch {epoch} - Rotation Error: {avg_q:.2f} deg | Translation Error: {avg_t:.4f} m")
    return {'eR': avg_q, 'eT': avg_t}

# === Model and Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

# === Run
epoch = 0
performances = valid_spn(
    epoch=epoch,
    model=model,
    data_loader=val_loader,
    cameraMatrix=cameraMatrix,
    distCoeffs=distCoeffs,
    corners3D=corners3D,
    writer=None,
    device=device,
    qClass=qClass
)

print("\nFinal Validation Results:")
print(f"Rotation Error (deg):   {performances['eR']:.2f}")
print(f"Translation Error (m):  {performances['eT']:.4f}")

100%|██████████| 12000/12000 [00:18<00:00, 643.53it/s]



Epoch 0 - Rotation Error: 39.74 deg | Translation Error: 26.2184 m

Final Validation Results:
Rotation Error (deg):   39.74
Translation Error (m):  26.2184
