# Roche Robotics project

## Initial setup

### Installation of ressources

In [None]:
%pip install seaborn

In [None]:
pip install stable-baselines3[extra]

In [None]:
pip install gymnasium

### Imports

In [None]:
import seaborn as sns
import matplotlib
matplotlib.rcParams["figure.dpi"] = 100
sns.set_style("whitegrid")

In [None]:
import random

from PIL import Image
%matplotlib inline
from matplotlib import pyplot as plt
import numpy as np

import habitat_sim

In [None]:
import gymnasium as gym
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.ppo.policies import MlpPolicy
from gymnasium import spaces
from stable_baselines3.common.vec_env import DummyVecEnv

In [None]:
from typing import List, Dict, Any, Tuple, Optional
import magnum as mn
from habitat_sim.physics import ManagedRigidObject

In [None]:
import time
from magnum import Vector3, Quaternion, Rad
import cv2

### Object container class

This module provides helper classes to load, scale, and place static objects (walls, obstacles, and a target) in a Habitat-Sim scene. It manages object templates efficiently and allows for dynamic scene modification, including navmesh recomputation.

In [None]:
class ObjectTemplateContainer:
    def __init__(self, obj_path: str, scale: List[float]) -> None:
        self.obj_path: str = obj_path
        self.scale: np.ndarray = np.array(scale, dtype=float)
        self.handle = self._propose_handle(
            obj_path=obj_path,
            scale=scale  # use list
        )

    def _propose_handle(self, obj_path: str, scale: List[float]) -> str:
        base, suffix = obj_path.split('.')
        for s in scale:
            base += '_' + self._smart_round(s)
        return base + '.' + suffix

    @staticmethod
    def _smart_round(val: float, n_places: int = 3) -> str:
        rounded = round(val, n_places)
        if rounded == int(val):  # if val is an int
            return str(int(val))  # return val as str
        else:
            return str(rounded).replace('.', '-')  # return rounded number as str

    def __eq__(self, other) -> bool:
        return other.obj_path == self.obj_path \
            and np.allclose(other.scale, self.scale)

    def eq(self, other_obj_path: str, other_scale: List[float]):
        return other_obj_path == self.obj_path \
            and np.allclose(other_scale, self.scale)

