# PyFUNMAP Experiments
This notebook experiments with running FUNMAP outside of the ROS ecosystem.

**How to run:**

 1. Power on and connect to Stretch (keyboard/mouse/HDMI or via SSH)
 2. Navigate to the directory this notebook lives in and run `jupyter notebook --no-browser`
 3. If SSH-ed, open a tunnel for Jupyter using `ssh -L 8888:localhost:8888 <ip addr>`
 4. Click on Jupyter's link (e.g. looks like `http://localhost:8888/?token=<some token>`)
 5. Shift + Enter to execute the cells below

In [1]:
!pip install git+https://github.com/safijari/tiny_tf.git

Defaulting to user installation because normal site-packages is not writeable
Collecting git+https://github.com/safijari/tiny_tf.git
  Cloning https://github.com/safijari/tiny_tf.git to /tmp/pip-req-build-yq8q8atk
  Running command git clone --filter=blob:none --quiet https://github.com/safijari/tiny_tf.git /tmp/pip-req-build-yq8q8atk
  Resolved https://github.com/safijari/tiny_tf.git to commit 6d28b69dfde971f5ed63ff24de5c84b950690997
  Preparing metadata (setup.py) ... [?25ldone


In [2]:
# FUNMAP Imports
import cv2
import time
import math
import gzip
import yaml
import pathlib
import ros_numpy
import numpy as np
from copy import deepcopy
from numba import jit, njit

# Robot imports
import threading
from urdfpy import URDF
import pyrealsense2 as rs
import stretch_body.hello_utils as hu
from tiny_tf.tf import TFTree, Transform
from stretch_body.robot import Robot

# ROS Message imports
from sensor_msgs.msg import PointCloud2

## Emulating ROS

We'll set up some infrastructure to emulate some of the ROS components that FUNMAP relies on. Namely, we emulate the TF2 library's `lookup_transform()` method and reading point clouds from the Realsense depth camera.

In [3]:
# Setup the Python API to Stretch
robot = Robot()
if not robot.startup():
    print("Failed to open connection to the robot")

# Ensure robot is homed
if not robot.is_calibrated():
    robot.home()

robot.stow()

--------- Homing Head ----
--------- Homing Lift ----
Homing Lift...


[INFO] [robot_monitor]: Guarded contact lift


Hardstop detected at motor position (rad) 98.10670471191406
Marking Lift position to 1.097231 (m)
Marking Lift position to 0.000000 (m)
Lift homing successful
--------- Homing Arm ----
Homing Arm...
Hardstop detected at motor position (rad) 1.6194909811019897
Marking Arm position to 0.000000 (m)


[INFO] [robot_monitor]: Guarded contact arm
[INFO] [robot_monitor]: Wrist single tap: 15
[INFO] [robot_monitor]: Wrist single tap: 16


Arm homing successful
Moving to first hardstop...
First hardstop contact at position (ticks): -25
-----
Homing offset was 1488
Marking current position to zero ticks
Homing offset is now  1510 (ticks)
-----
Current position (ticks): 40
Moving to calibrated zero: (rad)
Moving to first hardstop...
First hardstop contact at position (ticks): -26
-----
Homing offset was 2249
Marking current position to zero ticks
Homing offset is now  2269 (ticks)
-----
Current position (ticks): 62
Moving to calibrated zero: (rad)


[INFO] [robot_monitor]: Base bump event


--------- Stowing Arm ----


[INFO] [robot_monitor]: Base bump event
[INFO] [robot_monitor]: Wrist single tap: 17


--------- Stowing ToolStretchDexWrist ----
--------- Stowing Lift ----


In [4]:
# Load Stretch's URDF
urdf_path = str((pathlib.Path(hu.get_fleet_directory()) / 'exported_urdf' / 'stretch.urdf').absolute())
urdf = URDF.load(urdf_path)

  value = value / np.linalg.norm(value)


We'll write a method `get_current_configuration()` that returns a dictionary of the robot's current configuration. We will be able to pass this configuration into Stretch's URDF to get transforms from every link to "base_link".

In [5]:
def get_current_configuration(tool):
    def bound_range(name, value):
        return min(max(value, urdf.joint_map[name].limit.lower), urdf.joint_map[name].limit.upper)

    if tool == 'tool_stretch_gripper':
        q_lift = bound_range('joint_lift', robot.lift.status['pos'])
        q_arml = bound_range('joint_arm_l0', robot.arm.status['pos'] / 4.0)
        q_yaw = bound_range('joint_wrist_yaw', robot.end_of_arm.status['wrist_yaw']['pos'])
        q_pan = bound_range('joint_head_pan', robot.head.status['head_pan']['pos'])
        q_tilt = bound_range('joint_head_tilt', robot.head.status['head_tilt']['pos'])
        return {
            'joint_left_wheel': 0.0,
            'joint_right_wheel': 0.0,
            'joint_lift': q_lift,
            'joint_arm_l0': q_arml,
            'joint_arm_l1': q_arml,
            'joint_arm_l2': q_arml,
            'joint_arm_l3': q_arml,
            'joint_wrist_yaw': q_yaw,
            'joint_gripper_finger_left': 0.0,
            'joint_gripper_finger_right': 0.0,
            'joint_head_pan': q_pan,
            'joint_head_tilt': q_tilt
        }
    elif tool == 'tool_stretch_dex_wrist':
        q_lift = bound_range('joint_lift', robot.lift.status['pos'])
        q_arml = bound_range('joint_arm_l0', robot.arm.status['pos'] / 4.0)
        q_yaw = bound_range('joint_wrist_yaw', robot.end_of_arm.status['wrist_yaw']['pos'])
        q_pitch = bound_range('joint_wrist_pitch', robot.end_of_arm.status['wrist_pitch']['pos'])
        q_roll = bound_range('joint_wrist_roll', robot.end_of_arm.status['wrist_roll']['pos'])
        q_pan = bound_range('joint_head_pan', robot.head.status['head_pan']['pos'])
        q_tilt = bound_range('joint_head_tilt', robot.head.status['head_tilt']['pos'])
        return {
            'joint_left_wheel': 0.0,
            'joint_right_wheel': 0.0,
            'joint_lift': q_lift,
            'joint_arm_l0': q_arml,
            'joint_arm_l1': q_arml,
            'joint_arm_l2': q_arml,
            'joint_arm_l3': q_arml,
            'joint_wrist_yaw': q_yaw,
            'joint_wrist_pitch': q_pitch,
            'joint_wrist_roll': q_roll,
            'joint_gripper_finger_left': 0.0,
            'joint_gripper_finger_right': 0.0,
            'joint_head_pan': q_pan,
            'joint_head_tilt': q_tilt
        }

Next, we'll start the Realsense pipeline.

In [6]:
fps_color = 30 # FPS for color-only videos and for color+depth videos
resolution_color = [1280, 720]   # [1920, 1080], [1280, 720], [640, 480]
resolution_depth = [1280, 720]   # [1280, 720], [640, 480]

pc = rs.pointcloud()
align = rs.align(rs.stream.depth)
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, resolution_depth[0], resolution_depth[1], rs.format.z16, fps_color)
config.enable_stream(rs.stream.color, resolution_color[0], resolution_color[1], rs.format.bgr8, fps_color)
pipeline.start(config)

<pyrealsense2.pyrealsense2.pipeline_profile at 0x7ff540ff5630>

FUNMAP relies on a ROS node that can execute position commands for the robot. We'll create a fake ROS node for it now.

In [7]:
class FakeNode:
    
    def __init__(self):
#         self.tf2_buffer = None
        self.joint_state = None
#         self.point_cloud = None
        self.thread = threading.Thread(target=self._spin)
        self.thread.setDaemon(True)
        self.thread.start()

    def _spin(self):
        while True:
            # update self.tf2_buffer
            tree = TFTree()
            q_curr = get_current_configuration(tool=robot.end_of_arm.name)
            fk_curr = urdf.link_fk(cfg=q_curr)
            for l in urdf.links:
                if l.name == "base_link":
                    continue
                tree.add_transform('base_link', l.name, Transform.from_matrix(fk_curr[l]))
            tree.add_transform('map', 'base_link', Transform())
            self.tf2_buffer = tree

            # update self.point_cloud
            # 1. Get the latest frames from the camera
            frames = pipeline.wait_for_frames()
            aligned = align.process(frames)
            depth_frame = frames.get_depth_frame()
            color_aligned_to_depth_frame = aligned.first(rs.stream.color)

            # 2. Create point cloud from depth image
            points = pc.calculate(depth_frame)
            pc.map_to(color_aligned_to_depth_frame)

            # 3. Create xyz matrix
            v = points.get_vertices()
            xyz = np.asanyarray(v).view(np.float32).reshape(-1, 3)  # xyz

            # 4. Create rgb matrix
            color_image = np.asanyarray(color_aligned_to_depth_frame.get_data())
            t = points.get_texture_coordinates()
            texcoords = np.asanyarray(t).view(np.float32).reshape(-1, 2)  # uv
            cw, ch = color_image.shape[:2][::-1]
            v, u = (texcoords * (cw, ch) + 0.5).astype(np.uint32).T
            np.clip(u, 0, ch-1, out=u)
            np.clip(v, 0, cw-1, out=v)
            rgb = color_image[u, v] # rgb

            # 5. Create point cloud matrix
            points_arr = np.zeros((points.size(),), dtype=[
                ('x', np.float32),
                ('y', np.float32),
                ('z', np.float32),
                ('r', np.uint8),
                ('g', np.uint8),
                ('b', np.uint8)])
            points_arr['x'] = xyz[:, 0]
            points_arr['y'] = xyz[:, 1]
            points_arr['z'] = xyz[:, 2]
            points_arr['r'] = rgb[:, 2]
            points_arr['g'] = rgb[:, 1]
            points_arr['b'] = rgb[:, 0]
            points_arr = ros_numpy.point_cloud2.merge_rgb_fields(points_arr)

            # 6. Create PointCloud2 message
            self.point_cloud = ros_numpy.msgify(PointCloud2, points_arr, stamp=time.time(), frame_id='camera_color_optical_frame')

    def move_to_pose(self, pose, return_before_done=False, custom_contact_thresholds=False):
        if custom_contact_thresholds:
            raise Exception('FakeRobot.move_to_pose: helpppp! dont support contact thresholds yet')
        print(f'Moving to {pose}')
        if 'joint_lift' in pose:
            robot.lift.move_to(pose['joint_lift'])
        if 'wrist_extension' in pose:
            robot.arm.move_to(pose['wrist_extension'])
        robot.push_command()
        if 'joint_wrist_yaw' in pose:
            robot.end_of_arm.move_to('wrist_yaw', pose['joint_wrist_yaw'])
        if 'joint_wrist_pitch' in pose:
            robot.end_of_arm.move_to('wrist_pitch', pose['joint_wrist_pitch'])
        if 'joint_wrist_roll' in pose:
            robot.end_of_arm.move_to('wrist_roll', pose['joint_wrist_roll'])
        if 'joint_head_pan' in pose:
            robot.head.move_to('head_pan', pose['joint_head_pan'])
        if 'joint_head_tilt' in pose:
            robot.head.move_to('head_tilt', pose['joint_head_tilt'])
        if 'joint_gripper_finger_left' in pose or 'joint_gripper_finger_right' in pose:
            print('FakeRobot.move_to_pose: gripper not converted')
        if not return_before_done:
            print('FakeRobot.move_to_pose: sleeping 1 second')
            time.sleep(1)

