Basic metrics for exploration with usage example #358
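This PR adds a small exploration-metrics module (allenact/embodiedai/metrics.py) and wires it into the RoboTHOR task sampler and task. Below is a minimal usage sketch of the new functions; it is illustrative only (not part of the diff) and assumes a default ai2thor Controller can be started and that a few arbitrary navigation actions are used to build a trajectory of agent-state dicts:

from ai2thor.controller import Controller
from allenact.embodiedai.metrics import (
    num_reachable_positions_cells,
    visited_cells_metric,
)

controller = Controller()  # assumed default scene/agent settings, purely for illustration

# Denominator: reachable 0.5m grid cells, counted once for each of the 4 heading bins.
num_reachable = num_reachable_positions_cells(
    controller, quant_grid_size=0.5, quant_angles=4
)

# Build a trajectory of agent states (here just a few hard-coded actions).
trajectory = [controller.last_event.metadata["agent"]]
for action in ["MoveAhead", "RotateRight", "MoveAhead"]:
    controller.step(action)
    trajectory.append(controller.last_event.metadata["agent"])

# Numerator: distinct (x, z, heading) cells visited relative to the initial state.
num_visited = visited_cells_metric(trajectory, quant_grid_size=0.5, quant_angles=4)
print(num_visited, num_visited / num_reachable)

In the task changes below, the same pair of calls is made on task_info["followed_path1"] / ["followed_path2"] and normalized by a per-episode num_reachable_positions count computed at task construction.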

Open · wants to merge 1 commit into base: main
113 changes: 113 additions & 0 deletions allenact/embodiedai/metrics.py
@@ -0,0 +1,113 @@
from typing import Dict, Any, Optional
from collections import defaultdict

import numpy as np
from ai2thor.controller import Controller


def make_angle_thresholds(quant_angles: int = 4):
angle_spacing = 360 / quant_angles
angle_shift = angle_spacing / 2
angle_thres = [angle_shift + angle_spacing * i for i in range(quant_angles)]
return np.array(
[angle_thres[-1] - 360.0] + angle_thres
) # monotonic bin thresholds for numpy digitize
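# For example, quant_angles=4 yields array([-45., 45., 135., 225., 315.]); with np.digitize,
# headings just below 360 deg then fall into the same bin as headings just above 0 deg.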


def quantize_loc(
current_agent_state: Dict[str, Any],
initial_agent_state: Dict[str, Any],
quant_grid_size: float = 0.5,
quant_grid_axes: str = "xz",
quant_angles: int = 4,
angle_thresholds: Optional[np.ndarray] = None,
):
if quant_angles > 1:
if angle_thresholds is None:
angle_thresholds = make_angle_thresholds(quant_angles)

angle_dif = (
current_agent_state["rotation"]["y"] - initial_agent_state["rotation"]["y"]
) % 360

quant_angle = (np.digitize(angle_dif, angle_thresholds) - 1) % quant_angles
else:
quant_angle = 0

current_location = (
current_agent_state["position"]
if "position" in current_agent_state
else current_agent_state
)
initial_location = (
initial_agent_state["position"]
if "position" in initial_agent_state
else initial_agent_state
)

return tuple(
int(round((current_location[x] - initial_location[x]) / quant_grid_size))
for x in quant_grid_axes
) + (quant_angle,)
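# Illustrative example: with quant_grid_size=0.5 and quant_grid_axes='xz', an agent displaced
# 1.0m along z and rotated ~90 deg from its initial state maps to cell (0, 2, 1).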


def visited_cells_metric(
trajectory,
return_state_visits=False,
quant_grid_size: float = 0.5,
quant_grid_axes: str = "xz",
quant_angles: int = 4,
angle_thresholds: Optional[np.ndarray] = None,
):
visited_cells = defaultdict(list)

if quant_angles > 1 and angle_thresholds is None:
angle_thresholds = make_angle_thresholds(quant_angles)

for t, state in enumerate(trajectory):
visited_cells[
quantize_loc(
current_agent_state=state,
initial_agent_state=trajectory[0],
quant_grid_size=quant_grid_size,
quant_grid_axes=quant_grid_axes,
quant_angles=quant_angles,
angle_thresholds=angle_thresholds,
)
].append(t)

if return_state_visits:
return len(visited_cells), visited_cells
else:
return len(visited_cells)


def num_reachable_positions_cells(
controller: Controller,
quant_grid_size: float = 0.5,
quant_grid_axes: str = "xz",
quant_angles: int = 4,
):
"""
Assumes the agent is at the episode's initial state.
Note that there's a chance more states are visitable than here counted.
For quant_angles, we just assume we can always get to each reachable position
in each of the quant_angles relative angles to the initial orientation.
"""

initial_agent_state = controller.last_event.metadata["agent"]

controller.step("GetReachablePositions")
assert controller.last_event.metadata["lastActionSuccess"]
reachable_positions = controller.last_event.metadata["actionReturn"]

return (
visited_cells_metric(
[initial_agent_state] + reachable_positions,
return_state_visits=False,
quant_grid_size=quant_grid_size,
quant_grid_axes=quant_grid_axes,
quant_angles=1,
)
* quant_angles
)
29 changes: 28 additions & 1 deletion allenact_plugins/robothor_plugin/robothor_task_samplers.py
@@ -1084,11 +1084,38 @@ def next_task(

pose1 = self.env.agent_state(0)
pose2 = self.env.agent_state(1)

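# Local helper: retry the point-to-point distance query with an exponentially growing
# error tolerance (starting at 0.05, while below 2.5) before giving up and returning -1.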
def retry_dist(position: Dict[str, float], object_type: Dict[str, float]):
allowed_error = 0.05
debug_log = ""
d = -1.0
while allowed_error < 2.5:
d = self.env.distance_from_point_to_point(
position, object_type, allowed_error
)
if d < 0:
debug_log = (
f"In scene {self.env.scene_name}, could not find a path from {position} to {object_type} with"
f" {allowed_error} error tolerance. Increasing this tolerance to"
f" {2 * allowed_error} any trying again."
)
allowed_error *= 2
else:
break
if d < 0:
get_logger().debug(
f"In scene {self.env.scene_name}, could not find a path from {position} to {object_type}"
f" with {allowed_error} error tolerance. Returning a distance of -1."
)
elif debug_log != "":
get_logger().debug(debug_log)
return d

dist = self.env.distance_cache.find_distance(
self.env.scene_name,
{k: pose1[k] for k in ["x", "y", "z"]},
{k: pose2[k] for k in ["x", "y", "z"]},
self.env.distance_from_point_to_point,
retry_dist,
)

too_close_to_target = (
38 changes: 37 additions & 1 deletion allenact_plugins/robothor_plugin/robothor_tasks.py
@@ -9,6 +9,7 @@
from allenact.base_abstractions.task import Task
from allenact.utils.system import get_logger
from allenact.utils.tensor_utils import tile_images
from allenact.embodiedai.metrics import visited_cells_metric, num_reachable_positions_cells
from allenact_plugins.ithor_plugin.ithor_environment import IThorEnvironment
from allenact_plugins.robothor_plugin.robothor_constants import (
MOVE_AHEAD,
@@ -478,6 +479,8 @@ def __init__(
self.task_info["followed_path1"] = [pose1]
self.task_info["followed_path2"] = [pose2]
self.task_info["action_names"] = self.class_action_names()
self.task_info["num_reachable_positions"] = num_reachable_positions_cells(self.env.controller)
assert self.task_info["num_reachable_positions"] > 0

@property
def action_space(self):
@@ -515,11 +518,37 @@ def _step(self, action: Tuple[int, int]) -> RLStepResult:
pose2 = self.env.agent_state(1)
self.task_info["followed_path2"].append(pose2)

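# Same retry helper as in the task sampler: exponentially grow the error tolerance
# (from 0.05, while below 2.5) and return -1 if no path is found.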
def retry_dist(position: Dict[str, float], object_type: Dict[str, float]):
allowed_error = 0.05
debug_log = ""
d = -1.0
while allowed_error < 2.5:
d = self.env.distance_from_point_to_point(
position, object_type, allowed_error
)
if d < 0:
debug_log = (
f"In scene {self.env.scene_name}, could not find a path from {position} to {object_type} with"
f" {allowed_error} error tolerance. Increasing this tolerance to"
f" {2 * allowed_error} any trying again."
)
allowed_error *= 2
else:
break
if d < 0:
get_logger().debug(
f"In scene {self.env.scene_name}, could not find a path from {position} to {object_type}"
f" with {allowed_error} error tolerance. Returning a distance of -1."
)
elif debug_log != "":
get_logger().debug(debug_log)
return d

self.last_geodesic_distance = self.env.distance_cache.find_distance(
self.env.scene_name,
{k: pose1[k] for k in ["x", "y", "z"]},
{k: pose2[k] for k in ["x", "y", "z"]},
self.env.distance_from_point_to_point,
retry_dist,
)

step_result = RLStepResult(
@@ -556,7 +585,14 @@ def metrics(self) -> Dict[str, Any]:
if not self.is_done():
return {}

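# Exploration coverage per agent: distinct (0.5m grid, 4-heading) cells visited along each
# followed path, normalized by the reachable-cell count stored in task_info at construction.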
num_cells_visited1 = visited_cells_metric(self.task_info["followed_path1"])
num_cells_visited2 = visited_cells_metric(self.task_info["followed_path2"])

return {
**super().metrics(),
"success": self.reached_terminal_state(),
"num_cells_visited1": num_cells_visited1,
"num_cells_visited2": num_cells_visited2,
"ratio_cells_visited1": num_cells_visited1 / self.task_info["num_reachable_positions"],
"ratio_cells_visited2": num_cells_visited2 / self.task_info["num_reachable_positions"],
}