# Advanced Topics in Embodied Learning and Vision: Habitat Navigation Demo
##### 2025-01-30, Chris Hoang

Tutorial materials derived from https://aihabitat.org/tutorial/2020 and https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

# Setup

1. Follow https://sites.google.com/nyu.edu/nyu-hpc/hpc-systems/greene/software/singularity-with-miniconda to create a Singularity container and conda environment

2. Create Habitat conda environment
```
conda create -n habitat python=3.9 cmake=3.14.0
conda activate habitat
```

3. Install pytorch
```
pip install torch==2.0.0 torchvision==0.15.1 --index-url https://download.pytorch.org/whl/cu118
```

4. Install habitat-sim
```
conda install habitat-sim withbullet -c conda-forge -c aihabitat
```

5. Install habitat-lab and habitat-baselines
```
git clone --branch stable https://github.com/facebookresearch/habitat-lab.git
cd habitat-lab
pip install -e habitat-lab  # install habitat_lab
pip install -e habitat-baselines  # install habitat_baselines
```

6. Download Habitat test scenes
```
wget -q https://dl.fbaipublicfiles.com/habitat/habitat-test-scenes.zip
unzip -q habitat-test-scenes.zip
```

7. Copy the Habitat demo files to your scratch directory
```
export BASE_DIR=<dir containing habitat-lab>
cp /scratch/ch3451/evl/habitat-demo/habitat-demo.ipynb $BASE_DIR
cp /scratch/ch3451/evl/habitat-demo/habitat-demo.yaml $BASE_DIR
cp /scratch/ch3451/evl/habitat-demo/habitat-demo.json.gz $BASE_DIR/data/datasets/pointnav/habitat-test-scenes/v1/train
```

## Imports and helper functions

In [None]:
# Root working directory for the demo. The %cd magics in later cells change
# into this directory and into its habitat-lab/ subdirectory.
BASE_DIR='/scratch/ch3451/evl'

In [None]:
import habitat

In [None]:
from collections import deque
from dataclasses import dataclass

import os
import random
import sys

import git
from gym import spaces
import magnum as mn
%matplotlib inline
from matplotlib import pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torchvision.transforms.functional as tvF
import numpy as np
from PIL import Image

%cd $BASE_DIR/"habitat-lab"
# repo = git.Repo(".", search_parent_directories=True)
# dir_path = repo.working_tree_dir
# %cd $dir_path

import habitat
from habitat import Env, get_config
from habitat.config.default_structured_configs import ActionConfig
from habitat.core.logging import logger
from habitat.core.registry import registry
from habitat.sims.habitat_simulator.actions import HabitatSimActions
from habitat.tasks.nav.nav import NavigationTask, SimulatorTaskAction
from habitat_baselines.common.baseline_registry import baseline_registry
from habitat_baselines.config.default import get_config as get_baselines_config
from habitat.config.default_structured_configs import ActionConfig

