Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [1]:
from collections import namedtuple
from random import choice
from tqdm.auto import tqdm
import numpy as np
from heapq import heappush, heappop 

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Side length of the square board (3 gives the classic 8-puzzle).
PUZZLE_DIM = 3

# A move swaps the contents of two board cells, each given as a (row, col) index.
# `Action` is the name the type annotations in this file refer to; the lowercase
# `action` alias is kept because existing code builds moves through it.
Action = namedtuple('Action', ['pos1', 'pos2'])
action = Action

### Support Functions

In [29]:
def available_actions(state: np.ndarray) -> list['Action']:
    """List every legal move of the blank tile on ``state``.

    The blank is the cell holding 0. Each returned action has ``pos1`` set to
    the blank's (row, col) index and ``pos2`` set to the neighbouring cell it
    can be swapped with. Moves are listed in the order: row - 1, row + 1,
    column - 1, column + 1.

    The board limits are taken from ``state.shape`` so the function works for
    any 2-D board, not only for boards of side ``PUZZLE_DIM``.
    """
    rows, cols = state.shape
    blank_rows, blank_cols = np.where(state == 0)
    # Plain ints keep the positions usable as ordinary tuple indices.
    x, y = int(blank_rows[0]), int(blank_cols[0])

    candidates = ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1))
    return [
        action((x, y), (nx, ny))
        for nx, ny in candidates
        if 0 <= nx < rows and 0 <= ny < cols
    ]

def is_solvable(state: np.ndarray) -> bool:
    """Check whether ``state`` can be brought to the solved configuration.

    The solved configuration is the tiles in ascending row-major order with
    the blank (0) in the last cell.

    The test counts inversions among the tiles (blank excluded):

    * odd board width: solvable iff the inversion count is even;
    * even board width: a vertical blank move flips the inversion parity and
      changes the blank row by one, so ``inversions + blank_row`` keeps its
      parity. In the solved board that sum is ``width - 1`` (odd), hence the
      state is solvable iff the sum is odd.
    """
    tiles = [int(value) for value in state.flatten() if value != 0]
    inversions = sum(
        1
        for i, earlier in enumerate(tiles)
        for later in tiles[i + 1:]
        if earlier > later
    )

    width = state.shape[1]
    if width % 2 == 1:
        return inversions % 2 == 0

    # Row index of the blank, counted from the top starting at 0.
    blank_row = int(np.where(state == 0)[0][0])
    return (inversions + blank_row) % 2 == 1


def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    """Return a copy of ``state`` with the cells at ``action.pos1`` and ``action.pos2`` swapped.

    The input array is left untouched.
    """
    swapped = state.copy()
    first, second = action.pos1, action.pos2
    # Read both cells before writing either one.
    value_at_first = swapped[first]
    value_at_second = swapped[second]
    swapped[first] = value_at_second
    swapped[second] = value_at_first
    return swapped

def is_solved(state: np.ndarray) -> bool:
    """Tell whether ``state`` equals the solved board.

    The solved board holds 1, 2, ... in row-major order with 0 (the blank)
    in the last cell.
    """
    cell_count = PUZZLE_DIM**2
    # (k + 1) % cell_count maps 0..cell_count-1 to 1, 2, ..., cell_count-1, 0.
    target = (np.arange(cell_count) + 1) % cell_count
    target = target.reshape((PUZZLE_DIM, PUZZLE_DIM))
    return np.array_equal(state, target)

def manhattan_distance(state: np.ndarray) -> int:
    """Sum, over every tile, of its Manhattan distance from its solved cell.

    Tile ``v`` belongs at ``divmod(v - 1, width)`` (row-major order starting
    with tile 1 in the top-left corner). The blank (0) is not counted.

    The board width is read from ``state.shape`` and the result is a plain
    Python ``int``.
    """
    width = state.shape[1]
    total = 0
    for (x, y), value in np.ndenumerate(state):
        if value != 0:
            # int() keeps the arithmetic in Python ints whatever the array dtype is.
            target_x, target_y = divmod(int(value) - 1, width)
            total += abs(x - target_x) + abs(y - target_y)
    return total

### A* Algorithm

