In [90]:
import functools
from collections import namedtuple
from random import choice
from tqdm.auto import tqdm
import numpy as np
from queue import PriorityQueue
from numpy import array_str
import heapq


In [204]:
PUZZLE_DIM = 2  ### change to 5
action = namedtuple('Action', ['pos1', 'pos2'])

In [39]:
### put before the function the @
def counter(fn):
    """Simple decorator for counting number of calls"""

    @functools.wraps(fn)
    def helper(*args, **kargs):
        helper.calls += 1
        return fn(*args, **kargs)

    helper.calls = 0
    return helper

In [None]:
def available_actions(state: np.ndarray) -> list['Action']:
    x, y = [int(_[0]) for _ in np.where(state == 0)]
    actions = list()
    if x > 0:
        actions.append(action((x, y), (x - 1, y)))
    if x < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x + 1, y)))
    if y > 0:
        actions.append(action((x, y), (x, y - 1)))
    if y < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x, y + 1)))
    return actions



def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    new_state = state.copy()
    new_state[action.pos1], new_state[action.pos2] = new_state[action.pos2], new_state[action.pos1]
    return new_state

More function

In [205]:
RANDOMIZE_STEPS = 100_000
state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
goal = state
print(state)

[[1 2]
 [3 0]]


In [207]:
def goal_function( state ,goal ):
    label = np.array_equal(state, goal)
    if(label):
        return True
    else:
        return False

In [208]:
for r in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    state = do_action(state, choice(available_actions(state)))
state

Randomizing: 100%|██████████| 100000/100000 [00:00<00:00, 150450.28it/s]


array([[0, 3],
       [2, 1]])

In [None]:
class CustomPriorityQueue:
    def __init__(self):
        self.pq = PriorityQueue()  # The actual PriorityQueue
        self.items_set = set()  # Set to track items in the queue
        self.count = 0  # Variable to keep track of the number of elements
    
    def put(self, item, priority):
        self.pq.put((item, priority))  # Add to the PriorityQueue
        self.items_set.add(priority)  # Track item in the set
        self.count += 1  # Increment the count when an item is added

    def get(self):
        priority, item = self.pq.get()  # Get from the PriorityQueue
        self.items_set.remove(priority)  # Remove from the set
        self.count -= 1  # Increment the count when an item is added
        return item, priority
    
    def contains(self, item):
        return item in self.items_set  # Check if the item exists in the set
    
    def empty(self):
        return self.pq.empty()  # Check if the PriorityQueue is empty
    
    def size(self):
        return self.count  # Return the number of elements in the queue

In [None]:
####
import logging

def search( initial_state , cost_function):
    state =initial_state
    frontier = CustomPriorityQueue()
    state_cost = dict() 
    parent_state = dict()
    global_state = dict() 

    state_cost[array_str(state)] = 0
    parent_state[array_str(state)] = None
    global_state[array_str(state)] = state

    while state is not None and not goal_function(state, goal):
    #for _ in range(5):
        for a in available_actions(state):
            new_state = do_action(state,a)
            cost = cost_function(a)   ### given an action, give you the cost
            if(array_str(new_state) not in state_cost and not frontier.contains(array_str(new_state)) ):
                state_cost[array_str(new_state)]= state_cost[array_str(state)]  + cost
                new_cost = state_cost[array_str(new_state)]
                parent_state[array_str(new_state)] = state
                global_state[array_str(new_state)] = new_state
                frontier.put(  new_cost  , array_str(new_state)   )  ### priority item
            elif( frontier.contains(array_str(new_state)) and state_cost[array_str(new_state)] > state_cost[array_str(state)] + cost):  #array_str(new_state) in frontier
                parent_state[array_str(new_state)] = state
                state_cost[array_str(new_state)] = state_cost[array_str(state)] + cost
                global_state[array_str(new_state)] = new_state
        
        if not frontier.empty():
                tuple = frontier.get()
                print(tuple)
                arr_state = tuple[0]
                state = global_state[arr_state]
        else:
            state = None
    path = list()
    s = state
    flag =True
    while flag:        
        # Check if array_str(s) is not None
        if array_str(s) is not None:
            path.append(parent_state[array_str(s)])
            s = parent_state[array_str(s)]
            if(s is None):
                flag= False
        

    logging.info(f"Found a solution in {len(path):,} steps; visited {len(state_cost):,} states")
    return list(reversed(path))


