In [1]:
import sys
import cv2
import math
import glob
import json
import open3d as o3d
import magnum as mn
import numpy as np
from PIL import Image
import habitat_sim
from habitat_sim.utils import common as utils
from habitat_sim.utils.common import d3_40_colors_rgb
import quaternion
from scipy.spatial import distance

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
# sys.path.append('/mnt/sdb/hank/habitat-lab')
# from habitat.core.simulator import Simulator

dataset_name = "demo"

In [3]:
def transform_rgb_bgr(image):
    return image[:, :, [2, 1, 0]]

def transform_depth(image):
    depth_img = (image / 10 * 255).astype(np.uint8)
    return depth_img

def transform_semantic(semantic_obs):
    semantic_img = Image.new("P", (semantic_obs.shape[1], semantic_obs.shape[0]))
    semantic_img.putpalette(d3_40_colors_rgb.flatten())
    semantic_img.putdata((semantic_obs.flatten() % 40).astype(np.uint8))
    semantic_img = semantic_img.convert("RGB")
    semantic_img = cv2.cvtColor(np.asarray(semantic_img), cv2.COLOR_RGB2BGR)
    return semantic_img

In [4]:
scene_path = f"../../../../../data/scene_datasets/apartment_0/habitat/mesh_semantic.ply"

sim_settings = {
    "scene": scene_path,  # Scene path
    "default_agent": 0,  # Index of the default agent
    "sensor_height": 1.5,  # Height of sensors in meters, relative to the agent
    "width": 512,  # Spatial resolution of the observations
    "height": 512,
    "sensor_pitch": 0,  # sensor pitch (x rotation in rads)
}

foward_per_step = 0.01
rotation_per_step = 1

def make_simple_cfg(settings, foward_per_step, rotation_per_step):
    # simulator backend
    sim_cfg = habitat_sim.SimulatorConfiguration()
    sim_cfg.scene_id = settings["scene"]
    # agent
    agent_cfg = habitat_sim.agent.AgentConfiguration()

    # In the 1st example, we attach only one sensor,
    # a RGB visual sensor, to the agent
    rgb_sensor_spec = habitat_sim.CameraSensorSpec()
    rgb_sensor_spec.uuid = "color_sensor"
    rgb_sensor_spec.sensor_type = habitat_sim.SensorType.COLOR
    rgb_sensor_spec.resolution = [settings["height"], settings["width"]]
    rgb_sensor_spec.position = [0.0, settings["sensor_height"], 0.0]
    rgb_sensor_spec.orientation = [
        settings["sensor_pitch"],
        0.0,
        0.0,
    ]
    rgb_sensor_spec.sensor_subtype = habitat_sim.SensorSubType.PINHOLE

    #depth snesor
    depth_sensor_spec = habitat_sim.CameraSensorSpec()
    depth_sensor_spec.uuid = "depth_sensor"
    depth_sensor_spec.sensor_type = habitat_sim.SensorType.DEPTH
    depth_sensor_spec.resolution = [settings["height"], settings["width"]]
    depth_sensor_spec.position = [0.0, settings["sensor_height"], 0.0]
    depth_sensor_spec.orientation = [
        settings["sensor_pitch"],
        0.0,
        0.0,
    ]
    depth_sensor_spec.sensor_subtype = habitat_sim.SensorSubType.PINHOLE

    #semantic snesor
    semantic_sensor_spec = habitat_sim.CameraSensorSpec()
    semantic_sensor_spec.uuid = "semantic_sensor"
    semantic_sensor_spec.sensor_type = habitat_sim.SensorType.SEMANTIC
    semantic_sensor_spec.resolution = [settings["height"], settings["width"]]
    semantic_sensor_spec.position = [0.0, settings["sensor_height"], 0.0]
    semantic_sensor_spec.orientation = [
        settings["sensor_pitch"],
        0.0,
        0.0,
    ]
    semantic_sensor_spec.sensor_subtype = habitat_sim.SensorSubType.PINHOLE

    agent_cfg.sensor_specifications = [rgb_sensor_spec, depth_sensor_spec, semantic_sensor_spec]

    # Here you can specify the amount of displacement in a forward action and the turn angle
    
    #agent_cfg.sensor_specifications = sensor_specs
    agent_cfg.action_space = {
        "move_forward": habitat_sim.agent.ActionSpec(
            "move_forward", habitat_sim.agent.ActuationSpec(amount = foward_per_step)
        ),
        "turn_left": habitat_sim.agent.ActionSpec(
            "turn_left", habitat_sim.agent.ActuationSpec(amount = rotation_per_step)
        ),
        "turn_right": habitat_sim.agent.ActionSpec(
            "turn_right", habitat_sim.agent.ActuationSpec(amount = rotation_per_step)
        ),
    }

    return habitat_sim.Configuration(sim_cfg, [agent_cfg])