In [None]:
def solve_puzzle(initial_state: np.ndarray):
    """Solve the puzzle with A*, using the Manhattan distance as heuristic.

    States are stored as raw bytes so they can be hashed and compared. The
    dtype of ``initial_state`` is used both to build the goal and to decode
    states popped from the queue, so boards of any integer dtype are handled.

    Returns:
        ``(path, nodes_evaluated)`` where ``path`` is the list of actions that
        leads from ``initial_state`` to the solved board (an empty list if the
        board is already solved), or ``(None, nodes_evaluated)`` when the
        queue is exhausted without reaching the goal. ``nodes_evaluated``
        counts the distinct states taken off the queue.
    """
    shape = (PUZZLE_DIM, PUZZLE_DIM)
    dtype = initial_state.dtype
    start = initial_state.tobytes()
    goal = np.array(list(range(1, PUZZLE_DIM**2)) + [0], dtype=dtype).reshape(shape).tobytes()

    # Queue entries are (f, state bytes, path so far).
    priority_queue = []
    heappush(priority_queue, (0, start, []))
    visited = set()
    nodes_evaluated = 0

    while priority_queue:
        _, current_state_bytes, path = heappop(priority_queue)

        # The same state may have been queued several times; expand it once.
        if current_state_bytes in visited:
            continue
        visited.add(current_state_bytes)
        nodes_evaluated += 1

        if current_state_bytes == goal:
            return path, nodes_evaluated

        # Decode only states that are actually expanded.
        current_state = np.frombuffer(current_state_bytes, dtype=dtype).reshape(shape)

        for move in available_actions(current_state):
            new_state = do_action(current_state, move)
            new_state_bytes = new_state.tobytes()

            if new_state_bytes not in visited:
                g = len(path) + 1  # moves made so far
                h = manhattan_distance(new_state)
                heappush(priority_queue, (g + h, new_state_bytes, path + [move]))

    return None, nodes_evaluated

def print_solution(initial_state: np.ndarray, solution: list['Action'], nodes_evaluated: int):
    """Print the starting board, the board after every move, and a summary.

    The summary reports the number of moves in ``solution`` and the
    ``nodes_evaluated`` figure passed in.
    """
    board = initial_state.copy()
    print("Stato iniziale:")
    print(board, end="\n\n")

    step = 0
    for move in solution:
        step += 1
        board = do_action(board, move)
        print(f"Step {step}: Move tile from {move.pos2} to {move.pos1}")
        print(board, end="\n\n")

    print("Final Results:")
    print(f"Quality (solution length): {len(solution)} moves")
    print(f"Cost (nodes evaluated): {nodes_evaluated}")

## Abridged variant of print_solution, for long solutions
def print_solution_partial(initial_state: np.ndarray, solution: list['Action'], nodes_evaluated: int):
    """Print an abridged trace of ``solution``.

    Every move is applied, but only the first 5 and the last 15 steps are
    printed; the omitted range is announced once. A summary with the solution
    length and ``nodes_evaluated`` closes the output.
    """
    head = 5
    total_steps = len(solution)
    # Steps numbered above this value belong to the printed tail.
    tail_start = max(0, total_steps - 15)

    board = initial_state.copy()
    print("Stato iniziale:")
    print(board, end="\n\n")

    for step, move in enumerate(solution, start=1):
        board = do_action(board, move)

        in_head = step <= head
        in_tail = step > tail_start
        if in_head or in_tail:
            print(f"Step {step}: Move tile from {move.pos2} to {move.pos1}")
            print(board, end="\n\n")
            continue

        # First hidden step: announce the gap exactly once.
        if step == head + 1:
            print("...")
            print(f"(Skipping steps {head+1} to {tail_start})")
            print("...")

    print("Final Results:")
    print(f"Quality (solution length): {total_steps} moves")
    print(f"Cost (nodes evaluated): {nodes_evaluated}")

In [15]:
# Scramble the solved board by applying random legal moves to it
RANDOMIZE_STEPS = 100_000
solved_tiles = list(range(1, PUZZLE_DIM**2))
solved_tiles.append(0)  # the blank sits in the last cell of the solved board
state = np.array(solved_tiles).reshape((PUZZLE_DIM, PUZZLE_DIM))
for r in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    legal_moves = available_actions(state)
    state = do_action(state, choice(legal_moves))
print("Initial puzzle: \n", state)

Randomizing:   0%|          | 0/100000 [00:00<?, ?it/s]

Randomizing: 100%|██████████| 100000/100000 [00:00<00:00, 212801.35it/s]

Initial puzzle: 
 [[4 8 6]
 [1 2 3]
 [0 7 5]]





