In [None]:
# Utils
import time
from tqdm import tqdm

# Data Structures
import heapq

# Data Manipulation and computations
import numpy as np
import pandas as pd
import tensorflow as tf
import scipy.spatial.distance as dist

# Visualization
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

from google.colab import drive
# Colab-specific: expose Google Drive under /content/gdrive so that files stored
# there can be read through ordinary filesystem paths.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


### Agent 1 Code

In [None]:
#@title A*Search Utils

class Node:
  """
  One entry of the A* search tree.

  Equality is decided by grid position only; ordering (used by the heap) is
  decided by the priority `f`, with ties won by the most recently created node.
  """

  def __init__(self, parent=None, position=None, recency_factor=None):
    """
    Create a search-tree node.

    Parameters
    ----------
    parent: Node
      Node of the neighbouring cell from which this one was reached.
    position: (int, int)
      Grid coordinates of the cell.
    recency_factor: int
      Creation order of the node; larger means newer. Only consulted when two
      nodes have the same priority.
    """

    self.parent, self.position = parent, position
    self.recency_factor = recency_factor

    # Cost so far (g), heuristic estimate (h) and combined priority (f)
    self.g = self.h = self.f = 0

  def __eq__(self, other):
    """
    Two nodes are equal when they sit on the same cell.

    Parameters
    ----------
    other: Node
      Object to compare against; anything that is not a Node is never equal.
    """

    return type(other) is type(self) and self.position == other.position

  def __lt__(self, other):
    """
    Heap ordering: lower `f` first, and on equal `f` the newer node first.

    Parameters
    ----------
    other: Node
      Node to compare against.
    """

    # Different priorities decide the order on their own
    if self.f != other.f:
      return bool(self.f < other.f)

    # Same priority: the larger recency factor sorts first
    return bool(self.recency_factor > other.recency_factor)

  def __gt__(self, other):
    """
    Mirror image of `__lt__`: higher `f` is greater, and on equal `f` the
    older node is greater.

    Parameters
    ----------
    other: Node
      Node to compare against.
    """

    # Different priorities decide the order on their own
    if self.f != other.f:
      return bool(self.f > other.f)

    # Same priority: the smaller recency factor sorts last
    return bool(self.recency_factor < other.recency_factor)



def is_unvisited_and_unblocked(coords, state):
  """
  Tells whether a cell can still be explored: it must be neither known to be
  blocked nor already expanded.

  Parameters
  ----------
  coords: (int, int)
    Coordinates of the cell to check
  state: (2D np.array, 2D np.array, 2D np.array)
    Triple `(knowledge, visited, in_fringe)`:
      `knowledge` - knowledge[x][y] is non-zero when the agent knows (x, y) is blocked.
      `visited` - visited[x][y] is non-zero when (x, y) has already been expanded.
      `in_fringe` - fringe membership; it is part of the triple but not consulted here.

  Returns
  -------
  True if the cell is neither blocked nor visited, False otherwise
  """

  row, col = coords
  blocked_map, expanded_map, _ = state

  # Both maps must read exactly 0 at this cell
  is_free = blocked_map[row][col] == 0
  return bool(is_free and expanded_map[row][col] == 0)

def neighbourhood(coords, num_rows, num_cols, nbhd_type="compass", parent_coords=None):
  """
  Lists the in-grid neighbours of a cell. Blocked or visited cells are NOT
  filtered out here. The grid spans `(0, 0)` to `(num_rows-1, num_cols-1)`.

  Parameters
  ----------
  coords: (int, int)
    Coordinates of the cell
  num_rows: int
    No. of rows in the grid.
  num_cols: int
    No. of columns in the grid.
  nbhd_type: str
    `compass` - all 4 directions, returned in the order down, right, up, left
      (i.e. x+1, y+1, x-1, y-1)
    `directional` - only the single cell straight ahead, where "ahead" continues
      the step from `parent_coords` to `coords`
  parent_coords: (int, int)
    Coordinates of the parent cell, required if `nbhd_type` is `directional`.

  Returns
  -------
  possible_cells: List[(int, int)]
    Coordinates of the neighbours that lie inside the grid

  Raises
  ------
  ValueError
    If `nbhd_type` is unknown, or is `directional` without `parent_coords`.
  """

  def _within_grid(cell):
    # Both indices must fall inside the grid limits
    return 0 <= cell[0] <= num_rows-1 and 0 <= cell[1] <= num_cols-1

  if nbhd_type == "compass":
    row, col = coords
    candidates = ((row+1, col), (row, col+1), (row-1, col), (row, col-1))
    return [cell for cell in candidates if _within_grid(cell)]

  if nbhd_type == "directional":
    if parent_coords is None:
      raise ValueError("Parent coords cannot be none if nbhd_type == 'directional'.")

    # Repeat the last step once more to get the cell in the field of view
    here = np.array(coords)
    ahead = tuple(here + (here - np.array(parent_coords)))

    return [ahead] if _within_grid(ahead) else []

  raise ValueError("nbhd type can only be from ['compass', 'directional'], not", nbhd_type)

def get_valid_children(coords, parent, num_rows, num_cols, state):
  """
  Collects the cells A* may expand next from `coords`: compass neighbours
  inside the grid, minus the parent's cell, minus anything known to be blocked
  or already visited.

  Parameters
  ----------
  coords: (int, int)
    Coordinates of the cell
  parent: Node
    Parent of the cell being expanded, or None for the root
  num_rows: int
    No. of rows in the grid.
  num_cols: int
    No. of columns in the grid.
  state: (2D np.array, 2D np.array, 2D np.array)
    Triple `(knowledge, visited, in_fringe)` as expected by
    `is_unvisited_and_unblocked`.

  Returns
  -------
  valid_children: List[(int, int)]
    Coordinates of the admissible children, in neighbourhood order.
  """

  valid_children = []

  for cell in neighbourhood(coords, num_rows, num_cols):

    # Never step straight back onto the parent
    if parent is not None and cell == parent.position:
      continue

    # Keep only cells that are neither blocked nor expanded
    if is_unvisited_and_unblocked(cell, state):
      valid_children.append(cell)

  return valid_children

In [None]:
#@title Generate Gridworld

def generate_gridworld(shape, block_prob):
  """
  Builds a random gridworld of 0s (free) and 1s (blocked).

  Parameters
  ----------
  shape: (int, int)
    Number of rows and columns in the required gridworld
  block_prob: float
    Probability with which each cell is independently blocked

  Returns
  -------
  grid_world: 2D np.array
    The sampled grid; the top-left and bottom-right cells are always free.
  """

  num_rows, num_cols = shape
  open_prob = 1-block_prob

  # Draw every cell at once: 1 with prob block_prob, 0 otherwise
  grid_world = np.random.choice([1, 0], size=(num_rows, num_cols), p=[block_prob, open_prob])

  # Start and end cells must never be blocked
  for corner in ((0, 0), (num_rows-1, num_cols-1)):
    grid_world[corner[0]][corner[1]] = 0

  return grid_world

In [None]:
#@title A* Search

def grid_path(current_node):
    """
    Rebuilds the path that leads to a node by following `parent` links up to
    the root of the A* search tree.

    Parameters
    ----------
    current_node: Node
      Node in the A* search tree (or None, which yields an empty path).

    Returns
    -------
    planned_path: List[(int, int)]
      Positions from the root node down to `current_node`, in that order
    """

    # Collected leaf-first while climbing towards the root
    leaf_to_root = []

    node = current_node
    while node is not None:
      leaf_to_root.append(node.position)
      node = node.parent

    # Flip so the root comes first
    leaf_to_root.reverse()
    return leaf_to_root
  