def display_sample(
    rgb_obs, semantic_obs=np.array([]), depth_obs=np.array([]), rgb_name='rgb'
):  # noqa B006
    """Plot an RGB frame with optional semantic and depth panels in one row.

    Args:
        rgb_obs: Image array handed to ``Image.fromarray(..., mode="RGB")``.
        semantic_obs: Optional array of semantic ids, indexed as
            (height, width). Skipped when empty (the default).
        depth_obs: Optional depth array. Skipped when empty (the default).
        rgb_name: Title shown above the RGB panel.

    Returns:
        None. The figure is shown with ``plt.show(block=False)``.
    """
    # The empty-array defaults are only read (``.size``), never mutated, so
    # sharing them across calls is safe.
    panels = [(rgb_name, Image.fromarray(rgb_obs, mode="RGB"))]

    if semantic_obs.size != 0:
        # Imported lazily: the palette is needed only for the semantic panel,
        # so RGB-only calls do not touch this habitat_sim module.
        from habitat_sim.utils.common import d3_40_colors_rgb

        semantic_img = Image.new(
            "P", (semantic_obs.shape[1], semantic_obs.shape[0])
        )
        semantic_img.putpalette(d3_40_colors_rgb.flatten())
        # Ids are wrapped modulo 40 before being used as palette indices.
        semantic_img.putdata((semantic_obs.flatten() % 40).astype(np.uint8))
        panels.append(("semantic", semantic_img.convert("RGBA")))

    if depth_obs.size != 0:
        # Depth is rescaled by 255/10 into 8-bit grayscale.
        # NOTE(review): presumably depth is in [0, 10] - confirm with the sensor config.
        depth_img = Image.fromarray(
            (depth_obs / 10 * 255).astype(np.uint8), mode="L"
        )
        panels.append(("depth", depth_img))

    plt.figure(figsize=(12, 8))
    # The grid always has three columns, so panels keep the same size no
    # matter how many optional observations were supplied.
    for slot, (title, image) in enumerate(panels, start=1):
        ax = plt.subplot(1, 3, slot)
        ax.axis("off")
        ax.set_title(title)
        plt.imshow(image)
    plt.show(block=False)

### Walk through the environment using the simulator

In [None]:
%cd $BASE_DIR
!cat './habitat-demo.yaml'

In [None]:
# Load the demo task configuration from the current working directory.
config = habitat.get_config(
    config_path="./habitat-demo.yaml"
)

valid_actions = ["turn_left", "turn_right", "move_forward"]
num_steps = 15

# Use the environment as a context manager so the simulator is closed even
# if a step raises part-way through the walkthrough.
with habitat.Env(config=config) as env:
    obs = env.reset()
    for _ in range(num_steps):
        # Show the current view and the goal image before acting.
        display_sample(obs["rgb"])
        display_sample(obs["imagegoal"], rgb_name='imagegoal')
        metrics = env.get_metrics()
        print(
            "distance to goal: {:.2f}".format(
                metrics["distance_to_goal"]
            )
        )
        # Take a uniformly random action from the allowed set.
        action = random.choice(valid_actions)
        print(action)
        obs = env.step(
            {
                "action": action,
            }
        )

## DQN

#### Helper classes and functions

In [None]:
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with ``maxlen`` discards the oldest entry once it is full.
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them by field.

        Returns:
            Five tuples ``(states, actions, rewards, next_states, dones)``,
            each of length ``batch_size`` and aligned by position.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of 5-tuples into one tuple per field.
        states, actions, rewards, next_states, dones = zip(*picked)
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

def extract_obs(x, device=torch.device('cpu')):
    """Turn one channel-last image into a normalized, batched tensor.

    Args:
        x: Image data convertible by ``torch.Tensor``; after a batch axis is
            added its axes are permuted as (0, 3, 1, 2), i.e. channels move
            from last to second position.
        device: Device the result is moved to (CPU by default).

    Returns:
        A float tensor with a leading batch axis of size 1, divided by 255
        and normalized per channel with the fixed mean/std below.
    """
    frame = torch.Tensor(x).float()
    # Prepend the batch axis, then go channel-last -> channel-first.
    batched = frame.unsqueeze(0).permute(0, 3, 1, 2)
    # NOTE(review): assumes pixel values are in [0, 255] - confirm with the sensor config.
    scaled = batched / 255.
    normalized = tvF.normalize(
        scaled, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )
    return normalized.to(device)

#### Model architecture