In [16]:
# Solve the scrambled puzzle with A* and print an abridged trace of the moves.
solution, nodes_evaluated = solve_puzzle(state)
# Compare with None explicitly: an empty move list is a valid solution
# (the board was already solved), not a failure.
if solution is not None:
    print_solution_partial(state, solution, nodes_evaluated)
else:
    print("Nessuna soluzione trovata.")

Stato iniziale:
[[4 8 6]
 [1 2 3]
 [0 7 5]]

Step 1: Move tile from (1, 0) to (2, 0)
[[4 8 6]
 [0 2 3]
 [1 7 5]]

Step 2: Move tile from (1, 1) to (1, 0)
[[4 8 6]
 [2 0 3]
 [1 7 5]]

Step 3: Move tile from (0, 1) to (1, 1)
[[4 0 6]
 [2 8 3]
 [1 7 5]]

Step 4: Move tile from (0, 2) to (0, 1)
[[4 6 0]
 [2 8 3]
 [1 7 5]]

Step 5: Move tile from (1, 2) to (0, 2)
[[4 6 3]
 [2 8 0]
 [1 7 5]]

Step 6: Move tile from (1, 1) to (1, 2)
[[4 6 3]
 [2 0 8]
 [1 7 5]]

Step 7: Move tile from (0, 1) to (1, 1)
[[4 0 3]
 [2 6 8]
 [1 7 5]]

Step 8: Move tile from (0, 0) to (0, 1)
[[0 4 3]
 [2 6 8]
 [1 7 5]]

Step 9: Move tile from (1, 0) to (0, 0)
[[2 4 3]
 [0 6 8]
 [1 7 5]]

Step 10: Move tile from (2, 0) to (1, 0)
[[2 4 3]
 [1 6 8]
 [0 7 5]]

Step 11: Move tile from (2, 1) to (2, 0)
[[2 4 3]
 [1 6 8]
 [7 0 5]]

Step 12: Move tile from (2, 2) to (2, 1)
[[2 4 3]
 [1 6 8]
 [7 5 0]]

Step 13: Move tile from (1, 2) to (2, 2)
[[2 4 3]
 [1 6 0]
 [7 5 8]]

