In [1]:
import copy 
import pygame
import threading
import time

pygame 2.5.2 (SDL 2.28.2, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


# Visualize the game state

In [2]:


# Define puzzle board dimensions and colors
WINDOW_SIZE = 300
GRID_SIZE = 3
GRID_WIDTH = WINDOW_SIZE // GRID_SIZE
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)

current_puzzle = None

def draw_grid(screen):
    for i in range(1, GRID_SIZE):
        pygame.draw.line(screen, BLACK, (i * GRID_WIDTH, 0), (i * GRID_WIDTH, WINDOW_SIZE))
        pygame.draw.line(screen, BLACK, (0, i * GRID_WIDTH), (WINDOW_SIZE, i * GRID_WIDTH))

def draw_puzzle(screen, puzzle):
    font = pygame.font.Font(None, 36)
    for row in range(GRID_SIZE):
        for col in range(GRID_SIZE):
            cell_value = puzzle[row][col]
            if cell_value != 0:
                cell_text = font.render(str(cell_value), True, BLACK)
                cell_rect = cell_text.get_rect(center=(col * GRID_WIDTH + GRID_WIDTH // 2, row * GRID_WIDTH + GRID_WIDTH // 2))
                screen.blit(cell_text, cell_rect)

def visualize_puzzle(puzzle):
    # if a node is sent instead of a state get hold of the state instead
    if isinstance(puzzle, Node):
        puzzle = puzzle.get_state()
        
    global current_puzzle
    current_puzzle = puzzle

def visualize_path(path):
    for puzzle in path:
        visualize_puzzle(puzzle)
        time.sleep(1)

def puzzle_thread():
    pygame.init()
    screen = pygame.display.set_mode((WINDOW_SIZE, WINDOW_SIZE))
    pygame.display.set_caption('8-Puzzle Visualization')

    while True:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                return

        if current_puzzle is not None:
            screen.fill(WHITE)
            draw_grid(screen)
            draw_puzzle(screen, current_puzzle)
            pygame.display.flip()

def start_puzzle_thread():
    thread = threading.Thread(target=puzzle_thread)
    thread.daemon = True
    thread.start()

def stop_puzzle_thread():
    pygame.quit()

start_puzzle_thread()

# Check if a puzzle is solvable

In [44]:
def is_solvable(puzzle):
    inversions = 0
    # rows
    for i in range(len(puzzle)):
        # columns
        for j in range(len(puzzle)):
            # compare with all elements after the current element
            for k in range(i, len(puzzle)):
                start = 0
                # if the current row is the same as the current element, start from the next column
                if k == i:
                    start = j + 1
                # compare with all elements in the row
                for l in range(start, len(puzzle)):
                    if puzzle[i][j] != 0 and puzzle[k][l] != 0 and puzzle[i][j] > puzzle[k][l]:
                        inversions += 1
    return inversions % 2 == 0

# Graph implementation

In [60]:
def myPrint(string):
    pass

In [61]:
class Node:
    
    GRID_SIZE = 3
    
    def __init__(self, state):
        self.__state = copy.deepcopy(state)
        self.__parent = None
        self.__path = []
        self.__children = []
        self.__generate_children()            
    
    # generates all possible moves
    def __generate_children(self):
        self.__children = []
        for i in range(self.GRID_SIZE):
            for j in range(self.GRID_SIZE):
                if self.__state[i][j] == 0:
                    if i > 0:
                        self.__children.append(self.swap(i, j, i - 1, j))
                    if i < self.GRID_SIZE - 1:
                        self.__children.append(self.swap(i, j, i + 1, j))
                    if j > 0:
                        self.__children.append(self.swap(i, j, i, j - 1))
                    if j < self.GRID_SIZE - 1:
                        self.__children.append(self.swap(i, j, i, j + 1))
                    return self.__children
        
    def __str__(self):
        return str(self.__state)    
        
    def __repr__(self):
        return str(self.__state)
    
    # returns a state one move from the current state
    def swap(self, x1, y1, x2, y2):
        new_state = copy.deepcopy(self.__state)
        new_state[x1][y1], new_state[x2][y2] = new_state[x2][y2], new_state[x1][y1] 
        return new_state
    
    def set_parent(self, parent):
        self.__parent = parent
        self.__path = parent.get_path()
        self.__path.append(parent)
    
    def get_path(self):
        return copy.copy(self.__path)
    
    def get_state(self):
        return copy.deepcopy(self.__state)
    
    def get_children(self):
        return copy.deepcopy(self.__children)
    
    def get_parent(self):
        return self.__parent

In [4]:
puzzle1 = [
    [1, 2, 3],
    [8, 0, 4],
    [7, 6, 5]
]

In [5]:
node = Node(puzzle1)
visualize_puzzle(node)

# BFS

In [93]:
def BFS(node, target, max_nodes=10000):
    
    if not is_solvable(node.get_state()):
        return None
    
    queue = [node]
    visited = []
    while queue:
        current_node = queue.pop(0)
        current_state = current_node.get_state()
        visited.append(current_state)
        
        if current_state == target:
            final_path = current_node.get_path()
            final_path.append(current_node)
            return final_path, len(visited)
        
        if len(visited) % 1000 == 0:
            myPrint(f'visited nodes: {len(visited)}, queue size: {len(queue)}')
            if len(visited) >= max_nodes:
                return None
        
        for child in current_node.get_children():
            if child not in visited:
                child = Node(child)
                child.set_parent(current_node)
                queue.append(child)
    return None

In [0]:
target = [
    [0, 1, 2],
    [3, 4, 5],
    [6, 7, 8]
]

puzzle1 = [
    [1, 2, 5],
    [3, 4, 8],
    [6, 7, 0]
]
node = Node(puzzle1)
# visualize_puzzle(node)

In [42]:
visualize_puzzle(node)
t1 = time.time()
path, nodes_visited = BFS(node, target)
t2 = time.time()
time_taken = t2 - t1
f'visited nodes: {len(nodes_visited)}, path length: {len(path)}, time taken: {time_taken}'

visited nodes: 1000, queue size: 663
visited nodes: 2000, queue size: 1160
visited nodes: 3000, queue size: 2013
visited nodes: 4000, queue size: 2529
visited nodes: 5000, queue size: 2954
visited nodes: 6000, queue size: 3555
visited nodes: 7000, queue size: 4388
visited nodes: 8000, queue size: 5221
visited nodes: 9000, queue size: 5942
visited nodes: 10000, queue size: 6287
visited nodes: 11000, queue size: 6607
visited nodes: 12000, queue size: 6921
visited nodes: 13000, queue size: 7227
visited nodes: 14000, queue size: 7565
visited nodes: 15000, queue size: 8120
visited nodes: 16000, queue size: 8915
visited nodes: 17000, queue size: 9711
visited nodes: 18000, queue size: 10496
visited nodes: 19000, queue size: 11284
visited nodes: 20000, queue size: 12080
visited nodes: 21000, queue size: 12882
visited nodes: 22000, queue size: 13667
visited nodes: 23000, queue size: 14082
visited nodes: 24000, queue size: 14302
visited nodes: 25000, queue size: 14531
visited nodes: 26000, queue

KeyboardInterrupt: 

In [8]:
visualize_path(path)

# DFS

In [94]:
def DFS(node, target, max_nodes=10000):
    if not is_solvable(node.get_state()):
        return None
    
    stack = [node]
    visited = []
    while stack:
        current_node = stack.pop()
        current_state = current_node.get_state()
        visited.append(current_state)
        
        if current_state == target:
            final_path = current_node.get_path()
            final_path.append(current_node)
            return final_path, len(visited)
        
        if len(visited) % 1000 == 0:
            myPrint(f'visited nodes: {len(visited)}, stack size: {len(stack)}')
            if len(visited) >= max_nodes:
                return None
            
        for child in current_node.get_children():
            if child not in visited:
                child = Node(child)
                child.set_parent(current_node)
                stack.append(child)
    return None

In [0]:
target = [
    [0, 1, 2],
    [3, 4, 5],
    [6, 7, 8]
]

puzzle1 = [
    [1, 2, 5],
    [6, 3, 4],
    [7, 8, 0]
]
node = Node(puzzle1)
# visualize_puzzle(node)

In [38]:
t1 = time.time()
path, nodes_visited = DFS(node, target)
t2 = time.time()
time_taken = t2 - t1

f'visited nodes: {nodes_visited}, path length: {len(path)}, time taken: {time_taken}'

visited nodes: 1000, stack size: 746
visited nodes: 2000, stack size: 1485
visited nodes: 3000, stack size: 2218
visited nodes: 4000, stack size: 2935
visited nodes: 5000, stack size: 3670
visited nodes: 6000, stack size: 4421
visited nodes: 7000, stack size: 5161
visited nodes: 8000, stack size: 5886
visited nodes: 9000, stack size: 6606


'visited nodes: 9598, path length: 9397, time taken: 15.876030683517456'

In [35]:
visualize_path(path)

KeyboardInterrupt: 

# A*

In [12]:
import heapq

In [15]:
def manhatten_estimate_cost(state):
    cost = 0
    for i in range(len(state)):
        for j in range(len(state)):
            value = state[i][j]
            if value != 0:
                target_row, target_col = value // 3, value % 3
                cost += abs(i - target_row) + abs(j - target_col)
    return cost

In [16]:
def eaclidean_estimate_cost(state):
    cost = 0
    for i in range(len(state)):
        for j in range(len(state)):
            value = state[i][j]
            if value != 0:
                target_row, target_col = value // 3, value % 3
                cost += ((i - target_row)**2 + (j - target_col)**2)**0.5
    return cost

In [113]:
def a_star(node, target, estimate_function=manhatten_estimate_cost, max_nodes=100000):
    if not is_solvable(node.get_state()):
        return None
    
    # Q values are tuples (weight, insert_order, node). insert order is used to break ties
    queue = [(0, 0, node)]
    
    # Q inserts counter
    i = 1
    
    visited = []
    while queue:
        
        # pop the node with the lowest weight
        current_node = heapq.heappop(queue)[-1]
        current_state = current_node.get_state()
        visited.append(current_state)
        
        # check if goal state is reached
        if current_state == target:
            final_path = current_node.get_path()
            final_path.append(current_node)
            return final_path, len(visited)
        
        if len(visited) % 1000 == 0:
            myPrint(f'visited nodes: {len(visited)}, queue size: {len(queue)}')
            if len(visited) >= max_nodes:
                print("max nodes reached")
                return None
        
        for child in current_node.get_children():
            if child not in visited:
                child = Node(child)
                child.set_parent(current_node)
                
                # calculate weight by adding the manhatten distance to goal and the path length
                h_n = len(child.get_path())
                g_n = manhatten_estimate_cost(child.get_state())
                weight = h_n + g_n
                
                # push to priority queue
                heapq.heappush(queue, (weight, i, child))
                i+=1
    return None

In [49]:
target = [
    [0, 1, 2],
    [3, 4, 5],
    [6, 7, 8]
]

puzzle1 = [
    [0, 1, 2],
    [5, 3, 4],
    [8, 6, 7]
]
node = Node(puzzle1)
visualize_puzzle(node)

In [109]:
t = time.time()
path, visited_nodes = a_star(node1, target, estimate_function=manhatten_estimate_cost)
t2 = time.time()
time_taken = t2 - t
# visualize_path(path)
f'visited nodes: {visited_nodes}, path length: {len(path)}, time taken: {time_taken}' 

'visited nodes: 1640, path length: 23, time taken: 0.8945496082305908'

In [55]:
t = time.time()
path, visited_nodes = a_star(node, target, estimate_function=eaclidean_estimate_cost)
t2 = time.time()
time_taken = t2 - t

# visualize_path(path)
f'visited nodes: {visited_nodes}, path length: {len(path)}, time taken: {time_taken}' 

visited nodes: 1000, queue size: 616


'visited nodes: 1640, path length: 23, time taken: 0.6085848808288574'

# Benchmarking

In [128]:
def manhatten_A_star(path, target):
    return a_star(path, target, estimate_function=manhatten_estimate_cost)
    
def eaclidean_A_star(path, target):
    return a_star(path, target, estimate_function=eaclidean_estimate_cost)
    

In [127]:
def time_algorithms(node, target):
    algorithms = [BFS, DFS, manhatten_A_star, eaclidean_A_star]
    times = [[] for _ in range(len(algorithms))]
    for i in range(len(algorithms)):
        for _ in range(10):
            t = time.time()
            ret = algorithms[i](node, target)
            t2 = time.time()
            time_taken = t2 - t
            if time_taken > 60 or ret == None:
                times[i] = [-1]
                break
            times[i].append(time_taken)
            
    avg_times = [sum(time) / len(time) for time in times]
    return avg_times

In [71]:
puzzle1 = [
    [1, 2, 5],
    [3, 4, 8],
    [0, 6, 7]
]
node = Node(puzzle1)
visualize_puzzle(node)

In [124]:
t = time_algorithms(node, target)
t

([0.011474609375,
  0.41716599464416504,
  0.006886959075927734,
  0.0015969276428222656],
 [[0.011474609375],
  [0.41716599464416504],
  [0.006886959075927734],
  [0.0015969276428222656]])

In [125]:
puzzle1 = [
    [0, 1, 2],
    [5, 3, 4],
    [8, 6, 7]
]
node1 = Node(puzzle1)
visualize_puzzle(node)

In [129]:
t = time_algorithms(node1, target)
t

[-1.0, 16.80165731906891, 0.7685029983520508, 0.7607854843139649]