##**Import Required Libraries**

- `heapq`: Provides priority queue for A Star Algorithm
- `queue`: Provides  queue for Breadth-First Search
- `graphviz`: Used for graph visualization, helpful for visualizing data structures.
- `uuid`: Generates universally unique identifiers (UUIDs).
- ...


In [115]:
import heapq
import copy
import graphviz as g
import uuid
import random
import queue
import math
import pickle
import os
import queue
from IPython.display import display

##**Problem**
The Problem class represents the task environment for an agent, defining the initial state, goal state(s), and methods to support environment formulation. It includes the following methods:

- **GetInitialState (`self`, `initial_state`)**: The starting state of the problem.
- **Goal states (`self`, `goal_states`)**: The set of states that satisfy the goal condition.
- **isGoalState (`self`, `state`)**: Determines whether a given state is a goal state or not.
- **getSuccessors (`self`, `state`)**: Returns possible next states from the current state.
- **getActionCost (`self`, `state`, `action`)**: Returns the cost of taking an action from a state.(For 8-puzzle problem the cost of action is 1)


In [116]:
class Problem:
    def __init__(self, initial_state, goal_states):
        self.initial_state = initial_state
        self.goal_states = goal_states

    def getInitialState(self):
        return self.initial_state

    def isGoalState(self, state):
        return state in self.goal_states

    def getSuccessors(self, state):
        node = Node(state)
        return node.getSuccessors()

    def getActionCost(self, state, action):
        node = Node(state)
        return node.getActionCost()

    def getGoalStates(self):
        return self.goal_states


##**State**

The `State` class represents a board configuration in a tile-based puzzle. It includes the following methods:
- **GetActions (`self`)**: List of possible actions of the current state.
- **GetActionCost (`self`, `state`, `action`)**: Get action cost
- **getSuccessor (`self`, `action`)**: Returns the resulting successor after applying the action
- **are_adjacent (`getSuccessors`)**, **auto_swap_if_adjacency(`self`)**: Implements logic for swap the tiles if they are in pair (1 and 3 or 2 and 4)



In [117]:
class State():
    def __init__(self, board):
        self.board = board

    def __eq__(self, other):
        return self.board == other.board

    def __hash__(self):
        return hash(str(self.board))

    def __str__(self):
        return str(self.board)

    def get_board(self):
        return self.board

    def set_board(self, board):
        self.board = board

    def get_blank_position(self):
        for i in range(len(self.get_board())):
            for j in range(len(self.get_board()[0])):
                if self.board[i][j] == 0:
                    return (i, j)

    def swap(self, pos1, pos2):
        i1, j1 = pos1
        i2, j2 = pos2
        self.board[i1][j1], self.board[i2][j2] = self.board[i2][j2], self.board[i1][j1]

    '''Get the possible actions of the current state'''
    def getActions(self):
        actions = []
        blank_position = self.get_blank_position()
        if blank_position[0] > 0:
            actions.append('U')
        if blank_position[0] < len(self.get_board()) - 1:
            actions.append('D')
        if blank_position[1] > 0:
            actions.append('L')
        if blank_position[1] < len(self.get_board()) - 1:
            actions.append('R')
        return actions

    '''Cost of action is 1'''
    def getActionCost(self, state, action):
        return 1

    '''Return successor with action'''
    def getSuccessor(self, action):
        blank_position = self.get_blank_position()
        new_state = copy.deepcopy(self)

        if 'U' == action:
            new_state.swap(blank_position, (blank_position[0] - 1, blank_position[1]))
        elif 'D' == action:
            new_state.swap(blank_position, (blank_position[0] + 1, blank_position[1]))
        elif 'L' == action:
            new_state.swap(blank_position, (blank_position[0], blank_position[1] - 1))
        elif 'R' == action:
            new_state.swap(blank_position, (blank_position[0], blank_position[1] + 1))


        new_state.auto_swap_if_adjacency()
        return new_state


    def are_adjacent(self, pos1, pos2):
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1]) == 1



    def auto_swap_if_adjacency(self):
        '''using dictionary to store position of each number in the board'''
        position = {}
        for i in range(len(self.get_board())):
            for j in range(len(self.get_board()[0])):
                position[self.board[i][j]] = (i, j)

        ''' swap if cell 1 and cell 3 are adjacent'''
        if self.are_adjacent(position[1], position[3]):
            self.swap(position[1], position[3])

        '''swap if cell 2 and cell 4 are adjacent'''
        if self.are_adjacent(position[2], position[4]):
            self.swap(position[2], position[4])


