Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [1]:
from collections import namedtuple
from random import choice
import numpy as np
import heapq
from typing import List, Tuple, Dict, Set
from datetime import datetime

In [None]:
# Auxiliary functions
def available_actions(state: np.ndarray) -> List[Tuple[int, int]]:
    """
    Calculate the available moves given the state.
    """
    # np.where() returns a tuple in the format ([row_index], [column_index])
    row, col = np.where(state == 0)  # Find the position of the 0
    actions = []
    if row > 0:  # I can move the 0 up
        actions.append((-1, 0))
    if row < state.shape[0] - 1:  # I can move the 0 down
        actions.append((1, 0))
    if col > 0:  # I can move the 0 to the left
        actions.append((0, -1))
    if col < state.shape[1] - 1:  # I can move the 0 to the right
        actions.append((0, 1))
    return actions


def do_action(state: np.ndarray, action: Tuple[int, int]) -> np.ndarray:
    """
    Applies an action to the state.
    """
    new_state = state.copy()
    row, col = np.where(state == 0)  # Find the position of the 0
    row, col = row[0], col[0]
    # dr is the offset to add to the row index to get the row index of the new empty tile
    # dc is the offset to add to the column index to get the column index of the new empty tile
    dr, dc = action
    new_row, new_col = row + dr, col + dc
    # Swap the zero with the value at the new position calculated in the previous line of code
    new_state[row, col], new_state[new_row, new_col] = new_state[new_row, new_col], new_state[row, col]
    return new_state


def generate_random_solvable_puzzle(goal_state: np.ndarray, steps: int = 100000) -> np.ndarray:
    """
    Generates a random but solvable configuration of the puzzle starting from the goal state.
    
    Parameters:
    - goal_state: np.ndarray, the goal state of the puzzle.
    - steps: int, the number of random moves to apply to generate the initial state.

    Returns:
    - np.ndarray, the generated random state.

    """
    current_state = goal_state.copy()
    for _ in range(steps):
        action = choice(available_actions(current_state))
        current_state = do_action(current_state, action)
    return current_state


PUZZLE_DIM = 5
goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))

# Generate a random but solvable configuration
random_solvable = generate_random_solvable_puzzle(goal_state, steps=100)
state = random_solvable

print(f"Random but solvable puzzle configuration {PUZZLE_DIM}x{PUZZLE_DIM}:")
print(state)


Random but solvable puzzle configuration 5x5:
[[ 7  1  0  4  5]
 [ 2 11  3 13 10]
 [ 6  8  9 24 14]
 [16 12 17 18 15]
 [21 22 23 20 19]]


In [3]:
action = namedtuple('Action', ['pos1', 'pos2'])

In [4]:
def available_actions(state: np.ndarray) -> list['Action']:
    x, y = [int(_[0]) for _ in np.where(state == 0)]
    actions = list()
    if x > 0:
        actions.append(action((x, y), (x - 1, y)))
    if x < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x + 1, y)))
    if y > 0:
        actions.append(action((x, y), (x, y - 1)))
    if y < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x, y + 1)))
    return actions



def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    new_state = state.copy()
    new_state[action.pos1], new_state[action.pos2] = new_state[action.pos2], new_state[action.pos1]
    return new_state

In [5]:
# Function to calculate the Manhattan distance
def manhattan_distance(state: np.ndarray, goal_state: np.ndarray) -> int:
    distance = 0
    for x in range(state.shape[0]):
        for y in range(state.shape[1]):
            if state[x, y] != 0:  # Ignore the empty tile (0)
                goal_x, goal_y = np.where(goal_state == state[x, y])
                distance += abs(goal_x[0] - x) + abs(goal_y[0] - y)
    return distance


# Implementation of the A* algorithm with Graph Search
def a_star_search_with_metrics(start_state: np.ndarray, goal_state: np.ndarray):

    frontier = []  # Priority queue for the frontier
    heapq.heappush(frontier, (0, start_state.tobytes(), start_state))
    
    explored: Set[bytes] = set()  # States already examined
    # The came_from dictionary is used to track the optimal path to the goal node in a search algorithm like A*. 
    # It is structured so that each state (represented as a key) points to its parent state and the action that led to the transition.
    # came_from = {
    #   state: (parent_state, action)
    # }
    came_from: Dict[bytes, Tuple[np.ndarray, 'Action']] = {}  # Reconstructs the path

    g_costs = {start_state.tobytes(): 0}  # Accumulated costs for each state
    nodes_explored = 0  # Counter for examined/expanded nodes
    
    while frontier:
        _, current_bytes, current_state = heapq.heappop(frontier)
        
        # This check is needed because I might have added the same state to the frontier more than once
        # and I want to ensure that, once a state has been examined/expanded, it is not processed a second time.
        if current_bytes in explored:
            continue
        
        # Increment the counter of explored nodes
        nodes_explored += 1
        
        # Checks if the goal state has been reached
        if np.array_equal(current_state, goal_state):
            # Path reconstruction
            path = []
            while current_bytes in came_from:
                prev_state, action = came_from[current_bytes]
                path.append(current_state)
                current_state = prev_state
                current_bytes = current_state.tobytes()
            path.reverse()
            return path, len(path) - 1, nodes_explored
        
        explored.add(current_bytes)  # Marks the state as explored

        # Expansion of the current node
        for action in available_actions(current_state):
            next_state = do_action(current_state, action)
            next_bytes = next_state.tobytes()
            
            # If a node has already been expanded, there is no point in adding it to the frontier or updating its cumulative cost.
            if next_bytes in explored:
                continue
            
            tentative_g = g_costs[current_bytes] + 1  # Each move costs 1
            
            # The check ensures that the state next_bytes has never been added to the frontier.
            # The check tentative_g < g_costs[next_bytes] ensures that the state next_bytes
            # has already been added to the frontier but its cumulative cost can be updated
            # because a path has been found that allows reaching it with a lower cost.
            if next_bytes not in g_costs or tentative_g < g_costs[next_bytes]:
                g_costs[next_bytes] = tentative_g
                f_cost = tentative_g + manhattan_distance(next_state, goal_state)
                heapq.heappush(frontier, (f_cost, next_bytes, next_state))
                came_from[next_bytes] = (current_state, action)
    
    return [], 0, nodes_explored  # Returns an empty list if there is no solution

# Start measuring the execution time of the algorithm
start_time = datetime.now()

# Solve the puzzle
solution_path, num_moves, nodes_explored = a_star_search_with_metrics(state, goal_state)

# End of measuring the execution time of the algorithm
end_time = datetime.now()

# Show the stats
"""print("Path:")
for step, state in enumerate(solution_path):
    print(f"Step {step}:\n{state}\n")"""

print(f"Number of moves to solve the puzzle: {num_moves}")
print(f"Number of explored nodes: {nodes_explored}")
# Calculate the execution time of the algorithm in the format hh:mm:ss.microsecond
execution_time = end_time - start_time
print(f"Execution time: {execution_time}")

print("Starting state:")
print(solution_path[0])
print()
print("Goal state:")
print(solution_path[num_moves])

Number of moves to solve the puzzle: 25
Number of explored nodes: 27
Execution time: 0:00:00.007607
Starting state:
[[ 7  1  3  4  5]
 [ 2 11  0 13 10]
 [ 6  8  9 24 14]
 [16 12 17 18 15]
 [21 22 23 20 19]]

Goal state:
[[ 1  2  3  4  5]
 [ 6  7  8  9 10]
 [11 12 13 14 15]
 [16 17 18 19 20]
 [21 22 23 24  0]]
