Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [57]:
from collections import namedtuple,defaultdict
from tqdm.auto import tqdm
import random
import numpy as np
import heapq
import icecream as ic
import functools
from typing import Any
from typing_extensions import Self

In [58]:
PUZZLE_DIM = 3
action = namedtuple('Action', ['pos1', 'pos2'])

## Utility methods

In [59]:
def counter(fn):
    """Simple decorator for counting number of calls"""

    @functools.wraps(fn)
    def helper(*args, **kargs):
        helper.calls += 1
        return fn(*args, **kargs)

    helper.calls = 0
    return helper

@counter
def available_actions(state: np.ndarray) -> list['Action']:
    x, y = [int(_[0]) for _ in np.where(state == 0)]
    actions = list()
    if x > 0:
        actions.append(action((x, y), (x - 1, y)))
    if x < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x + 1, y)))
    if y > 0:
        actions.append(action((x, y), (x, y - 1)))
    if y < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x, y + 1)))
    return actions
@counter
def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    new_state = state.copy()
    new_state[action.pos1], new_state[action.pos2] = new_state[action.pos2], new_state[action.pos1]
    return new_state

def is_solvable(state:np.ndarray)->bool:
    inv_count = 0
    tmp_state = state.copy().reshape(PUZZLE_DIM**2)
    for i in range(0,PUZZLE_DIM**2):
        for j in range(i+1,PUZZLE_DIM**2):
            if tmp_state[j] != 0 and tmp_state[i] != 0 and tmp_state[i] > tmp_state[j]:
                inv_count += 1
    return (inv_count%2) == 0 

## Class definition

In [60]:
class State:
    def __init__(self,matrix:np.ndarray):
        self.matrix = matrix
        self.f=float('inf')
        self.g=float('inf')
        self.h =float('inf')
    # Dunder methods so that we can use State as a proper key in a dictionary/set
    def __str__(self):
        return self.matrix.__str__()
    def __repr__(self) -> str:
        return self.matrix.__str__()+f'\nf:{self.f} g:{self.g} h:{self.h}\n'
    def __hash__(self):
        return hash(str(self))
    def __eq__(self, value: object) -> bool:
        if isinstance(value,State):
            return self.__str__() == value.__str__()
        return NotImplemented
    def __lt__(self,other):
        return self.f < other.f
    
    def is_solvable(self)->bool:
        inv_count = 0
        tmp_state = self.matrix.copy().reshape(PUZZLE_DIM**2)
        for i in range(0,PUZZLE_DIM**2):
            for j in range(i+1,PUZZLE_DIM**2):
                if tmp_state[j] != 0 and tmp_state[i] != 0 and tmp_state[i] > tmp_state[j]:
                    inv_count += 1
        return (inv_count%2) == 0 
    def is_goal(self)-> bool:
        tmp = self.matrix.copy().reshape(PUZZLE_DIM**2)
        for (i,e) in enumerate(tmp):
            if i+1 != e and e != 0:
                return False
        return True

    def get_neighbours(self)->list[Self]:
        neigh = [State(do_action(self.matrix,a)) for a in  available_actions(self.matrix)]
        return neigh

## Heuristic and Misc.

In [61]:
def toop_heuristic(state:State,goal_state:State)->int:
    """
    Computing the tiles-out-of-place heuristic.
    state: State of the n-puzzle game
    goal_state: Desired goal state of the n-puzzle game
    """
    print("a")
    tmp_state = state.matrix.reshape(PUZZLE_DIM**2)
    tmp_goal = goal_state.matrix.reshape(PUZZLE_DIM**2)
    tmp = tmp_goal!=tmp_state
    return tmp.sum()-1

def manhattan_heuristic(state:State,goal_state:State)->int:
    distance = 0
    for elem in range(1,PUZZLE_DIM**2):
        start_coords = np.where(state.matrix == elem)
        end_coords = np.where(goal_state.matrix == elem)
        distance+=  abs(start_coords[0]-end_coords[0])+abs(start_coords[1]-end_coords[1])
    return distance
                

