In [3]:
import math
from collections import defaultdict, deque, Counter
from queue import PriorityQueue,Queue
FIFOQueue = deque
LIFOQueue = list

class Node:
    "A Node in a search tree."
    def __init__(self, state, parent=None, action=None, path_cost=0):
        self.__dict__.update(state=state, parent=parent, action=action, path_cost=path_cost)


    def __len__(self): return 0 if self.parent is None else (1 + len(self.parent))
    def __lt__(self, other): return self.path_cost < other.path_cost

    
    
failure = Node('failure', path_cost=math.inf) # Indicates an algorithm couldn't find a solution.
cutoff  = Node('cutoff',  path_cost=math.inf) # Indicates iterative deepening search was cut off.
    
    
def expand(problem, node):
    """"Expand a node, generating the children nodes.
    function EXPAND( node, problem) 
        '''returns a set of nodes'''
    successors ← the empty set
    for each 〈action, result〉 in SUCCESSOR-FN[ problem](STATE[node]) do
    s ← a new NODE
    STATE[s] ← result
    PARENT-NODE[s] ← node
    ACTION[s] ← action
    PATH-COST[s] ← PATH-COST[node] + STEP-COST(node, action, s)
    DEPTH[s] ← DEPTH[node] + 1
    add s to successors
    return successors
    """
    s = node.state
    for action in problem.actions(s):
        s1 = problem.result(s, action)
        cost = node.path_cost + problem.action_cost(s, action, s1)
        yield Node(s1, node, action, cost)
        

In [4]:
import pandas as pd

# Map data
edges_data = {
    'Start_Node': ['O', 'O', 'A', 'A', 'A', 'L', 'L', 'D', 'C', 'C', 'C', 'R', 'F', 'B', 'B', 'B', 'B', 'H', 'E', 'U', 'I', 'I', 'P'],
    'End_Node': ['Z', 'S', 'Z', 'S', 'T', 'T', 'M', 'M', 'D', 'R', 'P', 'S', 'S', 'F', 'P', 'G', 'U', 'U', 'H', 'V', 'V', 'N', 'R'],
    'Cost': [71, 151, 75, 140, 118, 111, 70, 75, 120, 146, 138, 80, 99, 211, 101, 90, 85, 98, 86, 142, 92, 87, 97]
}

edges_df = pd.DataFrame(edges_data)

print("Edges DataFrame:")
print(edges_df)



Edges DataFrame:
   Start_Node End_Node  Cost
0           O        Z    71
1           O        S   151
2           A        Z    75
3           A        S   140
4           A        T   118
5           L        T   111
6           L        M    70
7           D        M    75
8           C        D   120
9           C        R   146
10          C        P   138
11          R        S    80
12          F        S    99
13          B        F   211
14          B        P   101
15          B        G    90
16          B        U    85
17          H        U    98
18          E        H    86
19          U        V   142
20          I        V    92
21          I        N    87
22          P        R    97


In [45]:
# The pandas version

class Map:
    """Bản đồ của các địa điểm: một đồ thị với các đỉnh và liên kết giữa chúng. 
    Trong `Map(links, locations)`, `links` là DataFrame có cột 'Start_Node', 'End_Node' và 'Cost'. 
    Nếu `directed=False` thì đối với mỗi liên kết (v1, v2), chúng ta thêm một liên kết (v2, v1)."""

    def __init__(self, links, directed=False):
        if isinstance(links, pd.DataFrame):
            links_dict = {(row['Start_Node'], row['End_Node']): row['Cost'] for _, row in links.iterrows()}  
        if not directed:
            links_dict = {**links_dict, **{(v2, v1): cost for (v1, v2), cost in links_dict.items()}}
            
        
        self.distances = links_dict
        self.neighbors = self.create_neighbors(links_dict)
        
            
    def create_neighbors(self, links):
        """Given (key, val) pairs, make a dict of {key: [val,...]}."""
        result = defaultdict(list)
        for (v1, v2) ,_ in links.items():
            result[v1].append(v2)       
        return result


In [46]:
map_object = Map(edges_df,False)

