<a href="https://colab.research.google.com/github/CaioSobreira/pso/blob/master/casml_fss.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import math
import numpy as np
import copy

class FitnessFunction(object):
  def __init__(self, lower_bound, upper_bound, init_subspace_lower_bound, init_subspace_upper_bound):
    self.lower_bound = lower_bound
    self.upper_bound = upper_bound
    self.init_subspace_lower_bound = init_subspace_lower_bound
    self.init_subspace_upper_bound = init_subspace_upper_bound
    self.fitness_readings_count = 0
    self.best_fitness = float("inf")
    self.best_fitness_list = []
  
  def evaluate(self, position):
    pass

class SphereFitnessFunction(FitnessFunction):
  def __init__(self):
    super(SphereFitnessFunction, self).__init__(-100.0, 100.0, 50.0, 100.0)

  def evaluate(self, position):
    fit = np.sum(position ** 2)
    if fit < self.best_fitness:
      self.best_fitness = fit
    if(self.fitness_readings_count%10 == 0):
      self.best_fitness_list.append(self.best_fitness)
    self.fitness_readings_count += 1
    return fit
      
class RosenbrockFitnessFunction(FitnessFunction):
  def __init__(self):
    super(RosenbrockFitnessFunction, self).__init__(-30.0, 30.0, 15.0, 30.0)

  def evaluate(self, position):
    sum_ = 0.0
    for i in range(1, len(position) - 1):
      sum_ += 100 * (position[i + 1] - position[i] ** 2) ** 2 + (position[i] - 1) ** 2
    fit = sum_    
    if fit < self.best_fitness:
      self.best_fitness = fit
    if(self.fitness_readings_count%10 == 0):
      self.best_fitness_list.append(self.best_fitness)
    self.fitness_readings_count += 1
    return fit

class RastriginFitnessFunction(FitnessFunction):
  def __init__(self):
    super(RastriginFitnessFunction, self).__init__(-5.12, 5.12, 2.56, 5.12)

  def evaluate(self, position):
    f_x = [xi ** 2 - 10 * math.cos(2 * math.pi * xi) + 10 for xi in position]
    fit = sum(f_x)
    if fit < self.best_fitness:
      self.best_fitness = fit
    if(self.fitness_readings_count%10 == 0):
      self.best_fitness_list.append(self.best_fitness)
    self.fitness_readings_count += 1
    return fit
  
class SearchSpace(object):
  def __init__(self, fitness_function, num_dimensions):
    self.num_dimensions = num_dimensions
    self.dimensions_lower_bound = fitness_function.lower_bound
    self.dimensions_upper_bound = fitness_function.upper_bound
  
class Fish(object):
  def __init__(self, fitness_function, search_space, w_scale, min_weight):
    nan = float('nan')
    self.fitness_function = fitness_function
    self.search_space = search_space
    self.w_scale = w_scale
    self.min_weight = min_weight
    self.position = [nan for _ in range(self.search_space.num_dimensions)]   
    self.weight = np.nan
    self.init_position()
    self.init_weight()
    self.cost = self.fitness_function.evaluate(self.position)
    self.delta_position = np.nan
    self.delta_cost = np.nan   
    self.has_improved = False
  
  def init_position(self):
    #self.position = np.random.uniform(self.search_space.dimensions_lower_bound, self.search_space.dimensions_upper_bound, self.search_space.num_dimensions)
    self.position = np.random.uniform(self.fitness_function.init_subspace_lower_bound, self.fitness_function.init_subspace_upper_bound, self.search_space.num_dimensions)
    
  def init_weight(self):
    #initial weight -> w_scale / 2  |  w_scale -> num_iterations / 2       (random??)
    self.weight = self.w_scale / 2.0  
    
    #como na apresentacao nao sao ditos quais os parametros para geracao do peso inicial aleatorio foi utilizada esta formula:
    #self.weight = self.min_weight + np.random.uniform(0.0, self.w_scale)
  
  def move_individual(self, curr_step_individual):
    new_pos = np.zeros((self.search_space.num_dimensions,), dtype=np.float)
    for dim in range(self.search_space.num_dimensions):
      new_pos[dim] = self.position[dim] + (curr_step_individual * np.random.uniform(-1, 1))
      if new_pos[dim] < self.search_space.dimensions_lower_bound:
        new_pos[dim] = self.search_space.dimensions_lower_bound
      elif new_pos[dim] > self.search_space.dimensions_upper_bound:
        new_pos[dim] = self.search_space.dimensions_upper_bound
    cost = self.fitness_function.evaluate(new_pos)
    
    ###self.optimum_cost_tracking_eval.append(self.best_fish.cost)#########
    
    if cost < self.cost:
      self.delta_cost = abs(cost - self.cost)
      self.cost = cost
      delta_pos = np.zeros((self.search_space.num_dimensions,), dtype=np.float)
      for idx in range(self.search_space.num_dimensions):
        delta_pos[idx] = new_pos[idx] - self.position[idx]
      self.delta_position = delta_pos
      self.position = new_pos
    else:
      self.delta_position = np.zeros((self.search_space.num_dimensions,), dtype=np.float)
      self.delta_cost = 0
      
  def move_collective_instinctive(self, cost_eval_enhanced):
