In [1]:
# Seed handed to random.seed() further down, so the randomly scrambled start board is reproducible.
RANDOM_SEED = 17
# Side length of the square board; the goal board is built as a SIZE x SIZE array.
SIZE = 3

In [None]:
from gx_utils import *
import numpy as np

In [None]:
import logging
from random import seed, choice
from typing import Callable

# INFO level: the logging.info summaries are shown, while the logging.debug traces emitted by search() stay hidden.
logging.basicConfig(format="%(message)s", level=logging.INFO)

In [None]:
# In this problem a state is a complete board configuration; the available
# actions are derived from it (from where the blank currently is).
class State:
    """Immutable, hashable wrapper around a NumPy board.

    The array is copied and frozen (``writeable = False``) on construction, so
    an instance can safely be used as a dict key or set member. Hashing,
    equality and ordering are all based on the raw bytes of the array; the
    array's shape is not part of the comparison.
    """

    def __init__(self, data: np.ndarray):
        self._data = data.copy()
        self._data.flags.writeable = False
        # The board cannot change after this point, so serialise it once
        # instead of on every hash / comparison.
        self._key = self._data.tobytes()

    def __hash__(self):
        return hash(self._key)

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # (e.g. ``state == None`` is False) instead of raising AttributeError.
        if not isinstance(other, State):
            return NotImplemented
        return self._key == other._key

    def __lt__(self, other):
        # Byte-wise ordering; it carries no puzzle meaning and only gives
        # states a deterministic total order.
        if not isinstance(other, State):
            return NotImplemented
        return self._key < other._key

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return repr(self._data)

    @property
    def data(self):
        """The underlying read-only array (not a copy)."""
        return self._data

    def copy_data(self):
        """Return a fresh, writable copy of the board."""
        return self._data.copy()

# Search algorithm

In [None]:
def search(
    initial_state: State,
    goal_test: Callable,
    parent_state: dict,
    state_cost: dict,
    priority_function: Callable,
    unit_cost: Callable,
):
    """Generic graph search whose strategy is set by ``priority_function``.

    Args:
        initial_state: State the search starts from.
        goal_test: Called with a state; returns True when it is a goal.
        parent_state: Output dict. Cleared, then filled with
            ``state -> predecessor`` (``None`` for the initial state).
        state_cost: Output dict. Cleared, then filled with
            ``state -> cost of the cheapest path found so far``.
        priority_function: Called with a newly discovered state, after its
            cost has been stored in ``state_cost``; the returned value is
            the priority used when pushing the state onto the frontier.
        unit_cost: Called with an action; returns the cost of taking it.

    Returns:
        The boards (array copies) along the path from the initial state to
        the goal, both included. An empty list if the frontier runs out
        before a goal is reached.
    """
    frontier = PriorityQueue()
    parent_state.clear()
    state_cost.clear()

    state = initial_state
    parent_state[state] = None
    state_cost[state] = 0

    while state is not None and not goal_test(state):
        for a in possible_actions(state):
            new_state = result(state, a)  # board obtained by applying action a to state
            new_cost = state_cost[state] + unit_cost(a)
            if new_state not in state_cost and new_state not in frontier:
                parent_state[new_state] = state
                # Store the cost BEFORE asking for the priority: priority
                # functions may read state_cost (len(state_cost), state_cost[s], ...).
                state_cost[new_state] = new_cost
                frontier.push(new_state, p=priority_function(new_state))
                logging.debug(f"Added new node to frontier (cost={state_cost[new_state]})")
            elif new_state in frontier and state_cost[new_state] > new_cost:
                # NOTE(review): only the recorded cost/parent are refreshed here;
                # the entry already queued keeps the priority it was pushed with.
                # Whether PriorityQueue can re-prioritise is not visible from this
                # file - confirm before relying on this for cost-based priorities.
                old_cost = state_cost[new_state]
                parent_state[new_state] = state
                state_cost[new_state] = new_cost
                logging.debug(f"Updated node cost in frontier: {old_cost} -> {state_cost[new_state]}")
        state = frontier.pop() if frontier else None

    if state is None:
        # Frontier exhausted without reaching a goal: say so instead of
        # reporting a "solution" of zero steps.
        logging.warning(f"No solution found; visited {len(state_cost):,} states")
        return []

    # Walk the parent links back from the goal to the initial state.
    path = []
    s = state
    while s is not None:
        path.append(s.copy_data())
        s = parent_state[s]

    # len(path) counts every board on the path, the initial one included.
    logging.info(f"Found a solution in {len(path):,} steps; visited {len(state_cost):,} states")
    return list(reversed(path))

## Graph search for the n-puzzle

In [None]:
# Seed Python's `random` module; the start board is scrambled later with random.choice.
seed(RANDOM_SEED)

In [None]:
# The goal does not change while searching, so it is built once:
# tiles 1 .. SIZE**2 - 1 in row-major order, with the blank (0) in the last cell.
GOAL = State(np.array([*range(1, SIZE**2), 0]).reshape(SIZE, SIZE))
logging.info(f"Goal:\n{GOAL}")


def goal_test(state):
    """Return True when `state` equals the module-level GOAL board."""
    return state == GOAL