Step 14: Move tile from (1, 1) to (1, 2)
[[2 4 3]
 [1 

## Average Quality and Cost 

In [9]:
def generate_random_puzzle(puzzle_dim: int, randomize_steps: int) -> np.ndarray:
    """Build a ``puzzle_dim`` x ``puzzle_dim`` board scrambled by random moves.

    Starts from the solved board (1, 2, ... in row-major order, 0 last) and
    applies ``randomize_steps`` moves, each picked at random among those
    returned by ``available_actions``.
    """
    tiles = list(range(1, puzzle_dim**2))
    tiles.append(0)
    board = np.array(tiles).reshape((puzzle_dim, puzzle_dim))

    for _ in range(randomize_steps):
        move = choice(available_actions(board))
        board = do_action(board, move)
    return board

def run_experiments(num_experiments: int, puzzle_dim: int, randomize_steps: int):
    """Solve ``num_experiments`` random puzzles with A* and print the averages.

    For each puzzle the solution length and the number of evaluated nodes are
    recorded. A puzzle for which ``solve_puzzle`` returns ``None`` is recorded
    as ``inf`` for both figures. An empty solution (the scrambled board
    happened to be solved already) counts as solved in 0 steps.

    Nothing is returned; results are printed.
    """
    steps_list = []
    costs_list = []

    for i in range(num_experiments):
        print(f"Running experiment {i+1}/{num_experiments}...")
        puzzle = generate_random_puzzle(puzzle_dim, randomize_steps)

        solution, nodes_evaluated = solve_puzzle(puzzle)

        # `is not None` rather than truthiness: [] is a valid, zero-move solution.
        if solution is not None:
            steps_list.append(len(solution))
            costs_list.append(nodes_evaluated)
            print(f"Puzzle {i+1}: Solved in {len(solution)} steps, cost {nodes_evaluated}.")
        else:
            print(f"Puzzle {i+1}: No solution found.")
            steps_list.append(float('inf'))
            costs_list.append(float('inf'))

    # Without any run there is nothing to average (and dividing would fail).
    if not steps_list:
        print("No experiments were run.")
        return

    avg_steps = sum(steps_list) / len(steps_list)
    avg_cost = sum(costs_list) / len(costs_list)

    print("\n=== Final Results ===")
    print(f"Average solution length: {avg_steps:.2f} moves")
    print(f"Average cost (nodes evaluated): {avg_cost:.2f}")

In [21]:
# Average the A* solution length and cost over 1000 freshly scrambled puzzles.
run_experiments(num_experiments=1000, puzzle_dim=PUZZLE_DIM, randomize_steps=RANDOMIZE_STEPS)

Running experiment 1/1000...
Puzzle 1: Solved in 18 steps, cost 161.
Running experiment 2/1000...
Puzzle 2: Solved in 22 steps, cost 563.
Running experiment 3/1000...
Puzzle 3: Solved in 22 steps, cost 520.
Running experiment 4/1000...
Puzzle 4: Solved in 24 steps, cost 484.
Running experiment 5/1000...
Puzzle 5: Solved in 18 steps, cost 138.
Running experiment 6/1000...
Puzzle 6: Solved in 20 steps, cost 339.
Running experiment 7/1000...
Puzzle 7: Solved in 18 steps, cost 218.
Running experiment 8/1000...
Puzzle 8: Solved in 26 steps, cost 2044.
Running experiment 9/1000...
Puzzle 9: Solved in 22 steps, cost 510.
Running experiment 10/1000...
Puzzle 10: Solved in 28 steps, cost 4545.
Running experiment 11/1000...
Puzzle 11: Solved in 24 steps, cost 1872.
Running experiment 12/1000...
Puzzle 12: Solved in 24 steps, cost 1096.
Running experiment 13/1000...
Puzzle 13: Solved in 18 steps, cost 228.
Running experiment 14/1000...
Puzzle 14: Solved in 22 steps, cost 555.
Running experiment 1

### Greedy Best-First Search (BFS) Algorithm

In [32]:
def best_first_search(initial_state: np.ndarray) -> tuple[list['Action'], int]:
    """Solve the n-puzzle with greedy best-first search.

    The frontier is ordered by the Manhattan-distance heuristic alone (the
    module-level ``manhattan_distance``); the cost of the path so far is
    ignored, so the returned path is not guaranteed to be the shortest one.

    Returns:
        ``(path, expanded_states)`` where ``path`` is the list of actions
        leading to the solved board (an empty list if ``initial_state`` is
        already solved), or ``(None, expanded_states)`` if the frontier runs
        out. ``expanded_states`` counts the distinct states taken off the
        frontier.
    """
    # Solved board: 1, 2, ... in row-major order with the blank (0) in the last cell.
    goal = np.array(list(range(1, PUZZLE_DIM**2)) + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))

    # Heap entries are (heuristic, insertion order, state, path). The insertion
    # counter breaks ties, so arrays are never compared and states with equal
    # heuristic leave the heap in the order they were added.
    open_heap = [(manhattan_distance(initial_state), 0, initial_state, [])]
    pushed = 1
    closed_set = set()
    expanded_states = 0

    while open_heap:
        _, _, current_state, path = heappop(open_heap)

        # A state can be queued several times before its first expansion.
        current_key = current_state.tobytes()
        if current_key in closed_set:
            continue
        closed_set.add(current_key)
        expanded_states += 1

        if np.array_equal(current_state, goal):
            return path, expanded_states

        for move in available_actions(current_state):
            neighbor = do_action(current_state, move)
            if neighbor.tobytes() not in closed_set:
                heappush(open_heap, (manhattan_distance(neighbor), pushed, neighbor, path + [move]))
                pushed += 1

    # Frontier exhausted without reaching the goal.
    return None, expanded_states


In [34]:
# Experiments with greedy best-first search (BFS)
def run_bfs_experiments(num_experiments: int, puzzle_dim: int, randomize_steps: int):
    """Solve ``num_experiments`` random puzzles with ``best_first_search`` and print the averages.

    Only solved puzzles contribute to the averages; puzzles for which the
    search returns ``None`` are skipped. An empty solution (board already
    solved) counts as solved in 0 steps.

    Nothing is returned; results are printed.
    """
    bfs_steps_list = []
    bfs_costs_list = []

    for i in range(num_experiments):
        print(f"Running BFS experiment {i+1}/{num_experiments}...")
        puzzle = generate_random_puzzle(puzzle_dim, randomize_steps)

        solution, nodes_evaluated = best_first_search(puzzle)

        # `is not None` rather than truthiness: [] is a valid, zero-move solution.
        if solution is not None:
            bfs_steps_list.append(len(solution))
            bfs_costs_list.append(nodes_evaluated)

    print("\nFinal BFS Results:")
    # np.mean of an empty list yields nan and a RuntimeWarning; report it plainly instead.
    if not bfs_steps_list:
        print("No puzzle was solved; averages are undefined.")
        return
    print(f"Average solution length: {np.mean(bfs_steps_list)} moves")
    print(f"Average cost (nodes evaluated): {np.mean(bfs_costs_list)} nodes")


In [None]:
# Number of random moves used to scramble each generated puzzle.
RANDOMIZE_STEPS = 100_000

# Run a single greedy best-first experiment on a board of side PUZZLE_DIM.
run_bfs_experiments(num_experiments=1, puzzle_dim=PUZZLE_DIM, randomize_steps=RANDOMIZE_STEPS)


Running BFS experiment 1/1...
Espanso 100 nodi, profondità attuale: 33
Espanso 200 nodi, profondità attuale: 58
Espanso 300 nodi, profondità attuale: 68
Espanso 400 nodi, profondità attuale: 95
Espanso 500 nodi, profondità attuale: 108
Espanso 600 nodi, profondità attuale: 114
Espanso 700 nodi, profondità attuale: 104
Espanso 800 nodi, profondità attuale: 101
Espanso 900 nodi, profondità attuale: 71
Espanso 1000 nodi, profondità attuale: 108
Espanso 1100 nodi, profondità attuale: 51
Espanso 1200 nodi, profondità attuale: 88
Espanso 1300 nodi, profondità attuale: 105
Espanso 1400 nodi, profondità attuale: 121
Espanso 1500 nodi, profondità attuale: 123
Espanso 1600 nodi, profondità attuale: 72
Espanso 1700 nodi, profondità attuale: 54
Espanso 1800 nodi, profondità attuale: 64
Espanso 1900 nodi, profondità attuale: 84
Espanso 2000 nodi, profondità attuale: 135
Espanso 2100 nodi, profondità attuale: 132
Espanso 2200 nodi, profondità attuale: 88
Espanso 2300 nodi, profondità attuale: 85
Esp

### Comparison between the two algorithms

In [13]:
def compare_algorithms(num_experiments: int, puzzle_dim: int, randomize_steps: int):
    """Run A* and greedy best-first search on the same random puzzles and print their averages.

    For every generated puzzle both ``solve_puzzle`` and ``best_first_search``
    are run. Solution length and number of evaluated nodes are recorded per
    algorithm; a run that returns ``None`` is recorded as ``inf`` for both
    figures. An empty solution (board already solved) counts as solved in
    0 steps.

    Nothing is returned; results are printed.
    """
    a_star_steps, a_star_costs = [], []
    bfs_steps, bfs_costs = [], []

    for i in range(num_experiments):
        print(f"Running experiment {i+1}/{num_experiments}...")
        puzzle = generate_random_puzzle(puzzle_dim, randomize_steps)

        # A* Algorithm
        a_star_solution, a_star_nodes = solve_puzzle(puzzle)
        # `is not None` rather than truthiness: [] is a valid, zero-move solution.
        if a_star_solution is not None:
            a_star_steps.append(len(a_star_solution))
            a_star_costs.append(a_star_nodes)
        else:
            a_star_steps.append(float('inf'))
            a_star_costs.append(float('inf'))

        # Best-First Search
        bfs_solution, bfs_nodes = best_first_search(puzzle)
        if bfs_solution is not None:
            bfs_steps.append(len(bfs_solution))
            bfs_costs.append(bfs_nodes)
        else:
            bfs_steps.append(float('inf'))
            bfs_costs.append(float('inf'))

    # Without any run there is nothing to average (and dividing would fail).
    if not a_star_steps:
        print("No experiments were run.")
        return

    print("\n=== Final Comparison ===")
    print(f"A* Average Steps: {sum(a_star_steps) / len(a_star_steps):.2f}")
    print(f"A* Average Cost: {sum(a_star_costs) / len(a_star_costs):.2f}")
    print(f"BFS Average Steps: {sum(bfs_steps) / len(bfs_steps):.2f}")
    print(f"BFS Average Cost: {sum(bfs_costs) / len(bfs_costs):.2f}")
