#### Install python dependencies

In [1]:
%pip install jsonpickle dacite shapely

Note: you may need to restart the kernel to use updated packages.


#### Run the planning server (should be executed only once)

In [2]:
import py_planning
py_planning.init()

#### Visualization

In [3]:
# # you can also open http://127.0.0.1:8008 in your browser

from IPython.display import IFrame
IFrame('http://127.0.0.1:8008', width="100%", height=650)

#### Lane centering

In [4]:
from enum import IntEnum
import math
import numpy as np

In [5]:
from py_planning.data_types import PlannedPath, PlannedState, State, Position  # data types used by planner interface
from shapely.geometry import LineString, Point

import time

"""
find closest point on a polyline to the given point
"""
def get_index_of_closest_point(line: LineString, point: Point):
    closest_point_index = None
    min_distance = float('inf')

    for i, line_point in enumerate(line.coords):
        line_point = Point(line_point)
        distance = point.distance(line_point)
        if distance < min_distance:
            min_distance = distance
            closest_point_index = i

    return closest_point_index


"""
This function is called by the simulator for each tick.
It should return recent planned trajectory up to date with the environment state.
'state' parameter contains current world observations and vehicle state.
"""
def do_plan(state: State) -> PlannedPath:
    vehicle_pose = state.vehicle_pose
    vehicle_pos = Point(vehicle_pose.pos.x, vehicle_pose.pos.y)  # current position of the AV

    centerline = LineString([(p.x, p.y) for p in state.lane_path.centerline])

    closest_index = get_index_of_closest_point(centerline, vehicle_pos)
    current_velocity = vehicle_pose.velocity

    # we leave some previous poses to make AV control stable
    prev_poses_count = 3
    max_poses_count = 50
    first_pose_index = max(closest_index - prev_poses_count, 0)

    # as a baseline here we just follow the centerline
    planned_states = [
        PlannedState(pos=p, velocity=current_velocity) for p in state.lane_path.centerline
    ][first_pose_index:first_pose_index+ max_poses_count]

    return PlannedPath(states=planned_states)
    

# run the case in the simulator, watch the visualization
py_planning.run_planner(
    do_plan,
    stop_on_fail=True  # set to False to continue planning after case fail (useful for debugging)
)

Case failed: Collision with static object


#### Graph geometry planning

In [6]:
def create_rotation_matrix(yaw):
    T = np.zeros((len(yaw), 2, 2))
    T[:, 0, 0] = np.cos(yaw)
    T[:, 0, 1] = -np.sin(yaw)
    T[:, 1, 0] = np.sin(yaw)
    T[:, 1, 1] = np.cos(yaw)

    return T
    
class Layer():
    class Id(IntEnum):
        X = 0
        Y = 1
        YAW = 2
        COST = 3
        PARENT = 4
        SIZE = 5

    def __init__(self, N=None, nodes=None):
        assert (N is None) ^ (nodes is None)
        if N is not None:
            self.nodes = np.zeros((N, Layer.Id.SIZE))
        if nodes is not None:
            assert nodes.shape[1] == Layer.Id.SIZE
            self.nodes = nodes
        
    @property
    def x(self):
        return self.nodes[:, Layer.Id.X]
    
    @property
    def y(self):
        return self.nodes[:, Layer.Id.Y]
    
    @property
    def yaw(self):
        return self.nodes[:, Layer.Id.YAW]
    
    @property
    def cost(self):
        return self.nodes[:, Layer.Id.COST]
    
    @property
    def parent(self):
        return self.nodes[:, Layer.Id.PARENT]
    
    @property
    def N(self):
        return self.nodes.shape[0]
    
    @property
    def M(self):
        return self.nodes.shape[1]
    
    
def arc_primitive(c, ds):
    if c == 0:
        return 0, ds, 0
    else:
        dyaw = c * ds
        return dyaw, 1 / c * math.sin(dyaw), 1 / c * (1 - math.cos(dyaw))


class Graph(list):
    def nodes_num(self):
        nodes = 0
        for layer in self:
            nodes += layer.N
        return nodes


def search(initial_state, lane_path, obstacles, curvature_primitives=[-0.2, 0., 0.2], ds=1, tree_depth=6, sparse=True):
    graph = Graph()
    initial_layer = Layer(1)
    initial_layer.nodes[:, Layer.Id.X] = initial_state.vehicle_pose.pos.x
    initial_layer.nodes[:, Layer.Id.Y] = initial_state.vehicle_pose.pos.y
    initial_layer.nodes[:, Layer.Id.YAW] = initial_state.vehicle_pose.rot
    graph.append(initial_layer) 
    
    for i in range(tree_depth):
        X_c = graph[-1]
        X_n = _make_step(X_c, ds, curvature_primitives, lane_path, obstacles)
        if sparse:
            X_n = _sparsify(X_n)

        graph.append(X_n)

    return graph, _restore_path(graph, np.argmin(graph[-1].nodes[:, Layer.Id.COST]))


def _make_step(X_c, ds, curvature_primitives, lane_path, obstacles):
    N = X_c.N
    X_n = Layer(N * len(curvature_primitives))

    for i, c in enumerate(curvature_primitives):
        # assumme instant change of curvature and movement along circle
        dyaw, dx, dy = arc_primitive(c, ds)
        shift = np.array([dx, dy])

        yaw_c = X_c.yaw
        T = create_rotation_matrix(yaw_c)

        X_n.x[i * N : (i + 1) * N] = X_c.x + T[:, 0] @ shift
        X_n.y[i * N : (i + 1) * N] = X_c.y + T[:, 1] @ shift
        X_n.yaw[i * N : (i + 1) * N] = yaw_c + dyaw
        X_n.parent[i * N : (i + 1) * N] = np.arange(N)
        X_n.cost[i * N : (i + 1) * N] = X_c.cost + c ** 2 
        # _update_cost(X_n.nodes[i * N : (i + 1) * N, :], lane_path, obstacles)

    return X_n