In [234]:
solution = search( state , lambda x: 1)
print(solution)

('[[2 3]\n [0 1]]', 1)
('[[3 0]\n [2 1]]', 1)
('[[2 3]\n [1 0]]', 2)
('[[3 1]\n [2 0]]', 2)
('[[2 0]\n [1 3]]', 3)
('[[3 1]\n [0 2]]', 3)
('[[0 1]\n [3 2]]', 4)
('[[0 2]\n [1 3]]', 4)
('[[1 0]\n [3 2]]', 5)
('[[1 2]\n [0 3]]', 5)
('[[1 2]\n [3 0]]', 6)
[[1 0]
 [3 2]]
<class 'numpy.ndarray'>
[[0 1]
 [3 2]]
<class 'numpy.ndarray'>
[[3 1]
 [0 2]]
<class 'numpy.ndarray'>
[[3 1]
 [2 0]]
<class 'numpy.ndarray'>
[[3 0]
 [2 1]]
<class 'numpy.ndarray'>
[[0 3]
 [2 1]]
<class 'numpy.ndarray'>
None
<class 'NoneType'>


AttributeError: 'NoneType' object has no attribute 'shape'

In [178]:
print(solution.get())

KeyError: 1

In [None]:
def search(state):
    global goal
    cost=0
    frontier = []
    cost_frontier = []
    state_cost= {}
    state_cost[array_str(state)] = 0

    while state is not None and  not goal_function(state, goal):
        print(state)
        for a in available_actions(state):
            new_state = do_action(state, a)

            if array_str(new_state) is not state_cost and new_state is not frontier:
                new_cost = state_cost[array_str(state)] +1  ### in this case each step is + 1   
                state_cost[array_str(new_state)] = new_cost 
        
                frontier.append(new_state)
                cost_frontier.append(new_cost)

            elif( new_state in frontier and state_cost[array_str(new_state)] < state_cost[array_str(state)] + 1 ):
                state_cost[array_str(new_cost)] =  state_cost[array_str(state)] + 1   # is the cost function
                
        if not frontier:
            state = None
            cost = None
        else:
            state = frontier.pop()
            cost = cost_frontier.pop()
    print("solution found  ", state )

In [None]:
search(state)

In [None]:
frontier = []
cost_frontier = []

cost=0
frontier.append(state)
cost_frontier.append(0)

while not goal_function( state,goal ):
    state = frontier.pop()
    cost = cost_frontier.pop()
    print()
    print(state)
    for a in available_actions(state):
        new_state = do_action(state, a )
        if(new_state is not frontier):
            frontier.append(new_state)
            cost_frontier.append(cost+1)
        elif ( new_state is frontier and cost_frontier[frontier.index(new_state)] > cost + 1 ):
            old_cost =  cost_frontier[frontier.index(new_state)] 
            cost_frontier[frontier.index(new_state)] = cost + 1
    if len(frontier)!=0:
        state = frontier.pop()
        cost = cost_frontier.pop()

print( state , cost        )

In [None]:
frontier = []
cost_frontier = []

cost=0


In [None]:
while state is not None and not goal_function(state, goal): 
    for a in available_actions(state):
        print(a)
        new_state = do_action(state,a) 
        print(new_state)
        if new_state not in frontier:   
            frontier.append(new_state)
            cost_frontier.append(cost+1)
        elif new_state in frontier and  cost_frontier[frontier.index(new_state)] > cost_frontier[frontier.index(new_state)]+1 :
            cost_frontier[frontier.index(new_state)]  =cost_frontier[frontier.index(new_state)]+1
    if frontier:
        state = frontier.pop()
        cost = cost_frontier
    else:
        state = None
    #logging.info(f"Found a solution in {len(path):,} steps; visited {len(state_cost):,} states")