Last up, we'll emulate some ROS messages and the rospy library.

In [8]:
class HeaderMsg:
    def __init__(self):
        self.seq = 0
        self.stamp = time.time()
        self.frame_id = ''

class PointCloud2Msg:
    def __init__(self):
        self.header = HeaderMsg()
        self.height = 1080
        self.width = 1280
        self.fields = None
        self.is_bigendian = None
        self.point_step = None
        self.row_step = None
        self.data = [0]
        self.is_dense = None

class Vector3Msg:
    def __init__(self):
        self.x = None
        self.y = None
        self.z = None

class QuaternionMsg:
    def __init__(self):
        self.x = None
        self.y = None
        self.z = None
        self.w = None

class TransformMsg:
    def __init__(self):
        self.translation = Vector3Msg()
        self.rotation = QuaternionMsg()

class TransformStampedMsg:
    def __init__(self):
        self.header = HeaderMsg()
        self.child_frame_id = 'fake_link'
        self.transform = TransformMsg()

class TF2Manager:
    def __init__(self):
        pass
    def lookup_transform(self, from_frame_id, to_frame_id, lookup_time, timeout):
        return TransformStampedMsg()

class Time:
    def __init__(self):
        pass
    def now(self):
        return time.time()

class RospyManager:
    def __init__(self):
        self.Time = Time()

    def sleep(self, x):
        #print(f'via rospy.sleep: sleeping {x} seconds')
        time.sleep(x)

    def loginfo(self, msg):
        print('via rospy.loginfo: ' + msg)

rospy = RospyManager()

## FUNMAP Code

Lots of code.

In [9]:
def draw_robot_mast_blind_spot_wedge(x_pix, y_pix, ang_rad, m_per_pix, image, verbose=False, value=255):
    # The x axis points to the front of the robot.
    # The y axis points to the left of the robot.
    
    # 0.18 m measured diagonal from origin (center of laser range
    # finder) to the edge of the robot that avoids the carriage corner
    #
    # 0.165 m measured across from origin (center of laser range finder) to robot side
    #
    # In [6]: math.acos(0.165/0.18)
    # Out[6]: 0.4111378623223475
    # ~23.5 degrees
    # 1.980795 = 0.41 + 1.570795
    #start_angle = 1.98 + ang_rad

    # larger angular range to account for camera looking down and
    # being closer to the center of the robot
    # 0.54 rad is about 30 deg
    # 1.98 - 0.54 = 1.44
    start_angle = 1.44 + ang_rad
    

    # 0.23 m measured diagonal from origin (center of laser range
    # finder) to edge of the robot.
    #
    # In [5]: math.acos(0.165/0.23)
    # Out[5]: 0.7707457827651633
    # ~44.1 degrees
    # 2.341795 = 0.771 + 1.570795
    end_angle = 2.35 + ang_rad

    # 2.35 - 1.98 = 0.37 rad wedge
    # ~21.2 degree wedge
    # ~6% of a circle

    # Find the maximum possible distance represented in the image to
    # ensure that the blind spot goes to the edge of the image.
    h, w = image.shape[:2]
    max_dist = np.sqrt(np.square(h) + np.square(w))

    x_1 = x_pix
    y_1 = y_pix

    x_2 = (max_dist * np.cos(start_angle)) + x_pix
    y_2 = (-max_dist * np.sin(start_angle)) + y_pix

    x_3 = (max_dist * np.cos(end_angle)) + x_pix
    y_3 = (-max_dist * np.sin(end_angle)) + y_pix
    
    corners = np.array([[x_1, y_1], [x_2, y_2], [x_3, y_3]])
    if verbose: 
        print('corners =', corners)

    poly_points = np.array(corners)
    poly_points = np.round(poly_points).astype(np.int32)
    cv2.fillConvexPoly(image, poly_points, value)

def draw_robot_footprint_rectangle(x_pix, y_pix, ang_rad, m_per_pix, image, verbose=False, value=255):
    # One issue to consider is what to do when the mobile base is
    # underneath a surface, such as a table. In these situations,
    # calling this function will erase part of the surface and replace
    # it with floor. This can be problematic, since some robot actions
    # such as stowing or lifting the arm can collide with the surface
    # when mistaking it for floor.
    
    # Robot measurements for rectangular approximation. These should
    # be updated if the robot's footprint changes.
    safety_border_m = 0.02

    # stretch's arm can extend from the side, which adds width to footprint.
    # 0.335 m for the mobile base
    # 0.04 m for the wrist extending over the edge
    # total = 0.335 + (2 * 0.04) due to enforcing symmetry around point of rotation
    robot_width_m = 0.415

    # larger, more conservative model when tethered
    # 0.32 m for the length of the mobile base
    # 0.01 m for the arm E-chain cartridge extending off the back
    # 0.2 m to ignore cables when tethered
    # 0.52 = 0.32 + 0.2
    #robot_length_m = 0.52
    
    # smaller, more optimistic model when untethered
    # 0.32 m for the length of the mobile base
    # 0.01 m for the arm E-chain cartridge extending off the back
    # 0.2 m to ignore cables when tethered
    # 0.33 = 0.32 + 0.01
    robot_length_m = 0.33
    
    origin_distance_from_front_pix_m = 0.05
    
    robot_width_pix = (robot_width_m + (2.0 * safety_border_m))/m_per_pix
    robot_length_pix = (robot_length_m + (2.0 * safety_border_m))/m_per_pix
    origin_distance_from_front_pix = (origin_distance_from_front_pix_m + safety_border_m)/m_per_pix

    # vector to left side of robot
    ls_ang_rad = ang_rad + (np.pi/2.0)
    ls_x = (robot_width_pix/2.0) * np.cos(ls_ang_rad)
    ls_y = -(robot_width_pix/2.0) * np.sin(ls_ang_rad)

    # vector to front of robot
    f_x = origin_distance_from_front_pix * np.cos(ang_rad)
    f_y = -origin_distance_from_front_pix * np.sin(ang_rad)

    # vector to back of robot
    dist_to_back = robot_length_pix - origin_distance_from_front_pix
    b_x = -dist_to_back * np.cos(ang_rad)
    b_y = dist_to_back * np.sin(ang_rad)

    # front left corner of the robot
    fl_x = x_pix + f_x + ls_x
    fl_y = y_pix + f_y + ls_y

    # front right corner of the robot
    fr_x = (x_pix + f_x) - ls_x
    fr_y = (y_pix + f_y) - ls_y
    
    # back left corner of the robot
    bl_x = x_pix + b_x + ls_x
    bl_y = y_pix + b_y + ls_y

    # back right corner of the robot
    br_x = (x_pix + b_x) - ls_x
    br_y = (y_pix + b_y) - ls_y

    corners = np.array([[fl_x, fl_y], [fr_x, fr_y], [br_x, br_y], [bl_x, bl_y]])
    if verbose: 
        print('corners =', corners)
        
    poly_points = np.array(corners)
    poly_points = np.round(poly_points).astype(np.int32)
    
    if image is not None:
        cv2.fillConvexPoly(image, poly_points, value)

In [10]:
def get_p1_to_p2_matrix(p1_frame_id, p2_frame_id, tf2_buffer):
    # If the necessary TF2 transform is successfully looked up, this
    # returns a 4x4 affine transformation matrix that transforms
    # points in the p1_frame_id frame to points in the p2_frame_id.
    try:
        stamped_transform =  tf2_buffer.lookup_transform(p2_frame_id, p1_frame_id)
        p1_to_p2_mat = stamped_transform.matrix
        return p1_to_p2_mat, time.time()
    except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException) as e:
        print('WARNING: get_p1_to_p2_matrix failed to lookup transform from p1_frame_id =', p1_frame_id, ' to p2_frame_id =', p2_frame_id)
        print('         exception =', e)
        return None, None

In [11]:
@njit(fastmath=True)
def numba_max_height_and_rgb_and_camera_depth_images_int(points_to_image_mat, rgb_points,
                                                         height_image, rgb_image, camera_depth_image,
                                                         m_per_pix, m_per_height_unit,
                                                         voi_x_m, voi_y_m, voi_z_m, bounds):
    # Update the max height image to represent the provided 3D
    # points. This function is for images with integer pixels.
    im_height, im_width = height_image.shape

    # Unpack the affine transformation matrix that transforms points
    # into the image's coordinate system.
    r00, r01, r02, t0 = points_to_image_mat[0]
    r10, r11, r12, t1 = points_to_image_mat[1]
    r20, r21, r22, t2 = points_to_image_mat[2]

    # Assume that the camera is located at 0.0, 0.0, 0.0 in the point
    # cloud coordinate system. Consequently, the center of the camera
    # in the height image can be found by transforming it using
    # points_to_image_mat, which results in the following x and y
    # coordinates.
    camera_x = t0
    camera_y = t1
    
    min_x, max_x, min_y, max_y, min_z, max_z = bounds 
    
    n_points = rgb_points.shape[0]    
    for i in range(n_points):
        p = rgb_points[i]
        x_p = p['x']
        y_p = p['y']
        z_p = p['z']
        
        x = (r00 * x_p) + (r01 * y_p) + (r02 * z_p) + t0
        y = (r10 * x_p) + (r11 * y_p) + (r12 * z_p) + t1
        z = (r20 * x_p) + (r21 * y_p) + (r22 * z_p) + t2

        # Ensures that points are strictly within the volume of
        # interest (VOI) by comparing float representations. Consider
        # removing these checks and replacing them with integer
        # comparisons to ensure valid image indices and values. This
        # would require changing the definition of a MaxHeightImage or
        # doing something clever to handle the exclusion of points on
        # the borders of the VOI.
        if (x > min_x) and (x < max_x) and (y > min_y) and (y < max_y) and (z > min_z) and (z < max_z): 
            x_index = int(round( x / m_per_pix ))
            y_index = - int(round( y / m_per_pix ))
            # A value of 0 represents no observations, so add 1.
            z_val = 1 + int(round( z / m_per_height_unit ))
            current_z_val = height_image[y_index, x_index]
            if z_val > current_z_val:
                # The height value is the maximum encountered at this
                # pixel, so update the pixel in all the images.
                height_image[y_index, x_index] = z_val
                
                # 4 cm per unit results in 10.16 meter max = 254*0.04.
                # The D435i is listed as having an approximately 10
                # meter maximum range.
                x_dist = x - camera_x
                y_dist = y - camera_y
                floor_distance = math.sqrt((x_dist*x_dist) + (y_dist*y_dist))
                camera_depth = 1 + int(round(floor_distance/0.04))
                if camera_depth > 255:
                    camera_depth = 255
                if camera_depth < 0:
                    camera_depth = 0
                camera_depth_image[y_index, x_index] = camera_depth

                r = p['r']
                g = p['g']
                b = p['b']
                rgb_sum = r + g + b
                # If [r, g, b] = [0, 0, 0], color information for the
                # point is assumed not to exist and the color of the
                # point is ignored by not updating the corresponding
                # pixel of the RGB image.

                # Example of use: For the Intel ROS Wrapper for the
                # D435i camera, point clouds produced with full depth
                # image mode (full sized frustum) include points
                # without color information (i.e., texture) due to the
                # narrower field of view of the D435i's RGB camera. In
                # these cases the points have [r,g,b] = [0,0,0].
                if rgb_sum != 0:
                    rgb_image[y_index, x_index] = [p['b'], p['g'], p['r']]
                
             
