Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [None]:
from collections import namedtuple
from random import choice
from tqdm.auto import tqdm
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
import io
from IPython.display import Image as IPImage, display
from dataclasses import dataclass
import functools

VOID_SYMBOL = 0

def counter(fn):
    """Simple decorator for counting number of calls"""

    @functools.wraps(fn)
    def helper(*args, **kargs):
        helper.calls += 1
        return fn(*args, **kargs)

    helper.calls = 0
    return helper

@dataclass
class State:
    void_position: (int, int)
    table: np.ndarray

    def __hash__(self):
        return hash(self.table.tobytes())
    
    def __eq__(self, other):
        return np.array_equal(self.table, other.table)
    
    def __init__(self, table, void_position=None):
        self.table = table
        if void_position is None:
            self.void_position = tuple(np.argwhere(table == VOID_SYMBOL)[0])
        else:
            self.void_position = void_position

    def __lt__(self, other):
        return np.sum(self.table) < np.sum(other.table)

np.random.seed(0)

In [None]:
PUZZLE_DIM = 5
action = namedtuple('Action', ['pos1', 'pos2'])

In [None]:
def available_actions(state: State) -> list['Action']:
    x, y = state.void_position
    actions = list()
    if x > 0:
        actions.append(action((x, y), (x - 1, y)))
    if x < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x + 1, y)))
    if y > 0:
        actions.append(action((x, y), (x, y - 1)))
    if y < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x, y + 1)))
    return actions


@counter
def do_action(state: State, action: 'Action') -> np.ndarray:
    new_table = state.table.copy()
    new_table[action.pos1], new_table[action.pos2] = new_table[action.pos2], new_table[action.pos1]
    void_position = action.pos1 if action.pos2 == state.void_position else action.pos2
    return State(new_table, void_position)


def plot_checkerboard(matrix):
    fig, ax = plt.subplots(figsize=(4, 4))
    cmap = plt.cm.copper
    cmap = plt.cm.colors.ListedColormap(cmap(np.linspace(0.3, 0.7, cmap.N)))
    cmap.set_bad(color='white')
    matrix = np.ma.masked_equal(matrix, VOID_SYMBOL)
    for (i, j), val in np.ndenumerate(matrix):
        if val != VOID_SYMBOL:
            ax.text(j, i, f'{val}', ha='center', va='center', color='white')
    
    ax.imshow(matrix, cmap=cmap, interpolation='nearest', vmin=0.1)
    for spine in ax.spines.values():
        spine.set_edgecolor('sienna')
        spine.set_linewidth(7)

    ax.set_xticks([])
    ax.set_yticks([])

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    plt.close(fig)
    buf.seek(0)
    return Image.open(buf)

## Randomize the puzzle

We start from the target solution and shuffle it by doing a certain number of actions

In [None]:
RANDOMIZE_STEPS = 100_000
VOID_SYMBOL = PUZZLE_DIM**2
initial_state = target_state = State(np.array([i for i in range(1, PUZZLE_DIM**2)] + [VOID_SYMBOL]).reshape((PUZZLE_DIM, PUZZLE_DIM)))
frames = [plot_checkerboard(initial_state.table)]
for r in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    initial_state = do_action(initial_state, choice(available_actions(initial_state)))
    if r % 1000 == 0:
        frames.append(plot_checkerboard(initial_state.table))

frames.append(plot_checkerboard(initial_state.table))

frames[0].save('images/puzzle_randomization.gif', format='GIF', append_images=frames[1:], save_all=True, duration=10)


## Quality & Cost

Quality is defined as the number of actions in the solution

Cost is defined as the total number of actions evaluated

In [None]:
def quality(solution_path):
    return -len(solution_path)

def cost():
    return do_action.calls

## Path-search algorithms 

These are the three algorithms I implemented:
- `Breadth-First-Search`: finds a good solution but takes some time.
- `Depth-First-Search`: finds a really bad solution and takes a lot of time.
- `A*`: it is the best algorithm to use, but the right heuristic must be found.

