In [1]:
class Search_problem:
    """Abstract interface for a search problem.

    A concrete problem supplies:
    * the node where search starts
    * a way to obtain the outgoing arcs of a node
    * a test that recognises goal nodes
    * optionally, a heuristic estimate for each node.
    Subclasses must override every method except heuristic()."""

    def start_node(self):
        """Return the node at which search begins."""
        raise NotImplementedError("start_node")   # must be supplied by a subclass

    def is_goal(self,node):
        """Return True when node is a goal node."""
        raise NotImplementedError("is_goal")   # must be supplied by a subclass

    def neighbors(self,node):
        """Return a list of the arcs leaving node."""
        raise NotImplementedError("neighbors")   # must be supplied by a subclass

    def heuristic(self,n):
        """Return the heuristic value of node n.
        This default implementation always gives 0."""
        return 0


In [2]:
class Arc:
    """A directed edge from from_node to to_node.

    Each arc carries a non-negative cost (1 by default) and an optional
    action label that is shown when the arc is printed."""
    def __init__(self, from_node, to_node, cost=1, action=None):
        # Negative costs are rejected up front (checked with an assert statement).
        assert cost >= 0, "Cost cannot be negative for {} -> {}, cost: {}".format(from_node, to_node, cost)
        self.from_node, self.to_node = from_node, to_node
        self.cost = cost
        self.action = action

    def __repr__(self):
        """Return 'from --action--> to', or 'from --> to' when the action is falsy."""
        if not self.action:
            return '{} --> {}'.format(self.from_node, self.to_node)
        return '{} --{}--> {}'.format(self.from_node, self.action, self.to_node)


In [3]:
class Search_problem_from_explicit_graph(Search_problem):
    """A search problem defined by an explicitly enumerated graph.

    It consists of:
    * a list or set of nodes
    * a list or set of arcs
    * a start node
    * a list or set of goal nodes
    * a dictionary that maps each node into its heuristic value.
    """

    def __init__(self, nodes, arcs, start=None, goals=None, hmap=None):
        """Build the problem and index the arcs by their from_node.

        nodes -- iterable of hashable nodes
        arcs  -- iterable of arcs; each must have from_node and to_node attributes
        start -- the start node
        goals -- collection of goal nodes (a fresh empty set when omitted)
        hmap  -- dict from node to heuristic value (a fresh empty dict when omitted)

        Raises KeyError if an arc's from_node is not one of nodes.
        """
        self.nodes = nodes
        self.arcs = arcs
        self.start = start
        # None sentinels instead of mutable defaults: a default set()/{} in the
        # signature would be one object shared by every instance that omits it.
        self.goals = set() if goals is None else goals
        self.hmap = {} if hmap is None else hmap
        # Every declared node gets an (initially empty) list of outgoing arcs.
        self._neighbors = {node: [] for node in nodes}
        for arc in arcs:
            self._neighbors[arc.from_node].append(arc)

    def start_node(self):
        """returns start node"""
        return self.start

    def is_goal(self, node):
        """is True if node is a goal"""
        return node in self.goals

    def neighbors(self, node):
        """returns the list of arcs leaving node, in the order they were given"""
        return self._neighbors[node]

    def heuristic(self, node):
        """Gives the heuristic value of node.
        Returns 0 if the node has no entry in the hmap."""
        return self.hmap.get(node, 0)

    def __repr__(self):
        """returns a string representation of the search problem:
        every arc followed by a period and two spaces"""
        return "".join('{}.  '.format(arc) for arc in self.arcs)

    def neighbor_nodes(self, node):
        """returns an iterator over the nodes reachable from node by one arc"""
        return (arc.to_node for arc in self._neighbors[node])


In [5]:
class Path:
    """A path is a single node, or a shorter path extended by one arc."""

    def __init__(self, initial, arc=None):
        """Create a path.

        With arc omitted, initial is a node and the path has cost 0.
        Otherwise initial is a path, arc is the arc appended to it, and the
        cost is the cost of initial plus the cost of arc."""
        self.initial = initial
        self.arc = arc
        self.cost = 0 if arc is None else initial.cost + arc.cost

    def end(self):
        """Return the last node of the path."""
        return self.initial if self.arc is None else self.arc.to_node

    def nodes(self):
        """Yield the nodes of the path from the end node back to the first node."""
        segment = self
        while True:
            if segment.arc is None:
                # Reached the zero-length path: its 'initial' is the first node.
                yield segment.initial
                return
            yield segment.arc.to_node
            segment = segment.initial

    def initial_nodes(self):
        """Yield every node of the path except the end node, also backwards.
        A zero-length path yields nothing."""
        if self.arc is None:
            return
        yield from self.initial.nodes()

    def __repr__(self):
        """Return the path as text; an arc with an action starts a new line."""
        if self.arc is None:
            return str(self.initial)
        if not self.arc.action:
            return '{} --> {}'.format(self.initial, self.arc.to_node)
        return '{}\n   --{}--> {}'.format(self.initial, self.arc.action, self.arc.to_node)


