<a href="https://colab.research.google.com/github/CaioSobreira/casml_abc/blob/master/casml_abc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import math
import numpy as np
import copy

class FitnessFunction(object):
  """Base class for the benchmark objective functions in this file.

  Holds the bounds of the search range, the bounds of the sub-range used
  when sampling initial positions, and the evaluation bookkeeping that the
  concrete subclasses update from their own ``evaluate`` methods.
  """

  def __init__(self, lower_bound, upper_bound, init_subspace_lower_bound, init_subspace_upper_bound):
    """Store the four bounds and reset the evaluation bookkeeping.

    Args:
      lower_bound: Lowest value allowed for any coordinate.
      upper_bound: Highest value allowed for any coordinate.
      init_subspace_lower_bound: Lower edge of the initialization sub-range.
      init_subspace_upper_bound: Upper edge of the initialization sub-range.
    """
    # Evaluation bookkeeping: a counter, the lowest value seen so far
    # (starts at +infinity so any real value replaces it) and a history list.
    self.best_fitness_list = []
    self.best_fitness = float("inf")
    self.fitness_readings_count = 0

    # Full search range.
    self.lower_bound, self.upper_bound = lower_bound, upper_bound

    # Sub-range used for drawing starting positions.
    self.init_subspace_lower_bound = init_subspace_lower_bound
    self.init_subspace_upper_bound = init_subspace_upper_bound

  def evaluate(self, position):
    """Placeholder meant to be overridden; this base version returns None."""
    return None

class SphereFitnessFunction(FitnessFunction):
  """Sphere benchmark: the sum of the squared coordinates of a position."""

  def __init__(self):
    """Use the range [-100, 100] with the initialization sub-range [50, 100]."""
    super(SphereFitnessFunction, self).__init__(-100.0, 100.0, 50.0, 100.0)

  def evaluate(self, position):
    """Return the sphere value of ``position`` and update the bookkeeping.

    Args:
      position: Coordinates to evaluate; must support ``position ** 2``
        (e.g. a NumPy array).

    Returns:
      The sum of the squared coordinates.
    """
    fit = np.sum(position ** 2)

    # Keep track of the lowest value observed so far.
    if fit < self.best_fitness:
      self.best_fitness = fit

    # A snapshot of the best value is stored on every 10th call, decided
    # on the counter value *before* this call is counted.
    readings_before = self.fitness_readings_count
    self.fitness_readings_count = readings_before + 1
    if readings_before % 10 == 0:
      self.best_fitness_list.append(self.best_fitness)

    return fit
      
class RosenbrockFitnessFunction(FitnessFunction):
  """Rosenbrock benchmark function."""

  def __init__(self):
    """Use the range [-30, 30] with the initialization sub-range [15, 30]."""
    super(RosenbrockFitnessFunction, self).__init__(-30.0, 30.0, 15.0, 30.0)

  def evaluate(self, position):
    """Return the Rosenbrock value of ``position`` and update the bookkeeping.

    The value is the sum, over every pair of consecutive coordinates
    ``(x_i, x_{i+1})``, of ``100 * (x_{i+1} - x_i**2)**2 + (x_i - 1)**2``.

    Args:
      position: Sliceable sequence of coordinates (list or NumPy array).

    Returns:
      The Rosenbrock value; ``0.0`` when fewer than two coordinates are given.
    """
    fit = 0.0
    # Pair each coordinate with its successor, starting from the FIRST
    # coordinate. The previous loop started at index 1, which dropped the
    # (x_0, x_1) term so that x_0 never influenced the result.
    for current, following in zip(position[:-1], position[1:]):
      fit += 100 * (following - current ** 2) ** 2 + (current - 1) ** 2

    # Keep track of the lowest value observed so far.
    if fit < self.best_fitness:
      self.best_fitness = fit
    # Store a snapshot of the best value on every 10th call (counter is
    # checked before this call is counted).
    if self.fitness_readings_count % 10 == 0:
      self.best_fitness_list.append(self.best_fitness)
    self.fitness_readings_count += 1
    return fit