@njit(fastmath=True)
def numba_max_height_image_float(points_to_image_mat, points,
                                 image, m_per_pix, m_per_height_unit,
                                 voi_x_m, voi_y_m, voi_z_m, bounds):
    # Update the max height image to represent the provided 3D
    # points. This function is for images with integer pixels.
    im_height, im_width = image.shape

    # Unpack the affine transformation matrix that transforms points
    # into the image's coordinate system.
    r00, r01, r02, t0 = points_to_image_mat[0]
    r10, r11, r12, t1 = points_to_image_mat[1]
    r20, r21, r22, t2 = points_to_image_mat[2]

    min_x, max_x, min_y, max_y, min_z, max_z = bounds 
    
    n_points = points.shape[0]    
    for p in range(n_points):
        x_p = points[p,0]
        y_p = points[p,1]
        z_p = points[p,2]

        x = (r00 * x_p) + (r01 * y_p) + (r02 * z_p) + t0
        y = (r10 * x_p) + (r11 * y_p) + (r12 * z_p) + t1
        z = (r20 * x_p) + (r21 * y_p) + (r22 * z_p) + t2

        # Ensures that points are strictly within the volume of
        # interest (VOI) by comparing float representations. Consider
        # removing these checks and replacing them with integer
        # comparisons to ensure valid image indices and values. This
        # would require changing the definition of a MaxHeightImage or
        # doing something clever to handle the exclusion of points on
        # the borders of the VOI.
        if (x > min_x) and (x < max_x) and (y > min_y) and (y < max_y) and (z > min_z) and (z < max_z): 
            x_index = int(round( x / m_per_pix ))
            y_index = - int(round( y / m_per_pix ))
            current_z = image[y_index, x_index]
            if z > current_z:
                image[y_index, x_index] = z

def numba_max_height_image_int_check(points_to_image_mat,
                                     image, m_per_pix, m_per_height_unit,
                                     voi_x_m, voi_y_m, voi_z_m, bounds, verbose=False):
    # Use this to check bounds and other properties prior to running
    # the real numba version, which does not perform safety checking.
    
    # Update the max height image to represent the provided 3D
    # points. This function is for images with integer pixels.
    im_height, im_width = image.shape
    dtype = image.dtype

    # This function is only for integer pixel types.
    assert(np.issubdtype(dtype, np.integer))

    max_z_val = np.iinfo(dtype).max

    min_x, max_x, min_y, max_y, min_z, max_z = bounds
    
    # Check that the image indices and image values will all be within bounds.
    min_x_index = int(round( min_x / m_per_pix ))
    max_x_index = int(round( max_x / m_per_pix ))
    min_y_index = - int(round( max_y / m_per_pix ))
    max_y_index = - int(round( min_y / m_per_pix ))
    min_z_index = 1 + int(round( min_z / m_per_height_unit ))
    max_z_index = 1 + int(round( max_z / m_per_height_unit ))
    
    if verbose: 
        print('image.shape =', image.shape)
        print('np.iinfo(image.dtype).max =', np.iinfo(image.dtype).max)
        print('min_x_index =', min_x_index)
        print('max_x_index =', max_x_index)
        print('min_y_index =', min_y_index)
        print('max_y_index =', max_y_index)
        print('min_z_index =', min_z_index)
        print('max_z_index =', max_z_index)
    
    assert(min_x_index == 0)
    assert(max_x_index == (im_width - 1))
    assert(min_y_index == 0)
    assert(max_y_index == (im_height - 1))
    assert(min_z_index == 1)
    assert(max_z_index <= max_z_val)


def numba_max_height_image_float_check(points_to_image_mat,
                                       image, m_per_pix, m_per_height_unit,
                                       voi_x_m, voi_y_m, voi_z_m, bounds, verbose=False):
    # Use this to check bounds and other properties prior to running
    # the real numba version, which does not perform safety checking.
    
    # Update the max height image to represent the provided 3D
    # points. This function is for images with integer pixels.
    im_height, im_width = image.shape
    dtype = image.dtype

    # This function is only for integer pixel types.
    assert(np.issubdtype(dtype, np.floating))

    min_x, max_x, min_y, max_y, min_z, max_z = bounds
    
    # Check that the image indices and image values will all be within bounds.
    min_x_index = int(round( min_x / m_per_pix ))
    max_x_index = int(round( max_x / m_per_pix ))
    min_y_index = - int(round( max_y / m_per_pix ))
    max_y_index = - int(round( min_y / m_per_pix ))
    
    if verbose: 
        print('image.shape =', image.shape)
        print('np.iinfo(image.dtype).max =', np.iinfo(image.dtype).max)
        print('min_x_index =', min_x_index)
        print('max_x_index =', max_x_index)
        print('min_y_index =', min_y_index)
        print('max_y_index =', max_y_index)
        print('min_z =', min_z)
    
    assert(min_x_index == 0)
    assert(max_x_index == (im_width - 1))
    assert(min_y_index == 0)
    assert(max_y_index == (im_height - 1))
    assert(min_z > 0.0)

             
@njit(fastmath=True)
def numba_max_height_image_int(points_to_image_mat, points,
                               image, m_per_pix, m_per_height_unit,
                               voi_x_m, voi_y_m, voi_z_m, bounds):
    # Update the max height image to represent the provided 3D
    # points. This function is for images with integer pixels.
    im_height, im_width = image.shape

    # Unpack the affine transformation matrix that transforms points
    # into the image's coordinate system.
    r00, r01, r02, t0 = points_to_image_mat[0]
    r10, r11, r12, t1 = points_to_image_mat[1]
    r20, r21, r22, t2 = points_to_image_mat[2]

    min_x, max_x, min_y, max_y, min_z, max_z = bounds 
    
    n_points = points.shape[0]    
    for p in range(n_points):
        x_p = points[p,0]
        y_p = points[p,1]
        z_p = points[p,2]

        x = (r00 * x_p) + (r01 * y_p) + (r02 * z_p) + t0
        y = (r10 * x_p) + (r11 * y_p) + (r12 * z_p) + t1
        z = (r20 * x_p) + (r21 * y_p) + (r22 * z_p) + t2

        # Ensures that points are strictly within the volume of
        # interest (VOI) by comparing float representations. Consider
        # removing these checks and replacing them with integer
        # comparisons to ensure valid image indices and values. This
        # would require changing the definition of a MaxHeightImage or
        # doing something clever to handle the exclusion of points on
        # the borders of the VOI.
        if (x > min_x) and (x < max_x) and (y > min_y) and (y < max_y) and (z > min_z) and (z < max_z): 
            x_index = int(round( x / m_per_pix ))
            y_index = - int(round( y / m_per_pix ))
            # A value of 0 represents no observations, so add 1.
            z_val = 1 + int(round( z / m_per_height_unit ))
            current_z_val = image[y_index, x_index]
            if z_val > current_z_val:
                image[y_index, x_index] = z_val

                
@njit(fastmath=True)
def numba_max_height_image_int_2(points_to_image_mat, points,
                                 image, m_per_pix, m_per_height_unit,
                                 voi_x_m, voi_y_m, voi_z_m, max_quantized_height):
    # This version expects the transformation matrix
    # (points_to_image_mat) to handle the scaling and depth offset
    # required for conversion to the max height image. In other words,
    # it handles all the linear operations, so only nonlinear
    # operations like rounding and casting remain.
    
    # Update the max height image to represent the provided 3D
    # points. This function is for images with integer pixels.
    im_height, im_width = image.shape

    # Unpack the affine transformation matrix that transforms points
    # into the image.
    r00, r01, r02, t0 = points_to_image_mat[0]
    r10, r11, r12, t1 = points_to_image_mat[1]
    r20, r21, r22, t2 = points_to_image_mat[2]
    
    n_points = points.shape[0]    
    for p in range(n_points):
        x_p = points[p,0]
        y_p = points[p,1]
        z_p = points[p,2]

        x = (r00 * x_p) + (r01 * y_p) + (r02 * z_p) + t0
        x_index = int(round(x))
        if (x_index >= 0) and (x_index < im_width):
            y = (r10 * x_p) + (r11 * y_p) + (r12 * z_p) + t1
            y_index = int(round(y))
            if (y_index >= 0) and (y_index < im_height):
                height = (r20 * x_p) + (r21 * y_p) + (r22 * z_p) + t2
                height_quantized = int(round(height))
                if (height_quantized > 0) and (height_quantized <= max_quantized_height):
                    current_height_quantized = image[y_index, x_index]
                    if height_quantized > current_height_quantized:
                        image[y_index, x_index] = height_quantized

def calculate_voi_bounds( m_per_pix, m_per_height_unit, voi_x_m, voi_y_m, voi_z_m):
    # Calculate the borders of the volume of interest (VOI) in order
    # to only consider 3D points that are strictly within the VOI,
    # excluding the VOI's borders. This uses a 1000th of a millimeter
    # safety margin to exclude points almost on the VOI's border. This
    # ensures that the bounds on the image indices and the pixel type
    # are not violated.
    half_pix = m_per_pix / 2.0
    mm = 0.001
    safety_margin = mm/1000.0
    min_x = (- half_pix) + safety_margin
    max_x = (voi_x_m - half_pix) - safety_margin
    min_y = (half_pix - voi_y_m) + safety_margin
    max_y = half_pix - safety_margin
    min_z = 0.0 + safety_margin
    max_z = voi_z_m - safety_margin
    
    return np.array([min_x, max_x, min_y, max_y, min_z, max_z])