##**Generate random state**

In [118]:
'''Generate the random state '''
@staticmethod
def generate_random_state():
    numbers = list(range(9))
    random.shuffle(numbers)
    board = [numbers[i:i+3] for i in range(0, 9, 3)]
    return State(board)

## **Node**

The `Node` class represents a state in the search tree. Each node stores information about the state, the action taken to reach it, its parent, and heuristic values used for informed search algorithms like A*.It includes the following methods:

- **__str__(`self`)**: Returns a string representation of the board, including `G` and `H` values.
- **__lt__(`self`, `other`)**: Compares two nodes based on their `f(x) = G + H` value for A* search.
- **get_id(`self`)**: Generates a unique ID for visualization based on the state's hash.
- **getSuccessors(`self`)**: Generates and returns a list of successor nodes by applying all possible actions.
- **draw(`self`, `dot`, `color='black'`, `drawn_edges=None`)**: Visualizes the node in a graph structure using Graphviz, drawing edges to its `parent`.



In [119]:
class Node:
    def __init__(self, state, action = None, parent = None, G =None, H=None):
        self.state = state
        self.action = action
        self.parent = parent
        self.G = G
        self.H = H
        self.ID = str(self)

    ''''''
    def __str__(self):
        board_str = '\n'.join(''.join(str(num) if num != 0 else '_' for num in row) for row in self.get_state().get_board())
        return f"{board_str}\n g_value:{self.get_G()}\n h_value: {self.H}"

    def __lt__(self, other):
        """ Compare the f value for AStar """
        return (self.G + self.H) < (other.G + other.H)
    def get_state(self):
        return self.state

    def get_action(self):
        return self.action

    def get_G(self):
        return self.G

    def set_G(self, G):
        self.G = G

    def get_parent(self):
        return self.parent

    def get_id(self):
        "Using the ID based on state for drawing the graph"
        return str(hash(str(self.state)))

    """VIsualize the node in a graph structure using Graphviz"""
    def get_str_node(self):
        return str(self)

    def getActionCost(self):
        return 1

    def set_H(self, H):
        self.H = H

    def get_H(self):
        return self.H

    def set_parent(self, parent):
        self.parent = parent

    """Get the possible successors of the current node"""
    def getSuccessors(self):
        successors = []
        state_of_node = self.get_state()
        for action in state_of_node.getActions():
            new_state = state_of_node.getSuccessor(action)
            cost = new_state.getActionCost(state_of_node, action)
            successors.append(Node(new_state, action, self, cost))
        return successors

    '''Method for drawing the node and its edge'''
    def draw(self, dot, color='black', drawn_edges=None):
        dot.node(self.get_id(), self.get_str_node(), shape="square", color=color)
        if self.get_parent() is not None:
            dot.edge(self.get_parent().get_id(), self.get_id(), label=self.get_action(), color=color)


##**Heuristic**
The Heuristic class is a parent to details Heuristic class
- **getHeuristic(`self`, `state`, `goalStates`)**: Return the Heuristic value when apply into the A Star algorithm


In [120]:
class Heuristic:
    def __init__(self):
        pass
    def getHeuristic(self, state, goalStates):
        return 0

In [121]:
# trivial heuristic (for testing) make it change to ucs/bfs f(n) = g(n)
class Trivial(Heuristic):
    def __init__(self):
        pass
    def getHeuristic(self, state, goalStates):
        return 0

##**PaternDataBase**
The subclass of Heuristic that estimate the distance by build the real distance dictionary from goal to all states in states space, the value will be the real cost to goal
- **loadPatternDataBase(`self`, `save-dir`)**: Load the goal pattern databases
- **buildIntoFile(`self`, `save-dir`)**: Add the list of pattern database in files
- **getHeuristic(`self`, `state`, `goalStates`)**: Pattern database values of the nearest goal in pattern of `goalStates` to `state`
- **PatternDataBaseBuildAll(`self`, `goalStates`)**: List of pattern databases of goals
- **buildPDBFromGoal(`self`, `goal`)**: Build a pattern database from a goal



