Runtime environment: Python 3

Reference: UC Berkeley CS188 Intro to AI, http://ai.berkeley.edu/home.html

Reference: Artificial Intelligence: A Modern Approach

### Test case: the Romania map

<img src = 'pic\RomaniaMap.JPG'>

# Topic: Search 

Keywords: DFS, BFS, Best-first Search, A-star

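- Helper classes and functions: the `Node` class wraps the node-related methods, `distance` computes h, and `memoize` is the decorator.
- The problem class `GraphProblem`, which holds the topology, the start node, the goal node, and the goal test.
- The core search procedure, plus the queues that implement the different strategies.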


### Some supporting functions

##### Decorator
Used to store debugging data.

`memoize` makes a function remember the computed value for any argument list (slot = attribute name).
- If `slot` is specified, the result is stored in that attribute of the first argument.
- If `slot` is false, results are stored in a dictionary.

In the Romania map example, each node (obj) gets an extra 'f' attribute that stores f(n). With f(n) = h(n) the search is greedy search; with f(n) = g(n) + h(n) it is A-star search.
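
The two caching modes can be illustrated with a small sketch. It is a compact stand-in written for this note, not necessarily identical to the notebook's own `memoize`; the names `memoize_sketch`, `Item`, and `calls` are hypothetical and exist only for illustration.

```python
def memoize_sketch(fn, slot=None):
    """Cache fn's results on an attribute (slot) or in a dict keyed by args."""
    if slot:
        def memoized_fn(obj, *args):
            if not hasattr(obj, slot):
                setattr(obj, slot, fn(obj, *args))  # compute once, store on obj
            return getattr(obj, slot)
    else:
        def memoized_fn(*args):
            if args not in memoized_fn.cache:
                memoized_fn.cache[args] = fn(*args)
            return memoized_fn.cache[args]
        memoized_fn.cache = {}
    return memoized_fn


class Item:
    """Hypothetical stand-in for a search node with g and h values."""
    def __init__(self, g, h):
        self.g, self.h = g, h


calls = []  # records every real evaluation of f

def f(n):
    calls.append(n)
    return n.g + n.h

cached_f = memoize_sketch(f, 'f')   # slot mode: result lands in item.f
item = Item(140, 232)
first = cached_f(item)
second = cached_f(item)             # served from item.f, f is not called again

square = memoize_sketch(lambda x: x * x)  # dict mode: results keyed by args
nine = square(3)
```

In slot mode the cached value travels with the object, which is why the test cells can later print `node.f` for every node on the returned path.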


##### Node
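Represents a single node in the graph.

- `path` returns the list of nodes from this node back to the root (list of Node instances).
- `expand` returns the list of all nodes reachable from this node (list of Node instances).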
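
As a rough sketch of how `path` can follow parent links back to the root, consider the stripped-down class below. `MiniNode` is a hypothetical name used only here; it omits path cost and depth.

```python
class MiniNode:
    """Hypothetical minimal node: a state plus a link to its parent."""
    def __init__(self, state, parent=None):
        self.state = state
        self.parent = parent

    def path(self):
        # Walk parent pointers; the result starts at this node and ends at the root.
        node, result = self, []
        while node is not None:
            result.append(node)
            node = node.parent
        return result


a = MiniNode('A')
s = MiniNode('S', a)
f = MiniNode('F', s)
b = MiniNode('B', f)

states = [n.state for n in b.path()]   # goal first, root last
route = states[::-1]                   # reversed to read start -> goal
```

Because the list starts at the node itself, it has to be reversed to read as a route from the start to the goal.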


##### Directed and Undirected Graphs

Classes for directed and undirected graphs. A graph is initialized from a dictionary that describes the parent-child relations between nodes. An undirected graph is a directed graph with the reverse dictionary entry added for every edge.

    """A graph connects nodes (vertices) by edges (links).  Each edge can also
    have a length associated with it.  The constructor call is something like:
        g = Graph({'A': {'B': 1, 'C': 2}})
    this makes a graph with 3 nodes, A, B, and C, with an edge of length 1 from
    A to B,  and an edge of length 2 from A to C.  You can also do:
        g = Graph({'A': {'B': 1, 'C': 2}}, directed=False)
    This makes an undirected graph, so inverse links are also added. The graph
    stays undirected; if you add more links with g.connect('B', 'C', 3), then
    the inverse link is also added.  You can use g.nodes() to get a list of nodes,
    g.get('A') to get a dict of links out of A, and g.get('A', 'B') to get the
    length of the link from A to B.  'Lengths' can actually be any object at
    all, and nodes can be any hashable object."""
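
To make the "inverse links" idea concrete, here is a minimal sketch on a plain dict of dicts. The helper name `make_undirected` mirrors the method name used later, but this standalone function is an assumption written for illustration and returns a new dict instead of mutating a `Graph`.

```python
def make_undirected(links):
    """Return a copy of a dict-of-dicts graph with every edge mirrored."""
    result = {a: dict(neighbours) for a, neighbours in links.items()}
    for a, neighbours in links.items():
        for b, length in neighbours.items():
            # Add the reverse edge b -> a with the same length.
            result.setdefault(b, {})[a] = length
    return result


directed = {'A': {'B': 1, 'C': 2}}
undirected = make_undirected(directed)
```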


##### Distance

Used to compute h(n), the heuristic obtained from a relaxed problem (here, the straight-line distance).
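
As a worked example, the heuristic for A with goal B can be computed from the two map coordinates used later in this notebook. The helper name `straight_line` is hypothetical; truncating with `int` follows what the `h` method of `GraphProblem` does.

```python
import math

# Coordinates of A and B taken from the Romania locations table.
locations = {'A': (91, 492), 'B': (400, 327)}

def straight_line(p, q):
    """Euclidean distance between two (x, y) points."""
    (px, py), (qx, qy) = p, q
    return math.hypot(px - qx, py - qy)

# hypot(309, 165) is about 350.29, truncated to 350.
h_A = int(straight_line(locations['A'], locations['B']))
h_B = int(straight_line(locations['B'], locations['B']))
```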


##### infinity

A constant that simulates infinity.


In [21]:
import math
import bisect

In [22]:
def memoize(fn, slot=None):
    if slot:
        def memoized_fn(obj, *args):
            if hasattr(obj, slot):
                return getattr(obj, slot)
            else:
                val = fn(obj, *args)
                setattr(obj, slot, val)
                return val
    else:
        def memoized_fn(*args):
            if args not in memoized_fn.cache:  # dict.has_key() was removed in Python 3
                memoized_fn.cache[args] = fn(*args)
            return memoized_fn.cache[args]
        memoized_fn.cache = {}
    return memoized_fn


In [23]:
class Node:

    def __init__(self, state, parent=None, path_cost=0):
        self.state = state
        self.parent = parent
        self.path_cost = path_cost

        if parent:
            self.depth = parent.depth + 1
        else:
            self.depth = 0

    def __repr__(self):
        return self.state

    def path(self):
        x, result = self, [self]
        while x.parent:
            result.append(x.parent)
            x = x.parent
        return result

    def expand(self, problem):
        return [Node(next, self, problem.path_cost(self.path_cost, self.state, next)) 
                for next in problem.successor(self.state)]

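# Note: when two parent nodes share a child, the child Node is created twice. Because the duplicates carry the same state name, the closed set keeps the child from being expanded more than once.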

In [24]:
infinity = float('inf')

def distance(A, B):
    "The distance between two (x, y) points."
    (ax, ay) = A
    (bx, by) = B
    return math.hypot((ax - bx), (ay - by))

In [25]:
class Graph:

    def __init__(self, dictionary=None, directed=True):
        self.dictionary = dictionary or {}
        self.directed = directed
        if not directed: self.make_undirected()

    def make_undirected(self):
        "Make a digraph into an undirected graph by adding symmetric edges."
        tmp = list(self.dictionary.keys())
        for a in tmp:
            for (b, distance) in self.dictionary[a].items():
                self.connect1(b, a, distance)

    def connect(self, A, B, distance=1):
        """Add a link from A and B of given distance, and also add the inverse
        link if the graph is undirected."""
        self.connect1(A, B, distance)
        if not self.directed: self.connect1(B, A, distance)

    def connect1(self, A, B, distance):
        "Add a link from A to B of given distance, in one direction only."
        self.dictionary.setdefault(A,{})[B] = distance

    def get(self, a, b=None):
        """Return a link distance or a dict of {node: distance} entries.
        .get(a,b) returns the distance or None;
        .get(a) returns a dict of {node: distance} entries, possibly {}."""
        links = self.dictionary.setdefault(a, {})
        if b is None: return links
        else: return links.get(b)

    def nodes(self):
        "Return a list of nodes in the graph."
        return list(self.dictionary.keys())
    
def UndirectedGraph(dictionary=None):
    "Build a Graph where every edge (including future ones) goes both ways."
    return Graph(dictionary=dictionary, directed=False)

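### Problem definition

The class provides the following:
- `__init__`: the initial state and the goal state of the problem
- `goal_test`: whether the goal has been reached
- `successor`: the set of nodes that a given node can expand to next
- `path_cost`: the accumulated path cost after moving from node A to node B
- `h`: the heuristic value h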
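
A minimal sketch of how `successor` and `path_cost` behave, written as plain functions over a small dict. The `links` dict holds only a subset of the map distances and is an assumption made for illustration; the real class reads them from the `Graph` object.

```python
# Subset of the Romania distances, already mirrored in both directions.
links = {'A': {'S': 140, 'T': 118}, 'S': {'A': 140, 'R': 80}}

def successor(state):
    """States directly reachable from `state`."""
    return list(links.get(state, {}))

def path_cost(cost_so_far, a, b):
    """Cost so far plus the edge a -> b; a missing edge counts as infinite."""
    return cost_so_far + links.get(a, {}).get(b, float('inf'))

def goal_test(state, goal):
    return state == goal

g_S = path_cost(0, 'A', 'S')      # A -> S
g_R = path_cost(g_S, 'S', 'R')    # A -> S -> R
no_edge = path_cost(0, 'A', 'R')  # A and R are not directly linked
```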


In [26]:
class GraphProblem:
    "The problem of searching a graph from one node to another."
    def __init__(self, initial, goal, graph):
        self.initial = initial
        self.goal = goal
        self.graph = graph
        
    def goal_test(self, state):
        return state == self.goal

    def successor(self, A):
        "Return a list of connected nodes in state space"
        return [B for B in self.graph.get(A).keys()]

    def path_cost(self, cost_so_far, A, B):
        return cost_so_far + (self.graph.get(A,B) or infinity)

    def h(self, node):
        "h function is straight-line distance from a node's state to goal."
        locs = getattr(self.graph, 'locations', None)
        if locs:
            return int(distance(locs[node.state], locs[self.goal]))
        else:
            return infinity

    def value(self):
        return 1

In [27]:
romania = UndirectedGraph(dict(
    A=dict(Z=75, S=140, T=118),
    B=dict(U=85, P=101, G=90, F=211),
    C=dict(D=120, R=146, P=138),
    D=dict(M=75),
    E=dict(H=86),
    F=dict(S=99),
    H=dict(U=98),
    I=dict(V=92, N=87),
    L=dict(T=111, M=70),
    O=dict(Z=71, S=151),
    P=dict(R=97),
    R=dict(S=80),
    U=dict(V=142)))

romania.locations = dict(
    A=( 91, 492),    B=(400, 327),    C=(253, 288),   D=(165, 299),
    E=(562, 293),    F=(305, 449),    G=(375, 270),   H=(534, 350),
    I=(473, 506),    L=(165, 379),    M=(168, 339),   N=(406, 537),
    O=(131, 571),    P=(320, 368),    R=(233, 410),   S=(207, 457),
    T=( 94, 410),    U=(456, 350),    V=(509, 444),   Z=(108, 531))


### 算法核心过程
##### Queues

Define the different queues. The search algorithms differ only in the queue they use; in algorithmic terms, the queue decides which node (or batch of nodes) is taken out next to be searched/checked.

    """Queue is an abstract class/interface. There are three types:
        Stack(): A Last In First Out Queue.
        FIFOQueue(): A First In First Out Queue.
        PriorityQueue(order, f): Queue where items are sorted by f, (default min).
    Each type supports the following methods and functions:
        q.append(item)  -- add an item to the queue
        q.extend(items) -- equivalent to: for item in items: q.append(item)
        q.pop()         -- return the top item from the queue
        len(q)          -- number of items in q (also q.__len__())
    Note that isinstance(Stack(), Queue) is false, because we implement stacks
    as lists.  If Python ever gets interfaces, Queue will be an interface."""
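
The effect of the queue discipline can be seen on three items. This sketch deliberately uses the standard library (`list`, `collections.deque`, `heapq`) rather than the classes defined in this notebook, so it only illustrates the pop order, not the notebook's implementation.

```python
from collections import deque
import heapq

# (name, cost) pairs in the order they were added to the fringe.
items = [('S', 140), ('T', 118), ('Z', 75)]

stack = list(items)
lifo_first = stack.pop()            # last in, first out

fifo = deque(items)
fifo_first = fifo.popleft()         # first in, first out

heap = [(cost, name) for name, cost in items]
heapq.heapify(heap)
cheapest = heapq.heappop(heap)      # smallest cost first
```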


In [28]:
# Queues: Stack, FIFOQueue, PriorityQueue

import abc

class Queue(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def __init__(self):
        pass

    def extend(self, items):
        for item in items: self.append(item)



def Stack():
    """Return an empty list, suitable as a Last-In-First-Out Queue."""
    return []


class FIFOQueue(Queue):
    """A First-In-First-Out Queue."""
    def __init__(self):
        self.A = []
        self.start = 0

    def append(self, item):
        self.A.append(item)

    def __len__(self):
        return len(self.A) - self.start

    def extend(self, items):
        self.A.extend(items)

    def pop(self):
        e = self.A[self.start]
        self.start += 1
        if self.start > 5 and self.start > len(self.A)/2:
            self.A = self.A[self.start:]
            self.start = 0
        return e

    def __repr__(self):
        return str(self.A[self.start:len(self.A)])


class PriorityQueue(Queue):
    """A queue in which the minimum (or maximum) element (as determined by f and
    order) is returned first. If order is min, the item with minimum f(x) is
    returned first; if order is max, then it is the item with maximum f(x)."""
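    def __init__(self, order=min, f=lambda x: x):
        self.A = []
        self.f = f
        self.order = order
        self.count = 0  # insertion counter, breaks ties between equal f values

    def append(self, item):
        # The counter keeps tuple comparison from ever reaching `item`,
        # which may not define an ordering (Node has no __lt__).
        bisect.insort(self.A, (self.f(item), self.count, item))
        self.count += 1

    def __len__(self):
        return len(self.A)

    def pop(self):
        if self.order == min:
            return self.A.pop(0)[-1]
        else:
            return self.A.pop()[-1]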

    def __repr__(self):
        return str(self.A)

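###  Core search framework

- `problem`: the object that describes the problem (here a `GraphProblem`). It provides the initial state, the goal, the goal test, and the way to obtain a node's neighbours.
- `fringe`: the queue, which is the core of the search. It determines the strategy for choosing the next node: a LIFO stack gives DFS, a FIFO queue gives BFS, and a PriorityQueue is the basis for Greedy/A-star.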
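
The idea that only the queue discipline changes can be sketched in a few lines. This is a simplified illustration, not the notebook's `graph_search`: it stores whole paths in the fringe instead of `Node` objects, and the toy graph `links` and the `pop` parameter are assumptions made for the example.

```python
from collections import deque

def graph_search_sketch(links, start, goal, pop):
    """Generic graph search; `pop` decides which fringe entry is taken next."""
    fringe = deque([[start]])   # each entry is a path beginning at start
    closed = set()              # states that were already expanded
    while fringe:
        path = pop(fringe)
        state = path[-1]
        if state == goal:
            return path
        if state not in closed:
            closed.add(state)
            fringe.extend(path + [nxt] for nxt in links.get(state, []))
    return None


# Hypothetical toy graph: a short route via C and a longer one via B and D.
links = {'A': ['C', 'B'], 'B': ['D'], 'C': ['G'], 'D': ['G'], 'G': []}

bfs_path = graph_search_sketch(links, 'A', 'G', lambda q: q.popleft())  # FIFO
dfs_path = graph_search_sketch(links, 'A', 'G', lambda q: q.pop())      # LIFO
```

With the FIFO rule the shallowest route is found first; with the LIFO rule the search dives into the most recently added branch and returns the longer route.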

In [29]:
def graph_search(problem, fringe):

    closed = {}
    fringe.append(Node(problem.initial))
    while fringe:
        node = fringe.pop()

        if problem.goal_test(node.state):
            return node
        if node.state not in closed:
            closed[node.state] = True
            fringe.extend(node.expand(problem))
    return None

In [30]:
def breadth_first_graph_search(problem):
    return graph_search(problem, FIFOQueue())

def depth_first_graph_search(problem):
    return graph_search(problem, Stack())

In [31]:
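def best_first_graph_search(problem):
    """Best-first graph search ordered by f(n) = g(n), the path cost so far.
    The queue is built with order=max, so the node with the largest f(n)
    is popped first."""
    def fn(item):
        return item.path_cost
    f = memoize(fn, 'f')
    return graph_search(problem, PriorityQueue(max, f))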


In [32]:
def astar_search(problem, h=None):
    """A* search is best-first graph search with f(n) = g(n)+h(n).
    You need to specify the h function when you call astar_search.
    Uses the pathmax trick: f(n) = max(f(n), g(n)+h(n))."""
    h = h or problem.h
    def f(n):
        return max(getattr(n, 'f', -infinity), n.path_cost + h(n))
    f = memoize(f, 'f')
    return graph_search(problem, PriorityQueue(min, f))


## Tests



In [33]:
# DFS and BFS

ab = GraphProblem('A','B',romania)

print("BFS".center(50, '-'))
print(breadth_first_graph_search(ab).state)
[node.state for node in breadth_first_graph_search(ab).path()] 

-----------------------BFS------------------------
B


['B', 'F', 'S', 'A']

In [34]:
print("DFS".center(50, '-'))
print(depth_first_graph_search(ab).state)
[node.state for node in depth_first_graph_search(ab).path()] 

-----------------------DFS------------------------
B


['B', 'P', 'C', 'D', 'M', 'L', 'T', 'A']

In [35]:
print("Best-first Search".center(50, '-'))

print(best_first_graph_search(ab).state)
[node.state for node in best_first_graph_search(ab).path()] 

----------------Best-first Search-----------------
B


['B', 'F', 'S', 'A']

In [36]:
for node in best_first_graph_search(ab).path():
    print(node.state, node.f)

B 450
F 239
S 140
A 0


In [37]:
print("A-star Search".center(50, '-'))
print(astar_search(ab).state)
for node in astar_search(ab).path():
    print(node.state, node.f)

------------------A-star Search-------------------
B
B 418
P 406
R 406
S 372
A 350
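
The f values printed above can be reproduced by hand as f(n) = g(n) + h(n), using the edge lengths and coordinates from the Romania data along the returned path A, S, R, P, B. The sketch below is a standalone check written for this note; the names `steps` and `f_values` are hypothetical.

```python
import math

# Coordinates of the states on the path found by A-star.
locations = {'A': (91, 492), 'S': (207, 457), 'R': (233, 410),
             'P': (320, 368), 'B': (400, 327)}

# (state, cost of the edge leading into it) along A -> S -> R -> P -> B.
steps = [('A', 0), ('S', 140), ('R', 80), ('P', 97), ('B', 101)]

def h(state, goal='B'):
    """Straight-line distance to the goal, truncated to an int."""
    (ax, ay), (bx, by) = locations[state], locations[goal]
    return int(math.hypot(ax - bx, ay - by))

g = 0
f_values = {}
for state, step_cost in steps:
    g += step_cost                 # g(n): accumulated path cost
    f_values[state] = g + h(state) # f(n) = g(n) + h(n)
```

For example, S has g = 140 and h = 232, which gives the 372 shown in the output.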