class RunTimeObjectManager:
    def __init__(self, sim: habitat_sim.Simulator) -> None:
        self._sim = sim
        self._obj_templates_mgr = self._sim.get_object_template_manager()
        self._rigid_obj_mgr = self._sim.get_rigid_object_manager()

        self._navmesh_settings = habitat_sim.NavMeshSettings()
        self._navmesh_settings.set_defaults()

        self._raw_tpl_ctrs: List[ObjectTemplateContainer] = []
        self._scaled_tpl_ctrs: List[ObjectTemplateContainer] = []

        self._added_target: Optional[ManagedRigidObject] = None
        self._added_walls: List[ManagedRigidObject] = []
        self._added_obstacles: List[ManagedRigidObject] = []

    def add_walls(self, walls_layout_cfg: List[Dict[str, Any]]) -> None:
        for wall_cfg in walls_layout_cfg:
            wall_obj = self._add_static_obj(
                obj_path=wall_cfg['asset'],
                position=wall_cfg['pos'],
                rotation=mn.Quaternion.rotation(
                    mn.Deg(wall_cfg['rot_amount']), wall_cfg['rot_vector']
                ),
                scale=wall_cfg['scale'],
            )

            self._added_walls.append(wall_obj)

    def add_obstacles(self, obstacles_layout_cfg: List[Dict[str, Any]]) -> None:
        for obstacle_cfg in obstacles_layout_cfg:
            obstacle_obj = self._add_static_obj(
                obj_path=obstacle_cfg['asset'],
                position=obstacle_cfg['pos'],
                rotation=mn.Quaternion.rotation(
                    mn.Deg(obstacle_cfg['rot_amount']),
                    obstacle_cfg['rot_vector']
                ),
                scale=obstacle_cfg['scale'],
            )

            self._added_obstacles.append(obstacle_obj)

    def add_target(self, target_path: str, position: List[float]) -> None:

        target_obj = self._add_static_obj(
            obj_path=target_path,
            position=position,
            rotation=mn.Quaternion.rotation(
                mn.Deg(0.0),
                [0.0, 1.0, 0.0]
            ),  # Goal in PointNav has no rotation
            scale=[1.0, 1.0, 1.0],  # Maybe various scales?
            # scale=[10.0, 10.0, 10.0],
        )

        self._added_target = target_obj

    def _add_static_obj(
        self, obj_path: str, scale: List[float],
        position: List[float], rotation: mn.Quaternion.rotation
    ) -> ManagedRigidObject:
        has_raw_tpl, raw_tpl_ctr = self._has_raw_template(
            obj_path=obj_path
        )
        if not has_raw_tpl:  # Load raw tpl
            raw_obj_tpl_id = self._obj_templates_mgr.load_configs(
                path=obj_path
            )[0]
            raw_obj_tpl = self._obj_templates_mgr.get_template_by_id(
                template_id=raw_obj_tpl_id
            )
            raw_scale = raw_obj_tpl.scale
            raw_tpl_ctr = ObjectTemplateContainer(
                obj_path=obj_path,
                scale=raw_scale
            )
            # Raw handle is from mgr
            raw_tpl_ctr.handle = self._obj_templates_mgr.get_template_handle_by_id(
                template_id=raw_obj_tpl_id
            )
            self._raw_tpl_ctrs.append(raw_tpl_ctr)
        else:
            raw_obj_tpl = self._obj_templates_mgr.get_template_by_handle(
                handle=raw_tpl_ctr.handle
            )

        has_scaled_tpl, scaled_tpl_ctr = self._has_scaled_template(
            obj_path=obj_path,
            scale=scale
        )
        if not has_scaled_tpl:
            scaled_tpl_ctr = ObjectTemplateContainer(
                obj_path=obj_path,
                scale=scale
            )

            raw_obj_tpl.scale = scale
            self._obj_templates_mgr.register_template(
                template=raw_obj_tpl,
                specified_handle=scaled_tpl_ctr.handle
            )
            self._scaled_tpl_ctrs.append(scaled_tpl_ctr)

        obj: ManagedRigidObject = \
            self._rigid_obj_mgr.add_object_by_template_handle(
                object_lib_handle=scaled_tpl_ctr.handle
            )

        obj.translation = mn.Vector3(position)
        obj.rotation = rotation
        obj.motion_type = habitat_sim.physics.MotionType.STATIC

        return obj

    def _has_raw_template(
        self, obj_path: str
    ) -> Tuple[bool, Optional[ObjectTemplateContainer]]:
        for tpl_ctr in self._raw_tpl_ctrs:
            if tpl_ctr.obj_path == obj_path:
                return True, tpl_ctr
        return False, None

    def _has_scaled_template(
        self, obj_path: str, scale: List[float]
    ):
        for tpl_ctr in self._scaled_tpl_ctrs:
            if tpl_ctr.eq(obj_path, scale):
                return True, tpl_ctr
        return False, None

    def delete_added_target(self) -> None:
        if self._added_target is None:
            print("[DEBUG] No target to delete")
            return

        
        print(f"[DEBUG] existing target to delete, id={self._added_target.object_id}")
        self._rigid_obj_mgr.remove_object_by_id(
            object_id=self._added_target.object_id
        )
        self._added_target = None

    def delete_added_walls(self) -> None:
        if len(self._added_walls) <= 0:
            return
        while len(self._added_walls) > 0:
            wall_to_rm = self._added_walls.pop()
            self._rigid_obj_mgr.remove_object_by_id(
                object_id=wall_to_rm.object_id
            )

    def delete_added_obstacles(self) -> None:
        if len(self._added_obstacles) <= 0:
            return
        while len(self._added_obstacles) > 0:
            obstacle_to_rm = self._added_obstacles.pop()
            self._rigid_obj_mgr.remove_object_by_id(
                object_id=obstacle_to_rm.object_id
            )

    def recompute_navmesh(self) -> None:
        self._sim.recompute_navmesh(
            pathfinder=self._sim.pathfinder,
            navmesh_settings=self._navmesh_settings,
            include_static_objects=True  # Take int account added objects
        )

    def refresh(self, sim: habitat_sim.Simulator) -> None:
        if sim is not self._sim:  # new sim
            self._sim = sim
            self._obj_templates_mgr = self._sim.get_object_template_manager()

            # Clear
            self._raw_tpl_ctrs: List[ObjectTemplateContainer] = []
            self._scaled_tpl_ctrs: List[ObjectTemplateContainer] = []

        if self._sim.get_rigid_object_manager() is not self._rigid_obj_mgr:  # sim reconfigured
            self._rigid_obj_mgr = self._sim.get_rigid_object_manager()

            # Clear
            self._added_target: Optional[ManagedRigidObject] = None
            self._added_walls: List[ManagedRigidObject] = []
            self._added_obstacles: List[ManagedRigidObject] = []

## Simulator creation

### Sensor and Agent configuration

This section defines the agent's RGB and depth sensors.  
To train or test models using specific modalities (e.g., RGB-only or Depth-only), you can **comment or uncomment** the corresponding sensor specs below.  