def heuristic(name, start, end):
    """
    Computes the distance estimate between two cells with the chosen metric.

    Parameters
    ----------
    name: str
      One of `'euclidean'`, `'manhattan'`, or `'chebyshev'`.
    start: (int, int)
      Coordinates of the start cell
    end: (int, int)
      Coordinates of the end cell

    Returns
    -------
    distance: float or int
      Distance between `start` and `end` under the selected metric.

    Raises
    ------
    NotImplementedError
      If `name` is not one of the supported metrics.
    """

    # Supported metrics, each backed by the matching scipy distance function
    metrics = (
        ("euclidean", dist.euclidean),
        ("manhattan", dist.cityblock),
        ("chebyshev", dist.chebyshev),
    )

    for label, distance_fn in metrics:
      if name == label:
        return distance_fn(start, end)

    # No metric matched
    raise NotImplementedError("Unknown heuristic:", name)


def a_star_search(start, goal, grid, heuristic_type, 
                  visited, knowledge, max_steps=None, 
                  epsilon=1.):
  """ 
  Runs one A* search from `start` to `goal` using what the agent currently
  knows about blocked cells.

  Parameters
  ----------
  start: (int, int)
    Coordinates of the start cell.
  goal: (int, int)
    Coordinates of the goal cell.
  grid: 2D np.array
    The original grid; only its shape is used here.
  heuristic_type: str
    Type of heuristic to use. Can be `'euclidean'`, `'manhattan'`, or `'chebyshev'`
  visited: 2D np.array
    visited[x][y] = 1 once the node at (x, y) has been popped. This array is
    updated in place.
  knowledge: 2D np.array
    knowledge[x][y] = 1 if the agent knows (x, y) is blocked and 0 otherwise.
  max_steps: int or None
    Max. number of pops from the fringe. Defaults to rows**2 * cols**2.
  epsilon: float
    Weight of the heuristic in the priority: f(n) = g(n) + epsilon*h(n)

  Returns
  -------
  planned_path: List[(int, int)]
    Cells from the start to the last popped node (the goal on success); empty
    if the fringe ran dry.
  exit_status: str
    `"SUCCESS"` - The goal node was popped
    `"FAILED_NOPATH"` - The fringe emptied without reaching the goal
    `"FAILED_STEPS"` - More than `max_steps` nodes were popped
  num_cells_popped: int
    Total number of nodes popped from the fringe during this search.
  
  TODO
  ----
  - Remove the `grid` parameter since only its shape is required
  - Implement a separate `Fringe` class
  """

  goal_node = Node(position=goal)

  # Budget on the number of pops
  if max_steps is None:
    max_steps = grid.shape[0]**2 * grid.shape[1]**2

  # Creation counter used to break priority ties inside the heap
  next_recency = 0

  # The fringe is a heap; a one-element list already satisfies the heap property
  root = Node(position=start, recency_factor=next_recency)
  fringe = [root]
  in_fringe = np.zeros(grid.shape)
  in_fringe[start[0]][start[1]] = 1
  next_recency += 1

  num_cells_popped = 0

  while fringe:

    # Take the most promising node off the fringe
    current_node = heapq.heappop(fringe)
    num_cells_popped += 1

    row, col = current_node.position
    visited[row][col] = 1
    in_fringe[row][col] = 0

    # Goal reached: hand back the path that led here
    if current_node == goal_node:
      return grid_path(current_node), "SUCCESS", num_cells_popped

    # Pop budget exhausted: hand back the partial path
    if num_cells_popped > max_steps:
      return grid_path(current_node), "FAILED_STEPS", num_cells_popped

    # Expand the node; the candidate list is fixed before any child is pushed
    child_positions = get_valid_children(current_node.position, current_node.parent,
                                         grid.shape[0], grid.shape[1],
                                         (knowledge, visited, in_fringe))

    for child_pos in child_positions:
      child = Node(parent=current_node, position=child_pos, recency_factor=next_recency)
      child.g = current_node.g + 1
      child.h = heuristic(heuristic_type, child.position, goal)
      child.f = child.g + epsilon*child.h

      heapq.heappush(fringe, child)
      in_fringe[child_pos[0]][child_pos[1]] = 1

      next_recency += 1

  # Fringe emptied without ever popping the goal
  return [], "FAILED_NOPATH", num_cells_popped

In [None]:
#@title Visualize Grid

def visualize_grid(sol, grid):
  """
  Produces a printable copy of the grid with every path cell replaced by `*`.

  Parameters
  ----------
  sol: List[(int, int)]
    Cells on the path from the start to the end node
  grid: 2D np.array
    Grid on which the path is drawn; it is left untouched

  Returns
  -------
  vis_grid: 2D np.array
    String-typed copy of the grid with the path marked. Use with `pretty_print`.
  """

  # astype returns a fresh string array, so the input grid is never modified
  vis_grid = grid.astype(str)

  # Stamp each path cell
  for cell in sol:
    row, col = cell[0], cell[1]
    vis_grid[row][col] = "*"

  return vis_grid

def pretty_print(A):
  """
  Prints a 2D grid one row per line, each item padded to a width of 4.
  
  Parameters
  ----------
  A: 2D np.array
    The grid to print
  """
  rendered_rows = []
  for row in A:
    # format(item, '4') pads every item to at least 4 characters
    rendered_rows.append(''.join(format(item, '4') for item in row))

  print('\n'.join(rendered_rows))

In [None]:
#@title Test A* Search Custom

def test_a_star_search_custom():
  """
  Runs a single A* search on a fixed 10x10 grid with full knowledge of the
  blocks, then prints the raw grid followed by the grid with the path marked.
  """
  grid = [[0, 1, 0, 1, 0, 0, 1, 0, 0, 0],
          [0, 1, 0, 1, 1, 0, 1, 0, 0, 0],
          [0, 1, 1, 1, 0, 0, 0, 0, 1, 0],
          [0, 1, 0, 0, 0, 1, 0, 1, 1, 0],
          [0, 1, 0, 1, 1, 0, 1, 1, 0, 0],
          [0, 1, 0, 1, 0, 0, 1, 0, 0, 1],
          [0, 1, 0, 1, 0, 0, 0, 0, 0, 0],
          [0, 1, 0, 1, 1, 0, 1, 0, 0, 1],
          [0, 0, 0, 1, 1, 0, 1, 0, 0, 0],
          [1, 1, 0, 1, 0, 0, 0, 0, 1, 0]]

  grid = np.array(grid)

  visited = np.zeros(grid.shape)

  # The agent is given the true layout as its knowledge of blocked cells
  blocked = grid.copy()

  # a_star_search returns (path, exit_status, num_cells_popped); only the path
  # may be handed to visualize_grid
  sol, _, _ = a_star_search((0, 0), (grid.shape[0]-1, grid.shape[1]-1), grid, "euclidean", visited, blocked)

  pretty_print(grid)
  print("\n")
  pretty_print(visualize_grid(sol, grid))

In [None]:
#@title Is Open Cell

