Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [49]:
from collections import namedtuple
from random import choice
from tqdm.auto import tqdm
import numpy as np
from collections import deque
import heapq
import time

In [50]:
PUZZLE_DIM = 3
action = namedtuple('Action', ['pos1', 'pos2'])

In [51]:
def available_actions(state: np.ndarray) -> list['Action']:
    x, y = [int(_[0]) for _ in np.where(state == 0)]
    actions = list()
    if x > 0:
        actions.append(action((x, y), (x - 1, y)))
    if x < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x + 1, y)))
    if y > 0:
        actions.append(action((x, y), (x, y - 1)))
    if y < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x, y + 1)))
    return actions



def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    new_state = state.copy()
    new_state[action.pos1], new_state[action.pos2] = new_state[action.pos2], new_state[action.pos1]
    return new_state

In [52]:
def goal_test(state):
    return all(state.flatten() == list(range(1, PUZZLE_DIM**2)) + [0])

In [53]:
# Depth-First Search with Bound
def dfs_bound(state, bound, path, visited):
    states_evaluated = 1  

    if goal_test(state):  
        return path, state, states_evaluated

    if len(path) > bound: 
        return None, None, states_evaluated

    visited.add(state.tobytes())  

    for action in available_actions(state):  
        new_state = do_action(state, action)
        if new_state.tobytes() not in visited:  
            result, final_state, sub_states_evaluated = dfs_bound(new_state, bound, path + [action], visited)
            states_evaluated += sub_states_evaluated  
            if result:  
                return result, final_state, states_evaluated

    visited.remove(state.tobytes())  
    return None, None, states_evaluated  

In [54]:
# Depth-First is Not-Complete without Iterative Deepening
def iterative_deepening_dfs(state, max_depth=60):
    total_states_evaluated  = 0  

    for depth in range(1, max_depth + 1):  
            visited = set()
            print(f"Searching with depth limit: {depth}")
            result, final_state, states_evaluated = dfs_bound(state, depth, [], visited)
            total_states_evaluated += states_evaluated  
            if result:  
                quality = len(result)  
                return final_state, result, quality, total_states_evaluated

    return None, None, None, total_states_evaluated  

In [55]:
# Breadth-First Search
def bfs(state):
    frontier = deque([(state, [])])
    visited = set()
    visited.add(state.tobytes())
    states_evaluated = 0
    while frontier:
        current_state, path = frontier.popleft()
        states_evaluated += 1

        if goal_test(current_state):
            quality = len(path)
            cost = states_evaluated
            return current_state, path, quality, cost # final state

        for action in available_actions(current_state):
            new_state = do_action(current_state, action)
            if new_state.tobytes() not in visited:
                visited.add(new_state.tobytes())
                frontier.append((new_state, path + [action]))
                
    return None, None, None, None

In [56]:
# Hamming distance heuristic for A* search
def h_hamming(state):
    goal = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
    return np.sum(state != goal) - 1  