def numba_max_height_image_to_points(points_in_image_to_frame_mat, image, points, m_per_pix, m_per_height_unit):
    dtype = image.dtype
    
    if np.issubdtype(dtype, np.integer):
        return numba_max_height_image_to_points_int(points_in_image_to_frame_mat, image, points, m_per_pix, m_per_height_unit)
    elif np.issubdtype(dtype, np.floating):
        return numba_max_height_image_to_points_float(points_in_image_to_frame_mat, image, points, m_per_pix, m_per_height_unit)


@njit(fastmath=True)
def numba_max_height_image_to_points_int(points_in_image_to_frame_mat, image, points, m_per_pix, m_per_height_unit):

    # Update the max height image to represent the provided 3D
    # points. This function is for images with integer pixels.
    im_height, im_width = image.shape

    # Unpack the affine transformation matrix that transforms points
    # from the image's coordinate system to a target frame.
    r00, r01, r02, t0 = points_in_image_to_frame_mat[0]
    r10, r11, r12, t1 = points_in_image_to_frame_mat[1]
    r20, r21, r22, t2 = points_in_image_to_frame_mat[2]

    x_array = points['x']
    y_array = points['y']
    z_array = points['z']
    
    i = 0
    for y_i in range(im_height):
        for x_i in range(im_width):
            val = image[y_i, x_i]
            # check if observed
            if val != 0:
                # observed, so create a point
                z_i = val - 1

                x_m = x_i * m_per_pix
                y_m = y_i * -m_per_pix
                z_m = z_i * m_per_height_unit
                
                x_f = (r00 * x_m) + (r01 * y_m) + (r02 * z_m) + t0
                y_f = (r10 * x_m) + (r11 * y_m) + (r12 * z_m) + t1
                z_f = (r20 * x_m) + (r21 * y_m) + (r22 * z_m) + t2
                
                x_array[i] = x_f
                y_array[i] = y_f
                z_array[i] = z_f
                i += 1
    return i



@njit(fastmath=True)
def numba_max_height_image_to_points_float(points_in_image_to_frame_mat, image, points, m_per_pix, m_per_height_unit):

    # Update the max height image to represent the provided 3D
    # points. This function is for images with integer pixels.
    im_height, im_width = image.shape

    # Unpack the affine transformation matrix that transforms points
    # from the image's coordinate system to a target frame.
    r00, r01, r02, t0 = points_in_image_to_frame_mat[0]
    r10, r11, r12, t1 = points_in_image_to_frame_mat[1]
    r20, r21, r22, t2 = points_in_image_to_frame_mat[2]

    x_array = points['x']
    y_array = points['y']
    z_array = points['z']
    
    i = 0
    for y_i in range(im_height):
        for x_i in range(im_width):
            z = image[y_i, x_i]
            # check if observed
            if z > 0.0:
                x_m = x_i * m_per_pix
                y_m = y_i * -m_per_pix
                
                x_f = (r00 * x_m) + (r01 * y_m) + (r02 * z) + t0
                y_f = (r10 * x_m) + (r11 * y_m) + (r12 * z) + t1
                z_f = (r20 * x_m) + (r21 * y_m) + (r22 * z) + t2
                
                x_array[i] = x_f
                y_array[i] = y_f
                z_array[i] = z_f
                i += 1
    return i

def numba_max_height_image(points_to_image_mat, points,
                           image, m_per_pix, m_per_height_unit,
                           voi_x_m, voi_y_m, voi_z_m, verbose=False):

    dtype = image.dtype
    
    if np.issubdtype(dtype, np.integer):
        bounds = calculate_voi_bounds( m_per_pix, m_per_height_unit, voi_x_m, voi_y_m, voi_z_m )
        numba_max_height_image_int_check(points_to_image_mat, 
                                         image, m_per_pix, m_per_height_unit,
                                         voi_x_m, voi_y_m, voi_z_m, bounds, verbose)
        numba_max_height_image_int(points_to_image_mat, points,
                               image, m_per_pix, m_per_height_unit,
                                   voi_x_m, voi_y_m, voi_z_m, bounds)
    elif np.issubdtype(dtype, np.floating):
        bounds = calculate_voi_bounds( m_per_pix, m_per_height_unit, voi_x_m, voi_y_m, voi_z_m )
        numba_max_height_image_float_check(points_to_image_mat,
                                           image, m_per_pix, m_per_height_unit,
                                           voi_x_m, voi_y_m, voi_z_m, bounds, verbose)
        numba_max_height_image_float(points_to_image_mat, points,
                                   image, m_per_pix, m_per_height_unit,
                                   voi_x_m, voi_y_m, voi_z_m, bounds)

def numba_max_height_and_rgb_images(points_to_image_mat, rgb_points,
                                    height_image, rgb_image,
                                    m_per_pix, m_per_height_unit,
                                    voi_x_m, voi_y_m, voi_z_m,
                                    verbose=False):

    dtype = height_image.dtype
    
    if np.issubdtype(dtype, np.integer):
        bounds = calculate_voi_bounds( m_per_pix, m_per_height_unit, voi_x_m, voi_y_m, voi_z_m )
        numba_max_height_image_int_check(points_to_image_mat,
                                         height_image, m_per_pix,
                                         m_per_height_unit, voi_x_m,
                                         voi_y_m, voi_z_m, bounds,
                                         verbose)
        assert(height_image.shape[:2] == rgb_image.shape[:2])
        numba_max_height_and_rgb_images_int(points_to_image_mat, rgb_points,
                                            height_image, rgb_image, m_per_pix,
                                            m_per_height_unit, voi_x_m,
                                            voi_y_m, voi_z_m, bounds)
    elif np.issubdtype(dtype, np.floating):
        print('ERROR: numba_max_height_and_rgb_images currently does not support float max images.')
        assert(False)

        
def numba_max_height_and_rgb_and_camera_depth_images(points_to_image_mat, rgb_points,
                                                     height_image, rgb_image, camera_depth_image,
                                                     m_per_pix, m_per_height_unit,
                                                     voi_x_m, voi_y_m, voi_z_m,
                                                     verbose=False):

    dtype = height_image.dtype
    
    if np.issubdtype(dtype, np.integer):
        bounds = calculate_voi_bounds( m_per_pix, m_per_height_unit, voi_x_m, voi_y_m, voi_z_m )
        numba_max_height_image_int_check(points_to_image_mat,
                                         height_image, m_per_pix,
                                         m_per_height_unit, voi_x_m,
                                         voi_y_m, voi_z_m, bounds,
                                         verbose)
        assert(height_image.shape[:2] == rgb_image.shape[:2])
        numba_max_height_and_rgb_and_camera_depth_images_int(points_to_image_mat, rgb_points,
                                                             height_image, rgb_image, camera_depth_image,
                                                             m_per_pix, m_per_height_unit,
                                                             voi_x_m, voi_y_m, voi_z_m, bounds)
    elif np.issubdtype(dtype, np.floating):
        print('ERROR: numba_max_height_and_rgb_images currently does not support float max images.')
        assert(False)

