# A* algorithm

Implementation of the A* algorithm.

The A* algorithm is a simple yet elegant way of efficiently finding the lowest cost path from start to goal. 

In [8]:
from queue import Queue, PriorityQueue
import numpy as np
from enum import Enum

Python priority queue is used as a convenient way of maintaining a sorted queue. The PriorityQueue data structure allows us to quickly and efficiently select the lowest cost partial plan from the queue of all partial plans.


[`PriorityQueue`](https://docs.python.org/3/library/queue.html#queue.PriorityQueue) is used as a convenient way of maintaining a sorted queue. It allows us to quickly and efficiently select the lowest cost partial plan from the queue of all partial plans. More about `PriorityQueue` [here](https://www.linode.com/docs/guides/python-priority-queue/).

In [9]:
class Action(Enum):
    """
    A single grid move, stored as a 3-element tuple.

    Elements 0 and 1 are the offsets added to the current grid
    position (first and second grid index respectively); element 2
    is the cost of making the move.
    """
    LEFT = (0, -1, 1)
    RIGHT = (0, 1, 1)
    UP = (-1, 0, 1)
    DOWN = (1, 0, 1)

    def __str__(self):
        """Return the one-character arrow used to draw this action."""
        arrows = {'LEFT': '<', 'RIGHT': '>', 'UP': '^', 'DOWN': 'v'}
        return arrows.get(self.name)

    @property
    def cost(self):
        """Cost of performing the action (last element of the tuple)."""
        *_, step_cost = self.value
        return step_cost

    @property
    def delta(self):
        """Position offset of the action (first two tuple elements)."""
        return tuple(self.value[:2])
            
    
def valid_actions(grid, current_node):
    """
    List the actions that can be taken from `current_node` on `grid`.

    An action is dropped when it would step off the grid or onto a
    cell whose value is 1 (an obstacle). The surviving actions keep
    the order UP, LEFT, RIGHT, DOWN.
    """
    last_row = grid.shape[0] - 1
    last_col = grid.shape[1] - 1
    row, col = current_node

    # Each bounds test precedes its grid lookup, so a neighbour that
    # lies off the grid is rejected before the grid is indexed.
    blocked = {
        Action.UP: row - 1 < 0 or grid[row - 1, col] == 1,
        Action.DOWN: row + 1 > last_row or grid[row + 1, col] == 1,
        Action.LEFT: col - 1 < 0 or grid[row, col - 1] == 1,
        Action.RIGHT: col + 1 > last_col or grid[row, col + 1] == 1,
    }

    return [
        action
        for action in (Action.UP, Action.LEFT, Action.RIGHT, Action.DOWN)
        if not blocked[action]
    ]

def visualize_path(grid, path, start):
    """
    Render the grid and a path through it as an array of characters.

    Args:
        grid: 2-D grid; cells equal to 1 are drawn as 'O' (obstacle),
            every other cell as ' '.
        path: sequence of actions to follow from `start`. Each action
            must expose `.value` (whose first two elements are the
            position offset) and convert to its drawing symbol via
            `str()`.
        start: (row, column) position where the path begins.

    Returns:
        A NumPy array of single-character strings with the same shape
        as `grid`. Each cell along the path holds the symbol of the
        action taken from it, the final cell holds 'G' and the start
        cell holds 'S'.
    """
    # Use the builtin `str` as dtype: the `np.str` alias was deprecated
    # in NumPy 1.20 and removed in 1.24.
    sgrid = np.full(np.shape(grid), ' ', dtype=str)
    sgrid[np.asarray(grid) == 1] = 'O'

    pos = start
    for a in path:
        # Mark the cell we are leaving with the action's symbol, then move.
        sgrid[pos[0], pos[1]] = str(a)
        da = a.value
        pos = (pos[0] + da[0], pos[1] + da[1])

    # 'S' is written last so it wins when the path is empty (start == end).
    sgrid[pos[0], pos[1]] = 'G'
    sgrid[start[0], start[1]] = 'S'
    return sgrid

## Heuristics
The heuristic function determines the $h()$ value for each cell based on the goal cell and the method chosen to determine it. The heuristic value can be the Euclidean distance between these cells, $h = \left((x_i-x_{goal})^2+(y_i-y_{goal})^2\right)^{1/2}$, or the "Manhattan distance", which is the minimum number of moves required to reach the goal from the assigned cell when obstacles are ignored, $h = |x_i-x_{goal}| + |y_i-y_{goal}|$. For this exercise you could use either, or something else which is *admissible* (it never overestimates the true remaining cost) and *consistent* (its value never drops by more than the cost of the step taken).

The input variables include
* **```position```** the coordinates of the cell for which you would like to determine the heuristic value.
* **```goal_position```** the coordinates of the goal cell

In [10]:
# Implement a heuristic function. This may be one of the
# functions described above or feel free to think of something
# else.
def Euclidean_d(position, goal_position):
    """
    Straight-line distance between `position` and `goal_position`.

    Only the first two coordinates of each argument are used.
    """
    row_gap = goal_position[0] - position[0]
    col_gap = goal_position[1] - position[1]
    squared_distance = row_gap**2 + col_gap**2
    return squared_distance**0.5

def Manhattan_d(position, goal_position):
    """
    Manhattan (city-block) distance between `position` and `goal_position`.

    This is the sum of the absolute differences of the first two
    coordinates of the arguments.
    """
    dx = goal_position[0] - position[0]
    dy = goal_position[1] - position[1]
    # No square root is needed here; the sum of absolute offsets is the
    # whole computation.
    return abs(dx) + abs(dy)

## A* search

A* search is an extension of the cost search you implemented. A heuristic function is used in addition to the cost penalty. Thus if the setup is:

* $c$ is the current cost
* $g$ is the cost function
* $h$ is the heuristic function

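Then the accumulated cost of the new partial plan is $c_{new} = c + g()$, and its priority in the queue is $f = c_{new} + h() = c + g() + h()$. The heuristic only orders the queue; it is not added to the cost carried forward.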

The difference between $g$ and $h$ is that $g$ models the cost of performing actions, irrespective of the environment, while $h$ models the cost based on the environment, i.e., the distance to the goal.

In [14]:
def a_star(grid, heuristics, start, goal):
    """
    Search `grid` for a lowest-cost path from `start` to `goal` with A*.

    Args:
        grid: 2-D grid handed to valid_actions() to find the moves
            available from each cell.
        heuristics: callable `heuristics(position, goal_position)`
            returning the estimated remaining cost. The returned path
            is lowest-cost provided the estimate never exceeds the
            true remaining cost.
        start: (row, column) tuple of the start cell.
        goal: (row, column) tuple of the goal cell.

    Returns:
        A tuple `(path, path_cost)`. `path` is the list of actions
        leading from `start` to `goal` and `path_cost` is the sum of
        their costs. If `start == goal` the result is `([], 0.0)`. If
        the goal cannot be reached a failure message is printed and
        the result is `([], float('inf'))`.
    """
    path = []
    path_cost = float('inf')    # stays infinite when no path exists

    # Cheapest known cost from start to every node discovered so far.
    best_cost = {start: 0.0}
    # next_node -> (cost, parent node, action taken from the parent).
    branch = {}

    # Entries are (priority, cost so far, node); the priority is the
    # cost so far plus the heuristic estimate.
    q = PriorityQueue()
    q.put((0.0, 0.0, start))

    found = False               # Flag for end of search

    while not q.empty():
        _, current_cost, current_node = q.get()

        # A cheaper route to this node was queued after this entry was
        # pushed, so this entry is stale and can be ignored.
        if current_cost > best_cost[current_node]:
            continue

        # If the current cell corresponds to the goal state, stop the search
        if current_node == goal:
            print('Found a path.')
            found = True
            break

        for action in valid_actions(grid, current_node):
            da = action.delta   # delta-movement of performing the action
            next_node = (current_node[0] + da[0], current_node[1] + da[1])

            # Cost of reaching next_node through current_node.
            branch_cost = current_cost + action.cost

            # Record and queue the neighbour only when this route is
            # strictly cheaper than any route found to it before.
            if branch_cost < best_cost.get(next_node, float('inf')):
                best_cost[next_node] = branch_cost
                branch[next_node] = (branch_cost, current_node, action)
                queue_cost = branch_cost + heuristics(next_node, goal)
                q.put((queue_cost, branch_cost, next_node))

    if found:
        # Retrace steps from the goal back to the start. The loop body
        # is skipped entirely when start == goal.
        path_cost = best_cost[goal]
        n = goal
        while n != start:
            _, parent, action = branch[n]
            path.append(action)
            n = parent
    else:
        print('**********************')
        print('Failed to find a path!')
        print('**********************')

    return path[::-1], path_cost

### Executing the search

Run `a_star()` and reference the grid to see if the path makes sense.

In [15]:
# Search endpoints, given as (row, column) indices into the grid below.
start, goal = (0, 0), (3, 5)

# Occupancy grid: 0 is a free cell, 1 is an obstacle.
grid = np.array(
    [
        [0, 0, 1, 0, 0, 0],
        [0, 1, 1, 0, 1, 0],
        [0, 0, 0, 0, 1, 0],
        [0, 0, 0, 1, 0, 0],
        [0, 1, 0, 0, 1, 0],
    ]
)

In [18]:
# Run the search from `start` to `goal`. Manhattan_d is the active
# heuristic; pass Euclidean_d instead to use straight-line distance.
path, path_cost = a_star(grid, Manhattan_d, start, goal)     # Manhattan distance

# Report the total cost and the sequence of actions returned by a_star().
print('Path cost: ', path_cost)
print(path)

Found a path.
Path cost:  12.0
[<Action.DOWN: (1, 0, 1)>, <Action.DOWN: (1, 0, 1)>, <Action.RIGHT: (0, 1, 1)>, <Action.RIGHT: (0, 1, 1)>, <Action.RIGHT: (0, 1, 1)>, <Action.UP: (-1, 0, 1)>, <Action.UP: (-1, 0, 1)>, <Action.RIGHT: (0, 1, 1)>, <Action.RIGHT: (0, 1, 1)>, <Action.DOWN: (1, 0, 1)>, <Action.DOWN: (1, 0, 1)>, <Action.DOWN: (1, 0, 1)>]


In [19]:
# Draw the path on the grid. Legend: S -> start, G -> goal, O -> obstacle;
# '<', '>', '^' and 'v' show the move taken from each cell along the path.
visualize_path(grid, path, start)

array([['S', ' ', 'O', '>', '>', 'v'],
       ['v', 'O', 'O', '^', 'O', 'v'],
       ['>', '>', '>', '^', 'O', 'v'],
       [' ', ' ', ' ', 'O', ' ', 'G'],
       [' ', 'O', ' ', ' ', 'O', ' ']], dtype='<U1')