#### Install python dependencies

In [1]:
%pip install jsonpickle dacite shapely
!pip3 install chart_studio

Note: you may need to restart the kernel to use updated packages.


#### Run the planning server (should be executed only once)

In [2]:
import py_planning
py_planning.init()

### Visualization

In [3]:
# you can also open http://127.0.0.1:8008 in your browser

from IPython.display import IFrame
IFrame('http://127.0.0.1:8008', width="100%", height=650)

#### Place your planning code here, then run the cell and watch the visualization

In [4]:
from enum import IntEnum
import math
import numpy as np

In [5]:
# from py_planning.data_types import PlannedPath, PlannedState, State, Position  # data types used by planner interface
# from shapely.geometry import LineString, Point

# import time

# """
# find closest point on a polyline to the given point
# """
# def get_index_of_closest_point(line: LineString, point: Point):
#     closest_point_index = None
#     min_distance = float('inf')

#     for i, line_point in enumerate(line.coords):
#         line_point = Point(line_point)
#         distance = point.distance(line_point)
#         if distance < min_distance:
#             min_distance = distance
#             closest_point_index = i

#     return closest_point_index


# """
# This function is called by the simulator for each tick.
# It should return recent planned trajectory up to date with the environment state.
# 'state' parameter contains current world observations and vehicle state.
# """
# def do_plan(state: State) -> PlannedPath:
#     vehicle_pose = state.vehicle_pose
#     vehicle_pos = Point(vehicle_pose.pos.x, vehicle_pose.pos.y)  # current position of the AV

#     centerline = LineString([(p.x, p.y) for p in state.lane_path.centerline])

#     closest_index = get_index_of_closest_point(centerline, vehicle_pos)
#     current_velocity = vehicle_pose.velocity

#     # we leave some previous poses to make AV control stable
#     prev_poses_count = 3
#     max_poses_count = 50
#     first_pose_index = max(closest_index - prev_poses_count, 0)

#     # as a baseline here we just follow the centerline
#     planned_states = [
#         PlannedState(pos=p, velocity=current_velocity) for p in state.lane_path.centerline
#     ][first_pose_index:first_pose_index+ max_poses_count]

#     return PlannedPath(states=planned_states)
    

# # run the case in the simulator, watch the visualization
# py_planning.run_planner(
#     do_plan,
#     stop_on_fail=True  # set to False to continue planning after case fail (useful for debugging)
# )

Case failed: Collision with static object


In [6]:
def create_rotation_matrix(yaw):
    T = np.zeros((len(yaw), 2, 2))
    T[:, 0, 0] = np.cos(yaw)
    T[:, 0, 1] = -np.sin(yaw)
    T[:, 1, 0] = np.sin(yaw)
    T[:, 1, 1] = np.cos(yaw)

    return T
    
class Layer():
    class Id(IntEnum):
        X = 0
        Y = 1
        YAW = 2
        COST = 3
        PARENT = 4
        SIZE = 5

    def __init__(self, N=None, nodes=None):
        assert (N is None) ^ (nodes is None)
        if N is not None:
            self.nodes = np.zeros((N, Layer.Id.SIZE))
        if nodes is not None:
            assert nodes.shape[1] == Layer.Id.SIZE
            self.nodes = nodes
        
    @property
    def x(self):
        return self.nodes[:, Layer.Id.X]
    
    @property
    def y(self):
        return self.nodes[:, Layer.Id.Y]
    
    @property
    def yaw(self):
        return self.nodes[:, Layer.Id.YAW]
    
    @property
    def cost(self):
        return self.nodes[:, Layer.Id.COST]
    
    @property
    def parent(self):
        return self.nodes[:, Layer.Id.PARENT]
    
    @property
    def N(self):
        return self.nodes.shape[0]
    
    @property
    def M(self):
        return self.nodes.shape[1]
    
    
def arc_primitive(c, ds):
    if c == 0:
        return 0, ds, 0
    else:
        dyaw = c * ds
        return dyaw, 1 / c * math.sin(dyaw), 1 / c * (1 - math.cos(dyaw))


class Goal:
    def __init__(self, x, y, threshold):
        self.x = x
        self.y = y
        self.threshold = threshold


class Graph(list):
    def nodes_num(self):
        nodes = 0
        for layer in self:
            nodes += layer.N
        return nodes


def search(initial_pose, curvature_primitives=[-0.2, 0., 0.2], ds=2, tree_depth=6, sparse=True):
    graph = Graph()
    initial_layer = Layer(1)
    initial_layer.nodes[:, Layer.Id.X] = initial_pose.pos.x
    initial_layer.nodes[:, Layer.Id.Y] = initial_pose.pos.y
    initial_layer.nodes[:, Layer.Id.YAW] = initial_pose.rot
    graph.append(initial_layer) 
    
    for i in range(tree_depth):
        X_c = graph[-1]
        X_n = _make_step(X_c, ds, curvature_primitives)
        if sparse:
            X_n = _sparsify(X_n)

        graph.append(X_n)

    return graph, _restore_path(graph, np.argmin(graph[-1].nodes[:, Layer.Id.COST]))