# def _update_cost(X_n, lane_path, obstacles):
#     centerline = LineString([(p.x, p.y) for p in lane_path])
#     for i, node in enumerate(X_n):
#         _, d = get_index_of_closest_point(centerline, Point(node[Layer.Id.X], node[Layer.Id.Y]))
#         X_n[i, Layer.Id.COST] += d
#         # obstacles = get_closest_static_obstacles(obstacles, node[Layer.Id.X], node[Layer.Id.Y], 1)
#         # if len(obstacles) > 0:
#         #     d_to_closest_static = dist(node[Layer.Id.X], node[Layer.Id.Y], obstacles[0])
#         #     if d_to_closest_static < 2 * max(obstacles[0].w, obstacles[0].h):
#         #         X_n[i, Layer.Id.COST] = np.inf
#         #     else:
#         #         X_n[i, Layer.Id.COST] += 10 * np.exp(-d_to_closest_static + 2 * max(obstacles[0].w, obstacles[0].h))
                


def _sparsify(layer, min_nodes=5, step_x=1, step_y=1,step_yaw=0.1):
    if layer.N < min_nodes:
        return layer

    def node_to_key(x, y, yaw):
        return (round(x / step_x), round(y / step_y), round(yaw / step_yaw))
    d = {}
    for i in range(layer.N):
        key = node_to_key(layer.x[i], layer.y[i], layer.yaw[i])
        if key in d:
            d[key] = min(d[key], (layer.cost[i], i))
        else:
            d[key] = (layer.cost[i], i)
    indx = list(map(lambda value: value[1][1], d.items()))
    layer.nodes = layer.nodes[indx]

    return layer


def _restore_path(graph, i):
    path = Graph()
    for j in range(len(graph)):
        layer = graph[-j - 1]
        path.append(Layer(nodes=np.copy(layer.nodes[i:i+1])))
        i = int(layer.parent[i])

        # fix parent linkage
        path[-1].parent[:] = 0

    path.reverse()
    return path

In [None]:
from py_planning.data_types import PlannedPath, PlannedState, State, Position  # data types used by planner interface
from shapely.geometry import LineString, Point
import matplotlib.pyplot as plt

"""
find closest point on a polyline to the given point
"""
def get_index_of_closest_point(line: LineString, point: Point):
    closest_point_index = None
    min_distance = float('inf')

    for i, line_point in enumerate(line.coords):
        line_point = Point(line_point)
        distance = point.distance(line_point)
        if distance < min_distance:
            min_distance = distance
            closest_point_index = i

    return closest_point_index, min_distance


def dist(x, y, static_obstacle):
    return (x - static_obstacle.p[0]) ** 2 + (y - static_obstacle.p[1]) ** 2


def get_closest_static_obstacles(static_obstacles,x, y, k):
    obstacles = sorted(static_obstacles, key=lambda obstacle: dist(x, y, obstacle))
    return obstacles[:min(len(obstacles), k)]


def do_graph_planning(state: State) -> PlannedPath:
    print(state.dynamic_obstacles)
    vehicle_pose = state.vehicle_pose
    vehicle_pos = Point(vehicle_pose.pos.x, vehicle_pose.pos.y)  # current position of the AV
    centerline = LineString([(p.x, p.y) for p in state.lane_path.centerline])
    closest_index, _ = get_index_of_closest_point(centerline, vehicle_pos)
    lane_path = state.lane_path.centerline[max(0, closest_index - 20) : min(len(state.lane_path.centerline), closest_index + 20) : 2]
    obstacles = get_closest_static_obstacles(state.static_obstacles, state.vehicle_pose.pos.x, state.vehicle_pose.pos.y, 1)

    ds = 1
    graph, path = search(state, lane_path, obstacles, tree_depth=12, ds=ds)
    planned_path = list(map(lambda layer: PlannedState(pos=Position(float(layer.nodes[0, Layer.Id.X]), float(layer.nodes[0, Layer.Id.Y])), velocity=state.vehicle_pose.velocity, rot=float(layer.nodes[0, Layer.Id.YAW])), path))
    return PlannedPath(states=planned_path) 


py_planning.run_planner(
    do_graph_planning,
    stop_on_fail=True  # set to False to continue planning after case fail (useful for debugging)
)

Exception while planning:  Traceback (most recent call last):
  File "/Users/moomin/shad/dash/py_planning/planning_server.py", line 51, in plan_request
    response = self.do_plan(beatify_state(state))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/_b/djhkvpzn4v5_rm3tpbktdpd99dp62b/T/ipykernel_15244/1317474386.py", line 32, in do_graph_planning
    print(state.dynamicObstacles)
          ^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'State' object has no attribute 'dynamicObstacles'



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 60202)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/Users/moomin/shad/dash/py_planning/planning_server.py", line 105, in finish_request
    self.RequestHandlerClass(
  File "/Users/moomin/shad/dash/py_planning/planning_server.py", line 18, in __init__
    super().__init__(*args, **kwargs)
  File "/opt/anaconda3/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/opt/anaconda3/lib/python3.11/http/server.py", line 436, in handle
    self.handle_one_request()
  File "/opt/anaconda3/lib/python3.11/http/server.py", line 424, in handle_one_request
    method()
  File "/Users/moomin/sha