In [1]:
import numpy as np
from IPython.display import clear_output

In [2]:
class Particle(object):
    def __init__(self, vector):
        self.vector = vector
        self.score  = None
        
    def real2int(self): # for discrete problems only
        return np.apply_along_axis(np.round, axis=0, arr=self.vector)
    
    def __str__(self):
        return f'Vector:{self.vector} & Score:{self.score}'

In [3]:
class EO(object):
    def __init__(self, problem, popSize, maxIter, parameters, searchSpace = 'discrete'):
        
        self.params = parameters
        
        self.initializer   = problem.initializer
        self.objectiveFunc = problem.objectiveFunc
        self.vecBound      = problem.vecBound
        
        self.popSize = popSize
        self.maxIter = maxIter
        self.searchSpace = searchSpace
        
        self.dim = None
        self.C = []
        self.Ceq = []
        
    def run(self):
        self.initialize()
        for i in range(self.maxIter):
            print(f'-----------starting iteration {i+1}-----------')
            clear_output(wait = True)
            
            self.getScore()
            self.updateCeq()
            self.eoEvolve(i+1)
        
    def initialize(self):
        print(f'-----------initializing Population-----------')
        self.C = [Particle(self.initializer()) for each in range(self.popSize)]
        self.dim = len(self.C[0].vector)
    
    def getScore(self):
        for each in self.C:
            self.objectiveFunc(each)
            
    def updateCeq(self):
        dummy = Particle([]); dummy.score = float('-inf')
        
        cs   = sorted(self.C,   key = lambda x:x.score, reverse=True); cs.append(dummy)
        ceqs = sorted(self.Ceq, key = lambda x:x.score, reverse=True); ceqs.append(dummy)
        
        from copy import copy
        Ceq, i, j = [], 0, 0
        for each in range(self.params['Neq']-1):
            if cs[i].score > ceqs[j].score:
                Ceq.append(copy(cs[i]))
                i+=1
            else:
                Ceq.append(copy(ceqs[j]))
                j+=1
        
        Ceqavg = Particle(np.average([each.vector for each in Ceq], axis=0))
        self.objectiveFunc(Ceqavg)
        Ceq.append(Ceqavg)
        
        if self.searchSpace == 'discrete':
            for i, each in enumerate(Ceq):
                Ceq[i].vector = each.real2int()
            
        self.Ceq = Ceq
        
    def eoEvolve(self, itr):
        a1  = self.params['a1']
        a2  = self.params['a2']
        GP  = self.params['GP']
        Neq = self.params['Neq']
        dim = self.dim
        maxIter = self.maxIter
        vecBound = self.vecBound
        
        r  = np.random.rand(dim)
        r1 = np.random.rand()
        r2 = np.random.rand()
        lamda = np.random.rand(dim)
        
        t = (1 - itr/maxIter)**(a2 * itr/maxIter)
        F = a1 * np.sign(r - 0.5) * (np.exp(-lamda*t) - 1)

        if r2 >= GP:
            GCP = 0.5*r1
        else:
            GCP = 0

        GCP = np.ones(dim)*GCP
        j = np.random.randint(0, Neq)
        
        for each_C in self.C:
            C = each_C.vector
            Ceq = np.random.choice(self.Ceq).vector
            
            G0 = GCP * Ceq - lamda*C
            G = G0 * F
            
            newC = Ceq + (C-Ceq)*F + G*(1-F)/lamda
            newC = np.array([min(max(i, j),k) for i, j, k in zip(vecBound[0], newC, vecBound[1])])
            each_C.vector = newC

## Examples

In [4]:
class BoothFunc(object):
    def __init__(self):
        self.dim = 2
        self.vecBound = (-10,10)
        
        if not isinstance(self.vecBound[0], list):
            i, j = self.vecBound
            self.vecBound = ([i for each in range(self.dim)], [j for each in range(self.dim)])
        
    def initializer(self):
        return np.array([np.random.uniform(i,j) for i, j in zip(*self.vecBound)])

    def objectiveFunc(self, particle):
        x, y = particle.vector
        particle.score = -((x+2*y-7)**2 + (2*x+y-5)**2)

In [5]:
class McCormickFunc(object):
    def __init__(self):
        self.dim = 2
        self.vecBound = ([-1.5, -3],[4, 4])
        
        if not isinstance(self.vecBound[0], list):
            i, j = self.vecBound
            self.vecBound = ([i for each in range(self.dim)], [j for each in range(self.dim)])
        
    def initializer(self):
        return np.array([np.random.uniform(i,j) for i, j in zip(*self.vecBound)])

    def objectiveFunc(self, particle):
        x, y = particle.vector
        particle.score = -( np.sin(x+y) + (x-y)**2 - 1.5*x + 2.5*y + 1 )

In [6]:
parameters = {
    'Neq': 5
    ,'a1': 2.2
    ,'a2': 1
    ,'GP': 0.75
}

In [7]:
prob = BoothFunc() #https://en.wikipedia.org/wiki/Test_functions_for_optimization

eo = EO(
    problem = prob,
    popSize=100,
    maxIter=25,
    parameters = parameters,
    searchSpace = 'Real'
)

eo.run()

print("Equilibrium particles at the end of iterations")
for each in eo.Ceq:
    print(each)

Equilibrium particles at the end of iterations
Vector:[1.35084658 2.61975819] & Score:-0.27113346820412754
Vector:[1.35084658 2.61975817] & Score:-0.2711334926500537
Vector:[1.35084658 2.61975817] & Score:-0.271133492762928
Vector:[1.35084661 2.6197578 ] & Score:-0.2711338787760552
Vector:[1.35084658 2.61975808] & Score:-0.2711335830981722


In [8]:
prob = McCormickFunc() #https://en.wikipedia.org/wiki/Test_functions_for_optimization

eo = EO(
    problem = prob,
    popSize=100,
    maxIter=25,
    parameters = parameters,
    searchSpace = 'Real'
)

eo.run()

print("Equilibrium particles at the end of iterations")
for each in eo.Ceq:
    print(each)

Equilibrium particles at the end of iterations
Vector:[-0.50627019 -1.58100684] & Score:1.9076154194088857
Vector:[-0.50627016 -1.58100686] & Score:1.9076154103825647
Vector:[-0.50626979 -1.58100697] & Score:1.9076153371465856
Vector:[-0.50626941 -1.58100709] & Score:1.9076152627817224
Vector:[-0.50626989 -1.58100694] & Score:1.9076153574301324
