# Search for AIMA 4th edition

Implementation of search algorithms and search problems for AIMA.

We start by defining the abstract class for a `Problem`; problem domains will subclass this, and then you can create individual problems with specific initial states and goals. We also ddefine a `Node` in a search tree, and some functions on nodes: `expand` to generate successors, and `path_actions`, `path_states` and `path` to recover aspects of the path from the node.  Finally, a `PriorityQueue`, which allows you to keep a collection of items, and continually remove from it the item with minimum `f(item)` score.

In [1]:
import heapq
import math
import sys
from collections import defaultdict, deque, Counter

class Problem(object):
    """The abstract class for a formal problem. You should subclass this,
    overriding `actions` and `results`, and other methods if necessary.
    Note: a problem can specify a default heuristic if desired. By default, 
    the heuristic is 0 for all states, and the step cost is 1 for all actions."""

    def __init__(self, initial=None, goal=None, **other_keywords):
        """Specify the initial and goal states.
        Subclasses can use other keywords if they want."""
        self.__dict__.update(initial=initial, goal=goal, **other_keywords) 

    def actions(self, state):           raise NotImplementedError
    def result(self, state, action):    raise NotImplementedError
    def is_goal(self, state):           return state == self.goal
    def step_cost(self, s, action, s1): return 1
    def h(self, node):                  return 0
    

class Node:
    "A Node in a search tree."
    def __init__(self, state, parent=None, action=None, path_cost=0):
        self.__dict__.update(state=state, parent=parent, action=action, path_cost=path_cost)

    def __repr__(self): return '<{}>'.format(self.state)
    def __len__(self): return 0 if self.parent is None else (1 + len(self.parent))
    def __lt__(self, other): return self.state < other.state
    
    
def expand(problem, node):
    "Expand a node, generating the children nodes."
    s = node.state
    for action in problem.actions(s):
        s1 = problem.result(s, action)
        cost = node.path_cost + problem.step_cost(s, action, s1)
        yield Node(s1, node, action, cost)
        

def path_actions(node):
    "The sequence of actions to get to this node."
    return [] if node.parent is None else path_actions(node.parent) + [node.action]


def path_states(node):
    "The sequence of states to get to this node."
    return ([] if node.parent is None else path_states(node.parent) ) + [node.state]


def path(node):
    "Alternating states and actions to get to this node."
    return ([] if node.parent is None else path(node.parent) + [node.action] ) + [node.state]

# Queues

In [2]:
FIFOQueue = list

LIFOQueue = deque

class PriorityQueue:
    """A queue in which the item with minimum f(item) is always popped first."""

    def __init__(self, items=(), key=lambda x: x): 
        self.key = key
        self.items = []
        for item in items:
            self.add(item)
         
    def add(self, item):
        """Add item to the queuez."""
        heapq.heappush(self.items, (self.key(item), item))

    def pop(self):
        """Pop and return the item with min f(item) value."""
        return heapq.heappop(self.items)[1]

    def __len__(self): return len(self.items)

# Search Algorithms

Here are the six major state-space search algorithms covered in the book:

In [3]:
def breadth_first_search(problem):
    "Search shallowest nodes in the search tree first."
    frontier = LIFOQueue([Node(problem.initial)])
    reached = set()
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node
        for child in expand(problem, node):
            s = child.state
            if s not in reached:
                reached.add(s)
                frontier.appendleft(child)
    return failure

def depth_limited_search(problem, limit=5):
    "Search deepest nodes in the search tree first."
    frontier = FIFOQueue([Node(problem.initial)])
    solution = failure
    while frontier:
        node = frontier.pop()
        if len(node) > limit:
            solution = cutoff
        else:
            for child in expand(problem, node):
                if problem.is_goal(child.state):
                    return child
                frontier.append(child)
    return solution

def iterative_deepening_search(problem):
    "Do depth-limited search with increasing depth limits."
    for limit in range(1, sys.maxsize):
        result = depth_limited_search(problem, limit)
        if result != cutoff:
            return result


