In [1]:
# ALL NECESSARY IMPORTS
import os
import re
import time
import heapq

In [2]:
# FILES AND TIME LIMIT
TIME_LIMIT = 60

PUZZLES_FILE = 'puzzles.txt'

OUTPUT_DIR = 'results'

In [3]:
# Read puzzles from PUZZLES_FILE
puzzles = []
with open(PUZZLES_FILE, 'r') as f:
    for line in f:
        line = line.strip()
        if line:
            nums = [int(s) for s in re.findall(r'\d+', line)]
            size = int(len(nums)**(1/2))
            puzzle = tuple( tuple(nums[i:(i+size)]) for i in range(0, size*size, size) )
            puzzles.append(puzzle)

In [4]:
# CREATE A S-PUZZLE CLASS
class S_Puzzle:   
    def __init__(self, initBoardState):
        self.state = initBoardState
        
    def getSize(self):
        return len(self.state)
    
    def printBoard(self):
        for row in self.state:
            print(row)
        
        print()
    
    def isGoal(self):
        _GOAL_STATE = ((1,2,3),(4,5,6),(7,8,9));
        return self.state == _GOAL_STATE
    
    def isEqual(self, otherPuzzle):
        return self.state == otherPuzzle.state;
        
    def generateChildren(self):
        children = []
        
        #Horizontal swaps
        for row in range(0,self.getSize()):
            row_tuple = self.state[row]
            for col in range(0,self.getSize()-1):
                new_row_tuple = row_tuple[:col] + (row_tuple[col+1],) + (row_tuple[col],) + row_tuple[col+2:]
                
                child = self.state[:row] + (new_row_tuple,) + self.state[row+1:]
                children.append(S_Puzzle(child))
                
        #Vertical swaps
        for row in range(0,self.getSize()-1):
            row1 = self.state[row]
            row2 = self.state[row+1]
            for col in range(0,self.getSize()):
                new_row1 = row1[:col] + (row2[col],) + row1[col+1:]
                new_row2 = row2[:col] + (row1[col],) + row2[col+1:]
                child = self.state[:row] + (new_row1,) + (new_row2,) + self.state[row+2:]
                children.append(S_Puzzle(child))
                
        return children

In [5]:
#CREATE A NODE CLASS
class Node:
    def __init__(self, puzzle, parent, cost):
        self.puzzle = puzzle
        self.parent = parent
        self.cost = cost
        
    def traceback(self):
        trace = []
        current = self
        while current.parent is not None:
            trace.append(current)
            current = current.parent
            
        trace.append(current)
        return trace
    
    # Less than comparator for the heap queue
    def __lt__(self, other):
        return self.cost < other.cost

In [6]:
# DFS ALGORITHM
def dfs(root):
    init_time = time.time()
    
    open_list = [root]
    closed_list = []
    
    visited_states = set()
    visited_states.add(root.puzzle.state)
    
    final_node = None
    
    while len(open_list) > 0:
        if (time.time()-init_time) > TIME_LIMIT:
            break
            
        current_node = open_list.pop()
        closed_list.append(current_node)
        
        if current_node.puzzle.isGoal():
            final_node = current_node
            break
            
        children = current_node.puzzle.generateChildren()
        current_depth = current_node.cost+1
        for child in children:
            if child.state not in visited_states:
                visited_states.add(child.state)
                open_list.append(Node(S_Puzzle(child.state),current_node, current_depth))
    
    timer = time.time()-init_time
    
    solution_path = []
    found_solution = True if final_node is not None else False
    
    if found_solution:
        solution_path = final_node.traceback()
        solution_path.reverse()
          
    return found_solution, solution_path, closed_list, timer     
              

In [7]:
# ITERATIVE DEEPENING ALGORITHMS
def id_dfs(root, limit):    
    dfs_init_time = time.time()
    open_list = [root]
    closed_list = []
    
    visited_states = set()
    visited_states.add(root.puzzle.state)
    
    final_node = None
                       
    max_depth = current_depth = 0
    
    while len(open_list) > 0:
        if (time.time()-dfs_init_time) > TIME_LIMIT:
            break
                 
        current_node = open_list.pop()
        closed_list.append(current_node)
        current_depth = current_node.cost;
        next_depth = current_depth+1
        max_depth = max(max_depth,current_depth)
        
        if current_node.puzzle.isGoal():
            final_node = current_node
            break
        
        if next_depth <= limit:
            children = current_node.puzzle.generateChildren()
            for child in children:
                if child.state not in visited_states:
                    visited_states.add(child.state)
                    open_list.append(Node(S_Puzzle(child.state),current_node, next_depth))
    
    solution_path = []
    found_solution = True if final_node is not None else False
    
    if found_solution:
        solution_path = final_node.traceback()
        solution_path.reverse()
          
    return found_solution, solution_path, closed_list, max_depth

def iterative_deepening(root):
    init_time = time.time()
    
    global_search = []
    id_sol_path = []
    
    found_solution = False
    
    max_depth = limit = 0
    
    while True:
        if (time.time()-init_time) > TIME_LIMIT:
            break
            
        found_solution, solution_path, closed_list, max_depth = id_dfs(root, limit)
        
        global_search.append("Limit {}:".format(limit))
        global_search += closed_list
        
        if found_solution:
            id_sol_path = solution_path
            break
            
        if max_depth < limit:
            break
        else:
            limit +=1
    
    timer = time.time()-init_time
            
    return found_solution, id_sol_path, global_search, timer
    