def is_open_cell(cell, knowledge):
  """
  Tells whether a cell is "open", i.e. at least 3 of its in-grid compass
  neighbours are not known to be blocked.

  Parameters
  ----------
  cell: (int, int)
    Coordinates of the cell to check
  knowledge: 2D np.array
    knowledge[x][y] = 1 if the agent knows (x, y) is blocked and 0 otherwise.

  Returns
  -------
  True if the cell has 3 or more such neighbours, False otherwise
  """

  num_rows, num_cols = knowledge.shape[0], knowledge.shape[1]

  # Count neighbours whose knowledge entry is anything other than 1
  open_count = sum(1 for nbr in neighbourhood(cell, num_rows, num_cols)
                   if knowledge[nbr[0]][nbr[1]] != 1)

  return open_count >= 3

In [None]:
#@title Move and Record


def move_and_record(start, goal, grid, planned_path, knowledge, nbhd_type="compass"):
  """
  Moves the robot along the path and records the environment as it travels.
  If a block is encountered, returns the last unblocked location.

  Parameters
  ----------
  start: (int, int)
    The start coordinates in the grid. Between (0, 0) and 
    (grid.shape[0]-1, grid.shape[1]-1).
  goal: (int, int) 
    The goal coordinates in the grid. Limits are similar to start. Not used
    by this function; kept for interface compatibility.
  grid: 2D np.array
    The true gridworld (1 = blocked). Read to detect blocks and to 'see'
    neighbouring cells.
  planned_path: List[(int, int)]
    List of coordinates (including start and end) to visit.
  knowledge: 2D np.array
    Represents the knowledge of the agent. knowledge[x][y] = 1 if
    the agent knows there exists a block at position (x, y) and is 0 otherwise.
    Updated in place.
  nbhd_type: str
    `'compass'` if the agent can see in all 4 directions while moving and
    `'directional'` if the agent can see only in the direction it is moving in
  
  Returns
  -------
  final_node: (int, int)
    The last unblocked node visited or the last node of the path. If the very
    first cell of the path is blocked, that first cell is returned (the agent
    has not moved).
  knowledge: 2D np.array
    Updated knowledge array.
  steps: int
    Number of steps taken along the path.
  last_open_cell: (int, int)
    The last cell seen with no. of unblocked neighbours >= 3

  Raises
  ------
  ValueError
    If `planned_path` is empty or does not begin with `start`.
  """

  if not planned_path:  # Planned path is empty
    raise ValueError("Planned path cannot be empty.")

  if planned_path[0] != start:  # Planned path and start don't coincide
    raise ValueError("Planned path doesn't start with 'start'! planned_path[0] =", 
                     planned_path[0], "start =", start)

  steps = 0
  last_open_cell = None

  # Start moving
  for i, cell in enumerate(planned_path):

    # If cell is blocked, record it and return the last known location
    if grid[cell[0]][cell[1]] == 1:
      knowledge[cell[0]][cell[1]] = 1

      # max() keeps i == 0 from wrapping around to the END of the path
      return planned_path[max(i-1, 0)], knowledge, steps, last_open_cell

    # Work out where the agent came from (needed for the directional view)
    if i > 0:
      # Every cell after the first has a real predecessor on the path
      parent_coords = planned_path[i-1]
    elif len(planned_path) > 1:
      # First cell: pretend we arrived along the direction of the first move
      nxt = planned_path[1]
      parent_coords = (2*cell[0] - nxt[0], 2*cell[1] - nxt[1])
    else:
      # Single-cell path: there is no direction of travel at all
      parent_coords = None

    # Find all visible neighbouring cells
    if nbhd_type == "directional" and parent_coords is None:
      # Nothing lies 'ahead' when the agent is not moving
      nbhd = []
    else:
      nbhd = neighbourhood(cell, grid.shape[0], grid.shape[1], nbhd_type=nbhd_type, parent_coords=parent_coords)

    # 'See' the whole neighbourhood
    for nbr in nbhd:
      knowledge[nbr[0]][nbr[1]] = grid[nbr[0]][nbr[1]]

    # If this cell is open, update last_open_cell
    if is_open_cell(cell, knowledge):
      last_open_cell = cell

    steps += 1

  # Whole path was traversed without hitting a block
  return planned_path[-1], knowledge, steps, last_open_cell

In [None]:
#@title Repeated A*