In [None]:
class DQN(nn.Module):
    """Convolutional Q-network mapping an image batch to per-action Q-values.

    Four Conv2d+GELU stages produce a 128-channel feature map, which is
    averaged over its two spatial axes and passed through a two-layer MLP.
    """

    def __init__(self, num_actions):
        super().__init__()

        # (in_channels, out_channels, kernel_size, stride) for each conv stage.
        conv_specs = (
            (3, 32, 8, 4),
            (32, 64, 4, 2),
            (64, 128, 3, 1),
            (128, 128, 3, 1),
        )
        stages = []
        for in_ch, out_ch, kernel, stride in conv_specs:
            stages.append(
                nn.Conv2d(in_ch, out_ch, kernel_size=kernel, stride=stride)
            )
            stages.append(nn.GELU())
        self.encoder = nn.Sequential(*stages)

        hidden = nn.Linear(128, 128)
        head = nn.Linear(128, num_actions)
        self.mlp = nn.Sequential(hidden, nn.GELU(), head)

    def forward(self, img):
        """Return Q-values of shape (batch, num_actions) for ``img``."""
        features = self.encoder(img)
        # Average over the spatial axes so the MLP input is always 128-wide.
        pooled = features.mean(dim=(2, 3))
        return self.mlp(pooled)


#### Custom actions

In [None]:
@dataclass
class StrafeActionConfig(ActionConfig):
    """Configuration shared by the custom MoveForward/MoveLeft/MoveRight actions."""

    # Nominal distance moved per action; _strafe_body multiplies it into the
    # forward direction vector. NOTE(review): presumably meters - confirm.
    move_amount: float = 0.0  # We will change this in the configuration
    # Relative noise: the angle and the distance are each sampled uniformly
    # between (1 - noise_amount) and (1 + noise_amount) times their nominal value.
    noise_amount: float = 0.0
    # Strafe angle in degrees (converted with mn.Deg in _strafe_body).
    angle: float = 0.0


# This is a helper that implements strafing that we will use in our actions
def _strafe_body(
    sim,
    move_amount: float,
    strafe_angle_deg: float,
    noise_amount: float,
):
    """Rotate the agent by a (noisy) angle about the y axis and step it forward.

    Args:
        sim: Simulator exposing ``get_agent_state``, ``set_agent_state`` and
            ``pathfinder.try_step``.
        move_amount: Nominal step length.
        strafe_angle_deg: Nominal rotation about the y axis, in degrees,
            applied to the agent's current rotation.
        noise_amount: Relative noise; angle and step length are each drawn
            uniformly between ``(1 - noise_amount)`` and ``(1 + noise_amount)``
            times their nominal value.

    The agent ends at the position returned by ``pathfinder.try_step`` and
    with the rotated orientation (not its original one).
    """
    state = sim.get_agent_state()

    # Rebuild the agent rotation as a magnum quaternion (vector part, scalar part).
    np_quat = state.rotation
    heading = mn.Quaternion(np_quat.imag, np_quat.real)

    # Draw the noisy strafe angle first, then the noisy step length, so the
    # random draws happen in a fixed order.
    angle_low = (1 - noise_amount) * strafe_angle_deg
    angle_high = (1 + noise_amount) * strafe_angle_deg
    noisy_angle = mn.Deg(np.random.uniform(angle_low, angle_high))

    # Compose the current heading with a rotation about the y axis.
    yaw = mn.Quaternion.rotation(noisy_angle, mn.Vector3.y_axis())
    rotation = heading * yaw

    step_low = (1 - noise_amount) * move_amount
    step_high = (1 + noise_amount) * move_amount
    noisy_step = np.random.uniform(step_low, step_high)

    # The step direction is the rotated -z axis.
    direction = rotation.transform_vector(-mn.Vector3.z_axis())
    start = state.position
    # The pathfinder decides where the agent actually ends up.
    final_position = sim.pathfinder.try_step(  # type: ignore
        start, start + direction * noisy_step
    )

    new_rotation = [*rotation.vector, rotation.scalar]
    sim.set_agent_state(final_position, new_rotation, reset_sensors=False)

