In [1]:
import math
from collections import deque
from itertools import combinations
import heapq

## Formulate a water-pouring problem

In [2]:
class PourProblem:
    """Water-pouring puzzle over a fixed set of buckets.

    A state is a tuple holding the current level of each bucket, `sizes`
    holds the capacity of each bucket in the same order, and the goal is
    reached as soon as any bucket holds exactly `goal` units.
    """

    def __init__(self, initial=None, goal=None, sizes=None):
        self.initial, self.goal, self.sizes = initial, goal, sizes

    def actions(self, state):
        """List the actions available in `state`: every ('Fill', i) for a
        bucket that is not full, followed by every ('Pour', i, j) from a
        non-empty bucket i into any other bucket j."""
        indices = range(len(state))

        fills = []
        for i in indices:
            if state[i] < self.sizes[i]:
                fills.append(('Fill', i))

        pours = []
        for src in indices:
            if not state[src]:
                continue  # nothing to pour out of an empty bucket
            for dst in indices:
                if dst != src:
                    pours.append(('Pour', src, dst))

        return fills + pours

    def result(self, state, action):
        """Return the state tuple obtained by applying `action` to `state`."""
        levels = list(state)
        # Every action starts with (kind, bucket); only 'Pour' carries a third field.
        kind, src, *rest = action

        if kind == 'Pour':
            dst = rest[0]
            # Move as much as the source holds or the destination can still take.
            moved = min(state[src], self.sizes[dst] - state[dst])
            levels[src] -= moved
            levels[dst] += moved
        elif kind == 'Fill':
            levels[src] = self.sizes[src]

        return tuple(levels)

    def is_goal(self, state):
        """True when some bucket holds exactly the goal amount."""
        return self.goal in state

    def action_cost(self, start_state, action, end_state):
        """Every action costs one step."""
        return 1

In [3]:
# Three empty buckets of capacity 3, 5 and 9; the goal is to get 7 units into any one of them.
p1 = PourProblem(initial=(0, 0, 0), goal=7, sizes=(3, 5, 9))

## Formulate an EightPuzzle game problem

