### Generating a random Maze

In [1]:
from enum import Enum
from typing import List, NamedTuple, Callable, Optional
import random
from math import sqrt
# from generic_search import dfs, bfs, node_to_path, astar, Node

class Cell(str, Enum):
    EMPTY = " "
    BLOCKED = "X"
    START = "S"
    GOAL = "G"
    PATH = "*"
    
class MazeLocation(NamedTuple):
    row: int
    column: int

In [2]:
class Maze:
    def __init__(self, rows: int=10, columns: int=10, sparseness: float=0.2, 
                 start:MazeLocation = MazeLocation(0,0), 
                 goal:MazeLocation = MazeLocation(9,9)) -> None:
        # initialize basic instance variables
        self._rows: int = rows
        self._columns: int = columns
        self.start: MazeLocation = start
        self.goal: MazeLocation = goal
        # Fill the grid with empty cell
        self._grid: List[List[Cell]] = [[Cell.EMPTY for c in range(columns)]
                                       for r in range(rows)]
        #Populaqte the grid with blocked cells
        self._randomly_fill(rows, columns, sparseness)
        # Fill the start and goal locations in
        self._grid[start.row][start.column] = Cell.START
        self._grid[goal.row][goal.column] = Cell.GOAL
        
    def _randomly_fill(self, rows: int,
                      columns: int,
                      sparseness: float):
        for row in range(rows):
            for column in range(columns):
                if random.uniform(0, 1.0) < sparseness:
                    self._grid[row][column] = Cell.BLOCKED
                    
    def __str__(self) -> str:
        output: str = ""
        for row in self._grid:
            output += "".join([c.value for c in row]) + "\n"
        return output
    
    def goal_test(self, ml: MazeLocation) -> bool:
        return ml == self.goal
    
    def successors(self, ml: MazeLocation) -> List[MazeLocation]:
        locations: List[MazeLocation] = []
        if ml.row + 1 < self._rows and self._grid[ml.row + 1][ml.column] != Cell.BLOCKED:
            locations.append(MazeLocation(ml.row + 1, ml.column))
        if ml.row - 1 >= 0 and self._grid[ml.row - 1][ml.column] != Cell.BLOCKED:
            locations.append(MazeLocation(ml.row - 1, ml.column))
        if ml.column + 1 < self._columns and self._grid[ml.row][ml.column + 1] != Cell.BLOCKED:
            locations.append(MazeLocation(ml.row, ml.column + 1))
        if ml.column - 1 >= 0 and self._grid[ml.row][ml.column - 1] != Cell.BLOCKED:
            locations.append(MazeLocation(ml.row, ml.column - 1))
        return locations
    
    def mark(self, path: List[MazeLocation]):
        for maze_location in path:
            self._grid[maze_location.row][maze_location.column] = Cell.PATH
        self._grid[self.start.row][self.start.column] = Cell.START
        self._grid[self.goal.row][self.goal.column] = Cell.GOAL
    
    def clear(self, path: List[MazeLocation]):
        for maze_location in path:
            self._grid[maze_location.row][maze_location.column] = Cell.EMPTY
        self._grid[self.start.row][self.start.column] = Cell.START
        self._grid[self.goal.row][self.goal.column] = Cell.GOAL

In [3]:
maze: Maze = Maze()
print(maze)

SXX       
        XX
X  X X  X 
   X   X  
         X
X         
          
    X  X  
        X 
      XX G



### Depth-First Search

## Creating a Stack Class:

In [4]:
from typing import Generic, TypeVar
T = TypeVar("T")

class Stack(Generic[T]):
    def __init__(self) -> None:
        self._container: List[T] = []
    
    @property
    def empty(self) -> bool:
        return not self._container  # not is true for empty container
    
    def push(self, item: T) -> None:
        self._container.append(item)
    
    def pop(self) -> T:
        return self._container.pop()  # LIFO
    
    def __repr__(self) -> str:
        return repr(self._container)

In [5]:
from __future__ import annotations

^ This allows a Class to reference itself in the type hints of its methods

In [6]:
class Node(Generic[T]):
    def __init__(self, state: T, parent: Optional[Node], cost: float = 0.0,
                heuristic: float = 0.0) -> None:
        self.state: T = state
        self.parent: Optional[Node] = parent
        self.cost: float = cost
        self.heuristic: float = heuristic
            
    def __lt__(self, other: Node) -> bool:
        return (self.cost + self.heuristic) < (other.cost + other.heuristic)