# We define and register our actions as follows.
# the __init__ method receives a sim and config argument.
@habitat.registry.register_task_action
class MoveForward(SimulatorTaskAction):
    """Registered task action that steps the agent ahead with a zero strafe angle."""

    def __init__(self, *args, config, sim, **kwargs):
        super().__init__(*args, config=config, sim=sim, **kwargs)
        self._sim = sim
        # Cache the tunables from this action's config entry.
        self._noise_amount = config.noise_amount
        self._move_amount = config.move_amount

    def _get_uuid(self, *args, **kwargs) -> str:
        return "move_forward"

    def step(self, *args, **kwargs):
        """Move the agent via ``_strafe_body``; extra arguments are ignored."""
        # The shared helper does the work; the simulation could also be
        # modified directly here.
        _strafe_body(
            self._sim,
            move_amount=self._move_amount,
            strafe_angle_deg=0,
            noise_amount=self._noise_amount,
        )

@habitat.registry.register_task_action
class MoveLeft(SimulatorTaskAction):
    """Registered task action that strafes by ``+config.angle`` degrees."""

    def __init__(self, *args, config, sim, **kwargs):
        super().__init__(*args, config=config, sim=sim, **kwargs)
        self._sim = sim
        # Cache the tunables from this action's config entry.
        self._angle = config.angle
        self._noise_amount = config.noise_amount
        self._move_amount = config.move_amount

    def _get_uuid(self, *args, **kwargs) -> str:
        return "move_left"

    def step(self, *args, **kwargs):
        """Move the agent via ``_strafe_body``; extra arguments are ignored."""
        # The shared helper does the work; the simulation could also be
        # modified directly here.
        _strafe_body(
            self._sim,
            move_amount=self._move_amount,
            strafe_angle_deg=self._angle,
            noise_amount=self._noise_amount,
        )


@habitat.registry.register_task_action
class MoveRight(SimulatorTaskAction):
    """Registered task action that strafes by ``-config.angle`` degrees."""

    def __init__(self, *args, config, sim, **kwargs):
        super().__init__(*args, config=config, sim=sim, **kwargs)
        self._sim = sim
        # Cache the tunables from this action's config entry.
        self._angle = config.angle
        self._noise_amount = config.noise_amount
        self._move_amount = config.move_amount

    def _get_uuid(self, *args, **kwargs) -> str:
        return "move_right"

    def step(self, *args, **kwargs):
        """Move the agent via ``_strafe_body``; extra arguments are ignored."""
        # Same helper as MoveLeft, with the configured angle negated.
        _strafe_body(
            self._sim,
            move_amount=self._move_amount,
            strafe_angle_deg=-self._angle,
            noise_amount=self._noise_amount,
        )

#### Customize environment actions

In [None]:
%cd $BASE_DIR
config = habitat.get_config(config_path="./habitat-demo.yaml")

# Keyword arguments for each custom action, keyed by its name in the task
# config. MOVE_FORWARD omits ``angle`` and so keeps the dataclass default.
strafe_action_specs = {
    "MOVE_FORWARD": dict(
        type="MoveForward",
        move_amount=0.25,
        noise_amount=0.0,
    ),
    "MOVE_LEFT": dict(
        type="MoveLeft",
        move_amount=0.25,
        noise_amount=0.0,
        angle=90,
    ),
    "MOVE_RIGHT": dict(
        type="MoveRight",
        move_amount=0.25,
        noise_amount=0.0,
        angle=90,
    ),
}

# The config is edited inside read_write(): the action set from the YAML is
# cleared and replaced by the three strafe actions, in the order above.
with habitat.read_write(config):
    config.habitat.task.actions = {}
    for action_name, action_kwargs in strafe_action_specs.items():
        config.habitat.task.actions[action_name] = StrafeActionConfig(
            **action_kwargs
        )

### Training

#### Setup environment and models

In [None]:
# Hyperparameters
lr = 1e-4          # Adam learning rate
batch_size = 16    # transitions sampled per optimizer update
gamma = 0.99       # discount applied to the bootstrapped next-state value
eps_start = 0.50   # initial exploration rate of the epsilon-greedy policy
eps_end = 0.01     # exploration rate approached as steps accumulate
eps_decay = 100    # scale (in steps) of the exponential epsilon decay
tau = 0.005        # interpolation weight of the soft target-network update
num_episodes = 30
max_steps = 50     # per-episode step cap