cfg = make_simple_cfg(sim_settings, foward_per_step, rotation_per_step)
sim = habitat_sim.Simulator(cfg)
agent = sim.initialize_agent(sim_settings["default_agent"])

I0910 04:34:10.547018 25968 simulator.py:221] Loaded navmesh ../../../../../data/scene_datasets/apartment_0/habitat/mesh_semantic.navmesh
INFO - 2022-09-10 04:34:10,547 - simulator - I0910 04:34:10.547018 25968 simulator.py:221] Loaded navmesh ../../../../../data/scene_datasets/apartment_0/habitat/mesh_semantic.navmesh


In [5]:
def load_path_points(filename = dataset_name):
    tmp = np.load(f'./data/navigate_path/{filename}/path.npy')
    path_points = np.zeros((tmp.shape[0], 3))
    path_points[:, 0] = tmp[:, 1]
    path_points[:, 1] = -1.3747652
    path_points[:, 2] = tmp[:, 0]
    print('total path points: ', path_points.shape[0])

    return path_points

path_points = load_path_points()

total path points:  3


In [6]:
with open(f"../../../../../data/scene_datasets/apartment_0/habitat/info_semantic.json", "r") as f:
    annotations = json.load(f)

instance_id_to_semantic_label_id = np.array(annotations["id_to_label"])

def navigateAndSee(cnt, action = "", filename = dataset_name, target_id = 29, instance_id_to_semantic_label_id = instance_id_to_semantic_label_id):
    observations = sim.step(action)

    # cv2.imshow("RGB", transform_rgb_bgr(observations["color_sensor"]))
    # cv2.imshow("depth", transform_depth(observations["depth_sensor"]))
    # cv2.imshow("semantic", transform_semantic(observations["semantic_sensor"]))

    rgb_img = transform_rgb_bgr(observations["color_sensor"])                                                                                           
    red_mask = np.copy(rgb_img)

    idx = np.where(observations["semantic_sensor"] == 4)
    
    if idx[0].size != 0:
        for i, j in zip(idx[0], idx[1]):
            red_mask[i][j] = np.array([0, 0, 255])

        rgb_img = cv2.addWeighted(red_mask, 0.3, rgb_img, 0.7, 0, red_mask)

    cv2.imwrite(f'./trajectory/{filename}/rgb/{str(cnt).zfill(5)}.png', rgb_img)

    agent_state = agent.get_state()
    # sensor_state = agent_state.sensor_states['color_sensor']
    # print("camera pose: x y z rw rx ry rz")
    # print(sensor_state.position[0],sensor_state.position[1],sensor_state.position[2], sensor_state.rotation.w, sensor_state.rotation.x, sensor_state.rotation.y, sensor_state.rotation.z)

    return agent_state.position, agent_state.position

# Set agent state
agent_state = habitat_sim.AgentState()
agent_state.position = path_points[0]

tangent_orientation_matrix = mn.Matrix4.look_at(path_points[0], path_points[1], np.array([0.0, 1.0, 0.0]))
tangent_orientation_q = mn.Quaternion.from_matrix(tangent_orientation_matrix.rotation())
agent_state.rotation = utils.quat_from_magnum(tangent_orientation_q)
agent.set_state(agent_state)

In [7]:
def is_valid(start_point, end_point, sim):
    vector = (end_point - start_point)/5

    tmp_points = []
    for i in range(10):
        tmp_points.append(start_point + i*vector)
    tmp_points.append(end_point)

    for i in tmp_points:
        if not sim.pathfinder.is_navigable(i):
            return False

    return True      