def best_first_search(problem, f):
    "Search nodes with minimum f(node) value first."
    frontier = PriorityQueue([Node(problem.initial)], key=f)
    reached = {}
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node
        for child in expand(problem, node):
            s = child.state
            if s not in reached or child.path_cost < reached[s].path_cost:
                reached[s] = child
                frontier.add(child)
    return failure


def uniform_cost_search(problem):
    "Search niodes with minimum  path cost first."
    return best_first_search(problem, lambda node: node.path_cost)


def astar_search(problem, h=None):
    """Search niodes with minimum f(n) = g(n) + h(n)."""
    h = h or problem.h
    return best_first_search(problem, lambda node: node.path_cost + h(node))

failure = Node('failure', path_cost=math.inf) # Indicates an algorithm couldn't find a solution.
cutoff  = Node('cutoff', path_cost=math.inf)  # Indicates iterative deeepening search was cut off.

# Problem Domains

Now we turn our attention to defining some problem domains.

# Water Pouring Problems

In [4]:
class PourProblem(Problem):
    """Problem about pouring water between jugs to achieve some water level.
    Each state is a tuples of water levels. In the initialization, provide a tuple of 
    sizes, e.g. PourProblem((2, 4, 3), 7, sizes=(8, 16, 32)), 
    which means three jugs of sizes (8, 16, 32), initially filled with (2, 4, 3) units of 
    water, respectively, and the goal is to get a level of 7 in any one of the jugs."""
    
    def actions(self, state):
        """The actions executable in this state."""
        jugs = range(len(state))
        return ([('Fill', i)    for i in jugs if state[i] < self.sizes[i]] +
                [('Dump', i)    for i in jugs if state[i]] +
                [('Pour', i, j) for i in jugs if state[i] for j in jugs if i != j])

    def result(self, state, action):
        """The state that results from executing this action in this state."""
        result = list(state)
        act, i, *_ = action
        if act == 'Fill':   # Fill i to capacity
            result[i] = self.sizes[i]
        elif act == 'Dump': # Empty i
            result[i] = 0
        elif act == 'Pour': # Pour from i into j
            j = action[2]
            amount = min(state[i], self.sizes[j] - state[j])
            result[i] -= amount
            result[j] += amount
        return tuple(result)

    def is_goal(self, state):
        """True if the goal level is in any one of the jugs."""
        return self.goal in state
    
    
class GreenPourProblem(PourProblem): 
    """A PourProblem in which we count not the number of steps, but the amount of water used."""
    def step_cost(self, state, action, result=None):
        "The cost is the amount of water used in a fill."
        act, i, *_ = action
        return self.sizes[i] - state[i] if act == 'Fill' else 0

# Route Finding Problems

In [5]:
class RouteProblem(Problem):
    """A problem to find a route between places on a map.
    Use RouteProblem('S', 'G', map=Map(...)})"""
    
    def actions(self, state): 
        """The places you can get to from this state. (Action names are the same as place names.)"""
        return self.map.neighbors[state]
    
    def result(self, state, action):
        """Go to the `action` place, if the map says that is possible."""
        return action if action in self.map.neighbors[state] else state
    
    def step_cost(self, s, action, s1):
        """The actual distance between s and s1."""
        return self.map.distances[s, s1]
    
    def h(self, node):
        "Straight-line distance between state and the goal."
        locs = self.map.locations
        s, g = locs[node.state], locs[self.goal]
        return abs(complex(*s) - complex(*g))

class Map:
    """Builds an undirected graph of {vertex: [neighbors...]}, with two additional annotations:
       neighbors:
       distances: a dict of {(v1, v2): number} giving the distance from v1 to v2;
       locations: a dict of {v: (x, y)} giving the (x, y) location of each vertex."""
    def __init__(self, distances, locations=()):
        self.neighbors = undirected_graph(distances)
        self.distances = distances
        self.locations = locations or defaultdict(lambda: (0, 0))
        for (v1, v2) in list(distances):
            distances[v2, v1] = distances[v1, v2]
            