class RastriginFitnessFunction(FitnessFunction):
  """Rastrigin benchmark function."""

  def __init__(self):
    """Use the range [-5.12, 5.12] with the initialization sub-range [2.56, 5.12]."""
    super(RastriginFitnessFunction, self).__init__(-5.12, 5.12, 2.56, 5.12)

  def evaluate(self, position):
    """Return the Rastrigin value of ``position`` and update the bookkeeping.

    Each coordinate ``x`` contributes ``x**2 - 10*cos(2*pi*x) + 10``; the
    result is the sum of those contributions.

    Args:
      position: Iterable of scalar coordinates.

    Returns:
      The summed Rastrigin value.
    """
    two_pi = 2 * math.pi
    fit = sum(xi ** 2 - 10 * math.cos(two_pi * xi) + 10 for xi in position)

    # Keep track of the lowest value observed so far.
    if fit < self.best_fitness:
      self.best_fitness = fit

    # Snapshot of the best value on every 10th call, decided on the
    # counter value before this call is counted.
    readings_before = self.fitness_readings_count
    self.fitness_readings_count = readings_before + 1
    if readings_before % 10 == 0:
      self.best_fitness_list.append(self.best_fitness)

    return fit
  
class SearchSpace(object):
  """Dimensionality plus per-coordinate bounds of the region being searched."""

  def __init__(self, fitness_function, num_dimensions):
    """Copy the bounds from ``fitness_function`` and store the dimension count.

    Args:
      fitness_function: Object exposing ``lower_bound`` and ``upper_bound``.
      num_dimensions: Number of coordinates in a position.
    """
    bounds = (fitness_function.lower_bound, fitness_function.upper_bound)
    self.dimensions_lower_bound, self.dimensions_upper_bound = bounds
    self.num_dimensions = num_dimensions
        
class Bee(object):
  """A single agent holding one candidate position and its evaluated fitness."""

  def __init__(self, fitness_function, search_space, max_trials):
    """Create a bee at a random position and evaluate that position once.

    Args:
      fitness_function: Object providing ``evaluate(position)`` and the
        ``init_subspace_*`` bounds used for sampling.
      search_space: Object providing ``num_dimensions``.
      max_trials: Trial-count threshold used by ``max_trials_reached_restart``.
    """
    self.fitness_function = fitness_function
    self.search_space = search_space
    self.max_trials = max_trials
    self.num_trials = 0
    self.probability = 0.0
    # Sampling and evaluating come last; each bee costs one evaluation here.
    self.position = self.generate_random_position()
    self.fitness = self.fitness_function.evaluate(self.position)

  def generate_random_position(self):
    """Draw a uniform random position from the initialization sub-range.

    The sample comes from the fitness function's ``init_subspace_*`` bounds,
    not from the search space's full lower/upper bounds.

    Returns:
      A NumPy array with ``search_space.num_dimensions`` entries.
    """
    source = self.fitness_function
    return np.random.uniform(
      source.init_subspace_lower_bound,
      source.init_subspace_upper_bound,
      self.search_space.num_dimensions,
    )

  def max_trials_reached_restart(self):
    """Re-sample and re-evaluate this bee once ``num_trials`` exceeds ``max_trials``."""
    if not self.num_trials > self.max_trials:
      return
    self.position = self.generate_random_position()
    self.fitness = self.fitness_function.evaluate(self.position)
    self.num_trials = 0
    self.probability = 0.0
  
class EmployeeBee(Bee):
  """Bee that perturbs its own position and keeps the result if it is better."""

  def find_new_food_source(self):
    """Try one perturbed candidate position and keep it when it improves.

    Does nothing once ``num_trials`` exceeds ``max_trials``. Otherwise one
    candidate is built, clamped to the search-space bounds and evaluated
    (one fitness evaluation). A strictly lower fitness replaces the current
    position and resets ``num_trials``; anything else increments it.
    """
    if self.num_trials > self.max_trials:
      return

    num_dimensions = self.search_space.num_dimensions
    lower = self.search_space.dimensions_lower_bound
    upper = self.search_space.dimensions_upper_bound

    # Builtin ``float`` replaces the ``np.float`` alias, which recent NumPy
    # releases no longer provide.
    new_candidate_position = np.zeros((num_dimensions,), dtype=float)

    # NOTE(review): the reference value is one of this bee's OWN coordinates,
    # shared by every dimension below. ABC is usually described with a
    # coordinate taken from another food source; confirm this is intended.
    random_position_dimension = np.random.choice(self.position)

    for dimension in range(num_dimensions):
      current = self.position[dimension]
      value = current + np.random.uniform(-1, 1) * (current - random_position_dimension)
      # Clamp to the search-space bounds.
      if value > upper:
        value = upper
      if value < lower:
        value = lower
      new_candidate_position[dimension] = value

    new_candidate_position_fitness = self.fitness_function.evaluate(new_candidate_position)
    if new_candidate_position_fitness < self.fitness:
      # Greedy selection: accept only strict improvements.
      self.position = new_candidate_position
      self.fitness = new_candidate_position_fitness
      self.num_trials = 0
    else:
      self.num_trials += 1

  def calculate_probability(self, sum_fitness):
    """Set ``probability`` to this bee's share of ``sum_fitness``.

    Args:
      sum_fitness: Total of ``get_fitness()`` over the bees being compared.
    """
    self.probability = self.get_fitness() / sum_fitness

  def get_fitness(self):
    """Return a positive quality score where larger means better.

    A non-negative raw fitness ``f`` maps to ``1 / (1 + f)``; a negative one
    maps to ``1 + abs(f)``.
    """
    if self.fitness >= 0:
      return 1 / (1 + self.fitness)
    return 1 + abs(self.fitness)
    