In [9]:
# Small acyclic graph; the cheapest route from 'a' to 'g' is a, b, c, d, g (cost 4).
problem1 = Search_problem_from_explicit_graph(
    nodes={'a', 'b', 'c', 'd', 'g'},
    arcs=[
        Arc(tail, head, weight)
        for tail, head, weight in (
            ('a', 'b', 1),
            ('a', 'c', 3),
            ('b', 'd', 3),
            ('b', 'c', 1),
            ('c', 'd', 1),
            ('c', 'g', 3),
            ('d', 'g', 1),
        )
    ],
    start='a',
    goals={'g'},
)

problem1

a --> b.  a --> c.  b --> d.  b --> c.  c --> d.  c --> g.  d --> g.  

In [10]:
# Tree-shaped graph rooted at 'a'; the goal 'g' is reached only through a, b, d.
problem2 = Search_problem_from_explicit_graph(
    nodes={'a', 'b', 'c', 'd', 'e', 'g', 'h', 'j'},
    arcs=[
        Arc(tail, head, weight)
        for tail, head, weight in (
            ('a', 'b', 1),
            ('b', 'c', 3),
            ('b', 'd', 1),
            ('d', 'e', 3),
            ('d', 'g', 1),
            ('a', 'h', 3),
            ('h', 'j', 1),
        )
    ],
    start='a',
    goals={'g'},
)

problem2

a --> b.  b --> c.  b --> d.  d --> e.  d --> g.  a --> h.  h --> j.  

In [None]:
# Degenerate case: no arcs at all, and the start node 'g' is itself a goal.
problem3 = Search_problem_from_explicit_graph(
    nodes={'a', 'b', 'c', 'd', 'e', 'g', 'h', 'j'},
    arcs=[],
    start='g',
    goals={'k', 'g'},
)


In [11]:
# Delivery-robot graph with one-way arcs only; search goes from 'o103' to 'r123'.
acyclic_delivery_problem = Search_problem_from_explicit_graph(
    nodes={
        'mail', 'ts', 'o103', 'o109', 'o111',
        'b1', 'b2', 'b3', 'b4',
        'c1', 'c2', 'c3',
        'o125', 'o123', 'o119', 'r123', 'storage',
    },
    arcs=[
        Arc(tail, head, weight)
        for tail, head, weight in (
            ('ts', 'mail', 6),
            ('o103', 'ts', 8),
            ('o103', 'b3', 4),
            ('o103', 'o109', 12),
            ('o109', 'o119', 16),
            ('o109', 'o111', 4),
            ('b1', 'c2', 3),
            ('b1', 'b2', 6),
            ('b2', 'b4', 3),
            ('b3', 'b1', 4),
            ('b3', 'b4', 7),
            ('b4', 'o109', 7),
            ('c1', 'c3', 8),
            ('c2', 'c3', 6),
            ('c2', 'c1', 4),
            ('o123', 'o125', 4),
            ('o123', 'r123', 4),
            ('o119', 'o123', 9),
            ('o119', 'storage', 7),
        )
    ],
    start='o103',
    goals={'r123'},
    # Heuristic value for every node; the goal 'r123' has value 0.
    hmap=dict(
        mail=26,
        ts=23,
        o103=21,
        o109=24,
        o111=27,
        o119=11,
        o123=4,
        o125=6,
        r123=0,
        b1=13,
        b2=15,
        b3=17,
        b4=18,
        c1=6,
        c2=10,
        c3=12,
        storage=12,
    ),
)

acyclic_delivery_problem

ts --> mail.  o103 --> ts.  o103 --> b3.  o103 --> o109.  o109 --> o119.  o109 --> o111.  b1 --> c2.  b1 --> b2.  b2 --> b4.  b3 --> b1.  b3 --> b4.  b4 --> o109.  c1 --> c3.  c2 --> c3.  c2 --> c1.  o123 --> o125.  o123 --> r123.  o119 --> o123.  o119 --> storage.  

