In [1]:
import random
import heapq
from dataclasses import dataclass, field
from typing import List, Tuple, Callable, Any, Set
import numpy as np
from time import time
from tabulate import tabulate
import threading
import collections
from math import ceil
from enum import Enum
from functools import partial
from copy import deepcopy

In [2]:

class LightsOutPuzzle:
    def __init__(self, board: List[List[int]]):
        self.board = np.array(board, dtype = np.uint8)
        self.size = len(board)
        self.moves = []
        self.depth = 0
        self.f = 0

    def toggle(self, x, y):
        for dx, dy in [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)]:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.size and 0 <= ny < self.size:
                self.board[nx, ny] ^= 1

    def is_solved(self):
        return np.all(self.board == 0)

    def get_moves(self):
        return [(x, y) for x in range(self.size) for y in range(self.size)]

    def __str__(self):
        return '\n'.join(' '.join(str(cell) for cell in row) for row in self.board)


In [3]:
def create_random_board(size: int,  seed: int, num_toggles: int = None):
    random.seed(time() if seed is None else seed)

    board = [[0 for _ in range(size)] for _ in range(size)]
    puzzle = LightsOutPuzzle(board)

    if num_toggles is None:
        num_toggles = random.randint(1, size * size)

    for _ in range(num_toggles):
        x = random.randint(0, size - 1)
        y = random.randint(0, size - 1)
        puzzle.toggle(x, y)

    return puzzle.board

In [4]:
def show_solution(
    puzzle: LightsOutPuzzle,
    solution: List[Tuple[int, int]],
    algorithm_name: str,
    nodes_visited: int,
    show_steps: bool = False,
):
    print(f"\nSolving with {algorithm_name}:")
    if solution is None:
        print("No solution found.")
        return
    print(f"Solution: {solution}")
    print(f"Nodes visited: {nodes_visited}")
    if show_steps:
        print("\nSolution steps:")
        for i, move in enumerate(solution):
            print(f"\nStep {i+1}: Toggle position {move}")
            puzzle.toggle(*move)
            print(puzzle)
        print(
            "\nFinal state (solved):"
            if puzzle.is_solved()
            else "\nFinal state (not solved):"
        )
        print(puzzle)

In [5]:
tests = [
    create_random_board(3, 42),
    create_random_board(3, 41),
    create_random_board(3, 42, 5),
    create_random_board(4, 42),
    create_random_board(4, 41),
    create_random_board(4, 42, 5),
    create_random_board(5, 42),
    create_random_board(5, 41),
    create_random_board(5, 42, 5),
]

In [6]:
# TODO: Must return a list of tuples and the number of visited nodes

def bfs_solve(puzzle: LightsOutPuzzle) -> Tuple[List[Tuple[int, int]], int]:
    explored = []
    frontier = []
    visited_nodes_number = 0
    initial = deepcopy(puzzle)

    if (initial.is_solved()):     # Goal test for initial state
        return [], 1

    indexes = initial.get_moves()
    
    frontier.append(initial)     # Add initial state to queue frontier
    visited_nodes_number += 1

    while (len(frontier) > 0):     # Run algorithm untill frontier is not empty and there is at least a node to expand
        for x, y in indexes:
            state = deepcopy(frontier[0])
            state.toggle(x, y)
            state.moves.append((x, y))

            flag_explored = 1
            for s in explored:   
                if (state.board.tolist() == s.board.tolist()):       # Check if the new state is not in previously explored states 
                    flag_explored = 0
                    break

            flag_frontier = 1
            for s in frontier:   
                if (state.board.tolist() == s.board.tolist()):       # Check if the new state is not in previously frontier states 
                    flag_frontier = 0
                    break

            if (flag_explored and flag_frontier):
                frontier.append(state)     # Add the new state to queue frontier

                if (state.is_solved()):     # Goal test
                    return state.moves, visited_nodes_number     # Return the solution
    
        explored.append(frontier[0])     # Add the explored state to explored
        frontier.pop(0)     # Pop the oldest state that was added to frontier 
        visited_nodes_number += 1     # Increase number of visited nodes after exploring a state

    return [], 0     # Return none if the solution doesnt exist

In [7]:
# TODO: Must return a list of tuples and the number of visited nodes

