Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [None]:
from collections import namedtuple
from random import choice,seed
from tqdm.auto import tqdm
import numpy as np
from icecream import ic
import heapq
import math


# Fixed seed for Python's `random` module, so the random moves picked with
# `choice` while scrambling the puzzle are reproducible from run to run.
SEED = 43242
#SEED = 4
seed(SEED)

In [7]:
PUZZLE_DIM = 4  # side length of the (square) puzzle
FIRSTCUT = 1  # 1 lets the solver lock the first row/column of big boards first
action = namedtuple('Action', ['pos1', 'pos2'])  # swap between two (row, col) cells

Y = PUZZLE_DIM  # number of rows
X = PUZZLE_DIM  # number of columns

# Goal board: tiles 1..X*Y-1 laid out row by row, blank (0) in the last cell.
objective = [[row * X + col + 1 for col in range(X)] for row in range(Y)]
objective[-1][-1] = 0
objective = np.array(objective, dtype=np.int8)
objective

array([[ 1,  2,  3,  4],
       [ 5,  6,  7,  8],
       [ 9, 10, 11, 12],
       [13, 14, 15,  0]], dtype=int8)

In [8]:



def available_actions(state: np.ndarray,width,hight) -> list['Action']:
    """List the legal swaps of the blank (0) with an adjacent cell.

    `state` is a 2-D board containing a 0; `width` and `hight` are its number
    of columns and rows. The result is ordered: previous row, next row,
    previous column, next column (only those that stay on the board).
    """
    zero_rows, zero_cols = np.where(state == 0)
    row, col = int(zero_rows[0]), int(zero_cols[0])
    blank = (row, col)

    # (is the move inside the board?, destination cell) -- order matters,
    # because callers pick from this list with a seeded random choice.
    candidates = (
        (row > 0, (row - 1, col)),
        (row < hight - 1, (row + 1, col)),
        (col > 0, (row, col - 1)),
        (col < width - 1, (row, col + 1)),
    )
    return [action(blank, target) for inside, target in candidates if inside]



def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    """Return a copy of `state` with the cells `action.pos1` and `action.pos2` swapped.

    The input board is left untouched.
    """
    board = state.copy()
    # Read both cells before writing either one.
    from_pos2 = board[action.pos2]
    from_pos1 = board[action.pos1]
    board[action.pos1] = from_pos2
    board[action.pos2] = from_pos1
    return board

In [9]:
RANDOMIZE_STEPS = 100003  # 100_000

# Buffer with the shape/dtype of the goal; `astar` writes the solved board into it.
solfinal = np.empty_like(objective)

# Scramble by applying random legal moves starting from the goal board.
# `do_action` returns a fresh copy, so the view taken here never mutates `objective`.
state = objective[:]
k = 0  # number of random moves applied
for r in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    state = do_action(state, choice(available_actions(state, X, Y)))
    k += 1
state

Randomizing:   0%|          | 0/100003 [00:00<?, ?it/s]

array([[ 1,  7, 11,  2],
       [12,  4,  0, 15],
       [14,  3,  6,  9],
       [13, 10,  5,  8]], dtype=int8)

In [10]:

# Print the scrambled starting board before the solver is defined and run.
ic(state)





