In [1]:
import heapq
import scipy.spatial
import numpy as np
import math
import matplotlib.pyplot as plt
import sys
from a_star_heuristic import dp_planning  # , calc_obstacle_map
import reeds_shepp_path as rs
from harvey import move, check_harvey_collision, MAX_STEER, WB, plot_harvey, plot_arrow
from parameters import *
import time
from math import sqrt, cos, sin, tan, pi
import time
import plotly.express as pl
import plotly.graph_objects as go
# %pip install pandas
%matplotlib inline
import time
import pylab as pyl
from IPython import display
import plotly.express as pl
import plotly.graph_objects as go

Vehicle Parameters: WB : 3 W:  2 LF:  3 LB:  3


In [2]:
class Node:

    def __init__(self, xind, yind, yawind, direction,
                 xlist, ylist, yawlist, directions,
                 steer=0.0, pind=None, cost=None):
        self.xind = xind
        self.yind = yind
        self.yawind = yawind
        self.direction = direction
        self.xlist = xlist
        self.ylist = ylist
        self.yawlist = yawlist
        self.directions = directions
        self.steer = steer
        self.pind = pind
        self.cost = cost




In [3]:
class Path:

    def __init__(self, xlist, ylist, yawlist, directionlist, cost):
        self.xlist = xlist
        self.ylist = ylist
        self.yawlist = yawlist
        self.directionlist = directionlist
        self.cost = cost

In [4]:
class KDTree:
    """
    Nearest neighbor search class with KDTree
    """

    def __init__(self, data):
        # store kd-tree
        # used cKDTree instead of KDTree for faster querying
        self.tree = scipy.spatial.cKDTree(data)

    def search(self, inp, k=1):
        """
        Search NN
        inp: input data, single frame or multi frame
        """

        if len(inp.shape) >= 2:  # multi input
            index = []
            dist = []

            for i in inp.T:
                idist, iindex = self.tree.query(i, k=k)
                index.append(iindex)
                dist.append(idist)

            return index, dist

        dist, index = self.tree.query(inp, k=k)
        return index, dist

    def search_in_distance(self, inp, r):
        """
        find points with in a distance r
        """

        index = self.tree.query_ball_point(inp, r)
        return index


In [5]:
class Config:

    def __init__(self, ox, oy, xyreso, yawreso):
        min_x_m = 0
        min_y_m = 0
        max_x_m = 100
        max_y_m = 100

        ox.append(min_x_m)
        oy.append(min_y_m)
        ox.append(max_x_m)
        oy.append(max_y_m)

        self.minx = 0
        self.miny = 0
        self.maxx = round(max_x_m / xyreso)
        self.maxy = round(max_y_m / xyreso)

        self.xw = round(self.maxx - self.minx)
        self.yw = round(self.maxy - self.miny)

        self.minyaw = round(- math.pi / yawreso) - 1
        self.maxyaw = round(math.pi / yawreso)
        self.yaww = round(self.maxyaw - self.minyaw)

In [6]:
def calc_motion_inputs():

    for steer in np.concatenate((np.linspace(-MAX_STEER, MAX_STEER, N_STEER),[0.0])):
        for d in [1, -1]:
            yield [steer, d]


def get_neighbors(current, config, ox, oy, kdtree):

    for steer, d in calc_motion_inputs():
        node = calc_next_node(current, steer, d, config, ox, oy, kdtree)
        if node and verify_index(node, config):
            yield node


def calc_next_node(current, steer, direction, config, ox, oy, kdtree):

    x, y, yaw = current.xlist[-1], current.ylist[-1], current.yawlist[-1]

    arc_l = XY_GRID_RESOLUTION * 1.5
    xlist, ylist, yawlist = [], [], []
    for _ in np.arange(0, arc_l, MOTION_RESOLUTION):
        x, y, yaw = move(x, y, yaw, MOTION_RESOLUTION * direction, steer)
        xlist.append(x)
        ylist.append(y)
        yawlist.append(yaw)
    if not check_harvey_collision(xlist, ylist, yawlist, ox, oy, kdtree):
        return None

    d = direction == 1
    xind = round(x / XY_GRID_RESOLUTION)
    yind = round(y / XY_GRID_RESOLUTION)
    yawind = round(yaw / YAW_GRID_RESOLUTION)

    addedcost = 0.0

    if d != current.direction:
        addedcost += SB_COST

    # steer penalty
    addedcost += STEER_COST * abs(steer)

    # steer change penalty
    addedcost += STEER_CHANGE_COST * abs(current.steer - steer)

    cost = current.cost + addedcost + arc_l

    node = Node(xind, yind, yawind, d, xlist,
                ylist, yawlist, [d],
                pind=calc_index(current, config),
                cost=cost, steer=steer)

    return node


