
Commit

finish navi and obs details
PENG Zhenghao committed Sep 5, 2021
1 parent 2f2ec23 commit cd80447
Showing 3 changed files with 74 additions and 51 deletions.
2 changes: 1 addition & 1 deletion documentation/source/observation.rst
@@ -34,7 +34,7 @@ We use this state vector in almost all existing RL experiments such as the Gener

The state vector consists of three parts:

1. **Ego State**: current states such as the steering, heading, velocity and relative distance to boundaries, implemented in `StateObservation Class <https://github.com/decisionforce/metadrive/blob/main/metadrive/obs/state_obs.py#L9>`_.
1. **Ego State**: current states such as the steering, heading, velocity and relative distance to boundaries, implemented in the :code:`vehicle_state` function of `StateObservation Class <https://github.com/decisionforce/metadrive/blob/main/metadrive/obs/state_obs.py#L9>`_. Please find the detailed meaning of each state dimension in the code.
2. **Navigation**: the navigation information that guides the vehicle toward the destination. Concretely, MetaDrive first computes the route from the spawn point to the destination of the ego vehicle. Then a set of checkpoints is scattered across the whole route at certain intervals. The relative distance and direction to the next checkpoint and the one after it are given as the navigation information. This part is implemented in the :code:`_get_info_for_checkpoint` function of `Navigation Class <https://github.com/decisionforce/metadrive/blob/main/metadrive/component/vehicle_module/navigation.py>`_.
3. **Surrounding**: the surrounding information is encoded by a vector of Lidar-like cloud points. The data is generated by the `Lidar Class <https://github.com/decisionforce/metadrive/blob/main/metadrive/component/vehicle_module/lidar.py#L16>`_. We typically use 240 lasers (single-agent) and 70 lasers (multi-agent) to scan the neighboring area within a radius of 50 meters.
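
The final observation is simply the concatenation of these three parts. Below is a minimal, illustrative sketch (not MetaDrive's actual implementation) of how such a state vector could be assembled; the array sizes are made-up examples and every input is assumed to be pre-normalized to [0, 1].

import numpy as np

def build_state_vector(ego_state, navi_info, lidar_points):
    """Concatenate the three observation parts into one flat state vector.

    ego_state:    1-D array of normalized ego states (steering, heading, speed, ...)
    navi_info:    1-D array of navigation information for the checkpoints
    lidar_points: 1-D array of normalized Lidar distances (e.g. 240 lasers)
    """
    return np.concatenate([ego_state, navi_info, lidar_points]).astype(np.float32)

# Illustrative sizes only: a 19-dim ego state, 10-dim navigation info and 240 Lidar lasers
obs = build_state_vector(np.zeros(19), np.zeros(10), np.ones(240))
assert obs.shape == (269,)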

86 changes: 44 additions & 42 deletions metadrive/component/vehicle_module/navigation.py
@@ -9,11 +9,9 @@
from metadrive.component.lane.straight_lane import StraightLane
from metadrive.component.map.base_map import BaseMap
from metadrive.component.road.road import Road
from metadrive.constants import Mask
from metadrive.constants import RENDER_MODE_ONSCREEN, CamMask
from metadrive.engine.asset_loader import AssetLoader
from metadrive.utils import clip, norm
from metadrive.utils import get_np_random
from metadrive.utils import clip, norm, get_np_random
from metadrive.utils.coordinates_shift import panda_position
from metadrive.utils.scene_utils import ray_localization
from metadrive.utils.space import Parameter, BlockParameterSpace
@@ -103,7 +101,6 @@ def update(self, map: BaseMap, current_lane_index, final_road_node=None, random_
if start_road_node is None:
start_road_node = FirstPGBlock.NODE_1
if final_road_node is None:
# auto find if PGMap
current_road_negative = Road(*current_lane_index[:-1]).is_negative_road()
# choose first block when born on negative road
block = map.blocks[0] if current_road_negative else map.blocks[-1]
@@ -166,16 +163,20 @@ def update_localization(self, ego_vehicle):

assert len(self.checkpoints) >= 2

# target_road_1 is the road segment the vehicle is driving on.
target_road_1_start = self.checkpoints[self._target_checkpoints_index[0]]
target_road_1_end = self.checkpoints[self._target_checkpoints_index[0] + 1]
target_lanes_1 = self.map.road_network.graph[target_road_1_start][target_road_1_end]
self.current_ref_lanes = target_lanes_1
self.current_road = Road(target_road_1_start, target_road_1_end)

# target_road_2 is the next road segment the vehicle should drive on.
target_road_2_start = self.checkpoints[self._target_checkpoints_index[1]]
target_road_2_end = self.checkpoints[self._target_checkpoints_index[1] + 1]
target_lanes_1 = self.map.road_network.graph[target_road_1_start][target_road_1_end]
target_lanes_2 = self.map.road_network.graph[target_road_2_start][target_road_2_end]
self.current_ref_lanes = target_lanes_1

self.current_road = Road(target_road_1_start, target_road_1_end)
if target_road_1_start == target_road_2_start:
# When we are in the final road segment, there is no further road to drive on
self.next_road = None
self.next_ref_lanes = None
else:
@@ -185,18 +186,14 @@ def update_localization(self, ego_vehicle):
self._navi_info.fill(0.0)
half = self.navigation_info_dim // 2
self._navi_info[:half], lanes_heading1, checkpoint = self._get_info_for_checkpoint(
lanes_id=0,
lanes=target_lanes_1,
ego_vehicle=ego_vehicle #, print_info=False
lanes_id=0, lanes=target_lanes_1, ego_vehicle=ego_vehicle
)

self._navi_info[half:], lanes_heading2, _ = self._get_info_for_checkpoint(
lanes_id=1,
lanes=target_lanes_2,
ego_vehicle=ego_vehicle #, print_info=True
lanes_id=1, lanes=target_lanes_2, ego_vehicle=ego_vehicle
)

if self._show_navi_info:
if self._show_navi_info: # Whether to visualize little boxes in the scene denoting the checkpoints
pos_of_goal = checkpoint
self._goal_node_path.setPos(pos_of_goal[0], -pos_of_goal[1], 1.8)
self._goal_node_path.setH(self._goal_node_path.getH() + 3)
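
For reference, the checkpoint-to-road lookup performed above can be summarized with a small self-contained sketch, assuming a dict-of-dicts road network graph; the node names, lane lists and indices are invented for illustration.

# Toy road network: graph[start_node][end_node] -> list of lanes on that road segment
graph = {
    "node_a": {"node_b": ["lane_1", "lane_2"]},
    "node_b": {"node_c": ["lane_3", "lane_4"]},
}
checkpoints = ["node_a", "node_b", "node_c"]  # node names along the planned route
target_index = [0, 1]  # indices of the current checkpoint and the next one

# target_road_1: the road segment the vehicle is currently driving on
road_1_start = checkpoints[target_index[0]]
road_1_end = checkpoints[target_index[0] + 1]
current_ref_lanes = graph[road_1_start][road_1_end]

# target_road_2: the next road segment the vehicle should drive on
road_2_start = checkpoints[target_index[1]]
road_2_end = checkpoints[target_index[1] + 1]
# In the final road segment both checkpoints coincide and there is no further road
next_ref_lanes = None if road_1_start == road_2_start else graph[road_2_start][road_2_end]
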
@@ -210,20 +207,33 @@ def update_localization(self, ego_vehicle):

return lane, lane_index

def _get_info_for_checkpoint(self, lanes_id, lanes, ego_vehicle, print_info=False):
def _get_info_for_checkpoint(self, lanes_id, lanes, ego_vehicle):

navi_information = []

# Project the checkpoint position into the target vehicle's coordinate frame, where
# +x is the heading direction and +y is the right-hand side.
ref_lane = lanes[0]
later_middle = (float(self.get_current_lane_num()) / 2 - 0.5) * self.get_current_lane_width()
check_point = ref_lane.position(ref_lane.length, later_middle)
dir_vec = check_point - ego_vehicle.position # get the vector from center of vehicle to checkpoint
dir_norm = norm(dir_vec[0], dir_vec[1])
if dir_norm > self.NAVI_POINT_DIST: # if the checkpoint is too far away, rescale the direction vector to length NAVI_POINT_DIST
dir_vec = dir_vec / dir_norm * self.NAVI_POINT_DIST
ckpt_in_heading, ckpt_in_rhs = ego_vehicle.projection(dir_vec) # project into the ego vehicle's coordinate frame

# Dim 1: the relative position of the checkpoint in the target vehicle's heading direction.
navi_information.append(clip((ckpt_in_heading / self.NAVI_POINT_DIST + 1) / 2, 0.0, 1.0))

# Dim 2: the relative position of the checkpoint in the target vehicle's right hand side direction.
navi_information.append(clip((ckpt_in_rhs / self.NAVI_POINT_DIST + 1) / 2, 0.0, 1.0))

if lanes_id == 0:
# calculate the heading of the reference lane at the ego vehicle's current position
lanes_heading = ref_lane.heading_at(ref_lane.local_coordinates(ego_vehicle.position)[0])
else:
lanes_heading = ref_lane.heading_at(min(self.PRE_NOTIFY_DIST, ref_lane.length))
dir_vec = check_point - ego_vehicle.position
dir_norm = norm(dir_vec[0], dir_vec[1])
if dir_norm > self.NAVI_POINT_DIST:
dir_vec = dir_vec / dir_norm * self.NAVI_POINT_DIST
proj_heading, proj_side = ego_vehicle.projection(dir_vec)

# Also include the current lane's information in the navigation information
bendradius = 0.0
dir = 0.0
angle = 0.0
@@ -237,27 +247,19 @@ def _get_info_for_checkpoint(self, lanes_id, lanes, ego_vehicle, print_info=False):
angle = ref_lane.end_phase - ref_lane.start_phase
elif dir == -1:
angle = ref_lane.start_phase - ref_lane.end_phase
ret = []
ret.append(clip((proj_heading / self.NAVI_POINT_DIST + 1) / 2, 0.0, 1.0))
ret.append(clip((proj_side / self.NAVI_POINT_DIST + 1) / 2, 0.0, 1.0))
ret.append(clip(bendradius, 0.0, 1.0))
ret.append(clip((dir + 1) / 2, 0.0, 1.0))
ret.append(clip((np.rad2deg(angle) / BlockParameterSpace.CURVE[Parameter.angle].max + 1) / 2, 0.0, 1.0))

# if print_info:
# print("[2ND] Distance to this navigation: {:.3f}, Angle: {:.3f}. Return value: {}".format(
# norm(proj_heading, proj_side),
# np.rad2deg(np.arctan2(proj_side, proj_heading)),
# ret
# ))
# else:
# print("[1ND] Distance to this navigation: {:.3f}, Angle: {:.3f}. Return value: {}".format(
# norm(proj_heading, proj_side),
# np.rad2deg(np.arctan2(proj_side, proj_heading)),
# ret
# ))

return ret, lanes_heading, check_point

# Dim 3: The bending radius of the current lane
navi_information.append(clip(bendradius, 0.0, 1.0))

# Dim 4: The bending direction of the current lane (+1 for clockwise, -1 for counterclockwise)
navi_information.append(clip((dir + 1) / 2, 0.0, 1.0))

# Dim 5: The angular difference between the heading at the lane's ending position and
# the heading at its starting position
navi_information.append(
clip((np.rad2deg(angle) / BlockParameterSpace.CURVE[Parameter.angle].max + 1) / 2, 0.0, 1.0)
)
return navi_information, lanes_heading, check_point
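
All five dimensions above are squashed into [0, 1]: dims 1, 2, 4 and 5 with the same (x / scale + 1) / 2 pattern and dim 3 with a plain clip. A small sketch of this encoding and its inverse, where NAVI_POINT_DIST and the maximum curve angle are assumed constants standing in for the class attributes used above:

import numpy as np

NAVI_POINT_DIST = 50.0    # assumed clipping distance for the checkpoint vector, in meters
MAX_CURVE_ANGLE = 135.0   # stand-in for BlockParameterSpace.CURVE[Parameter.angle].max, in degrees

def encode(x, scale):
    """Map a signed quantity in roughly [-scale, scale] into [0, 1]."""
    return float(np.clip((x / scale + 1) / 2, 0.0, 1.0))

def decode(y, scale):
    """Inverse of encode(): recover the signed quantity from the normalized value."""
    return (2 * y - 1) * scale

# A checkpoint 30 m ahead and 10 m to the right, on a clockwise curve sweeping 45 degrees
navi = [
    encode(30.0, NAVI_POINT_DIST),   # Dim 1: checkpoint offset along the heading direction
    encode(10.0, NAVI_POINT_DIST),   # Dim 2: checkpoint offset to the right-hand side
    float(np.clip(0.2, 0.0, 1.0)),   # Dim 3: bending radius term, already in [0, 1]
    encode(1.0, 1.0),                # Dim 4: bending direction, +1 (clockwise) -> 1.0
    encode(45.0, MAX_CURVE_ANGLE),   # Dim 5: sweep angle of the current curve
]
assert abs(decode(navi[0], NAVI_POINT_DIST) - 30.0) < 1e-9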

def _update_target_checkpoints(self, ego_lane_index, ego_lane_longitude):
"""
37 changes: 29 additions & 8 deletions metadrive/obs/state_obs.py
@@ -62,45 +62,66 @@ def vehicle_state(self, vehicle):
# update out of road
info = []
if self.config["random_agent_model"]:

# The length of the target vehicle
info.append(clip(vehicle.LENGTH / vehicle.MAX_LENGTH, 0.0, 1.0))

# The width of the target vehicle
info.append(clip(vehicle.WIDTH / vehicle.MAX_WIDTH, 0.0, 1.0))

if hasattr(vehicle, "side_detector") and vehicle.side_detector.available:

# If the side detector (a Lidar scanning the road borders) is turned on, then add its cloud points
info += vehicle.side_detector.perceive(vehicle, vehicle.engine.physics_world.static_world).cloud_points

else:

# If the side detector is turned off, then add the distances to the left and right road borders as states.
lateral_to_left, lateral_to_right, = vehicle.dist_to_left_side, vehicle.dist_to_right_side
total_width = float((vehicle.navigation.map.MAX_LANE_NUM + 1) * vehicle.navigation.map.MAX_LANE_WIDTH)
lateral_to_left /= total_width
lateral_to_right /= total_width
info += [clip(lateral_to_left, 0.0, 1.0), clip(lateral_to_right, 0.0, 1.0)]

# print("Heading Diff: ", [
# vehicle.heading_diff(current_reference_lane)
# for current_reference_lane in vehicle.navigation.current_ref_lanes
# ])

current_reference_lane = vehicle.navigation.current_ref_lanes[-1]
info += [

# The angular difference between vehicle's heading and the lane heading at this location.
vehicle.heading_diff(current_reference_lane),
# Note: the speed can be negative, denoting free fall. This happens during emergency braking.

# The velocity of the target vehicle
clip((vehicle.speed + 1) / (vehicle.max_speed + 1), 0.0, 1.0),

# Current steering
clip((vehicle.steering / vehicle.MAX_STEERING + 1) / 2, 0.0, 1.0),

# The normalized actions at the last time step
clip((vehicle.last_current_action[0][0] + 1) / 2, 0.0, 1.0),
clip((vehicle.last_current_action[0][1] + 1) / 2, 0.0, 1.0)
]

# Current angular velocity (yaw rate)
heading_dir_last = vehicle.last_heading_dir
heading_dir_now = vehicle.heading
cos_beta = heading_dir_now.dot(heading_dir_last) / (norm(*heading_dir_now) * norm(*heading_dir_last))
beta_diff = np.arccos(clip(cos_beta, 0.0, 1.0))
# print(beta)
yaw_rate = beta_diff / 0.1
# print(yaw_rate)
info.append(clip(yaw_rate, 0.0, 1.0))

if vehicle.lane_line_detector.available:

# If the lane line detector (a Lidar scanning the current lane's borders) is turned on,
# then add its cloud points
info += vehicle.lane_line_detector.perceive(vehicle, vehicle.engine.physics_world.static_world).cloud_points

else:

# If the lane line detector is turned off, then add the lateral offset of the current position
# from the center of the current lane to the state. The offset is 0 when the vehicle is
# centered in the lane.
_, lateral = vehicle.lane.local_coordinates(vehicle.position)
info.append(clip((lateral * 2 / vehicle.navigation.map.MAX_LANE_WIDTH + 1.0) / 2.0, 0.0, 1.0))

return info
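
The yaw-rate entry above is derived from two consecutive unit heading vectors divided by the simulation step. A standalone sketch of that computation, assuming a 0.1 s physics step as in the code:

import numpy as np

def yaw_rate_term(heading_last, heading_now, dt=0.1):
    """Approximate the yaw rate from two consecutive heading vectors.

    Returns the unsigned angular velocity (rad/s), clipped to [0, 1] as in the observation.
    """
    heading_last = np.asarray(heading_last, dtype=float)
    heading_now = np.asarray(heading_now, dtype=float)
    cos_beta = heading_now.dot(heading_last) / (
        np.linalg.norm(heading_now) * np.linalg.norm(heading_last)
    )
    beta_diff = np.arccos(np.clip(cos_beta, 0.0, 1.0))  # angle between the two headings (rad)
    return float(np.clip(beta_diff / dt, 0.0, 1.0))

# Example: the heading rotates by 0.05 rad (~2.9 degrees) within one 0.1 s step -> ~0.5
print(yaw_rate_term((1.0, 0.0), (np.cos(0.05), np.sin(0.05))))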

def get_line_detector_dim(self):