def linear_conflict_distance_fast(curr, goal,y,x,start=0,xstart=0,ystart=0):
    """Manhattan distance plus a linear-conflict penalty between two boards.

    Args:
        curr: 2-D board, indexable as ``curr[i][j]``; 0 is the blank.
        goal: 2-D array with the target arrangement of the same tiles.
        y: number of rows of ``curr`` to scan.
        x: number of columns of ``curr`` to scan.
        start: 0 scores every non-blank tile (regular mode). Any other value
            scores only the tiles that belong to the first row or the first
            column of ``goal``, so the search can first concentrate on
            completing that row/column on big boards.
        xstart, ystart: accepted for call compatibility; not used.

    Returns:
        int: sum of the Manhattan distances of the scored tiles, plus 2 for
        every pair of tiles that sit in their goal row (or column) in reversed
        order. Returns 0 when no tile is scored (e.g. a blank-only board).

    Raises:
        ValueError: if a scored tile of ``curr`` does not appear in ``goal``.
    """
    goal = np.asarray(goal)

    # Index goal positions once instead of scanning `goal` with np.where for
    # every tile and every conflict partner. setdefault keeps the first
    # occurrence should a value be repeated.
    goal_pos = {}
    for (row, col), value in np.ndenumerate(goal):
        goal_pos.setdefault(int(value), (row, col))

    if start != 0:
        # Tiles of the first goal row and first goal column; the slices are
        # written so that an empty goal yields an empty set instead of raising.
        scored = set(goal[:1, :].ravel().tolist()) | set(goal[1:, :1].ravel().tolist())
    else:
        scored = None

    distance = 0
    linear_conflict = 0
    for i in range(y):
        for j in range(x):
            tile = int(curr[i][j])
            if scored is None:
                if tile == 0:
                    continue  # regular mode never scores the blank
            elif tile not in scored:
                continue
            if tile not in goal_pos:
                raise ValueError(f"tile {tile} at ({i}, {j}) does not appear in goal")

            goal_i, goal_j = goal_pos[tile]
            distance += abs(goal_i - i) + abs(goal_j - j)

            # Row conflict: a tile further right in this row also belongs to
            # this row but must end up to the left of the current tile. One of
            # the two has to step out of the row and back: at least 2 moves.
            if goal_i == i:
                for k in range(j + 1, x):
                    other = int(curr[i][k])
                    if other == 0:
                        continue
                    other_pos = goal_pos.get(other)
                    if other_pos is not None and other_pos[0] == i and goal_j > other_pos[1]:
                        linear_conflict += 2

            # Same check along the column.
            if goal_j == j:
                for k in range(i + 1, y):
                    other = int(curr[k][j])
                    if other == 0:
                        continue
                    other_pos = goal_pos.get(other)
                    if other_pos is not None and other_pos[1] == j and goal_i > other_pos[0]:
                        linear_conflict += 2

    return int(distance + linear_conflict)


def discard_n_worst(open_list, n):
    """Drop the `n` entries with the highest `f` value from a priority queue.

    Currently unused. It is meant to cap memory use of the open list, at the
    price of slower searches that may never reach a solution.

    Entries are ranked by their first element. For `n <= 0` the very same list
    is handed back; if the queue holds `n` entries or fewer an empty list is
    returned; otherwise a new heap with the survivors is returned.
    """
    if n <= 0:
        return open_list

    keep = len(open_list) - n
    if keep <= 0:
        return []

    # Best-first ordering on `f`, then keep only the leading entries.
    survivors = sorted(open_list, key=lambda entry: entry[0])[:keep]

    # Restore the heap invariant for the priority queue.
    heapq.heapify(survivors)
    return survivors

def check_if_row_or_col_eq(state, objective):
    """Report whether the first row / first column of `state` is already solved.

    Returns a pair ``(row_done, col_done)`` of 0/1 flags: the first is 1 when
    the first row equals the first row of `objective`, the second is 1 when
    the first column equals the first column of `objective`.
    """
    row_done = int(np.array_equal(state[0], objective[0]))
    col_done = int(np.array_equal(state[:, 0], objective[:, 0]))
    return (row_done, col_done)