Adjust the `sensor_specs` list to control which sensors are active during simulation.

In [None]:
sensor_settings = {
    "height": 256, "width": 256,  # Spatial resolution of observations
    "sensor_height": 0,  # Height of sensors in meters, relative to the agent
}

# Create a RGB sensor configuration
rgb_sensor_spec = habitat_sim.CameraSensorSpec()
rgb_sensor_spec.uuid = "color_sensor"
rgb_sensor_spec.sensor_type = habitat_sim.SensorType.COLOR
rgb_sensor_spec.resolution = [sensor_settings["height"], sensor_settings["width"]]
rgb_sensor_spec.position = [0.0, sensor_settings["sensor_height"], 0.0]
rgb_sensor_spec.sensor_subtype = habitat_sim.SensorSubType.PINHOLE

#Create a depth sensor configuration
depth_sensor_spec = habitat_sim.CameraSensorSpec()
depth_sensor_spec.uuid = "depth_sensor"
depth_sensor_spec.sensor_type = habitat_sim.SensorType.DEPTH
depth_sensor_spec.resolution = [sensor_settings["height"], sensor_settings["width"]]
depth_sensor_spec.position = [0.0, sensor_settings["sensor_height"], 0.0]
depth_sensor_spec.sensor_subtype = habitat_sim.SensorSubType.PINHOLE

sensor_specs = [rgb_sensor_spec, depth_sensor_spec]
                #]
                #, depth_sensor_spec]

In [None]:
agent_settings = {
    "action_space": {
        "move_forward": 0.25, "move_backward": 0.25,  # Distance to cover in a move action in meters
        "turn_left": 30.0, "turn_right": 30,  # Angles to cover in a turn action in degrees
    }
}

# Create an agent configuration
agent_cfg = habitat_sim.agent.AgentConfiguration()
agent_cfg.action_space = {
    k: habitat_sim.agent.ActionSpec(
        k, habitat_sim.agent.ActuationSpec(amount=v)
    ) for k, v in agent_settings["action_space"].items()
}
agent_cfg.sensor_specifications = sensor_specs

### Simulator configuration

This section defines which 3D scene to load in the simulator.  
To switch scenes, **comment or uncomment** the desired `scene_id` path in `sim_settings`.  
You can also add custom scene paths here and select which one is used during simulation by setting it in `"scene_id"`.

In [None]:
sim_settings = {
    "default_agent": 0,  # Index of the default agent
    "scene_id": "data/scene_datasets/gibson/Edgemere.glb",
                # "data/scene_datasets/gibson/Cantwell.glb"
    "enable_physics": False,  # kinematics only
    "seed": 42  # used in the random navigation
}

# Create a simulator backend configuration
sim_cfg = habitat_sim.SimulatorConfiguration()
sim_cfg.scene_id = sim_settings["scene_id"]
sim_cfg.enable_physics = sim_settings["enable_physics"]

In [None]:
# Create a configuration for the simulator
cfg = habitat_sim.Configuration(sim_cfg, [agent_cfg])

In [None]:
try:
    sim.close()
except NameError:
    pass
sim = habitat_sim.Simulator(cfg)

## Wrapping simulator in gym.env

##### Custom Gym Environments for Hide & Seek in Habitat-Sim

This section defines multiple `gym.Env` environments for training agents with Stable-Baselines3 in Habitat-Sim.  
Each environment varies based on:

- **Task Type**: *Seek* (find the target) or *Hide* (avoid the target)
- **Sensor Modalities**: RGB only or RGB + Depth

##### Available Environments:
- `RedBallEnv`: **RGB + Depth**, task = *Seek* the red ball
- `RedBallEnvRGB`: **RGB only**, task = *Seek* the red ball
- `HideFromBallEnv`: **RGB + Depth**, task = *Hide* from the red ball

Use `DummyVecEnv` to wrap these environments for compatibility with SB3 algorithms.

**Note**: You can use different 3D target objects by changing the file path in the environment’s `reset()` method,  
in the `self.sim_rom.add_target(...)` call. Uncomment or modify the desired `.glb` file path depending on the object you want to use.


### Seeking environment

#### RGB + Depth sensors agent

