In [256]:
import collections
import itertools
import time
import random
import math
import functools

# The positions of the queen on the chessboard.
Pair = collections.namedtuple('Pair', ['i', 'j'])

def in_same_row(q1, q2):
    return q1.i == q2.i

def in_same_column(q1, q2):
    return q1.j == q2.j

# Efficient computation of the check to find out if
# two queens are on the same diagonal.
def in_same_diagonal(q1, q2):
    delta_i = abs(q1.i - q2.i)
    delta_j = abs(q1.j - q2.j)

    return delta_i == delta_j

# Returns True when the two given queens attack
# each other, False otherwise.
def attacking_each_other(q1, q2):
    conditions = [
        in_same_row(q1, q2),
        in_same_column(q1, q2),
        in_same_diagonal(q1, q2)
    ]

    return len(list(filter(lambda x: x, conditions))) != 0

# Generates an array of integers from a to b,
# returning the first element after a shuffle.
def random_int(a, b):
    values = [i for i in range(a, b + 1)]
    random.shuffle(values)
    return values[0]

# Runs a function n_runs times and logs the times to the report.
def benchmark(report, func, n_runs=5):
    print(f"\n=== Benchmarking {report.name} ({n_runs} runs) ===")
    
    for i in range(1, n_runs + 1):
        print(f"  Run {i}/{n_runs}...")
        result = func()
        report.set_result(f'solution_run_{i}', result)
        
    print("Benchmark completed!\n")
    return report


In [257]:
# The general framework for CSP problems. Values are chosen for one
# variable at a time, and the process is reversed when a variable
# runs out of valid values to assign.
class CSP_BacktrackingSearchFramework:
    def __init__(self, csp_problem, reporter=None):
        self.csp_problem = csp_problem
        self.reporter = reporter

    def run(self):
        base_assignment = self.csp_problem.initialize_state()
        self.__backtrack(base_assignment)
        return base_assignment

    def __backtrack(self, assignment):
        # This means that the algorithm will stop only when a
        # complete solution is found, that is, when every variable
        # has been assigned and they are all consistent with each other.
        if self.csp_problem.is_complete(assignment):
            return assignment

        # Select a variable from the list, which is the state that keeps
        # track of assigned and unassigned variables.
        variable = self.csp_problem.select_unassigned_variable(assignment)

        # A value is chosen from the domain of the selected variable.
        for value in self.csp_problem.order_domain_values(variable, assignment):
            if self.csp_problem.is_consistent(variable, value, assignment):
                self.csp_problem.assign_variable(variable, value, assignment)
                inferences = self.csp_problem.inferences(variable, value, assignment)

                if self.csp_problem.is_inferences_valid(inferences):
                    self.csp_problem.assign_inferences(inferences, assignment)
                    result = self.__backtrack(assignment)

                    if result != self.csp_problem.failure():
                        return result

                # When the backtracking algorithm returns an incomplete result,
                # the chosen path cannot lead to a valid solution.
                # Therefore all available values for the variable have been
                # tried, so the previous assignments are incorrect.
                # All inference is removed.
                self.csp_problem.remove_assigned_variable(variable, value, assignment)
                self.csp_problem.remove_assigned_inferences(inferences, assignment)

        # After clearing the state, an error is returned to inform that
        # previous choices must be changed.
        return self.csp_problem.failure()