def is_same_grid(n1, n2):
    if n1.xind == n2.xind and n1.yind == n2.yind and n1.yawind == n2.yawind:
        return True
    return False


def analytic_expansion(current, goal, c, ox, oy, kdtree):

    sx = current.xlist[-1]
    sy = current.ylist[-1]
    syaw = current.yawlist[-1]

    gx = goal.xlist[-1]
    gy = goal.ylist[-1]
    gyaw = goal.yawlist[-1]

    max_curvature = math.tan(MAX_STEER) / WB
    paths = rs.calc_paths(sx, sy, syaw, gx, gy, gyaw,
                          max_curvature, step_size=MOTION_RESOLUTION)

    if not paths:
        return None

    best_path, best = None, None

    for path in paths:
        if check_harvey_collision(path.x, path.y, path.yaw, ox, oy, kdtree):
            cost = calc_rs_path_cost(path)
            if not best or best > cost:
                best = cost
                best_path = path


    return best_path


def update_node_with_analystic_expantion(current, goal,
                                         c, ox, oy, kdtree):
    apath = analytic_expansion(current, goal, c, ox, oy, kdtree)

    if apath:
        # plt.plot(apath.x, apath.y)
        fx = apath.x[1:]
        fy = apath.y[1:]
        fyaw = apath.yaw[1:]

        fcost = current.cost + calc_rs_path_cost(apath)
        fpind = calc_index(current, c)

        fd = []
        for d in apath.directions[1:]:
            fd.append(d >= 0)

        fsteer = 0.0
        fpath = Node(current.xind, current.yind, current.yawind,
                     current.direction, fx, fy, fyaw, fd,
                     cost=fcost, pind=fpind, steer=fsteer)
        return True, fpath

    return False, None


def calc_rs_path_cost(rspath):

    cost = 0.0
    for l in rspath.lengths:
        if l >= 0:  # forward
            cost += l
        else:  # back
            cost += abs(l) * BACK_COST

    # swich back penalty
    for i in range(len(rspath.lengths) - 1):
        if rspath.lengths[i] * rspath.lengths[i + 1] < 0.0:  # switch back
            cost += SB_COST

    # steer penalyty
    for ctype in rspath.ctypes:
        if ctype != "S":  # curve
            cost += STEER_COST * abs(MAX_STEER)

    # ==steer change penalty
    # calc steer profile
    nctypes = len(rspath.ctypes)
    ulist = [0.0] * nctypes
    for i in range(nctypes):
        if rspath.ctypes[i] == "R":
            ulist[i] = - MAX_STEER
        elif rspath.ctypes[i] == "L":
            ulist[i] = MAX_STEER

    for i in range(len(rspath.ctypes) - 1):
        cost += STEER_CHANGE_COST * abs(ulist[i + 1] - ulist[i])

    return cost


def hybrid_a_star_planning(start, goal, ox, oy, xyreso, yawreso):
    start[2], goal[2] = rs.pi_2_pi(start[2]), rs.pi_2_pi(goal[2])
    tox, toy = ox[:], oy[:]
    obkdtree = KDTree(np.vstack((tox, toy)).T)
    config = Config(tox, toy, xyreso, yawreso)
    nstart = Node(round(start[0] / xyreso), round(start[1] / xyreso), round(start[2] / yawreso),
                  True, [start[0]], [start[1]], [start[2]], [True], cost=0)
    ngoal = Node(round(goal[0] / xyreso), round(goal[1] / xyreso), round(goal[2] / yawreso),
                 True, [goal[0]], [goal[1]], [goal[2]], [True])

    openList, closedList = {}, {}
    _, _, h_dp = dp_planning(nstart.xlist[-1], nstart.ylist[-1],
                             ngoal.xlist[-1], ngoal.ylist[-1], ox, oy, xyreso, VR)
    pq = []
    openList[calc_index(nstart, config)] = nstart
    heapq.heappush(pq, (calc_cost(nstart, h_dp, ngoal, config),
                        calc_index(nstart, config)))
    while True:
        if not openList:
            print("Error: Cannot find path, No open set")
            return [], [], []

        cost, c_id = heapq.heappop(pq)
        if c_id in openList:
            current = openList.pop(c_id)
            closedList[c_id] = current
        else:
            continue
        isupdated, fpath = update_node_with_analystic_expantion(
            current, ngoal, config, ox, oy, obkdtree)
        if isupdated:
            break
        for neighbor in get_neighbors(current, config, ox, oy, obkdtree):
            neighbor_index = calc_index(neighbor, config)
            if neighbor_index in closedList:
                continue
            if neighbor not in openList \
                    or openList[neighbor_index].cost > neighbor.cost:
                heapq.heappush(
                    pq, (calc_cost(neighbor, h_dp, ngoal, config),
                         neighbor_index))
                openList[neighbor_index] = neighbor

    path = get_final_path(closedList, fpath, nstart, config)
    return path