In [12]:
class MaxHeightImage:

    def __init__(self, volume_of_interest, m_per_pix, pixel_dtype, m_per_height_unit=None, use_camera_depth_image=False, image=None, rgb_image=None, camera_depth_image=None):
        # MaxHeightImage creates a 2D image that represents 3D points
        # strictly within a volume of interest (VOI), so excluding
        # points on the border of the VOI. The image can be thought of
        # as sitting at the bottom of the VOI at z=0. Each of the
        # image's pixels represents the point that is highest above
        # the pixel (maximum z value). In other words, the image
        # represents the top surface of the VOI. It is a discretized
        # approximation to a surface function f(x,y) -> z where z =
        # Max(Points(x,y).z). If the pixel has a value of 0, then no
        # points are within the volume above the pixel.
        #
        # volume_of_interest : volume of interest (VOI) that will be
        # represented by the max height image (MHI). The MHI will
        # strictly represent the interior of the VOI and ignore a
        # point sitting exactly on the border of the VOI. The MHI can
        # be thought of as sitting at the bottom of the VOI at z=0.
        #
        # m_per_pix : meters per pixel that defines the spatial
        # resolution of the max height image. Each pixel represents a
        # column with a square cross section within the volume of
        # interest (VOI). m_per_pix is the length of the edges of this
        # square cross section.
        #
        # m_per_height_unit : meters per unit of height is an optional
        # parameter that sets the discretization of the 3D point
        # height. The pixel_type results in a lower bound for
        # m_per_height_unit that depends on the height of the volume
        # of interest. This is due to the pixel type limiting the
        # number of distinct height values a pixel can represent
        # (e.g., 254 with np.uint8). If m_per_height_unit is not
        # specified, the max height image attempts to use square
        # voxels for the discretization by setting m_per_height_unit =
        # m_per_pix. If this is unachievable, it will use the highest
        # resolution it can given the pixel_type and produce a warning
        # message.
        #
        # pixel_dtype : Numpy dtype for the max height image (MHI)
        # pixels, which also defines the maximum resolution in height.
        #
        self.supported_dtypes = [np.uint8, np.uint16, np.uint32, np.uint64, np.float32, np.float64]

        if pixel_dtype not in self.supported_dtypes:
            print('ERROR: Attempt to initialize MaxHeightImage with a pixel_dtype that is not currently supported')
            print('       pixel_dtype =', pixel_dtype)
            print('       self.supported_dtypes =', self.supported_dtypes)
            assert(pixel_dtype in self.supported_dtypes)

        self.m_per_pix = m_per_pix
        
        # Copy the volume of interest to avoid issues of it being
        # mutated after creation of the max height image.
        self.voi = deepcopy(volume_of_interest)

        # Available to hold a correction transform applied to the
        # height image. This transformation will not be performed on
        # the VOI.
        self.transform_original_to_corrected = None
        self.transform_corrected_to_original = None

        # Use ceil so that pixels on the far edges of the image can
        # represent column volumes truncated by the volume of interest
        # borders.
        num_x_bins = int(np.ceil(self.voi.x_in_m / self.m_per_pix))
        num_y_bins = int(np.ceil(self.voi.y_in_m / self.m_per_pix))

        def find_minimum_m_per_height_unit(z_in_m, max_z_bins):
            m_per_height_unit = z_in_m / (max_z_bins - 1)
            # Check for off by one error and correct it if
            # necessary. This type of error is likely due to float
            # precision, int casting, and ceil issues.
            num_z_bins = 1 + int(np.ceil(z_in_m / m_per_height_unit))
            if num_z_bins > max_z_bins:
                m_per_height_unit = z_in_m / (max_z_bins - 2)
            return m_per_height_unit    
        
        if np.issubdtype(pixel_dtype, np.integer):
            max_z_bins = np.iinfo(pixel_dtype).max
            
            if m_per_height_unit is None:
                # Two plausible default behaviors: 1) set to be the
                # same as the x and y resolution 2) set to the maximum
                # resolution achievable given the pixel type

                # For now, I'm going to make the default be the
                # maximum resolution achievable given the pixel type.
                self.m_per_height_unit = find_minimum_m_per_height_unit(self.voi.z_in_m, max_z_bins)
            else:
                self.m_per_height_unit = m_per_height_unit

            # image pixels are integers with limited range

            # Use ceil so that the maximum height voxels can be
            # truncated by the volume of interest borders. Add one to
            # account for 0 representing that no 3D points are in the
            # pixel column volume.
            num_z_bins = 1 + int(np.ceil(self.voi.z_in_m / self.m_per_height_unit))
            
            if num_z_bins > max_z_bins:
                print('WARNING: Unable to initialize MaxHeightImage with requested or default height resolution. Instead, using the highest resolution available for the given pixel_dtype. Consider changing pixel_dtype.')
                print('         attempted self.m_per_height_unit =', self.m_per_height_unit)
                print('         attempted num_z_bins =', num_z_bins)
                print('         max_z_bins =', max_z_bins)
                print('         pixel_dtype =', pixel_dtype)
                print('         self.supported_dtypes =', self.supported_dtypes)

                num_z_bins = max_z_bins
                self.m_per_height_unit = find_minimum_m_per_height_unit(self.voi.z_in_m, max_z_bins)
                print('         actual num_z_bins =', num_z_bins)
                print('         actual self.m_per_height_unit =', self.m_per_height_unit)
                
        elif np.issubdtype(pixel_dtype, np.floating):
            if m_per_height_unit is not None:
                print('WARNING: Ignoring provided m_per_height_unit, since pixel_dtype is a float. Float pixels are represented in meters and use no scaling, binning, or discretization.')
                print('         provided m_per_height_unit =', m_per_height_unit)
            self.m_per_height_unit = None

        if image is None: 
            self.image = np.zeros((num_y_bins, num_x_bins), pixel_dtype)
        else:
            # Check that the provided image is consistent with the other parameters
            s = image.shape
            assert(len(s) == 2)
            assert(s[0] == num_y_bins)
            assert(s[1] == num_x_bins)
            assert(image.dtype == pixel_dtype)
            assert(m_per_height_unit == self.m_per_height_unit)
            self.image = image


        if rgb_image is None:
            # The default behavior is not to have an associated RGB image.
            self.rgb_image = None
        else:
            # Check that the provided image is consistent with the other parameters
            s = rgb_image.shape
            assert(len(s) == 3)
            assert(s[0] == num_y_bins)
            assert(s[1] == num_x_bins)
            assert(image.dtype == np.uint8)
            self.rgb_image = rgb_image


        # This conversion is hardcoded in numba code.  4 cm per unit
        # results in 10.16 meter max = 254*0.04.  The D435i is listed
        # as having an approximately 10 meter maximum range.
        # camera_depth = 1 + int(round(z_p/0.04))
        self.camera_depth_m_per_unit = 0.04
        self.camera_depth_min = 1
        
        if use_camera_depth_image:
            if camera_depth_image is None: 
                self.camera_depth_image = np.zeros((num_y_bins, num_x_bins), np.uint8)
            else:
                # Check that the provided camera depth image is consistent with the other parameters
                s = camera_depth_image.shape
                assert(len(s) == 2)
                assert(s[0] == num_y_bins)
                assert(s[1] == num_x_bins)
                assert(image.dtype == np.uint8)
                self.camera_depth_image = camera_depth_image
        else:
            self.camera_depth_image = None
            
        # image_origin : The image origin in 3D space with respect to
        # the volume of interest (VOI) coordinate system. The pixel at
        # image[0,0] is the location of the image origin and is
        # located at the corner of the VOI such that decreasing y,
        # increasing x, and increasing z moves to the interior of the
        # VOI. The pixel at image[0,0] should only represent 3D points
        # that are strictly inside the VOI, not on the border.

        x_v = self.m_per_pix / 0.5
        y_v = self.voi.y_in_m - (self.m_per_pix / 0.5)
        z_v = 0.0
        self.image_origin = np.array([x_v, y_v, z_v])

    def m_to_camera_depth_pix(self, camera_depth_m):
        # This conversion is hardcoded in numba code.
        # 4 cm per unit results in 10.16 meter max = 254*0.04.
        # The D435i is listed as having an approximately 10
        # meter maximum range.
        depth_pix = self.camera_depth_min + int(round(camera_depth_m/self.camera_depth_m_per_unit))
        return depth_pix
        
    def print_info(self):
        print('MaxHeightImage information:')
        print('     image.shape =', self.image.shape)
        print('     image.dtype =', self.image.dtype)
        print('     m_per_pix =', self.m_per_pix)
        print('     m_per_height_unit =', self.m_per_height_unit)
        print('     voi.x_in_m =', self.voi.x_in_m)
        print('     voi.y_in_m =', self.voi.y_in_m)
        print('     voi.z_in_m =', self.voi.z_in_m)

    def clear(self):
        self.image.fill(0)
        if self.rgb_image is not None:
            self.image.fill(0)
        if self.camera_depth_image is not None:
            self.camera_depth_image.fill(0)

    def create_blank_rgb_image(self):
        h, w = self.image.shape
        self.rgb_image = np.zeros((h, w, 3), np.uint8)

    def apply_planar_correction(self, plane_parameters, plane_height_pix):
        # plane_parameters: [alpha, beta, gamma] such that alpha*x + beta*y + gamma = z
        # plane_height_m: The new height for points on the plane in meters
        plane_height_pix = plane_height_pix
        
        self.image, transform_to_corrected = numba_correct_height_image(plane_parameters, self.image, plane_height_pix)
        self.transform_original_to_corrected = transform_to_corrected
        self.transform_corrected_to_original = np.linalg.inv(transform_to_corrected)
            
    def save( self, base_filename, save_visualization=True ):
        print('MaxHeightImage saving to base_filename =', base_filename)

        max_pix = None
        if save_visualization: 
            # Save uint8 png image for visualization purposes. This would
            # be sufficient for uint8 pixel_dtypes, but does not work for
            # uint16.
            if self.image.dtype != np.uint8:
                # Scale the image for better visualization. For example, a
                # float image may not be interpretable in the uint8
                # version without scaling.
                max_pix = np.max(self.image)
                save_image = np.uint8(255.0 * (self.image / max_pix ))
            else:
                save_image = self.image
            visualization_filename = base_filename + '_visualization.png'
            cv2.imwrite(visualization_filename, save_image)
        else:
            visualization_filename = None

        rgb_image_filename = None
        if self.rgb_image is not None:
            rgb_image_filename = base_filename + '_rgb.png'
            cv2.imwrite(rgb_image_filename, self.rgb_image)

        camera_depth_image_filename = None
        if self.camera_depth_image is not None:
            camera_depth_image_filename = base_filename + '_camera_depth.png'
            cv2.imwrite(camera_depth_image_filename, self.camera_depth_image)
            
        image_filename = base_filename + '_image.npy.gz'
        fid = gzip.GzipFile(image_filename, 'w')
        np.save(fid, self.image, allow_pickle=False, fix_imports=True)
        fid.close
        
        voi_data = self.voi.serialize()
        voi_data['origin'] = voi_data['origin'].tolist()
        voi_data['axes'] = voi_data['axes'].tolist()

        if self.transform_original_to_corrected is not None:
            transform_original_to_corrected = self.transform_original_to_corrected.tolist()
        else:
            transform_original_to_corrected = None

        if self.transform_corrected_to_original is not None:
            transform_corrected_to_original = self.transform_corrected_to_original.tolist()
        else:
            transform_corrected_to_original = None
        
        data = {'visualization_filename': visualization_filename,
                'rgb_image_filename': rgb_image_filename,
                'camera_depth_image_filename': camera_depth_image_filename,
                'image_filename': image_filename,
                'image.dtype': str(self.image.dtype),
                'image.shape': list(self.image.shape),
                'np.max(image)': max_pix, 
                'm_per_pix': self.m_per_pix,
                'm_per_height_unit': self.m_per_height_unit,
                'voi_data': voi_data,
                'image_origin': self.image_origin.tolist(),
                'transform_original_to_corrected': transform_original_to_corrected, 
                'transform_corrected_to_original': transform_corrected_to_original
        }

        with open(base_filename + '.yaml', 'w') as fid:
            yaml.dump(data, fid)

        print('Finished saving.')


    @classmethod
    def load_serialization( self, base_filename ):
        print('MaxHeightImage: Loading serialization data from base_filename =', base_filename)
        with open(base_filename + '.yaml', 'r') as fid:
            data = yaml.load(fid, Loader=yaml.FullLoader)
        
        image_filename = data['image_filename']
        fid = gzip.GzipFile(image_filename, 'r')
        image = np.load(fid)
        fid.close()

        print('MaxHeightImage: Finished loading serialization data.')

        rgb_image = None
        rgb_image_filename = data.get('rgb_image_filename')
        if rgb_image_filename is not None:
            rgb_image = cv2.imread(rgb_image_filename)
            print('MaxHeightImage: Loading RGB image.')

        camera_depth_image = None
        camera_depth_image_filename = data.get('camera_depth_image_filename')
        if camera_depth_image_filename is not None:
            camera_depth_image = cv2.imread(camera_depth_image_filename)[:,:,0]
            print('MaxHeightImage: Loading camera depth image.')

        return data, image, rgb_image, camera_depth_image


    @classmethod
    def from_file( self, base_filename ):
        data, image, rgb_image, camera_depth_image = MaxHeightImage.load_serialization(base_filename)
        
        m_per_pix = data['m_per_pix']
        m_per_height_unit = data['m_per_height_unit']
        image_origin = np.array(data['image_origin'])

        voi = VolumeOfInterest.from_serialization(data['voi_data'])

        if camera_depth_image is not None:
            use_camera_depth_image = True
        else:
            use_camera_depth_image = False
        max_height_image = MaxHeightImage(voi, m_per_pix, image.dtype, m_per_height_unit, use_camera_depth_image=use_camera_depth_image, image=image, rgb_image=rgb_image, camera_depth_image=camera_depth_image)

        transform_original_to_corrected = data.get('transform_original_to_corrected', None)
        transform_corrected_to_original = data.get('transform_corrected_to_original', None)

        if transform_original_to_corrected is not None:
            transform_original_to_corrected = np.array(transform_original_to_corrected)

        if transform_corrected_to_original is not None:
            transform_corrected_to_original = np.array(transform_corrected_to_original)
        
        max_height_image.transform_original_to_corrected = transform_original_to_corrected
        max_height_image.transform_corrected_to_original = transform_corrected_to_original

        return max_height_image


    def to_points(self, colormap=None):
        
        h, w = self.image.shape
        max_num_points = w * h
        points = np.zeros((max_num_points,),
                          dtype=[
                              ('x', np.float32),
                              ('y', np.float32),
                              ('z', np.float32)])

        points_in_image_to_voi = np.identity(4)
        points_in_image_to_voi[:3, 3] = self.image_origin
        points_in_image_to_frame_id_mat = np.matmul(self.voi.points_in_voi_to_frame_id_mat, points_in_image_to_voi)
        
        if self.transform_corrected_to_original is not None: 
            points_in_image_to_frame_id_mat = np.matmul(points_in_image_to_frame_id_mat, self.transform_corrected_to_original)
        
        num_points = numba_max_height_image_to_points(points_in_image_to_frame_id_mat, self.image, points, self.m_per_pix, self.m_per_height_unit)

        points = points[:num_points]

        return points


    def from_points(self, points_to_voi_mat, points):
        
        points_to_image_mat = points_to_voi_mat
        points_to_image_mat[:3,3] = points_to_image_mat[:3,3] - self.image_origin

        if self.transform_original_to_corrected is not None: 
            points_to_image_mat = np.matmul(self.transform_original_to_corrected, points_to_image_mat)
        
        if self.camera_depth_image is None: 
            numba_max_height_image(points_to_image_mat, points, self.image, self.m_per_pix, self.m_per_height_unit, self.voi.x_in_m, self.voi.y_in_m, self.voi.z_in_m, verbose=False)
        else:
            print('Camera depth image with from_points command is not currently supported.')
            assert(False)

        
    def from_rgb_points(self, points_to_voi_mat, rgb_points):
        
        points_to_image_mat = points_to_voi_mat
        points_to_image_mat[:3,3] = points_to_image_mat[:3,3] - self.image_origin
        
        if self.transform_original_to_corrected is not None: 
            points_to_image_mat = np.matmul(self.transform_original_to_corrected, points_to_image_mat)

        s0, s1 = self.image.shape
        if ((self.rgb_image is None) or
            (self.rgb_image.shape != (s0, s1, 3)) or
            (self.rgb_image.dtype != np.uint8)):
            s = self.image.shape
            self.rgb_image = np.zeros(s[:2] + (3,), np.uint8)

        if self.camera_depth_image is None: 
            numba_max_height_and_rgb_images(points_to_image_mat, rgb_points, self.image, self.rgb_image, self.m_per_pix, self.m_per_height_unit, self.voi.x_in_m, self.voi.y_in_m, self.voi.z_in_m, verbose=False)
        else:
            numba_max_height_and_rgb_and_camera_depth_images(points_to_image_mat, rgb_points, self.image, self.rgb_image, self.camera_depth_image, self.m_per_pix, self.m_per_height_unit, self.voi.x_in_m, self.voi.y_in_m, self.voi.z_in_m, verbose=False)