class OnlookerBee(Bee):
  """Bee that moves to probabilistically chosen food sources and refines them."""

  def exploit(self, food_sources):
    """Visit each food source with its selection probability and perturb it.

    For every entry of ``food_sources`` a uniform number in [0, 1) is drawn.
    When it is below the entry's ``probability`` and this bee has not
    exceeded ``max_trials``, the bee copies the entry's position,
    re-evaluates it, then evaluates one clamped perturbed candidate. A
    strictly better candidate is kept and resets ``num_trials``; otherwise
    ``num_trials`` is incremented. Each visit costs two fitness evaluations.

    Args:
      food_sources: Iterable of objects exposing ``probability`` and
        ``position``.
    """
    num_dimensions = self.search_space.num_dimensions
    lower = self.search_space.dimensions_lower_bound
    upper = self.search_space.dimensions_upper_bound

    for food_source in food_sources:
      # The random number is drawn for every food source, selected or not.
      r = np.random.uniform(0, 1)
      if not r < food_source.probability:
        continue
      # Checked per source because num_trials can change inside this loop.
      if self.num_trials > self.max_trials:
        continue

      # Adopt the selected source as the starting point.
      self.position = copy.copy(food_source.position)
      self.fitness = self.fitness_function.evaluate(self.position)

      # Builtin ``float`` replaces the ``np.float`` alias, which recent
      # NumPy releases no longer provide.
      new_candidate_position = np.zeros((num_dimensions,), dtype=float)
      random_position_dimension = np.random.choice(self.position)
      for dimension in range(num_dimensions):
        current = self.position[dimension]
        value = current + np.random.uniform(-1, 1) * (current - random_position_dimension)
        # Clamp to the search-space bounds.
        if value > upper:
          value = upper
        if value < lower:
          value = lower
        new_candidate_position[dimension] = value

      new_candidate_position_fitness = self.fitness_function.evaluate(new_candidate_position)
      if new_candidate_position_fitness < self.fitness:
        # Greedy selection: accept only strict improvements.
        self.position = new_candidate_position
        self.fitness = new_candidate_position_fitness
        self.num_trials = 0
      else:
        self.num_trials += 1
            
