In [151]:
import random
from abc import abstractmethod
import heapq
import sys

In [152]:
class Node:
    """Base search node that wraps a problem state.

    Attributes:
        state: The problem state this node represents.
    """

    def __init__(self, state):
        self.state = state

    def __lt__(self, other):
        """Signal that the base node defines no ordering.

        Returning ``NotImplemented`` (instead of silently returning ``None``)
        makes ``a < b`` raise ``TypeError`` for nodes that do not override
        this method, so an unordered node type cannot be sorted or heap-ordered
        by accident.
        """
        return NotImplemented


class NodeUcs(Node):
    """Search node that is ordered by its accumulated path cost.

    Attributes:
        state: The problem state (stored by the base class initializer).
        path_cost: Cost accumulated on the way to this node.
    """

    def __init__(self, state, path_cost):
        super().__init__(state)
        self.path_cost = path_cost

    def __lt__(self, other):
        """Return True when this node's path cost is below ``other``'s."""
        mine, theirs = self.path_cost, other.path_cost
        return mine < theirs




class NodeAstar(Node):
    """Search node ordered by path cost plus heuristic cost.

    Attributes:
        state: The problem state (stored by the base class initializer).
        path_cost: Cost accumulated on the way to this node.
        heuristic_cost: Heuristic value recorded for this node's state.
    """

    def __init__(self, state, path_cost, heuristic_cost):
        super().__init__(state)
        self.path_cost = path_cost
        self.heuristic_cost = heuristic_cost

    def __lt__(self, other):
        """Compare nodes by the sum ``path_cost + heuristic_cost``."""
        own_total = self.path_cost + self.heuristic_cost
        other_total = other.path_cost + other.heuristic_cost
        return own_total < other_total
    

class NodeGenetic(Node):
    """Individual of a genetic-search population.

    Attributes:
        state: The problem state (stored by the base class initializer).
        heuristic_cost: Heuristic value recorded for this state.
    """

    def __init__(self, state, heuristic_cost):
        # Let the base class store the state, then attach the cost.
        super().__init__(state)
        self.heuristic_cost = heuristic_cost


In [153]:
class NQueensProblem:
    """N-queens puzzle with exactly one queen per column.

    A state is a list in which ``state[col]`` is the row of the queen that
    sits in column ``col`` (this is the layout ``print_state`` draws).
    """

    def __init__(self, n):
        self.n = n
        self.initial_state = self.random_state()

    @staticmethod
    def _in_conflict(state, i, j):
        """Return True when the queens at indices ``i < j`` share a row or diagonal."""
        return state[i] == state[j] or abs(state[i] - state[j]) == j - i

    def random_state(self):
        """Build a state of ``n`` entries, each drawn uniformly from ``0..n-1``."""
        board = []
        for _ in range(self.n):
            board.append(random.randint(0, self.n - 1))
        return board

    def actions(self, state):
        """Yield every ``(index, value)`` move that changes one entry of ``state``."""
        for column in range(self.n):
            for row in range(self.n):
                if row == state[column]:
                    # Leaving the queen where it is would not be a move.
                    continue
                yield (column, row)

    def goal_test(self, state):
        """Return True when no pair among the first ``n`` queens is in conflict."""
        return not any(
            self._in_conflict(state, i, j)
            for i in range(self.n)
            for j in range(i + 1, self.n)
        )

    def heuristic(self, state):
        """Count the pairs of queens in ``state`` that are in conflict."""
        size = len(state)
        return sum(
            1
            for i in range(size)
            for j in range(i + 1, size)
            if self._in_conflict(state, i, j)
        )

    def heuristic_change(self, state, queen_index, old_pos, new_pos, h):
        """Update conflict count ``h`` for one queen moving ``old_pos -> new_pos``.

        Every other queen that conflicted with the old position removes one
        from ``h``; every other queen that conflicts with the new position
        adds one. The entry at ``queen_index`` itself is never read.
        """
        for column, row in enumerate(state):
            if column == queen_index:
                continue
            distance = abs(column - queen_index)
            if row == old_pos or abs(row - old_pos) == distance:
                h -= 1
            if row == new_pos or abs(row - new_pos) == distance:
                h += 1
        return h

    def print_state(self, state):
        """Print the board: ``Q`` where a queen sits, ``*`` everywhere else."""
        size = len(state)
        for row in range(size):
            cells = ("Q" if state[col] == row else "*" for col in range(size))
            print("".join(cell + "  " for cell in cells))

In [154]:
class Search:
    """Common interface for search strategies: one ``run(problem)`` method.

    Note: this class does not use ``ABCMeta``, so the ``abstractmethod``
    marker documents intent but does not prevent instantiation. Calling the
    base ``run`` therefore raises ``NotImplementedError`` explicitly.
    """

    @abstractmethod
    def run(self, problem):
        """Solve ``problem``; concrete strategies must override this.

        Args:
            problem: The problem instance to search.

        Raises:
            NotImplementedError: Always, when the base implementation is called.
        """
        raise NotImplementedError("Search subclasses must implement run(problem)")

class Ucs(Search):
    """Uniform-cost graph search in which every action costs 1.

    Attributes:
        frontier: Heap of ``(path_cost, NodeUcs)`` entries, one per queued state.
        explored: Set of ``str(state)`` keys for every expanded node.
    """

    def __init__(self):
        self.frontier = []
        self.explored = set()

    def run(self, problem, verbose=True):
        """Search ``problem`` and return the first goal node popped.

        Args:
            problem: Object providing ``initial_state``, ``goal_test(state)``
                and ``actions(state)``.
            verbose: When True (the default), print each state as it is
                popped from the frontier.

        Returns:
            ``(node, frontier_size, explored_size)`` for the goal node, with
            both sizes as reported by ``get_memory_usage``, or ``None`` when
            the frontier runs empty.
        """
        # Start clean so an instance can be reused without leaking the
        # frontier / explored set of an earlier run.
        self.frontier = []
        self.explored = set()

        root = NodeUcs(problem.initial_state, 0)
        heapq.heappush(self.frontier, (root.path_cost, root))
        # Cost of every state currently queued, keyed like ``explored``.
        # Gives O(1) "is it in the frontier?" instead of scanning the heap.
        queued_cost = {str(root.state): root.path_cost}

        while self.frontier:
            _, node = heapq.heappop(self.frontier)
            node_key = str(node.state)
            queued_cost.pop(node_key, None)

            if verbose:
                print(node.state)

            if problem.goal_test(node.state):
                frontier_size, explored_size = self.get_memory_usage()
                return (node, frontier_size, explored_size)

            self.explored.add(node_key)

            for action in problem.actions(node.state):
                child = self.childNode(node, action)
                child_key = str(child.state)
                if child_key in self.explored:
                    continue

                known_cost = queued_cost.get(child_key)
                if known_cost is None:
                    heapq.heappush(self.frontier, (child.path_cost, child))
                elif child.path_cost < known_cost:
                    # Same state reached more cheaply: swap the queued entry.
                    self._replace_in_frontier(child_key, child.path_cost, child)
                else:
                    continue
                queued_cost[child_key] = child.path_cost

        return None

    def _replace_in_frontier(self, state_key, priority, node):
        """Replace the queued entry whose state key is ``state_key`` and re-heapify."""
        for index, (_, queued) in enumerate(self.frontier):
            if str(queued.state) == state_key:
                self.frontier[index] = (priority, node)
                heapq.heapify(self.frontier)
                return

    def get_memory_usage(self):
        """Estimate frontier and explored-set sizes (bytes / 1024 / 1024).

        The frontier figure extrapolates from the first heap entry: the size
        of its priority plus one element size per state entry, multiplied by
        the number of queued entries. The explored figure is the summed
        ``sys.getsizeof`` of the stored string keys.

        Returns:
            ``(frontier_size, explored_size)``.
        """
        explored_size = sum(sys.getsizeof(key)
                            for key in self.explored) / 1024 / 1024
        if not self.frontier:
            return (0, explored_size)

        priority, sample = self.frontier[0]
        n_queens = len(sample.state)
        element_size = sys.getsizeof(sample.state[0]) if n_queens else 0
        per_entry = sys.getsizeof(priority) + n_queens * element_size
        frontier_size = len(self.frontier) * per_entry / 1024 / 1024
        return (frontier_size, explored_size)

    def childNode(self, parent, action):
        """Return the node reached from ``parent`` by ``action = (index, value)``."""
        child_state = parent.state.copy()
        child_state[action[0]] = action[1]
        return NodeUcs(child_state, parent.path_cost + 1)

In [155]:
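# Uniform Cost Search, improved version (kept disabled below): instead of scanning the frontier for duplicates, it pushes every unexplored child and skips states that are already explored when they are popped.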
# class Ucs(Search):
#     def __init__(self):
#         self.frontier = []
#         self.explored = set()

#     def run(self, problem):
#         node = NodeUcs(problem.initial_state, 0)
#         heapq.heappush(self.frontier, (node.path_cost, node))
#         while True:
#             if len(self.frontier) == 0:
#                 return None
#             priority, node = heapq.heappop(self.frontier)
#             if str(node.state) in self.explored:
#                 continue
#             # print node state to see the process
#             print(node.state)
#             # Goal test
#             if problem.goal_test(node.state):
#                 frontier_size, explored_size = self.get_memory_usage()
#                 return (node, frontier_size, explored_size)
#             self.explored.add(str(node.state))
#             for action in problem.actions(node.state):
#                 child = self.childNode(node, action)
#                 str_child = str(child.state)
                
#                 if str_child not in self.explored:
#                     heapq.heappush(self.frontier, (child.path_cost, child))

#     def childNode(self, parent, action):
#         child_state = parent.state.copy()
#         child_state[action[0]] = action[1]
#         child = NodeUcs(child_state, parent.path_cost + 1)
#         return child
            
#     def get_memory_usage(self):
#         if len(self.frontier) == 0:
#             return (0, 0)
#         frontier_len = len(self.frontier)
#         n_queens = len(self.frontier[0][1].state)
#         frontier_size = frontier_len * \
#             (sys.getsizeof(self.frontier[0][0]) + n_queens *
#              sys.getsizeof(self.frontier[0][1].state[0])) / 1024 / 1024
#         explored_size = sum([sum([sys.getsizeof(i) for i in item])
#                             for item in self.explored]) / 1024 / 1024
#         return (frontier_size, explored_size)

In [156]:
class Astar:
    """A* graph search in which every action costs 1.

    Nodes are prioritised by ``path_cost + heuristic_cost``; a node counts as
    a goal when its ``heuristic_cost`` is 0.

    Attributes:
        frontier: Heap of ``(priority, NodeAstar)`` entries, one per queued state.
        explored: Set of ``str(state)`` keys for every expanded node.
    """

    def __init__(self):
        self.frontier = []
        self.explored = set()

    def run(self, problem, verbose=True):
        """Search ``problem`` and return the first zero-heuristic node popped.

        Args:
            problem: Object providing ``initial_state``, ``heuristic(state)``,
                ``actions(state)`` and ``heuristic_change(...)``.
            verbose: When True (the default), print each state as it is
                popped from the frontier.

        Returns:
            ``(node, frontier_size, explored_size)`` for the goal node, with
            both sizes as reported by ``get_memory_usage``, or ``None`` when
            the frontier runs empty.
        """
        # Start clean so an instance can be reused without leaking the
        # frontier / explored set of an earlier run.
        self.frontier = []
        self.explored = set()

        root = NodeAstar(problem.initial_state, 0,
                         problem.heuristic(problem.initial_state))
        root_priority = root.path_cost + root.heuristic_cost
        heapq.heappush(self.frontier, (root_priority, root))
        # Priority of every state currently queued, keyed like ``explored``.
        # Gives O(1) "is it in the frontier?" instead of scanning the heap.
        queued_priority = {str(root.state): root_priority}

        while self.frontier:
            # Take the entry with the lowest path cost + heuristic cost.
            _, node = heapq.heappop(self.frontier)
            node_key = str(node.state)
            queued_priority.pop(node_key, None)

            if verbose:
                print(node.state)

            # Goal test: a heuristic cost of zero.
            if node.heuristic_cost == 0:
                frontier_size, explored_size = self.get_memory_usage()
                return (node, frontier_size, explored_size)

            self.explored.add(node_key)

            for action in problem.actions(node.state):
                child = self.childNode(node, action, problem)
                child_key = str(child.state)
                if child_key in self.explored:
                    continue

                priority = child.path_cost + child.heuristic_cost
                known_priority = queued_priority.get(child_key)
                if known_priority is None:
                    heapq.heappush(self.frontier, (priority, child))
                elif priority < known_priority:
                    # Same state reached with a better priority: swap the
                    # queued entry for the cheaper node.
                    self._replace_in_frontier(child_key, priority, child)
                else:
                    continue
                queued_priority[child_key] = priority

        return None

    def _replace_in_frontier(self, state_key, priority, node):
        """Replace the queued entry whose state key is ``state_key`` and re-heapify."""
        for index, (_, queued) in enumerate(self.frontier):
            if str(queued.state) == state_key:
                self.frontier[index] = (priority, node)
                heapq.heapify(self.frontier)
                return

    def childNode(self, parent, action, problem):
        """Return the node reached from ``parent`` by ``action = (index, value)``.

        The child's heuristic cost is derived incrementally from the parent's
        through ``problem.heuristic_change`` rather than recomputed.
        """
        index, new_pos = action[0], action[1]
        old_pos = parent.state[index]
        child_state = parent.state.copy()
        child_state[index] = new_pos
        child_heuristic_cost = problem.heuristic_change(
            child_state, index, old_pos, new_pos, parent.heuristic_cost)
        return NodeAstar(child_state, parent.path_cost + 1,
                         child_heuristic_cost)

    def get_memory_usage(self):
        """Estimate frontier and explored-set sizes (bytes / 1024 / 1024).

        The frontier figure extrapolates from the first heap entry: the size
        of its priority plus one element size per state entry, multiplied by
        the number of queued entries. The explored figure is the summed
        ``sys.getsizeof`` of the stored string keys.

        Returns:
            ``(frontier_size, explored_size)``.
        """
        explored_size = sum(sys.getsizeof(key)
                            for key in self.explored) / 1024 / 1024
        if not self.frontier:
            return (0, explored_size)

        priority, sample = self.frontier[0]
        n_queens = len(sample.state)
        element_size = sys.getsizeof(sample.state[0]) if n_queens else 0
        per_entry = sys.getsizeof(priority) + n_queens * element_size
        frontier_size = len(self.frontier) * per_entry / 1024 / 1024
        return (frontier_size, explored_size)

In [157]:
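# A* algorithm, improved version (kept disabled below): instead of scanning the frontier for duplicates, it pushes every unexplored child and skips states that are already explored when they are popped.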
# class Astar:
#     def __init__(self):
#         self.frontier = []
#         self.explored = set()

#     def run(self, problem):
#         node = NodeAstar(problem.initial_state, 0,
#                          problem.heuristic(problem.initial_state))
#         heapq.heappush(self.frontier, (node.path_cost +
#                        node.heuristic_cost, node))

#         while True:
#             if len(self.frontier) == 0:
#                 return None

#             # Take the node with the lowest path cost
#             priority, node = heapq.heappop(self.frontier)
#             if str(node.state) in self.explored:
#                 continue

#             # print node state to see the process
#             print(node.state)

#             # Goal test
#             if node.heuristic_cost == 0:
#                 frontier_size, explored_size = self.get_memory_usage()
#                 return (node, frontier_size, explored_size)

#             # Add node to explored set
#             self.explored.add(str(node.state))
#             # Add child nodes to frontier
#             for action in problem.actions(node.state):
#                 child = self.childNode(node, action, problem)
#                 str_child = str(child.state)

#                 if str_child not in self.explored:
#                     heapq.heappush(self.frontier, (child.path_cost +
#                                                    child.heuristic_cost, child))

#     def childNode(self, parent, action, problem):
#         child_state = parent.state.copy()

#         child_state[action[0]] = action[1]
#         # Calculate heuristic cost for child
#         old_pos = parent.state[action[0]]
#         new_pos = action[1]
#         child_heuristic_cost = problem.heuristic_change(
#             child_state, action[0], old_pos, new_pos, parent.heuristic_cost)

#         child = NodeAstar(child_state, parent.path_cost +
#                           1, child_heuristic_cost)
#         return child

#     def get_memory_usage(self):
#         frontier_len = len(self.frontier)
#         n_queens = len(self.frontier[0][1].state)
#         frontier_size = frontier_len * \
#             (sys.getsizeof(self.frontier[0][0]) + n_queens *
#              sys.getsizeof(self.frontier[0][1].state[0])) / 1024 / 1024
#         explored_size = sum([sum([sys.getsizeof(i) for i in item])
#                             for item in self.explored]) / 1024 / 1024
#         return (frontier_size, explored_size)

In [158]:
class GeneticSearch:
    """Genetic search that evolves a population until a zero-cost state appears.

    Attributes:
        population: Current list of ``NodeGenetic`` individuals.
    """

    # Probability that a child is produced by crossover alone; otherwise the
    # crossover child is additionally mutated.
    CROSSOVER_ONLY_PROBABILITY = 0.8

    def __init__(self):
        self.population = []

    def reproduce(self, parent1, parent2, problem):
        """Return a child made of a prefix of ``parent1`` and a suffix of ``parent2``.

        The cut index is drawn uniformly from ``0..len(state)-1`` and the
        child's heuristic cost is computed from scratch.
        """
        cut = random.randint(0, len(parent1.state) - 1)
        child_state = parent1.state[:cut] + parent2.state[cut:]
        return NodeGenetic(child_state, problem.heuristic(child_state))

    def select_parents(self, population):
        """Draw ``len(population)`` parent pairs by fitness-proportionate selection.

        A node's fitness is ``1 / heuristic_cost``. If the population contains
        nodes with cost 0 (infinite fitness), pairs are drawn uniformly from
        those nodes instead, which avoids a division by zero.

        Returns:
            List of ``(parent1, parent2)`` tuples; empty for an empty population.
        """
        if not population:
            return []

        solved = [node for node in population if node.heuristic_cost == 0]
        if solved:
            candidates, weights = solved, None
        else:
            candidates = population
            weights = [1 / node.heuristic_cost for node in population]

        # Two independent weighted draws per pair.
        picks = random.choices(candidates, weights=weights,
                               k=2 * len(population))
        return list(zip(picks[0::2], picks[1::2]))

    def mutate(self, node, problem):
        """Move one randomly chosen entry of ``node.state`` to a different value.

        The node is modified in place and its heuristic cost is updated
        through ``problem.heuristic_change``. When no alternative value exists
        (a one-element state), the node is returned unchanged instead of
        looping forever.
        """
        size = len(node.state)
        index = random.randint(0, size - 1)
        old_pos = node.state[index]
        alternatives = [pos for pos in range(size) if pos != old_pos]
        if not alternatives:
            return node

        new_pos = random.choice(alternatives)
        node.state[index] = new_pos
        node.heuristic_cost = problem.heuristic_change(
            node.state, index, old_pos, new_pos, node.heuristic_cost)
        return node

    def get_memory_usage(self):
        """Estimate the population's size (bytes / 1024 / 1024).

        Extrapolates from the first individual: one element size per state
        entry plus the size of its heuristic cost, multiplied by the number
        of individuals. Returns 0 for an empty population.
        """
        if not self.population:
            return 0

        sample = self.population[0]
        n_queens = len(sample.state)
        element_size = sys.getsizeof(sample.state[0]) if n_queens else 0
        per_node = n_queens * element_size + \
            sys.getsizeof(sample.heuristic_cost)
        return len(self.population) * per_node / 1024 / 1024

    def run(self, problem, max_generations=100000, verbose=True):
        """Evolve a population of ``2 * problem.n`` random states.

        Args:
            problem: Object providing ``n``, ``random_state()``,
                ``heuristic(state)`` and ``heuristic_change(...)``.
            max_generations: Give up once the generation counter exceeds this.
            verbose: When True (the default), print the best child cost of
                every generation.

        Returns:
            ``(node, population_memory)`` for the first individual whose
            heuristic cost is 0, or ``None`` when the generation limit is
            exceeded or the population is empty.
        """
        population_size = problem.n * 2
        self.population = []
        for _ in range(population_size):
            # Score the very state that is stored in the node.
            state = problem.random_state()
            self.population.append(
                NodeGenetic(state, problem.heuristic(state)))

        if not self.population:
            return None

        generation = 0
        while generation <= max_generations:
            for node in self.population:
                if node.heuristic_cost == 0:
                    population_memory = self.get_memory_usage()
                    return (node, population_memory)

            new_population = []
            for parent1, parent2 in self.select_parents(self.population):
                crossover_only = random.random() < self.CROSSOVER_ONLY_PROBABILITY
                child = self.reproduce(parent1, parent2, problem)
                if not crossover_only:
                    child = self.mutate(child, problem)
                new_population.append(child)

            if verbose:
                min_heuristic = min(
                    child.heuristic_cost for child in new_population)
                print('Generation: ', generation, end=' | ')
                print('Min heuristic: ', min_heuristic)
            generation += 1
            self.population = new_population

        return None

In [159]:
class Program:
    """Interactive driver: reads the board size and algorithm, then reports.

    Attributes:
        problem: The ``NQueensProblem`` built by the last ``run`` (or None).
        result: Whatever the selected search's ``run`` returned (or None).
    """

    def __init__(self):
        self.problem = None
        self.result = None

    def run(self):
        """Prompt for the inputs, run the selected search and print the outcome.

        Prints "Invalid input" when either answer is not an integer or the
        algorithm number is not 1, 2 or 3.
        """
        try:
            number_of_queens = int(input("Enter number of queens: "))
            algorithm = int(
                input("Enter the algorithm: 1.UCS   2.A*    3.Genetic Search:"))
        except ValueError:
            print("Invalid input")
            return

        self.problem = NQueensProblem(number_of_queens)
        print("Initial state: ")
        self.problem.print_state(self.problem.initial_state)

        # The solver is only instantiated for the branch that was chosen.
        if algorithm == 1:
            self.result = Ucs().run(self.problem)
            if self._print_solution("UCS", number_of_queens):
                self._print_search_memory()
        elif algorithm == 2:
            self.result = Astar().run(self.problem)
            if self._print_solution("A*", number_of_queens):
                self._print_search_memory()
        elif algorithm == 3:
            self.result = GeneticSearch().run(self.problem)
            if self._print_solution("Genetic Search", number_of_queens):
                population_memory = self.result[1]
                print(f"Population memory: {population_memory} MB")
        else:
            print("Invalid input")
            return

    def _print_solution(self, label, number_of_queens):
        """Print the solved board, or "No solution found"; return True if solved."""
        if self.result is None:
            print("No solution found")
            return False
        print(f"{label}: n =", number_of_queens, "")
        print("Solution: ")
        self.problem.print_state(self.result[0].state)
        return True

    def _print_search_memory(self):
        """Print the frontier / explored sizes of a ``(node, frontier, explored)`` result."""
        frontier_size = self.result[1]
        explored_size = self.result[2]
        print(f"Frontier size: {frontier_size} MB")
        print(f"Explored size: {explored_size} MB")
        print(f"Total memory: {frontier_size + explored_size} MB")
        

In [160]:
%%time
# Entry point: when this code runs as the main module, build a Program and
# start its interactive prompt (Program.run reads its inputs via input()).
if __name__ == "__main__":
    program = Program()
    program.run()

Initial state: 
*  *  *  *  *  *  *  *  *  *  
*  *  *  *  *  *  *  *  *  *  
Q  *  *  *  *  *  *  Q  *  Q  
*  *  *  *  *  *  Q  *  *  *  
*  Q  *  *  *  *  *  *  *  *  
*  *  *  *  *  *  *  *  *  *  
*  *  *  Q  *  *  *  *  *  *  
*  *  *  *  *  *  *  *  Q  *  
*  *  Q  *  Q  Q  *  *  *  *  
*  *  *  *  *  *  *  *  *  *  
[2, 4, 8, 6, 8, 8, 3, 2, 7, 2]
[2, 4, 8, 1, 8, 8, 3, 2, 7, 2]
[0, 4, 8, 1, 8, 8, 3, 2, 7, 2]
[0, 4, 8, 1, 8, 1, 3, 2, 7, 2]
[0, 4, 8, 1, 8, 1, 3, 9, 7, 2]
[0, 4, 8, 1, 8, 8, 3, 9, 7, 2]
[0, 4, 8, 1, 8, 9, 3, 2, 7, 2]
[0, 6, 8, 1, 8, 8, 3, 9, 7, 2]
[0, 4, 6, 1, 8, 8, 3, 9, 7, 2]
[0, 4, 8, 1, 5, 8, 3, 9, 7, 2]
[0, 4, 8, 1, 8, 6, 3, 2, 7, 2]
[0, 4, 8, 1, 8, 6, 3, 2, 7, 5]
[0, 4, 8, 1, 9, 6, 3, 2, 7, 5]
[0, 4, 8, 1, 8, 6, 3, 9, 7, 5]
[0, 6, 8, 1, 8, 1, 3, 9, 7, 2]
[0, 4, 8, 1, 9, 6, 3, 2, 7, 2]
[0, 4, 8, 1, 9, 6, 0, 2, 7, 5]
[0, 4, 8, 1, 9, 6, 3, 0, 7, 5]
[0, 4, 8, 1, 9, 6, 3, 1, 7, 5]
[0, 4, 8, 1, 9, 6, 3, 9, 7, 5]
[0, 4, 8, 1, 8, 6, 0, 2, 7, 5]
[0, 4, 8, 1, 8, 6, 3, 0