# Search

In [1]:
import networkx as nx

## Problem Setup

First we need to define the search problem that we need to solve.  Here in this demo we are only going to tackle a classic search through a graph.  If you want to see the CS188 Search project which incorporates Pac-Man then head to this link: http://ai.berkeley.edu/search.html.

In [2]:
class SearchProblem:
    """
    A graph search problem: starting from one node, reach any node whose
    'food' attribute is set.

    The graph is used through the networkx Graph interface: per-node attribute
    dicts, neighbors(n), and per-edge attribute dicts (holding 'weight')
    reachable as graph[u][v].
    """
    def __init__(self, graph, start_state, manhattan_distances):
        """
          graph: Graph to search
          start_state: Node the search starts from
          manhattan_distances: Mapping from node to the heuristic estimate
                               of the remaining cost from that node
        """
        self.graph = graph
        self.start_state = start_state
        self.manhattan_distances = manhattan_distances

    def getStartState(self):
        """
        Returns the start state for the search problem.
        """
        return self.start_state

    def isGoalState(self, state):
        """
          state: Search state

        Returns True if and only if the state is a valid goal state, i.e. the
        node's 'food' attribute is truthy. A node without a 'food' attribute
        is not a goal.
        """
        try:
            # networkx >= 2.0 exposes node attributes through the `nodes` view.
            # The older `node` accessor was removed in networkx 2.4.
            attributes = self.graph.nodes[state]
        except TypeError:
            # networkx 1.x: `nodes` is a plain method (not subscriptable) and
            # the attribute dicts live in `node`.
            attributes = self.graph.node[state]
        return bool(attributes.get('food', False))

    def getSuccessors(self, state):
        """
          state: Search state

        For a given state, this returns a list of pairs, (successor, stepCost),
        where 'successor' is a successor to the current state and 'stepCost'
        is the incremental cost of expanding to that successor (the weight of
        the connecting edge).
        """
        return [(s, self.graph[state][s]['weight']) for s in self.graph.neighbors(state)]

The above problem set-up assumes that you construct a search problem based on a graph input.  Obviously, we could just run a regular graph traversal on any given graph if we wanted to find a way from point A to point B.  But this set-up allows us to have multiple goal nodes!  For example, in the Pac-Man situation, the goal might be to collect a single food pellet as quickly as possible, yet there may be multiple food pellets around the grid.  Thus, in our graph analogy, there may be multiple nodes which would be valid solutions!

Here we will construct a graph to model the aforementioned Pac-Man problem of trying to eat a single food pellet.  Each node will represent an intersection on the Pac-Man grid and each edge between them will have a weight equal to the length of the path from one intersection to the next.

In [3]:
G = nx.Graph()

# Ten intersections, none of which holds food to begin with.
for i in range(10):
    G.add_node(i, food=False)

# Corridors between intersections; the weight is the corridor's length.
G.add_edge(0, 1, weight=4)
G.add_edge(0, 2, weight=2)
G.add_edge(1, 3, weight=2)
G.add_edge(2, 5, weight=1)
G.add_edge(3, 4, weight=1)
G.add_edge(4, 7, weight=1)
G.add_edge(5, 6, weight=1)
G.add_edge(6, 7, weight=2)
G.add_edge(7, 8, weight=1)
G.add_edge(8, 9, weight=1)

# Put the food pellet on node 9. Re-adding an existing node only updates its
# attributes, which avoids the `G.node[...]` accessor that networkx removed
# in version 2.4.
G.add_node(9, food=True)

# Heuristic estimate of the remaining cost from each node to the food at
# node 9. On this graph no estimate exceeds the true shortest-path cost.
manhattan_distances = {0:8, 1:4, 2:6, 3:2, 4:3, 5:5, 6:4, 7:2, 8:1, 9:0}

Now let's just quickly define some utils that will help us write our search code:

In [4]:
import heapq

class Stack:
    "LIFO container: the item pushed most recently is the first one popped."
    def __init__(self):
        # Top of the stack is the end of the list.
        self.list = list()

    def push(self, item):
        "Place 'item' on top of the stack"
        self.list.append(item)

    def pop(self):
        "Remove and return the item on top of the stack"
        top = self.list.pop(-1)
        return top

    def isEmpty(self):
        "Returns true if the stack holds no items"
        return not self.list