def calc_cost(n, h_dp, goal, c):
    ind = (n.yind - c.miny) * c.xw + (n.xind - c.minx)
    if ind not in h_dp:
        return n.cost + 999999999  # collision cost
    return n.cost + H_COST * h_dp[ind].cost


def get_final_path(closed, ngoal, nstart, config):
    rx, ry, ryaw = list(reversed(ngoal.xlist)), list(
        reversed(ngoal.ylist)), list(reversed(ngoal.yawlist))
    direction = list(reversed(ngoal.directions))
    nid = ngoal.pind
    finalcost = ngoal.cost

    while nid:
        n = closed[nid]
        rx.extend(list(reversed(n.xlist)))
        ry.extend(list(reversed(n.ylist)))
        ryaw.extend(list(reversed(n.yawlist)))
        direction.extend(list(reversed(n.directions)))

        nid = n.pind

    rx = list(reversed(rx))
    ry = list(reversed(ry))
    ryaw = list(reversed(ryaw))
    direction = list(reversed(direction))

    # adjust first direction
    direction[0] = direction[1]

    path = Path(rx, ry, ryaw, direction, finalcost)

    return path

def verify_index(node, c):
    xind, yind = node.xind, node.yind
    if xind >= c.minx and xind <= c.maxx and yind >= c.miny \
            and yind <= c.maxy:
        return True

    return False

def calc_index(node, c):
    ind = (node.yawind - c.minyaw) * c.xw * c.yw + \
        (node.yind - c.miny) * c.xw + (node.xind - c.minx)

    if ind <= 0:
        print("Error(calc_index):", ind)

    return ind

class Error(Exception):
    pass

class unknownerror(Error):
    pass


In [7]:
def unit_testing(ox,oy):
    startList = [[0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)], [0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)]]
    goalList = [[10.0, 0.0, np.deg2rad(90.0)], [10.0, 5.0, np.deg2rad(90.0)], [10.0, 10.0, np.deg2rad(90.0)], [10.0, -5.0, np.deg2rad(90.0)], [10.0, -10.0, np.deg2rad(90.0)]]
    plt.plot(ox, oy, ".k")
    for start in startList:
        for goal in goalList:
            print('Start Location:  ',start)
            print('Goal location:    ',goal)
            # print(start)
            # print('!!!!',goal)
            rs.plot_arrow(start[0], start[1], start[2], fc='g')
            rs.plot_arrow(goal[0], goal[1], goal[2])
            path = hybrid_a_star_planning(
            start, goal, ox, oy, XY_GRID_RESOLUTION, YAW_GRID_RESOLUTION)

            x = path.xlist
            y = path.ylist
            yaw = path.yawlist
            plot_trajectory_and_motion(ox, oy,x, y, yaw)

def unit_testing_local_planner(ox,oy):
    startList = [[30.0, 20.0, np.deg2rad(90.0)]]
    goalList = [[30.0, 35.0, np.deg2rad(90.0)], [35.0, 35.0, np.deg2rad(90.0)],[25.0, 35.0, np.deg2rad(90.0)],[40.0, 35.0, np.deg2rad(90.0)],[20.0, 35.0, np.deg2rad(90.0)],[45.0, 35.0, np.deg2rad(90.0)],[15.0, 35.0, np.deg2rad(90.0)],[50.0, 35.0, np.deg2rad(90.0)],[10.0, 35.0, np.deg2rad(90.0)]]
    plt.plot(ox, oy, ".k")
    for start in startList:
        for goal in goalList:
            print("Start Location:  ",start)
            print('Goal location:    ',goal)
            rs.plot_arrow(start[0], start[1], start[2], fc='g')
            rs.plot_arrow(goal[0], goal[1], goal[2])
            path = hybrid_a_star_planning(
            start, goal, ox, oy, XY_GRID_RESOLUTION, YAW_GRID_RESOLUTION)
            x = path.xlist
            y = path.ylist
            yaw = path.yawlist
            plot_trajectory_and_motion(ox, oy,x, y, yaw)

