# 配置

In [None]:
# Notebook-wide configuration; every cell below reads from this dict.
settings = {
    # Resolution used for every camera sensor, in pixels.
    "width": 256,
    "height": 256,
    "rotation_step": 30,  # rotation granularity; 360 is divided by this to get the number of views
    "scale": 0.05,  # size of one map pixel, in meters
    "initial_grid": 1.0,  # initial spacing used when enumerating scan positions
    "ignore": 0.20,  # grid refinement stops once the spacing (in pixels) is <= ignore / scale
    "threshold": 0.3,  # coarse cells whose averaged coverage exceeds this are not scanned again

    # Dataset location
    "dataset": "../../../data/hm3d_minival/",
    "dataset_config": "../../../data/hm3d_minival/hm3d_annotated_minival_basis.scene_dataset_config.json",  # Configuration of the dataset

    # Where the sampled results are stored
    "groundtruth_path": "../../../data/topdown/minival/",

    "sensor_height": 1.5,  # height of the sensors, in meters
    "enable_physics": False,
}

# 运行全部

In [None]:
import torch
import sys
sys.path.append('../../..')
from models.common import device
from models.perception.mapping.mapping import local_semantic_map

## 可视化

In [None]:
%matplotlib inline
from matplotlib import pyplot as plt
import numpy as np
import os
from tqdm.auto import tqdm

# function to display the topdown map
from PIL import Image, ImageOps

def display_sample(rgb_obs, semantic_obs=np.array([]), depth_obs=np.array([])):
    """Show an RGB observation next to its optional semantic and depth images.

    Args:
        rgb_obs: Array converted with ``Image.fromarray(..., mode="RGBA")``.
        semantic_obs: Optional 2-D array of integer ids. Skipped when empty.
            Ids are folded modulo 40 onto the ``d3_40_colors_rgb`` palette.
        depth_obs: Optional depth array. Skipped when empty. Values are scaled
            so that 10 maps to white; anything outside [0, 10] is clipped.

    Returns:
        None. The images are drawn with matplotlib in a single row.
    """
    from habitat_sim.utils.common import d3_40_colors_rgb

    rgb_img = Image.fromarray(rgb_obs, mode="RGBA")

    arr = [rgb_img]
    titles = ["rgb"]
    if semantic_obs.size != 0:
        semantic_img = Image.new("P", (semantic_obs.shape[1], semantic_obs.shape[0]))
        semantic_img.putpalette(d3_40_colors_rgb.flatten())
        semantic_img.putdata((semantic_obs.flatten() % 40).astype(np.uint8))
        semantic_img = semantic_img.convert("RGBA")
        arr.append(semantic_img)
        titles.append("semantic")

    if depth_obs.size != 0:
        # Clip before the uint8 cast: depths beyond 10 would otherwise leave
        # the 0..255 range and wrap around, showing far surfaces as near ones.
        depth_scaled = np.clip(depth_obs / 10, 0.0, 1.0) * 255
        depth_img = Image.fromarray(depth_scaled.astype(np.uint8), mode="L")
        arr.append(depth_img)
        titles.append("depth")

    plt.figure(figsize=(12, 8))
    for i, data in enumerate(arr):
        ax = plt.subplot(1, 3, i + 1)
        ax.axis("off")
        ax.set_title(titles[i])
        plt.imshow(data)
    plt.show(block=False)

def display_map(topdown_map, key_points=None):
    """Draw a top-down map and, when given, mark key points on top of it.

    Args:
        topdown_map: Image-like array passed straight to ``plt.imshow``.
        key_points: Optional iterable of points; element 0 is used as the
            horizontal coordinate and element 1 as the vertical one.
    """
    plt.imshow(topdown_map)
    markers = key_points if key_points is not None else ()
    for marker_xy in markers:
        plt.plot(marker_xy[0], marker_xy[1], marker="o", markersize=10, alpha=0.8)
    plt.show(block=False)

## Habitat

In [None]:
import habitat_sim