In [12]:
# Same delivery graph, but most arcs are paired with a reverse arc of equal cost,
# which creates cycles. Only o103->b3, b1->c2 and b4->o109 stay one-way.
cyclic_delivery_problem = Search_problem_from_explicit_graph(
    nodes={
        'mail', 'ts', 'o103', 'o109', 'o111',
        'b1', 'b2', 'b3', 'b4',
        'c1', 'c2', 'c3',
        'o125', 'o123', 'o119', 'r123', 'storage',
    },
    arcs=[
        Arc(tail, head, weight)
        for tail, head, weight in (
            ('ts', 'mail', 6),
            ('mail', 'ts', 6),
            ('o103', 'ts', 8),
            ('ts', 'o103', 8),
            ('o103', 'b3', 4),
            ('o103', 'o109', 12),
            ('o109', 'o103', 12),
            ('o109', 'o119', 16),
            ('o119', 'o109', 16),
            ('o109', 'o111', 4),
            ('o111', 'o109', 4),
            ('b1', 'c2', 3),
            ('b1', 'b2', 6),
            ('b2', 'b1', 6),
            ('b2', 'b4', 3),
            ('b4', 'b2', 3),
            ('b3', 'b1', 4),
            ('b1', 'b3', 4),
            ('b3', 'b4', 7),
            ('b4', 'b3', 7),
            ('b4', 'o109', 7),
            ('c1', 'c3', 8),
            ('c3', 'c1', 8),
            ('c2', 'c3', 6),
            ('c3', 'c2', 6),
            ('c2', 'c1', 4),
            ('c1', 'c2', 4),
            ('o123', 'o125', 4),
            ('o125', 'o123', 4),
            ('o123', 'r123', 4),
            ('r123', 'o123', 4),
            ('o119', 'o123', 9),
            ('o123', 'o119', 9),
            ('o119', 'storage', 7),
            ('storage', 'o119', 7),
        )
    ],
    start='o103',
    goals={'r123'},
    # Heuristic value for every node; the goal 'r123' has value 0.
    hmap=dict(
        mail=26,
        ts=23,
        o103=21,
        o109=24,
        o111=27,
        o119=11,
        o123=4,
        o125=6,
        r123=0,
        b1=13,
        b2=15,
        b3=17,
        b4=18,
        c1=6,
        c2=10,
        c3=12,
        storage=12,
    ),
)

cyclic_delivery_problem

ts --> mail.  mail --> ts.  o103 --> ts.  ts --> o103.  o103 --> b3.  o103 --> o109.  o109 --> o103.  o109 --> o119.  o119 --> o109.  o109 --> o111.  o111 --> o109.  b1 --> c2.  b1 --> b2.  b2 --> b1.  b2 --> b4.  b4 --> b2.  b3 --> b1.  b1 --> b3.  b3 --> b4.  b4 --> b3.  b4 --> o109.  c1 --> c3.  c3 --> c1.  c2 --> c3.  c3 --> c2.  c2 --> c1.  c1 --> c2.  o123 --> o125.  o125 --> o123.  o123 --> r123.  r123 --> o123.  o119 --> o123.  o123 --> o119.  o119 --> storage.  storage --> o119.  

In [25]:
from lib.display import Displayable, visualize

class Searcher(Displayable):
    """Generic path searcher for a search problem.

    Every call to search() continues from the current frontier and returns
    the next path that ends at a goal. The frontier here is a plain list used
    as a stack, so the default strategy is depth-first; subclasses change the
    strategy by overriding the frontier methods.
    """

    def __init__(self, problem):
        """Create a searcher whose frontier holds only the zero-length path
        at the problem's start node.
        """
        self.problem = problem
        self.initialize_frontier()
        self.num_expanded = 0
        self.add_to_frontier(Path(problem.start_node()))
        super().__init__()

    def initialize_frontier(self):
        """Create the empty frontier (a list)."""
        self.frontier = []

    def empty_frontier(self):
        """Return True when no paths are left on the frontier."""
        return self.frontier == []

    def add_to_frontier(self,path):
        """Put path on top of the frontier stack."""
        self.frontier.append(path)

    @visualize
    def search(self):
        """Return the next path from the start node to a goal node,
        or None once the frontier is exhausted.
        """
        while not self.empty_frontier():
            current = self.frontier.pop()
            self.display(2, "Expanding:",current,"(cost:",current.cost,")")
            self.num_expanded += 1
            tip = current.end()
            if not self.problem.is_goal(tip):
                outgoing = self.problem.neighbors(tip)
                self.display(3,"Neighbors are", outgoing)
                # Added in reverse order, so with the list-based stack the
                # first listed neighbor is the next one popped.
                for arc in list(outgoing)[::-1]:
                    self.add_to_frontier(Path(current,arc))
                self.display(3,"Frontier:",self.frontier)
                continue
            # Goal reached: report, remember the solution, and hand it back.
            self.display(1, self.num_expanded, "paths have been expanded and",
                        len(self.frontier), "paths remain in the frontier")
            self.solution = current
            return current
        self.display(1, "No (more) solutions. Total of", self.num_expanded, "paths expanded.")
        return None


