# Shortest path follower with ego_map visulization

In [1]:
from vlnce_baselines.common.env_utils import construct_envs
from habitat import Config
from vlnce_baselines.config.default import get_config
from habitat_baselines.common.environments import get_env_class



#Import config and perform some manipulation
exp_config ='vlnce_baselines/config/paper_configs/seq2seq.yaml'
config = get_config(exp_config, None)
split = config.TASK_CONFIG.DATASET.SPLIT
config.defrost()
config.TASK_CONFIG.TASK.NDTW.SPLIT = split
config.TASK_CONFIG.TASK.SDTW.SPLIT = split

# if doing teacher forcing, don't switch the scene until it is complete
if config.DAGGER.P == 1.0:
    config.TASK_CONFIG.ENVIRONMENT.ITERATOR_OPTIONS.MAX_SCENE_REPEAT_STEPS = (
        -1
    )
config.freeze()
envs = construct_envs(config, get_env_class(config.ENV_NAME))
# obs=envs.reset()
# print(obs[0])

2020-05-27 17:16:22,759 Initializing dataset VLN-CE-v1
2020-05-27 17:16:23,626 [construct_envs] Using GPU ID 0


In [3]:
def images_to_grid(imgs, grid_size):
    h, w = grid_size
    img_h, img_w, img_c = imgs[0].shape

    m_x = 0
    m_y = 0

    imgmatrix = np.zeros((img_h * h + m_y * (h - 1),
                          img_w * w + m_x * (w - 1),
                          img_c),
                         np.uint8)

    imgmatrix.fill(255)    

    positions = itertools.product(range(w), range(h))
    for (x_i, y_i), img in zip(positions, imgs):
        x = x_i * (img_w + m_x)
        y = y_i * (img_h + m_y)
        if img.shape[-1] > img_c:
            img = img[..., :img_c]
        imgmatrix[y:y+img_h, x:x+img_w, :] = img

    return imgmatrix


def next_perfect_square(N): 
    sqrt = math.sqrt(N)
    next_ = math.floor(sqrt)
    if next_ < sqrt:
        next_ += 1
    return next_


def get_grid_size(N):
    grid = (2, 3)
    if N < 4:
        grid = (1, 3)
    elif N > 6:
        square = next_perfect_square(N)
        grid = (square, square)
    
    return grid


def concatentate_and_write_video(observations,
                                 directory_to_save_video='./',
                                 video_filename='vid.mp4'):
    output_shape = observations[0][0].shape[:2]
    num_observations = len(observations[0])
    if num_observations == 1:
        output_all = [cv2.resize(obs[0], output_shape) for obs in observations]
    else:
        grid_size = get_grid_size(num_observations)
        observations = [[cv2.resize(o, output_shape) for o in obs] for obs in observations]
        output_all = [images_to_grid(obs, grid_size)
                        for obs in observations]

    images_to_video(output_all,
                    directory_to_save_video,
                    video_filename)

def save_map(observations, info, images):
    im = observations["rgb"]
    top_down_map = draw_top_down_map(
        info, observations["heading"], im.shape[0]
    )
    output_im = np.concatenate((im, top_down_map), axis=1)
    output_im = append_text_to_image(
        output_im, observations["instruction"]["text"]
    )
    images.append(output_im)
    
def draw_top_down_map(info, heading, output_size):
    top_down_map = maps.colorize_topdown_map(
        info["top_down_map"]["map"], info["top_down_map"]["fog_of_war_mask"]
    )
    original_map_size = top_down_map.shape[:2]
    map_scale = np.array(
        (1, original_map_size[1] * 1.0 / original_map_size[0])
    )
    new_map_size = np.round(output_size * map_scale).astype(np.int32)
    # OpenCV expects w, h but map size is in h, w
    top_down_map = cv2.resize(top_down_map, (new_map_size[1], new_map_size[0]))

    map_agent_pos = info["top_down_map"]["agent_map_coord"]
    map_agent_pos = np.round(
        map_agent_pos * new_map_size / original_map_size
    ).astype(np.int32)
    top_down_map = maps.draw_agent(
        top_down_map,
        map_agent_pos,
        heading - np.pi / 2,
        agent_radius_px=top_down_map.shape[0] / 40,
    )
    return top_down_map

In [5]:
import os
from habitat.tasks.nav.shortest_path_follower import ShortestPathFollower
IMAGE_DIR = os.path.join("examples", "images")
follower = ShortestPathFollower(
    env.habitat_env.sim, goal_radius=0.5, return_one_hot=False
)
follower.mode = mode
print("Environment creation successful")

for episode in range(1):
    obs=envs.reset()
    print(obs)
    print("\n")
    episode = current_episodes()
    episode_id = episode[0].episode_id
    print(
        f"Agent stepping around inside environment. Episode id: {episode_id}"
    )

    dirname = os.path.join(
        IMAGE_DIR, "vln_reference_path_example", mode, "%02d" % episode
    )
    if os.path.exists(dirname):
        shutil.rmtree(dirname)
    os.makedirs(dirname)

    images = []
    steps = 0
    reference_path = episode[0].reference_path + [
        episode[0].goals[0].position
    ]
    for point in reference_path:
        done = False
        while not done:
            best_action = follower.get_next_action(point)
            if best_action == None or best_action ==0:
                break
            observations, reward, done, info = envs.step(best_action)
            
            semantic_obs = observation[0]['semantic'] #128*128 - will return instance segmentation
            ego_map= observation[0]['ego_sem_map'] # 40 * 40 ego centric version. 
            
            object2idx = envs.call_at(0,'get_object2idx')
            semantic_obs = object2idx[semantic_obs] 
            ego_map = object2idx[ego_map.astype(np.int)]
            
            ego_img= semantic_to_image(ego_map)
            semantic_img = semantic_to_image(semantic_obs)
            
            save_map(observations[0], info[0], images)
            images.append([ semantic_img,
                                 ego_img])
            steps += 1
    envs.close()
    
    print(f"Navigated to goal in {steps} steps.")
    concatentate_and_write_video(images, directory_to_save_video=os.getcwd(), video_filename="ego_semantic_map")
    images_to_video(images, dirname, str(episode_id))
    images = []

NameError: name 'env' is not defined