# Darya Ansaripour, 610300022
## CA1 report

In [2]:
import random
import heapq
from dataclasses import dataclass, field
from typing import List, Tuple, Callable, Any, Set
import numpy as np
from time import time
from tabulate import tabulate
import threading
import collections
from math import ceil
from enum import Enum
from functools import partial
from copy import deepcopy

### Question 1) 
Approach 1.

_Initial state:_ Initial puzzle given by test case.<br>
_Goal state:_ A puzzle which has no 1 elements. (All lights are turned off.)<br>
_Action:_ Toggling a light placed in (x,y).<br>
_State:_ I considered two states same if and only if one can be converted to other by rotations or flips. This will lead to fewer number of nodes being expanded. (look at "eq" function for LightsOutPuzzle)

-------------------------------------------------------------------------------------------------------------------------------------------
Approach 2.

Another approach I tested for state defining (but not used):<br>
Define _bitrepr_ of a state as an integer value which indicates the board uniquely. We can treat each entry as a bit of a $n^2$-digit binary number, by calculating the corresponding decimal value each board can be represented uniquely. This way of representation results in easier search and memory efficiency. 

_Initial state:_ Is _bitrepr_ of initial board. <br>
_Goal state:_ A puzzle which has _bitrepr_ equal to zero. <br>
_Action:_ Toggling a light placed in (x,y). By XORing the _bitrepr_ with 1<<$(n^2-1-(nx+y))$. <br>
_State:_ I considered two states same if and only if they have same _bitrepr_ (This unfortunately does not reduce number of states being checked, since It takes time to find rotations' _bitrepr_ values.)


In [3]:
class LightsOutPuzzle:
    def __init__(self, board: List[List[int]]):
        self.board = np.array(board, dtype=np.uint8)
        self.size = len(board)

    def toggle(self, x, y):
        for dx, dy in [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)]:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.size and 0 <= ny < self.size:
                self.board[nx, ny] ^= 1


    def is_solved(self):
        return np.all(self.board == 0)

    def get_moves(self):
        return [(x, y) for x in range(self.size) for y in range(self.size)]
    
    def get_transformations(self):
        rotations = [self.board, np.rot90(self.board), np.rot90(self.board,2), np.rot90(self.board,3)]
        syms = [np.flip(self.board), np.flip(np.rot90(self.board)), np.flip(np.rot90(self.board,2)), np.flip(np.rot90(self.board,3))]
        return rotations+syms

    def __str__(self):
        return '\n'.join(' '.join(str(cell) for cell in row) for row in self.board)
    
    def __lt__(self,other):
        return np.sum(self.board)<np.sum(other.board)
    
    def __hash__(self):
        return hash(tuple(map(tuple,self.board)))
    
    def __eq__(self, other):
        if not isinstance(other, LightsOutPuzzle):
            return False      
        # return any(np.array_equal(tr, other.board) for tr in self.get_transformations())
        return (np.array_equal(self.board, other.board))
        



In [4]:
class LightsOutPuzzleBIT:
    def __init__(self, board: List[List[int]]):
        self.board = np.array(board, dtype=np.uint8)
        self.size = len(board)

        self.bitrepr = self.board_to_bit()

    def board_to_bit(self) -> int:
        bit = 0
        for row in self.board:
            for cell in row:
                bit = (bit << 1) | cell
        return int(bit)

    def toggle(self, x, y):
        for dx, dy in [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)]:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.size and 0 <= ny < self.size:
                self.bitrepr ^= 1 << (self.size*self.size - 1 - (nx * self.size + ny))

    def toggleboard(self, x, y):
        for dx, dy in [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)]:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.size and 0 <= ny < self.size:
                self.board[nx,ny] ^= 1

    def is_solved(self):
        return self.bitrepr == 0

    def get_moves(self):
        return [(x, y) for x in range(self.size) for y in range(self.size)]

    def __str__(self):
        return '\n'.join(' '.join(str(cell) for cell in row) for row in self.board)
    
    def __lt__(self,other):
        return np.sum(self.board)<np.sum(other.board)
    
    def __hash__(self):
        return self.bitrepr
    
    def __eq__(self,other):
        if not isinstance(other, LightsOutPuzzle):
            return False      
        return self.bitrepr == other.bitrepr