In [None]:
class RgbDepthSeekingEnv(gym.Env):
    def __init__(self, sim: habitat_sim.Simulator, sim_rom: RunTimeObjectManager):
        super().__init__()
        self.sim = sim
        self.sim_rom = sim_rom
        self.agent = sim.get_agent(0)
        self.step_count = 0
        self.max_steps = 500

        # RGB + Depth as 4 channels
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(256, 256, 4), dtype=np.uint8
        )

        # Discrete action space
        self.actions = {
            0: "move_forward",
            1: "turn_left",
            2: "turn_right",
            3: "move_backward",
            4: "done",
        }
        self.action_space = spaces.Discrete(len(self.actions))
        self.spl = -1

        agent_state = habitat_sim.AgentState()
        agent_state.position = self.sim.pathfinder.get_random_navigable_point()
        self.agent.set_state(agent_state)

    def reset(self, *, seed=None, options=None):
        
        self.sim.reset()
        agent_state = habitat_sim.AgentState()
        agent_state.position = self.sim.pathfinder.get_random_navigable_point()
        self.agent.set_state(agent_state)
        #self.agent.set_state(habitat_sim.AgentState())
        self.step_count = 0

        # Delete and add red target
        self.sim_rom.delete_added_target()
        
        # The object will be placed at a random navigable position defined by 'target_position'.
        target_position = self.sim.pathfinder.get_random_navigable_point()

        # Add a target object to the scene.
        # You can switch between different target models by commenting/uncommenting the desired .glb file path.
        self.sim_rom.add_target(
            '/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model_with_red_ball.glb',
            #'/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model_new.glb',
            #'/workspace/habitat-sim/data/test_assets/objects/red_ball.glb',
            target_position
        )
        self.target_position = target_position  # Keep track for reward
        print("[INFO] Currently in reset(), target_position={}".format(target_position))

        self.start_position = self.agent.get_state().position
        shortest_path_request = habitat_sim.ShortestPath()
        shortest_path_request.requested_start = self.start_position
        shortest_path_request.requested_end = self.target_position
        
        found_path = self.sim.pathfinder.find_path(shortest_path_request)
        
        self.geodesic_distance = shortest_path_request.geodesic_distance



        self.success = False
        self.episode_path_length = 0.0
        self.last_position = self.start_position

        print(f"[INFO]: spl={self.spl}") 

        return self._get_obs(), {}

    def step(self, action):
        
        act_name = self.actions[action]

        agent_pos = self.agent.get_state().position
        dist_to_target = np.linalg.norm(agent_pos - np.array(self.target_position))
        
        done = False

        if self.max_steps < self.step_count:
            done = True
            print("NO SUCCESS")
        else :     
            
            if (act_name == "done"):
                done = (dist_to_target < 0.5)
                if done: 
                    self.success = True
                    print("SUCCESS !!!")
            else:
                self.agent.act(act_name)

        # Track distance moved
        current_position = self.agent.get_state().position
        step_distance = np.linalg.norm(current_position - self.last_position)
        self.episode_path_length += step_distance
        self.last_position = current_position
        
        obs = self._get_obs()
        reward = -0.01 + self.success * 20
        
        self.step_count += 1
        
        info = {
            "success": self.success,
            "geodesic_distance": self.geodesic_distance,
            "episode_path_length": self.episode_path_length,
            "spl": self.success * (self.geodesic_distance / max(self.episode_path_length, self.geodesic_distance + 1e-6)),
        }
        
        self.spl = info["spl"]
            
        return obs, reward, done, False, info

    def _get_obs(self):
        observations = self.sim.get_sensor_observations()
        rgb = observations["color_sensor"]
        depth = observations["depth_sensor"]

        rgb = rgb[..., :3].astype(np.uint8)
        depth = np.clip(depth, 0, 10)
        depth = (depth / 10.0 * 255).astype(np.uint8)
        depth = np.expand_dims(depth, axis=2)

        return np.concatenate([rgb, depth], axis=2)

#### RGB sensor agent

