In [202]:
import functools
import numpy as np
import random
import pdb

epsilon = 10 ** (-10)

In [203]:
class Candidate:
    def __init__(self):
        self.crowding_distance = 0
        self.domination_count = 0
        self.rank = 0
        self.objectives = None
        self.dominated_candidates = None
        self.features = None
        self.dominates = None
    
    def __repr__(self):
        return "Candidate :: f:{}\n".format(self.features)

In [204]:
class Problem:
    def __init__(self, objectives, bounds, fitness):
        self.objectives = objectives
        self.bounds = bounds
        self.n = len(bounds)
        self.fitness = fitness
        
    def __dominates(self, candidateB, candidateA):
        fA = self.fitness(candidateA.features)
        fB = self.fitness(candidateB.features)
        
        not_dominated = all(map(lambda f: f[0] >= f[1], zip(fA, fB)))
        dominates = any(map(lambda f: f[0] > f[1], zip(fA, fB)))
        
        return dominates and not_dominated
    
    def _calculate_objectives(self, candidate):
        objectives = self.fitness(candidate.features)
        
        return objectives
        
    def generate_candidate(self):
        candidate = Candidate()
        candidate.features = []
        candidate.dominates = functools.partial(self.__dominates,
                                                candidateA=candidate)
        for idx in range(self.n):
            candidate.features.append(random.uniform(self.bounds[idx][0],
                                                     self.bounds[idx][1]))
        candidate.objectives = self._calculate_objectives(candidate)
            
        return candidate


In [205]:
class NSGA2:
    def __init__(self, problem, pop_size, n_gen, cr, m, n_tpart=2, seed=42.0):
        self.pop_size = pop_size        # Population size
        self.n_gen = n_gen              # Number of generations to evolve for
        self.n_tpart = n_tpart          # Number of tournament participants
        self.cr = cr                    # Cross-over probability
        self.m = m                      # Mutation probability
        self.eps = 10 ** -3             # Mutation strength
        self.seed = seed                # Random seed
        
        self.problem = problem          # Instance of the problem, ie. objective function/s
        
        self.population = None
        
    def initialize(self):
        self.population = []
        random.seed(self.seed)
        
        for idx in range(self.pop_size):
            self.population.append(self.problem.generate_candidate())
    
    def _mutate(self, candidate):
        for idx in range(len(candidate.features)):
            if random.random() > self.m:
                candidate.features[idx] += self.eps * (random.random() - 1) / 2
                
                if candidate.features[idx] > self.problem.bounds[idx][1]:
                    candidate.features[idx] = self.problem.bounds[idx][1]
                elif candidate.features[idx] < self.problem.bounds[idx][0]:
                    candidate.features[idx] = self.problem.bounds[idx][0]
                    
        return candidate
    
    def _crossover(self, candidateA, candidateB):
        for idx in range(len(candidateA.features)):
            if random.random() > self.cr:
                candidateA.features[idx], candidateB.features[idx]\
                = candidateB.features[idx], candidateA.features[idx]
                
        return candidateA, candidateB
    
    def _tournament(self, population):
        participants = random.sample(population, self.n_tpart)
        return participants
    
    def _create_offsprings(self, population):
        offsprings = []
        
        while len(offsprings) < self.pop_size:
            parentA, parentB = self._tournament(population)

            childA, childB = self._crossover(parentA, parentB)
            childA = self._mutate(childA)
            childA.objectives = self.problem._calculate_objectives(childA)
            
            childB = self._mutate(childB)
            childB.objectives = self.problem._calculate_objectives(childB)
            
            offsprings += [childA, childB]
            
        # print(f"Created {len(offsprings)} offsprings.")
            
        return offsprings
        
    def _fast_non_dominated_sort(self, population):
        fronts = [[]]
        idx = 0
        
        for candidateA in population:
            candidateA.domination_count = 0
            candidateA.dominated_candidates = set()
            
            for candidateB in population:
                if candidateA.dominates(candidateB):
                    candidateA.dominated_candidates.add(candidateB)
                elif candidateB.dominates(candidateA):
                    candidateA.domination_count += 1
                    
            if candidateA.domination_count == 0:
                candidateA.rank = 0
                fronts[idx].append(candidateA)
                
        
        while len(fronts[idx]) > 0:
            ith_front = []
            
            for candidateA in fronts[idx]:
                for candidateB in candidateA.dominated_candidates:
                    candidateB.domination_count -= 1
                    
                    if candidateB.domination_count == 0:
                        candidateB.rank = idx + 1
                        ith_front.append(candidateB)
                        
            idx = idx + 1
            fronts.append(ith_front)
            
        # print(f"Created {len(fronts)} fronts using fNDS, sizes are: {list(map(len, fronts))}")
        # print(f"Starting population for fNDS was: {len(population)}")
        # print(f"Population after fNDS is: {sum(map(len, fronts))}")
            
        return fronts           
    
    def _calculate_crowding_distance(self, front):
        if len(front) > 0:
            for candidate in front:
                candidate.crowding_distance = 0
            
            for m in range(len(self.problem.objectives)):
                front = sorted(front,
                               key=lambda x: x.objectives[m])
                front[0].crowding_distance = 10 ** 100
                front[-1].crowding_distance = 10 ** 100
                
                for idx in range(1, len(front)-2): 
                    front[idx].crowding_distance +=\
                        (front[idx+1].objectives[m] - front[idx+1].objectives[m])/\
                                (self.problem.bounds[1] - self.problem.bounds[0])               
                
    def _crowding_operator(self, candidateA, candidateB):
        return candidateA.rank < candidateB.rank or\
                (candidateA.rank == candidateB.rank and\
                 candidateA.crowding_distance > candidateB.crowding_distance)
        
    def solve(self):
        self.initialize()
        p_t = self.population
            
        for t_step in range(self.n_gen):
            print(f"Generation #{t_step}, pop_size: {len(self.population)}")
            
            q_t = self._create_offsprings(p_t)
            r_t = p_t + q_t
            fronts = self._fast_non_dominated_sort(r_t)
            
            for front in fronts:
                self._calculate_crowding_distance(front)
            
            p_t = []
            front_idx = 0
            
            while front_idx < len(fronts):
                self._calculate_crowding_distance(fronts[front_idx])
                p_t.extend(fronts[front_idx])
                front_idx += 1
                
                if len(p_t) >= self.pop_size:
                    break
                
            p_t = p_t[:self.pop_size]
            self.population = p_t
                
        return self.population
        

In [206]:
def f1(x):
    return x ** 2

def f2(x):
    return (x - 2) ** 2

def fitness(features):
    """minimise for both f1, f2 - so max fitness at optima"""
    return 1/(f1(features[0]) + epsilon), 1/(f2(features[1]) + epsilon)

objectives = [f1, f2] 
bounds = np.array([[-10 ** 3, 10 ** 3], [-10 ** 3, 10 ** 3]])
problem = Problem(objectives=objectives,
                  bounds=bounds,
                  fitness=fitness)

In [207]:
solver = NSGA2(problem=problem,
               pop_size=10,
               n_tpart=2,
               n_gen=10,
               cr=0.2,
               m=0.2)

In [208]:
pop = solver.solve()

Generation #0, pop_size: 10
Generation #1, pop_size: 8
Generation #2, pop_size: 9
Generation #3, pop_size: 10
Generation #4, pop_size: 7
Generation #5, pop_size: 10
Generation #6, pop_size: 10
Generation #7, pop_size: 10
Generation #8, pop_size: 10
Generation #9, pop_size: 10




In [209]:
print(pop)

[Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
, Candidate :: f:[-156.15986271419789, 10.7021987664908]
]