# Gym-style wrapper around the Habitat task configured above.
env = habitat.gym.make_gym_from_config(config=config)
env.seed(42)

num_actions = env.action_space.n
# Prefer the GPU but fall back to the CPU so this cell also runs on machines
# without CUDA instead of failing when the models are moved to the device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DQN(num_actions).to(device)
# The target network starts as an exact copy of the online network.
target_model = DQN(num_actions).to(device)
target_model.load_state_dict(model.state_dict())
optimizer = optim.Adam(model.parameters(), lr=lr)
replay_buffer = ReplayBuffer(capacity=10000)

#### Main training loop

In [None]:
from IPython.display import clear_output

# Get the goal image
# NOTE(review): goal_image is not read anywhere in this loop (the DQN only
# sees the RGB frame). It is kept, together with this extra reset, so the
# sequence of episodes served by the environment is unchanged.
obs = env.reset()
goal_image = extract_obs(obs['imagegoal'], device)

# Training loop
steps_done = 0
update_interval = 1
# Build the loss module once instead of on every update step.
criterion = nn.SmoothL1Loss()

for episode in range(num_episodes):
    state = env.reset()
    img = extract_obs(state['rgb'])

    episode_reward = 0
    episode_loss = 0
    num_updates = 0  # optimizer updates actually performed this episode
    min_dist_to_goal = float('inf')

    for t in range(max_steps):
        # Select action: epsilon-greedy, with epsilon decaying exponentially
        # in the total number of steps taken across all episodes.
        eps_threshold = eps_end + (eps_start - eps_end) * np.exp(-1. * steps_done / eps_decay)
        steps_done += 1
        if np.random.random() > eps_threshold:
            with torch.no_grad():
                q_values = model(img.to(device))
                if t == 0:
                    print(f'MoveForward q_value: {q_values[0][0]:.4f}')
                    print(f'MoveLeft q_value: {q_values[0][1]:.4f}')
                    print(f'MoveRight q_value: {q_values[0][2]:.3f}')
                action = torch.argmax(q_values, dim=1).to('cpu')
        else:
            # Wrap the random index in a tensor so both branches store the
            # same action type in the replay buffer.
            action = torch.tensor(np.random.choice(num_actions, size=1))

        # Execute action
        next_state, reward, done, info = env.step(action.item())
        next_img = extract_obs(next_state['rgb'])

        # Within 0.3 of the goal counts as success: override the reward and
        # end the episode.
        if info['distance_to_goal'] < 0.3:
            reward = 100
            print('success!')
            done = True
        min_dist_to_goal = min(min_dist_to_goal, info['distance_to_goal'])

        # Store transition in replay buffer
        replay_buffer.add(img, action, reward, next_img, done)

        # Sample a batch from the replay buffer
        if len(replay_buffer) > batch_size and (t % update_interval) == 0:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

            # Convert the batch to tensors
            states = torch.cat(states).to(device)
            actions = torch.cat(actions).to(device)
            # Force float32: the sampled rewards mix environment rewards with
            # the integer success bonus, and an inferred dtype could end up
            # different from the network's float32 Q-values.
            rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
            next_states = torch.cat(next_states).to(device)
            dones = torch.tensor(dones, dtype=torch.bool, device=device)

            # Compute the target Q-values; finished transitions do not bootstrap.
            with torch.no_grad():
                next_state_values = target_model(next_states).max(1)[0]
                target_q_values = rewards + (gamma * next_state_values * ~dones)

            # Compute the expected Q-values of the actions that were taken
            q_values = model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

            # Compute the loss
            loss = criterion(q_values, target_q_values)

            # Optimize the model
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_value_(model.parameters(), 100)
            optimizer.step()

            # Log loss for the current step
            episode_loss += loss.item()
            num_updates += 1

            # Soft update: target <- tau * online + (1 - tau) * target
            target_model_state_dict = target_model.state_dict()
            model_state_dict = model.state_dict()
            for key in model_state_dict:
                target_model_state_dict[key] = model_state_dict[key]*tau + target_model_state_dict[key] * (1-tau)
            target_model.load_state_dict(target_model_state_dict)

        # Update episode reward
        episode_reward += reward

        img = next_img
        if done:
            print('done!')
            break

    # Calculate and print average reward and loss for the episode.
    # The loss is averaged over the updates that actually ran; steps taken
    # before the buffer held enough transitions do not dilute it.
    average_reward = episode_reward / (t+1)
    average_loss = episode_loss / max(num_updates, 1)
    print(f"Episode: {episode}, Length: {t+1}, Reward: {average_reward:.3f}, Loss: {average_loss}, Eps: {eps_threshold:.3f}, Min dist to goal: {min_dist_to_goal:.3f}")