In [None]:
class RgbSeekingEnv(gym.Env):
    def __init__(self, sim: habitat_sim.Simulator, sim_rom: RunTimeObjectManager):
        super().__init__()
        self.sim = sim
        self.sim_rom = sim_rom
        self.agent = sim.get_agent(0)
        self.step_count = 0
        self.max_steps = 500

        # RGB only observation space with 3 channels (Height x Width x Channels)
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(256, 256, 3), dtype=np.uint8
        )

        # Define discrete action space with 5 actions:
        # move_forward, turn_left, turn_right, move_backward, and done (end episode)
        self.actions = {
            0: "move_forward",
            1: "turn_left",
            2: "turn_right",
            3: "move_backward",
            4: "done",
        }
        self.action_space = spaces.Discrete(len(self.actions))
        self.spl = -1  # Initialize Success weighted by Path Length metric

        # Initialize agent's position randomly on a navigable point in the environment
        agent_state = habitat_sim.AgentState()
        agent_state.position = self.sim.pathfinder.get_random_navigable_point()
        self.agent.set_state(agent_state)

    def reset(self, *, seed=None, options=None):
        # Reset the simulator environment and agent state
        self.sim.reset()

        # Randomly place the agent at a new navigable point
        agent_state = habitat_sim.AgentState()
        agent_state.position = self.sim.pathfinder.get_random_navigable_point()
        self.agent.set_state(agent_state)
        self.step_count = 0

        # Remove any previously added target objects from the scene
        self.sim_rom.delete_added_target()

        # Select a new random target position on navigable terrain
        target_position = self.sim.pathfinder.get_random_navigable_point()

        # Add a target object to the scene.
        # You can switch between different target models by commenting/uncommenting the desired .glb file path.        
        self.sim_rom.add_target(
            #'/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model_with_red_ball.glb',
            #'/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model_new.glb',
            '/workspace/habitat-sim/data/test_assets/objects/red_ball.glb',
            target_position
        )
        self.target_position = target_position  # Store target position for distance calculations

        # Record the agent's starting position for navigation metrics
        self.start_position = self.agent.get_state().position

        # Setup shortest path query from start to target to measure geodesic distance
        shortest_path_request = habitat_sim.ShortestPath()
        shortest_path_request.requested_start = self.start_position
        shortest_path_request.requested_end = self.target_position
        self.sim.pathfinder.find_path(shortest_path_request)
        self.geodesic_distance = shortest_path_request.geodesic_distance

        # Initialize episode success flag and path length tracking
        self.success = False
        self.episode_path_length = 0.0
        self.last_position = self.start_position

        # Return the initial RGB observation and empty info dict
        return self._get_obs(), {}

    def step(self, action):
        # Map discrete action index to action name
        act_name = self.actions[action]

        # Calculate Euclidean distance between agent and target positions
        agent_pos = self.agent.get_state().position
        dist_to_target = np.linalg.norm(agent_pos - np.array(self.target_position))

        done = False

        # Terminate episode if max steps exceeded
        if self.step_count >= self.max_steps:
            print("NO SUCCESS")
            done = True
        elif act_name == "done":
            # Check if agent claims to be done and is close enough to the target
            done = (dist_to_target < 0.5)
            if done:
                self.success = True
                print("SUCCESS !!!")
        else:
            # Perform the agent action in the environment
            self.agent.act(act_name)

        # Track incremental distance traveled by the agent this step
        current_position = self.agent.get_state().position
        step_distance = np.linalg.norm(current_position - self.last_position)
        self.episode_path_length += step_distance
        self.last_position = current_position

        # Get new observation from simulator after action
        obs = self._get_obs()

        # Simple reward: small negative reward each step + big reward on success
        reward = -0.01 + self.success * 20

        self.step_count += 1

        # Info dictionary with success flag, path length, geodesic distance, and SPL metric
        info = {
            "success": self.success,
            "geodesic_distance": self.geodesic_distance,
            "episode_path_length": self.episode_path_length,
            "spl": self.success * (self.geodesic_distance / max(self.episode_path_length, self.geodesic_distance + 1e-6)),
        }
        self.spl = info["spl"]

        # Return observation, reward, done flag, truncated flag (False), and info dict
        return obs, reward, done, False, info

    def _get_obs(self):
        # Fetch sensor observations from the simulator
        observations = self.sim.get_sensor_observations()

        # Extract RGB image and convert to uint8 format (3 channels)
        rgb = observations["color_sensor"]
        rgb = rgb[..., :3].astype(np.uint8)

        return rgb

### Hiding environment