In [13]:
class ROSMaxHeightImage(MaxHeightImage):
    
    @classmethod
    def from_file( self, base_filename ):
        data, image, rgb_image, camera_depth_image = MaxHeightImage.load_serialization(base_filename)
                        
        m_per_pix = data['m_per_pix']
        m_per_height_unit = data['m_per_height_unit']
        image_origin = np.array(data['image_origin'])

        voi = ROSVolumeOfInterest.from_serialization(data['voi_data'])

        if camera_depth_image is not None:
            use_camera_depth_image = True
        else:
            use_camera_depth_image = False
        max_height_image = ROSMaxHeightImage(voi, m_per_pix, image.dtype, m_per_height_unit, use_camera_depth_image=use_camera_depth_image, image=image, rgb_image=rgb_image, camera_depth_image=camera_depth_image)
        max_height_image.rgb_image = rgb_image
        self.last_update_time = None

        return max_height_image


    def get_points_to_image_mat(self, ros_frame_id, tf2_buffer):
        # This returns a matrix that transforms a point in the
        # provided ROS frame to a point in the image. However, it does
        # not quantize the components of the image point, which is a
        # nonlinear operation that must be performed as a separate
        # step.
        points_to_voi_mat, timestamp = self.voi.get_points_to_voi_matrix_with_tf2(ros_frame_id, tf2_buffer)
        if points_to_voi_mat is not None:
            points_to_voi_mat[:3,3] = points_to_voi_mat[:3,3] - self.image_origin
            voi_to_image_mat = np.identity(4)
            voi_to_image_mat[0, 0] = 1.0/self.m_per_pix
            voi_to_image_mat[1, 1] = - 1.0/self.m_per_pix
            dtype = self.image.dtype
            if np.issubdtype(dtype, np.integer):
                voi_to_image_mat[2, 3] = 1.0
                voi_to_image_mat[2, 2] = 1.0 / self.m_per_height_unit
            else:
                rospy.logerr('ROSMaxHeightImage.get_points_to_image_mat: unsupported image type used for max_height_image, dtype = {0}'.format(dtype))
                assert(False)

            points_to_image_mat = np.matmul(voi_to_image_mat, points_to_voi_mat)
                
            if self.transform_original_to_corrected is not None: 
                points_to_image_mat = np.matmul(self.transform_original_to_corrected, points_to_image_mat)
                
            return points_to_image_mat, timestamp
        else:
            return None, None

        
    def get_image_to_points_mat(self, ros_frame_id, tf2_buffer):
        # This returns a matrix that transforms a point in the image
        # to a point in the provided ROS frame. However, it does not
        # quantize the components of the image point, which is a
        # nonlinear operation that would need to be performed as a
        # separate step.
        voi_to_points_mat, timestamp = self.voi.get_voi_to_points_matrix_with_tf2(ros_frame_id, tf2_buffer)
        if voi_to_points_mat is not None:
            image_to_voi_mat = np.identity(4)
            image_to_voi_mat[:3, 3] = self.image_origin
            image_to_voi_mat[0, 0] = self.m_per_pix
            image_to_voi_mat[1, 1] = -self.m_per_pix
            dtype = self.image.dtype
            if np.issubdtype(dtype, np.integer):
                image_to_voi_mat[2, 3] = image_to_voi_mat[2,3] - self.m_per_height_unit
                image_to_voi_mat[2, 2] = self.m_per_height_unit
            else:
                rospy.logerr('ROSMaxHeightImage.get_image_to_points_mat: unsupported image type used for max_height_image, dtype = {0}'.format(dtype))
                assert(False)

            points_in_image_to_frame_id_mat = np.matmul(voi_to_points_mat, image_to_voi_mat)
            
            if self.transform_corrected_to_original is not None: 
                points_in_image_to_frame_id_mat = np.matmul(points_in_image_to_frame_id_mat, self.transform_corrected_to_original)

            return points_in_image_to_frame_id_mat, timestamp
        else:
            return None, None

        
    def get_robot_pose_in_image(self, tf2_buffer):
        robot_to_image_mat, timestamp = self.get_points_to_image_mat('base_link', tf2_buffer)
        r0 = np.array([0.0, 0.0, 0.0, 1.0])
        r0 = np.matmul(robot_to_image_mat, r0)[:2]
        r1 = np.array([1.0, 0.0, 0.0, 1.0])
        r1 = np.matmul(robot_to_image_mat, r1)[:2]
        robot_forward = r1 - r0
        robot_ang_rad = np.arctan2(-robot_forward[1], robot_forward[0])
        robot_xy_pix = r0
        return robot_xy_pix, robot_ang_rad, timestamp

    def get_point_in_image(self, xyz, xyz_frame_id, tf2_buffer):
        point_to_image_mat, timestamp = self.get_points_to_image_mat(xyz_frame_id, tf2_buffer)
        p = np.matmul(point_to_image_mat, np.array([xyz[0], xyz[1], xyz[2], 1.0]))[:3]
        return p

    def get_pix_in_frame(self, xyz_pix, xyz_frame_id, tf2_buffer):
        image_to_point_mat, timestamp = self.get_image_to_points_mat(xyz_frame_id, tf2_buffer)
        p = np.matmul(image_to_point_mat, np.array([xyz_pix[0], xyz_pix[1], xyz_pix[2], 1.0]))[:3]
        return p
    
    def make_robot_footprint_unobserved(self, robot_x_pix, robot_y_pix, robot_ang_rad):
        # replace robot points with unobserved points
        draw_robot_footprint_rectangle(robot_x_pix, robot_y_pix, robot_ang_rad, self.m_per_pix, self.image, value=0)
        if self.camera_depth_image is not None: 
            draw_robot_footprint_rectangle(robot_x_pix, robot_y_pix, robot_ang_rad, self.m_per_pix, self.camera_depth_image, value=0)
        if self.rgb_image is not None: 
            draw_robot_footprint_rectangle(robot_x_pix, robot_y_pix, robot_ang_rad, self.m_per_pix, self.rgb_image, value=0)


    def make_robot_mast_blind_spot_unobserved(self, robot_x_pix, robot_y_pix, robot_ang_rad):
        # replace mast blind spot wedge points with unobserved points
        draw_robot_mast_blind_spot_wedge(robot_x_pix, robot_y_pix, robot_ang_rad, self.m_per_pix, self.image, value=0)
        if self.camera_depth_image is not None: 
            draw_robot_mast_blind_spot_wedge(robot_x_pix, robot_y_pix, robot_ang_rad, self.m_per_pix, self.camera_depth_image, value=0)
        if self.rgb_image is not None: 
            draw_robot_mast_blind_spot_wedge(robot_x_pix, robot_y_pix, robot_ang_rad, self.m_per_pix, self.rgb_image, value=0)
    
    def from_points_with_tf2(self, points, points_frame_id, tf2_buffer):
        # points should be a numpy array with shape = (N, 3) where N
        # is the number of points. So it has the following structure:
        # points = np.array([[x1,y1,z1], [x2,y2,z2]...]). The points
        # should be specified with respect to the coordinate system
        # defined by points_frame_id.

        points_to_voi_mat, timestamp = self.voi.get_points_to_voi_matrix_with_tf2(points_frame_id, tf2_buffer)

        if points_to_voi_mat is not None: 
            self.from_points(points_to_voi_mat, points)

            if points_timestamp is None:
                if timestamp is None: 
                    self.last_update_time = rospy.Time.now()
                else:
                    self.last_update_time = timestamp
            else:
                self.last_update_time = points_timestamp
        else:
            rospy.logwarn('ROSMaxHeightImage.from_points_with_tf2: failed to update the image likely due to a failure to lookup the transform using TF2. points_frame_id = {0}, points_timestamp = {1}, timeout_s = {2}'.format(points_frame_id, points_timestamp, timeout_s))

    def from_rgb_points_with_tf2(self, rgb_points, points_frame_id, tf2_buffer):
        # points should be a numpy array with shape = (N, 3) where N
        # is the number of points. So it has the following structure:
        # points = np.array([[x1,y1,z1], [x2,y2,z2]...]). The points
        # should be specified with respect to the coordinate system
        # defined by points_frame_id.
        
        points_to_voi_mat, timestamp = self.voi.get_points_to_voi_matrix_with_tf2(points_frame_id, tf2_buffer)

        if points_to_voi_mat is not None: 
            self.from_rgb_points(points_to_voi_mat, rgb_points)
        else:
            rospy.logwarn('ROSMaxHeightImage.from_rgb_points_with_tf2: failed to update the image likely due to a failure to lookup the transform using TF2. points_frame_id = {0}, points_timestamp = {1}, timeout_s = {2}'.format(points_frame_id, points_timestamp, timeout_s))

            
    def to_point_cloud(self, color_map=None):
        points = self.to_points(color_map)
        point_cloud = ros_numpy.msgify(PointCloud2, points, frame_id=self.voi.frame_id)
        return point_cloud