def unit_testing_parking(ox,oy):
    startList = [[10.0, 30.0, np.deg2rad(0)]]
    goalList = [[10.0, 50.0, np.deg2rad(0)]]
    plt.plot(ox, oy, ".k")
    for start in startList:
        for goal in goalList:
            print("Start Location:  ",start)
            print('Goal location:    ',goal)
            rs.plot_arrow(start[0], start[1], start[2], fc='g')
            rs.plot_arrow(goal[0], goal[1], goal[2])
            path = hybrid_a_star_planning(
            start, goal, ox, oy, XY_GRID_RESOLUTION, YAW_GRID_RESOLUTION)
            x = path.xlist
            y = path.ylist
            yaw = path.yawlist
            plot_trajectory_and_motion(ox, oy,x, y, yaw)

def create_world():
    x, y = [], []
    for i in range(60):
        x.append(i)
        y.append(0.0)
    for i in range(60):
        x.append(60.0)
        y.append(i)
    for i in range(61):
        x.append(i)
        y.append(60.0)
    for i in range(61):
        x.append(0.0)
        y.append(i)
    for i in range(40):
        x.append(20.0)
        y.append(i)
    for i in range(40):
        x.append(40.0)
        y.append(60.0 - i)
    return x, y

def create_world_local_planner():
    x, y = [], []
    for i in range(60):
        x.append(i)
        y.append(0.0)
    for i in range(60):
        x.append(60.0)
        y.append(i)
    for i in range(61):
        x.append(i)
        y.append(60.0)
    for i in range(61):
        x.append(0.0)
        y.append(i)
    for i in range(40):
        x.append(i)
        y.append(45)
    return x, y

def plot_arrow(x, y, yaw, arrow_length=1.0,
               origin_point_plot_style="xr",
               head_width=0.1, fc="r", ec="k", **kwargs):
    if not isinstance(x, float):
        for (i_x, i_y, i_yaw) in zip(x, y, yaw):
            plot_arrow(i_x, i_y, i_yaw, head_width=head_width,
                       fc=fc, ec=ec, **kwargs)
    else:
        plt.arrow(x, y,
                  arrow_length * math.cos(yaw),
                  arrow_length * math.sin(yaw),
                  head_width=head_width,
                  fc=fc, ec=ec,
                  **kwargs)
        if origin_point_plot_style is not None:
            plt.plot(x, y, origin_point_plot_style)



def plot_trajectory_and_motion(ox, oy,x, y, yaw):
    for ix, iy, iyaw in zip(x, y, yaw):
        plt.cla()
        plt.plot(ox, oy, ".k")
        plt.plot(x, y, "-r", label="Hybrid A* path")
        plt.grid(True)
        plt.axis("equal")
        plot_harvey(ix, iy, iyaw)
        display.clear_output(wait=True)
        display.display(pyl.gcf())
        plt.pause(0.0000000000000000000000000001)        
    plt.scatter(x,y)
    plt.show()
    

In [8]:
ox, oy = [], []
startList = [[0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)], [0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)]]
goalList = [[10.0, 0.0, np.deg2rad(90.0)], [10.0, 15.0, np.deg2rad(90.0)], [10.0, 30.0, np.deg2rad(90.0)], [10.0, -15.0, np.deg2rad(90.0)], [1,0, 0.0, np.deg2rad(-90.0)]]

fig = go.Figure()

for i in range(0, len(startList)):
    start, goal = startList[i], goalList[i]
    start[2] = np.deg2rad(startList[i][2])
    goal[2] = np.deg2rad(goalList[i][2])
    print('Goal location:', goal)
    start_time = time.time()
    path = hybrid_a_star_planning(start, goal, ox, oy, XY_GRID_RESOLUTION, YAW_GRID_RESOLUTION)
    print("--- %s seconds ---" % (time.time() - start_time))

    x = list(path.xlist)
    y = list(path.ylist)

    fig.add_trace(go.Scatter(x=x, y=y,
                             mode='lines',
                             name=f'Path {i+1}'
                             ))

fig.add_trace(go.Scatter(x=ox, y=oy,
                         mode='markers',
                         name='Boundary',
                         fill='tozeroy'
                         ))

fig.update_layout(
    margin=dict(l=100, r=100, t=100, b=100),
    paper_bgcolor="LightSteelBlue",
)

fig.show()