In [5]:
def create_random_board(size: int,  seed: int, num_toggles: int = None):
    random.seed(time() if seed is None else seed)

    board = [[0 for _ in range(size)] for _ in range(size)]
    puzzle = LightsOutPuzzle(board)

    if num_toggles is None:
        num_toggles = random.randint(1, size * size)

    for _ in range(num_toggles):
        x = random.randint(0, size - 1)
        y = random.randint(0, size - 1)
        puzzle.toggle(x, y)

    return puzzle.board

In [6]:
def show_solution(
    puzzle: LightsOutPuzzle,
    solution: List[Tuple[int, int]],
    algorithm_name: str,
    nodes_visited: int,
    show_steps: bool = False,
):
    print(f"\nSolving with {algorithm_name}:")
    if solution is None:
        print("No solution found.")
        return
    print(f"Solution: {solution}")
    print(f"Nodes visited: {nodes_visited}")
    if show_steps:
        print("\nSolution steps:")
        for i, move in enumerate(solution):
            print(f"\nStep {i+1}: Toggle position {move}")
            puzzle.toggle(*move)
            print(puzzle)
        print(
            "\nFinal state (solved):"
            if puzzle.is_solved()
            else "\nFinal state (not solved):"
        )
        print(puzzle)

In [7]:
tests = [
    # create_random_board(3, 42),
    # create_random_board(3, 41),
    # create_random_board(3, 42, 5),
    # create_random_board(4, 42),
    # create_random_board(4, 41),
    # create_random_board(4, 42, 5),
    create_random_board(5, 42),
    create_random_board(5, 41),
    # create_random_board(5, 42, 5),
]

### Question 2) BFS
BFS explores all possible board states level by level, starting from the initial board configuration. It toggles lights to generate new states, ensuring that it explores all states with fewer moves before considering those with more moves.

_Pros:_
BFS is complete, meaning it will find a solution if one exists. It is optimal for this type of puzzle, since it guarantees finding the shortest path (minimum number of moves) to solve the puzzle. Here the optimality of BFS is a result of equal costs per moves.

_Cons:_
BFS is memory-intensive, as it needs to store all possible states at each level. For a larger puzzle like a 5x5 grid, the number of possible board states grows exponentially, making BFS impractical for large grids due to high memory and time requirements. I used set to reduce search load in visited nodes.


In [8]:
# TODO: Must return a list of tuples and the number of visited nodes
def bfs_solve(puzzle: LightsOutPuzzle) -> Tuple[List[Tuple[int, int]], int]:

    # initial state, moves, number of moves
    q = collections.deque()
    q.append([puzzle, []])

    visited = set()
    visited.add(hash(puzzle))
    nodes_visited = 1

    while q:
        current_state, moves = q.popleft()

        if(current_state.is_solved()):
            return moves, nodes_visited
        
        for move in puzzle.get_moves():
            x,y = move
            new_puzzle = LightsOutPuzzle(deepcopy(current_state.board))
            new_puzzle.toggle(x, y)

            # This condition may be omitted for sake of optimality.
            if hash(new_puzzle) not in visited:
                nodes_visited+=1
                visited.add(hash(new_puzzle))
                new_moves = deepcopy(moves)
                new_moves.append(move)
                q.append([new_puzzle, new_moves])


    return None, nodes_visited

In [9]:
# puzzle = LightsOutPuzzle([[0,1,1,0],[1,0,0,1],[0,1,1,0],[0,0,0,0]])
# puzzle = LightsOutPuzzle(tests[-1])
# print(bfs_solve(puzzle))

### Question 2) IDS
IDS repeatedly applies DFS with increasing depth limits.

_Pros:_
Unlike BFS, IDS uses much less memory since it only needs to store a single path from the root to the current state. (In terms of memory it is more similar to DFS than BFS.)
IDS guarantees finding a solution if one exists and returns the shortest solution, similar to BFS, as it explores all states with minimal moves first. (In terms of finding optimum solution it is more similar to BFS than DFS.)