class Colony(object):
  """A population of employee and onlooker bees that runs the search loop."""

  def __init__(self, search_space, fitness_function, size, max_trials, max_fitness_evaluations):
    """Create ``size // 2`` employee bees and the same number of onlooker bees.

    Args:
      search_space: Search-space object handed to every bee.
      fitness_function: Fitness-function object handed to every bee; its
        ``fitness_readings_count`` is the stop criterion of ``optimize``.
      size: Total colony size; half of it (integer division) is used for
        each bee kind.
      max_trials: Trial threshold handed to every bee.
      max_fitness_evaluations: ``optimize`` keeps iterating while the
        evaluation counter is less than or equal to this value.
    """
    self.search_space = search_space
    self.fitness_function = fitness_function
    self.size = size
    self.max_trials = max_trials
    self.max_fitness_evaluations = max_fitness_evaluations
    self.employee_bees_list = []
    self.onlooker_bees_list = []
    self.best_solution = None
    self.best_solution_list = []
    # Bees are created in alternating order (employee, then onlooker).
    for _ in range(self.size // 2):
      self.employee_bees_list.append(
        EmployeeBee(self.fitness_function, self.search_space, self.max_trials))
      self.onlooker_bees_list.append(
        OnlookerBee(self.fitness_function, self.search_space, self.max_trials))

  def employee_bees_phase(self):
    """Let every employee bee try one new candidate position."""
    for employee_bee in self.employee_bees_list:
      employee_bee.find_new_food_source()

  def employee_bees_sum_fitness(self):
    """Return the total of ``get_fitness()`` over the employee bees (0.0 if none)."""
    total = 0.0
    for employee_bee in self.employee_bees_list:
      total += employee_bee.get_fitness()
    return total

  def calculate_probabilities(self):
    """Give every employee bee its share of the summed employee fitness."""
    sum_fitness = self.employee_bees_sum_fitness()
    for employee_bee in self.employee_bees_list:
      employee_bee.calculate_probability(sum_fitness)

  def onlooker_bees_phase(self):
    """Let every onlooker bee exploit the employee bees' food sources."""
    for onlooker_bee in self.onlooker_bees_list:
      onlooker_bee.exploit(self.employee_bees_list)

  def scout_bees_phase(self):
    """Restart every bee (employees first) that has exceeded its trial limit."""
    for bee in self.employee_bees_list + self.onlooker_bees_list:
      bee.max_trials_reached_restart()

  def memorize_best_solution(self):
    """Record a copy of each bee that beats the best solution seen so far.

    Employee bees are examined before onlooker bees. Every new best is
    stored in ``best_solution`` and appended to ``best_solution_list``.
    """
    for bee in self.employee_bees_list + self.onlooker_bees_list:
      if self.best_solution is None or bee.fitness < self.best_solution.fitness:
        self.best_solution = copy.copy(bee)
        self.best_solution_list.append(self.best_solution)

  def optimize(self):
    """Repeat the employee, onlooker and scout phases until the budget is used.

    The loop runs while the fitness function's ``fitness_readings_count`` is
    less than or equal to ``max_fitness_evaluations``, so the budget can be
    overshot by the evaluations of the final iteration.
    """
    # Without bees no phase evaluates anything, the counter never advances
    # and the loop below would never terminate.
    if not self.employee_bees_list and not self.onlooker_bees_list:
      return
    while self.fitness_function.fitness_readings_count <= self.max_fitness_evaluations:
      self.employee_bees_phase()
      self.calculate_probabilities()
      self.onlooker_bees_phase()
      self.scout_bees_phase()
      # NOTE: memorize_best_solution() is not called by this loop, so
      # best_solution stays None unless the caller invokes it.
    

SyntaxError: ignored

In [0]:
import csv

# Experiment configuration shared by every benchmark run.
num_dimensions = 30
colony_size = 30
max_trials = 100
max_fitness_evaluations = 500000

# (label printed before the runs, fitness-function class), in execution order.
benchmarks = [
  ('Rosenbrock', RosenbrockFitnessFunction),
  ('Sphere', SphereFitnessFunction),
  ('Rastrigin', RastriginFitnessFunction),
]

# For each benchmark: one best-fitness history list per independent run.
exec_tracking_best_fitness_lists = {}

for benchmark_name, fitness_function_class in benchmarks:
  print(benchmark_name + ':')
  tracked_runs = []
  for num_exec in range(30):
    print(num_exec)
    # A fresh fitness function per run so counters and history start empty.
    fitness_function = fitness_function_class()
    search_space = SearchSpace(fitness_function, num_dimensions)
    colony = Colony(search_space, fitness_function, colony_size, max_trials, max_fitness_evaluations)
    colony.optimize()
    tracked_runs.append(fitness_function.best_fitness_list)
  exec_tracking_best_fitness_lists[benchmark_name] = tracked_runs

# Keep the original per-benchmark module-level names available.
exec_tracking_best_fitness_list_rosenbrock = exec_tracking_best_fitness_lists['Rosenbrock']
exec_tracking_best_fitness_list_sphere = exec_tracking_best_fitness_lists['Sphere']
exec_tracking_best_fitness_list_rastrigin = exec_tracking_best_fitness_lists['Rastrigin']

# One CSV per benchmark, one row per run.
csv_outputs = [
  ("abc_rosenbrock.csv", exec_tracking_best_fitness_list_rosenbrock),
  ("abc_sphere.csv", exec_tracking_best_fitness_list_sphere),
  ("abc_rastrigin.csv", exec_tracking_best_fitness_list_rastrigin),
]
for csv_path, csv_rows in csv_outputs:
  # newline="" lets the csv writer control line endings itself, which avoids
  # blank rows between records on platforms that translate "\n".
  with open(csv_path, "w", newline="") as my_csv:
    csvWriter = csv.writer(my_csv, delimiter=',')
    csvWriter.writerows(csv_rows)



Rosenbrock:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Sphere:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Rastrigin:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