In [27]:
class Map:
    """Bản đồ của các địa điểm : một đồ thị với các đỉnh và liên kết giữa chúng. 
    Trong `Map(links, locations)`, `links` có thể là cặp [(v1, v2)...], 
    hoặc một từ điển {(v1, v2): khoảng cách...}. Tham số tùy chọn `locations` có thể là {v1: (x, y)} 
    Nếu `directed=False` thì đối với mỗi liên kết (v1, v2), chúng ta thêm một liên kết (v2, v1)."""


    def __init__(self, links, locations=None, directed=False):
        if not hasattr(links, 'items'): # Distances are 1 by default
            links = {link: 1 for link in links}
        if not directed:
            for (v1, v2) in list(links):
                links[v2, v1] = links[v1, v2]
        self.distances = links
        self.neighbors = multimap(links)
        self.locations = locations or defaultdict(lambda: (0, 0))
        print(self.neighbors)
        
def multimap(pairs) -> dict:
    "Given (key, val) pairs, make a dict of {key: [val,...]}."
    result = defaultdict(list)
    for key, val in pairs:
        result[key].append(val)
    return result

In [43]:
class RomaniaProblem:
    """A problem to find a route between locations on a `Map`.
    Create a problem with RomaniaProblem(start, goal, map=Map(...)})."""

    def __init__(self, initial=None, goal=None, map=None):
        self.initial = initial
        self.goal = goal
        self.map = map

    def actions(self, state):
        """The places neighboring `state`."""
        return self.map.neighbors[state]

    def result(self, state, action):
        """Go to the `action` place, if it possible."""
        return action if action in self.map.neighbors[state] else state

    def is_goal(self, state):
        """Check if the state is the goal state."""
        return state == self.goal

    def action_cost(self, s, action, s1):
        """The distance (cost) to go from s to s1."""
        return self.map.distances[s, s1]


    def __str__(self):
        return '{}({!r}, {!r})'.format(type(self).__name__, self.initial, self.goal)

    

In [None]:
romania = Map(
    {('O', 'Z'):  71, ('O', 'S'): 151, ('A', 'Z'): 75, ('A', 'S'): 140, ('A', 'T'): 118, 
     ('L', 'T'): 111, ('L', 'M'):  70, ('D', 'M'): 75, ('C', 'D'): 120, ('C', 'R'): 146, 
     ('C', 'P'): 138, ('R', 'S'):  80, ('F', 'S'): 99, ('B', 'F'): 211, ('B', 'P'): 101, 
     ('B', 'G'):  90, ('B', 'U'):  85, ('H', 'U'): 98, ('E', 'H'):  86, ('U', 'V'): 142, 
     ('I', 'V'):  92, ('I', 'N'):  87, ('P', 'R'): 97},
    None)

In [47]:
problem = RomaniaProblem('A', 'B', map=map_object)

# BFS

In [11]:
def breadth_first_search(problem):
    "Search shallowest nodes in the search tree first."
    node = Node(problem.initial)
    if problem.is_goal(problem.initial):
        return node
    frontier = Queue()
    frontier.put(node)
    reached = {problem.initial}
    while not frontier.empty():
        node = frontier.get()
        for child in expand(problem, node):
            s = child.state
            if problem.is_goal(s):
                return child
            if s not in reached:
                reached.add(s)
                frontier.put(child)
    return failure

In [48]:
solution_node = breadth_first_search(problem)

if solution_node:
    path = []
    while solution_node:
        path.insert(0, solution_node.state)
        solution_node = solution_node.parent
    print("Solution Path from {}: {}".format(problem.initial, ' -> '.join(path)))
else:
    print("No solution found.")


Solution Path from A: S -> F -> B


# Uniform

In [13]:
from queue import PriorityQueue

def uniform_cost_search(problem):
    "Dijkstra search nodes with minimum path cost first."
    frontier = PriorityQueue()
    initial_node = Node(problem.initial)
    frontier.put((0, initial_node))
    reached = {problem.initial: initial_node}

    while not frontier.empty():
        _, node = frontier.get()

        if problem.is_goal(node.state):
            return node

        for child in expand(problem, node):
            s = child.state
            if s not in reached or child.path_cost < reached[s].path_cost:
                reached[s] = child
                frontier.put((child.path_cost, child))

    return failure


In [14]:
solution_node = uniform_cost_search(problem)

if solution_node:
    path = []
    while solution_node:
        path.insert(0, solution_node.state)
        solution_node = solution_node.parent
    print("Solution Path from {}: {}".format(problem.initial, ' -> '.join(path)))
else:
    print("No solution found.")

Solution Path from A: S -> R -> P -> B


# DFS

In [15]:
from queue import LifoQueue
## well, it's different so i try list :D because list is stack in this case
def depth_first_search(problem):
    "Search nodes with a depth-first strategy."
    frontier = LifoQueue()
    initial_node = Node(problem.initial)
    frontier.put(initial_node)
    reached = {problem.initial}

    while not frontier.empty():
        node = frontier.get()

        if problem.is_goal(node.state):
            return node

        children = list(expand(problem, node))
        for child in reversed(children):
            s = child.state
            if s not in reached:
                reached.add(s)
                frontier.put(child)

    return failure