class CSP_QueenProblem:
    def __init__(self, n_queens, reporter=None):
        self.n_queens = n_queens
        self.reporter = reporter

    def initialize_state(self):
        # The initial state is generated by assigning a fixed row
        # to each queen, leaving the column undefined.
        return {
            'assigned_queens': [],
            'unassigned_queens': [Pair(i=index, j=None) for index in range(0, self.n_queens)],
        }

    def is_complete(self, assignment):
        assigned = assignment['assigned_queens']
        unassigned = assignment['unassigned_queens']

        # The state is complete when all queens have been assigned.
        # When a new queen is added to the assigned list,
        # it is consistent with all the queens already present.
        return len(assigned) == self.n_queens and len(unassigned) == 0

    def select_unassigned_variable(self, assigment):
        # The first variable is selected from the list,
        # in no particular order.
        return assigment['unassigned_queens'][0]

    def order_domain_values(self, variable, assigment):
        # All available values are returned, even
        # potentially inconsistent ones, in no particular order.
        for j in range(0, self.n_queens):
            yield Pair(variable.i, j)

    def is_consistent(self, variable, value, assignment):
        # The queen (i, j) that was chosen for the given variable (Q_ij)
        q1 = value
        
        for q2 in assignment['assigned_queens']:
            # The cost of the check is constant.
            if attacking_each_other(q1, q2):
                return False

        return True

    def assign_variable(self, variable, value, assignment):
        # The full queen (value) is added to the list of assigned
        # queens, while the variable is removed (unassigned queen).
        assignment['assigned_queens'].append(value)
        assignment['unassigned_queens'].remove(variable)

    def inferences(self, variable, value, assignment):
        pass

    def is_inferences_valid(self, inferences):
        return True

    def assign_inferences(self, inferences, assignment):
        pass

    def remove_assigned_variable(self, variable, value, assignment):
        # The full queen is removed and the variable is added
        # to the beginning of the list, to respect the original order.
        if value in assignment['assigned_queens']:
            assignment['assigned_queens'].remove(value)

        assignment['unassigned_queens'].insert(0, variable)

    def remove_assigned_inferences(self, inferences, assignment):
        pass

    def failure(self):
        # A state without queens.
        return {
            'assigned_queens': [],
            'unassigned_queens': []
        }
        

In [258]:
queen_8 = CSP_BacktrackingSearchFramework(CSP_QueenProblem(8))
queen_8.run()

{'assigned_queens': [Pair(i=0, j=0),
  Pair(i=1, j=4),
  Pair(i=2, j=7),
  Pair(i=3, j=5),
  Pair(i=4, j=2),
  Pair(i=5, j=6),
  Pair(i=6, j=1),
  Pair(i=7, j=3)],
 'unassigned_queens': []}

In [259]:
class SimulatedAnnealingFramework:
    def __init__(self, problem, reporter=None):
        self.problem = problem
        self.reporter = reporter

    def run(self):
        current_state = self.problem.initial_state()
        
        for iteration in itertools.count(1):
            # The scheduling function determines how quickly
            # the algorithm will reject the solution with the
            # lowest energy.
            t = self.problem.schedule(iteration)

            if t == 0 or self.problem.is_enough(current_state):
                return [current_state, self.problem.energy(current_state)]

            # For simulated annealing, successors must be complete,
            # meaning all variables involved in the problem must
            # have an associated value.
            successors = self.problem.successors(current_state)
            random.shuffle(successors)
            next_state = successors[0]

            current_energy = self.problem.energy(current_state)
            next_energy = self.problem.energy(next_state)

            difference = next_energy - current_energy

            if difference < 0:
                current_state = next_state
            else:
                exponent = -difference / t
                if random.random() < math.exp(exponent):
                    current_state = next_state

class SimulatedAnnealingQueenProblem:
    def __init__(self, n_queens, iterations, reporter=None):
        self.iterations = iterations
        self.n_queens = n_queens
        self.reporter = reporter

    def initial_state(self):
        # The initial state is a random complete state.
        N = self.n_queens
        return [Pair(i, random_int(0, N - 1)) for i in range(0, N)]

    def schedule(self, iteration):
        return math.log(self.iterations / iteration)

    def is_enough(self, current_state):
        return self.energy(current_state) == 0

    def successors(self, state):
        N = self.n_queens
        next_states = []

        # Each queen is moved along the columns,
        # resulting in N * (N - 1) successes for each node.
        for queen in state:
            without_queen = list(filter(lambda x: x != queen, state))

            node = Pair(queen.i, (queen.j + 1) % N)
            while node != queen:
                next_states.append(sorted(without_queen + [node], key=lambda x: x.i))
                node = Pair(queen.i, (node.j + 1) % N)

        return next_states;

    def energy(self, state):
        # The overall energy is determined by the number
        # of pairs attaching to each other.
        attacking_positions = set()

        for q1 in state:
            aggressive_queens = list(filter(lambda x: q1 != x, state))

            for q2 in aggressive_queens:
                if attacking_each_other(q1, q2):
                    attacking_positions.add(tuple(sorted([q1.i, q2.i, q1.j, q2.j])))

        return len(attacking_positions)