def get_lowest_node(open_set:list[State],f_score)->State:
    """
    Retrieves the node with the lowest f_score. It replaces the priority queue
    open_set: list with all nodes to explore
    f_score: dictionary containing nodes with their respective f score where f(n) is g(n) + h(n)
    """
    sorted_os = sorted(open_set,key=lambda e:f_score[e])
    return sorted_os[0]

## A* algorithm

In [62]:

def A_star(initial_state:State,goal_state:State,heuristic=manhattan_heuristic)->State:
    open_set:list[State] = []

    closed_list:set[State] = set()
    initial_state.f = 0
    # With priority queue
    heapq.heappush(open_set,initial_state)

    #Without priority queue
    # open_set=[initial_state]

    # g[n] is the cost(number of movements) from initial_state from n
    g_score = defaultdict(lambda:float('inf'),dict())
    g_score[initial_state] = 0
    # f[n] is the cost from start to goal state accordin to our heuristic-> f(n) = g(n)+h(n)
    f_score = defaultdict(lambda:float('inf'),dict())
    f_score[initial_state] = heuristic(initial_state,goal_state)
    i=0
    while len(open_set) != 0:
        # With priority queue
        current = heapq.heappop(open_set)
        closed_list.add(current)

        # Without Priority Queue
        # current = get_lowest_node(open_set,f_score)
        # current.h = heuristic(current,goal_state)
        # open_set.remove(current)

        if current.is_goal():
            print(f'Closed list len:{len(closed_list)}')
            return current,i
        for neighbour in current.get_neighbours():
            tentative_gscore = g_score[current]+1
            if tentative_gscore < g_score[neighbour]:
                g_score[neighbour] = tentative_gscore
                neighbour.g = g_score[neighbour]
                neighbour.h = heuristic(neighbour,goal_state)
                f_score[neighbour] = tentative_gscore + neighbour.h
                neighbour.f = f_score[neighbour]
                # if neighbour not in open_set:
                if neighbour not in closed_list:
                    # With priority queue

                    heapq.heappush(open_set,neighbour)
                    
                    # Without priority queue
                    # open_set.append(neighbour)
                    
        i+=1
    
    return np.array([0]),i

## Driver code

In [63]:
# Creating a valid problem starting from the solution
RANDOMIZE_STEPS = 100_000
state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
for r in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    state = do_action(state, random.choice(available_actions(state)))
state = state.reshape(PUZZLE_DIM,PUZZLE_DIM)

# Easy solvable n-puzzle problem useful for testing purposes.  PUZZLE_DIM = 3. For an A* the solving time should be instant
# state = np.array([np.array([1,8,2]),np.array([0,4,3]),np.array([7,6,5])]). 

# Hard solvable n-puzzle problem useful for testing purposes.  PUZZLE_DIM = 3. For an A* the solving time should be around 20 sec-ish
state = np.array([np.array([0,2,1]),np.array([3,7,5]),np.array([8,6,4])])

# Almost impossible solvable n-puzzle problem with PUZZLE_DIM = 5
# state = np.array([[13,  9,  0, 10, 19],
#                            [ 3, 21, 14,  5,  8],
#                            [22, 16,  4, 24, 18],
#                            [ 6,  2, 11,  1, 20],
#                            [ 7, 15,23,12,17]])
print(f'Starting state:\n{state}')
goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM,PUZZLE_DIM))

# Applying A_star on the randomized problem
solution,i = A_star(State(state),State(goal_state),manhattan_heuristic)
# i is equal to  number of times get_neighbours/available_actions function is called
print(f'Found solution: \n{solution} \nwith {solution.g} actions taken(quality) and {i} nodes evaluated(cost)')

Randomizing:   0%|          | 0/100000 [00:00<?, ?it/s]

Starting state:
[[0 2 1]
 [3 7 5]
 [8 6 4]]
Closed list len:662
Found solution: 
[[1 2 3]
 [4 5 6]
 [7 8 0]] 
with 22 actions taken(quality) and 666 nodes evaluated(cost)