class RepeatedAStar:
  """
  Repeated A* on a gridworld whose blocks are discovered while moving:
  plan with A* on current knowledge, walk the plan until a block is hit,
  then re-plan from where the agent stands, until the goal is reached or a
  failure condition is met. Every planning + execution round is logged.
  """

  def __init__(self, grid, move_nbhd_type="compass", epsilon=1., 
               escape_tunnels=False):
    """ 
    Implements the Repeated A* algorithm on a grid-world. 
    
    Parameters
    ----------
    grid: 2D np.array
      The grid world on which we need to implement the algorithm
    move_nbhd_type: str
      Can be 'compass' (see in all 4 directions in the execution phase) 
      or 'directional' (see in only the direction of the movement 
      in the execution phase)
    epsilon: float
      Weight with which the heuristic function is multiplied, as:
      f(n) = g(n) + epsilon*h(n)
    escape_tunnels: bool
      Set to true to restart A* from the start of a tunnel by backtracking
      instead of dead-ends
    """

    # Logging
    self.knowledge_snaps = []
    self.visited_snaps = []
    self.start_end_snaps = []
    self.sol_snaps = []
    self.step = 0
    self.successfully_completed = False
    self.final_exit_status = None
    self.grid = grid
    self.total_cells_processed_by_run = []
    self.backtracks = []

    # Algorithm parameters
    self.epsilon = epsilon
    self.move_nbhd_type = move_nbhd_type
    self.escape_tunnels = escape_tunnels

  def _update_state(self, knowledge, visited, path_end_points, soln, 
                    step, successfully_completed, final_exit_status, 
                    num_cells_popped):
    """ 
    Updates the state (logging variables) after each run of the planning + execution phases. 
    
    Parameters
    ----------
    knowledge: 2D np.array or None
      Represents the knowledge of the agent. knowledge[x][y] = 1 if
      the agent knows there exists a block at position (x, y) and is 0 otherwise.
      A copy is stored, so later in-place updates do not alter this snapshot.
    visited: 2D np.array or None
      Represents which nodes the agent has already expanded. visited[x][y] = 1 if
      the agent has already expanded Node at (x, y) and 0 otherwise.
    path_end_points: ((int, int), (int, int))
      Start and end points for 1 run of A* search
    soln: List[(int, int)] or None
      Planned path from the start node to the end node for 1 run of A* search
    step: int
      The step number for repeated A* search
    successfully_completed: bool
      If goal node has been reached by repeated A* search
    final_exit_status: str
      `"SUCCESS"` - If A* was successfully completed
      `"RUNNING"` - If Repeated A* search is still running
      `"FAILED_NOPATH"` - A* search could not find a path between the start and end nodes
      `"FAILED_STEPS"` - No. of steps in 1 run of A* search exceeded max steps
      `"FAILED_STEPS_REP"` - No. of steps for repeated A* search exceeded max steps
    num_cells_popped: int
      Number of cells popped during one run of A* search


    TODO
    ----
    - Add backtracks to the final path
    """

    # The knowledge array is mutated in place between rounds, so store a copy;
    # otherwise every snapshot would alias the same (final) array.
    self.knowledge_snaps.append(None if knowledge is None else knowledge.copy())
    self.visited_snaps.append(visited)
    self.start_end_snaps.append(path_end_points)
    self.sol_snaps.append(soln)
    self.step = step
    self.successfully_completed = successfully_completed
    self.final_exit_status = final_exit_status
    self.total_cells_processed_by_run.append(num_cells_popped)

  def logs(self):
    """
    Fetches all the logging parameters

    Returns
    -------
    log: Dict[str, Any]
      All logging vars added to a dictionary
    """
    return {
        "grid": self.grid,
        "knowledge": self.knowledge_snaps,
        "visited": self.visited_snaps,
        "start_end": self.start_end_snaps,
        "solns": self.sol_snaps,
        "num_steps": self.step,
        "successfully_completed": self.successfully_completed,
        "final_exit_status": self.final_exit_status
    }

  def path_followed(self):
    """
    Returns the actual path followed (planning + execution).

    For every logged round the planned path is clipped just before the cell
    where that round ended (the end cell itself is excluded). Rounds that did
    not execute a move (failure entries whose solution or end point is None)
    contribute nothing.
    """

    # If the paths followed and solutions don't match
    assert len(self.sol_snaps) == len(self.start_end_snaps), \
      "More steps for A* than elements in start_end."
    
    full_path = []
    for soln, (_, end_node) in zip(self.sol_snaps, self.start_end_snaps):

      # Failure rounds are logged with soln=None and/or end_node=None
      if soln is None or end_node is None:
        continue

      # Clip the planned path at the end node and append to the full path
      end_index = soln.index(end_node)
      full_path += soln[:end_index]

    return full_path

  def total_cells_processed(self):
    """
    Returns total number of Nodes popped from the Fringe across each run of A* search.
    """
    
    return sum(self.total_cells_processed_by_run)

  def total_backtracked_cells(self):
    """ 
    Returns total number of cells backtracked to the start of a tunnel,
    summed over all recorded backtracks.
    """

    return sum(len(backtrack) for backtrack in self.backtracks)

  def search(self, start, goal, grid, heuristic_type, 
             max_steps_astar=None, max_steps_repeated=None):
    """
    Performs the repeated search on a grid. Results are available afterwards
    through `logs()`, `path_followed()` and the counters of this object.

    Parameters
    ----------
    start: (int, int)
      Coordinates of the start node
    goal: (int, int)
      Coordinates of the goal node
    grid: 2D np.array
      Original gridworld on which to perform grid search
    heuristic_type: str
      Can be `'euclidean'`, `'manhattan'`, or `'chebyshev'`
    max_steps_astar: int
      Max. no. of steps for 1 run of A* search
    max_steps_repeated: int
      Max. no. of planning + execution rounds. Defaults to rows * cols.
    """

    # Exit condition
    if max_steps_repeated is None:
      max_steps_repeated = grid.shape[0]*grid.shape[1]

    # The agent starts out knowing of no blocks
    knowledge = np.zeros(grid.shape)
    
    # Start
    while not self.successfully_completed:

      # If number of steps have been exceeded for Repeated A*
      if self.step > max_steps_repeated:
        self._update_state(None, None, (None, None),
                           None, self.step, False,
                           "FAILED_STEPS_REP", 0)
        return

      # Each planning round starts with a clean expansion record
      visited = np.zeros(grid.shape)

      # Run A* search algorithm once
      planned_path, exit_status, num_cells_popped = \
        a_star_search(start, goal, grid, heuristic_type, 
                      visited, knowledge, max_steps=max_steps_astar, 
                      epsilon=self.epsilon)

      # If no path can be found, or the step budget of one A* run was exceeded
      if exit_status in ("FAILED_NOPATH", "FAILED_STEPS"):
        self._update_state(knowledge, visited, (start, None), 
                           planned_path, self.step + 1, False, 
                           exit_status, num_cells_popped)
        return

      # Move robot along the grid on the planned path and record environment
      final_node, knowledge, _, last_open_cell = move_and_record(start, goal, grid, 
                                                                planned_path, knowledge, 
                                                                nbhd_type=self.move_nbhd_type)

      # If we were able to reach the goal successfully
      if final_node == goal:
        self._update_state(knowledge, visited, (start, final_node), 
                           planned_path, self.step + 1, True, 
                           "SUCCESS", num_cells_popped)
        return

      # If we were not able to reach the goal successfully, repeat until possible
      self._update_state(knowledge, visited, (start, final_node), 
                         planned_path, self.step + 1, False,
                         "RUNNING", num_cells_popped)
      
      # Backtracking to the start of tunnels if this parameter is set, else start from end of last path
      if self.escape_tunnels and last_open_cell is not None and final_node != last_open_cell:
        start = last_open_cell

        # Only the stretch actually walked (last open cell .. cell reached) can
        # be walked back; the rest of the plan was never travelled.
        loc_idx = planned_path.index(last_open_cell)
        end_idx = planned_path.index(final_node)

        # Stored as a list (not a one-shot iterator) so it can be counted repeatedly
        backtrack = planned_path[loc_idx:end_idx+1][::-1]

        self.backtracks.append(backtrack)
      else:
        start = final_node

    # We were able to reach successfully
    return

In [None]:
#@title Test Repeated A*

def test_repeated_a_star_search_custom(move_nbhd_type="compass", epsilon=1., escape_tunnels=False):
  """ 
  Runs Repeated A* (euclidean heuristic) from the top-left to the bottom-right
  corner of a fixed 10x10 grid.
  
  Parameters
  ----------
  move_nbhd_type: str
    `'compass'` - Agent can see in all 4 directions while moving
    `'directional'` - Agent can only see in the direction it is moving
  epsilon: float
    Weight given to the heuristic function. Used as:
    f(n) = g(n) + epsilon*h(n)
  escape_tunnels: bool
    Escape tunnels before restarting A* search

  Returns
  -------
  repeated_a_star: RepeatedAStar
    The object on which `search` has been run.
  """
  grid = np.array([[0, 1, 0, 1, 0, 0, 1, 0, 0, 0],
                   [0, 1, 0, 1, 1, 0, 1, 0, 0, 0],
                   [0, 1, 1, 1, 0, 0, 0, 0, 1, 0],
                   [0, 1, 0, 0, 0, 1, 0, 1, 1, 0],
                   [0, 1, 0, 1, 1, 0, 1, 1, 0, 0],
                   [0, 1, 0, 1, 0, 0, 1, 0, 0, 1],
                   [0, 1, 0, 1, 0, 0, 0, 0, 0, 0],
                   [0, 1, 0, 1, 1, 0, 1, 0, 0, 1],
                   [0, 0, 0, 1, 1, 0, 1, 0, 0, 0],
                   [1, 1, 0, 1, 0, 0, 0, 0, 1, 0]])

  # Corner-to-corner run
  origin = (0, 0)
  target = (grid.shape[0]-1, grid.shape[1]-1)

  repeated_a_star = RepeatedAStar(grid,
                                  move_nbhd_type=move_nbhd_type,
                                  epsilon=epsilon,
                                  escape_tunnels=escape_tunnels)
  repeated_a_star.search(origin, target, grid, "euclidean")

  return repeated_a_star

