In [2]:
from abc import ABC, abstractmethod
from random import randrange
import itertools

class Problem(ABC):
  """
  Abstract description of a search problem.

  Subclasses provide the goal test, the action generator, the transition
  function, the step cost ``g`` and the heuristic ``h``.
  """

  def __init__(self) -> None:
    # Neutral defaults; concrete problems are expected to overwrite both.
    self.n = 0
    self.initial_state = []

  @abstractmethod
  def goal_test(self, state) -> bool:
    """Tell whether ``state`` is a final (goal) state."""

  @abstractmethod
  def actions(self, state):
    """Return the actions that can be applied in ``state``."""

  @abstractmethod
  def result(self, state, action):
    """Return the state produced by applying ``action`` to ``state``."""

  @abstractmethod
  def g(self, from_state, action, to_state):
    """Return the path cost of moving from ``from_state`` to ``to_state`` via ``action``."""

  @abstractmethod
  def h(self, state):
    """Return the heuristic value of ``state``."""

  @abstractmethod
  def random_state(self):
    """Return a randomly generated state of the problem."""

class NQueens(Problem):
  """
  N-Queens formulated with exactly one queen per column.

  A state is a tuple of length ``n`` in which ``state[col]`` is the row of the
  queen standing in column ``col``. An action is a ``(col, row)`` pair that
  moves the queen of column ``col`` to ``row``.
  """

  def __init__(self, n) -> None:
    # ``n`` has to be assigned first because random_state() reads it.
    self.n = n
    self.initial_state = self.random_state()

  def check_conflict(self, col1, row1, col2, row2):
    """
    Return True when the queens at (col1, row1) and (col2, row2) share a row
    or a diagonal. Columns are not compared here.
    """
    return row1 == row2 or abs(row1 - row2) == abs(col1 - col2)

  def goal_test(self, state: 'tuple[int, ...]') -> bool:
    """A state is final when it has no conflicting pair of queens."""
    return not self.h(state)

  def actions(self, state: 'tuple[int, ...]'):
    """
    Lazily yield every ``(col, row)`` move that puts the queen of column
    ``col`` on a row other than the one it currently occupies.

    Moves are produced in column-major order: (0, 0), (0, 1), ..., (n-1, n-1).
    """
    return (
      (col, row)
      for col in range(self.n)
      for row in range(self.n)
      if row != state[col]
    )

  def result(self, state, action):
    """Return a new state tuple with the queen of column ``action[0]`` on row ``action[1]``."""
    col, row = action[0], action[1]
    moved = list(state)
    moved[col] = row
    return tuple(moved)

  def g(self, from_state, action, to_state) -> int:
    """Every move costs one step."""
    return 1

  def h(self, state: 'tuple[int, ...]'):
    """Return the number of unordered queen pairs that conflict with each other."""
    queens = enumerate(state)
    return sum(
      1
      for (col_a, row_a), (col_b, row_b) in itertools.combinations(queens, 2)
      if self.check_conflict(col_a, row_a, col_b, row_b)
    )

  def random_state(self) -> 'tuple[int, ...]':
    """Return a state in which every column's queen sits on a uniformly random row."""
    return tuple([randrange(0, self.n) for _ in range(self.n)])


In [3]:

import heapq