In [None]:
def make_cfg(scene_dir, scene_name):
    """Build the habitat-sim configuration for a single scene.

    Args:
        scene_dir: Sub-directory of ``settings['dataset']`` holding the scene.
        scene_name: Scene file stem; ``'.basis.glb'`` is appended to it.

    Returns:
        A ``habitat_sim.Configuration`` with one agent carrying a color, a
        depth and a semantic pinhole camera, all sharing the resolution and
        mounting height from ``settings``.
    """

    def _pinhole_camera(uuid, sensor_type):
        # All three sensors differ only in their uuid and sensor type.
        spec = habitat_sim.CameraSensorSpec()
        spec.uuid = uuid
        spec.sensor_type = sensor_type
        spec.resolution = [settings['height'], settings['width']]
        spec.position = [0.0, settings['sensor_height'], 0.0]
        spec.sensor_subtype = habitat_sim.SensorSubType.PINHOLE
        return spec

    scene_file = scene_name + '.basis.glb'

    sim_cfg = habitat_sim.SimulatorConfiguration()
    sim_cfg.gpu_device_id = 0
    sim_cfg.scene_id = os.path.join(settings['dataset'], scene_dir, scene_file)
    sim_cfg.enable_physics = settings['enable_physics']
    sim_cfg.scene_dataset_config_file = settings['dataset_config']

    agent_cfg = habitat_sim.agent.AgentConfiguration()
    agent_cfg.sensor_specifications = [
        _pinhole_camera('color_sensor', habitat_sim.SensorType.COLOR),
        _pinhole_camera('depth_sensor', habitat_sim.SensorType.DEPTH),
        _pinhole_camera('semantic_sensor', habitat_sim.SensorType.SEMANTIC),
    ]

    # Both turn actions rotate by the configured step; forward moves 0.25.
    turn_amount = settings['rotation_step']
    action_amounts = (
        ("move_forward", 0.25),
        ("turn_left", turn_amount),
        ("turn_right", turn_amount),
    )
    agent_cfg.action_space = {
        action_name: habitat_sim.agent.ActionSpec(
            action_name, habitat_sim.agent.ActuationSpec(amount=amount)
        )
        for action_name, amount in action_amounts
    }

    return habitat_sim.Configuration(sim_cfg, [agent_cfg])

In [None]:
from models.perception.utils import (
    quaternion2radian, 
    depth_map_to_point_cloud, 
    rotate_point_cloud, 
    fall_on_map,
    ClassReducer,
    display_semantic_map
)

def look_around(relative_position: tuple, sim: habitat_sim.Simulator, global_map: torch.Tensor, reducer: ClassReducer, tmp:torch.Tensor=None):
    """Turn the agent through a full circle and project what it sees onto the map.

    The agent takes ``int(360 / settings['rotation_step'])`` ``turn_right``
    steps. After each step the semantic and depth observations and the
    agent's heading are recorded, and the stacked result is handed to
    ``fall_on_map`` together with ``global_map``.

    Args:
        relative_position: Position forwarded to ``fall_on_map``.
        sim: Simulator whose agent 0 is rotated.
        global_map: Map tensor forwarded to ``fall_on_map``.
        reducer: Converts instance ids to categories and reduces them.
        tmp: Optional scratch tensor forwarded to ``fall_on_map``.
    """
    view_count = int(360 / settings['rotation_step'])

    instance_frames, depth_frames, headings = [], [], []
    for _ in range(view_count):
        observation = sim.step('turn_right')
        # "* 1.0" promotes the raw observations to floating point.
        instance_frames.append(torch.tensor(observation['semantic_sensor'] * 1.0).to(device))
        depth_frames.append(torch.tensor(observation['depth_sensor'] * 1.0).to(device))
        headings.append(quaternion2radian(sim.get_agent(0).get_state().rotation))

    depth_stack = torch.stack(depth_frames)
    category_stack = reducer.instance_to_category(torch.stack(instance_frames))
    # Sanity check: reducing and recovering must give back the same categories.
    assert (category_stack != reducer.recover(reducer.reduce(category_stack))).sum() == 0
    reduced_stack = reducer.reduce(category_stack)
    heading_stack = torch.stack(headings).to(device)

    # acos(-1) / 2 == pi / 2; presumably the camera field of view in radians
    # expected by depth_map_to_point_cloud -- TODO confirm against the helper.
    half_pi = torch.tensor(-1.0).acos() / 2
    point_cloud = depth_map_to_point_cloud(depth_stack, half_pi)
    rotated_cloud = rotate_point_cloud(point_cloud, heading_stack)

    fall_on_map(
        rotated_cloud,
        reduced_stack,
        global_map,
        relative_position,
        settings['scale'],
        tmp=tmp
    )



In [None]:
from torch.nn.functional import max_pool2d, avg_pool2d