#    print("--------------------------------------collec_inst")
    new_pos = np.zeros((self.search_space.num_dimensions,), dtype=np.float)
    for dim in range(self.search_space.num_dimensions):
      new_pos[dim] = self.position[dim] + cost_eval_enhanced[dim]
      if new_pos[dim] < self.search_space.dimensions_lower_bound:
        new_pos[dim] = self.search_space.dimensions_lower_bound
      elif new_pos[dim] > self.search_space.dimensions_upper_bound:
        new_pos[dim] = self.search_space.dimensions_upper_bound
    self.position = new_pos
    
  def move_collective_volitive(self, school_barycenter, school_curr_weight, school_prev_weight, curr_step_volitive):
    new_pos = np.zeros((self.search_space.num_dimensions,), dtype=np.float)
    for dim in range(self.search_space.num_dimensions):
      if school_curr_weight > school_prev_weight:
        new_pos[dim] = self.position[dim] - ((self.position[dim] - school_barycenter[dim]) * curr_step_volitive * np.random.uniform(0, 1))
      else:
        new_pos[dim] = self.position[dim] + ((self.position[dim] - school_barycenter[dim]) * curr_step_volitive * np.random.uniform(0, 1))
      if new_pos[dim] < self.search_space.dimensions_lower_bound:
        new_pos[dim] = self.search_space.dimensions_lower_bound
      elif new_pos[dim] > self.search_space.dimensions_upper_bound:
        new_pos[dim] = self.search_space.dimensions_upper_bound
    cost = self.fitness_function.evaluate(new_pos)
    
    ###self.optimum_cost_tracking_eval.append(self.best_fish.cost)#########
    
    self.cost = cost
    self.position = new_pos
    
  def feed(self, max_delta_cost, w_scale, min_weight):
    self.weight = self.weight + (self.delta_cost / max_delta_cost)
    #if self.weight > w_scale:
      #self.weight = w_scale
    #elif self.weight < min_weight:
    if self.weight < min_weight:
      self.weight = min_weight
  