Depth-first search (DFS) is an algorithm for traversing or searching tree or graph data structures. The algorithm starts at the root node (selecting some arbitrary node as the root node in the case of a graph) and explores as far as possible along each branch before backtracking.

In [7]:
def dfs(initial: T, goal_test: Callable[[T], bool], 
        successors: Callable[[T], List[T]]) -> Optional[Node[T]]:
    # frontier is where we've yet to go
    frontier: Stack[Node[T]] = Stack()
    frontier.push(Node(initial, None))
    # explored is where we've been
    explored: Set[T] = {initial}
        
    # Keep going while there is more to explore
    while not frontier.empty:
        current_node: Node[T] = frontier.pop()
        current_state: T = current_node.state
        # if we found the goal, we're done
        if goal_test(current_state):
            return current_node
        # check where we can go next and haven't explored
        for child in successors(current_state):
            if child in explored: # skip children we already explored
                continue
            explored.add(child)
            frontier.push(Node(child, current_node))
    return None  # Went through everything and never found goal

If dfs() is successful, it returns the Node encapsulating the goal state. The path from the start to the goal can be reconstructed by working backward from this Node and its priors using the parent property.

In [8]:
def node_to_path(node: Node[T]) -> List[T]:
    path: List[T] = [node.state]
    # Work backwards from end to front
    while node.parent is not None:
        node = node.parent
        path.append(node.state)
    path.reverse()
    return path

In [9]:
if __name__ == "__main__":
    # Test DFS
    m: Maze = Maze()
    print(m)
    solution1: Optional[Node[MazeLocation]] = dfs(m.start, 
                                                  m.goal_test, m.successors)
    if solution1 is None:
        print("No solution found using depth-first search!")
    else:
        path1: List[MazeLocation] = node_to_path(solution1)
        m.mark(path1)
        print(m)
        m.clear(path1)

S      X  
   X XX   
    XXX X 
          
  X  X    
          
         X
X         
 XX  X  XX
         G

S**    X  
  *X XX   
*** XXX X 
*         
**X  X    
 ******** 
        *X
X  ****** 
 XX* X  XX
   ******G



## Breadth-first search

### Queues

To implement BFS, a data structure known as a queue is required. Whereas a stack is LIFO, a queue is FIFO (First-In-First-Out).


In [10]:
from collections import deque

In [11]:
class Queue(Generic[T]):
    def __init__(self) -> None:
        self._container: Deque[T] = deque()
    
    @property
    def empty(self) -> bool:
        return not self._container  # Not is true for empty container
    
    def push(self, item: T) -> None:
        self._container.append(item)
    
    def pop(self) -> T:
        return self._container.popleft()  # FIFO
    
    def __repr__(self) -> str:
        return repr(self._container)

Also, deque.popleft() and list.pop(0) perform the same action but there is a difference in performance.
deque.popleft() is faster than list.pop(0), because the deque has been optimized to do popleft() approximately in O(1), while list.pop(0) takes O(n)

### The BFS Algorithms


Amazingly, the algorithm for a breadth-first search is identical to the algorithm for a depth-first search, with the frontier changed from a stack to a queue. Changing the frontier from a stack to a queue changes the order in which states are searched and ensures that the states closest to the start state are searched first.

In [12]:
def bfs(initial: T, goal_test: callable[[T], bool], 
        successors: Callable[[T], List[T]]) -> Optional[Node[T]]:
    # Frontier is where we've yet to go
    frontier: Queue[Node[T]] = Queue()
    frontier.push(Node(initial, None))
    # explored is where we've been
    explored: Set[T] = {initial}
        
    # Keep going while there is more to explore
    while not frontier.empty:
        current_node: Node[T] = frontier.pop()
        current_state: T = current_node.state
        # if we found the goal, we're done!
        if goal_test(current_state):
            return current_node
        # check where we can go next and haven't explored
        for child in successors(current_state):
            if child  in explored: #skip children we already explored
                continue
            explored.add(child)
            frontier.push(Node(child, current_node))
    return None  # Went through everything and never found goal