_Cons:_
IDS revisits states multiple times at different depths so it can be slower than BFS. This can lead to unnecessary recomputation, so it may be less efficient in terms of time.

_Note:_ It worths to mention that neither BFS nor IDS result in getting into a loop. But I saved the visited nodes for sake of time efficiency, This may result in nonoptimality of BFS, one can do this for IDS too but as before the goal path may not be optimal

In [10]:
# TODO: Must return a list of tuples and the number of visited nodes

def depth_limited_search(puzzle: LightsOutPuzzle, depth: int, path: List[Tuple[int, int]], visited: List[LightsOutPuzzle]) -> Tuple[List[Tuple[int, int]], int] :

    if puzzle.is_solved():
        return path
    
    if depth == 0:
        return None
    
    for move in puzzle.get_moves():
        x,y = move
        new_puzzle = LightsOutPuzzle(deepcopy(puzzle.board))
        new_puzzle.toggle(x,y)

        # One can put a condition here to avoid expanding similar nodes.
        # if hash(new_puzzle) not in visited:
        visited.add(hash(new_puzzle))
        result = depth_limited_search(new_puzzle, depth-1, path+[(x,y)], visited)
        if result is not None:
            return result
    return None
        
        
def ids_solve(puzzle: LightsOutPuzzle) -> Tuple[List[Tuple[int, int]], int]:
    nodes_visited = 0
    depth = 0

    while depth<=(puzzle.size**2):
        visited = set()
        visited.add(hash(puzzle))
        result = depth_limited_search(puzzle, depth, [], visited)
        nodes_visited += len(visited)
        if result is not None:
            return result, nodes_visited
        depth += 1
    return None, nodes_visited



In [11]:
# puzzle = LightsOutPuzzle([[0,1,1,0],[1,0,0,1],[0,1,1,0],[0,0,0,0]])
# puzzle = LightsOutPuzzle(tests[-1])
# print(tests[-1])
# print(ids_solve(puzzle))

### Question 2) A*
In the Lights Out puzzle, A* search is a heuristic-based search algorithm that combines the exploration of paths like BFS but uses a heuristic to guide the search towards the goal more efficiently.

_Pros:_
When we use a good heuristic, A* is highly efficient, because it focuses on exploring the most promising states first, resulting in reducing the number of nodes expanded compared to BFS or IDS.
Also A* guarantees finding the shortest solution if the heuristic is admissible (never overestimates the actual cost).

_Cons:_
Like BFS, A* needs to keep track of all visited states, which can require significant memory.
The effectiveness of A* heavily relies on the quality of the heuristic. A bad heuristic can lead to slower performance.

In [12]:
# TODO: Must return a list of tuples and the number of visited nodes
import sys

def astar_solve(puzzle: LightsOutPuzzle, heuristic: Callable[[LightsOutPuzzle], int]) -> Tuple[List[Tuple[int, int]], int]:

    pq = []
    initial = (heuristic(puzzle), 0, puzzle, []) # (f(n), g(n), puzzle, path)
    heapq.heappush(pq, initial)
    visited = set()
    cnt = 0

    while pq:
        # if cnt % 1000 == 0:
        #     sys.stdout.write(f"\r Nodes so far: {cnt}")
        #     sys.stdout.flush()
    
        curr_f, curr_g, curr_state, curr_path= heapq.heappop(pq)

        if curr_state.is_solved():
            return curr_path, cnt

        visited.add(hash(curr_state))


        for move in curr_state.get_moves():
            x,y = move
            new_puzzle = LightsOutPuzzle(deepcopy(curr_state.board))
            new_puzzle.toggle(x,y)
            if hash(new_puzzle) not in visited:
                cnt += 1
                new_path = curr_path + [(x,y)]
                new_g = curr_g + 1
                new_h = heuristic(new_puzzle)
                new_f = new_h + new_g
                heapq.heappush(pq, (new_f, new_g, new_puzzle, new_path))
                
    
    return None, cnt


### Question 3) Heuristics
Take heuristic1 as number of on lights, and heuristic2 as number of on lights divided by 5.

