In [2]:
import import_ipynb #Allows you to import ipynb files
from queue import PriorityQueue
from dataclasses import dataclass
import time

from reedsshepp import *
from vehicle import *
from Grid import Grid
from utils import *

from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
np.set_printoptions(threshold=np.inf)

importing Jupyter notebook from reedsshepp.ipynb
importing Jupyter notebook from vehicle.ipynb
importing Jupyter notebook from utils.ipynb
importing Jupyter notebook from Grid.ipynb
importing Jupyter notebook from GVD.ipynb


In [3]:
@dataclass
class Cell:
    """The cell is the discretized position that the vehicleState falls into"""
    col: int = -1
    row: int = -1
    ori: float = 0.0
    rev: float = 0.0
    def reset(self):
        self.col = -1
        self.row = -1
        self.ori = 0
        self.rev = 0
    def __eq__(self, other):
        return self.col == other.col and self.row == other.row and self.ori == other.ori

In [4]:
#Node class that the A* will be referencing to check and compare costs
class HybridAStarNode:
    
    def __init__(self, parent=None, cell:Cell=None, vehicleState:VehicleState=None, rsAction:ReedsSheppAction=None):
        self.cell = cell
        self.vehicleState = vehicleState #Even though cell and vehicle state store the same thing, cell is discrete and vehiclestate is nondiscrete
        self.reedsSheppAction = rsAction
        self.parent = parent
        self.g = 0 #distance to start node
        self.h = 0 #distance to goal node
        
        self.rsindex = -1
    def FCost(self):
        return self.g + self.h
    
    def __eq__(self, other):
        return self.cell == other.cell
    
    def __lt__(self, other):
        return self.FCost() < other.FCost()
    
    def __gt__(self, other):
        return self.FCost() > other.FCost()
    def __repr__(self):
        return 'Point(x=%s, y=%s, ori=%s, rev=%s)' % (self.cell.col, self.cell.row, np.rad2deg(self.cell.ori), self.cell.rev)

In [5]:
#This will implement the basic A* Algorithm
class HybridAStar:
    #class variables
    def __init__(self, grid:Grid, vehicle:Vehicle, debug:bool=False):
        self._grid = grid
        self._vehicle = vehicle
        self.currentCostMap = [[float('inf') for i in range(self._grid.width)] for j in range(self._grid.height)]
        self._debug = debug
        
    def ReedsSheppPath(self, vehicleState, goal):
        pass
    
    #Generates the Path
    def GeneratePath(self, current):
        path = []
        states = []
        while current is not None:
            path.append((current.cell.row, current.cell.col))
            states.append(current.vehicleState)
            current = current.parent
        return path[::-1], states[::-1]
    
    #Calculates distance heuristic
    def CalculateDistance(self, current, destination):
        return np.sqrt(((current.vehicleState.position[0] - destination.vehicleState.position[0]) ** 2 + (current.vehicleState.position[1] - destination.vehicleState.position[1]) ** 2))
    
    #gets all neighbors at current node
    def GetNextNodes(self, current_state:VehicleState, gear):
        nodes = []
        positions = []
        for steer in self._vehicle.steers:
            newVehicleState, length = self._vehicle.getNextState(current_state, steer, gear, 1)
            #self._grid.IsSafe(newVehicleState, 1.5)
            positions.append((newVehicleState.position, newVehicleState.orientation))
            if(IsPointInGrid(newVehicleState.position, (self._grid.width, self._grid.height))):
                nodes.append(HybridAStarNode(None, None, newVehicleState, ReedsSheppAction(steer, gear, length)))
        #check reeds shepp path
        #add to child if its cheap af
        if(self._debug):
            print(positions)
        return nodes
    
    def statetoCell(self, state:VehicleState):
        x = int(np.floor(state.position[0]))
        y = int(np.floor(state.position[1]))
        theta = mod2Pi(state.orientation)
        rev = 1 if state.gear==Gear.Backward else 0
        return Cell(row=x, col=y, ori=theta, rev=rev)
    
    def run(self, start:VehicleState, end:VehicleState):
        start_node = HybridAStarNode(None, self.statetoCell(start), start, ReedsSheppAction(Steer.Straight, Gear.Forward, 0))
        end_node = HybridAStarNode(None, self.statetoCell(end), end)
        
        open_list = PriorityQueue()
        closed_list = []
        open_list.put(start_node)
        voro = self._grid.returnVoroDistanceMap()
        
        while open_list.qsize():
            current_node = open_list.get()
            closed_list.append(current_node)
            if(current_node == end_node):
                return self.GeneratePath(current_node)
            if(self._debug):
                print("Current Position: " + str(current_node.vehicleState.position))
                print("End Position: " + str(end_node.vehicleState.position))
                print("Current Cell: " + str((current_node.cell.col, current_node.cell.row)))
                print("End Cell: " + str((end_node.cell.col, end_node.cell.row)))
                print("Current Angle: " + str((current_node.cell.ori)))
                print("End Angle: " + str((end_node.cell.ori)))
                print()
            for gear in self._vehicle.gears:
                nextNodes = self.GetNextNodes(current_node.vehicleState, gear)
                for nextNode in nextNodes:
                    nextNode.g = current_node.g + 1 #path cost
                    nextNode.h = self.CalculateDistance(nextNode, end_node) #hueristic
                    nextNode.parent = current_node
                    nextNode.cell = self.statetoCell(nextNode.vehicleState)
                    
                    if(self.currentCostMap[nextNode.cell.col][nextNode.cell.row] == float('inf')):
                        if(nextNode in open_list.queue): continue
                        open_list.put(nextNode)
                        self.currentCostMap[nextNode.cell.col][nextNode.cell.row] = nextNode.FCost()
                    elif(nextNode.FCost() < self.currentCostMap[nextNode.cell.col][nextNode.cell.row]):
                        if(nextNode in open_list.queue): continue
                        self.currentCostMap[nextNode.cell.col][nextNode.cell.row] = nextNode.FCost()
                        open_list.put(nextNode)
            if(self._debug):
                print()
                for i,row in enumerate(hastar.currentCostMap):
                    printer = []
                    for q,cost in enumerate(row):
                        if((i,q) == (current_node.cell.col, current_node.cell.row)):
                            printer.append("XXXX")
                        elif(cost != float('inf')):
                            printer.append("{:.2f}".format(cost))
                        elif((i,q) == (end_node.cell.col, end_node.cell.row)):
                            printer.append("YYYY")
                        else:
                            printer.append("     ")
                    print(printer)