In [13]:
if __name__ == "__main__":
    # Test DFS
    m: Maze = Maze()
    print(m)
    solution1: Optional[Node[MazeLocation]] = dfs(m.start, 
                                                  m.goal_test, m.successors)
    if solution1 is None:
        print("No solution found using depth-first search!")
    else:
        path1: List[MazeLocation] = node_to_path(solution1)
        m.mark(path1)
        print(m)
        m.clear(path1)
    
    solution2: Optional[Node[MazeLocation]] = bfs(m.start, 
                                                  m.goal_test, m.successors)
    if solution2 is None:
        print("No solution found using breadth-first search!")
    else:
        path2: List[MazeLocation] = node_to_path(solution2)
        m.mark(path2)
        print(m)
        m.clear(path2)

S    X  XX
         X
  XXXX    
  X X     
   X X    
        X 
 X       X
          
    X   X 
         G

S****X  XX
    *****X
  XXXX  **
  X X    *
   X X****
******* X 
*X       X
**********
    X   X*
         G

S    X  XX
*        X
* XXXX    
* X X     
*  X X    
*       X 
*X       X
*         
*   X   X 
*********G



## A* Search

Like a BFS, an A search aims to find the shortest path from start state to goal state. Unlike the preceding BFS implementation, an A search uses a combination of a cost function and a heuristic function to focus its search on pathways most likely to get to the goal quickly.



### Priority Queues


A priority queue keeps its elements in an internal order, such that the first element popped out is always the highest-priority element.



In [14]:
from typing import TypeVar, Generic, List
from heapq import heappush, heappop

In [15]:
class PriorityQueue(Generic[T]):
    def __init__(self) -> None:
        self._container: List[T] = []
    
    @property
    def empty(self) -> bool:
        return not self._container  # not is true for empty container
    
    def push(self, item: T) -> None:
        heappush(self._container, item)  # in by priority
        
    def pop(self) -> T:
        return heappop(self._container)  # out by priority
    
    def __repr__(self) -> str:
        return repr(self._container)

In [16]:
def euclidiean_distance(goal: MazeLocation) -> Callable[[MazeLocation], float]:
    def distance(ml: MazeLocation) -> float:
        xdist: int = ml.column - goal.column
        ydist: int = ml.row - goal.row
        return sqrt((xdist * xdist) + (ydist * ydist))
    return distance

In [17]:
def manhattan_distance(goal: MazeLocation) -> Callable[[MazeLocation], float]:
    def distance(ml: MazeLocation) -> float:
        xdist: int = abs(ml.column - goal.column)
        ydist: int = abs(ml.row - goal.row)
        return (xdist + ydist)
    return distance

In [18]:
def astar(initial: T, goal_test: Callable[[T], bool], 
          successors: Callable[[T], List[T]], 
          heuristic: Callable[[T], float]) -> Optional[Node[T]]: 
        # frontier is where we've yet to go
        frontier: PriorityQueue[Node[T]] = PriorityQueue()
        frontier.push(Node(initial, None, 0.0, heuristic(initial)))
        # explored is where we've been
        explored: Dict[T, float] = {initial: 0.0}
        
        # Keep going while there is more to explore
        while not frontier.empty:
            current_node: Node[T] = frontier.pop()
            current_state: T = current_node.state
            # if we found the goal, we're done
            if goal_test(current_state):
                return current_node
            # check where we can go next and haven't explored
            for child in successors(current_state):
                new_cost: float = current_node.cost + 1
                # 1 assume a grid, need a cost function for more sophisticated apps
                if child not in explored or explored[child] > new_cost:
                    explored[child] = new_cost
                    frontier.push(Node(child, current_node, new_cost, heuristic(child)))
        return None # Went through everything and never found goal
        

# Test A*

In [19]:
distance: callable[[MazeLocation], float] = manhattan_distance(m.goal)
solution3: Optional[Node[MazeLocation]] = astar(m.start, m.goal_test, m.successors, distance)

if solution3 is None:
    print("No solution found using A*!")
else:
    path3: List[MazeLocation] = node_to_path(solution3)
    m.mark(path3)
    print(m)
        

S    X  XX
*        X
* XXXX    
**X X     
 **X X    
  *     X 
 X**     X
   *****  
    X  *X 
       **G

