Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [None]:
from collections import namedtuple
from heapq import heappop, heappush
from collections import deque
from random import choice
from tqdm.auto import tqdm
import numpy as np
from random import shuffle

In [None]:
PUZZLE_DIM = 3  # side length of the square board (3 -> the classic 8-puzzle)

# A move swaps the contents of two cells, identified by their (row, col) positions.
# `Action` is the name the type hints in this file refer to; the lowercase `action`
# alias is kept because existing code builds moves through it.
Action = namedtuple('Action', ['pos1', 'pos2'])
action = Action

In [None]:
def available_actions(state: np.ndarray) -> list['Action']:
    """Return every legal move of the blank tile (value 0) in *state*.

    The board limits are taken from ``state.shape`` rather than from the global
    ``PUZZLE_DIM``, so the result stays correct even if that global no longer
    matches the board being inspected.

    Args:
        state: 2-D board containing exactly one ``0`` (the blank).

    Returns:
        A list of actions ``(blank position, neighbour position)`` in the order
        up, down, left, right, skipping neighbours that fall outside the board.
    """
    rows, cols = state.shape
    # np.where yields one index array per axis; the first hit is the blank.
    x, y = (int(axis[0]) for axis in np.where(state == 0))
    candidates = (
        (x > 0, (x - 1, y)),
        (x < rows - 1, (x + 1, y)),
        (y > 0, (x, y - 1)),
        (y < cols - 1, (x, y + 1)),
    )
    return [action((x, y), target) for allowed, target in candidates if allowed]



def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    """Return a copy of *state* with the cells at ``action.pos1`` and ``action.pos2`` swapped.

    The input board is left untouched.
    """
    first, second = action.pos1, action.pos2
    swapped = state.copy()
    # Read both values from the untouched original, then write them crosswise.
    swapped[first] = state[second]
    swapped[second] = state[first]
    return swapped


In [None]:
def solved_classic(state: np.ndarray) -> bool:
    """Tell whether *state* is the solved board.

    The solved board lists the tiles 1, 2, ... row by row and ends with the
    blank (0) in the last cell; its size is given by the global ``PUZZLE_DIM``.
    """
    cell_count = PUZZLE_DIM**2
    target = np.array([*range(1, cell_count), 0]).reshape(PUZZLE_DIM, PUZZLE_DIM)
    return np.array_equal(state, target)

def is_solvable_classic(puzzle: np.ndarray) -> bool:
    """Tell whether *puzzle* can be brought to the solved board (blank in the last cell).

    The decision uses the inversion count of the tiles read row by row with the
    blank left out:

    * odd board side: the inversion count must be even;
    * even board side: the inversion count plus the 1-based row of the blank
      (counted from the top) must be even.
    """
    blank_row = np.where(puzzle == 0)[0][0]

    # Tiles in reading order, blank excluded.
    tiles = [tile for tile in puzzle.flatten() if tile != 0]

    # An inversion is a pair of tiles where the larger one comes first.
    inversions = sum(
        1
        for idx, earlier in enumerate(tiles)
        for later in tiles[idx + 1:]
        if earlier > later
    )

    side = puzzle.shape[0]
    if side % 2 == 1:
        return inversions % 2 == 0
    return (inversions + blank_row + 1) % 2 == 0

def generate_solvable_puzzle_classic():
    """Return a random ``PUZZLE_DIM`` x ``PUZZLE_DIM`` board that passes ``is_solvable_classic``.

    Boards are drawn by shuffling the solved arrangement; draws that fail the
    solvability check are discarded and a fresh one is tried.
    """
    candidate = None
    while candidate is None or not is_solvable_classic(candidate):
        # Start from the ordered tiles every time, then shuffle them in place.
        cells = np.array(list(range(1, PUZZLE_DIM**2)) + [0])
        shuffle(cells)
        candidate = cells.reshape(PUZZLE_DIM, PUZZLE_DIM)
    return candidate

In [None]:
# Solved board as a hashable tuple of rows: tiles 1, 2, ... in reading order with the
# blank (0) in the LAST cell. This is the layout that heuristic(), solved_classic() and
# is_solvable_classic() assume; the previous expression put the blank first in the last
# row, so a_star() stopped on a board that was not actually solved.
GOAL_STATE = tuple(
    tuple((row * PUZZLE_DIM + col + 1) % (PUZZLE_DIM**2) for col in range(PUZZLE_DIM))
    for row in range(PUZZLE_DIM)
)

def _longest_increasing_length(values: list[int]) -> int:
    """Return the length of the longest strictly increasing subsequence of *values*."""
    best_ending_here: list[int] = []
    for idx, value in enumerate(values):
        longest_prefix = max(
            (best_ending_here[prev] for prev in range(idx) if values[prev] < value),
            default=0,
        )
        best_ending_here.append(longest_prefix + 1)
    return max(best_ending_here, default=0)


