In [1]:
from tqdm.auto import tqdm
import numpy as np
from icecream import ic

In [2]:
PUZZLE_DIM = 4
#ENCODING = "HOLE" # the hole is the one that moves
#ACTIONS_ENCODING = ["U", "D", "L", "R"] # available movements
RANDOMIZE_STEPS = 100_000
HEURISTIC_WEIGHT = 1.5
HEURISTIC_WEIGHT_MULT=1.05
HEURISTIC_WEIGHT_MSTEPS = 10_000

In [3]:
rng = np.random.Generator(np.random.PCG64([PUZZLE_DIM, RANDOMIZE_STEPS]))

As priority value, i will use heuristic expected cost function `h`.

In this case, i used **manhattan distance** as `h`, proven to be the best choice for path searching problems in grid-like environments where movement is restricted to horizontal and vertical directions


*sources:<br/>https://theory.stanford.edu/~amitp/GameProgramming/Heuristics.html, <br/> https://pages.cs.wisc.edu/~dyer/cs540/notes/search2.html*

In [4]:

class State:
    def __init__(self, state: np.ndarray, history: tuple = None):
        self.state = state
        self.history = history if history is not None else tuple()

    def available_actions(self) -> list[str]:
        x, y = [int(_[0]) for _ in np.where(self.state == 0)]
        actions = list()
        if x > 0:
            actions.append("U") 
        if x < PUZZLE_DIM - 1:
            actions.append("D")
        if y > 0:
            actions.append("L")
        if y < PUZZLE_DIM - 1:
            actions.append("R")
        return actions
    
    def swap(self, pos1, pos2):
        self.state[pos1], self.state[pos2] = self.state[pos2], self.state[pos1]

    def copy(self):
        return State(self.state.copy(), tuple(self.history))

    def do_action(self, action: str,*,appending=True) -> np.ndarray:
        new_state = self.copy()
        x, y = [int(_[0]) for _ in np.where(self.state == 0)]
        if action == "U":
            new_state.swap((x, y), (x-1, y))
        elif action == "D":
            new_state.swap((x, y), (x+1, y))
        elif action == "L":
            new_state.swap((x, y), (x, y-1))
        elif action == "R":
            new_state.swap((x, y), (x, y+1))
        else:
            raise ValueError("Invalid action")
        if appending:
            new_state.history = new_state.history + (action,)  # Append as a new tuple
        return new_state
    
    def cost(self):
        return len(self.history)
    
    @staticmethod
    def getSolution():
        return State(np.array([i for i in range(1, PUZZLE_DIM**2)]+[0]).reshape(PUZZLE_DIM, PUZZLE_DIM))

    @staticmethod
    def getRandom():
        return State(rng.permutation(State.getSolution().state))

    def randomize(self, steps: int):
        for _ in tqdm(range(steps), desc="Randomizing"):
            action = rng.choice(self.available_actions())
            self.state = self.do_action(action,appending=False).state

    def __str__(self):
        return str(self.state)
    
    def __repr__(self):
        return str(self.state)
    
    def __eq__(self, other):
        return np.array_equal(self.state, other.state)
    
    def __hash__(self):
        return hash(self.state.tobytes())
    
    def manhattan_distance(self):
        distance = 0
        for i in range(PUZZLE_DIM):
            for j in range(PUZZLE_DIM):
                if self.state[i, j] == 0:
                    continue # skip the hole
                d, m = divmod(self.state[i, j] - 1, PUZZLE_DIM)
                distance += abs(d - i) + abs(m - j)
        return distance
    
    def euclidean_distance(self):
        distance = 0
        for i in range(PUZZLE_DIM):
            for j in range(PUZZLE_DIM):
                if self.state[i, j] == 0:
                    continue  # skip the hole
                d, m = divmod(self.state[i, j] - 1, PUZZLE_DIM)
                distance += np.sqrt((d - i) ** 2 + (m - j) ** 2)
        return distance
    
    def tot_cost(self):
        return self.cost() + HEURISTIC_WEIGHT*self.manhattan_distance()
    
    def __lt__(self, other):
        return self.manhattan_distance() < other.manhattan_distance()
    
    def is_solved(self):
        return np.array_equal(self.state, State.getSolution().state)
    

In [5]:

state = State.getSolution()
state.randomize(RANDOMIZE_STEPS)
ic(state)
ic(state.manhattan_distance())

Randomizing:   0%|          | 0/100000 [00:00<?, ?it/s]