In [4]:
class EightPuzzle(object):
    """ The problem of sliding tiles numbered from 1 to 8 on a 3x3 board,
    where one of the squares is a blank, trying to reach a goal configuration.
    A board state is represented as a tuple of length 9, where the element at index i
    represents the tile number at index i, or 0 for the empty square, e.g. the
    default goal:
        _ 1 2
        3 4 5 ==> (0, 1, 2, 3, 4, 5, 6, 7, 8)
        6 7 8
    """
    def __init__(self, initial=None, goal=(0, 1, 2, 3, 4, 5, 6, 7, 8)):
        """Store the initial and goal boards.

        `initial` defaults to None and `goal` to (0, 1, 2, 3, 4, 5, 6, 7, 8);
        both can be overridden by the caller. When an initial board is given,
        an AssertionError is raised if its inversion parity differs from the
        goal's (the two boards must have the same parity)."""
        # Skip the parity check when no initial board was supplied, so that the
        # declared default `initial=None` does not crash inside inversions().
        if initial is not None:
            assert inversions(initial) % 2 == inversions(goal) % 2
        self.initial = initial
        self.goal = goal

    def actions(self, state):
        """The indexes of the squares that the blank can move to."""
        # moves[b] lists the board indexes adjacent to index b on the 3x3 grid.
        moves = ((1, 3),    (0, 2, 4),    (1, 5),
                 (0, 4, 6), (1, 3, 5, 7), (2, 4, 8),
                 (3, 7),    (4, 6, 8),    (7, 5))
        blank = state.index(0)
        return moves[blank]

    def result(self, state, action):
        """Swap the blank with the square numbered `action`."""
        s = list(state)  # tuples are immutable, so swap on a list copy
        blank = state.index(0)
        s[action], s[blank] = s[blank], s[action]
        return tuple(s)

    def is_goal(self, state):
        """Return True if the state is a goal. """
        return state == self.goal

    def action_cost(self, start_state, action, end_state):
        """Every move of the blank costs one step."""
        return 1

    def h1(self, node):
        """The misplaced tiles heuristic: number of tiles (the blank excluded)
        that are not on their goal square."""
        # The blank is skipped: counting it would give 2 for a board that is a
        # single move away from the goal, overestimating the true cost.
        return sum(s != g for (s, g) in zip(node.state, self.goal) if s != 0)

    def h2(self, node):
        """The Manhattan heuristic: sum over all tiles (the blank excluded) of
        the row distance plus column distance between the tile's current square
        and its square in `self.goal`."""
        total = 0
        for index, tile in enumerate(node.state):
            if tile == 0:
                continue
            # Look the tile up in the actual goal so the distance is also
            # correct for goals other than the default one.
            target = self.goal.index(tile)
            total += abs(index % 3 - target % 3) + abs(index // 3 - target // 3)
        return total

    def h(self, node):
        """Default heuristic used by the searches: the misplaced tiles count."""
        return self.h1(node)
    
def hamming_distance(A, B):
    "Count the positions at which vectors A and B hold different values."
    mismatches = 0
    # zip stops at the shorter vector, so extra trailing elements are ignored.
    for left, right in zip(A, B):
        if left != right:
            mismatches += 1
    return mismatches
    

def inversions(board):
    "The number of pairs of tiles where a larger tile precedes a smaller one (the blank, 0, is ignored)."
    # Dropping the blank first leaves exactly the pairs in which neither element is 0.
    tiles = [tile for tile in board if tile != 0]
    count = 0
    for earlier, later in combinations(tiles, 2):
        if earlier > later:
            count += 1
    return count
    
    
def board8(board, fmt=(3 * '{} {} {}\n')):
    "Render an 8-puzzle board as three text rows, drawing the blank as '_'."
    # The nine entries of board fill the nine '{}' slots of fmt in order.
    rendered = fmt.format(*board)
    return rendered.replace('0', '_')

In [5]:
# Sample start boards; each one is paired with the constructor's default goal.
e1 = EightPuzzle(initial=(1, 4, 2, 0, 7, 5, 3, 6, 8))
e2 = EightPuzzle(initial=(1, 2, 3, 4, 5, 6, 7, 8, 0))
e3 = EightPuzzle(initial=(4, 0, 2, 5, 1, 3, 7, 8, 6))
e4 = EightPuzzle(initial=(7, 2, 4, 5, 0, 6, 8, 3, 1))
e5 = EightPuzzle(initial=(8, 6, 7, 2, 5, 4, 3, 0, 1))

# 1. Define the node object used in the tree structure
 
### Each node contains the following fields:
* **state**
* **parent** node
* **action** that was applied to the parent to reach the current node
* **path cost** from the initial node to the current node

In [6]:
class Node:
    """A Node in a search tree.

    Attributes:
        state: the problem state held by this node.
        parent: the Node this one was generated from (None for the root).
        action: the action applied to the parent to reach this node.
        path_cost: accumulated cost from the root to this node.

    Note: `len(node)` is the node's depth, so a root node has length 0 and,
    since no `__bool__` is defined, is falsy in a boolean context.
    """
    def __init__(self, state, parent=None, action=None, path_cost=0):
        self.state = state
        self.parent = parent
        self.action = action
        self.path_cost = path_cost

    def __repr__(self):
        """Show the node as <state>."""
        return '<{}>'.format(self.state)

    def __len__(self):
        """Depth of the node: the number of parent links up to the root."""
        # Walk the parent chain iteratively; a recursive walk raises
        # RecursionError once the depth exceeds the interpreter's limit.
        depth = 0
        ancestor = self.parent
        while ancestor is not None:
            depth += 1
            ancestor = ancestor.parent
        return depth

    def __lt__(self, other):
        """Order nodes by path cost (used when nodes are compared inside a heap)."""
        return self.path_cost < other.path_cost
    
    
# Sentinel nodes returned in place of a solution; both carry an infinite path cost.
failure = Node(state='failure', path_cost=math.inf)  # An algorithm couldn't find a solution (state = 'failure').
cutoff = Node(state='cutoff', path_cost=math.inf)  # Iterative deepening search was cut off (state = 'cutoff').
    
    

# 2. Define the object used to maintain the frontier

### There are different types of queueing functions
* First in First out => **BFS** => deque 
* First in Last out => **DFS** => list
* Sort the elements w.r.t. different criteria => **UCS(Greedy search, A*)** => PriorityQueue

In [7]:
# Frontier containers for the uninformed searches, exposed under queue-style names.
FIFOQueue = deque # A list-like sequence optimized for adding and removing items at either end.
LIFOQueue = list # Used as a stack: items are appended to and popped from the right end.

class PriorityQueue:
    """Min-priority queue: `pop` always yields the item whose key(item) is smallest.

    Used as the frontier in best-first search.
    """

    def __init__(self, items=(), key=lambda x: x):
        self.key = key
        self.items = []  # heap of (key(item), item) tuples maintained by heapq
        for entry in items:
            self.add(entry)

    def add(self, item):
        """Add item to the queue, positioned according to key(item)."""
        score = self.key(item)
        heapq.heappush(self.items, (score, item))

    def pop(self):
        """Remove and return the item with the smallest key value."""
        _, item = heapq.heappop(self.items)
        return item

    def top(self):
        """Return, without removing it, the item with the smallest key value."""
        _, item = self.items[0]
        return item

    def __len__(self):
        """Number of items currently queued."""
        return len(self.items)
    

# 3. Fundamental actions 

* ### `Expand` : Ask a node for its children 
    1. Get node's state `s`
    2. Find the possible legal actions that can be performed on `s` with `problem.actions(s)`
    3. Find all the resulting states `s1` with `problem.result(s,action)`, and update their accumulated path cost `cost` by adding `problem.action_cost(s, action, s1)`
    4. Generate children nodes with:
        * node state = s1
        * parent node = node
        * action = current action
        * path cost = up to now path cost
    
* ###  `Test` : Test a node to see whether it is a goal using `problem.is_goal(s)` 

In [8]:
def expand(problem, node):
    "Lazily generate the child nodes reachable from `node` by one action."
    parent_state = node.state
    for move in problem.actions(parent_state):
        next_state = problem.result(parent_state, move)
        # A child's path cost is the parent's cost plus the cost of this one step.
        total_cost = node.path_cost + problem.action_cost(parent_state, move, next_state)
        yield Node(state=next_state, parent=node, action=move, path_cost=total_cost)

# 4. Search algorithms
### General tree search
    frontier = Make-Queue(Make-Node(Initial-State[p])) 
    Loop do 
        If frontier is empty then return failure 
        node = Remove-Front(frontier) 
        If Goal-Test[p] on State(node) succeeds then return node 
        frontier = QUEUING-FN(frontier, Expand(node, Actions[p]))
    End
    
### Implementation based on general search with different queueing functions

    * BFS (FIFOQueue)
    * DFS (LIFOQueue) 
    * UCS , Greedy and A* (PriorityQueue) 

In [9]:
def breadth_first_tree_search(problem):
    "Tree search that always takes the oldest node off the frontier (shallowest first)."
    # Start with a frontier holding only the root node built from the initial state.
    frontier = FIFOQueue([Node(problem.initial)])
    while frontier:
        current = frontier.popleft()  # oldest entry sits at the left end
        if problem.is_goal(current.state):
            return current
        # New children join at the right end, behind everything already queued.
        frontier.extend(expand(problem, current))
    return failure

def depth_first_tree_search(problem):
    "Tree search that always takes the newest node off the frontier (deepest first)."
    stack = LIFOQueue([Node(problem.initial)])
    while stack:
        current = stack.pop()  # most recently added node
        if problem.is_goal(current.state):
            return current
        # Children go on top of the stack, so the last child generated is explored next.
        stack.extend(expand(problem, current))
    return failure

def best_first_tree_search(problem, f):
    """Best-first search without a `reached` table: repeatedly take the frontier
    node with the smallest f value, skipping children that repeat the state of
    a recent ancestor (see `is_cycle`)."""
    frontier = PriorityQueue([Node(problem.initial)], key=f)
    while frontier:
        current = frontier.pop()
        if problem.is_goal(current.state):
            return current
        for successor in expand(problem, current):
            if is_cycle(successor):
                continue  # state already appears among this node's recent ancestors
            frontier.add(successor)
    return failure

def g(n):
    "Path cost accumulated from the root to node n."
    return n.path_cost

def is_cycle(node, k=30):
    "Report whether one of the node's nearest k ancestors has the same state as the node."
    ancestor = node.parent
    remaining = k
    # Climb at most k parent links, stopping early at the root.
    while ancestor is not None and remaining > 0:
        if ancestor.state == node.state:
            return True
        ancestor = ancestor.parent
        remaining -= 1
    return False

def uniform_cost_search(problem):
    "Best-first tree search ordered by path cost g(n) alone: cheapest node first."
    cheapest_first = g
    return best_first_tree_search(problem, f=cheapest_first)

def astar_tree_search(problem, h=None):
    """A* tree search (no `reached` table): expand nodes in order of
    f(n) = g(n) + h(n). When no heuristic is passed, `problem.h` is used."""
    heuristic = h if h else problem.h

    def f(n):
        # Cost so far plus the heuristic estimate for node n.
        return g(n) + heuristic(n)

    return best_first_tree_search(problem, f=f)



# 5. Find out the solution by tracing back to the ROOT

In [10]:
def path_actions(node):
    """The sequence of actions to get to this node, in order from the root.

    Returns an empty list for a node without a parent (the root, or a
    sentinel such as `failure`).
    """
    actions = []
    # Walk up the parent chain iteratively: this avoids RecursionError on long
    # paths and the repeated list copying of `path + [action]`.
    while node.parent is not None:
        actions.append(node.action)
        node = node.parent
    actions.reverse()  # collected leaf-to-root; callers expect root-to-leaf
    return actions

def path_states(node):
    """The sequence of states to get to this node, in order from the root.

    Returns an empty list when `node` is None or the `failure` sentinel.
    """
    states = []
    # Walk up the parent chain iteratively: this avoids RecursionError on long
    # paths and the repeated list copying of `path + [state]`.
    while node not in (failure, None):
        states.append(node.state)
        node = node.parent
    states.reverse()  # collected leaf-to-root; callers expect root-to-leaf
    return states

## 6. Solve the two problems with different search algorithms

In [11]:
# Solve the pouring problem with breadth-first tree search, then list the
# actions on the path from the root to the goal node that was found.
solu=breadth_first_tree_search(p1)
path_actions(solu)

[('Fill', 0), ('Fill', 2), ('Pour', 0, 1), ('Pour', 2, 1)]

In [12]:
# Solve puzzle e1 with A* (using the problem's own heuristic, since no h is
# passed) and print every board on the path from the start to the goal.
for s in path_states(astar_tree_search(e1)):
    print(board8(s))

1 4 2
_ 7 5
3 6 8

1 4 2
3 7 5
_ 6 8

1 4 2
3 7 5
6 _ 8

1 4 2
3 _ 5
6 7 8

1 _ 2
3 4 5
6 7 8

_ 1 2
3 4 5
6 7 8