In [15]:
import heapq        # part of the Python standard library

class FrontierPQ:
    """Frontier kept as a heap (frontierpq) of (value, -index, path) triples.

    * value is the quantity being minimized (e.g., path cost + h)
    * index counts insertions, so every entry is unique; it is stored negated,
      which means that among equal values the most recently added path is
      returned first and the paths themselves are never compared
    * path is the path on the queue
    The heap always yields its smallest entry first.
    """

    def __init__(self):
        """Start with an empty heap and an insertion counter of zero."""
        self.frontierpq = []  # heap of (value, -index, path)
        self.frontier_index = 0  # how many paths have ever been added

    def empty(self):
        """Return True when the frontier holds no paths."""
        return len(self.frontierpq) == 0

    def add(self, path, value):
        """Insert path with priority value (smaller values are popped first)."""
        self.frontier_index += 1    # fresh unique index for this entry
        entry = (value, -self.frontier_index, path)
        heapq.heappush(self.frontierpq, entry)

    def pop(self):
        """Remove and return the path with the minimum value."""
        return heapq.heappop(self.frontierpq)[2]

    def count(self,val):
        """Return how many frontier entries have value equal to val."""
        return len([entry for entry in self.frontierpq if entry[0]==val])

    def __repr__(self):
        """Return the heap entries as text, with each path shown via str()."""
        shown = [(value, tiebreak, str(path)) for (value, tiebreak, path) in self.frontierpq]
        return str(shown)

    def __len__(self):
        """Return the number of paths on the frontier."""
        return len(self.frontierpq)

    def __iter__(self):
        """Yield the paths on the frontier, in heap-storage order."""
        for entry in self.frontierpq:
            yield entry[2]


In [16]:
class AStarSearcher(Searcher):
    """Searcher whose frontier is a FrontierPQ ordered by path cost plus heuristic.
    As with Searcher, successive calls to search() return successive paths.
    """

    def __init__(self, problem):
        """Create the searcher for problem (delegates to Searcher)."""
        super().__init__(problem)

    def initialize_frontier(self):
        """Use a priority queue instead of the list-based stack."""
        self.frontier = FrontierPQ()

    def empty_frontier(self):
        """Return True when the priority queue has no paths left."""
        return self.frontier.empty()

    def add_to_frontier(self,path):
        """Queue path under the priority cost(path) + heuristic(end of path)."""
        estimate = self.problem.heuristic(path.end())
        self.frontier.add(path, path.cost + estimate)


In [26]:
def test(SearchClass, problem=problem1, solution=['g','d','c','b','a'] ):
    """Unit test for aipython searching algorithms.

    SearchClass is a class that takes a problem and implements search()
    problem is a search problem
    solution is the unique (optimal) solution, as a sequence of nodes listed
        from the goal back to the start (the order produced by Path.nodes()).

    Raises AssertionError if search() finds no path or finds a different path.
    """
    print("Testing problem 1:")
    schr1 = SearchClass(problem)
    path1 = schr1.search()
    print("Path found:",path1)
    # search() returns None when the frontier is exhausted; fail with a clear
    # message instead of an AttributeError on None.nodes().
    assert path1 is not None, "No path found in problem1"
    # list(solution) lets callers pass a tuple (or any sequence); the default
    # list is only read, never mutated.
    assert list(path1.nodes()) == list(solution), "Shortest path not found in problem1"
    print("Passed unit test")

# Run the unit test when this code is executed directly.
if __name__ == "__main__":
    # Left disabled: on problem1 the depth-first Searcher returns a --> b --> d --> g,
    # which is not the expected optimal path, so the assertion in test() would fail.
    #test(Searcher)
    test(AStarSearcher)
    
# example queries (the problems are the ones defined in the cells above):
# searcher1 = Searcher(acyclic_delivery_problem)   # DFS
# searcher1.search()  # find first path
# searcher1.search()  # find next path
# searcher2 = AStarSearcher(acyclic_delivery_problem)   # A*
# searcher2.search()  # find first path
# searcher2.search()  # find next path
# searcher3 = Searcher(cyclic_delivery_problem)   # DFS
# searcher3.search()  # find first path with DFS. What do you expect to happen?
# searcher4 = AStarSearcher(cyclic_delivery_problem)    # A*
# searcher4.search()  # find first path


Testing problem 1:
7 paths have been expanded and 4 paths remain in the frontier
Path found: a --> b --> c --> d --> g
Passed unit test