class Queue:
    "FIFO container: items are popped in the order they were pushed."
    def __init__(self):
        # Newest item sits at index 0, oldest item at the end.
        self.list = list()

    def push(self, item):
        "Add 'item' at the back of the queue"
        # Empty-slice assignment inserts at the front of the underlying list.
        self.list[0:0] = [item]

    def pop(self):
        """
          Remove and return the item that has waited in the queue
          the longest.
        """
        oldest = self.list.pop(-1)
        return oldest

    def isEmpty(self):
        "Returns true if the queue holds no items"
        return not self.list

class PriorityQueue:
    """
      Min-priority queue backed by a heapq binary heap. Every item is
      stored together with a priority, and pop() hands back the item whose
      priority is smallest. The smallest entry always sits at heap[0].
    """
    def __init__(self):
        self.count = 0
        self.heap = []

    def push(self, item, priority):
        "Insert 'item' with the given priority"
        # The running counter breaks ties between equal priorities in
        # insertion order, ahead of any comparison of the items themselves.
        heapq.heappush(self.heap, (priority, self.count, item))
        self.count = self.count + 1

    def pop(self):
        "Remove and return the item with the smallest priority"
        entry = heapq.heappop(self.heap)
        return entry[2]

    def isEmpty(self):
        "Returns true if the queue holds no items"
        return not self.heap

    def update(self, item, priority):
        """
          Lower the priority of an item that is already queued.

          - item queued with a larger priority: replace it and rebuild the heap
          - item queued with an equal or smaller priority: leave it untouched
          - item not queued: behave exactly like push
        """
        for position, entry in enumerate(self.heap):
            old_priority, order, queued = entry
            if queued == item:
                if old_priority <= priority:
                    return
                # Keep the original insertion counter so tie-breaking is stable.
                self.heap.pop(position)
                self.heap.append((priority, order, item))
                heapq.heapify(self.heap)
                return
        self.push(item, priority)

### Now... onto the good stuff!