class AStar:
  """
  A* search over a ``Problem``.

  Nodes are expanded in increasing order of ``f = g + h``. The cheapest known
  path cost is tracked per state, so a state that is first reached through an
  expensive path is re-queued when a cheaper path to it is discovered.
  """

  class Node:
    """A state together with its accumulated path cost ``g`` and heuristic ``h``."""

    def __init__(self, problem: 'Problem', state, path_cost=0, heuristic=0) -> None:
      self.problem = problem
      self.state = state
      self.h = heuristic
      self.g = path_cost

    def child_node(self, action):
      '''
      Return next node from executing specified action
      '''
      new_state = self.problem.result(self.state, action)
      step_cost = self.problem.g(self.state, action, new_state)
      return AStar.Node(self.problem, new_state, self.g + step_cost, self.problem.h(new_state))

    def pretty_print(self):
      '''
      Print the state as a square board: "Q" where state[col] == row, "*" elsewhere
      '''
      size = len(self.state)
      for row in range(size):
        print(''.join("Q " if row == self.state[col] else "* " for col in range(size)))

    def __repr__(self):
      return f"A* Node {self.state}(g={self.g}, h={self.h})>"

    def __lt__(self, other: 'AStar.Node'):
      # heapq only needs "<"; nodes are ordered by f = g + h.
      return (self.g + self.h) < (other.g + other.h)

    def __hash__(self):
      return hash(self.state)

    def __eq__(self, other: 'AStar.Node'):
      # Nodes are identified by their state only; defer to Python for foreign types.
      if not isinstance(other, AStar.Node):
        return NotImplemented
      return self.state == other.state

  def __init__(self, problem: 'Problem') -> None:
    self.problem = problem
    # No step has been taken yet, so the root's path cost is 0.
    self.root = AStar.Node(problem, problem.initial_state, 0, problem.h(problem.initial_state))

  def solve(self):
    '''
    Run the search from the root.

    Returns the first goal node taken off the frontier, or None when the
    frontier is exhausted without reaching a goal.
    '''
    frontier = [self.root]
    # Cheapest path cost found so far for every state that was ever queued.
    best_g = {self.root.state: self.root.g}
    while frontier:
      cur = heapq.heappop(frontier)
      if cur.g > best_g[cur.state]:
        # Stale entry: a cheaper path to this state was queued afterwards.
        continue
      # Ask the problem itself; a heuristic of 0 does not have to mean "goal".
      if self.problem.goal_test(cur.state):
        return cur
      for action in self.problem.actions(cur.state):
        new_node = cur.child_node(action)
        known = best_g.get(new_node.state)
        if known is None or new_node.g < known:
          best_g[new_node.state] = new_node.g
          heapq.heappush(frontier, new_node)
    return None


In [4]:

class UCS:
  """
  Uniform-cost search over a ``Problem``.

  Nodes are expanded in increasing order of path cost ``g``; the heuristic is
  not used. The cheapest known path cost is tracked per state, so a state that
  is first reached through an expensive path is re-queued when a cheaper path
  to it is discovered.
  """

  class Node:
    """A state together with its accumulated path cost ``g``."""

    def __init__(self, problem: 'Problem', state, path_cost=0) -> None:
      self.problem = problem
      self.state = state
      self.g = path_cost

    def child_node(self, action):
      '''
      Return next node from executing specified action
      '''
      new_state = self.problem.result(self.state, action)
      step_cost = self.problem.g(self.state, action, new_state)
      return UCS.Node(self.problem, new_state, self.g + step_cost)

    def pretty_print(self):
      '''
      Print the state as a square board: "Q" where state[col] == row, "*" elsewhere
      '''
      size = len(self.state)
      for row in range(size):
        print(''.join("Q " if row == self.state[col] else "* " for col in range(size)))

    def __repr__(self):
      return f"UCS Node {self.state}(g={self.g})>"

    def __lt__(self, other: 'UCS.Node'):
      # heapq only needs "<"; nodes are ordered by path cost.
      return self.g < other.g

    def __hash__(self):
      return hash(self.state)

    def __eq__(self, other: 'UCS.Node'):
      # Nodes are identified by their state only; defer to Python for foreign types.
      if not isinstance(other, UCS.Node):
        return NotImplemented
      return self.state == other.state

  def __init__(self, problem: 'Problem') -> None:
    self.problem = problem
    self.root = UCS.Node(problem, problem.initial_state)

  def solve(self):
    '''
    Run the search from the root.

    Returns the first goal node taken off the frontier, or None when the
    frontier is exhausted without reaching a goal.
    '''
    frontier = [self.root]
    # Cheapest path cost found so far for every state that was ever queued.
    best_g = {self.root.state: self.root.g}
    while frontier:
      cur = heapq.heappop(frontier)
      if cur.g > best_g[cur.state]:
        # Stale entry: a cheaper path to this state was queued afterwards.
        continue
      if self.problem.goal_test(cur.state):
        return cur
      for action in self.problem.actions(cur.state):
        new_node = cur.child_node(action)
        known = best_g.get(new_node.state)
        if known is None or new_node.g < known:
          best_g[new_node.state] = new_node.g
          heapq.heappush(frontier, new_node)
    return None


In [5]:
import random

