In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import glob
from cv2 import aruco
import math
import shutil
from scipy.spatial.transform import Rotation as R
from pyquaternion import Quaternion
import math
import json
from tqdm import tqdm
import os.path as osp

In [2]:
class Transporter:
    def __init__(self):
        self.aruco_dict = cv2.aruco.getPredefinedDictionary(aruco.DICT_6X6_250)
        self.parameters = cv2.aruco.DetectorParameters()
        self.ar_detector = cv2.aruco.ArucoDetector(self.aruco_dict, self.parameters)

    def estimate_arucos_poses(
        self, 
        image,
        intrinsic,
        distortion_coef,
        marker_sizes,
        aruco_corners,
        correct_rot_mat=np.eye(3)):
        """
        Estimate the pose of ArUco markers in the image.
        Parameters:
            image: Input image containing ArUco markers.
            intrinsic: Camera intrinsic matrix.
            distortion_coef: Distortion coefficients of the camera.
            marker_sizes: Size of the ArUco marker in meters.
            aruco_corners: Detected corners of the ArUco markers.
            correct_rot_mat: Rotation matrix to correct the orientation of the markers.
        Returns:
            rvecs: List of rotation vectors for each detected marker.
            tvecs: List of translation vectors for each detected marker.
        """

        rvecs, tvecs = [], [] 
    
        for i in range(len(aruco_corners)):
            marker_size = marker_sizes[i]
            rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(
                aruco_corners[i], 
                marker_size, 
                intrinsic, 
                distortion_coef)
            r_mat, _ = cv2.Rodrigues(rvec)
    
            # The aruco is rotated about z-axis, we need to adjust the rotation matrix. If 
            # it is pasted correctly, just multiply with identity matrix.
            r_mat_correct = r_mat @ correct_rot_mat
            r_vec_correct, _ = cv2.Rodrigues(r_mat_correct)
    
            rvecs.append(r_vec_correct)
            tvecs.append(tvec)
                 
        return rvecs, tvecs

    def detect_aruco_markers_by_id(
        self, 
        image,
        selected_aruco_ids,
        BGR_format=True):
        """
        Detect ArUco markers in the image by their IDs.
        Parameters:
            image: Input image containing ArUco markers.
            selected_aruco_ids: List of ArUco marker IDs to detect.
            BGR_format: If True, the input image is in BGR format, otherwise RGB.
        Returns:
            boxes: List of bounding boxes for detected markers.
            centers: List of centers of the detected markers.
            detected_corners: List of corners for the detected markers.
            detected_aruco_ids: List of IDs of the detected markers.
        """

        boxes = []
        centers = []
        detected_corners = []
        detected_aruco_ids = []
        gray_img = cv2.cvtColor(image.copy(), cv2.COLOR_BGR2GRAY)

        corners, ids, _ = self.ar_detector.detectMarkers(gray_img)

        if ids is None:
            return boxes, centers, detected_corners, detected_aruco_ids
            
        for i, marker_id in enumerate(ids.flatten()):
            if marker_id in selected_aruco_ids:
                x_coords = corners[i][0][:, 0]
                y_coords = corners[i][0][:, 1]
                x1, y1 = int(np.min(x_coords)), int(np.min(y_coords))
                x2, y2 = int(np.max(x_coords)), int(np.max(y_coords))
                center = ((x1+x2)//2, (y1+y2)//2)
                boxes.append(((x1, y1, x2, y2), f'ArUco_{marker_id}', 1.0))
                centers.append(center)
                detected_corners.append(corners[i])
                detected_aruco_ids.append(marker_id)
                
        return boxes, centers, detected_corners, detected_aruco_ids

    def calculate_add_on_length(self, carried_object_length):
        """
        Suppose that transporter is able to align in the middle of the carried object
        """
        assert carried_object_length > self.transporter_length
        return (carried_object_length - self.transporter_length) / 2

    def draw_aruco_poses(
        self,
        image, 
        markers_corners, 
        matrix_coefficients,
        distortion_coefficients,
        rvecs,
        tvecs):
    
        annotated_image = image.copy()
    
        for rvec, tvec in zip(rvecs, tvecs):
            annotated_image = cv2.drawFrameAxes(
                annotated_image, 
                matrix_coefficients, 
                distortion_coefficients, 
                rvec, 
                tvec, 
                length=0.06) 
    
        return annotated_image

In [3]:
def resizeWithCropFactor(img, size, intrinsic=None):
    h1, w1 = img.shape[:2]
    w2, h2 = size
    if h1 == h2 and w1 == w2:
        return img, intrinsic 
    r1 = w1 / h1
    r2 = w2 / h2
    if r1 >= r2:
        changed_width = int(r2 * h1)
        offset_pxl_pos = int((w1 - changed_width) / 2)
        img = img[:, offset_pxl_pos : offset_pxl_pos + changed_width]
        ratio = h2 / h1
    elif r1 < r2:
        changed_height = int(w1 / r2)
        offset_pxl_pos = int((h1 - changed_height) / 2)
        img = img[offset_pxl_pos : offset_pxl_pos + changed_height, :]
        ratio = w2 / w1

    if intrinsic is not None:
        cx = w2 / 2
        cy = h2 / 2

        new_intrinsic = intrinsic.copy()
        new_intrinsic[0, -1] = cx
        new_intrinsic[1, -1] = cy
        new_intrinsic[0, 0] *= ratio
        new_intrinsic[1, 1] *= ratio
        return cv2.resize(img, (w2, h2)), new_intrinsic

    return cv2.resize(img, (w2, h2)), None

def rotationMatrixToEulerAngles(R):
    """
    Convert a rotation matrix to Euler angles (roll, pitch, yaw).
    """
    assert (R.shape == (3, 3))
    
    sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
    
    singular = sy < 1e-6
    
    if not singular:
        x = math.atan2(R[2, 1], R[2, 2])
        y = math.atan2(-R[2, 0], sy)
        z = math.atan2(R[1, 0], R[0, 0])
    else:
        x = math.atan2(-R[1, 2], R[1, 1])
        y = math.atan2(-R[2, 0], sy)
        z = 0
    
    return np.array([x, y, z])

def compute_angle(rmat):
    """Computes the angle from the rotation vector."""
    #euler_angles = rotationMatrixToEulerAngles(rmat)
    #angle = np.rad2deg(euler_angles)[1]
    #angle = -angle
    #return angle

    degrees = R.from_matrix(rmat).as_euler('yxz', degrees=True)
    pitch_angle = degrees[0]
    sign = int(np.sign(pitch_angle))
    angle = abs(pitch_angle)
    pitch_angle = (180. - angle) * -sign
    return pitch_angle

def put_text(
    image, 
    device_depth=None, 
    device_angle=None, 
    device_shift=None):

    annotated_image = image.copy()
    midpoint = (annotated_image.shape[1] // 2, annotated_image.shape[0] // 2)

    text_lines = [
        f"Depth: {device_depth:.2f} m",
        f"Angle: {device_angle:.2f} deg",
        f"Shift: ({device_shift[0]:.2f} m, {device_shift[1]:.2f}) m"]

    y_offset = 40
    for line in text_lines:
        text_size = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)[0]
        cv2.putText(annotated_image, line, 
                (midpoint[0], midpoint[1]-y_offset),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
        y_offset += text_size[1] + 10

    return annotated_image


In [4]:
src_dirs = []

root_dir = '/home/alan_khang/Desktop/adam_dataset'
transporter_name = ['t1', 't2']
#date_dir = ['2025_07_25', '2025_07_29', '2025_07_30', '2025_07_31', '2025_08_01']
date_dir = ['2025_08_03']

for transporter in transporter_name:
    for date in date_dir:
        dataset_root = osp.join(root_dir, f'{transporter}_{date}')
        if not osp.exists(dataset_root):
            print(f"Dataset root {dataset_root} does not exist.")
            continue
        scenes_by_date = sorted(os.listdir(dataset_root))
        scenes_by_date = list(filter(lambda x: date in x, scenes_by_date))
        scenes_by_date = [osp.join(dataset_root, s) for s in scenes_by_date]
        scenes_by_date = [s for s in scenes_by_date if osp.isdir(s)]
        src_dirs.extend(scenes_by_date)

Dataset root /home/alan_khang/Desktop/adam_dataset/t2_2025_08_03 does not exist.


In [5]:
aruco_id_by_side = {
    'bottom': [2],
    'right': [6, 9],
    'left': [5, 4],
    'front': [7, 8]
}

aruco_size_by_id = {
    2: 0.049,  # Bottom marker
    6: 0.046,  # Right markers
    9: 0.046,  # Right markers 
    7: 0.046,  # Front markers
    8: 0.046,  # Front markers
    5: 0.046,  # Left markers
    4: 0.042   # Left markers
}

guilder_aruco_2_center_mat_filepath = '/home/alan_khang/Desktop/adam_dataset/guilder_aruco_2_center_mat.json'

with open(guilder_aruco_2_center_mat_filepath, 'r') as f:
    guilder_aruco_2_center_mat_dict = json.load(f)

aruco_2_center_mat = {
    2: np.array(guilder_aruco_2_center_mat_dict['bottom_aruco_pose_2_global_pose'], dtype=np.float64),
    6: np.array(guilder_aruco_2_center_mat_dict['aruco_id_6_2_global_pose'], dtype=np.float64),
    9: np.array(guilder_aruco_2_center_mat_dict['aruco_id_9_2_global_pose'], dtype=np.float64),
    5: np.array(guilder_aruco_2_center_mat_dict['aruco_id_5_2_global_pose'], dtype=np.float64),
    4: np.array(guilder_aruco_2_center_mat_dict['aruco_id_4_2_global_pose'], dtype=np.float64),
    7: np.array(guilder_aruco_2_center_mat_dict['aruco_id_7_2_global_pose'], dtype=np.float64),
    8: np.array(guilder_aruco_2_center_mat_dict['aruco_id_8_2_global_pose'], dtype=np.float64) 
}

aruco_ids_to_detect = list(aruco_2_center_mat.keys())

In [6]:
offset_px = 5

correct_rot_mat = np.eye(3)

overlay_color = [0, 0, 0]

target_marker_ids = [2]

guilder_width = 0.58
guilder_height = 0.304
guilder_length = 0.488

distance_between_ground_and_bottom_aruco = 0.094
distance_between_guilder_rear_and_bottom_aruco = 0.04

guilder_whl = np.array([guilder_width, guilder_height, guilder_length])

guilder_center_coord_in_aruco_coord = np.array([
    [1., 0., 0., 0.],
    [0., 1., 0., (guilder_height / 2) - distance_between_ground_and_bottom_aruco],  
    [0., 0., 1., -((guilder_length / 2) - distance_between_guilder_rear_and_bottom_aruco)],   
    [0., 0., 0., 1.]], dtype=np.float32)

annotated_img_dir_name = 'processed/annotated_images'
annotated_dir_name = 'processed/annotations'
img_dir_name = 'processed/images'

transporter = Transporter()

for src_dir in src_dirs:
    raw_color_dir = os.path.join(src_dir, 'color')
    annotated_img_dir = os.path.join(src_dir, annotated_img_dir_name)
    annotated_dir = os.path.join(src_dir, annotated_dir_name)
    img_dir = os.path.join(src_dir, img_dir_name) 

    if os.path.exists(annotated_img_dir):
        shutil.rmtree(annotated_img_dir)

    if os.path.exists(annotated_dir):
        shutil.rmtree(annotated_dir)

    if os.path.exists(img_dir):
        shutil.rmtree(img_dir)

    os.makedirs(annotated_img_dir, exist_ok=True)
    os.makedirs(annotated_dir, exist_ok=True)
    os.makedirs(img_dir, exist_ok=True)

    img_paths = sorted(glob.glob(os.path.join(raw_color_dir, '*.jpg')))
    for img_path in tqdm(img_paths):
        try: 
            json_path = img_path.replace('color', 'info').replace('.jpg', '.json')
            if not os.path.exists(json_path):
                print(f"JSON file {json_path} does not exist. Skipping.")
                continue
            with open(json_path, 'r') as f:
                img_info = json.load(f)
            img_filename = os.path.basename(img_path)
            intr = np.array(img_info['camera_intrinsic'], dtype=np.float32)
            color_wh = (img_info['image_size']['width'], img_info['image_size']['height'])
            expected_size = (img_info['depth_size']['width'], img_info['depth_size']['height'])
            img = cv2.imread(img_path)

            if img.shape[:2][::-1] != color_wh:
                raise ValueError(
                    f"Image {img_filename} has unexpected size {img.shape[:2][::-1]}, expected {color_wh}.")

            img, intr = resizeWithCropFactor(img, expected_size, intrinsic=np.array(intr, dtype=np.float32))
            resized_img = img.copy()

            (aruco_boxes, aruco_centers, 
            aruco_detected_corners, aruco_detected_ids) = transporter.detect_aruco_markers_by_id(
                resized_img, 
                aruco_ids_to_detect,
                BGR_format=True)
            num_markers = len(aruco_centers)

            if num_markers == 0:
                #print(f"No ArUco markers detected in {img_filename}. Skipping.")
                continue

            markers_sizes = [aruco_size_by_id[id] for id in aruco_detected_ids]
            processed_img = np.ascontiguousarray(img.copy())
            rvecs, tvecs = transporter.estimate_arucos_poses(
                processed_img, 
                intr, 
                np.zeros(8, dtype=np.float64),
                markers_sizes, 
                aruco_detected_corners)

            assert len(rvecs) == len(aruco_detected_ids)

            cam_2_global_poses_in_frame = []
            aruco_detected = dict()

            for rvec, tvec, aruco_id, aruco_corners in zip(rvecs, tvecs, aruco_detected_ids, aruco_detected_corners):
                if rvec is None or tvec is None:
                    print(f"Skipping marker {aruco_id} due to None rvec or tvec.")
                    continue
                rmat, _ = cv2.Rodrigues(rvec)
                cam_2_aruco_quat = R.from_matrix(rmat).as_quat(scalar_first=True)
                cam_2_aruco_pose = np.eye(4, dtype=np.float32)
                cam_2_aruco_pose[:3, :3] = rmat
                cam_2_aruco_pose[:3, 3] = tvec.flatten()

                aruco_pose_2_global_pose = aruco_2_center_mat[aruco_id]
                cam_2_global_pose = cam_2_aruco_pose @ aruco_pose_2_global_pose

                cam_2_global_poses_in_frame.append(cam_2_global_pose)

                aruco_box = aruco_corners[0].astype(np.int16).squeeze()
                box_xyxy = [int(np.min(aruco_box[:, 0])), int(np.min(aruco_box[:, 1])),
                            int(np.max(aruco_box[:, 0])), int(np.max(aruco_box[:, 1]))]
                aruco_detected[str(aruco_id)] = {
                    'rot': cam_2_aruco_quat.tolist(),
                    'trans': tvec.flatten().tolist(),
                    'box_2d_xyxy': box_xyxy,
                    'aruco_2_global_rot': aruco_pose_2_global_pose.tolist()}

            assert len(cam_2_global_poses_in_frame) > 0
            translations = np.array([T[:3, 3] for T in cam_2_global_poses_in_frame])
            avg_translation = np.mean(translations, axis=0)

            # Average rotation using quaternions
            rotations = R.from_matrix([T[:3, :3] for T in cam_2_global_poses_in_frame])
            avg_rotation = R.mean(rotations).as_matrix()
        
            cam_2_global_pose_avg = np.eye(4)
            cam_2_global_pose_avg[:3, :3] = avg_rotation
            cam_2_global_pose_avg[:3, 3] = avg_translation

            guilder_angle = compute_angle(avg_rotation)

            annotated_rgb = transporter.draw_aruco_poses(
               processed_img, 
               aruco_detected_corners, 
               intr, 
               np.zeros(8, dtype=np.float64),
               rvecs, 
               tvecs)

            annotated_rgb = put_text(
               annotated_rgb,
               device_depth=round(avg_translation[2], 4),
               device_angle=guilder_angle,
               device_shift=(round(avg_translation[0], 4), 
                             round(avg_translation[1], 4)))

            for aruco_corners in aruco_detected_corners:
               aruco_box = aruco_corners[0].astype(np.int16).squeeze()
               box_xyxy = [int(np.min(aruco_box[:, 0])), int(np.min(aruco_box[:, 1])),
                           int(np.max(aruco_box[:, 0])), int(np.max(aruco_box[:, 1]))]

               resized_img = cv2.rectangle(
                   resized_img, 
                   (box_xyxy[0] - offset_px, box_xyxy[1] - offset_px), 
                   (box_xyxy[2] + offset_px, box_xyxy[3] + offset_px), 
                   overlay_color[::-1],
                   thickness=-1)

            cv2.imwrite(
               os.path.join(annotated_img_dir, img_filename), 
               annotated_rgb)

            cv2.imwrite(
               os.path.join(img_dir, img_filename), 
               resized_img)

            guilder_center = cam_2_global_pose_avg[:3, 3]
            guilder_rot_mat = cam_2_global_pose_avg[:3, :3]
            guilder_rot_quat = R.from_matrix(guilder_rot_mat).as_quat(scalar_first=True)

            annot = {
               'camera_intrinsic': intr.tolist(),
               'aruco_detected': aruco_detected,
               #'aruco_center': aruco_center.tolist(),
               #'aruco_orientation': aruco_orientation.tolist(),
               #'aruco_pitch_deg': aruco_pitch,
               #'aruco_pitch_rad': math.radians(aruco_pitch),
               'guilder_whl': guilder_whl.tolist(),
               'guilder_center': guilder_center.tolist(),
               'guilder_pitch_deg': guilder_angle,
               'guilder_pitch_rad': math.radians(guilder_angle),
               'guilder_rot': guilder_rot_quat.tolist(),
               'timestamp': img_info['timestamp'],
               'img_width': expected_size[0],
               'img_height': expected_size[1]
            }

            assert img_filename.endswith('.jpg')
            json_path = os.path.join(annotated_dir, img_filename.replace('.jpg', '.json'))

            with open(json_path, 'w') as f:
               json.dump(annot, f, indent=4)

            #aruco_boxes, aruco_centers, aruco_selected_corners = transporter.detect_aruco_markers_by_id(
                #img, 
                #target_marker_ids, 
                #BGR_format=True)
            #num_markers = len(aruco_centers)

            #if num_markers >= 1:
                #processed_img = np.ascontiguousarray(img.copy())
                #marker_size = 0.049
                #rvecs, tvecs = transporter.estimate_arucos_poses(
                    #processed_img, 
                    #intr, 
                    #np.zeros(8, dtype=np.float64),
                    #marker_size, 
                    #aruco_selected_corners, 
                    #correct_rot_mat=correct_rot_mat)

                #if num_markers == 1:
                    #tvec, rvec = tvecs[0], rvecs[0]
                    #t_mid = tvec
                    #R_mid, _ = cv2.Rodrigues(rvec)

                #if R_mid is not None and t_mid is not None:
                    #device_angle = compute_angle(R_mid)

                    #object_center = intr @ t_mid.squeeze().reshape(3, 1)
                    #object_center_2d = object_center / object_center[2]  # Normalize by the third coordinate
                    #object_center_2d = object_center_2d[:2].flatten() 
                    #device_midpoint = (int(object_center_2d[0]), int(object_center_2d[1]))

                    #aruco_coord_in_camera = np.eye(4)
                    #aruco_coord_in_camera[:3, :3] = R_mid
                    #aruco_coord_in_camera[:3, 3] = t_mid.squeeze() 

                    #guilder_center_coord_in_cam = aruco_coord_in_camera @ guilder_center_coord_in_aruco_coord

                    #aruco_orientation = R.from_matrix(R_mid).as_quat(scalar_first=True)
                    #aruco_center = np.array([
                        #round(aruco_coord_in_camera[0, 3], 8),
                        #round(aruco_coord_in_camera[1, 3], 8),
                        #round(aruco_coord_in_camera[2, 3], 8)
                    #])
                    #aruco_pitch = device_angle

                    #guilder_center = guilder_center_coord_in_cam[:-1, -1]

                    #annotated_rgb = _draw_boxes_and_labels(
                        #processed_img, 
                        #aruco_selected_corners, 
                        #intr, 
                        #np.zeros(8, dtype=np.float64),
                        #rvecs, 
                        #tvecs, 
                        #device_depth=round(aruco_coord_in_camera[2, 3], 4), 
                        #device_angle=device_angle, 
                        #device_shift=(round(aruco_coord_in_camera[0, 3], 4), round(aruco_coord_in_camera[1, 3], 4)), 
                        #midpoint=device_midpoint)

                    #for aruco_corners in aruco_selected_corners:
                        #aruco_box = aruco_corners[0].astype(np.int16).squeeze()
                        #box_xyxy = [int(np.min(aruco_box[:, 0])), int(np.min(aruco_box[:, 1])),
                                    #int(np.max(aruco_box[:, 0])), int(np.max(aruco_box[:, 1]))]

                        #resized_img = cv2.rectangle(
                            #resized_img, 
                            #(box_xyxy[0] - offset_px, box_xyxy[1] - offset_px), 
                            #(box_xyxy[2] + offset_px, box_xyxy[3] + offset_px), 
                            #overlay_color[::-1],
                            #thickness=-1)

                    #cv2.imwrite(
                        #os.path.join(annotated_img_dir, img_filename), 
                        #annotated_rgb)

                    #cv2.imwrite(
                        #os.path.join(img_dir, img_filename), 
                        #resized_img)

                    #annot = {
                        #'camera_intrinsic': intr.tolist(),
                        #'aruco_center': aruco_center.tolist(),
                        #'aruco_orientation': aruco_orientation.tolist(),
                        #'aruco_pitch_deg': aruco_pitch,
                        #'aruco_pitch_rad': math.radians(aruco_pitch),
                        #'guilder_whl': guilder_whl.tolist(),
                        #'guilder_center': guilder_center.tolist(),
                        #'timestamp': img_info['timestamp'],
                        #'img_width': expected_size[0],
                        #'img_height': expected_size[1]
                    #}

                    #assert img_filename.endswith('.jpg')
                    #json_path = os.path.join(annotated_dir, img_filename.replace('.jpg', '.json'))

                    #with open(json_path, 'w') as f:
                        #json.dump(annot, f, indent=4)
        except Exception as e:
            print(f"Error processing {img_filename}: {e}")
            shutil.rmtree(img_dir)
            shutil.rmtree(annotated_img_dir)
            shutil.rmtree(annotated_dir)

100%|██████████| 212/212 [00:06<00:00, 30.72it/s]
100%|██████████| 491/491 [00:15<00:00, 32.22it/s]