def astar(state,globobjective,distancefun):
    """Best-first (A*-style) search from `state` to `globobjective`.

    Entries of the priority queue are ``(f, hashable_state, g)`` where ``g`` is
    the number of moves made so far and ``f = g + h`` with ``h`` given by
    `distancefun`.

    Args:
        state: starting 2-D board; its rows are converted to tuples for hashing.
        globobjective: goal board (array); sliced when a row/column is cut off.
        distancefun: heuristic, called as
            ``distancefun(board, objective, rows, cols, firstcut[, xsolfinal, ysolfinal])``.

    Reads the module-level names ``X``, ``Y`` and ``FIRSTCUT`` and writes the
    solved board into the module-level array ``solfinal``.

    When ``X >= 4`` and ``Y >= 4`` the mode stored in ``FIRSTCUT`` is used: as
    soon as a popped board has its first row or first column equal to the
    objective's, that row/column is copied into ``solfinal``, stripped from
    both the board and the objective, and the search restarts from the reduced
    board with an empty queue and visited set.

    Returns:
        ``(solfinal, current_cost, analized)`` once a popped board equals the
        (possibly reduced) objective; ``analized`` counts heuristic
        evaluations. If the queue runs empty the function falls off the end
        and implicitly returns None.

    NOTE(review): because the queue is discarded after a cut, the returned
    cost is presumably not guaranteed to be minimal -- confirm if optimality
    matters.
    """
    # Top-left corner in `solfinal` where the still-unsolved sub-board belongs.
    xsolfinal=0
    ysolfinal=0
  
    #maxsize=1000000  # max size of open_list if discard_n_worst is used
    open_list = []

    hashable_state = tuple(tuple(arr) for arr in state)  

    # Seed the priority queue with (priority, hashable state, real cost so far);
    # the start entry is pushed with priority 0.
    heapq.heappush(open_list, (0,hashable_state,0))
 
    visited = set()
    # Upper bound used only as the progress-bar total: at most (X*Y)! arrangements.
    target_size=math.factorial(X*Y)

    heuristic_cache = {}
    objective=globobjective[:]
    hight=Y
    w=X
    # On a big board, first try to complete the first row or column; once one of
    # them is complete, keep searching on the simplified board without it.
    if(X>=4 and Y>=4):
        firstcut=FIRSTCUT
        
    else:
        firstcut=0
    
    # Cache the heuristic distance of the start state from the objective.
    heuristic_cache[hashable_state] = distancefun(state, objective,hight,w,firstcut)
    
    analized=0
    analized+=1  # counts heuristic evaluations





    # Best f value pushed so far for each state (never reset after a cut;
    # keys of different board shapes do not collide).
    open_list_dict = {} 
    with tqdm(total=target_size, desc="Filling the set", unit="item") as pbar:
    
        # Manual counter mirroring the number of entries in open_list.
        lenopen=len(open_list)

        while  lenopen>0:
       
            h_cost,current_state,current_cost  = heapq.heappop(open_list)
            lenopen-=1  
            # Stale queue entry: this state was already expanded.
            if current_state in visited: 
                
                continue
            
            listcurrentstate=np.array(current_state, dtype=np.int8)
            
            if(lenopen%100000==0): # print the state every now and then
               ic(listcurrentstate)
              
            if (hight>3 and w>3 and firstcut==1):  # a puzzle counts as big when both height and width are more than 3
               
           
                to_del=check_if_row_or_col_eq(listcurrentstate,objective)
                if to_del!=(0,0):   # the first row and/or the first column already matches the objective
            
                    if(to_del[0]==1):
                  
                            objective=objective[1:,:] # cut the first row from the objective
                            solfinal[ysolfinal, xsolfinal:xsolfinal + listcurrentstate.shape[1]] = listcurrentstate[0, :] # store that row in the solution: it is already correct
                            ysolfinal += 1 
                            
                            listcurrentstate=listcurrentstate[1:,:] # cut the first row from the state; from here on that row is never changed again
                            hight=hight-1   # the new state has one row less
                    
                    if(to_del[1]==1): # same as above, for the first column
                        
                            objective=objective[:,1:]
                            solfinal[ysolfinal:ysolfinal + listcurrentstate.shape[0], xsolfinal] = listcurrentstate[:, 0]
                            xsolfinal += 1
                            
                            listcurrentstate=listcurrentstate[:,1:]
                            w-=1
                    
                    
                    ic("Modified shape!!!")  # the shape changed: drop the bookkeeping of the previous shape
                    open_list = []
                    hashable_state = tuple(tuple(arr) for arr in listcurrentstate)
                   
                    visited = set()
                    lenopen=0
                    # Keep cutting only while the reduced board is still big.
                    if (hight>3 and w>3):

                        firstcut=1
                    else:
                        firstcut=0
                    heuristic_cache = {}
                    heuristic_cache[hashable_state] = distancefun(listcurrentstate, objective,hight,w,firstcut,xsolfinal,ysolfinal)
                    analized+=1
                    # The cost keeps accumulating across the cut; the reduced
                    # board becomes the state being expanded below.
                    current_cost=current_cost
                    current_state=hashable_state

                    ic(listcurrentstate) 
            
            if np.array_equal(listcurrentstate, objective) == True:  # final solution found
                ic(analized)

                # Recompose the full solution: paste the current (reduced) board next
                # to the rows/columns that were cut off and stored earlier.
                solfinal[ysolfinal:ysolfinal + listcurrentstate.shape[0],xsolfinal: xsolfinal+ listcurrentstate.shape[1]] = listcurrentstate[:,:]

                return solfinal, current_cost,analized
            
            

            """  if len(open_list) > maxsize//10: #not used currently, useful to avoid getting your pc unresponsive for too much memory use , it can greatly decrease perfonces and someyimes it never reaches a solution 
                open_list = discard_n_worst(open_list, 1)  """

            visited.add(current_state) # mark as expanded: the state is no longer on the frontier

            actions=available_actions(listcurrentstate,w,hight)  # legal moves; also works for rectangular (n x m) boards
       
            for ia in range(len(actions)):
               
                newstate=do_action(listcurrentstate,actions[ia])
             
                hashable_new_state = tuple(tuple(arr) for arr in newstate)
                if hashable_new_state not in visited:
                    g = current_cost + 1  # every move costs 1
              
                    if hashable_new_state not in heuristic_cache:
                        heuristic_cache[hashable_new_state] = distancefun(newstate, objective,hight,w,firstcut,xsolfinal,ysolfinal) # compute the heuristic cost and cache it
                        analized+=1
                        pbar.update(1)  # progress bar tracks evaluated states
                    h = heuristic_cache[hashable_new_state]  # heuristic cost
                    f = float(g + h)

                    if hashable_new_state in open_list_dict and open_list_dict[hashable_new_state] <= f:
                       
                        continue  # already queued with an equal or better f
                    
                    # otherwise record the best f value seen for this state
                    open_list_dict[hashable_new_state] = f
                    
                    heapq.heappush(open_list, (f, hashable_new_state,g)) # add the newly evaluated state to the priority queue
                    
                    lenopen+=1
    
    

        


    