env.close()

### Visualization

From https://aihabitat.org/docs/habitat-lab/habitat-lab-tdmap-viz.html

In [None]:
from habitat.utils.visualizations.utils import (
    images_to_video,
    observations_to_image,
    overlay_frame,
)

In [None]:
from IPython.display import clear_output

%cd $BASE_DIR
config = habitat.get_config(
    config_path="./habitat-demo.yaml"
)
# Swap the task's action set for the three custom strafe actions.
with habitat.read_write(config):
    config.habitat.task.actions = {}
    config.habitat.task.actions["MOVE_FORWARD"] = StrafeActionConfig(
            type="MoveForward",
            move_amount=0.25,
            noise_amount=0.0,
        )
    config.habitat.task.actions["MOVE_LEFT"] = StrafeActionConfig(
            type="MoveLeft",
            move_amount=0.25,
            noise_amount=0.0,
            angle=90
        )
    config.habitat.task.actions["MOVE_RIGHT"] = StrafeActionConfig(
            type="MoveRight",
            move_amount=0.25,
            noise_amount=0.0,
            angle=90
        )

# mode = 'random'
mode = 'dqn'
# Fail early on a typo instead of hitting an undefined `action` in the loop.
if mode not in ('random', 'dqn'):
    raise ValueError(f"Unknown mode: {mode!r}")

with habitat.Env(config=config) as env:
    # Create video of agent navigating in the first episode.
    # A separate name keeps the training hyperparameter `num_episodes` intact
    # in case the training cells are re-run after this one.
    num_vis_episodes = 1
    for _ in range(num_vis_episodes):
        # Load the first episode
        observations = env.reset()

        # Get metrics
        info = env.get_metrics()
        # Concatenate RGB-D observation and top-down map into one image
        frame = observations_to_image(observations, info)

        # Remove top_down_map from metrics
        info.pop("top_down_map")
        # Overlay numeric metrics onto frame
        frame = overlay_frame(frame, info)
        # Add frame to vis_frames
        vis_frames = [frame]

        # Repeat the steps above until the agent reaches the goal, the
        # episode ends, or 50 steps have been taken.
        done = False
        i = 0
        while not done and not env.episode_over and i < 50:
            i += 1
            # Get the next best action
            if mode == 'random':
                action = random.randint(0, env.action_space.n - 1)
                print(action)
            else:
                # Act on the CURRENT observation; no gradients are needed
                # for greedy action selection.
                # NOTE(review): assumes action indices here match the gym
                # wrapper used during training - confirm.
                with torch.no_grad():
                    action = model(
                        extract_obs(observations['rgb'], device)
                    ).argmax(dim=1).item()

            # Step in the environment
            observations = env.step(action)
            info = env.get_metrics()
            # Same success threshold as in training.
            if info['distance_to_goal'] < 0.3:
                done = True
            frame = observations_to_image(observations, info)

            info.pop("top_down_map")
            frame = overlay_frame(frame, info)
            vis_frames.append(frame)

        print(f'Episode length {i}')
        output_path = os.getcwd()
        video_name = f'vis_{mode}'
        # Create video from images and save to disk
        images_to_video(
            vis_frames, output_path, video_name, fps=1, quality=9
        )
        vis_frames.clear()