<a href="https://colab.research.google.com/github/Jerryhqy/notfunnyatall/blob/s7_inperson/autonomy_repo/scripts/Frontier_Exploration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Frontier Exploration Node

Step 1: Import all useful(less) packs

In [None]:
#!/usr/bin/env python3
import numpy as np
import rclpy
from rclpy.node import Node
from asl_tb3_lib.control import BaseHeadingController
from std_msgs.msg import Bool
from asl_tb3_lib.math_utils import wrap_angle
from asl_tb3_msgs.msg import TurtleBotControl, TurtleBotState

Grid Occupancy Class

In [None]:
class StochOccupancyGrid2D(object):
    """ A stochastic occupancy grid derived from ROS2 map data

    The probability of grid cell being occupied is computed by the joint probability of
    any neighboring cell being occupied within some fixed window. For some examples of size-3
    occupancy windows,

    0.1 0.1 0.1
    0.1 0.1 0.1  ->  1 - (1 - 0.1)**9 ~= 0.61
    0.1 0.1 0.1

    0.0 0.1 0.0
    0.0 0.1 0.0  ->  1 - (1 - 0)**6 * (1 - 0.1)**3 ~= 0.27
    0.0 0.1 0.0

    The final occupancy probability is then converted to binary occupancy using a threshold
    """

    def __init__(self,
        resolution: float,
        size_xy: np.ndarray,
        origin_xy: np.ndarray,
        window_size: int,
        probs: T.Sequence[float],
        thresh: float = 0.5
    ) -> None:
        """
        Args:
            resolution (float): resolution of the map
            size_xy (np.ndarray): size-2 integer array representing map size
            origin_xy (np.ndarray): size-2 float array representing map origin coordinates
            window_size (int): window size for computing occupancy probabilities
            probs (T.Sequence[float]): map data
            thresh (float): threshold for final binarization of occupancy probabilites
        """
        self.resolution = resolution
        self.size_xy = size_xy
        self.origin_xy = origin_xy
        self.probs = np.reshape(np.asarray(probs), (size_xy[1], size_xy[0]))
        self.window_size = window_size
        self.thresh = thresh

    def state2grid(self, state_xy: np.ndarray) -> np.ndarray:
        """ convert real state coordinates to integer grid indices

        Args:
            state_xy (np.ndarray): real state coordinates (x, y)

        Returns:
            np.ndarray: quantized 2D grid indices (kx, ky)
        """
        state_snapped_xy = snap_to_grid(state_xy, self.resolution)
        grid_xy = ((state_snapped_xy - self.origin_xy) / self.resolution).astype(int)

        return grid_xy

    def grid2state(self, grid_xy: np.ndarray) -> np.ndarray:
        """ convert integer grid indices to real state coordinates

        Args:
            grid_xy (np.ndarray): integer grid indices (kx, ky)

        Returns:
            np.ndarray: real state coordinates (x, y)
        """
        return (grid_xy * self.resolution + self.origin_xy).astype(float)

    def is_free(self, state_xy: np.ndarray) -> bool:
        """ Check whether a state is free or occupied

        Args:
            state_xy (np.ndarray): size-2 state vectory of (x, y) coordinate

        Returns:
            bool: True if free, False if occupied
        """
        # combine the probabilities of each cell by assuming independence of each estimation
        grid_xy = self.state2grid(state_xy)

        half_size = int(round((self.window_size-1)/2))
        grid_xy_lower = np.maximum(0, grid_xy - half_size)
        grid_xy_upper = np.minimum(self.size_xy, grid_xy + half_size + 1)

        prob_window = self.probs[grid_xy_lower[1]:grid_xy_upper[1],
                                 grid_xy_lower[0]:grid_xy_upper[0]]
        p_total = np.prod(1. - np.maximum(prob_window / 100., 0.))

        return (1. - p_total) < self.thresh

The Frontier Explorer Class