class Genetic:
  """
  Genetic algorithm driven by a ``Problem``'s heuristic.

  A chromosome is a state of the problem. Its fitness is
  ``max_fitness - problem.h(chromosome)``, where ``max_fitness`` is the number
  of unordered pairs among ``n`` items, so a chromosome with ``h == 0`` has
  maximal fitness and counts as a solution.
  """

  POPULATION_SIZE = 250   # chromosomes per generation
  MAX_GEN = 100           # generation limit used by solve() unless no_limit is set
  MUTATION_PROB = 0.1     # chance that a freshly bred child is mutated

  def __init__(self, problem: 'Problem') -> None:
    self.problem = problem
    self.n = problem.n
    self.max_fitness = (self.n * (self.n - 1)) // 2
    self.population = [problem.random_state() for _ in range(Genetic.POPULATION_SIZE)]
    self.gen = 0
    self.solution = None

  def fitness(self, chromosome):
    '''
      Return max_fitness minus the heuristic value of the chromosome
    '''
    return self.max_fitness - self.problem.h(chromosome)

  def calc_prob(self, fitness):
    '''
      Calculate selection probability (weight) from the given fitness
    '''
    if self.max_fitness == 0:
      # Fewer than two items means there are no pairs at all; avoid dividing by zero.
      return 1.0
    return fitness / self.max_fitness

  def cross_over(self, chrom_1, chrom_2):
    '''
      Single-point crossover: prefix of chrom_1 joined with suffix of chrom_2
    '''
    split = random.randrange(0, len(chrom_1))
    return chrom_1[:split] + chrom_2[split:]

  def mutate(self, chrom):
    '''
      Return a copy of chrom with one random position set to a random value in range(n)
    '''
    pos = random.randrange(0, len(chrom))
    new_chrom = list(chrom)
    new_chrom[pos] = random.randrange(0, self.n)
    return tuple(new_chrom)

  def pretty_print(self):
    '''
      Print the run parameters and the solution board, or a notice when no solution is stored
    '''
    print(f"Population size: {Genetic.POPULATION_SIZE}")
    print(f"Mutation probability: {Genetic.MUTATION_PROB}")
    if self.solution is None:
      # solve() can stop at MAX_GEN without a solution; len(None) would raise here.
      print(f"No solution found after {self.gen} generations")
      return
    print(f"Solved in generation: {self.gen}")
    size = len(self.solution)
    for row in range(size):
      print(''.join("Q " if row == self.solution[col] else "* " for col in range(size)))

  def solve(self, no_limit=False):
    '''
      Evolve the population until a chromosome reaches max_fitness.

      The result is stored in self.solution (left as None when the generation
      limit is hit first). With a truthy no_limit the MAX_GEN limit is ignored.
    '''
    while True:
      all_fitness = [self.fitness(chrom) for chrom in self.population]
      if self.max_fitness in all_fitness:
        self.solution = self.population[all_fitness.index(self.max_fitness)]
        return

      # Check the limit only after evaluating, so the population bred in the
      # last allowed generation still gets a chance to contain the solution.
      if not no_limit and self.gen >= Genetic.MAX_GEN:
        return

      # Calculate probability for each chromosome to be chosen for next generation
      self.probs = [self.calc_prob(f) for f in all_fitness]
      # random.choices raises ValueError when all weights are zero; select uniformly instead.
      weights = self.probs if any(self.probs) else None

      new_population = []

      for _ in range(len(self.population)):
        # Selection
        first, second = random.choices(self.population, weights=weights, k=2)

        # Cross-over
        child = self.cross_over(first, second)

        # Mutation
        if random.random() < Genetic.MUTATION_PROB:
          child = self.mutate(child)

        new_population.append(child)

      self.population = new_population
      self.gen += 1


In [6]:
# Board size handed to NQueens(N) by the run_* benchmark functions.
N = 8
# Number of times benchmark() repeats the function it wraps.
TEST_NUM = 3

In [7]:
import tracemalloc
from time import perf_counter_ns