ic| state: [[ 4 13 11  3]
            [ 5  7  8 14]
            [ 2  1  9  6]
            [12  0 10 15]]
ic| state.manhattan_distance(): np.int64(34)


np.int64(34)

## A* Algorithm

First let's define a priority queue using `heapq` module:

In [6]:
import heapq

class PriorityQueue:
    def __init__(self):
        self.elements = []
    
    def empty(self):
        return len(self.elements) == 0
    
    def append(self, item, priority):
        heapq.heappush(self.elements, (priority, item))
    
    def popleft(self):
        return heapq.heappop(self.elements)[1]
    
    def __bool__(self):
        """ Override the boolean operator to allow direct check with while and if statements """
        return not self.empty()
    
    def __len__(self):
        return len(self.elements)
    
    def __str__(self):
        return str(self.elements)
    
    def __repr__(self):
        return str(self.elements)
    
class StatePriorityQueue(PriorityQueue):
    def append(self, item: State):
        super().append(item, item.tot_cost())

In [7]:
frontier = StatePriorityQueue()
frontier.append(state)
visited = set()
visited.add(state)
ic(frontier.elements);

ic| frontier.elements: [(np.float64(51.0),
                         [[ 4 13 11  3]
                        [ 5  7  8 14]
                        [ 2  1  9  6]
                        [12  0 10 15]])]


In [8]:
cycle=0
while frontier:
    current_state = frontier.popleft()
    visited.add(current_state) # avoid using np.array as keys in a set
    if current_state.is_solved():
        print("Solved!")
        break
    actions = current_state.available_actions()
    for action in actions:
        new_state = current_state.do_action(action)
        if new_state not in visited:
            frontier.append(new_state)
    if cycle % HEURISTIC_WEIGHT_MSTEPS == 0:
        print(f"current_state (@cycle-{cycle}):\n{current_state}")
        print(f"current memory usage:\nfrontier_size: {len(frontier)}\nvisited_size: {len(visited)}")
        old_heuristic_weight = HEURISTIC_WEIGHT
        HEURISTIC_WEIGHT *= HEURISTIC_WEIGHT_MULT
        print(f"incrementing heuristic weight from {old_heuristic_weight} to {HEURISTIC_WEIGHT}")
    cycle += 1

current_state (@cycle-0):
[[ 4 13 11  3]
 [ 5  7  8 14]
 [ 2  1  9  6]
 [12  0 10 15]]
current memory usage:
frontier_size: 3
visited_size: 1
incrementing heuristic weight from 1.5 to 1.5750000000000002
current_state (@cycle-10000):
[[ 5  2  4  3]
 [ 9  6 11  7]
 [ 1 10 15  8]
 [13 12 14  0]]
current memory usage:
frontier_size: 10004
visited_size: 9906
incrementing heuristic weight from 1.5750000000000002 to 1.6537500000000003
current_state (@cycle-20000):
[[ 5  4 11  3]
 [ 1  7  0  8]
 [10  2 12 14]
 [13  9  6 15]]
current memory usage:
frontier_size: 20469
visited_size: 19614
incrementing heuristic weight from 1.6537500000000003 to 1.7364375000000003
current_state (@cycle-30000):
[[ 5  2  4  3]
 [ 0  6  1  7]
 [ 9 10 11 12]
 [13 14 15  8]]
current memory usage:
frontier_size: 30872
visited_size: 29309
incrementing heuristic weight from 1.7364375000000003 to 1.8232593750000003
current_state (@cycle-40000):
[[ 1  2  4  3]
 [13  6 11  7]
 [ 5 10  9 12]
 [14  0 15  8]]
current memory us

In [9]:
print(cycle)
len(visited)

89441


85977

In [10]:
print(current_state)
print(f"in {current_state.cost()} steps:")
print(current_state.history)

[[ 1  2  3  4]
 [ 5  6  7  8]
 [ 9 10 11 12]
 [13 14 15  0]]
in 58 steps:
('U', 'R', 'R', 'U', 'L', 'L', 'U', 'L', 'D', 'D', 'R', 'U', 'L', 'D', 'R', 'D', 'L', 'U', 'U', 'R', 'D', 'R', 'R', 'U', 'L', 'U', 'L', 'D', 'D', 'D', 'R', 'U', 'L', 'U', 'L', 'U', 'R', 'R', 'R', 'D', 'D', 'L', 'U', 'U', 'L', 'D', 'D', 'D', 'R', 'R', 'U', 'U', 'U', 'L', 'D', 'R', 'D', 'D')