In [57]:
# Manhattan distance heuristic for A* search
def h_manhattan(state):
    goal = {val: (i // PUZZLE_DIM, i % PUZZLE_DIM) for i, val in enumerate(range(1, PUZZLE_DIM**2))}
    goal[0] = (PUZZLE_DIM - 1, PUZZLE_DIM - 1)
    distance = 0
    for x in range(PUZZLE_DIM):
        for y in range(PUZZLE_DIM):
            val = state[x, y]
            gx, gy = goal[val]
            distance += abs(x - gx) + abs(y - gy)
    return distance

In [58]:
# Linear Conflict heuristic for A* search
def h_linear_conflict(state):
    goal_positions = {val: (i // PUZZLE_DIM, i % PUZZLE_DIM) for i, val in enumerate(range(1, PUZZLE_DIM**2))}
    goal_positions[0] = (PUZZLE_DIM - 1, PUZZLE_DIM - 1)  
    manhattan_distance = 0
    linear_conflict = 0

    for x in range(PUZZLE_DIM):
        for y in range(PUZZLE_DIM):
            tile = state[x, y]
            if tile == 0:
                continue
            goal_x, goal_y = goal_positions[tile]

            manhattan_distance += abs(x - goal_x) + abs(y - goal_y)

            # conflicts in rows
            if x == goal_x:  
                for k in range(y + 1, PUZZLE_DIM):
                    conflicting_tile = state[x, k]
                    if conflicting_tile != 0 and goal_positions[conflicting_tile][0] == x and goal_positions[conflicting_tile][1] < goal_y:
                        linear_conflict += 1

            # conflicts in columns
            if y == goal_y:  
                for k in range(x + 1, PUZZLE_DIM):
                    conflicting_tile = state[k, y]
                    if conflicting_tile != 0 and goal_positions[conflicting_tile][1] == y and goal_positions[conflicting_tile][0] < goal_x:
                        linear_conflict += 1

    # each conflict adds 2 moves to the Manhattan distance
    return manhattan_distance + 2 * linear_conflict

In [59]:
# A* Search
def a_star(state, heuristic):
    frontier = []
    heapq.heappush(frontier, (heuristic(state), 0, state.tobytes(), []))  
    visited = set()
    states_evaluated = 0
    while frontier:
        _, cost, state_bytes, path = heapq.heappop(frontier) 
        current_state = np.frombuffer(state_bytes, dtype=int).reshape((PUZZLE_DIM, PUZZLE_DIM))  
        states_evaluated += 1
        
        if goal_test(current_state):  
            quality = len(path)
            return current_state, path, quality, states_evaluated # final state

        visited.add(state_bytes)  # visited state

        for action in available_actions(current_state):  # exploring neighbors
            new_state = do_action(current_state, action)
            new_state_bytes = new_state.tobytes() 

            if new_state_bytes not in visited:  # avoid revisiting states
                new_cost = cost + 1  # g = g + 1
                priority = new_cost + heuristic(new_state)  # f = g + h
                heapq.heappush(frontier, (priority, new_cost, new_state_bytes, path + [action])) 

    return None, None, None, None

In [60]:
RANDOMIZE_STEPS = 100_000

state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
for r in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    state = do_action(state, choice(available_actions(state)))

print("Initial state:")
print(state)
print()
print("A* Search with Linear Conflict")
final_state_a_star_linear_conflict, path_a_star_linear_conflict, quality_a_star_linear_conflict, cost_a_star_linear_conflict = a_star(state, h_linear_conflict)
if path_a_star_linear_conflict: 
    print(f"Steps (quality): {quality_a_star_linear_conflict}, Total states evaluated (cost): {cost_a_star_linear_conflict}, Final state: {final_state_a_star_linear_conflict}, A* solution: {path_a_star_linear_conflict}")
else:
    print("A* linear conflict solution not found")

print()
print("A* Search with Manhattan Distance")
final_state_a_star_manhattan, path_a_star_manhattan, quality_a_star_manhattan, cost_a_star_manhattan = a_star(state, h_manhattan)
if path_a_star_manhattan:
    print(f"Steps (quality): {quality_a_star_manhattan}, Total states evaluated (cost): {cost_a_star_manhattan}, Final state: {final_state_a_star_manhattan}, A* solution: {path_a_star_manhattan}")
else:
    print("A* manhattan solution not found")

print()
print("A* Search with Hamming Distance")
final_state_a_star_hamming, path_a_star_hamming, quality_a_star_hamming, cost_a_star_hamming = a_star(state, h_hamming)
if path_a_star_hamming:
   print(f"Steps (quality): {quality_a_star_hamming}, Total states evaluated (cost): {cost_a_star_hamming}, Final state: {final_state_a_star_hamming}, A* solution: {path_a_star_hamming}")
else:
   print("A* hamming solution not found")

print()
print("Breadth-First Search")
final_state_bfs, path_bfs, quality_bfs, cost_bfs = bfs(state)
if path_bfs:
    print(f"Steps (quality): {quality_bfs}, Total states evaluated (cost): {cost_bfs}, Final state: {final_state_bfs}, BFS solution: {path_bfs}")
else:
    print("BFS solution not found")

print()
print("Depth-First Search with Bound")
final_state_dfs, path_dfs, quality_dfs, cost_dfs = iterative_deepening_dfs(state)
if path_dfs:   
    print(f"Steps (quality): {quality_dfs}, Total states evaluated (cost): {cost_dfs}, Final state: {final_state_dfs}, DFS solution: {path_dfs}")
else:
    print("DFS solution not found")

print()


Randomizing:   0%|          | 0/100000 [00:00<?, ?it/s]

Initial state:
[[4 1 8]
 [7 6 3]
 [0 2 5]]

A* Search with Linear Conflict
Steps (quality): 16, Total states evaluated (cost): 52, Final state: [[1 2 3]
 [4 5 6]
 [7 8 0]], A* solution: [Action(pos1=(2, 0), pos2=(1, 0)), Action(pos1=(1, 0), pos2=(0, 0)), Action(pos1=(0, 0), pos2=(0, 1)), Action(pos1=(0, 1), pos2=(1, 1)), Action(pos1=(1, 1), pos2=(1, 2)), Action(pos1=(1, 2), pos2=(0, 2)), Action(pos1=(0, 2), pos2=(0, 1)), Action(pos1=(0, 1), pos2=(1, 1)), Action(pos1=(1, 1), pos2=(2, 1)), Action(pos1=(2, 1), pos2=(2, 2)), Action(pos1=(2, 2), pos2=(1, 2)), Action(pos1=(1, 2), pos2=(0, 2)), Action(pos1=(0, 2), pos2=(0, 1)), Action(pos1=(0, 1), pos2=(1, 1)), Action(pos1=(1, 1), pos2=(2, 1)), Action(pos1=(2, 1), pos2=(2, 2))]

A* Search with Manhattan Distance
Steps (quality): 16, Total states evaluated (cost): 90, Final state: [[1 2 3]
 [4 5 6]
 [7 8 0]], A* solution: [Action(pos1=(2, 0), pos2=(1, 0)), Action(pos1=(1, 0), pos2=(0, 0)), Action(pos1=(0, 0), pos2=(0, 1)), Action(pos1=(0, 1), 