def test_repeated_a_star_search_random(grid_shape, block_prob, 
                                       heuristic_type="euclidean", 
                                       move_nbhd_type="compass", 
                                       solvable=True, 
                                       epsilon=1., 
                                       escape_tunnels=False):
  """ 
  Runs Repeated A* from the top-left to the bottom-right corner of a randomly
  generated gridworld and times the run.
  
  Parameters
  ----------
  grid_shape: (int, int)
    Shape of the randomly generated gridworld
  block_prob: float
    Each cell in the randomly generated gridworld will be blocked with p = block_prob
  heuristic_type: str
    Can be `'euclidean'`, `'manhattan'`, or `'chebyshev'`
  move_nbhd_type: str
    `'compass'` - Agent can see in all 4 directions while moving
    `'directional'` - Agent can only see in the direction it is moving
  solvable: bool
    If True, gridworlds are re-drawn until a single A* search with full
    knowledge of the blocks reports `"SUCCESS"`
  epsilon: float
    Weight given to the heuristic function. Used as:
    f(n) = g(n) + epsilon*h(n)
  escape_tunnels: bool
    Escape tunnels before restarting A* search

  Returns
  -------
  repeated_a_star: RepeatedAStar
    The object on which `search` has been run.
  time_taken: float
    Wall-clock seconds spent constructing the object and running `search`.
  """

  if solvable:
    # Draw gridworlds until one passes the solvability check
    while True:
      grid = generate_gridworld(grid_shape, block_prob=block_prob)
      corner = (grid.shape[0]-1, grid.shape[1]-1)

      # The true grid is passed as the agent's knowledge for this check
      _, exit_status, _ = a_star_search((0, 0), corner, grid, heuristic_type,
                                        np.zeros(grid.shape), grid.copy())

      if exit_status == "SUCCESS":
        break
  else:
    # Any gridworld will do
    grid = generate_gridworld(grid_shape, block_prob=block_prob)

  t0 = time.time()

  # Timed section: build the agent and run Repeated A*
  repeated_a_star = RepeatedAStar(grid,
                                  move_nbhd_type=move_nbhd_type,
                                  epsilon=epsilon,
                                  escape_tunnels=escape_tunnels)
  repeated_a_star.search((0, 0), (grid.shape[0]-1, grid.shape[1]-1), grid, heuristic_type)

  time_taken = time.time() - t0

  return repeated_a_star, time_taken

### Agent 10 Code

In [None]:
#@title Fake Placeholder Model

class FakeModel:
    """
    Stand-in for a trained model: exposes `summary()` and `predict()` and
    returns uniformly random scores instead of real predictions.
    """

    def __init__(self, num_classes):
        """
        Parameters
        ----------
        num_classes: int
            Number of scores produced per sample by `predict`.
        """
        self.num_classes = num_classes

        # Kept under a private name: an instance attribute called `summary`
        # would shadow the `summary()` method and make it uncallable.
        self._summary_text = "FakeModel instance"

    def summary(self):
        """ Returns a short text describing the model. """
        return self._summary_text

    def predict(self, X_test):
        """
        Produces random scores for every sample.

        Parameters
        ----------
        X_test: array-like
            Input samples; only the number of samples is used (taken from
            `.shape[0]` when available, otherwise from `len`).

        Returns
        -------
        predictions: np.array of shape (num_samples, num_classes)
            Values drawn uniformly from [0, 1).
        """
        try:
            num_samples = X_test.shape[0]
        except AttributeError:
            # Plain sequences have no `.shape`
            num_samples = len(X_test)

        return np.random.uniform(size=(num_samples, self.num_classes))

In [None]:
#@title Generate Center Grid


def gen_center_grid(cell, knowledge, window_size):
    """
    Extracts a `window_size` x `window_size` window of `knowledge` centred on
    `cell`. Window positions that fall outside the grid are filled with 1.

    Parameters
    ----------
    cell: (int, int)
        Coordinates of the cell at the centre of the window.
    knowledge: 2D np.array
        Grid to read from; it may be non-square.
    window_size: int
        Side length of the window.

    Returns
    -------
    res: 2D np.array of int, shape (window_size, window_size)
        The window, with out-of-grid positions set to 1.
    """
    res = np.ones((window_size, window_size), dtype=int)

    # Rows and columns are bounded separately so non-square grids work
    num_rows = len(knowledge)
    num_cols = len(knowledge[0]) if num_rows else 0

    border_diff = window_size // 2

    for x in range(window_size):
        row = cell[0] - border_diff + x
        if not 0 <= row < num_rows:
            continue  # whole window row lies outside the grid

        for y in range(window_size):
            col = cell[1] - border_diff + y
            if 0 <= col < num_cols:
                res[x][y] = knowledge[row][col]

    return res

In [None]:
#@title Read Models

# Load two saved Keras models from the mounted Google Drive. Both paths are
# absolute and live under /content/gdrive, so Drive must be mounted first.
# NOTE(review): the conv checkpoint directory is named "Agent_11_conv_..." while
# the dense one is "agent_10_nn_dense_..." - confirm this is the intended model.
conv_model = tf.keras.models.load_model('/content/gdrive/MyDrive/CS-520-Agent-1/Agent_11_conv_2021-12-15_03:53:40.583259')
dense_model = tf.keras.models.load_model('/content/gdrive/MyDrive/CS-520-Agent-1/agent_10_nn_dense_2021-12-15_03_42_15.953232/agent_10_nn_dense_2021-12-15_03_42_15.953232')