We are going to try to fill in the following functions to implement DFS, BFS, UCS, and A* search (Hint: after doing the first, it's pretty much copying and pasting from there!).

In [5]:
# Search from node 0 of the graph built above for a node that holds food.
problem = SearchProblem(
    graph=G,
    start_state=0,
    manhattan_distances=manhattan_distances,
)

#### DFS Implementation

In [6]:
def depthFirstSearch(problem):
    """
    Search the deepest nodes in the search tree first.

      problem: Object providing getStartState(), isGoalState(state) and
               getSuccessors(state), the latter returning
               (successor, stepCost) pairs

    Returns the list of states leading from the start state to the first goal
    state taken off the fringe (both included), or None if the fringe runs
    empty before a goal is reached. States must be hashable.
    """
    start_state = problem.getStartState()
    # Each fringe entry is (state, path from the start up to that state).
    # Step costs play no part in depth-first ordering, so they are not tracked.
    fringe = Stack()
    fringe.push((start_state, [start_state]))
    # A set gives constant-time "already expanded?" checks.
    closed_set = set()

    while not fringe.isEmpty():
        curr_state, curr_path = fringe.pop()
        if problem.isGoalState(curr_state):
            return curr_path
        if curr_state in closed_set:
            continue
        closed_set.add(curr_state)
        for next_state, _step_cost in problem.getSuccessors(curr_state):
            fringe.push((next_state, curr_path + [next_state]))

    # Everything reachable was expanded without finding a goal.
    return None

In [7]:
depthFirstSearch(problem)

[0, 2, 5, 6, 7, 8, 9]

#### BFS Implementation

In [8]:
def breadthFirstSearch(problem):
    """
    Search the shallowest nodes in the search tree first.

      problem: Object providing getStartState(), isGoalState(state) and
               getSuccessors(state), the latter returning
               (successor, stepCost) pairs

    Returns the list of states leading from the start state to the first goal
    state taken off the fringe (both included), or None if the fringe runs
    empty before a goal is reached. States must be hashable.
    """
    start_state = problem.getStartState()
    # Each fringe entry is (state, path from the start up to that state).
    # Step costs play no part in breadth-first ordering, so they are not tracked.
    fringe = Queue()
    fringe.push((start_state, [start_state]))
    # A set gives constant-time "already expanded?" checks.
    closed_set = set()

    while not fringe.isEmpty():
        curr_state, curr_path = fringe.pop()
        if problem.isGoalState(curr_state):
            return curr_path
        if curr_state in closed_set:
            continue
        closed_set.add(curr_state)
        for next_state, _step_cost in problem.getSuccessors(curr_state):
            fringe.push((next_state, curr_path + [next_state]))

    # Everything reachable was expanded without finding a goal.
    return None

In [9]:
breadthFirstSearch(problem)

[0, 1, 3, 4, 7, 8, 9]

#### UCS Implementation

In [10]:
def uniformCostSearch(problem):
    """
    Search the node of least total cost first.

      problem: Object providing getStartState(), isGoalState(state) and
               getSuccessors(state), the latter returning
               (successor, stepCost) pairs

    Returns the list of states leading from the start state to the first goal
    state taken off the fringe (both included), or None if the fringe runs
    empty before a goal is reached. States must be hashable.
    """
    start_state = problem.getStartState()
    # Each fringe entry is (state, path so far, cost of that path), ordered
    # by the path cost.
    fringe = PriorityQueue()
    fringe.push((start_state, [start_state], 0), 0)
    # A set gives constant-time "already expanded?" checks.
    closed_set = set()

    while not fringe.isEmpty():
        curr_state, curr_path, curr_cost = fringe.pop()
        if problem.isGoalState(curr_state):
            return curr_path
        if curr_state in closed_set:
            continue
        closed_set.add(curr_state)
        for next_state, step_cost in problem.getSuccessors(curr_state):
            next_cost = curr_cost + step_cost
            # Plain push: PriorityQueue.update matches on the whole
            # (state, path, cost) entry, which is different for every path,
            # so it would only add a linear scan before pushing anyway.
            # Costlier duplicates of a state are skipped by the closed-set
            # check when they are popped.
            fringe.push((next_state, curr_path + [next_state], next_cost), next_cost)

    # Everything reachable was expanded without finding a goal.
    return None

In [11]:
uniformCostSearch(problem)

[0, 2, 5, 6, 7, 8, 9]

#### A* Implementation

In [12]:
def manhattanHeuristic(state, problem=None):
    """
    A heuristic function estimates the cost from the current state to the nearest
    goal in the provided SearchProblem.  This heuristic looks the estimate up in
    the problem's precomputed manhattan_distances table.

      state: Search state
      problem: Object exposing a manhattan_distances mapping from state to
               estimated remaining cost

    Returns the tabulated estimate for the state. When no problem is given
    there is no table to consult, so the trivial estimate 0 is returned. A
    state missing from the table raises the table's own lookup error
    (KeyError for a dict).
    """
    if problem is None:
        return 0
    return problem.manhattan_distances[state]

def aStarSearch(problem, heuristic=manhattanHeuristic):
    """
    Search the node that has the lowest combined cost and heuristic first.

      problem: Object providing getStartState(), isGoalState(state) and
               getSuccessors(state), the latter returning
               (successor, stepCost) pairs
      heuristic: Function called as heuristic(state, problem) that returns
                 the estimated remaining cost from state

    Returns the list of states leading from the start state to the first goal
    state taken off the fringe (both included), or None if the fringe runs
    empty before a goal is reached. States must be hashable.
    """
    start_state = problem.getStartState()
    # Each fringe entry is (state, path so far, cost of that path), ordered
    # by heuristic estimate + path cost. The start entry is alone in the
    # fringe, so its priority value does not matter.
    fringe = PriorityQueue()
    fringe.push((start_state, [start_state], 0), 0)
    # A set gives constant-time "already expanded?" checks.
    closed_set = set()

    while not fringe.isEmpty():
        curr_state, curr_path, curr_cost = fringe.pop()
        if problem.isGoalState(curr_state):
            return curr_path
        if curr_state in closed_set:
            continue
        closed_set.add(curr_state)
        for next_state, step_cost in problem.getSuccessors(curr_state):
            estimated_total = heuristic(next_state, problem) + curr_cost + step_cost
            # Plain push: PriorityQueue.update matches on the whole
            # (state, path, cost) entry, which is different for every path,
            # so it would only add a linear scan before pushing anyway.
            # Worse duplicates of a state are skipped by the closed-set
            # check when they are popped.
            fringe.push((next_state, curr_path + [next_state], curr_cost + step_cost),
                        estimated_total)

    # Everything reachable was expanded without finding a goal.
    return None

In [13]:
aStarSearch(problem, manhattanHeuristic)

[0, 2, 5, 6, 7, 8, 9]

# CONGRATULATIONS! YOU FINISHED!