Let's go through the heuristics I experimented:
- `position difference` is the number of tiles in a wrong position
- `manhattan distance` is almost 10x faster than the previous one

With a small puzzle (3x3) Manhattan distance works perfectly but already with a 4x4 puzzle is too slow.

While experimenting with new heuristics and new combinations, I noticed that squaring the Manhattan distance led to a solution in a reasonable time. But that's a problem: this new heuristic is overestimating the cost so it is not consistent.

At this point I thought to slightly violate this constraint so that I get a suboptimal solution.
I looked for a monotone increasing function, that does not grow too fast.
My choices were:
- $ f(x) = A log(Bx+1) + 1 $
- $ g(x) = \frac{x}{x+A} +1$

![functions](./images/functions.png)

I wanted to use the result from the function as the exponent to elevate the Manhattan distance.
These two functions, for small $x$ are ~1: my idea is that at the beginning I start by using Manhattan distance and then I progressively use a bigger value in order to converge faster.

In [None]:
from collections import deque
from heapq import heappop, heappush

FACTOR = PUZZLE_DIM**3 / 4**3
print(FACTOR)

def bfs(start_state: State, target_state: State):
    queue = deque([(start_state, [])])
    visited = set()
    visited.add(start_state)
    steps = 0

    while queue:
        current_state, path = queue.popleft()

        if current_state == target_state:
            return path

        for act in available_actions(current_state):
            next_state = do_action(current_state, act)
            if next_state not in visited:
                visited.add(next_state)
                queue.append((next_state, path + [act]))
        
        steps += 1

    return path

def dfs(start_state: State, target_state: State):
    stack = [(start_state, [])]
    visited = set()
    visited.add(start_state)
    steps = 0

    while stack:
        current_state, path = stack.pop()

        if current_state == target_state:
            return path

        for act in available_actions(current_state):
            next_state = do_action(current_state, act)
            if next_state not in visited:
                visited.add(next_state)
                stack.append((next_state, path + [act]))
        
        steps += 1

    return path

def manhattan_distance(state: State, target_state: State) -> int:
    distance = 0
    for value in range(1, PUZZLE_DIM**2):
        x1, y1 = np.argwhere(state.table == value)[0]
        x2, y2 = np.argwhere(target_state.table == value)[0]
        distance += abs(x1 - x2) + abs(y1 - y2)
    return distance

def position_difference(state: State, target_state: State) -> int:
    return np.sum(state.table != target_state.table)

def heuristic(state: State, target_state: State, step: int) -> int:
    #return manhattan_distance(state, target_state)
    return manhattan_distance(state, target_state) ** (0.05*np.log(0.01*FACTOR*step+1)+1)
    return manhattan_distance(state, target_state) ** (step/(step+50000)+1)

def astar(start_state: State, target_state: State):
    open_set = []
    heappush(open_set, (0, start_state, []))
    visited = set()
    visited.add(start_state)
    steps = 0

    while open_set:
        _, current_state, path = heappop(open_set)

        if current_state == target_state:
            return path

        for act in available_actions(current_state):
            next_state = do_action(current_state, act)
            if next_state not in visited:
                visited.add(next_state)
                new_path = path + [act]
                heappush(open_set, (-quality(new_path) + heuristic(next_state, target_state, steps), next_state, new_path))
        
        steps += 1

    return path


do_action.calls = 0
state = initial_state
solution_path = astar(state, target_state)




In [None]:
print(f'Solution quality: {quality(solution_path)}')
print(f'Solution cost: {cost()}')

In [None]:
# Create frames for the solution path
solution_frames = [plot_checkerboard(state.table)]
current_state = state
for act in solution_path:
    current_state = do_action(current_state, act)
    solution_frames.append(plot_checkerboard(current_state.table))

# Save the solution path as a GIF
solution_frames[0].save(f'images/{PUZZLE_DIM}x{PUZZLE_DIM}puzzle_solution.gif', format='GIF', append_images=solution_frames[1:], save_all=True, duration=200)
display(IPImage(f'images/{PUZZLE_DIM}x{PUZZLE_DIM}puzzle_solution.gif'))