# The distance function A* should use as heuristic is passed by name.
# Note that `state` is rebound from the scrambled board to the board returned by the solver.
state,cost,analized=astar(state,objective,linear_conflict_distance_fast)  # with the Euclidean distance it does not work, because it is not admissible (I think)

ic(cost)  # number of moves made to reach the objective
ic(state)  # final state
ic(analized) # number of evaluated states
    

ic| state: array([[ 1,  7, 11,  2],
                  [12,  4,  0, 15],
                  [14,  3,  6,  9],
                  [13, 10,  5,  8]], dtype=int8)


Filling the set:   0%|          | 0/20922789888000 [00:00<?, ?item/s]

ic| listcurrentstate: array([[ 1,  7, 11,  2],
                             [12,  4,  0, 15],
                             [14,  3,  6,  9],
                             [13, 10,  5,  8]], dtype=int8)
ic| 'Modified shape!!!'
ic| listcurrentstate: array([[12,  7,  0, 11],
                             [14,  6,  9, 15],
                             [13, 10,  5,  8]], dtype=int8)
ic| analized: 63216
ic| cost: 51
ic| state: array([[ 1,  2,  3,  4],
                  [ 5,  6,  7,  8],
                  [ 9, 10, 11, 12],
                  [13, 14, 15,  0]], dtype=int8)
ic| analized: 63216


63216