In [122]:
'''Inheritant from Heuristic
   Idea: Build the real distance from goal to every states in states space, Heuristic will be the real cost'''
class PatternDataBase(Heuristic):
    def __init__(self):
        self.pdbList = []
        self.ourChoice = None

    '''Load .pkl file of goals'''
    def loadPatternDataBase(self, save_dir):
      self.pdbList = []

      for i in range(1, 5):
          filename = f'DatabaseForGoal{i}.pkl'
          fullpath = os.path.join(save_dir, filename)

          with open(fullpath, 'rb') as f:
              data = pickle.load(f)
              self.pdbList.append(data)
          print(f"Loaded file: {fullpath}")

    '''Save the Pattern Database into files for using in the furture'''
    def buildIntoFile(self, save_dir):
        os.makedirs(save_dir, exist_ok=True)
        i = 1
        for goal_pdb in self.pdbList:
          filename = f'DatabaseForGoal{i}.pkl'
          fullpath = os.path.join(save_dir, filename)
          with open(fullpath, 'wb') as f:
            pickle.dump(goal_pdb, f)
          print(f"Saved file: {fullpath}")
          i += 1

    def getHeuristic(self, state, goalStates):
      if not self.pdbList:
          return 0

      state_key = tuple(num for row in state.get_board() for num in row)

      '''Choose the pattern of initial State (which pattern database it uses) in the first expanding'''
      if self.ourChoice is None:
          best_cost = math.inf

          for i, pdb_dict in enumerate(self.pdbList):
              cost = pdb_dict.get(state_key, math.inf)
              if cost < best_cost:
                  best_cost = cost
                  chosen_pdb = pdb_dict
          return best_cost

      '''The following nodes with initial will use the pattern database that initial state use'''
      h_val = self.ourChoice.get(state_key, math.inf)
      return h_val

    '''Create a list to append 4 goal pattern database '''
    def PatternDataBaseBuildAll(self, goalStates):
        pdb_list = []
        for goal in goalStates:
            pdb_g = self.buildPDBFromGoal(goal)
            pdb_list.append(pdb_g)
        return pdb_list

    '''Build a pattern database from a goal using BFS (tracking the path from goal to all states space)'''
    def buildPDBFromGoal(self, goal):
        pdb = {}
        Q = queue.Queue()

        goal_key = tuple(num for row in goal.get_board() for num in row)

        pdb[goal_key] = 0
        Q.put(goal)

        while not Q.empty():
            current_state = Q.get()
            current_cost_key = tuple(num for row in current_state.get_board() for num in row)
            current_cost = pdb[current_cost_key]

            current_node = Node(current_state)
            for successor in current_node.getSuccessors():
                succ_key = tuple(num for row in successor.get_state().get_board() for num in row)
                # if succ_key not in pdb:
                pdb[succ_key] = current_cost + successor.getActionCost()
                Q.put(successor.get_state())

        return pdb


##**Manhattan**
The subclass of Heuristic that estimate the distance by Manhattan distance (only accepting up, dow, left, right move in puzzle)
- **getHeuristic(`self`, `state`, `goalStates`)**: Minimum mahattan value from cells in `state` to cells in goals of `goalStates`



In [123]:
'''Additional Heuristic'''
class Manhantan(Heuristic):
    def __init__(self):
        pass
    def getHeuristic(self, state, goalStates):
        list_distance = []
        for goal in goalStates:
            board_goal = goal.get_board()
            goal_pos = {board_goal[i][j]: (i, j) for i in range(len(board_goal)) for j in range(len(board_goal))}
            distance = 0
            board_state = state.get_board()
            for i in range(len(board_state)):
                for j in range(len(board_state[0])):
                    tile = board_state[i][j]
                    if tile != 0:
                        gx, gy = goal_pos[tile]
                        distance += abs(gx - i) + abs(gy - j)
            list_distance.append(distance)
        return min(list_distance)