In [16]:
def depth_first_search(problem):
    "Search deepest nodes in the search tree first."
    frontier = LIFOQueue([Node(problem.initial)])
    result = failure
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node

        for child in expand(problem, node):
            frontier.append(child)
    return result

In [17]:
solution_node = depth_first_search(problem)

if solution_node:
    path = []
    while solution_node:
        path.insert(0, solution_node.state)
        solution_node = solution_node.parent
    print("Solution Path from {} : {}".format(problem.initial, ' -> '.join(path)))
else:
    print("No solution found.")

Solution Path from A : T -> L -> M -> D -> C -> P -> B


# DLS

In [18]:
def is_cycle(node, k=10):
    "Does this node form a cycle of length k or less?"
    def find_cycle(origin, k):
        return (origin is not None and k > 0 and
                (origin.state == node.state or find_cycle(origin.parent, k - 1)))
    return find_cycle(node.parent, k)

In [19]:
def depth_limited_search(problem, limit=1):
    "Search deepest nodes in the search tree first."
    frontier = LIFOQueue([Node(problem.initial)])
    result = failure
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node
        elif len(node) >= limit:
            result = cutoff
        elif not is_cycle(node):
            for child in expand(problem, node):
                frontier.append(child)
    return result

In [20]:
solution_node = depth_limited_search(problem, limit = 1) # < 10 nice

if solution_node:
    path = []
    while solution_node:
        path.insert(0, solution_node.state)
        solution_node = solution_node.parent
    print("Solution Path from {} : {}".format(problem.initial, ' -> '.join(path)))
else:
    print("No solution found.")

No solution found.


# IDS (Iterative Deepening Search)

pseudo :  
    depth 0 - > inf 
    res = DLS(prob,depth)
    if res != cutoff :
        result


In [21]:
import numpy as np
def iterative_deepening_search(problem):
    "Do depth-limited search with increasing depth limits."
    for limit in range(1, 999999999999999):
        result = depth_limited_search(problem, limit)
        if result != cutoff:
            return result

In [22]:
solution_node = iterative_deepening_search(problem) # < 10 nice

if solution_node:
    path = []
    while solution_node:
        path.insert(0, solution_node.state)
        solution_node = solution_node.parent
    print("Solution Path from {} : {}".format(problem.initial, ' -> '.join(path)))
else:
    print("No solution found.")

Solution Path from A : S -> F -> B


# Bidi Search 


start from two side -> use BFS to search till come meet each other ?

In [33]:
import copy

class CountCalls:
    """Delegate all attribute gets to the object, and count them in ._counts"""
    def __init__(self, obj):
        self._object = obj
        self._counts = Counter()
        
    def __getattr__(self, attr):
        "Delegate to the original object, after incrementing a counter."
        self._counts[attr] += 1
        return getattr(self._object, attr)

def path_cost(n): return n.path_cost
def inverse_problem(problem):
    if isinstance(problem, CountCalls):
        return CountCalls(inverse_problem(problem._object))
    else:
        inv = copy.copy(problem)
        inv.initial, inv.goal = inv.goal, inv.initial
        return inv

In [34]:
"""Additional function for solving Bidi Search"""
def join_nodes(nf, nb):
    """Join the reverse of the backward node nb to the forward node nf."""
    #print('join', S(nf), S(nb))
    join = nf
    while nb.parent is not None:
        cost = join.path_cost + nb.path_cost - nb.parent.path_cost
        join = Node(nb.parent.state, join, nb.action, cost)
        nb = nb.parent
        #print('  now join', S(join), 'with nb', S(nb), 'parent', S(nb.parent))
    return join
def path_states(node):
    "The sequence of states to get to this node."
    if node in (cutoff, failure, None): 
        return []
    return path_states(node.parent) + [node.state]

def proceed(direction, problem, frontier, reached, reached2, solution):
    node = frontier.get()
    for child in expand(problem, node):
        s = child.state
        #print('proceed', direction, path_states(child))
        if s not in reached or child.path_cost < reached[s].path_cost:
            frontier.put(child)
            reached[s] = child
            if s in reached2: # Frontiers collide; solution found
                solution2 = (join_nodes(child, reached2[s]) if direction == 'f' else
                             join_nodes(reached2[s], child))
                if solution2.path_cost < solution.path_cost:
                    solution = solution2
    return solution