In [None]:
# Each move is a (row, column) displacement of the blank.
# Not every move is legal everywhere: with the blank in the middle all 4 apply,
# on an edge only 3, and in a corner only 2.
MOVES = [
    np.array(offset)
    for offset in (
        (-1, 0),  # UP
        (0, 1),   # RIGHT
        (1, 0),   # DOWN
        (0, -1),  # LEFT
    )
]

In [None]:
def find_empty_space(board: np.ndarray):
    """Return the (row, column) of the blank (the first 0 in row-major order) as an array."""
    hits = np.nonzero(board == 0)
    row = hits[0][0]
    col = hits[1][0]
    return np.array([row, col])


def is_valid(board: np.ndarray, action):
    """Return True if moving the blank by `action` keeps it on the board.

    Args:
        board: 2-D board containing a blank (0).
        action: (row, column) displacement to apply to the blank.
    """
    # Locate the blank once rather than once per axis.
    target = find_empty_space(board) + action
    return all(0 <= target[axis] < board.shape[axis] for axis in (0, 1))


def possible_actions(state: State):
    """Lazily yield the entries of MOVES that are legal for `state`.

    The return value is a generator: it can be looped over or advanced with
    next(), and each candidate move is only checked when it is requested.
    """

    def _legal(moves):
        for move in moves:
            if is_valid(state._data, move):
                yield move

    return _legal(MOVES)

In [None]:
def result(state, action):
    """Return the new State obtained by moving the blank by `action`.

    The tile at the target cell slides into the blank's current cell and the
    target cell becomes the blank. `state` itself is left untouched.

    Args:
        state: Current State.
        action: (row, column) displacement of the blank.

    Raises:
        IndexError: if the move would take the blank off the board.
    """
    board = state.copy_data()
    space = find_empty_space(board)
    pos = space + action
    # Reject off-board targets explicitly: a negative index would otherwise
    # wrap around silently in NumPy and produce an illegal board.
    if not all(0 <= pos[axis] < board.shape[axis] for axis in (0, 1)):
        raise IndexError(f"move {action} takes the blank off the board")
    row, col = pos
    board[space[0], space[1]] = board[row, col]
    board[row, col] = 0
    return State(board)

In [None]:
# Scramble the goal with random legal moves, so the start board is reachable
# from the goal (and therefore solvable).
# Re-seeding here makes the cell idempotent: re-running it on its own gives the
# same board again instead of continuing the random stream.
N_SCRAMBLE_MOVES = 500
seed(RANDOM_SEED)
INITIAL_STATE = GOAL
for _step in range(N_SCRAMBLE_MOVES):
    INITIAL_STATE = result(INITIAL_STATE, choice(list(possible_actions(INITIAL_STATE))))
INITIAL_STATE

## Breadth-First

In [None]:
parent_state, state_cost = {}, {}

final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    # FIFO: a new state's priority is the number of states discovered so far,
    # so states are expanded in exactly the order in which they were found.
    priority_function=lambda s: len(state_cost),
    # Every move costs 1, so the cheapest path is simply the shortest one.
    unit_cost=lambda a: 1,
)
# In the author's run this reports a 25-step solution, which is the minimum:
# breadth-first looks at everything one move away first, then everything two
# moves away, and so on, so the first goal it reaches is on a shortest path.

If the cost of a move were not always 1, we could use Dijkstra's algorithm (uniform-cost search) instead, which always expands the frontier node with the lowest path cost so far.

## Depth-First

In [None]:
parent_state, state_cost = {}, {}

final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    # LIFO: the most recently discovered state gets the smallest priority value.
    priority_function=lambda s: -len(state_cost),
    unit_cost=lambda a: 1,
)
# In the author's run this solution has 65,813 steps, even though fewer states
# are visited than with breadth-first: the path is poor, but it is found faster.
# A threshold on state_cost (a depth limit) would shorten the solution, but a
# badly chosen threshold risks missing the best solution, or any solution.
# Such a threshold is a depth-first device; it is not used with breadth-first.

## Greedy Best-First

In [None]:
parent_state, state_cost = {}, {}


def h(state):
    """Misplaced-tiles heuristic: count the non-blank tiles that are not in their goal cell.

    A state that "looks" close to the goal gets a small value and is expanded first.
    """
    out_of_place = state._data != GOAL._data
    is_tile = state._data > 0
    return np.sum(out_of_place & is_tile)


final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    # Greedy: order the frontier by the heuristic alone, ignoring the cost already paid.
    priority_function=h,
    unit_cost=lambda a: 1,
)
# Greedy best-first is not guaranteed to return the shortest path. In the author's
# run it reports 49 steps (more than breadth-first, far fewer than depth-first),
# but it visits fewer states than either of them.

## A*

In [None]:
parent_state = dict()
state_cost = dict()

# `h` is the misplaced-tiles heuristic defined in the greedy best-first section
# above; it is reused here rather than defined a second time.

final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    # f(s) = g(s) + h(s): the cost already paid to reach s (the "safe" part)
    # plus the heuristic estimate of what is still missing (the "greedy" part).
    priority_function=lambda s: state_cost[s] + h(s),
    unit_cost=lambda a: 1,
)
# In the author's run A* finds the 25-step solution again, while visiting only
# a reasonable number of states.