In [None]:
class Agent10:
    """
    Gridworld agent that chooses its moves from a model's predictions and
    falls back to A* search when it detects that it is walking in a loop.

    After `traverse_grid` returns, `status` is one of `"SUCCESS"`,
    `"FAILED_STEPS"` or `"FAILED_STUCK"`, `steps` holds the number of
    iterations performed and `logs` holds the per-step history.
    """

    def __init__(self, model, grid, model_type, max_steps=None, window_size=25,
                 max_steps_astar=500, a_star_heuristic="manhattan", a_star_push_length=3):
        """
        Initialize agent.

        Parameters
        ----------
        model: object
            Model exposing `predict(X)` and `summary()`.
        grid: np.ndarray
            The true gridworld (cells equal to 1 are treated as blocked).
        model_type: str
            `'dense'` (flattened window input) or `'conv'` (image-like input).
        max_steps: int
            Cap on the number of iterations. Defaults to the number of cells.
        window_size: int
            Odd side length of the knowledge window fed to the model.
        max_steps_astar: int
            Passed to `a_star_search` as `max_steps`.
        a_star_heuristic: str
            Heuristic name passed to `a_star_search`.
        a_star_push_length: int
            Number of A* path cells queued as forced moves after a re-plan.

        Raises
        ------
        ValueError
            If `window_size` is even or `model_type` is not recognised.
        """
        self.prediction_model = model
        self.grid = grid

        self.status = "INITIALIZED"

        self.knowledge = np.zeros(self.grid.shape)

        if window_size % 2 == 0:
            raise ValueError("window_size can only be an odd number!")

        self.window_size = window_size
        self.window_diff = int(self.window_size/2)

        if model_type not in ("dense", "conv"):
            raise ValueError("model_type can only be 'dense' or 'conv'!")

        self.model_type = model_type

        self.max_steps_astar = max_steps_astar
        self.a_star_heuristic = a_star_heuristic
        self.a_star_push_length = a_star_push_length

        self.trigger_a_star = False

        self.forced_path = []

        # Limit the number of steps
        self.max_steps = \
            grid.shape[0] * grid.shape[1] if max_steps is None else max_steps

        # Traversal state; (re)set by traverse_grid
        self.steps = 0
        self.curr_cell = None
        self.next_cell = None

        # To break out of a loop
        self.is_2_loop = False
        self.is_4_loop = False

        # Number of most recent moves inspected for a 2-cycle / 4-cycle
        self.loop_patience_2 = 4
        self.loop_patience_4 = 8

        # 0: Up, 1: down, 2: left, 3: right
        self.dir_mapping = {
            0: np.array((-1, 0)),
            1: np.array((1, 0)),
            2: np.array((0, -1)),
            3: np.array((0, 1))
        }

        # For debugging
        self.directions = {
            0: "Up",
            1: "Down",
            2: "Left",
            3: "Right"
        }

        # For logging
        self.logs = {'curr_cells': [],
                     'next_cells': [],
                     'knowledge_snaps': [],
                     'predictions': []}

    def summary(self):
        """
        Prints model summary.
        """

        # Some models print the summary themselves and return None;
        # only print when there is actually something returned.
        result = self.prediction_model.summary()
        if result is not None:
            print(result)

    def predict(self, X_test):
        """
        Uses the model to predict on test samples (or a new state when moving).
        """

        return self.prediction_model.predict(X_test)

    def _update_knowledge(self, curr_cell):
        """
        Copies the true state of the cells returned by `neighbourhood` for
        `curr_cell` from the grid into the agent's knowledge.
        """
        nbrs = neighbourhood(curr_cell, self.grid.shape[0], self.grid.shape[1])

        for nbr in nbrs:
            self.knowledge[nbr[0]][nbr[1]] = self.grid[nbr[0]][nbr[1]]

    def _update_state(self, prediction, curr_cell=None,
                      next_cell=None, knowledge=None):
        """
        Updates the agent's state. For logging purposes.

        Arguments left as None fall back to the agent's current values.
        """
        if curr_cell is None:
            curr_cell = self.curr_cell

        if next_cell is None:
            next_cell = self.next_cell

        if knowledge is None:
            knowledge = self.knowledge

        self.logs['curr_cells'].append(curr_cell)
        self.logs['next_cells'].append(next_cell)
        # Store a copy: the knowledge array is mutated in place on every step,
        # so appending the array itself would make all snapshots identical.
        self.logs['knowledge_snaps'].append(np.array(knowledge))
        self.logs['predictions'].append(prediction)

    def _encode_knowledge(self, curr_position):
        """
        Builds the model input: the knowledge window centred on
        `curr_position`, reshaped to what the model type expects.

        Raises
        ------
        ValueError
            If the window centre (the agent's own cell) is marked as 1.
        """

        knowledge_rep = \
            gen_center_grid(curr_position, self.knowledge, self.window_size)

        if knowledge_rep[self.window_diff][self.window_diff] == 1:
            raise ValueError("knowledge =", knowledge_rep,
                             "\ncurr_position =", curr_position)

        if self.model_type == "conv":
            knowledge_rep = np.reshape(np.array([knowledge_rep]), (-1, self.window_size, self.window_size, 1))
        elif self.model_type == "dense":
            knowledge_rep = np.reshape(np.array([knowledge_rep]), (-1, self.window_size**2))

        return knowledge_rep

    def _is_valid_cell(self, cell):
        """
        Checks if cell is within the grid bounds.
        """
        x, y = cell
        x_lim, y_lim = self.grid.shape

        return x >= 0 and y >= 0 and x <= x_lim - 1 and y <= y_lim - 1

    def _is_unblocked_cell(self, cell):
        """
        Returns true if cell is unblocked in knowledge else false
        """
        return not self.knowledge[cell[0]][cell[1]]

    def _detect_loops(self):
        """
        Sets `is_2_loop` / `is_4_loop` when the most recent logged moves
        repeat with period 2 / period 4. Does nothing during a forced run.
        """

        if self.status == "FORCED_RUN":
            return

        moves = self.logs['next_cells']

        # The two checks are independent. Chaining them with `elif` would make
        # the 4-cycle check unreachable as soon as four moves are logged.
        if len(moves) >= self.loop_patience_2:
            tail = moves[-self.loop_patience_2:]
            half = self.loop_patience_2 // 2
            if tail[:half] == tail[half:]:
                self.is_2_loop = True

        if len(moves) >= self.loop_patience_4:
            tail = moves[-self.loop_patience_4:]
            half = self.loop_patience_4 // 2
            if tail[:half] == tail[half:]:
                self.is_4_loop = True

    def _next_cell(self, curr_cell, prediction):
        """
        Gets the next cell based on the model's prediction and current position.

        Parameters
        ----------
        curr_cell: (int, int)
            The agent's current position.
        prediction: tuple or array
            Either a cell popped from the forced (A*) path, or the model's
            output, whose first row holds one score per direction.

        Returns
        -------
        (next_cell, dir_prefs)
            `next_cell` is None when no move is available. `dir_prefs` is the
            direction order that was tried (empty for forced / A* moves).
        """

        # A tuple is a forced move taken from an earlier A* plan
        if isinstance(prediction, tuple):
            if self._is_valid_cell(prediction) and \
                self._is_unblocked_cell(prediction):
                return prediction, []

        self._detect_loops()

        if self.trigger_a_star or self.is_2_loop or self.is_4_loop:
            a_star_path, exit_status, _ = a_star_search(curr_cell,
                                                        (self.grid.shape[0]-1, self.grid.shape[1]-1),
                                                        self.grid,
                                                        self.a_star_heuristic,
                                                        np.zeros(self.knowledge.shape),
                                                        self.knowledge,
                                                        max_steps=self.max_steps_astar)

            self.status = "FORCED_RUN"

            self.is_2_loop = False
            self.is_4_loop = False
            # NOTE(review): this flag is never reset, so from here on every
            # non-forced step re-plans with A* instead of consulting the
            # model -- confirm this is intended.
            self.trigger_a_star = True

            # No plan available: report "no move" instead of raising IndexError
            if a_star_path is None or len(a_star_path) == 0:
                self.forced_path = []
                return None, []

            self.forced_path = a_star_path[1: 1 + self.a_star_push_length]

            # NOTE(review): presumably a_star_path[0] is the start cell
            # (curr_cell), in which case the agent stays put for this step --
            # confirm against a_star_search.
            return a_star_path[0], []

        # np.argsort sorts ascending; reverse it so the highest-scored
        # direction is tried first.
        dir_prefs = np.argsort(prediction[0])[::-1]

        curr_cell_arr = np.array(curr_cell)

        next_cell = None
        for direction in dir_prefs:
            next_possible_cell = curr_cell_arr + self.dir_mapping[direction]

            if self._is_valid_cell(next_possible_cell) and \
                self._is_unblocked_cell(next_possible_cell):
                next_cell = next_possible_cell
                break

        next_cell = tuple(next_cell) if next_cell is not None else None

        return next_cell, dir_prefs

    def traverse_grid(self, start, goal):
        """
        Traverses a gridworld.

        Moves from `start` until `goal` is reached, the step cap is hit
        (`status = "FAILED_STEPS"`) or no move is available
        (`status = "FAILED_STUCK"`). On reaching the goal the status is
        `"SUCCESS"`.

        Parameters
        ----------
        start: (int, int)
            Starting cell.
        goal: (int, int)
            Target cell.
        """
        self.steps = 0

        self.status = "RUNNING"

        self.curr_cell = start
        self.next_cell = None

        while self.curr_cell != goal:
            # `>=` so that at most max_steps iterations are performed
            if self.steps >= self.max_steps:
                self.status = "FAILED_STEPS"
                return

            self._update_knowledge(self.curr_cell)

            if self.forced_path:
                # Follow the queued A* plan before asking the model again
                pred = self.forced_path.pop(0)
            else:
                self.status = "RUNNING"
                agent_state = self._encode_knowledge(self.curr_cell)
                pred = self.predict(agent_state)

            next_cell, dir_prefs = self._next_cell(self.curr_cell, pred)

            self.next_cell = next_cell

            self._update_state(dir_prefs)

            # Nowhere to go: stop cleanly instead of indexing with None below
            if self.next_cell is None:
                self.status = "FAILED_STUCK"
                return

            # Only move if the target cell is not actually blocked
            if self.grid[self.next_cell[0], self.next_cell[1]] != 1:
                self.curr_cell = self.next_cell
                self.next_cell = None

            self.steps += 1

        self.status = "SUCCESS"