def benchmark(func, runs=None):
  """
  Wrap ``func`` so that calling the wrapper runs it several times and prints
  the average wall-clock time and the average peak traced memory.

  Args:
    func: zero-argument callable to measure.
    runs: number of repetitions; when None the module-level TEST_NUM is read
      at call time.

  Returns:
    A zero-argument function that performs the measurement and returns None.
    An exception raised by ``func`` propagates to the caller.
  """
  def calc():
    total_runs = TEST_NUM if runs is None else runs
    times_ms = []
    peaks_mb = []
    for i in range(total_runs):
      # Print the header before the clock starts so it is not part of the timing.
      print(f"TEST {i + 1}:")
      tracemalloc.start()
      try:
        t1 = perf_counter_ns()
        func()
        t2 = perf_counter_ns()
        # get_traced_memory() returns (current, peak) in bytes.
        peak = tracemalloc.get_traced_memory()[1]
      finally:
        # Always stop tracing, even when func raises, so tracing does not stay on.
        tracemalloc.stop()
      times_ms.append((t2 - t1) / 10**6)
      peaks_mb.append(peak / 1024**2)
    if not times_ms:
      # Nothing was measured; an average would divide by zero.
      return
    print(f"Average time: {sum(times_ms) / len(times_ms):.2f} ms")
    print(f"Memory usage: {sum(peaks_mb) / len(peaks_mb):.4f} MB")
  return calc


@benchmark
def run_a_star():
  """Solve one randomly initialised N-Queens board with A* and print the result."""
  goal_node = AStar(NQueens(N)).solve()
  if goal_node is None:
    # solve() returns None when the frontier runs empty (e.g. N = 2 or 3 has no solution).
    print("No solution found")
    return
  goal_node.pretty_print()


@benchmark
def run_ucs():
  """Solve one randomly initialised N-Queens board with uniform-cost search and print the result."""
  goal_node = UCS(NQueens(N)).solve()
  if goal_node is None:
    # solve() returns None when the frontier runs empty (e.g. N = 2 or 3 has no solution).
    print("No solution found")
    return
  goal_node.pretty_print()

@benchmark
def run_genetic():
  """Evolve one N-Queens population without a generation limit and print the outcome."""
  solver = Genetic(NQueens(N))
  # Truthy no_limit: keep breeding until a solution appears.
  solver.solve(no_limit=True)
  solver.pretty_print()

In [8]:
# Benchmark A*: runs TEST_NUM times and prints each solved board followed by the averages.
run_a_star()

TEST 1:
* * * * Q * * * 
* * * * * * Q * 
* Q * * * * * * 
* * * * * Q * * 
* * Q * * * * * 
Q * * * * * * * 
* * * * * * * Q 
* * * Q * * * * 
TEST 2:
Q * * * * * * * 
* * * * * * Q * 
* * * * Q * * * 
* * * * * * * Q 
* Q * * * * * * 
* * * Q * * * * 
* * * * * Q * * 
* * Q * * * * * 
TEST 3:
* * * * * Q * * 
* * Q * * * * * 
Q * * * * * * * 
* * * * * * * Q 
* * * * Q * * * 
* Q * * * * * * 
* * * Q * * * * 
* * * * * * Q * 
Average time: 83.75 ms
Memory usage: 0.5655 MB


In [11]:
# Benchmark uniform-cost search.
# NOTE: the recorded output below ends in KeyboardInterrupt during TEST 1, i.e. this run
# was interrupted before the first search finished.
run_ucs()

TEST 1:


KeyboardInterrupt: 

In [11]:
# Benchmark the genetic algorithm: prints the run parameters, the generation in which a
# solution appeared and the board for each test, followed by the averages.
run_genetic()

TEST 1:
Population size: 250
Mutation probability: 0.1
Solved in generation: 19
* Q * * * * * * 
* * * * Q * * * 
* * * * * * Q * 
* * * Q * * * * 
Q * * * * * * * 
* * * * * * * Q 
* * * * * Q * * 
* * Q * * * * * 
TEST 2:
Population size: 250
Mutation probability: 0.1
Solved in generation: 88
* * * * * * Q * 
* * * * Q * * * 
* * Q * * * * * 
Q * * * * * * * 
* * * * * Q * * 
* * * * * * * Q 
* Q * * * * * * 
* * * Q * * * * 
TEST 3:
Population size: 250
Mutation probability: 0.1
Solved in generation: 47
* * Q * * * * * 
Q * * * * * * * 
* * * * * * Q * 
* * * * Q * * * 
* * * * * * * Q 
* Q * * * * * * 
* * * Q * * * * 
* * * * * Q * * 
Average time: 3136.69 ms
Memory usage: 0.1082 MB