def undirected_graph(pairs):
    "Given {(v1, v2)...} pairs, return a graph of {v1: [v2,...], v2:[v1,...]}."
    graph = defaultdict(tuple)
    for (v1, v2) in pairs:
        graph[v1] += (v2,)
        graph[v2] += (v1,)
    return dict(graph)

romania = Map(distances={
    ('O', 'Z'): 71, ('O', 'S'): 151, ('A', 'Z'): 75, ('A', 'S'): 140, ('A', 'T'): 118, 
    ('L', 'T'): 111, ('L', 'M'): 70, ('D', 'M'): 75, ('C', 'D'): 120, ('C', 'R'): 146, 
    ('C', 'P'): 138, ('R', 'S'): 80, ('F', 'S'): 99, ('B', 'F'): 211, ('B', 'P'): 101, 
    ('B', 'G'): 90, ('B', 'U'): 85, ('H', 'U'): 98,  ('E', 'H'): 86, ('U', 'V'): 142, 
    ('I', 'V'): 92, ('I', 'N'): 87, ('P', 'R'): 97},
    locations=dict(
    A=(91, 492), B=(400, 327), C=(253, 288), D=(165, 299), E=(562, 293), F=(305, 449),
    G=(375, 270), H=(534, 350), I=(473, 506), L=(165, 379), M=(168, 339), N=(406, 537),
    O=(131, 571), P=(320, 368), R=(233, 410), S=(207, 457), T=(94, 410), U=(456, 350),
    V=(509, 444), Z=(108, 531)))

In [6]:
romania.neighbors['A'] # Neighbors of 

('Z', 'S', 'T')

In [7]:
romania.distances['A', 'Z']

75

In [8]:
romania.locations['A']

(91, 492)

# 8 Puzzle Problems



In [9]:
class EightPuzzle(Problem):
    """ The problem of sliding tiles numbered from 1 to 8 on a 3x3 board,
    where one of the squares is a blank, trying to reach a goal configuration.
    A board state is represented as a tuple of length 9, where the element at index i 
    represents the tile number at index i, or 0 if for the empty square, e.g. the goal:
        1 2 3
        4 5 6 ==> (1, 2, 3, 4, 5, 6, 7, 8, 0)
        7 8 _
    """
    
    def actions(self, state):
        """The numbers of the squares that the blank can move to."""
        moves = ((1, 3),    (0, 2, 4),    (1, 5),
                 (0, 4, 6), (1, 3, 5, 7), (2, 4, 8),
                 (3, 7),    (4, 6, 8),    (7, 5))
        blank = state.index(0)
        return moves[blank]
    
    def result(self, state, action):
        """Swap the blank with the square numbered `action`."""
        s = list(state)
        blank = state.index(0)
        s[action], s[blank] = s[blank], s[action]
        return tuple(s)
    
    def h(self, node):
        """The misplaced tiles heuristic."""
        return sum(s != g for (s, g) in zip(node.state, self.goal))
    
    
def board8(board, fmt=(3 * '{} {} {}\n')):
    "A string representing an 8-puzzle board"
    return fmt.format(*board).replace('0', '_')

# Specific Problems and Solutions

Now that we have some domains, we can make specific problems in those domains, and solve them:




In [10]:
p1 = PourProblem((1, 1, 1), 13, sizes=(2, 16, 32))
p2 = PourProblem((0, 0, 0), 21, sizes=(8, 11, 31))
p3 = PourProblem((0, 0), 8, sizes=(7,9))
p4 = PourProblem((0, 0, 0), 21, sizes=(8, 11, 31))

g1 = GreenPourProblem((1, 1, 1), 13, sizes=(2, 16, 32))
g2 = GreenPourProblem((0, 0, 0), 21, sizes=(8, 11, 31))
g3 = GreenPourProblem((0, 0), 8, sizes=(7,9))
g4 = GreenPourProblem((0, 0, 0), 21, sizes=(8, 11, 31))