class School(object):
  def __init__(self, school_size, fitness_function, search_space, w_scale, min_weight):
    self.school_size = school_size
    self.fitness_function = fitness_function
    self.search_space = search_space
    self.w_scale = w_scale
    self.min_weight = min_weight
    self.prev_weight_school = 0.0
    self.curr_weight_school = 0.0
    self.best_fish = Fish(self.fitness_function, self.search_space, self.w_scale, self.min_weight)
    self.best_fish.cost = np.inf
    self.curr_weight_school = 0.0
    self.prev_weight_school = 0.0
    self.fish_list = []   
    for idx in range(self.school_size):
      fish = Fish(self.fitness_function, self.search_space, self.w_scale, self.min_weight)
      self.fish_list.append(fish)
      self.curr_weight_school += fish.weight
    self.prev_weight_school = self.curr_weight_school
    self.update_best_fish()
    
    ###self.optimum_cost_tracking_iter.append(self.best_fish.cost)#########
    
  def max_delta_cost(self):
    max_ = 0
    for fish in self.fish_list:
      if max_ < fish.delta_cost:
        max_ = fish.delta_cost
    return max_
  
  def total_school_weight(self):
    self.prev_weight_school = self.curr_weight_school
    self.curr_weight_school = 0.0
    for fish in self.fish_list:
      self.curr_weight_school += fish.weight
      
  def calculate_barycenter(self):
    barycenter = np.zeros((self.search_space.num_dimensions,), dtype=np.float)
    density = 0.0

    for fish in self.fish_list:
      density += fish.weight
      for dim in range(self.search_space.num_dimensions):
        barycenter[dim] += (fish.position[dim] * fish.weight)
    for dim in range(self.search_space.num_dimensions):
      barycenter[dim] = barycenter[dim] / density

    return barycenter
  
  def update_best_fish(self):
    for fish in self.fish_list:
      if self.best_fish.cost > fish.cost:
        self.best_fish = copy.copy(fish)
        
  def individual_movement(self, curr_step_individual):
    for fish in self.fish_list:
      fish.move_individual(curr_step_individual)
        
  def collective_instinctive_movement(self):
    cost_eval_enhanced = np.zeros((self.search_space.num_dimensions,), dtype=np.float)
    density = 0.0
    for fish in self.fish_list:
      density += fish.delta_cost
      for dim in range(self.search_space.num_dimensions):
        cost_eval_enhanced[dim] += (fish.delta_position[dim] * fish.delta_cost)
    for dim in range(self.search_space.num_dimensions):
      if density != 0:
        cost_eval_enhanced[dim] = cost_eval_enhanced[dim] / density
    for fish in self.fish_list:
      fish.move_collective_instinctive(cost_eval_enhanced)
      
  def collective_volitive_movement(self, curr_step_volitive):
    self.total_school_weight()
    barycenter = self.calculate_barycenter()
    for fish in self.fish_list:
      fish.move_collective_volitive(barycenter, self.curr_weight_school, self.prev_weight_school, curr_step_volitive)
      
  def feeding(self):
    for fish in self.fish_list:
      if self.max_delta_cost(): #testa se max_delta_cost() != 0
        fish.feed(self.max_delta_cost(), self.w_scale, self.min_weight)  
        
class FSS(object):
  def __init__(self, fitness_function, num_dimensions, n_iter, school_size, step_individual_init, step_individual_final, step_volitive_init, step_volitive_final, min_weight, w_scale, max_fitness_evaluations):
    self.fitness_function = fitness_function
    self.num_dimensions = num_dimensions
    
    self.search_space = SearchSpace(self.fitness_function, self.num_dimensions)
    
    #self.n_iter = n_iter
    self.n_iter = max_fitness_evaluations
    self.max_fitness_evaluations = max_fitness_evaluations

    self.school_size = school_size
    self.step_individual_init = step_individual_init
    self.step_individual_final = step_individual_final
    self.step_volitive_init = step_volitive_init
    self.step_volitive_final = step_volitive_final

    self.curr_step_individual = self.step_individual_init * (self.search_space.dimensions_upper_bound - self.search_space.dimensions_lower_bound)
    self.curr_step_volitive = self.step_volitive_init * (self.search_space.dimensions_upper_bound - self.search_space.dimensions_lower_bound)
    self.min_weight = min_weight
    self.w_scale = w_scale
    
    self.school = School(self.school_size, self.fitness_function, self.search_space, self.w_scale, self.min_weight)
  
  def update_steps(self, curr_iter):
    self.curr_step_individual = self.step_individual_init - curr_iter * float(self.step_individual_init - self.step_individual_final) / self.n_iter
    self.curr_step_volitive = self.step_volitive_init - curr_iter * float(self.step_volitive_init - self.step_volitive_final) / self.n_iter

  def optimize(self):
    #for i in range(self.n_iter):
    while(self.fitness_function.fitness_readings_count <= self.max_fitness_evaluations):
      self.school.individual_movement(self.curr_step_individual)
      self.school.update_best_fish()
      self.school.feeding()
      self.school.collective_instinctive_movement()
      self.school.collective_volitive_movement(self.curr_step_volitive)
      #self.update_steps(i)
      self.update_steps(self.fitness_function.fitness_readings_count)
      self.school.update_best_fish()
      
      print(self.fitness_function.fitness_readings_count, end=" ")
      print(self.school.best_fish.cost)    