In [None]:
# grid = generate_gridworld((50, 50), 0.3)

In [None]:
# Build an Agent10 around the dense model.
# NOTE(review): `grid` is not defined in this cell and the generator call in the
# previous cell is commented out, so it must already exist in the kernel --
# confirm this survives Restart & Run All.
agent = Agent10(dense_model, grid, model_type="dense")

In [None]:
# Walk from the top-left to the bottom-right corner; the goal is derived from
# the agent's own grid instead of being hard-coded for a 50x50 world.
agent.traverse_grid((0, 0), (agent.grid.shape[0] - 1, agent.grid.shape[1] - 1))

In [None]:
agent.status

'SUCCESS'

In [None]:
agent.logs['curr_cells']

[(0, 0),
 (0, 1),
 (0, 2),
 (1, 2),
 (0, 2),
 (0, 1),
 (1, 1),
 (2, 1),
 (3, 1),
 (4, 1),
 (3, 1),
 (2, 1),
 (2, 0),
 (2, 1),
 (2, 0),
 (2, 0),
 (2, 1),
 (3, 1),
 (3, 2),
 (3, 2),
 (3, 3),
 (3, 3),
 (4, 3),
 (4, 4),
 (4, 5),
 (4, 5),
 (4, 6),
 (4, 7),
 (4, 8),
 (4, 8),
 (4, 9),
 (4, 9),
 (4, 8),
 (5, 8),
 (5, 8),
 (4, 8),
 (4, 9),
 (3, 9),
 (3, 9),
 (3, 10),
 (3, 10),
 (2, 10),
 (2, 11),
 (2, 12),
 (2, 12),
 (2, 13),
 (2, 14),
 (2, 15),
 (2, 15),
 (2, 16),
 (2, 17),
 (2, 18),
 (2, 18),
 (2, 19),
 (2, 19),
 (3, 19),
 (3, 20),
 (3, 21),
 (3, 21),
 (3, 22),
 (3, 23),
 (3, 24),
 (3, 24),
 (3, 23),
 (4, 23),
 (5, 23),
 (5, 23),
 (5, 24),
 (5, 24),
 (6, 24),
 (6, 25),
 (6, 26),
 (6, 26),
 (6, 27),
 (6, 28),
 (6, 28),
 (6, 27),
 (7, 27),
 (8, 27),
 (8, 27),
 (8, 28),
 (8, 29),
 (8, 30),
 (8, 30),
 (8, 31),
 (8, 32),
 (8, 32),
 (9, 32),
 (9, 33),
 (9, 34),
 (9, 34),
 (10, 34),
 (10, 35),
 (10, 36),
 (10, 36),
 (10, 37),
 (10, 38),
 (10, 39),
 (10, 39),
 (10, 40),
 (10, 41),
 (10, 41),
 (11, 41

In [None]:
x # (40000, 900)
y # (40000, 900)

# Pair up row i of x with row i of y -> shape (len(x), 2, ...).
# Done in one call instead of a Python loop over every row.
# Assumes x and y have the same shape, as the comments above indicate.
z = np.stack([x, y], axis=1)

In [None]:
pretty_print(visualize_grid(agent.logs['curr_cells'], agent.grid))

*   *   *   1   0   1   1   0   0   0   0   0   1   0   0   0   0   0   0   0   0   0   0   0   0   1   0   1   1   0   1   1   0   0   0   0   0   1   1   0   1   0   1   0   0   0   1   1   1   0   
1   *   *   0   0   0   0   1   0   0   0   0   0   0   1   0   1   1   0   1   1   0   1   0   0   0   1   0   0   0   0   0   1   0   1   0   0   1   1   1   0   0   1   0   1   0   0   0   1   1   
*   *   1   1   1   0   0   1   1   0   *   *   *   *   *   *   *   *   *   *   1   0   0   1   0   0   0   0   1   0   0   1   1   1   0   0   0   0   0   1   0   0   0   0   0   0   0   0   1   0   
1   *   *   *   1   1   0   1   0   *   *   1   0   1   0   0   1   0   0   *   *   *   *   *   *   1   0   1   1   0   0   0   0   0   0   1   0   0   0   1   0   0   1   1   0   0   1   0   0   1   
1   *   0   *   *   *   *   *   *   *   1   0   1   0   1   0   1   0   1   1   0   1   0   *   1   0   0   0   1   0   0   0   0   0   1   1   1   0   0   1   1   0   0   1   0   0   1   1   0   

In [None]:
any([agent.grid[x, y] == 1 for x, y in agent.logs['curr_cells']])

False

In [None]:
pretty_print(agent.grid)

   0   0   0   1   0   1   1   0   0   0   0   0   1   0   0   0   0   0   0   0   0   0   0   0   0   1   0   1   1   0   1   1   0   0   0   0   0   1   1   0   1   0   1   0   0   0   1   1   1   0
   1   0   0   0   0   0   0   1   0   0   0   0   0   0   1   0   1   1   0   1   1   0   1   0   0   0   1   0   0   0   0   0   1   0   1   0   0   1   1   1   0   0   1   0   1   0   0   0   1   1
   0   0   1   1   1   0   0   1   1   0   0   0   0   0   0   0   0   0   0   0   1   0   0   1   0   0   0   0   1   0   0   1   1   1   0   0   0   0   0   1   0   0   0   0   0   0   0   0   1   0
   1   0   0   0   1   1   0   1   0   0   0   1   0   1   0   0   1   0   0   0   0   0   0   0   0   1   0   1   1   0   0   0   0   0   0   1   0   0   0   1   0   0   1   1   0   0   1   0   0   1
   1   0   0   0   0   0   0   0   0   0   1   0   1   0   1   0   1   0   1   1   0   1   0   0   1   0   0   0   1   0   0   0   0   0   1   1   1   0   0   1   1   0   0   1   0   0   1   1   0

In [None]:
def test_agent_1_vs_agent_10_random(grid_shape, 
                                    block_prob,
                                    model,
                                    model_type,
                                    heuristic_type="manhattan", 
                                    move_nbhd_type="compass", 
                                    solvable=True, 
                                    epsilon=1., 
                                    escape_tunnels=False):
    """ 
    Runs Agent 1 (Repeated A*) and Agent 10 on the same random gridworld,
    from the top-left to the bottom-right corner, and times both.

    Parameters
    ----------
    grid_shape: (int, int)
        Shape of the randomly generated gridworld
    block_prob: float
        Passed to `generate_gridworld` as `block_prob`
    model: object
        Prediction model handed to `Agent10`
    model_type: str
        Model type handed to `Agent10` (`'dense'` or `'conv'`)
    heuristic_type: str
        Heuristic name passed to the A* routines
    move_nbhd_type: str
        Passed to `RepeatedAStar` as `move_nbhd_type`
    solvable: bool
        If True, gridworlds are regenerated until `a_star_search` on the
        fully known grid reports `"SUCCESS"`
    epsilon: float
        Passed to `RepeatedAStar` as `epsilon`; presumably the heuristic
        weight, f(n) = g(n) + epsilon*h(n) -- TODO confirm
    escape_tunnels: bool
        Passed to `RepeatedAStar` as `escape_tunnels`

    Returns
    -------
    (agent_1, agent_1_time_taken, agent_10, agent_10_time_taken)
        Both agents after their runs and the wall-clock seconds each run took.
    """

    source = (0, 0)

    if not solvable:
        # Any random gridworld will do
        grid = generate_gridworld(grid_shape, block_prob=block_prob)
    else:
        # Keep drawing gridworlds until one has a path between the corners
        while True:
            grid = generate_gridworld(grid_shape, block_prob=block_prob)
            target = (grid.shape[0]-1, grid.shape[1]-1)

            _, exit_status, _ = a_star_search(source,
                                              target,
                                              grid,
                                              heuristic_type,
                                              np.zeros(grid.shape),
                                              grid.copy())

            if exit_status == "SUCCESS":
                break

    target = (grid.shape[0]-1, grid.shape[1]-1)

    # Agent 1: construction is kept outside the timed section
    agent_1 = RepeatedAStar(grid, move_nbhd_type=move_nbhd_type, epsilon=epsilon, escape_tunnels=escape_tunnels)

    agent_1_started = time.time()
    agent_1.search(source, target, grid, heuristic_type)
    agent_1_time_taken = time.time() - agent_1_started

    # Agent 10: same grid, same corners
    agent_10 = Agent10(model, grid, model_type)

    agent_10_started = time.time()
    agent_10.traverse_grid(source, target)
    agent_10_time_taken = time.time() - agent_10_started

    return agent_1, agent_1_time_taken, agent_10, agent_10_time_taken

In [None]:
# Experiment: Agent 1 vs Agent 10 (dense model) on random gridworlds.
# Number of gridworlds, their dimensions, and the block probability handed to
# test_agent_1_vs_agent_10_random.
NUM_ITERS = 10
DIM_ROWS, DIM_COLS, P = 100, 100, 0.2
HEURISTIC_FN = "manhattan"

# Per-run results: wall-clock time, success flag and path length for each agent
a1_times, a1_statuses, a1_pl, a10_times, a10_statuses, a10_pl = [], [], [], [], [], []

MODEL = dense_model
MODEL_TYPE = "dense"

for _ in tqdm(range(NUM_ITERS)):

    a1, a1_tt, a10, a10_tt = test_agent_1_vs_agent_10_random((DIM_ROWS, DIM_COLS), P, MODEL, MODEL_TYPE, heuristic_type=HEURISTIC_FN)

    a1_times.append(a1_tt)
    a1_statuses.append(a1.successfully_completed)
    # NOTE(review): Agent 1's length is len(path_followed()) while Agent 10's is
    # its step counter -- confirm the two measure the same thing.
    a1_pl.append(len(a1.path_followed()))
    a10_times.append(a10_tt)
    a10_statuses.append(a10.status == "SUCCESS")
    a10_pl.append(a10.steps)

# One row per run
df_dense = pd.DataFrame({"agent1_times": a1_times, 
                   "agent1_statuses": a1_statuses, 
                   "agent1_path_lengths": a1_pl, 
                   "a10_times": a10_times, 
                   "a10_statuses": a10_statuses, 
                   "a10_pl": a10_pl})

100%|██████████| 10/10 [00:55<00:00,  5.55s/it]


In [None]:
df_dense

Unnamed: 0,agent1_times,agent1_statuses,agent1_path_lengths,a10_times,a10_statuses,a10_pl
0,0.439155,True,266,5.356637,True,342
1,0.472019,True,292,5.63234,True,418
2,0.328979,True,254,4.542907,True,361
3,0.276299,True,274,5.001517,True,379
4,0.308511,True,246,5.159401,True,344
5,0.361809,True,266,4.996051,True,371
6,0.349437,True,300,6.11721,True,428
7,0.43312,True,250,4.65003,True,357
8,0.427459,True,262,4.366745,True,356
9,0.341468,True,270,4.794132,True,377


In [None]:
from statsmodels.stats.weightstats import ztest as ztest

  import pandas.util.testing as tm


In [None]:
# Compare path lengths from the dense-model experiment. `df_dense` is the frame
# built just above; `df` only exists after the conv experiment further down, so
# using it here fails on a fresh Restart & Run All.
# NOTE(review): alternative="larger" tests mean(agent 1) > mean(agent 10) --
# confirm this is the intended direction.
_, p = ztest(df_dense['agent1_path_lengths'], df_dense['a10_pl'], alternative="larger")


(28.09310080412492, 5.947221572455183e-174)

In [None]:
# Experiment: Agent 1 vs Agent 10 (conv model) on random gridworlds.
# Same settings as the dense-model experiment cell.
NUM_ITERS = 10
DIM_ROWS, DIM_COLS, P = 100, 100, 0.2
HEURISTIC_FN = "manhattan"

# Per-run results; these lists are re-created here, discarding the dense run's values
a1_times, a1_statuses, a1_pl, a10_times, a10_statuses, a10_pl = [], [], [], [], [], []

MODEL = conv_model
MODEL_TYPE = "conv"

for _ in tqdm(range(NUM_ITERS)):

    a1, a1_tt, a10, a10_tt = test_agent_1_vs_agent_10_random((DIM_ROWS, DIM_COLS), P, MODEL, MODEL_TYPE, heuristic_type=HEURISTIC_FN)

    a1_times.append(a1_tt)
    a1_statuses.append(a1.successfully_completed)
    # NOTE(review): Agent 1's length is len(path_followed()) while Agent 10's is
    # its step counter -- confirm the two measure the same thing.
    a1_pl.append(len(a1.path_followed()))
    a10_times.append(a10_tt)
    a10_statuses.append(a10.status == "SUCCESS")
    a10_pl.append(a10.steps)

# One row per run. NOTE(review): the generic name `df` holds the conv-model
# results, while the dense-model results are in `df_dense`.
df = pd.DataFrame({"agent1_times": a1_times, 
                   "agent1_statuses": a1_statuses, 
                   "agent1_path_lengths": a1_pl, 
                   "a10_times": a10_times, 
                   "a10_statuses": a10_statuses, 
                   "a10_pl": a10_pl})

100%|██████████| 10/10 [01:11<00:00,  7.17s/it]


In [None]:
df

Unnamed: 0,agent1_times,agent1_statuses,agent1_path_lengths,a10_times,a10_statuses,a10_pl
0,0.515415,True,264,9.452565,True,369
1,0.859238,True,252,11.578412,True,350
2,0.532388,True,274,7.046584,True,377
3,0.431237,True,282,4.748192,True,371
4,0.266399,True,234,4.914743,True,321
5,0.424814,True,286,5.644224,True,414
6,0.29209,True,330,6.268903,True,455
7,0.359288,True,260,5.95592,True,358
8,0.368239,True,254,4.479445,True,349
9,0.403759,True,262,5.315856,True,376