First heuristic is not admissible, Take [[1,1,0],[1,0,0],[0,0,0]] as a counterexample, heuristic1 gives 3, but we can reach goal state by 1 move (just toggle (0,0)), so it overestimates the cost hence it's neither admissible nor consistent.

Unlike first heuristic, second one is an admissible and consistent heuristic. It is admissible because, by toggling a light we can turn 5 lights off at best, so the total cost is always more than or equal to the number of lights divided by 5.
It is consistent too, assume that reaching from state $s_1$ to $s_2$ takes 1 as cost, then we have to prove that $h(s_1)-h(s_2) \leq 1$, by toggling a light, total number of lights will add by $k \in  \{0,\pm 1,\pm 2,\pm 3,\pm 4,\pm 5\}$, so $h(s_2) = \frac{5h(s_1) + k}{5}$, hence $h(s_1)-h(s_2) = \frac{k}{5}$ which is less than 1 anyway! By induction on $c(s_1,s_2)$ it's easy to prove that $h(s_1)-h(s_2) \leq c(s_1,s_2)$ for all pair of states.

-----------------------------------------------------------------------------------------------------------------

_Other heuristics:_ I had some ideas for other heuristics, for example one can solve the problem for each state by using the linear algebraic solution and use it as a heuristic which is admissible indeed, but it brings an overhead that is not demanded. In general there is a trade off between how much a heuristic is near the actual cost and how much work does it imply. 

In [13]:
# TODO: Implement heuristic functions and store them in the list

def heuristic1(puzzle: LightsOutPuzzle):
  return np.sum(puzzle.board)

def heuristic2(puzzle: LightsOutPuzzle):
  return np.sum(puzzle.board)/5

def heuristic3(puzzle: LightsOutPuzzle):
  sum = 0
  for i in range(puzzle.size):
    for j in range(puzzle.size):
      sum += puzzle.board[i][j]*(i+j)
  return sum
heuristics = [heuristic1]

In [14]:
# TODO: Implement weighted heuristic functions and store them in the list
def weighted_heuristic1(puzzle: LightsOutPuzzle, alpha = 5):
  return heuristic1(puzzle) * alpha

def weighted_heuristic2(puzzle: LightsOutPuzzle, alpha = 5):
  return heuristic1(puzzle) * alpha

weighted_heuristics = [weighted_heuristic1]

# TODO: assign 2 weights you wanna run
weights = [2, 5]

In [15]:
puzzle = LightsOutPuzzle(tests[0])
print(tests[0])
moves, cnt = astar_solve(puzzle,heuristic2)
print("\n",moves,cnt)

[[0 0 0 0 1]
 [1 0 0 1 0]
 [0 0 0 1 1]
 [0 1 0 1 1]
 [1 1 0 1 0]]


KeyboardInterrupt: 

In [19]:
puzzle = LightsOutPuzzle(tests[1])
print(tests[1])
moves, cnt = astar_solve(puzzle,heuristic1)
print("\n",moves,cnt)

[[1 0 1 0 1]
 [1 0 0 1 1]
 [0 1 0 0 0]
 [1 0 0 1 0]
 [1 1 0 0 1]]

 [(4, 0), (0, 1), (1, 1), (3, 4), (2, 4), (1, 3), (0, 4)] 3574


In [34]:
class SearchStatus(Enum):
    SOLVED = "Solved"
    TIMEOUT = "Timeout"
    NO_SOLUTION = "No Solution"

@dataclass
class SearchResult:
    result: SearchStatus
    solution: Any = None
    nodes_visited: int = None
    time: float = None

    def __str__(self):
        output = f"Result: {self.result.value}\n"
        if self.result == SearchStatus.SOLVED:
            output += f"Solution: {self.solution}\n"
        if self.result != SearchStatus.TIMEOUT:
            output += f"Nodes Visited: {self.nodes_visited}\n"
            output += f"Time Taken: {self.time:.3f} seconds\n"

        return output


DEFAULT_SEARCH_RESULT = SearchResult(SearchStatus.TIMEOUT, None, None, float("inf"))

