Skip to content

Commit

Permalink
update myopic planner so to sync the timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
YaolinGe committed Sep 11, 2023
1 parent 1d5f99e commit 90676ba
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 125 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
*RRT_Random_Locations.npy
**RRT_Random_Locations.npy

*/npy
**/npy
*npy
**npy

_site
.sass-cache
Expand All @@ -24,3 +28,7 @@ vendor
**.npz
*.npy
**.npy

*.joblib
**.joblib

6 changes: 3 additions & 3 deletions Publication/src/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ def __init__(self) -> None:
self.__budget_mode = False

""" Default simulation parameter seteup. """
self.__num_steps = 120 # number of steps.
self.__num_replicates = 100 # number of replicates
self.__num_cores = 10 # number of cores to use
self.__num_steps = 15 # number of steps.
self.__num_replicates = 3 # number of replicates
self.__num_cores = 3 # number of cores to use

@staticmethod
def wgs2xy(value: np.ndarray) -> np.ndarray:
Expand Down
105 changes: 69 additions & 36 deletions Publication/src/Planner/Myopic2D/Myopic2D.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
- Next waypoint: contains the next waypoint, and the AUV should go to next waypoint once it arrives at the current one.
"""
from CostValley.CostValley import CostValley
from Field import Field
from Config import Config
from usr_func.is_list_empty import is_list_empty
from shapely.geometry import Point, LineString
import numpy as np
import os

Expand All @@ -28,10 +28,18 @@ def __init__(self, weight_eibv: float = 1., weight_ivr: float = 1.) -> None:
# s0: set up default environment
self.__cost_valley = CostValley(weight_eibv=weight_eibv, weight_ivr=weight_ivr)
self.__grf = self.__cost_valley.get_grf_model()
wp_distance = self.__config.get_waypoint_distance()
self.__field = Field(neighbour_distance=wp_distance)

# s1: set up trackers
self.__waypoint_distance = self.__config.get_waypoint_distance()
self.__candidates_angle = np.linspace(0, 2 * np.pi, 7)

# s1: add polygon border and polygon obstacles.
self.__polygon_border = self.__config.get_polygon_border()
self.__polygon_border_shapely = self.__config.get_polygon_border_shapely()
self.__line_border_shapely = self.__config.get_line_border_shapely()
self.__polygon_obstacle = self.__config.get_polygon_obstacle()
self.__polygon_obstacle_shapely = self.__config.get_polygon_obstacle_shapely()
self.__line_obstacle_shapely = self.__config.get_line_obstacle_shapely()

# s2: set up trackers
self.__wp_curr = self.__config.get_loc_start()
self.__wp_prev = self.__wp_curr
self.__wp_next = self.__wp_curr
Expand All @@ -56,31 +64,36 @@ def update_next_waypoint(self, ctd_data: np.ndarray = None) -> np.ndarray:
self.__cost_valley.update_cost_valley(self.__wp_curr)

# s1: find candidate locations
id_smooth, id_neighbours = self.get_candidates_indices()
# id_smooth, id_neighbours = self.get_candidates_indices()
wp_smooth, wp_neighbours = self.get_candidates_waypoints()

if not is_list_empty(id_smooth):
if not is_list_empty(wp_smooth):
# get cost associated with those valid candidate locations.
costs = []
self.__loc_cand = self.__field.get_location_from_ind(id_smooth)
self.__loc_cand = wp_smooth
for loc in self.__loc_cand:
costs.append(self.__cost_valley.get_cost_at_location(loc))
id_next = id_smooth[np.argmin(costs)]
wp_next = wp_smooth[np.argmin(costs)]
else:
rng_ind = np.random.randint(0, len(id_neighbours))
id_next = id_neighbours[rng_ind]
angles = np.linspace(0, 2 * np.pi, 61)
for angle in angles:
wp_next = self.__wp_curr + self.__waypoint_distance * np.array([np.sin(angle), np.cos(angle)])
if self.is_location_legal(wp_next) and self.is_path_legal(self.__wp_curr, wp_next):
break

self.__wp_next = self.__field.get_location_from_ind(id_next)
self.__wp_next = wp_next
self.__wp_prev = self.__wp_curr
self.__wp_curr = self.__wp_next
self.__trajectory.append([self.__wp_curr[0], self.__wp_curr[1]])
return self.__wp_next

def get_candidates_indices(self) -> tuple:
def get_candidates_waypoints(self) -> tuple:
"""
Filter sharp turn, bottom up and dive down behaviours.
It computes the dot product between two vectors. One is from the previous waypoint to the current waypoint.
The other is from the current waypoint to the potential next waypoint.
Example:
>>> wp_prev = np.array([x1, y1])
>>> wp_curr = np.array([x2, y2])
Expand All @@ -92,39 +105,59 @@ def get_candidates_indices(self) -> tuple:
>>> append(wp)
Returns:
id_smooth: filtered candidate indices in the waypointgraph.
id_neighbours: all the neighbours at the current location.
wp_smooth: filtered candidate waypoints.
wp_neighbours: all the neighbours at the current location.
"""
# s1: get vec from previous wp to current wp.
vec1 = self.get_vector_between_two_waypoints(self.__wp_prev, self.__wp_curr)

# s2: get all neighbours.
id_neighbours = self.__field.get_neighbour_indices(self.__field.get_ind_from_location(self.__wp_curr))

# s3: smooth neighbour locations.
id_smooth = []
if self.__directional_penalty:
for iid in id_neighbours:
wp_n = self.__field.get_location_from_ind(iid)
vec2 = self.get_vector_between_two_waypoints(self.__wp_curr, wp_n)
if vec1.T @ vec2 >= 0:
id_smooth.append(iid)
else:
[id_smooth.append(iid) for iid in id_neighbours]
return id_smooth, id_neighbours
# s2: get all neighbour waypoints
wp_neighbours = []
wp_smooth = []
for angle in self.__candidates_angle:
wp_temp = self.__wp_curr + self.__waypoint_distance * np.array([np.sin(angle), np.cos(angle)])
# s3: filter out illegal locations
if self.is_location_legal(wp_temp) and self.is_path_legal(self.__wp_curr, wp_temp):
wp_neighbours.append(wp_temp)
vec2 = self.get_vector_between_two_waypoints(self.__wp_curr, wp_temp)
if self.__directional_penalty:
if vec1.T @ vec2 >= 0:
wp_smooth.append(wp_temp)
else:
wp_smooth.append(wp_temp)
return wp_smooth, wp_neighbours

def is_location_legal(self, loc: np.ndarray) -> bool:
"""
Check if the location is legal.
Args:
loc: np.array([x, y])
def get_field(self) -> 'Field':
Returns:
True if legal, False if illegal.
"""
Return the field supporting the myopic2d path planner.
point = Point(loc[0], loc[1])
if self.__polygon_border_shapely.contains(point) and \
not self.__polygon_obstacle_shapely.contains(point):
return True
else:
return False

Example:
>>> field = myopic2d.get_field()
>>> field.get_neighbour_indices(0)
def is_path_legal(self, loc_start: np.ndarray, loc_end: np.ndarray) -> bool:
"""
Check if the path is legal.
Args:
loc_start: np.array([x1, y1])
loc_end: np.array([x2, y2])
Returns:
Field: field object supporting the myopic2d path planner.
True if legal, False if illegal.
"""
return self.__field
line = LineString([loc_start, loc_end])
if self.__line_border_shapely.intersects(line) or self.__line_obstacle_shapely.intersects(line):
return False
else:
return True

def get_previous_waypoint(self) -> np.ndarray:
""" Previous waypoint. """
Expand Down
Loading

0 comments on commit 90676ba

Please sign in to comment.