##**Missplace**
The subclass of Heuristic that estimate the distance by different location of cells from current states to goals
- **getHeuristic(`self`, `state`, `goalStates`)**:Minimum number of wrong position(or missplace) of cells in `states` with goals in `goalStates`




In [124]:
'''Additional Heuristic'''
class Misplace(Heuristic):
    def __init__(self):
        pass
    def getHeuristic(self, state, goalStates):
        min_misplaced = 9
        for goalState in goalStates:
            misplaced = 0
            board_state = state.get_board()
            board_goal = goalState.get_board()
            for i in range(len(board_state)):
                for j in range(len(board_state)):
                    if board_state[i][j] != board_goal[i][j]:
                        misplaced += 1
            min_misplaced = min(min_misplaced, misplaced)
        return min_misplaced

##**IgnoreSwap**
The subclass of Heuristic that estimate the distance by Manhattan distance (only accepting up, dow, left, right move in puzzle) and **skipping the 1,3 and 2,4 cells if they adjacent each other**
- **getHeuristic(`self`, `state`, `goalStates`)**: Minimum mahattan value from cells in `state` to cells in goals of `goalStates`



In [125]:
'''Inheritant from Heuristic
    Idea: Using Manhantan distance and Ingnore the Swap if 1,3 and 2,4 are adjacent to each other'''
class IgnoreSwap(Heuristic):
    def __init__(self):
        pass

    def getHeuristic(self, state, goalStates):
        board_state = state.get_board()
        size = len(board_state)

        costs = []

        for goalState in goalStates:
            board_goal = goalState.get_board()
            goal_pos = {
                board_goal[i][j]: (i, j)
                for i in range(size)
                for j in range(len(board_goal[0]))
                if board_goal[i][j] != 0
            }

            distances = {}
            positions = {}
            for i in range(size):
                for j in range(len(board_state[0])):
                    tile = board_state[i][j]
                    if tile != 0:
                        gx, gy = goal_pos[tile]
                        distances[tile] = abs(i - gx) + abs(j - gy)
                        positions[tile] = (i, j)

            if 1 in distances and 3 in distances:
                pos1 = next(((i, j) for i in range(size) for j in range(len(board_state[0])) if board_state[i][j] == 1), None)
                pos3 = next(((i, j) for i in range(size) for j in range(len(board_state[0])) if board_state[i][j] == 3), None)
                goal_pos1 = goal_pos[1]
                goal_pos3 = goal_pos[3]
                if (state.are_adjacent(positions[1], positions[3]) and (pos1 in [goal_pos1, goal_pos3] and pos3 in [goal_pos1, goal_pos3])) or (pos1 in [goal_pos1, goal_pos3] and pos3 in [goal_pos1, goal_pos3]) :
                    distances[1] = 0
                    distances[3] = 0

            if 2 in distances and 4 in distances:
                pos2 = next(((i, j) for i in range(size) for j in range(len(board_state[0])) if board_state[i][j] == 2), None)
                pos4 = next(((i, j) for i in range(size) for j in range(len(board_state[0])) if board_state[i][j] == 4), None)
                goal_pos2 = goal_pos[2]
                goal_pos4 = goal_pos[4]
                if (state.are_adjacent(positions[2], positions[4]) and (pos2 in [goal_pos2, goal_pos4] and pos4 in [goal_pos2, goal_pos4])) or (pos2 in [goal_pos2, goal_pos4] and pos4 in [goal_pos2, goal_pos4]):
                    distances[2] = 0
                    distances[4] = 0

            total_cost = sum(distances.values())
            costs.append(total_cost)

        return min(costs)


##**Search**  
Implements the **A* search algorithm** with heuristic evaluation and optional visualization. It includes the following methods:

- **AStar(`problem`)**: Performs **A* search** to find an optimal path from `problem`'s initial state to a goal state.  
  Expands nodes using `f(x) = G + H` and maintains a priority queue.  
  Returns the solution path and the number of expanded nodes.  

- **getPath(`node`)**: Reconstructs the solution path by tracing back from `node` to the initial state.  

