In [None]:
%reload_ext autoreload
%autoreload 2

from ros_camera import RosCamera
from camera_utils import stream
from yolov8 import YOLOv8
from byte_track import ByteTrack
from track_3d import Tracker3D
import cv2
import numpy as np
from time import time


def depth_scale(depth):
    if depth.dtype == np.uint16:
        return 0.001
    if depth.dtype == float:
        return 1
    raise RuntimeError


class TimeMeasurer:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.tic = time()
        return self

    def __exit__(self, type, value, traceback):
        toc = time()
        print(f"{self.name}: {toc - self.tic:.03f}")

In [5]:
camera = RosCamera(
    "/realsense_gripper/color/camera_info",
    "/realsense_gripper/color/image_raw/compressed",
    "/realsense_gripper/aligned_depth_to_color/camera_info",
    "/realsense_gripper/aligned_depth_to_color/image_raw",
    "toy_box_3.bag", exact_sync=True)

In [None]:
model_file = "/home/cds-jetson-host/ultralytics/yolov8n-seg-1class.yaml"
weights_file = "/home/cds-jetson-host/ultralytics/runs/segment/train3/weights/last.pt"
yolov8 = YOLOv8(model_file, weights_file, min_score=0.7)
byte_track = ByteTrack(frame_rate=30, track_buffer=30)
tracker_3d = Tracker3D(15, camera.K, camera.D)
odom = cv2.rgbd.RgbdOdometry_create(camera.K, 0.1, 3.0)

In [None]:
stream(camera)

In [None]:
def detect_toy_box(key, image, depth):

    def prep_rgbd(image, depth):
        image_prep = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        depth_prep = (depth * depth_scale(depth)).astype(np.float32)
        return image_prep, depth_prep

    with TimeMeasurer("segmentation") as tm:
        scores, classes_ids, boxes, masks = yolov8.segment(image)

    segmentation_vis = image.copy()
    YOLOv8.draw_segmentation(segmentation_vis, scores, classes_ids, boxes, masks)
    segmentation_image = YOLOv8.to_segmentation_image(classes_ids, masks)

    boxes, scores, classes_ids, masks = ByteTrack.from_segmentation_image(segmentation_image)
    with TimeMeasurer("tracking") as tm:
        tracked_objects = byte_track.track(boxes, scores, classes_ids, masks=masks)

    tracking_vis = image.copy()
    ByteTrack.draw_tracked_objects(tracking_vis, tracked_objects)
    tracking_image = ByteTrack.to_tracking_image(tracked_objects, segmentation_image.shape)

    image_prep, depth_prep = prep_rgbd(image, depth)
    if detect_toy_box.prev_image_prep is None:
        detect_toy_box.pose = np.eye(4)
    else:
        with TimeMeasurer("odometry") as tm:
            ret, transform = odom.compute(
                image_prep, depth_prep, np.empty((0,)),
                detect_toy_box.prev_image_prep, detect_toy_box.prev_depth_prep, np.empty((0,)))
        assert ret
        if ret == False:
            detect_toy_box.pose = np.eye(4)
            print("Odomtery failed")
        else:
            detect_toy_box.pose = np.matmul(detect_toy_box.pose, transform)
    detect_toy_box.prev_image_prep = image_prep
    detect_toy_box.prev_depth_prep = depth_prep

    classes_ids, tracking_ids, masks = Tracker3D.from_tracking_image(tracking_image)
    with TimeMeasurer("tracking 3d") as tm:
        tracker_3d.update(detect_toy_box.pose, depth, classes_ids, tracking_ids, masks)

    image[...] = tracking_vis

    print()

detect_toy_box.prev_image_prep = None

In [None]:
detect_toy_box.prev_image_prep = None

stream(camera, detect_toy_box)

In [None]:
for tracked_object in tracker_3d.tracked_objects:
    print(tracked_object.pose)

In [None]:
print(tracker_3d.tracked_objects[0].pose - tracker_3d.tracked_objects[1].pose)