def ids_solve(puzzle: LightsOutPuzzle) -> Tuple[List[Tuple[int, int]], int]:
    limit = 1
    visited_nodes_number = 1
    initial = deepcopy(puzzle)

    if (initial.is_solved()):     # Goal test for initial state
        return [], 1

    indexes = initial.get_moves()

    while (True):     # Check all limits untill a timeout
        explored = []
        frontier = []
        state = deepcopy(puzzle)

        frontier.append(state)     # Add initial state to stack frontier
    
        while (len(frontier) > 0):     # Run DFS untill frontier is not empty and there is at least a node to expand
            state = frontier.pop()     # Pop the newest state that was added to frontier

            if (state.depth < limit):     # Check if we are in a less depth than limit
                for x, y in indexes:
                    temp_state = deepcopy(state)
                    temp_state.toggle(x, y)
                    temp_state.moves.append((x, y))
                    temp_state.depth += 1     # Increase the depth of this new state

                    flag_explored = 1
                    for s in explored:     
                        if (temp_state.board.tolist() == s.board.tolist()):     # Check if the new state is not in previously explored states 
                            flag_explored = 0
                            break

                    flag_frontier = 1
                    for s in frontier:   
                        if (temp_state.board.tolist() == s.board.tolist()):       # Check if the new state is not in previously frontier states 
                            flag_frontier = 0
                            break

                    if (flag_explored and flag_frontier):
                        frontier.append(temp_state)     # Add the new state to stack frontier

                        if (temp_state.is_solved()):     # Goal test
                            return temp_state.moves, visited_nodes_number     # Return the solution
            
                explored.append(state)     # Add the explored state to explored
                visited_nodes_number += 1     # Increase number of visited nodes after exploring a state

        limit += 1     # Increase the limit

    return [], 0     # Return none if the solution doesnt exist

In [8]:
# TODO: Must return a list of tuples and the number of visited nodes

def astar_solve(puzzle: LightsOutPuzzle, heuristic: Callable[[LightsOutPuzzle], int]) -> Tuple[List[Tuple[int, int]], int]:
    explored = []
    frontier = []
    visited_nodes_number = 0
    state_name = 0     # Give a number to each new state for compare them when they are equal in f(n)
    initial = deepcopy(puzzle)

    if (initial.is_solved()):     # Goal test for initial state
        return [], 1

    indexes = initial.get_moves()
    
    heapq.heappush(frontier, (heuristic(initial), state_name, initial))     # Add initial state to min heap frontier

    while (len(frontier) > 0):     # Run algorithm untill frontier is not empty and there is at least a node to expand
        h, name, state = heapq.heappop(frontier)     # Find and pop the state with minimum f(n)

        explored.append(state)     # Add the explored state to explored
        visited_nodes_number += 1     # Increase number of visited nodes after exploring a state
        
    
        if (state.is_solved()):     # Goal test
            return state.moves, visited_nodes_number     # Return the solution
        
        for x, y in indexes:
            temp_state = deepcopy(state)
            temp_state.toggle(x, y)
            temp_state.moves.append((x, y))
            temp_state.f = len(temp_state.moves) + heuristic(temp_state)     # Calculate f(n) for the new state

            flag_explored = 1
            for s in explored:
                if (temp_state.board.tolist() == s.board.tolist()):     # Check if the new state is not in previously explored states 
                    flag_explored = 0
                    break

            if (flag_explored):  
                state_name += 1
            
                heapq.heappush(frontier, (temp_state.f, state_name, temp_state))     # Add the new state to min heشp frontier

    return [], 0     # Return none if the solution doesnt exist

In [9]:
# TODO: Implement heuristic functions and store them in the list

def heuristic1(puzzle: LightsOutPuzzle):     # Number of 1s in each state
  h = 0
  board = puzzle.board

  for line in board:
    for light in line:
      h += light
      
  return h

def heuristic2(puzzle: LightsOutPuzzle):     # Number of 0s in each state
  h = 0
  board = puzzle.board

  for line in board:
    for light in line:
      if (light == 0):
        h += light
      
  return h

def heuristic3(puzzle: LightsOutPuzzle):     # Number of 1s divide by 5 in each state
  h = 0
  board = puzzle.board

  for line in board:
    for light in line:
      h += light
      
  return h / 5

heuristics = [heuristic1, heuristic2, heuristic3]

In [10]:
# TODO: Implement weighted heuristic functions and store them in the list

def weighted_heuristic1(puzzle: LightsOutPuzzle, alpha = 5):
  return heuristic1(puzzle) * alpha

def weighted_heuristic2(puzzle: LightsOutPuzzle, alpha = 5):
  return heuristic2(puzzle) * alpha

def weighted_heuristic3(puzzle: LightsOutPuzzle, alpha = 5):
  return heuristic3(puzzle) * alpha

weighted_heuristics = [weighted_heuristic1, weighted_heuristic2, weighted_heuristic3]     

# TODO: assign 2 weights you wanna run

weights = [2, 5]

In [11]:
class SearchStatus(Enum):
    SOLVED = "Solved"
    TIMEOUT = "Timeout"
    NO_SOLUTION = "No Solution"

@dataclass
class SearchResult:
    result: SearchStatus
    solution: Any = None
    nodes_visited: int = None
    time: float = None

    def __str__(self):
        output = f"Result: {self.result.value}\n"
        if self.result == SearchStatus.SOLVED:
            output += f"Solution: {self.solution}\n"
        if self.result != SearchStatus.TIMEOUT:
            output += f"Nodes Visited: {self.nodes_visited}\n"
            output += f"Time Taken: {self.time:.3f} seconds\n"

        return output


DEFAULT_SEARCH_RESULT = SearchResult(SearchStatus.TIMEOUT, None, None, float("inf"))