- **Visual(`numberNodesDraw`)**: Visualizes the search process using Graphviz, displaying up to `numberNodesDraw` nodes.  


In [126]:
class Search:
    def __init__(self,heuristic, numberNodesDraw = None, graph_search = True):
        self.heuristic = heuristic
        self.graph = None
        self.numberNodesDraw = numberNodesDraw if numberNodesDraw is not None else 0
        self.uniqueId = str(uuid.uuid4())[:8] #Id for visual graph
        self.graph_search = graph_search

    '''Graph Search AStar Algorithm '''
    def AStar(self, problem):
        if self.numberNodesDraw:
            self.graph = self.graph = g.Digraph(name=f"Graph_{self.uniqueId}")
        expandedNodes = 0

        initial_state = problem.getInitialState()
        if problem.isGoalState(initial_state):
            return [], expandedNodes

        initial_H  = self.heuristic.getHeuristic(initial_state, problem.getGoalStates())
        initial_node = Node(initial_state, H= initial_H, G = 0)

        frontier = []
        heapq.heappush(frontier, (0, initial_node))

        costSoFar = {initial_node.get_state() : 0}

        while frontier:
            g_value, node = heapq.heappop(frontier)
            expandedNodes += 1


            '''Only draw nodes up to the allowed limit; the algorithm still runs normally.'''
            if self.numberNodesDraw is not None and expandedNodes <= self.numberNodesDraw:
                node.draw(self.graph)

            if problem.isGoalState(node.get_state()):
                return self.getPath(node), expandedNodes

            for successor in node.getSuccessors():
                stateOfSuccessor = successor.get_state()
                g_value = node.get_G() + successor.getActionCost()
                h_value = self.heuristic.getHeuristic(stateOfSuccessor, problem.getGoalStates())

                if self.graph_search:
                  '''No need to compute f(x) = g(x) + h(x) explicitly as __lt__ is defined in Node.
                      GraphSearch: Check the visited nodes '''
                  if stateOfSuccessor not in costSoFar or g_value < costSoFar[stateOfSuccessor]:
                      costSoFar[stateOfSuccessor] = g_value
                      successor.set_G(g_value)
                      successor.set_H(h_value)
                      successor.set_parent(node)
                      heapq.heappush(frontier, (g_value, successor))
                else:
                    """ Tree search: Not check the visited nodes """
                    successor.set_G(g_value)
                    successor.set_H(h_value)
                    successor.set_parent(node)
                    heapq.heappush(frontier, (g_value, successor))

        return None, None


    def getPath(self, node):
        path = []
        while node:
            path.append(node)
            if isinstance(node, Node):
                node = node.get_parent()
        return path[::-1]

    '''Visualize the AStar Algorithm'''
    def Visual(self, numberNodesDraw, problem):
        self.numberNodesDraw = numberNodesDraw
        path_nodes,_ = self.AStar(problem)
        display(g.Source(self.graph.source))


##**Demo**
The class for demo the all the task. **Using Manhattan and Pattern Database**
- **main()**: run the demo with 3 option:
  + Visuals by graph of AStar with 2 Heuristic with determined number of nodes.
  + Evaluates the average path cost of AStar with 2 heuristic.
  + Detail path cost and heuristic value each step of AStar with 2 heuristic.