In [3]:
#Rosenbrock
school_size = 30
num_dimensions = 30
num_iterations = 50000
max_fitness_evaluations = 500000
step_individual_init = 0.1
#step_individual_final = 0.0001
step_individual_final = 0.001
step_volitive_init = 0.01
step_volitive_final = 0.001
min_weight = 1
w_scale = num_iterations / 2.0
#w_scale = max_fitness_evaluations / 2.0
exec_tracking_best_fitness_list = []

for num_exec in range(30):
  print(num_exec)
  fitness_function = RosenbrockFitnessFunction()
  fss = FSS(fitness_function, num_dimensions, num_iterations, school_size, step_individual_init, step_individual_final, step_volitive_init, step_volitive_final, min_weight, w_scale, max_fitness_evaluations)
  fss.optimize()
  exec_tracking_best_fitness_list.append(fitness_function.best_fitness_list)


0
91 530825802.2592452
151 528737824.3936455
211 527098413.2936619
271 525428146.4739893
331 524606074.64917696
391 522771624.9662013
451 522070604.13367593
511 518453646.3550588
571 516078851.49309444
631 514433257.5027647
691 513270204.6391078
751 512162595.4604122
811 509503584.32702255
871 507966466.67151827
931 506805571.08804953
991 504922792.38920707
1051 503228657.8283036
1111 501486676.0614825
1171 500491723.5398189
1231 499645636.8180094
1291 498095790.6500458
1351 495606220.8419028
1411 493365578.1480869
1471 491502721.55039734
1531 490247360.2940136
1591 489071377.2373085
1651 487724452.47913915
1711 486438990.07843536
1771 485228468.22024643
1831 484142615.393546
1891 482618183.182311
1951 481148890.3951495
2011 479519932.2282754
2071 478619367.1638346
2131 477006433.86911756
2191 475840068.10600114
2251 473903484.5293606
2311 472244415.4104663
2371 471015884.78917265
2431 469548220.8807614
2491 467746346.9811755
2551 466440219.60118425
2611 464203078.5869867
2671 46286986

KeyboardInterrupt: ignored

In [0]:
import csv
with open("fss_rosenbrock.csv","w+") as my_csv:
  csvWriter = csv.writer(my_csv,delimiter=',')
  csvWriter.writerows(exec_tracking_best_fitness_list)

#Sphere
school_size = 30
num_dimensions = 30
num_iterations = 50000
max_fitness_evaluations = 500000
step_individual_init = 0.1
#step_individual_final = 0.0001
step_individual_final = 0.001
step_volitive_init = 0.01
step_volitive_final = 0.001
min_weight = 1
#w_scale = num_iterations / 2.0
w_scale = max_fitness_evaluations / 2.0
exec_tracking_best_fitness_list2 = []

for num_exec in range(30):
  print(num_exec)
  fitness_function = SphereFitnessFunction()
  fss = FSS(fitness_function, num_dimensions, num_iterations, school_size, step_individual_init, step_individual_final, step_volitive_init, step_volitive_final, min_weight, w_scale, max_fitness_evaluations)
  fss.optimize()
  exec_tracking_best_fitness_list2.append(fitness_function.best_fitness_list)
  
#Rastrigin
school_size = 30
num_dimensions = 30
num_iterations = 50000
max_fitness_evaluations = 500000
step_individual_init = 0.1
#step_individual_final = 0.0001
step_individual_final = 0.001
step_volitive_init = 0.01
step_volitive_final = 0.001
min_weight = 1
w_scale = num_iterations / 2.0
#w_scale = max_fitness_evaluations / 2.0
exec_tracking_best_fitness_list3 = []

for num_exec in range(30):
  print(num_exec)
  fitness_function = RastriginFitnessFunction()
  fss = FSS(fitness_function, num_dimensions, num_iterations, school_size, step_individual_init, step_individual_final, step_volitive_init, step_volitive_final, min_weight, w_scale, max_fitness_evaluations)
  fss.optimize()
  exec_tracking_best_fitness_list3.append(fitness_function.best_fitness_list)

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14


In [0]:
with open("fss_sphere.csv","w+") as my_csv:
  csvWriter = csv.writer(my_csv,delimiter=',')
  csvWriter.writerows(exec_tracking_best_fitness_list2)

with open("fss_rastrigin.csv","w+") as my_csv:
  csvWriter = csv.writer(my_csv,delimiter=',')
  csvWriter.writerows(exec_tracking_best_fitness_list3)