In [None]:
# the heading controller class
# functionalities
# 1. receive /nav_sucess message from the navigator
# 2. send messages regarding to the
class FrontierExplorer():
	def __init__(self):
    # initialize some variables
    self.current_state = None
    self.occupancy = None

    # subscribers
		self.nav_success_sub = self.create_subscription(Bool,     'nav_sucess', self.nav_success_callback, 10)
    self.state_sub = self.create_subscription(TurlebotState, 'state', self.state_callback, 10)
    self.map_sub = self.create_subscription(OccupancyGrid,   'map', self.map_callback, 10)

    # publisher
    self.pub = self.create_publisher(TurtlebotState, "cmd_nav", 10)

    ############################### Properties ###################################

    # no need to define properties for this

    ######################### Callback Functions #################################
    # get whether or not the navigation is successful
  def nav_success_callback(self, msg):
    if msg.data:
      msg_out = self.explore()
      self.pub.publish(msg_out)

  # get the current state
  def state_callback(self, msg):
      self.current_state = msg

  # get the occupancy grid
  def map_callback(self, msg: OccupancyGrid) -> None:
      """ Callback triggered when the map is updated

      Args:
          msg (OccupancyGrid): updated map message
      """
      self.occupancy = StochOccupancyGrid2D(
          resolution=msg.info.resolution,
          size_xy=np.array([msg.info.width, msg.info.height]),
          origin_xy=np.array([msg.info.origin.position.x, msg.info.origin.position.y]),
          window_size=9,
          probs=msg.data,
      )

      # replan if the new map updates causes collision in the original plan
      # if self.is_planned and not all([self.occupancy.is_free(s) for s in self.plan.path[1:]]):
      #     self.is_planned = False
      #     self.replan(self.goal)

  ###################### Implementation Functions #############################

  def snap_to_grid(self, state: np.ndarray, resolution: float) -> np.ndarray:
    """ Snap continuous coordinates to a finite-resolution grid

    Args:
        state (np.ndarray): a size-2 numpy array specifying the (x, y) coordinates
        resolution (float): resolution of the grid

    Returns:
        np.ndarray: state-vector snapped onto the specified grid
    """
    return resolution * np.round(state / resolution)

  def explore(self):
      """ returns potential states to explore
      Args:
          occupancy (StochasticOccupancyGrid2D): Represents the known, unknown, occupied, and unoccupied states. See class in first section of notebook.

      Returns:
          frontier_states (np.ndarray): state-vectors in (x, y) coordinates of potential states to explore. Shape is (N, 2), where N is the number of possible states to explore.

      HINTS:
      - Function `convolve2d` may be helpful in producing the number of unknown, and number of occupied states in a window of a specified cell
      - Note the distinction between physical states and grid cells. Most operations can be done on grid cells, and converted to physical states at the end of the function with `occupancy.grid2state()`
      """

      window_size = 13    # defines the window side-length for neighborhood of cells to consider for heuristics
      ########################### Code starts here ###########################
      # index of the center of the kernel
      middle_ind = np.floor(window_size/2).astype(int)

      # for counting ones
      kernel_1 = np.ones([window_size, window_size])/(window_size**2 - 1)
      kernel_1[middle_ind, middle_ind] = 0
      occupied = convolve2d(self.occupancy.probs >= self.occupancy.thresh, kernel_1, mode='valid')
      unknown  = convolve2d(self.occupancy.probs == -1, kernel_1, mode='valid')
      vacant   = 1 - occupied - unknown

      indices = np.where((unknown >= 0.2) & (vacant >= 0.3) & (occupied == 0.0))
      frontier_gridpoints = np.array(indices).T + middle_ind # to account for the sizing window
      frontier_gridpoints[:, [0, 1]] = frontier_gridpoints[:, [1, 0]] # switching columns to get the correct x and y
      frontier_states = self.occupancy.grid2state(frontier_gridpoints)

      # compute the frontier state that is closest to the current state
      distances = np.linalg.norm(frontier_states - self.current_state, axis = 1)
      closest_idx = np.argmin(distances)

      closest_state = frontier_states[closest_idx,:]

      # construct a turtlebotstate object
      target_state = TurtlebotState()

      # specify x and y
      target_state.x = closest_state[0]
      target_state.y = closest_state[1]
      target_state.theta = 0

      #print("The closest fronter state to the current state is:")
      #print(frontier_states[closest_idx,:])
      ########################### Code ends here ###########################

      return target_state