def explore(sim, reducer: ClassReducer, global_semantic_map:torch.Tensor=None, tmp:torch.Tensor=None):
    """Scan a scene on a coarse-to-fine grid and accumulate a top-down semantic map.

    For each grid spacing, every coarse cell that contains navigable pixels and
    whose averaged coverage in ``global_semantic_map`` is not above
    ``settings['threshold']`` gets one scan: the agent is placed on a navigable
    point near the cell's navigable centroid and ``look_around`` is run there.
    The spacing is halved when no such cell is left (or the pass cap is hit),
    and the loop ends once the spacing in pixels is <= 1 or
    <= ``settings['ignore'] / settings['scale']``.

    Args:
        sim: Simulator; its pathfinder supplies bounds, top-down views and
            navigable points, and its agent 0 is moved around.
        reducer: Supplies the channel count via ``get_reduced_class_number()``
            and is forwarded to ``look_around``.
        global_semantic_map: Optional map to keep filling. When None, a zero
            tensor of shape ``[classes, *nav_mesh.shape]`` is allocated.
        tmp: Optional scratch tensor forwarded to ``look_around``. When None,
            a zero tensor with the shape of ``global_semantic_map`` is allocated.

    Returns:
        Tuple ``(global_semantic_map, nav_mesh)``.
    """
    with torch.no_grad():
        # Element [0] of get_bounds() is presumably the lower corner of the
        # navmesh bounding box, so `height` is its y value -- TODO confirm.
        height = sim.pathfinder.get_bounds()[0][1]
        bounds = sim.pathfinder.get_bounds()[0]
        # Keep components 2 and 0 (in that order) so the offset lines up with
        # the (row, column) layout used for `coords` below.
        bounds = torch.tensor([bounds[2],bounds[0]]).to(device).flatten()

        # Union of the top-down navigability views taken at `height` and at
        # 0.1 steps up to 0.9 above and 0.9 below it.
        nav_mesh = torch.tensor(sim.pathfinder.get_topdown_view(settings['scale'], height)).to(device)
        for t in range(10):
            nav_mesh = nav_mesh | torch.tensor(sim.pathfinder.get_topdown_view(settings['scale'], height + t * 0.1)).to(device)
        for t in range(10):
            nav_mesh = nav_mesh | torch.tensor(sim.pathfinder.get_topdown_view(settings['scale'], height - t * 0.1)).to(device)

        if global_semantic_map is None:
            global_semantic_map = torch.zeros([reducer.get_reduced_class_number()] + list(nav_mesh.shape)).to(device)
        if tmp is None:
            tmp = torch.zeros(global_semantic_map.shape).to(device)

        grid = settings['initial_grid']
        # NOTE(review): `count` is never reset when `grid` is halved, so the cap
        # of 10 scanning passes is shared by all grid sizes; once it is reached,
        # every finer grid is skipped immediately. Confirm this is intended.
        count = 0
        while True:
            # Grid spacing expressed in map pixels.
            grid_px = int(grid / settings['scale'])
            if grid_px <= 1 or grid_px <= settings['ignore'] / settings['scale']:
                break

            ys, xs = torch.arange(0, nav_mesh.shape[0]).to(device), torch.arange(0, nav_mesh.shape[1]).to(device)
            ys, xs = ys.unsqueeze(dim=1).expand(-1, nav_mesh.shape[1]), xs.unsqueeze(dim=0).expand(nav_mesh.shape[0], -1)
            coords, ys, xs = torch.stack([ys, xs], dim=0).type(torch.float) + 0.5, None, None
            # coords currently holds pixel coordinates (pixel centers, hence the +0.5)
            coords = coords * nav_mesh.unsqueeze(dim=0)
            # A coarse cell is navigable when at least one of its pixels is.
            navigatable = (max_pool2d(nav_mesh.unsqueeze(dim=0).type(torch.float), kernel_size=grid_px) > 0).reshape(-1)
            # Mean of masked coordinates divided by the mean of the mask gives
            # the average coordinate over the navigable pixels of each cell.
            coords = avg_pool2d(coords, kernel_size=grid_px).reshape(2, -1)[:, navigatable] / avg_pool2d(nav_mesh.unsqueeze(dim=0).type(torch.float), kernel_size=grid_px).reshape(-1)[navigatable]
            # coords now holds the navigable centroid of every navigable coarse cell

            # Per-pixel maximum over the class channels, capped at 1 and averaged
            # per coarse cell; cells above the threshold count as covered.
            coarse_semantic_map = (avg_pool2d(global_semantic_map.max(dim=0)[0].unsqueeze(dim=0).clamp(max=1), kernel_size=grid_px) > settings['threshold']).squeeze(dim=0)
            coords = coords[:, ~coarse_semantic_map.reshape(-1)[navigatable]]
            # coords now holds the navigable centroid of every navigable coarse
            # cell that does not yet meet the coverage requirement


            # Pixels -> meters, one (row, column) pair per row, then shifted by
            # the bounds offset.
            coords = coords * settings['scale']
            coords = coords.transpose(0, 1)
            abs_coords = coords + bounds

            # Nothing left to scan at this spacing, or the pass cap was reached:
            # move on to a grid half as wide.
            if len(coords) == 0 or count >= 10:
                print(f"grid={grid}m done")
                grid /= 2
                continue
            count += 1

            for i in tqdm(range(len(coords)), f'grid={grid:.3f}m'):
                # abs_coords rows are ordered like `bounds` (components 2, 0),
                # so they are swapped back around `height` for the query point.
                pos = sim.pathfinder.get_random_navigable_point_near(
                    [abs_coords[i][1].item(), height, abs_coords[i][0].item()], 
                    radius=settings['scale'] * grid_px
                )
                # Skip cells for which no navigable point was found.
                if not sim.pathfinder.is_navigable(pos):
                    continue

                # Move the agent while keeping its current rotation.
                state = habitat_sim.AgentState(
                    position=pos,
                    rotation=sim.get_agent(0).get_state().rotation
                )
                sim.get_agent(0).set_state(state=state, reset_sensors=False)
                sim.step('turn_left')

                # Re-read the position from the agent and express it relative
                # to the bounds offset, in the same component order.
                pos = sim.get_agent(0).get_state().position
                pos = torch.tensor([pos[2], pos[0]]).to(device)
                relative_pos = pos - bounds
                look_around(
                    relative_pos,
                    sim,
                    global_semantic_map,
                    reducer,
                    tmp=tmp
                )
        return global_semantic_map, nav_mesh