In [12]:
def run_searches(func: Callable, args: Any, time_limit: float = 60.0, name: str = "") -> SearchResult:
    result = []

    def target():
        try:
            solution, nodes_visited = func(*args)
            result.append((solution, nodes_visited))
        except Exception as e:
            result.append(e)

    thread = threading.Thread(target=target)
    thread.start()

    start_time = time()
    thread.join(timeout=time_limit)

    if thread.is_alive() or not result:
        print(f"\nTime limit of {time_limit} seconds exceeded for {name}")
        return SearchResult(SearchStatus.TIMEOUT)

    if isinstance(result[0], Exception):
        raise result[0]

    solution, nodes_visited = result[0]
    show_solution(args[0], solution, name, nodes_visited)
    time_taken = time() - start_time
    return SearchResult(SearchStatus.SOLVED if solution else SearchStatus.NO_SOLUTION, solution, nodes_visited, time_taken)

In [13]:
def run_test(test: List, heuristics: List[Callable[[Any], int]], time_limit: float = 60.0):

    print(f"Running test:\n {test}")

    puzzle = LightsOutPuzzle(test)

    bfs_result = run_searches(bfs_solve, (deepcopy(puzzle),), name="BFS", time_limit=time_limit)
    ids_result = run_searches(ids_solve, (deepcopy(puzzle),), name="IDS", time_limit=time_limit)

    astar_results = []
    for heuristic in heuristics:
        astar_results.append(
            run_searches(
                astar_solve,
                (deepcopy(puzzle), heuristic),
                name=f"A*({heuristic.__name__})",
                time_limit=time_limit,
            )
        )

    weighted_astar_results = []
    for heuristic in weighted_heuristics:
        for weight in weights:
          weighted_astar_results.append(
              run_searches(
                  astar_solve,
                  (deepcopy(puzzle), partial(heuristic, alpha = weight)),
                  name=f"weighted A*({heuristic.__name__}, alpha = {weight})",
                  time_limit=time_limit,
              )
          )

    print("\n-------------------------------\n")

    return (
        test,
        bfs_result,
        ids_result,
        *astar_results,
        *weighted_astar_results
    )

In [14]:
results: List[Tuple[str, SearchResult, SearchResult, SearchResult, SearchResult]] = []

for test in tests:
    results.append(run_test(test, heuristics, 120))

Running test:
 [[1 1 1]
 [1 1 1]
 [1 0 0]]

Solving with BFS:
Solution: [(0, 2), (1, 0)]
Nodes visited: 4

Solving with IDS:
Solution: [(1, 0), (0, 2)]
Nodes visited: 8

Solving with A*(heuristic1):
Solution: [(1, 0), (0, 2)]
Nodes visited: 4

Solving with A*(heuristic2):
Solution: [(0, 2), (1, 0)]
Nodes visited: 29

Solving with A*(heuristic3):
Solution: [(1, 0), (0, 2)]
Nodes visited: 8

Solving with weighted A*(weighted_heuristic1, alpha = 2):
Solution: [(1, 0), (0, 2)]
Nodes visited: 6

Solving with weighted A*(weighted_heuristic1, alpha = 5):
Solution: [(1, 0), (0, 2)]
Nodes visited: 6

Solving with weighted A*(weighted_heuristic2, alpha = 2):
Solution: [(0, 2), (1, 0)]
Nodes visited: 29

Solving with weighted A*(weighted_heuristic2, alpha = 5):
Solution: [(0, 2), (1, 0)]
Nodes visited: 29

Solving with weighted A*(weighted_heuristic3, alpha = 2):
Solution: [(1, 0), (0, 2)]
Nodes visited: 4

Solving with weighted A*(weighted_heuristic3, alpha = 5):
Solution: [(1, 0), (0, 2)]
Nodes

In [15]:
table_data: List[List[str]] = []

for result in results:
    table_data.append(
        [
            "\n".join(" ".join(str(cell) for cell in row) for row in result[0]),
            str(result[1]),
            str(result[2]),
        ]
    )

    for i in range(len(heuristics)):
        table_data[-1].append(str(result[3 + i]))

    for i in range(len(weighted_heuristics)):
        for j in range(len(weights)):
          table_data[-1].append(str(result[3 + len(heuristics) + i * 2 + j]))

    table_data.append("\n\n\n")

if table_data[-1] == "\n\n\n":
    table_data.pop()

print(len(table_data))
print("Results:")
print(
    tabulate(
        table_data,
        headers=[
            "Test",
            "BFS",
            "IDS",
            *[f"A* ({heuristic.__name__})" for heuristic in heuristics],
            *[f"Weighted A* ({heuristic.__name__}, alpha = {weight})" for heuristic in weighted_heuristics for weight in weights]
        ],
        floatfmt=".3f",
        numalign="center",
        stralign="center",
        tablefmt="grid",
    )
)

17
Results:
+-----------+----------------------------------------------------+----------------------------------------------------+--------------------------------------------------------------------+----------------------------------------------------+----------------------------------------------------+------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+--------------------------------------------------------------------+
|   Test    |                        BFS                         |                        IDS                         |                          A* (heuristic1)                           |                  A* (heuristic2)                   |      