cnt = 0
for i in range(1, len(path_points)):
    print(f'reach point {i-1}')

    agent_state = agent.get_state()
    sensor_state = agent_state.sensor_states['color_sensor']

    cur_pos = sensor_state.position
    cur_ori = sensor_state.rotation
    tar_pos = path_points[i]

    if i > 1:
        tar_orientation_matrix = np.array(mn.Matrix4.look_at(cur_pos, tar_pos, np.array([0.0, 1.0, 0.0])).rotation())
        cur_orientation_matrix = quaternion.as_rotation_matrix(cur_ori)
        relative_ori_matrix = np.dot(tar_orientation_matrix[0:3, 0:3], cur_orientation_matrix[0:3, 0:3].T)
        agl_diff = np.arcsin(relative_ori_matrix[0][2])/(np.pi)*180

        if agl_diff > 0:
            steps = math.floor(agl_diff/rotation_per_step)

            for j in range(steps):
                action = "turn_left"
                print(f'step{cnt}: action = {action}')
                cur_pos, cur_ori = navigateAndSee(cnt, action)
                cnt += 1
        else:
            steps = math.floor(abs(agl_diff)/rotation_per_step)

            for j in range(steps):
                action = "turn_right"
                print(f'step{cnt}: action = {action}')
                cur_pos, cur_ori = navigateAndSee(cnt, action)
                cnt += 1
    
    dst_diff = distance.euclidean(cur_pos, tar_pos)
    steps = math.floor(dst_diff/foward_per_step)
    unit_vector = (path_points[i] - path_points[i-1])/dst_diff
    
    for j in range(steps):
        if (i == len(path_points) - 1) and (distance.euclidean(cur_pos, tar_pos) < 1):
            break

        action = "move_forward"
        print(f'step{cnt}: action = {action}')
        cur_pos, cur_ori = navigateAndSee(cnt, action)
        cnt += 1

        '''
        test_point = list(path_points[i-1] + unit_vector*foward_per_step)

        if is_valid(cur_pos, test_point, sim):
        #if sim.pathfinder.is_navigable(test_point):
            action = "move_forward"
            print(f'step{cnt}: action = {action}')
            cur_pos, cur_ori = navigateAndSee(cnt, action)
            cnt += 1
        else:
            # sample_points = Simulator.sample_navigable_point()
            # if(distance.euclidean(sample_points, cur_pos) < foward_per_step):
            
            print('early turn')
            break
        '''
    print()

reach point 0
step0: action = move_forward
step1: action = move_forward
step2: action = move_forward
step3: action = move_forward
step4: action = move_forward
step5: action = move_forward
step6: action = move_forward
step7: action = move_forward
step8: action = move_forward
step9: action = move_forward
step10: action = move_forward
step11: action = move_forward
step12: action = move_forward
step13: action = move_forward
step14: action = move_forward
step15: action = move_forward
step16: action = move_forward
step17: action = move_forward
step18: action = move_forward
step19: action = move_forward
step20: action = move_forward
step21: action = move_forward
step22: action = move_forward
step23: action = move_forward
step24: action = move_forward
step25: action = move_forward
step26: action = move_forward
step27: action = move_forward
step28: action = move_forward
step29: action = move_forward
step30: action = move_forward
step31: action = move_forward
step32: action = move_forward
step33

In [13]:
img_array = []
filename = dataset_name

size = (0, 0)
img_pth = sorted(glob.glob(f'./trajectory/{filename}/rgb/*.png'))

for cnt, i in enumerate(img_pth):
    img = cv2.imread(i)
    height, width, layers = img.shape
    size = (width, height)
    img_array.append(img)

    if cnt == len(img_pth) - 1:
        for j in range(200):
            img_array.append(img)
 
out = cv2.VideoWriter(f'./trajectory/{filename}/project.mp4', cv2.VideoWriter_fourcc(*'MP4V'), 80, size)
 
for i in range(len(img_array)):
    out.write(img_array[i])
out.release()