class SimulatedAnnealingQueenProblemGeometricCooling(SimulatedAnnealingQueenProblem):
    def __init__(self, n_queens, iterations, temperature, rate, reporter=None):
        super().__init__(n_queens, iterations, reporter)
        self.temperature = temperature
        self.rate = rate

    # Based on geometric cooling technique.
    def schedule(self, iteration):
        if iteration > self.iterations:
            return 0

        self.temperature = self.temperature * self.rate
        return self.temperature
            

In [260]:
sa_queen_problem = SimulatedAnnealingQueenProblem(8, 4000)
framework_sa_queen_problem = SimulatedAnnealingFramework(sa_queen_problem)
state, energy = framework_sa_queen_problem.run()

print(state)
print(energy)

sa_queen_problem_gc = SimulatedAnnealingQueenProblemGeometricCooling(8, 4000, temperature=1.0, rate=0.995)
framework_sa_queen_problem_gc = SimulatedAnnealingFramework(sa_queen_problem_gc)
state = framework_sa_queen_problem_gc.run()
print(state, energy)

[Pair(i=0, j=4), Pair(i=1, j=0), Pair(i=2, j=3), Pair(i=3, j=5), Pair(i=4, j=7), Pair(i=5, j=1), Pair(i=6, j=6), Pair(i=7, j=2)]
0
[[Pair(i=0, j=4), Pair(i=1, j=0), Pair(i=2, j=7), Pair(i=3, j=5), Pair(i=4, j=2), Pair(i=5, j=6), Pair(i=6, j=1), Pair(i=7, j=3)], 0] 0


In [261]:
class GeneticAlgorithmFramework:
    def __init__(self, problem, reporter=None):
        self.problem = problem
        self.reporter = reporter

    def run(self):
        population = self.problem.initial_population()

        for iteration in itertools.count(1):
            # At each iteration the new generation
            # is used to prepare matching pairs.
            new_population = []
            
            # A matching dictionary is used to identify previously used
            # crossover points, thus avoiding having the same
            # configuration more than once in the new population.
            matches = {}

            # The number of iterations is equal to the
            # cardinality of the population.
            for _ in population:
                # The most valuable queen are selected according to a
                # specific fitness function.
                x = self.problem.pop_by_fit(population)
                y = self.problem.pop_by_fit(list(filter(lambda node: node != x, population)))

                # The selected match.
                match = tuple(sorted([x, y]))

                if matches.get(match) is None:
                    matches[match] = []

                # All individuals already generated by the
                # parent are retrieved for this run.
                siblings = matches[match]

                # A new child and the selected crossover point
                # are produced by calling the reproductive function.
                child, crossoverpoint = self.__reproduce(
                    first=x,
                    second=y,
                    individuals=siblings
                )

                # The new crossover point is recorded to reduce
                # the likelihood of choosing the same crossover point again.
                siblings.append(crossoverpoint)

                # The mutation is applied based on a specific probability distribution (e.g. Poisson).
                if self.problem.should_mutate(child, siblings=siblings):
                    child = self.problem.apply_mutation(child, siblings=siblings)

                new_population.append(child)

            population = new_population
            
            # The algorithm will stop as soon as the solution
            # is sufficient or too much time has passed.
            if self.problem.is_enough(population, iteration):
                return self.problem.best_of(population)
        

    def __reproduce(self, first, second, individuals=[]):
        n = len(first)

        # A crossover point is chosen also taking into
        # account the given individuals.
        crossover_points = list(filter(lambda x: x not in individuals, [i for i in range(1, n)]))
        random.shuffle(crossover_points)
    
        # If all possible crossover points have been generated, a random point is chosen.
        point = random_int(1, n - 1) if len(crossover_points) == 0 else crossover_points[0]
        
        first_sub = first[:point]
        second_sub = second[point:]
        
        return [first_sub + second_sub, point]