In [14]:
class VolumeOfInterest:
    def __init__(self, frame_id, origin, axes, x_in_m, y_in_m, z_in_m):
        ########################################################
        # frame_id : frame with respect to which the volume of
        # interest (VOI) is defined
        #
        # origin : x_f, y_f, z_f with respect to frame_id coordinate
        # system, f, that defines the origin of the volume of interest
        # (VOI). The origin is at a corner of the VOI.
        #
        # axes : A 3x3 rotation matrix with columns that define the
        # axes of the volume of interest (VOI) with respect to the
        # frame_id coordinate system. Together with the origin, the
        # axes form a right-handed coordinate system. The VOI occupies
        # the positive octant of the coordinate system. The column
        # vectors of the matrix are unit vectors orthogonal to one
        # another. The x axis of the VOI is column 0, the y axis is
        # column 1, and the z axis is column 2.
        #
        # x_in_m : length in meters of the edges of the volume
        # parallel to the x axis defined by axes.
        #
        # y_in_m : length in meters of the edges of the volume
        # parallel to the y axis defined by axes.
        #
        # z_in_m : length in meters of the edges of the volume
        # parallel to the z axis defined by axes.
        #
        ########################################################
        
        self.frame_id = frame_id
        # origin with respect to frame_id coordinate system
        self.origin = origin
        # axes with respect to frame_id coordinate system
        self.axes = axes
        self.points_in_voi_to_frame_id_mat = np.identity(4)
        self.points_in_voi_to_frame_id_mat[:3,:3] = self.axes
        self.points_in_voi_to_frame_id_mat[:3,3] = self.origin
        self.points_in_frame_id_to_voi_mat = np.linalg.inv(self.points_in_voi_to_frame_id_mat)
        self.x_in_m = x_in_m
        self.y_in_m = y_in_m
        self.z_in_m = z_in_m

    def get_points_to_voi_matrix(self, points_to_frame_id_mat):
        points_to_voi_mat = np.matmul(self.points_in_frame_id_to_voi_mat, points_to_frame_id_mat)
        return points_to_voi_mat

    def change_frame(self, points_in_old_frame_to_new_frame_mat, new_frame_id):
        # Assumes the input matrix defines a rigid body transformation.
        
        # translate the origin
        origin = list(self.origin)
        origin.append(1.0)
        origin = np.array(origin)
        new_origin = np.matmul(points_in_old_frame_to_new_frame_mat, origin)
        # rotate the axes
        new_axes = np.matmul(points_in_old_frame_to_new_frame_mat[:3,:3], self.axes)
        # transform transforms
        new_points_in_voi_to_frame_id_mat = np.matmul(points_in_old_frame_to_new_frame_mat, self.points_in_voi_to_frame_id_mat)
        new_points_in_frame_id_to_voi_mat = np.linalg.inv(new_points_in_voi_to_frame_id_mat)
        # set
        self.frame_id = new_frame_id
        self.origin = new_origin
        self.axes = new_axes
        self.points_in_voi_to_frame_id_mat = new_points_in_voi_to_frame_id_mat
        self.points_in_frame_id_to_voi_mat = new_points_in_frame_id_to_voi_mat
        
        
    def serialize(self):
        # return dictionary with the parameters needed to recreate it
        data = {'frame_id': self.frame_id, 'origin': self.origin, 'axes': self.axes, 'x_in_m': self.x_in_m, 'y_in_m': self.y_in_m, 'z_in_m': self.z_in_m}
        return data

    @classmethod
    def from_serialization(self, data):
        d = data
        voi = VolumeOfInterest(d['frame_id'], np.array(d['origin']), np.array(d['axes']), d['x_in_m'], d['y_in_m'], d['z_in_m'])
        return voi

In [15]:
class ROSVolumeOfInterest(VolumeOfInterest):

    @classmethod
    def from_serialization(self, data):
        d = data
        voi = ROSVolumeOfInterest(d['frame_id'], np.array(d['origin']), np.array(d['axes']), d['x_in_m'], d['y_in_m'], d['z_in_m'])
        return voi

    def get_voi_to_points_matrix_with_tf2(self, points_frame_id, tf2_buffer):
        points_to_voi_mat, timestamp = self.get_points_to_voi_matrix_with_tf2(points_frame_id, tf2_buffer)
        voi_to_points_mat = None
        if points_to_voi_mat is not None: 
            voi_to_points_mat = np.linalg.inv(points_to_voi_mat)
        return voi_to_points_mat, timestamp
    
    def get_points_to_voi_matrix_with_tf2(self, points_frame_id, tf2_buffer):
        # If the necessary TF2 transform is successfully looked up,
        # this returns a 4x4 affine transformation matrix that
        # transforms points in the points_frame_id frame to the voi
        # frame.
        try:
#             print(f'ROSVolumeOfInterest.get_points_to_voi_matrix_with_tf2 self.frame_id={self.frame_id} points_frame_id={points_frame_id}')
            stamped_transform =  tf2_buffer.lookup_transform(self.frame_id, points_frame_id)
            points_to_frame_id_mat = stamped_transform.matrix
            points_to_voi_mat = self.get_points_to_voi_matrix(points_to_frame_id_mat)
            return points_to_voi_mat, time.time()
        except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException):
            rospy.logwarn('ROSVolumeOfInterest.get_points_to_voi_matrix_with_tf2: failed to lookup transform. self.frame_id = {0}, points_frame_id = {1}, lookup_time = {2}, timeout_s = {3}'.format(self.frame_id, points_frame_id, lookup_time, timeout_s))
            return None, None
        
    def get_ros_marker(self, duration=0.2):
        marker = Marker()
        marker.type = marker.CUBE
        marker.action = marker.ADD
        marker.lifetime = rospy.Duration(duration)
        marker.text = 'volume of interest'

        marker.header.frame_id = self.frame_id
        marker.header.stamp = rospy.Time.now()
        marker.id = 0

        # scale of 1,1,1 would result in a 1m x 1m x 1m cube
        marker.scale.x = self.x_in_m
        marker.scale.y = self.y_in_m
        marker.scale.z = self.z_in_m

        # make as bright as possible
        r, g, b = [0, 0, 255]
        marker.color.r = r
        marker.color.g = g
        marker.color.b = b
        marker.color.a = 0.25 

        # find the middle of the volume of interest
        center_vec = np.array([self.x_in_m/2.0, self.y_in_m/2.0, self.z_in_m/2.0])
        center = self.origin + np.dot(self.axes, center_vec)
        marker.pose.position.x = center[0]
        marker.pose.position.y = center[1]
        marker.pose.position.z = center[2]

        q = Rotation.from_matrix(self.axes).as_quat()
        marker.pose.orientation.x = q[0]
        marker.pose.orientation.y = q[1]
        marker.pose.orientation.z = q[2]
        marker.pose.orientation.w = q[3]        

        return marker