In [None]:
class HidingEnv(gym.Env):
    def __init__(self, sim: habitat_sim.Simulator, sim_rom: RunTimeObjectManager):
        super().__init__()
        self.sim = sim
        self.sim_rom = sim_rom
        self.agent = sim.get_agent(0)
        self.step_count = 0
        self.max_steps = 500

        # Observation space includes RGB + Depth channels (4 channels total)
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(256, 256, 4), dtype=np.uint8
        )

        # Define discrete action space: move_forward, turn_left, turn_right, move_backward
        self.actions = {
            0: "move_forward",
            1: "turn_left",
            2: "turn_right",
            3: "move_backward",
        }
        self.action_space = spaces.Discrete(len(self.actions))

        # Parameters to control seeker object's star-shaped movement pattern
        self.step_in_leg = 0    # Current step count within one leg of star movement
        self.leg_length = 4     # Number of steps per leg segment
        self.legs_done = 0      # Number of completed legs in star pattern
        self.total_legs = 5     # Total legs in star movement (360° / 72° per leg)
        self.done = False       # Flag indicating episode completion
        self.success = True     # Flag for success status (starts True but may become False if seeker finds agent)

        # The seeker object will be added as a target during reset()

    def reset(self, *, seed=None, options=None):
        # Reset environment to initial state for new episode
        self.sim.reset()
        self.step_count = 0
        self.done = False
        self.success = True
        self.step_in_leg = 0
        self.legs_done = 0

        # Randomly position agent (hider) on navigable point in the environment
        agent_state = habitat_sim.AgentState()
        agent_state.position = self.sim.pathfinder.get_random_navigable_point()
        self.agent.set_state(agent_state)

        # Remove any previously added seeker target from the scene
        self.sim_rom.delete_added_target()

        # Add seeker object at fixed position in the environment
        seeker_pos = [-1.2753457 ,  0.18560381,  0.6338574 ] # Fixed seeker start position near the center of Edgemere environment
        # You can switch between different seeker models by commenting/uncommenting the desired .glb file path. 
        self.sim_rom.add_target(
            '/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model_with_red_ball.glb',
            #'/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model_new.glb',
            #'/workspace/habitat-sim/data/test_assets/objects/red_ball.glb',
            seeker_pos
        )

        # Store reference to seeker object and set its motion type to kinematic (controlled externally)
        self.seeker_obj = self.sim_rom._added_target
        self.seeker_obj.motion_type = habitat_sim.physics.MotionType.KINEMATIC

        # Return initial RGB + Depth observation and empty info dictionary
        return self._get_obs(), {}

    def step(self, action):
        # Raise error if called after episode is done without reset
        if self.done:
            raise RuntimeError("Episode done, reset required")

        # Execute agent (hider) action in the environment
        act_name = self.actions[action]
        self.agent.act(act_name)

        # Move seeker object in a star pattern if not done
        if self.legs_done < self.total_legs:
            if self.step_in_leg < self.leg_length:
                # Calculate forward direction vector relative to seeker orientation
                forward_vec = self.seeker_obj.rotation.transform_vector(Vector3(0, 0, -1))
                new_pos = self.seeker_obj.translation + forward_vec * 0.5  # Move forward by 0.5 units
                
                # Check if the new position is navigable by seeker
                if self.sim.pathfinder.is_navigable(new_pos):
                    self.seeker_obj.translation = new_pos  # Update seeker position
                    self.step_in_leg += 1                  # Increment step within current leg
                else:
                    # If blocked, end current leg early and prepare to turn
                    self.step_in_leg = self.leg_length
            else:
                # Turn seeker by 72 degrees to start new leg of star pattern
                angle_rad = Rad(np.deg2rad(72))
                axis = Vector3(0, 1, 0)  # Y-axis for vertical rotation
                turn_quat = Quaternion.rotation(angle_rad, axis)
                self.seeker_obj.rotation = turn_quat * self.seeker_obj.rotation  # Apply rotation

                # Reset step counter and increment number of legs completed
                self.step_in_leg = 0
                self.legs_done += 1
        else:
            # Star movement finished; mark episode done
            self.done = True

        # Calculate Euclidean distance between seeker and agent positions
        seeker_pos = np.array(self.seeker_obj.translation)
        agent_pos = np.array(self.agent.get_state().position)
        dist = np.linalg.norm(seeker_pos - agent_pos)

        # If seeker is within 0.5 units of agent, seeker "found" hider: big penalty and end episode
        if dist < 0.5:
            print("[INFO] we are close")
            reward = -20
            self.done = True
            self.success = False
        else:
            # Small positive reward for each step survived without being found
            reward = 0.01

        # Increment step count and check max step termination
        self.step_count += 1
        if self.step_count > self.max_steps:
            self.done = True

        # Get latest observation from simulator
        obs = self._get_obs()

        # Info dictionary with success flag and current seeker-agent distance
        info = {"success": self.success, "distance": dist}

        # Return observation, reward, done flag, truncated flag (False), and info dictionary
        return obs, reward, self.done, False, info

    def _get_obs(self):
        # Retrieve raw sensor observations from the simulator
        observations = self.sim.get_sensor_observations()

        # Extract RGB image and convert to uint8 (3 channels)
        rgb = observations["color_sensor"][..., :3].astype(np.uint8)

        # Extract depth sensor data, clip to max 10 meters, scale to 0-255 range, and expand dims for concatenation
        depth = observations["depth_sensor"]
        depth = np.clip(depth, 0, 10)
        depth = (depth / 10.0 * 255).astype(np.uint8)
        depth = np.expand_dims(depth, axis=2)

        # Concatenate RGB and depth channels to produce final 4-channel observation
        return np.concatenate([rgb, depth], axis=2)

## Simulation Visualization and Testing