In [8]:
#A* ALGORITHM
def A_star(root, heuristic):
    init_time = time.time()
    
    open_list = []
    heapq.heappush(open_list, root)
    
    closed_list = []
    
    visited_states = set()
    
    final_node = None
    
    while len(open_list) > 0:
        if (time.time()-init_time) > TIME_LIMIT:
            break
             
        current_node = heapq.heappop(open_list)
        
        if current_node.puzzle.state in visited_states:
            continue
        
        if current_node.puzzle.isGoal():
            final_node = current_node
            break
            
        closed_list.append(current_node)
        visited_states.add(current_node.puzzle.state)
        
        
        children = current_node.puzzle.generateChildren();
        new_cost = current_node.cost + 1
        
        for child in children:
            if child not in visited_states:
                heuristic_cost = heuristic(child.state);
                heapq.heappush(open_list, (Node(S_Puzzle(child.state),current_node, new_cost + heuristic_cost)))
            
    
    timer = time.time()-init_time
    
    solution_path = []
    found_solution = True if final_node is not None else False
    
    if found_solution:
        solution_path = final_node.traceback()
        solution_path.reverse()
          
    return found_solution, solution_path, closed_list, timer 

In [9]:
#HEURISTIC FUNCTIONS

def h1(state):
    """
    (NOT ADMISSIBLE)
    Returns a weighted average between Manhattan distance and number of tiles out of place.
    """
    
    # Admissibility counter-example:
    # (2,1,3)
    # (4,5,6)
    # (7,8,9)
    # Then h(state) = 2, when real_cost = 1
    # Hence, non-admissible
    
    manhattan_sum = 0
    out_of_place = 0
    current_count = 1
    
    size = len(state)
    for row in range(size):
        for col in range(size):
            num = state[row][col]
            
            if num != current_count:
                out_of_place += 1
            
            real_row = (num-1) // size
            real_col = (num-1) % size
            manhattan_distance = abs(row-real_row) + abs(col-real_col)
            manhattan_sum += manhattan_distance
            
            current_count += 1
            
    return (7/10)*out_of_place + (3/10)*manhattan_sum

def h2(state):
    """
    (NOT ADMISSIBLE)
    Returns the Manhattan distance of passed state (sum of all the distances by which tiles are out of place).
    """
    manhattan_sum = 0
    size = len(state)
    for row in range(size):
        for col in range(size):
            num = state[row][col]
            real_row = (num-1) // size
            real_col = (num-1) % size
            manhattan_distance = abs(row-real_row) + abs(col-real_col)
            manhattan_sum += manhattan_distance
    return manhattan_sum

In [10]:
# Function to save results of solvers
def save_results(success, solution_path, search_path, execution_time, puzzle_index, solver_name):
    """
    Saves the results of a Solver in a human-readable way.
    """
    
    if not os.path.isdir(OUTPUT_DIR):
        os.mkdir(OUTPUT_DIR)
    
    with open(os.path.join(OUTPUT_DIR, 'puzzle_{}_{}_solution.txt'.format(puzzle_index, solver_name)), 'w') as f:
        if success:
            f.write('Execution time: {} s\n'.format(execution_time))
            f.write('Solution path (Cost: {}):\n'.format(len(solution_path) - 1))
            for node in solution_path:
                for row in node.puzzle.state:
                    f.write(str(row))
                    f.write('\n')
                f.write('\n')
        else:
            f.write('no solution')
    
    with open(os.path.join(OUTPUT_DIR, 'puzzle_{}_{}_search.txt'.format(puzzle_index, solver_name)), 'w') as f:
        if success:
            f.write('Execution time: {} s\n'.format(execution_time))
            f.write('Search path:\n')
            for item in search_path:
                if isinstance(item, str):
                    # If element is simply a string,
                    # Write it directly
                    f.write(item)
                    f.write('\n')
                elif isinstance(item, Node):
                    # If element is a Node,
                    # Write it in human-readable way
                    for row in item.puzzle.state:
                        f.write(str(row))
                        f.write('\n')
                    f.write('\n')
        else:
            f.write('no solution')

In [11]:
puzzle_counter = 0
for puzzle in puzzles:
    root = Node(S_Puzzle(puzzle),None, 0)

    success, sol_path, search, exec_time = dfs(root);
    save_results(success,sol_path, search, exec_time, puzzle_counter, "DFS")
    success, sol_path, search, exec_time = iterative_deepening(root);
    save_results(success,sol_path, search, exec_time, puzzle_counter, "ID")
    success, sol_path, search, exec_time = A_star(root, h1);
    save_results(success,sol_path, search, exec_time, puzzle_counter, "A_star_h1")
    success, sol_path, search, exec_time = A_star(root, h2);
    save_results(success,sol_path, search, exec_time, puzzle_counter, "A_star_h2")
    
    puzzle_counter+=1