class GeneticAlgorithmQueenProblem:
    def __init__(self, n, k, iterations, mutation_rate, reporter=None):
        self.n = n
        self.k = k
        self.iterations = iterations
        self.mutation_rate = mutation_rate
        self.reporter = reporter

    def initial_population(self):
        # A random string is generated.
        states = [0 for _ in range(0, self.k)]
        states = list(map(lambda _: [str(random_int(0, self.n - 1)) for i in range(0, self.n)], states))
        states = list(map(lambda l: ''.join(l), states))
        
        return states

    def pop_by_fit(self, population):
        with_fitness = self.__population_with_fitness(population)
        fitsum = functools.reduce(lambda x, y: x + y.j, with_fitness, 0)
        with_fitness = list(map(lambda p: Pair(p.i, p.j / fitsum), with_fitness))

        # The most valuable string is chosen based on
        # the fitness function within the given population.
        choices = list(map(lambda p: p.i, with_fitness))
        probabilities = list(map(lambda p: p.j, with_fitness))

        return random.choices(choices, weights=probabilities, k=1)[0]

    def should_mutate(self, child, siblings):
        def poisson(l, n):
            return math.exp(-l) * ((l**n) / math.factorial(n))

        mutation_rate = self.mutation_rate * (len(siblings) + 1) * (siblings.count(child) + 1)
        return random.random() < poisson(l=mutation_rate, n=1)

    def apply_mutation(self, child, siblings):
        # The state is changed by changing the position
        # of a queen with respect to the available columns.
        positions = [i for i in range(self.n)]
        random.shuffle(positions)
        position = positions[0]
        old_value = child[position]
        new_values = list(filter(lambda x: x != old_value, child))
        random.shuffle(new_values)
        new_value = new_values[0]
    
        lchild = list(child)
        lchild[position] = new_value
    
        return ''.join(lchild)

    def is_enough(self, population, iteration):
        def arithmetic_series(start,  step,  end):
            sum = (end / 2) * (2 * start + (end - 1) * step)
            return sum

        # Whenever the current iteration exceeds the maximum
        # number of iterations, the algorithm should stop.
        if iteration > self.iterations:
            return True

        # Whenever the population is composed of the
        # same element, the algorithm should stop.
        if population.count(population[0]) == len(population):
            return True

        # Whenever there is at least one state with maximum
        # fit, the algorithm should stop.
        max_fit = round(arithmetic_series(1, 1, self.n - 1))
        return len(list(filter(lambda x: x.j >= max_fit, self.__population_with_fitness(population)))) != 0

    def best_of(self, population):
        pair = max(self.__population_with_fitness(population), key=lambda x: x.j)
        return [pair.i, pair.j]
        

    def __fitness(self, state):
        non_attacking_positions = set()
    
        for q1 in state:
            non_aggressive_queens = list(filter(lambda x: q1 != x, state))
    
            for q2 in non_aggressive_queens:
                if not attacking_each_other(q1, q2):
                    non_attacking_positions.add(tuple(sorted([q1.i, q2.i, q1.j, q2.j])))
    
        return len(non_attacking_positions)

    def __population_with_fitness(self, population):
        with_fitness = []

        for node in population:
            map_ = enumerate([int(j) for j in node])
            map_ = list(map(lambda x: Pair(x[0], x[1]), map_))
            with_fitness.append(Pair(node, self.__fitness(map_)))

        return with_fitness
        

In [262]:
ga_queen = GeneticAlgorithmQueenProblem(n=8, k=64, iterations=5, mutation_rate=0.003)
framework_ga_queen = GeneticAlgorithmFramework(ga_queen)
string, energy = framework_ga_queen.run()

print(string)
print(energy)

24750637
25