In [None]:
def save_scene(scene_name: str, global_semantic_map: torch.Tensor, reducer: ClassReducer):
    """Persist the semantic map and its class reducer for one scene.

    Writes ``<groundtruth_path>/<scene_name>.gsm`` with ``torch.save`` and
    ``<groundtruth_path>/<scene_name>.reducer`` with ``reducer.save``.

    Args:
        scene_name: File stem used for both output files.
        global_semantic_map: Tensor to store.
        reducer: Class reducer saved next to the map.
    """
    path = os.path.join(settings['groundtruth_path'], scene_name)
    # Create the output directory up front so a long exploration run is not
    # lost to a missing folder when the first file is written.
    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    torch.save(global_semantic_map, path + '.gsm')
    reducer.save(path + '.reducer')

def explore_scene(scene_dir:str, scene_name:str=None):
    """Explore one scene, display the resulting maps and save them.

    Args:
        scene_dir: Scene directory inside the dataset.
        scene_name: Scene file stem. When None it is taken from ``scene_dir``
            as everything after the first ``'-'`` (the whole name when there
            is no ``'-'``).
    """
    if scene_name is None:
        scene_name = scene_dir[scene_dir.find('-') + 1:]
    cfg = make_cfg(scene_dir, scene_name)
    sim = habitat_sim.Simulator(cfg)

    # Always release the simulator, even when exploration, display or saving
    # raises; otherwise it stays open for the rest of the kernel session.
    try:
        reducer = ClassReducer.from_sim(sim)
        global_semantic_map, nav_mesh = explore(sim, reducer)

        display_semantic_map(global_semantic_map)
        display_map(nav_mesh.cpu().numpy())
        save_scene(scene_name, global_semantic_map, reducer)
    finally:
        sim.close()

In [None]:
import tarfile
import utils
def make_samples():
    """Explore every dataset scene that ships semantic annotations.

    A sub-directory of ``settings['dataset']`` qualifies when at least one of
    its entries has ``'semantic'`` in its name. Each qualifying scene is
    explored inside ``utils.fulfill(...)``.
    """
    dataset_root = settings['dataset']

    def _has_semantic(entry):
        # Only directories containing a "semantic" entry are kept.
        entry_path = os.path.join(dataset_root, entry)
        if not os.path.isdir(entry_path):
            return False
        return any('semantic' in item for item in os.listdir(entry_path))

    # Whether or not main.tar has been extracted, semantic.tar yields the
    # directories that carry semantic annotations.
    scene_dirs = [entry for entry in os.listdir(dataset_root) if _has_semantic(entry)]

    for scene_dir in tqdm(scene_dirs):
        print(scene_dir)
        with utils.fulfill(dataset_root, scene_dir):
            explore_scene(scene_dir)
In [None]:
import os
# Quiet Magnum and habitat-sim logging, then process every scene.
# NOTE(review): habitat_sim is already imported in an earlier cell; confirm
# that these variables are still honoured when set this late.
os.environ.update({"MAGNUM_LOG": "quiet", "HABITAT_SIM_LOG": "quiet"})
make_samples()