In [127]:
class Demo:
    def __init__(self, goal_states, pdb_path=None):
        self.goal_states = goal_states
        self.pdb_path = pdb_path

        # initialize 2 heuristics
        self.heuristics = {
            "Pattern Database": PatternDataBase(),
            "Manhattan": Manhantan()
        }

        # load Pattern Database if directory is provided
        if self.pdb_path:
            self.heuristics["Pattern Database"].loadPatternDataBase(self.pdb_path)

    def run_menu(self):
        print("=== SELECT MODE ===")
        print("1 - Run A* for a random initial state and visualize")
        print("2 - Evaluate heuristics with multiple random states")
        choice = input("Enter your choice (1 or 2): ")

        if choice == '1':
            self.run_single_case()
        elif choice == '2':
            self.run_experiment()
        else:
            print("Invalid choice!")

    def run_single_case(self):
        initial_state = generate_random_state()
        problem = Problem(initial_state, self.goal_states)

        print("\nInitial State:")
        print(initial_state)

        try:
            n = int(input("Enter the number of nodes to visualize in the search tree: "))
        except:
            n = 10
            print("Invalid input, defaulting to n = 10")

        for name, heuristic in self.heuristics.items():
            print(f"\n--- Heuristic: {name} ---")
            search = Search(heuristic, numberNodesDraw=n)
            path, expanded = search.AStar(problem)

            if path:
                print("Path:")
                for node in path:
                    if node.get_action() is not None:
                        print(node.get_action(), end=',')
                print("\nTotal cost:", len(path) - 1)
                print("Expanded nodes:", expanded)
                search.Visual(n, problem)
            else:
                print("No solution found.")

    def run_experiment(self):
        try:
            n = int(input("How many random initial states do you want to test? "))
        except:
            print("Invalid input, defaulting to n = 10")
            n = 10

        total_costs = {name: 0 for name in self.heuristics}
        success_counts = {name: 0 for name in self.heuristics}

        for i in range(n):
            print(f"\n=== Test {i+1}/{n} ===")
            initial_state = generate_random_state()
            problem = Problem(initial_state, self.goal_states)
            print("Initial State:")
            print(initial_state)

            for name, heuristic in self.heuristics.items():
                search = Search(heuristic)
                path, expanded = search.AStar(problem)

                if path:
                    cost = len(path) - 1
                    total_costs[name] += cost
                    success_counts[name] += 1
                    print(f"[{name}] --> Cost: {cost}, Expanded Nodes: {expanded}")
                else:
                    print(f"[{name}] --> No solution found.")

        print("\n=== AVERAGE RESULTS ===")
        for name in self.heuristics:
            count = success_counts[name]
            if count > 0:
                avg_cost = total_costs[name] / count
                print(f"{name}: Average cost = {avg_cost:.2f} (solved {count}/{n} cases)")
            else:
                print(f"{name}: No solutions found.")


In [135]:
if __name__ == "__main__":
    # Define the 4 goal states
    goal_states = [
        State([[1, 2, 3], [4, 5, 6], [7, 8, 0]]),
        State([[8, 7, 6], [5, 4, 3], [2, 1, 0]]),
        State([[1, 2, 0], [3, 4, 5], [6, 7, 8]]),
        State([[8, 7, 0], [6, 5, 4], [3, 2, 1]])
    ]

    # path to the folder containing Pattern Database .pkl files
    pdb_path = "./"

    random.seed(42) # co dinh seed de test

    demo = Demo(goal_states, pdb_path=pdb_path)
    demo.run_menu()


Loaded file: ./DatabaseForGoal1.pkl
Loaded file: ./DatabaseForGoal2.pkl
Loaded file: ./DatabaseForGoal3.pkl
Loaded file: ./DatabaseForGoal4.pkl
=== SELECT MODE ===
1 - Run A* for a random initial state and visualize
2 - Evaluate heuristics with multiple random states
Enter your choice (1 or 2): 2
How many random initial states do you want to test? 10

=== Test 1/10 ===
Initial State:
[[3, 6, 7], [4, 8, 2], [5, 0, 1]]
[Pattern Database] --> Cost: 15, Expanded Nodes: 8220
[Manhattan] --> Cost: 15, Expanded Nodes: 8220

=== Test 2/10 ===
Initial State:
[[7, 2, 5], [6, 0, 3], [4, 1, 8]]
[Pattern Database] --> Cost: 18, Expanded Nodes: 41229
[Manhattan] --> Cost: 18, Expanded Nodes: 41227

=== Test 3/10 ===
Initial State:
[[2, 5, 7], [8, 6, 1], [4, 0, 3]]
[Pattern Database] --> Cost: 13, Expanded Nodes: 3599
[Manhattan] --> Cost: 13, Expanded Nodes: 3599

=== Test 4/10 ===
Initial State:
[[8, 7, 5], [2, 3, 1], [6, 0, 4]]
[Pattern Database] --> Cost: 19, Expanded Nodes: 53532
[Manhattan] -->