def terminated(solution, frontier_f, frontier_b):
        n_f, n_b = frontier_f.queue[0], frontier_b.queue[0]
        return path_cost(n_f) + path_cost(n_b) > path_cost(solution)

In [35]:

from queue import PriorityQueue

def bidirectional_best_first_search(problem_f, f_f, problem_b, f_b, terminated):
    """
    BIBF-SEARCH( problemF , fF , problemB , fB ) returns a solution node, or failure
    
    nodeF ← NODE(problemF.INITIAL)  # Node for a start state
    
    nodeB ← NODE(problemB.INITIAL)  # Node for a goal state
    
    frontierF ← a priority queue ordered by fF, with nodeF as an element
    
    frontierB ← a priority queue ordered by fB, with nodeB as an element
    
    reachedF ← a lookup table, with one key nodeF.STATE and value nodeF
    
    reachedB ← a lookup table, with one key nodeB.STATE and value nodeB
    
    solution ← failure

    while not TERMINATED(solution, frontierF, frontierB):
        if fF(TOP(frontierF)) < fB(TOP(frontierB)):
            solution = PROCEED('f', problemF, frontierF, reachedF, reachedB, solution)
        else:
            solution = PROCEED('b', problemB, frontierB, reachedB, reachedF, solution)

    return solution"""
    node_f = Node(problem_f.initial)
    node_b = Node(problem_f.goal)
    frontier_f = PriorityQueue()
    frontier_b = PriorityQueue()
    frontier_f.put(node_f)
    frontier_b.put(node_b)
    reached_f = {problem_f.initial: node_f}
    reached_b = {problem_b.initial: node_b}
    solution = failure

    while frontier_f and frontier_b and not terminated(solution, frontier_f, frontier_b):
        def S1(node, f):
            return str(int(f(node))) + ' ' + str(path_states(node))
        #print('Bi:', S1(frontier_f.queue[0], f_f), S1(frontier_b.queue[0], f_b))
        if f_f(frontier_f.queue[0]) < f_b(frontier_b.queue[0]):
            solution = proceed('f', problem_f, frontier_f, reached_f, reached_b, solution)
        else:
            solution = proceed('b', problem_b, frontier_b, reached_b, reached_f, solution)
    return solution
def bidi_search(problem):
    return bidirectional_best_first_search(problem, path_cost, inverse_problem(problem), path_cost, terminated)



In [36]:
solution_node = bidi_search(problem)

if solution_node:
    path = []
    while solution_node:
        path.insert(0, solution_node.state)
        solution_node = solution_node.parent
    print("Solution Path from {} : {}".format(problem.initial, ' -> '.join(path)))
else:
    print("No solution found.")

Solution Path from A : S -> R -> P -> B


In [39]:
def report(searchers, problems):
    """Summary report"""
    for searcher in searchers:
        print(searcher.__name__ + ':')
        total_counts = Counter()
        for p in problems:
            prob   = CountCalls(p)
            soln   = searcher(prob)
            counts = prob._counts; 
            counts.update(actions=len(soln), cost=soln.path_cost)
            total_counts += counts
            report_counts(counts, str(p)[:40])
        
def report_counts(counts, name):
    """Print one line of the counts report."""
    print('{} | {:9,d} nodes |{:9,d} goal |{:5.0f} cost |{:8,d} actions | {}'.format("Sucess" if counts['cost'] != math.inf else "Fail",
          counts['result'], counts['is_goal'], counts['cost'], counts['actions'], name))

In [40]:
report([breadth_first_search,uniform_cost_search,depth_first_search,depth_limited_search,iterative_deepening_search,bidi_search], [problem])

breadth_first_search:
Sucess |        18 nodes |       19 goal |  450 cost |      10 actions | RomaniaProblem('A', 'B')
uniform_cost_search:
Sucess |        30 nodes |       13 goal |  418 cost |      16 actions | RomaniaProblem('A', 'B')
depth_first_search:
Sucess |        17 nodes |        8 goal |  733 cost |      14 actions | RomaniaProblem('A', 'B')
depth_limited_search:
Fail |         3 nodes |        4 goal |  inf cost |       1 actions | RomaniaProblem('A', 'B')
iterative_deepening_search:
Sucess |        27 nodes |       25 goal |  450 cost |      13 actions | RomaniaProblem('A', 'B')
bidi_search:
Sucess |        13 nodes |        0 goal |  418 cost |       9 actions | RomaniaProblem('A', 'B')