In [16]:
class HeadScan:
    def __init__(self, max_height_im=None, voi_side_m=8.0, voi_origin_m=None):
        if max_height_im is not None:
            self.max_height_im = max_height_im
        else:
            # How to best set this volume of interest (VOI) merits further
            # consideration. Note that representing a smaller range of heights
            # results in higher height resolution when using np.uint8
            # pixels. For this VOI, 0.0 should be the nominal ground height
            # achieved via calibration.

            # Set to approximately the height of the D435i. This should result
            # in the volume of interest (VOI) containing the highest
            # manipulable surfaces. Also, when the top of the viewing frustum
            # is parallel to the ground it will be at or close to the top of
            # the volume of interest.
            
            robot_head_above_ground = 1.13
            
            # How far below the expected floor height the volume of interest
            # should extend is less clear. Sunken living rooms and staircases
            # can go well below the floor and a standard stair step should be
            # less than 20cm tall (0.2 m below the floor). However, the robot
            # should not go into these areas. For now, the volume of interest
            # (VOI) will contain points that the robot can potentially reach
            # its arm over or drive over (traverse). This implies that
            # unobserved points on the floor should be treated with great
            # caution, since they might be points significantly below the
            # floor that should not be traversed. For now, the robot will not
            # represent ramps that it might safely descend. It should be able
            # to represent floor points that look slightly lower due to noise
            # that can vary with floor type and small calibration errors. It
            # should be able to represent small traversable depressions in the
            # floor. However, there is a risk that points that are too low
            # will be classified as traversable floor. This risk is mitigated
            # by separate point cloud based obstacle detection while moving
            # and cliff sensors.
            
            lowest_distance_below_ground = 0.05 #5cm
            
            total_height = robot_head_above_ground + lowest_distance_below_ground
            # 8m x 8m region 
            voi_side_m = voi_side_m
            voi_axes = np.identity(3)
            if voi_origin_m is None: 
                voi_origin = np.array([-voi_side_m/2.0, -voi_side_m/2.0, -lowest_distance_below_ground])
            voi = ROSVolumeOfInterest('map', voi_origin, voi_axes, voi_side_m, voi_side_m, total_height)

            m_per_pix = 0.006
            pixel_dtype = np.uint8
            
            self.max_height_im = ROSMaxHeightImage(voi, m_per_pix, pixel_dtype, use_camera_depth_image=True)
            self.max_height_im.create_blank_rgb_image()
            
        self.max_height_im.print_info()

        
    def make_robot_footprint_unobserved(self):
        # replace robot points with unobserved points
        self.max_height_im.make_robot_footprint_unobserved(self.robot_xy_pix[0], self.robot_xy_pix[1], self.robot_ang_rad)

    def make_robot_mast_blind_spot_unobserved(self):
        # replace robot points with unobserved points
        self.max_height_im.make_robot_mast_blind_spot_unobserved(self.robot_xy_pix[0], self.robot_xy_pix[1], self.robot_ang_rad)
        
    def capture_point_clouds(self, node, pose, capture_params):
        head_settle_time = capture_params['head_settle_time']
        num_point_clouds_per_pan_ang = capture_params['num_point_clouds_per_pan_ang']
        time_between_point_clouds = capture_params['time_between_point_clouds']
        fast_scan = capture_params.get('fast_scan', False)

        if fast_scan:
            head_settle_time = head_settle_time
            num_point_clouds_per_pan_ang = 1
            time_between_point_clouds = time_between_point_clouds
        
        node.move_to_pose(pose)
        rospy.sleep(head_settle_time)
        settle_time = rospy.Time.now()
        prev_cloud_time = None
        num_point_clouds = 0
        # Consider using time stamps to make decisions, instead of
        # hard coded sleep times, as found in the head calibration
        # data collection code. The main issue is that the robot
        # needs time to mechanically settle in addition to sensor
        # timing considerations.
        not_finished = num_point_clouds < num_point_clouds_per_pan_ang
        while not_finished:
            cloud_time = node.point_cloud.header.stamp
            cloud_frame = node.point_cloud.header.frame_id
            point_cloud = ros_numpy.numpify(node.point_cloud)
            if (cloud_time is not None) and (cloud_time != prev_cloud_time) and (cloud_time >= settle_time): 
                only_xyz = False
                if only_xyz:
                    xyz = ros_numpy.point_cloud2.get_xyz_points(point_cloud)
                    self.max_height_im.from_points_with_tf2(xyz, cloud_frame, node.tf2_buffer)
                else: 
                    rgb_points = ros_numpy.point_cloud2.split_rgb_field(point_cloud)
                    self.max_height_im.from_rgb_points_with_tf2(rgb_points, cloud_frame, node.tf2_buffer)
                num_point_clouds += 1
                prev_cloud_time = cloud_time
            not_finished = num_point_clouds < num_point_clouds_per_pan_ang
            if not_finished: 
                rospy.sleep(time_between_point_clouds)


    def execute(self, head_tilt, far_left_pan, far_right_pan, num_pan_steps, capture_params, node, look_at_self=True):
        scan_start_time = time.time()

        pose = {'joint_head_pan': far_right_pan, 'joint_head_tilt': head_tilt}
        node.move_to_pose(pose)
        
        pan_left = np.linspace(far_right_pan, far_left_pan, num_pan_steps)

        for pan_ang in pan_left:
            pose = {'joint_head_pan': pan_ang}
            self.capture_point_clouds(node, pose, capture_params)
            
        # look at the ground right around the robot to detect any
        # nearby obstacles

        if look_at_self:
            # Attempt to pick a head pose that sees around the robot,
            # but doesn't see the mast, which can introduce noise.
            head_tilt = -1.2
            head_pan = 0.1
            pose = {'joint_head_pan': head_pan, 'joint_head_tilt': head_tilt}
            self.capture_point_clouds(node, pose, capture_params)

        scan_end_time = time.time()
        scan_duration = scan_end_time - scan_start_time
        rospy.loginfo('The head scan took {0} seconds.'.format(scan_duration))
            
        #####################################
        # record robot pose information and potentially useful transformations
        self.robot_xy_pix, self.robot_ang_rad, self.timestamp = self.max_height_im.get_robot_pose_in_image(node.tf2_buffer)

        # Should only need three of these transforms, since the other
        # three should be obtainable via matrix inversion. Variation
        # in time could result in small differences due to encoder
        # noise.
        self.base_link_to_image_mat, timestamp = self.max_height_im.get_points_to_image_mat('base_link', node.tf2_buffer)
        self.base_link_to_map_mat, timestamp = get_p1_to_p2_matrix('base_link', 'map', node.tf2_buffer)
        self.image_to_map_mat, timestamp = self.max_height_im.get_image_to_points_mat('map', node.tf2_buffer)
        self.image_to_base_link_mat, timestamp = self.max_height_im.get_image_to_points_mat('base_link', node.tf2_buffer)
        self.map_to_image_mat, timestamp = self.max_height_im.get_points_to_image_mat('map', node.tf2_buffer)
        self.map_to_base_mat, timestamp = get_p1_to_p2_matrix('map', 'base_link', node.tf2_buffer)

        self.make_robot_mast_blind_spot_unobserved()
        self.make_robot_footprint_unobserved()
                
        
    def execute_full(self, node, fast_scan=False):
        far_right_pan = -3.6 
        far_left_pan = 1.45 
        head_tilt = -0.8 
        num_pan_steps = 7 

        if fast_scan:
            num_pan_steps = 5 

        capture_params = {
            'fast_scan': fast_scan,
            'head_settle_time': 0.5,
            'num_point_clouds_per_pan_ang': 10, # low numbers may not be effective for some surfaces and environments
            'time_between_point_clouds': 1.0/15.0 # point clouds at 15 Hz, so this should help obtain distinct clouds
        }

        self.execute(head_tilt, far_left_pan, far_right_pan, num_pan_steps, capture_params, node)


    def execute_front(self, node, fast_scan=False):
        far_right_pan = -1.2 
        far_left_pan = 1.2 
        head_tilt = -0.8
        num_pan_steps = 3
        
        capture_params = {
            'fast_scan': fast_scan,
            'head_settle_time': 0.5,
            'num_point_clouds_per_pan_ang': 10, # low numbers may not be effective for some surfaces and environments
            'time_between_point_clouds': 1.0/15.0 # point clouds at 15 Hz, so this should help obtain distinct clouds
        }

        self.execute(head_tilt, far_left_pan, far_right_pan, num_pan_steps, capture_params, node)

    
    def execute_minimal(self, node, fast_scan=False):
        far_right_pan = 0.1
        far_left_pan = 0.1
        head_tilt = -0.8
        num_pan_steps = 1

        look_at_self = True
        
        capture_params = {
            'fast_scan': fast_scan,
            'head_settle_time': 0.5,
            'num_point_clouds_per_pan_ang': 10, # low numbers may not be effective for some surfaces and environments
            'time_between_point_clouds': 1.0/15.0 # point clouds at 15 Hz, so this should help obtain distinct clouds
        }

        self.execute(head_tilt, far_left_pan, far_right_pan, num_pan_steps, capture_params, node, look_at_self)
        
        
    def save( self, base_filename, save_visualization=True ):
        print('HeadScan: Saving to base_filename =', base_filename)
        # save scan to disk
        max_height_image_base_filename = base_filename + '_mhi'
        self.max_height_im.save(max_height_image_base_filename)

        if "tolist" in dir(self.robot_ang_rad):
            robot_ang_rad = self.robot_ang_rad.tolist()
        else:
            robot_ang_rad = self.robot_ang_rad
        data = {'max_height_image_base_filename' : max_height_image_base_filename,
                'robot_xy_pix' : self.robot_xy_pix.tolist(),
                'robot_ang_rad' : robot_ang_rad,
                'timestamp' : self.timestamp,
                'base_link_to_image_mat' : self.base_link_to_image_mat.tolist(), 
                'base_link_to_map_mat' : self.base_link_to_map_mat.tolist(), 
                'image_to_map_mat' : self.image_to_map_mat.tolist(), 
                'image_to_base_link_mat' : self.image_to_base_link_mat.tolist(), 
                'map_to_image_mat' : self.map_to_image_mat.tolist(), 
                'map_to_base_mat' : self.map_to_base_mat.tolist()}
        
        with open(base_filename + '.yaml', 'w') as fid:
            yaml.dump(data, fid)
        print('Finished saving.')

        
    @classmethod
    def from_file(self, base_filename):
        print('HeadScan.from_file: base_filename =', base_filename)
        with open(base_filename + '.yaml', 'r') as fid:
            data = yaml.load(fid, Loader=yaml.FullLoader)

        print('data =', data)
        max_height_image_base_filename = data['max_height_image_base_filename']
        max_height_image = rm.ROSMaxHeightImage.from_file(max_height_image_base_filename)
        head_scan = HeadScan(max_height_image)

        head_scan.robot_xy_pix = np.array(data['robot_xy_pix'])
        head_scan.robot_ang_rad = data['robot_ang_rad']
        head_scan.timestamp = data['timestamp']
        head_scan.base_link_to_image_mat = np.array(data['base_link_to_image_mat'])
        head_scan.base_link_to_map_mat = np.array(data['base_link_to_map_mat']) 
        head_scan.image_to_map_mat = np.array(data['image_to_map_mat'])
        head_scan.image_to_base_link_mat = np.array(data['image_to_base_link_mat'])
        head_scan.map_to_image_mat = np.array(data['map_to_image_mat'])
        head_scan.map_to_base_mat = np.array(data['map_to_base_mat'])

        return head_scan

## Head Scanning Experiment

In [17]:
node = FakeNode()
head_scan = HeadScan(voi_side_m=16.0)
head_scan.execute_full(node, fast_scan=False)

MaxHeightImage information:
     image.shape = (2667, 2667)
     image.dtype = uint8
     m_per_pix = 0.006
     m_per_height_unit = 0.0046456692913385824
     voi.x_in_m = 16.0
     voi.y_in_m = 16.0
     voi.z_in_m = 1.18
Moving to {'joint_head_pan': -3.6, 'joint_head_tilt': -0.8}
FakeRobot.move_to_pose: sleeping 1 second
Moving to {'joint_head_pan': -3.6}
FakeRobot.move_to_pose: sleeping 1 second
Moving to {'joint_head_pan': -2.7583333333333333}
FakeRobot.move_to_pose: sleeping 1 second
Moving to {'joint_head_pan': -1.9166666666666667}
FakeRobot.move_to_pose: sleeping 1 second
Moving to {'joint_head_pan': -1.0750000000000002}
FakeRobot.move_to_pose: sleeping 1 second
Moving to {'joint_head_pan': -0.2333333333333334}
FakeRobot.move_to_pose: sleeping 1 second
Moving to {'joint_head_pan': 0.608333333333333}
FakeRobot.move_to_pose: sleeping 1 second
Moving to {'joint_head_pan': 1.45}
FakeRobot.move_to_pose: sleeping 1 second
Moving to {'joint_head_pan': 0.1, 'joint_head_tilt': -1.2}
Fak

In [18]:
head_scan.save("/home/hello-robot/repos/stretch_tutorials/stretch_body/jupyter/example_head_scan")

HeadScan: Saving to base_filename = /home/hello-robot/repos/stretch_tutorials/stretch_body/jupyter/example_head_scan
MaxHeightImage saving to base_filename = /home/hello-robot/repos/stretch_tutorials/stretch_body/jupyter/example_head_scan_mhi
Finished saving.
Finished saving.


In [20]:
robot.stop()
pipeline.stop()

Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_16300/3221415467.py", line 26, in _spin
RuntimeError: wait_for_frames cannot be called before start()