In [263]:
class Report:
    def __init__(self, name, problem_size=None, params=None):
        self.name = name
        self.problem_size = problem_size
        self.params = params or {}
        self.counters = collections.Counter()
        self.timing = {}
        self.results = {}

    def measure(self, key=None):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                start = time.perf_counter()
                result = func(*args, **kwargs)
                elapsed = time.perf_counter() - start
                k = key or func.__name__
                self.timing.setdefault(k, []).append(elapsed)
                return result
            return wrapper
        return decorator

    def counter(self, name, amount=1):
        self.counters[name] += amount

    def set_result(self, key, value):
        self.results[key] = value

    def summary(self):
        return {
            'name': self.name,
            'problem_size': self.problem_size,
            'params': self.params,
            'timings': {k: {'calls': len(v), 'total': sum(v), 'avg': sum(v)/len(v)} for k,v in self.timing.items()},
            'counters': dict(self.counters),
            'results': self.results
        }

    def pretty_print(self):
        s = self.summary()
        
        print(f"=== Report: {s['name']} ===\n")
        print(f"Problem size: {s['problem_size']}")
        print(f"Params: {s['params']}\n")
        print("Timings:")
        
        for k,v in s['timings'].items():
            print(f"  {k}: calls={v['calls']}, total={v['total']:.6f}s, avg={v['avg']:.6f}s")
            
        print("\nCounters:")
        for k,v in s['counters'].items():
            print(f"  {k}: {v}")
            
        print("\nResults:")
        for k,v in s['results'].items():
            print(f"  {k}: {v}")
            
        print("="*50)

In [264]:
# Initialize the report to be used to analyze different metrics.
report_csp = Report("CSP_Backtracking", problem_size=8, params={'method':'backtracking'})
report_sa = Report("SimulatedAnnealing", problem_size=8, params={'method':'SA', 'iterations':200})

csp_problem = CSP_QueenProblem(8, reporter=report_csp)
csp_search = CSP_BacktrackingSearchFramework(csp_problem, reporter=report_csp)
sa_problem = SimulatedAnnealingQueenProblem(8, iterations=200, reporter=report_sa)
sa_search = SimulatedAnnealingFramework(sa_problem, reporter=report_sa)

# The run function is the function for which the report is to be written.
csp_run_timed = report_csp.measure('csp_run')(csp_search.run)
sa_run_timed = report_sa.measure('sa_run')(sa_search.run)

benchmark(report_csp, csp_run_timed, n_runs=5)
benchmark(report_sa,  sa_run_timed,  n_runs=5)

report_csp.pretty_print()
report_sa.pretty_print()


=== Benchmarking CSP_Backtracking (5 runs) ===
  Run 1/5...
  Run 2/5...
  Run 3/5...
  Run 4/5...
  Run 5/5...
Benchmark completed!


=== Benchmarking SimulatedAnnealing (5 runs) ===
  Run 1/5...
  Run 2/5...
  Run 3/5...
  Run 4/5...
  Run 5/5...
Benchmark completed!

=== Report: CSP_Backtracking ===

Problem size: 8
Params: {'method': 'backtracking'}

Timings:
  csp_run: calls=5, total=0.007418s, avg=0.001484s

Counters:

Results:
  solution_run_1: {'assigned_queens': [Pair(i=0, j=0), Pair(i=1, j=4), Pair(i=2, j=7), Pair(i=3, j=5), Pair(i=4, j=2), Pair(i=5, j=6), Pair(i=6, j=1), Pair(i=7, j=3)], 'unassigned_queens': []}
  solution_run_2: {'assigned_queens': [Pair(i=0, j=0), Pair(i=1, j=4), Pair(i=2, j=7), Pair(i=3, j=5), Pair(i=4, j=2), Pair(i=5, j=6), Pair(i=6, j=1), Pair(i=7, j=3)], 'unassigned_queens': []}
  solution_run_3: {'assigned_queens': [Pair(i=0, j=0), Pair(i=1, j=4), Pair(i=2, j=7), Pair(i=3, j=5), Pair(i=4, j=2), Pair(i=5, j=6), Pair(i=6, j=1), Pair(i=7, j=3)], 'unassi