def heuristic(state: np.ndarray) -> int:
    """Estimate the moves still needed to solve *state* (Manhattan distance + linear conflict).

    The goal is the board with tiles 1, 2, ... in reading order and the blank last.

    Linear conflict: tiles that already sit in their goal row (or column) but in the
    wrong relative order cannot pass each other inside that line, so some of them must
    step out and back in, costing 2 extra moves each.  The smallest number of tiles that
    has to step out of a line is ``tiles_in_line - longest_increasing_subsequence``.
    Counting every conflicting *pair* instead (as this function used to) overestimates
    as soon as three tiles are reversed in one line, which makes the estimate
    inadmissible and lets A* return non-optimal paths.

    Args:
        state: square board with ``0`` as the blank.

    Returns:
        ``manhattan_distance + 2 * tiles_that_must_leave_their_line`` as a Python int.
    """
    dim = state.shape[0]

    manhattan_distance = 0
    # For each row: goal columns of the tiles that belong to that row, left to right.
    row_targets: list[list[int]] = [[] for _ in range(dim)]
    # For each column: goal rows of the tiles that belong to that column, top to bottom.
    col_targets: list[list[int]] = [[] for _ in range(dim)]

    for i in range(dim):
        for j in range(dim):
            value = int(state[i, j])
            if value == 0:  # the blank does not count
                continue
            goal_i, goal_j = divmod(value - 1, dim)
            manhattan_distance += abs(i - goal_i) + abs(j - goal_j)
            if i == goal_i:
                row_targets[i].append(goal_j)
            if j == goal_j:
                col_targets[j].append(goal_i)

    linear_conflict = sum(
        len(targets) - _longest_increasing_length(targets)
        for targets in row_targets + col_targets
    )
    return manhattan_distance + 2 * linear_conflict

### A*

In [None]:
def a_star(initial_state):
    """Solve the sliding puzzle with A* and return the boards along the solution.

    Args:
        initial_state: starting board as a tuple of row tuples (it must be hashable
            because it is stored in sets and dictionaries).

    Returns:
        The list of boards (as ``np.ndarray``) from ``initial_state`` to ``GOAL_STATE``,
        both included, or ``None`` if the frontier empties without reaching the goal.
    """
    # Priority queue of (priority f = g + h, state, path cost g, parent state).
    frontier = []
    heappush(frontier, (0, initial_state, 0, None))

    visited = set()   # states already expanded
    parent_map = {}   # state -> state it was expanded from (None for the start)
    cost_map = {}     # state -> cheapest path cost pushed so far

    while frontier:
        _, current_state, g, parent = heappop(frontier)

        # A state can be queued several times; only its first (cheapest) pop counts.
        if current_state in visited:
            continue
        visited.add(current_state)
        parent_map[current_state] = parent

        if current_state == GOAL_STATE:
            return reconstruct_path(current_state, parent_map)

        # Convert once per expansion; do_action copies, so the board can be shared.
        board = np.array(current_state)
        next_g = g + 1
        for move in available_actions(board):
            new_state = do_action(board, move)
            new_state_tuple = tuple(map(tuple, new_state))
            if new_state_tuple in visited:
                continue
            known_cost = cost_map.get(new_state_tuple)
            if known_cost is not None and known_cost <= next_g:
                continue  # already queued through a path that is at least as cheap
            cost_map[new_state_tuple] = next_g
            heappush(frontier, (next_g + heuristic(new_state), new_state_tuple, next_g, current_state))

    return None  # No solution found

def reconstruct_path(state, parent_map):
    """Follow parent links from *state* back to the start and return the path start-first.

    The walk ends when a state has no entry in *parent_map* or its parent is ``None``.
    Each board on the path is returned as an ``np.ndarray`` for display.
    """
    backwards = []
    node = state
    while node is not None:
        backwards.append(node)
        node = parent_map.get(node)
    return [np.array(node) for node in reversed(backwards)]



In [None]:
# Build a scrambled board by applying random legal moves to the solved one
RANDOM_STEPS = 300
state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
for r in tqdm(range(RANDOM_STEPS), desc='Randomizing'):
    state = do_action(state, choice(available_actions(state)))

initial_state_tuple = tuple(map(tuple, state))  # a_star needs a hashable board
print("Initial state:")
print(state)

if not is_solvable_classic(state):
    print("The generated puzzle is not solvable.")
else:
    # This cell runs plain A*, so the message names that algorithm.
    print("The puzzle is solvable. Solving using A*...")

    solution_states = a_star(initial_state_tuple)

    if solution_states is not None:
        print("Solution found:")
        print(f"Puzzle solved in {len(solution_states) - 1} moves!")
        print("States traversed in the solution path:")

        # Use a separate loop name so `state` keeps holding the scrambled board.
        for step, board in enumerate(solution_states):
            print(f"Step {step}:")
            print(board)
            print("-" * 20)
    else:
        print("The puzzle is unsolvable.")


### Bidirectional A*