This section provides tools to visualize RGB and Depth observations from the agent's sensors.  
- `display_obs` shows RGB and Depth images side-by-side.  
- You can manually move the agent and inspect its state.  
- Targets and objects can be added or moved using `sim_rom`.  
- Use this for quick debugging and verifying sensor data before training.

> **Note:** This section is for debugging and testing the environment only.  
> It should not be run before or during training if you only want to train the model
*


In [None]:
# A utility function for displaying observations
def display_obs(rgb_obs: np.ndarray, depth_obs: np.ndarray):
    img_arr, title_arr = [], []
    
    rgb_img = Image.fromarray(rgb_obs, mode="RGBA")
    img_arr.append(rgb_img)
    title_arr.append("rgb")
    
    depth_img = Image.fromarray((depth_obs / 10 * 255).astype(np.uint8), mode="L")
    img_arr.append(depth_img)
    title_arr.append("depth")
    
    plt.figure(figsize=(12, 8))
    for i, (img, title) in enumerate(zip(img_arr, title_arr)):
        ax = plt.subplot(1, 2, i + 1)
        ax.axis("off")
        ax.set_title(title)
        plt.imshow(img)
    plt.show(block=False)

In [None]:
agent = sim.initialize_agent(sim_settings["default_agent"])  # Get our default agent
agent_state = habitat_sim.AgentState()
agent_state.position = np.array([-4.69643, 0.15825, -2.90618])  # Position in world coordinate
agent.set_state(agent_state)

In [None]:
sim_rom = RunTimeObjectManager(sim)
env = RedBallEnv(sim, sim_rom)

In [None]:
#target_position = [-2.3028612 ,  0.18560381, -0.11677533]
#target_position[1] = 0
#sim_rom.add_target('/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model_with_red_ball.glb',target_position)
#sim_rom.add_target('/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model_new.glb', target_position)
# sim_rom.add_target('/workspace/habitat-sim/data/test_assets/objects/TurtleBot4_Lite_model.glb', [-2.3028612 ,  0  , -4.09697294])

# sim.agents[0].act("turn_left")
#sim.agents[0].act("turn_left")
# sim.agents[0].act("turn_right")
# sim.agents[0].act("turn_right")
# sim.agents[0].act("move_forward")
#sim.agents[0].act("move_forward")
# sim.agents[0].act("move_backward")
# sim.agents[0].act("move_backward")

observations = sim.get_sensor_observations()
rgb_obs = observations["color_sensor"]
depth_obs = observations["depth_sensor"]

display_obs(rgb_obs, depth_obs)

## Train model

### PPO Training Runs Summary

#### Seeking Task

- **ppo_random_ball_pos_v0**  
  Red ball target — RGB + Depth sensors  
  Ball random, agent fixed — Edgemere — `CnnPolicy`, batch size 128  
  **SPL:** 0.837 &nbsp;&nbsp; **SR:** 0.992

- **ppo_random_ball_and_robot_pos_rgb_v01**  
  Red ball target — RGB only  
  Ball and agent random — Edgemere — `CnnPolicy`, batch size 128  
  **SPL:** 0.663 &nbsp;&nbsp; **SR:** 0.992

- **ppo_random_ball_and_robot_pos_v02**  
  Red ball target — RGB + Depth sensors  
  Ball and agent random — Edgemere — `CnnPolicy`, batch size 128  
  **SPL:** 0.844 &nbsp;&nbsp; **SR:** 0.979

- **ppo_robot_seek**  
  Red ball target — RGB + Depth sensors  
  Ball and agent random — Edgemere — `CnnPolicy`, batch size 128  
  **SPL:** 0.168 &nbsp;&nbsp; **SR:** 0.379

- **ppo_robot_and_red_ball_seek**  
  Robot + red ball target — RGB + Depth sensors  
  Ball and agent random — Edgemere — `CnnPolicy`, batch size 128  
  **SPL:** 0.839 &nbsp;&nbsp; **SR:** 0.997

#### Hiding Task

- **ppo_robot_and_red_ball_hide**  
  Seeker = Robot + red ball — RGB + Depth sensors  
  Ball and agent random — Edgemere — `CnnPolicy`, batch size 128  
  **SR:** 0.854


### PPO Training Phase

This section launches the training of a PPO agent using Stable-Baselines3.

- A custom Gym environment (`RgbDepthSeekingEnv`, `RgbSeekingEnv`, or `HidingEnv`) is wrapped in `DummyVecEnv` for compatibility.
- The PPO model uses a `CnnPolicy`, suitable for visual input (RGB and/or Depth).
- The agent is trained for **1,000,000 timesteps** with a batch size of **128**.
- After training, the model is saved to disk.

To switch environments (seeking or hiding), comment/uncomment the corresponding line during instantiation.