In [35]:
def run_searches(func: Callable, args: Any, time_limit: float = 60.0, name: str = "") -> SearchResult:
    result = []

    def target():
        try:
            solution, nodes_visited = func(*args)
            result.append((solution, nodes_visited))
        except Exception as e:
            result.append(e)

    thread = threading.Thread(target=target)
    thread.start()

    start_time = time()
    thread.join(timeout=time_limit)

    if thread.is_alive() or not result:
        print(f"\nTime limit of {time_limit} seconds exceeded for {name}")
        return SearchResult(SearchStatus.TIMEOUT)

    if isinstance(result[0], Exception):
        raise result[0]

    solution, nodes_visited = result[0]
    show_solution(args[0], solution, name, nodes_visited)
    time_taken = time() - start_time
    return SearchResult(SearchStatus.SOLVED if solution else SearchStatus.NO_SOLUTION, solution, nodes_visited, time_taken)

In [36]:
def run_test(test: List, heuristics: List[Callable[[Any], int]], time_limit: float = 60.0):

    print(f"Running test:\n {test}")

    puzzle = LightsOutPuzzle(test)

    # bfs_result = run_searches(bfs_solve, (deepcopy(puzzle),), name="BFS", time_limit=time_limit)
    # ids_result = run_searches(ids_solve, (deepcopy(puzzle),), name="IDS", time_limit=time_limit)

    astar_results = []
    for heuristic in heuristics:
        astar_results.append(
            run_searches(
                astar_solve,
                (deepcopy(puzzle), heuristic),
                name=f"A*({heuristic.__name__})",
                time_limit=time_limit,
            )
        )

    weighted_astar_results = []
    for heuristic in weighted_heuristics:
        for weight in weights:
          weighted_astar_results.append(
              run_searches(
                  astar_solve,
                  (deepcopy(puzzle), partial(heuristic, alpha = weight)),
                  name=f"weighted A*({heuristic.__name__}, alpha = {weight})",
                  time_limit=time_limit,
              )
          )

    print("\n-------------------------------\n")

    return (
        test,
        # bfs_result,
        # ids_result,
        *astar_results,
        *weighted_astar_results
    )

In [None]:
results: List[Tuple[str, SearchResult, SearchResult, SearchResult, SearchResult]] = []

for test in tests:
    results.append(run_test(test, heuristics, 120))

Running test:
 [[0 0 0 0 1]
 [1 0 0 1 0]
 [0 0 0 1 1]
 [0 1 0 1 1]
 [1 1 0 1 0]]


In [19]:
table_data: List[List[str]] = []

for result in results:
    table_data.append(
        [
            "\n".join(" ".join(str(cell) for cell in row) for row in result[0]),
            str(result[1]),
            str(result[2]),
        ]
    )

    for i in range(len(heuristics)):
        table_data[-1].append(str(result[2 + i]))

    for i in range(len(weighted_heuristics)):
        for j in range(len(weights)):
          table_data[-1].append(str(result[2 + len(heuristics) + i * 2 + j]))

    table_data.append("\n\n\n")

if table_data[-1] == "\n\n\n":
    table_data.pop()

print(len(table_data))
print("Results:")
print(
    tabulate(
        table_data,
        headers=[
            "Test",
            "BFS",
            "IDS",
            *[f"A* ({heuristic.__name__})" for heuristic in heuristics],
            *[f"Weighted A* ({heuristic.__name__}, alpha = {weight})" for heuristic in weighted_heuristics for weight in weights]
        ],
        floatfmt=".3f",
        numalign="center",
        stralign="center",
        tablefmt="grid",
    )
)

IndexError: tuple index out of range

### Question 4&5) Analysis and Conclusion
In this project, we implemented 3 different search approaches. As we can see A* is not complete unless we use an admissible heuristic, for example take the 7th and 8th testcases, with different heuristics, A* find different-length solutions.
As we can see BFS and IDS do not find solution for $5\times 5$ grids, since there are too many states to check, but on the other hand A* finds a solution for some huge testcases. (even though optimality is not guaranteed due to inadmissible heuristics.)