r1 = RouteProblem('A', 'B', map=romania)
r2 = RouteProblem('N', 'L', map=romania)
r3 = RouteProblem('E', 'T', map=romania)
r4 = RouteProblem('O', 'M', map=romania)

goal = (1, 2, 3, 4, 5, 6, 7, 8, 0)
e1 = EightPuzzle((4, 0, 2, 5, 1, 3, 7, 8, 6), goal)
e2 = EightPuzzle((1, 4, 2, 0, 7, 5, 3, 6, 8), goal)
e3 = EightPuzzle((2, 5, 8, 1, 4, 7, 0, 3, 6), goal)
e4 = EightPuzzle((0, 1, 2, 3, 4, 5, 6, 7, 8), goal)

In [11]:
# Solve a problem (which gives a node) and recover the sequence of states in that node's path
path_states(astar_search(r1))

['A', 'S', 'R', 'P', 'B']

In [12]:
# Breadth first search finds a solution with fewer steps, but in this case higher path cost
path_states(breadth_first_search(r1))

['A', 'S', 'F', 'B']

In [13]:
# Solve a problem and recover the path of alternating states and actions
path(breadth_first_search(p1))

[(1, 1, 1),
 ('Fill', 1),
 (1, 16, 1),
 ('Pour', 1, 0),
 (2, 15, 1),
 ('Dump', 0),
 (0, 15, 1),
 ('Pour', 1, 0),
 (2, 13, 1)]

In [14]:
# Solve an 8 puzzle problem and print out each state

for s in path_states(astar_search(e1)):
    print(board8(s))

4 _ 2
5 1 3
7 8 6

4 1 2
5 _ 3
7 8 6

4 1 2
_ 5 3
7 8 6

_ 1 2
4 5 3
7 8 6

1 _ 2
4 5 3
7 8 6

1 2 _
4 5 3
7 8 6

1 2 3
4 5 _
7 8 6

1 2 3
4 5 6
7 8 _



# Reporting Metrics

Now let's gather some metrics on how well each algorithm does.  We'll use `CountCalls` to wrap a `Problem` object in such a way that calls to its methods are delegated, but each call increments a counter. Once we've solved the problem, we print out summary statistics.

In [15]:
class CountCalls:
    """Delegate all attribute accesses to the object, and count them in ._counts"""
    def __init__(self, obj):
        self._object = obj
        self._counts = Counter()
        
    def __getattr__(self, attr):
        self._counts[attr] += 1
        return getattr(self._object, attr)
        
def report(searchers, problems):
    "Show metrics for each searcher on each problem."
    for searcher in searchers:
        print(searcher.__name__ + ':')
        total_counts = Counter()
        for p in problems:
            prob = CountCalls(p)
            soln = searcher(prob)
            cts  = prob._counts; 
            cts.update(len=len(path_actions(soln)), cost=soln.path_cost)
            total_counts += cts
            report_line(cts, type(p).__name__)
        report_line(total_counts, 'TOTAL\n')
        
def report_line(counts, name):
    "Print one line of the report."
    print('{:7,d} Exp |{:7,d} Gen |{:7,d} Goal |{:5.0f} cost |{:3d} len | {}'
          .format(counts['actions'], counts['result'], counts['is_goal'], 
                  counts['cost'], counts['len'], name))

In [16]:
# Here's a tiny report
report([astar_search], [p1, p2])

astar_search:
    150 Exp |  1,325 Gen |    151 Goal |    4 cost |  4 len | PourProblem
    378 Exp |  3,381 Gen |    379 Goal |    9 cost |  9 len | PourProblem
    528 Exp |  4,706 Gen |    530 Goal |   13 cost | 13 len | TOTAL



The last line says that, over the two problems `[p1, p2]`, the `astar_search` algorithm expanded 528 nodes, generating 4,706 nodes and doing 530 goal checks. Together, the two solutions had a path cost of 13 and also a total length of 13 (since step cost was 1 in these problems). 

Now let's do a bigger report, concentrating first on the easier problems, then harder ones:

In [17]:
easy = (p1, g1, r1, r2, r3, r4, e1)
hard = (g2, p2, g3, p3, g4, p4, e2, e3, e4)

report((astar_search, uniform_cost_search,  breadth_first_search, 
        iterative_deepening_search, depth_limited_search), easy)

astar_search:
    150 Exp |  1,325 Gen |    151 Goal |    4 cost |  4 len | PourProblem
    185 Exp |  1,646 Gen |    186 Goal |   10 cost | 12 len | GreenPourProblem
      5 Exp |     15 Gen |      6 Goal |  418 cost |  4 len | RouteProblem
     15 Exp |     35 Gen |     16 Goal |  910 cost |  9 len | RouteProblem
     14 Exp |     34 Gen |     15 Goal |  805 cost |  8 len | RouteProblem
      9 Exp |     22 Gen |     10 Goal |  445 cost |  5 len | RouteProblem
     11 Exp |     29 Gen |     12 Goal |    7 cost |  7 len | EightPuzzle
    389 Exp |  3,106 Gen |    396 Goal | 2599 cost | 49 len | TOTAL

uniform_cost_search:
    150 Exp |  1,325 Gen |    151 Goal |    4 cost |  4 len | PourProblem
    185 Exp |  1,646 Gen |    186 Goal |   10 cost | 12 len | GreenPourProblem
     13 Exp |     33 Gen |     14 Goal |  418 cost |  4 len | RouteProblem
     19 Exp |     43 Gen |     20 Goal |  910 cost |  9 len | RouteProblem
     20 Exp |     45 Gen |     21 Goal |  805 cost |  8 len | Rout

One thing to notice: on three of the problems, `depth_limited_search` had a path cost of `inf`, meaning that the search was cut off, so it reported an infinite cost.

If we look at the whole `cost` column, we see that the optimal algorithms, `astar_search` and `uniform_cost_search`, give the best results, while `breadth_first_search` and `iterative_deepening_search` have non-optimal costs on some problems, because they find a solution with the minimal number of steps, but not the minimal path cost. We see that `astar_search` has fewer expansions, generated nodes, and goal tests that `uniform_cost_search`, which means the heuristic helps (if only by 10% or so).

Next I'll try some harder problems; I won't even try the tree search algorithms on these problems; too many redundant paths.

In [18]:
report((astar_search, uniform_cost_search, breadth_first_search), hard)

astar_search:
    451 Exp |  4,048 Gen |    452 Goal |   21 cost | 19 len | GreenPourProblem
    378 Exp |  3,381 Gen |    379 Goal |    9 cost |  9 len | PourProblem
     30 Exp |    126 Gen |     31 Goal |   35 cost | 16 len | GreenPourProblem
     30 Exp |    126 Gen |     31 Goal |   14 cost | 14 len | PourProblem
    451 Exp |  4,048 Gen |    452 Goal |   21 cost | 19 len | GreenPourProblem
    378 Exp |  3,381 Gen |    379 Goal |    9 cost |  9 len | PourProblem
 10,338 Exp | 27,461 Gen | 10,339 Goal |   23 cost | 23 len | EightPuzzle
 14,119 Exp | 37,562 Gen | 14,120 Goal |   24 cost | 24 len | EightPuzzle
  5,989 Exp | 15,951 Gen |  5,990 Goal |   22 cost | 22 len | EightPuzzle
 32,164 Exp | 96,084 Gen | 32,173 Goal |  178 cost |155 len | TOTAL

uniform_cost_search:
    451 Exp |  4,048 Gen |    452 Goal |   21 cost | 19 len | GreenPourProblem
    378 Exp |  3,381 Gen |    379 Goal |    9 cost |  9 len | PourProblem
     30 Exp |    126 Gen |     31 Goal |   35 cost | 16 len | 

This time we see that A* is an order of magnitude more efficient than the two uninformed algorithm. Note that again,  uniform cost is optimal, but breadth-first is not: it optimized for path length, not path cost.