In [None]:
# Always run this cell when you need to create or reset the environment (not just during training)
sim_rom = RunTimeObjectManager(sim)

In [None]:
env = RgbDepthSeekingEnv(sim, sim_rom)
#RgbSeekingEnv(sim, sim_rom)
#HidingEnv(sim, sim_rom)
vec_env = DummyVecEnv([lambda: env])
model = PPO('CnnPolicy', vec_env, n_steps = 500, batch_size = 128, verbose=1)
model.learn(total_timesteps=1_000_000)
model.save("ppo_robot_and_red_ball_hide")

### Fine-Tuning Phase

This section continues training (fine-tuning) a previously saved PPO model.

- The model is loaded from a previous ruppo_robot_and_red_ball_hide_new`).
- A new environmeHidingEnvllEnv`) is defined and wrapped in `DummyVecEnv`.
- The model is linked to the new environment via `set_env()`.
- Training resumes5for **800,000 timesteps**.
- The updated model is saved under a newppo_robot_and_red_ball_hide_fine_tunedwell_v03`).

Useful for adapting a pre-trained agent to a new environment or slightly different tsk setup.


In [None]:
model = PPO.load("ppo_robot_and_red_ball_hide")
env = HidingEnv(sim, sim_rom)
new_vec_env = DummyVecEnv([lambda: env])
model.set_env(new_vec_env)
model.learn(total_timesteps=500_000)

In [None]:
model.save("ppo_robot_and_red_ball_hide_fine_tuned")

### Evaluation Phase (Prediction & Metrics)

- Run the trained agent using `model.predict()`.
- Collect metrics separately:
  - **Success Rate (SR)**: For both Hide and Seek tasks.
  - **SPL**: Only for Seek tasks.

> Note: Comment or uncomment SPL-related lines depending on the task (SPL only applies to Seek).

In [None]:
env = HideSeekEnv(sim, sim_rom)
vec_env = DummyVecEnv([lambda: env])

model = PPO.load("ppo_robot_and_red_ball_hide")

obs = vec_env.reset()
nb_episode = 1000

# Use `spls` for Seeking tasks only (SPL is not relevant for Hiding)
# spls = []
srs = []

# Evaluate model across multiple episodes
for episode in range(nb_episode):
    print(f"[TESTING INFO] episode={episode}/{nb_episode}")
    done = False
    while not done:
        action, _states = model.predict(obs)
        obs, rewards, done, info = vec_env.step(action)
    # spls.append(info[0]["spl"]) # Uncomment for Seeking tasks
    srs.append(info[0]["success"])

# Optional: visualize predictions frame by frame
for i in range(1000):
    print(f"[INFO] iteration={i}")
    time.sleep(.1)
    action, _states = model.predict(obs)
    obs, rewards, dones, info = vec_env.step(action)
    print(action)
    plt.imshow(obs.squeeze()[:,:,:3])
    plt.show()

# Print metrics (SPL for Seeking, SR for both)
# print(f"SPL Mean : {np.mean(spls)}")
print(f"SR Mean : {np.mean(srs)}")

### Video Generation over 10 Episodes

- Run the agent for 10 episodes, collecting RGB and Depth observations at each step.
- Store observations as image frames for video creation.
- Save two videos:
  - **RGB video**: Converts RGB frames to BGR for OpenCV.
  - **Depth video**: Converts single-channel depth frames to 3-channel grayscale for saving.

> Update file paths and parameters (fps, resolution) asneeded.


In [None]:
done = False
images_rgb = []
images_depth = []

model = PPO.load("ppo_random_ball_pos_v02")

obs = vec_env.reset()
nb_episode = 10

for _ in range(nb_episode):
    done = False
    while not done:
        action, _states = model.predict(obs)
        obs, rewards, done, info = vec_env.step(action)
        rgb = obs[0, ..., :3]     
        depth = obs[0, ..., 3]    
        images_rgb.append(rgb)
        images_depth.append(depth)
    

In [None]:
# Video parameters
fps = 10
height, width, _ = images_rgb[0].shape

# === RGB Video ===
rgb_writer = cv2.VideoWriter("videos/rgb_video_10_ep_v0.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

for rgb in images_rgb:
    # OpenCV expects images in BGR format
    bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
    rgb_writer.write(bgr)

rgb_writer.release()

# === Depth Video ===
depth_writer = cv2.VideoWriter("videos/depth_video_10_ep_v0.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

for depth in images_depth:
    # If depth is 2D (H, W), convert to 3-channel grayscale for saving
    depth_rgb = cv2.cvtColor(depth, cv2.COLOR_GRAY2BGR)
    depth_writer.write(depth_rgb)

depth_writer.release()