In [None]:
def bidirectional_a_star(start_state: np.ndarray):
    """Search from the start and from the goal at once until the two searches meet.

    The forward search is guided by ``heuristic`` (distance to the solved board).  The
    backward search is guided by the Manhattan distance to ``start_state``, because
    that is the board it is trying to reach.  The search stops at the first meeting
    point, so the returned path is valid but not guaranteed to be the shortest one.

    Args:
        start_state: square board holding the tiles ``1 .. n*n-1`` and ``0`` for the blank.

    Returns:
        The boards from ``start_state`` to the solved board, both included and with the
        meeting board listed exactly once, or ``None`` if a frontier runs empty first.
    """
    dim = start_state.shape[0]
    shape, dtype = start_state.shape, start_state.dtype
    # Built with the input's dtype so both searches produce comparable byte keys.
    goal_state = np.array(list(range(1, dim**2)) + [0], dtype=dtype).reshape(dim, dim)

    # Position of every tile in the start board: the target of the backward search.
    start_pos = {int(start_state[r, c]): (r, c) for r in range(dim) for c in range(dim)}

    def distance_to_start(board: np.ndarray) -> int:
        """Manhattan distance of every tile of *board* from its place in the start board."""
        total = 0
        for r in range(dim):
            for c in range(dim):
                tile = int(board[r, c])
                if tile != 0:
                    home_r, home_c = start_pos[tile]
                    total += abs(r - home_r) + abs(c - home_c)
        return total

    def decode(key: bytes) -> np.ndarray:
        """Rebuild the (read-only) board stored in a byte key."""
        return np.frombuffer(key, dtype=dtype).reshape(shape)

    def expand(queue: list, seen: dict, other_seen: dict, estimate):
        """Pop one node of a search; return its key if the other search already reached it."""
        _, g, key = heappop(queue)
        if key in other_seen:
            return key
        if g > seen[key][0]:
            return None  # stale entry: a cheaper route to this board was queued later
        current = decode(key)
        for move in available_actions(current):
            next_state = do_action(current, move)
            next_key = next_state.tobytes()
            next_g = g + 1
            if next_key not in seen or seen[next_key][0] > next_g:
                seen[next_key] = (next_g, key)
                heappush(queue, (next_g + estimate(next_state), next_g, next_key))
        return None

    def trace(key, seen: dict) -> list:
        """Boards from *key* back to the root of the search that owns *seen*."""
        boards = []
        while key is not None:
            boards.append(decode(key).copy())
            key = seen[key][1]
        return boards

    start_key, goal_key = start_state.tobytes(), goal_state.tobytes()

    # Each map: board key -> (cheapest known cost g, key of the board it was reached from).
    forward_visited = {start_key: (0, None)}
    backward_visited = {goal_key: (0, None)}
    # Queue entries: (f = g + h, g, board key).
    forward_queue = [(heuristic(start_state), 0, start_key)]
    backward_queue = [(distance_to_start(goal_state), 0, goal_key)]

    cont = 0  # completed forward+backward rounds

    while forward_queue and backward_queue:
        meeting = expand(forward_queue, forward_visited, backward_visited, heuristic)
        if meeting is None:
            meeting = expand(backward_queue, backward_visited, forward_visited, distance_to_start)

        if meeting is not None:
            print(f"Intersection found after {cont} been visited!")
            start_to_meeting = trace(meeting, forward_visited)[::-1]
            # Skip the first board of the backward half: it is the meeting board again.
            meeting_to_goal = trace(meeting, backward_visited)[1:]
            return start_to_meeting + meeting_to_goal

        cont += 1

    print(f"Unsolvable puzzle. States visited: {cont}")
    return None


In [None]:
# NOTE(review): this grows the board by one every time the cell is executed, so
# re-running it keeps enlarging the puzzle — confirm that this is intended.
PUZZLE_DIM = PUZZLE_DIM + 1
RANDOM_STEPS = 300
# Build a scrambled board by applying random legal moves to the solved one
state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
for r in tqdm(range(RANDOM_STEPS), desc='Randomizing'):
    state = do_action(state, choice(available_actions(state)))

initial_state_tuple = tuple(map(tuple, state))  # Convert the initial state into tuples
print("Initial state:")
print(state)

# Check solvability
if not is_solvable_classic(state):
    print("The generated puzzle is not solvable.")
else:
    print("The puzzle is solvable. Solving using bidirectional search...")

    # Solve the puzzle using bidirectional search
    solution_states = bidirectional_a_star(state)

    if solution_states is not None:
        print("Solution found:")
        print(f"Puzzle solved in {len(solution_states) - 1} moves!")
        print("States traversed in the solution path:")
        # Print all states in the solution path; a separate loop name keeps
        # `state` bound to the scrambled board instead of the last printed one.
        for step, board in enumerate(solution_states):
            print(f"Step {step}:")
            print(board)
            print("-" * 20)
    else:
        print("Puzzle is unsolvable.")