def _make_step(X_c, ds, curvature_primitives):
    N = X_c.N
    X_n = Layer(N * len(curvature_primitives))

    for i, c in enumerate(curvature_primitives):
        # assumme instant change of curvature and movement along circle
        dyaw, dx, dy = arc_primitive(c, ds)
        shift = np.array([dx, dy])

        yaw_c = X_c.yaw
        T = create_rotation_matrix(yaw_c)

        X_n.x[i * N : (i + 1) * N] = X_c.x + T[:, 0] @ shift
        X_n.y[i * N : (i + 1) * N] = X_c.y + T[:, 1] @ shift
        X_n.yaw[i * N : (i + 1) * N] = yaw_c + dyaw
        X_n.parent[i * N : (i + 1) * N] = np.arange(N)
        X_n.cost[i * N : (i + 1) * N] = X_c.cost + c**2

    return X_n


def _sparsify(layer, min_nodes=5, step_x=1, step_y=1,step_yaw=0.1):
    if layer.N < min_nodes:
        return layer

    def node_to_key(x, y, yaw):
        return (round(x / step_x), round(y / step_y), round(yaw / step_yaw))
    d = {}
    for i in range(layer.N):
        key = node_to_key(layer.x[i], layer.y[i], layer.yaw[i])
        if key in d:
            d[key] = min(d[key], (layer.cost[i], i))
        else:
            d[key] = (layer.cost[i], i)
    indx = list(map(lambda value: value[1][1], d.items()))
    layer.nodes = layer.nodes[indx]

    return layer


def _goal_reached(layer, goal):
    if goal is None:
        return None
    reached = ((layer.x - goal.x) ** 2 + (layer.y - goal.y) ** 2 < goal.threshold ** 2)
    if np.any(reached):
        cost = reached.astype(float) * layer.cost + (~reached).astype(float) * np.max(layer.cost)
        return np.argmin(cost)

def _restore_path(graph, i):
    path = Graph()
    for j in range(len(graph)):
        layer = graph[-j - 1]
        path.append(Layer(nodes=np.copy(layer.nodes[i:i+1])))
        i = int(layer.parent[i])

        # fix parent linkage
        path[-1].parent[:] = 0

    path.reverse()
    return path

In [7]:
import chart_studio.plotly as py
import plotly.graph_objs as go
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:99% !important; }</style>"))

from plotly.offline import download_plotlyjs, init_notebook_mode, iplot
init_notebook_mode(connected=False)

In [8]:
def plot_graph(graph, plot_edges=False):
    # huge number of nodes could freeze browser
    assert graph.nodes_num() < 200000
    
    data = []
    for layer in graph:
        data.append(go.Scatter(
            x=layer.x,
            y=layer.y,
            mode="markers"))

    if plot_edges:
        # huge number of nodes could freeze browser
        assert graph.nodes_num() < 4000
        for prev_layer, layer in zip(graph[:-1], graph[1:]):
            N = prev_layer.N
            for i in range(layer.N):
                data.append(go.Scatter(
                    x=[prev_layer.x[int(layer.parent[i])], layer.x[i]],
                    y=[prev_layer.y[int(layer.parent[i])], layer.y[i]],
                    mode="lines"))

    layout = go.Layout(
        height=600,
        yaxis={
            "scaleanchor": "x",
        },
    )
    figure = go.Figure(data=data, layout=layout)
    iplot(figure, filename="graph")

In [9]:
from py_planning.data_types import PlannedPath, PlannedState, State, Position  # data types used by planner interface
from shapely.geometry import LineString, Point

import time

def do_graph_planning(state: State) -> PlannedPath:
    print('HELLO')
    graph, path = search(state.vehicle_pose, tree_depth=20)
    plot_graph(path, True)
    planned_path = [PlannedState(pos=state.vehicle_pose.pos, velocity=state.vehicle_pose.velocity)]
    print(state.vehicle_pose)
    for layer in path[1:]:
        planned_path.append(PlannedState(pos=Position(layer.nodes[0, Layer.Id.X], layer.nodes[0, Layer.Id.Y]), velocity=state.vehicle_pose.velocity))
    print(planned_path)
    return PlannedPath(states=planned_path)
        

py_planning.run_planner(
    do_graph_planning,
    stop_on_fail=True  # set to False to continue planning after case fail (useful for debugging)
)