Goal location: [10.0, 0.0, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.443213701248169 seconds ---
Goal location: [10.0, 15.0, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.3695323467254639 seconds ---
Goal location: [10.0, 30.0, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.327096700668335 seconds ---
Goal location: [10.0, -15.0, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.3603112697601318 seconds ---
Goal location: [1, 0, 0.0, -1.5707963267948966]
xwidth,ywidth: 300 300
--- 1.3713557720184326 seconds ---


In [9]:
def plot_3D(startList,goalList):
    fig = go.Figure()
    ox, oy = [], []
    for i in range(0, len(startList)):
        start, goal = startList[i], goalList[i]
        start[2] = np.deg2rad(startList[i][2])
        goal[2] = np.deg2rad(goalList[i][2])
        print('Goal location:', goal)
        start_time = time.time()
        path = hybrid_a_star_planning(start, goal, ox, oy, XY_GRID_RESOLUTION, YAW_GRID_RESOLUTION)
        print("--- %s seconds ---" % (time.time() - start_time))

        x = list(path.xlist)
        y = list(path.ylist)
        yaw = list(path.yawlist)

        fig.add_trace(go.Scatter3d(x=x, y=y, z=[i] * len(x),
                                mode='lines',
                                name=f'Path {i+1}'
                                ))

        for j in range(len(x)):
            arrow_x = x[j]  # x-coordinate of arrow starting point
            arrow_y = y[j]  # y-coordinate of arrow starting point
            arrow_yaw = yaw[j]  # yaw angle of arrow

            cone_length = 0.5
            cone_radius = 0.1
            cone_x = arrow_x + cone_length * np.cos(arrow_yaw)
            cone_y = arrow_y + cone_length * np.sin(arrow_yaw)

            fig.add_cone(x=[arrow_x, cone_x], y=[arrow_y, cone_y], z=[i, i],
                        u=[0, 0], v=[0, 0], w=[-cone_length, 0],
                        sizemode="absolute", sizeref=1,
                        showscale=False, colorscale='Blues',
                        name=f'Arrow {i+1}')

    fig.add_trace(go.Scatter3d(x=ox, y=oy, z=[0] * len(ox),
                            mode='markers',
                            name='Boundary',
                            marker=dict(
                                size=2,
                                color='black'
                            )
                            ))

    fig.update_layout(
        scene=dict(
            aspectmode='manual',
            aspectratio=dict(x=1, y=1, z=0.2),
            camera=dict(
                eye=dict(x=-2, y=-2, z=1.5),
                center=dict(x=0, y=0, z=0),
                up=dict(x=0, y=0, z=1)
            )
        ),
        margin=dict(l=0, r=0, t=0, b=0),
        showlegend=True,
    )

    fig.show()



In [10]:
startList = [[0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)], [0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)],[0, 0, np.deg2rad(90.0)]]
goalList = [[10.0, 0.0, np.deg2rad(90.0)], [10.0, 15.0, np.deg2rad(90.0)], [10.0, 30.0, np.deg2rad(90.0)], [10.0, -15.0, np.deg2rad(90.0)], [10,-30, np.deg2rad(90.0)]]
plot_3D(startList,goalList)

Goal location: [10.0, 0.0, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.4649734497070312 seconds ---
Goal location: [10.0, 15.0, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.366534948348999 seconds ---
Goal location: [10.0, 30.0, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.493016242980957 seconds ---
Goal location: [10.0, -15.0, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.3490183353424072 seconds ---
Goal location: [10, -30, 0.027415567780803774]
xwidth,ywidth: 300 300
--- 1.3503127098083496 seconds ---


In [11]:

def transform_vehicle(vehicle_point_object, next_state, angle_of_rotation):
  lst=[]
  for pt in vehicle_point_object.input_coordinates:
    new_x=(pt[0]-vehicle_point_object.center[0])*np.cos(angle_of_rotation) + (pt[1]-vehicle_point_object.center[1])*np.sin(angle_of_rotation)+next_state[0]
    new_y=-(pt[0]-vehicle_point_object.center[0])*np.sin(angle_of_rotation)+(pt[1]-vehicle_point_object.center[1])*np.cos(angle_of_rotation)+next_state[1]
    print(new_x, new_y)
    lst.append([new_x,new_y])
  # vehicle_point_ret=vehicle_points(np.array(lst),next_state)
  print(lst)
  # return vehicle_point_ret


In [12]:
class vehicle_points():
  def __init__(self,input_coordinates,center):
    self.input_coordinates=input_coordinates
    self.center=center

vehicle_pt_obj_actual = vehicle_points( np.array([[0,0.75],[1,1.5],[0,0.75], #left
                                                 [-0.75,0],[row_y_offset,-1.5],[row_y_offset,-2.25]]), #right

                                                  [0,0] )