<a href="https://colab.research.google.com/github/harikrishnareddymallavarapu/Optimization-MetaHeuristic/blob/master/PSO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import random

In [None]:
#Algo
#V = Momentum Part + Cognitive Part + Social Part
#Vinew = WVi + C1R1(Pbest-Xi) + C2R2(Gbest-Xi)
#Xinew = Xi + Vinew

#Momentum Part - Acts as memory of the previous flight - It restricts drastic movement and keeps it biased to previous flight
#Cognitive Part - Quantfies the performance to its best location - Particles drawn back to their own best
#Social Part - Quantifies the performance to its neighbours

#1. Fix the population size,inertia, accelaration coefficient, max iterations
#2. Generate Random positions and evaluate the fitness
#3. Generate Random Velocities within the domain of the random variables
#4. Determine the personal best and global best
#5. Generate two set of random numbers
#6. Determine Velocity
#7. Determine Position



In [None]:
class PSO():
  populationSize =5
  inertiaW = 0.7
  coefficient1 = 1.5
  coefficient2 = 1.5
  noOfIterations = 10
  paramSols = np.array([])
  funcError = np.array([])
  personalBestError =  np.array([])
  globalBestError = np.array([])
  personalBestSols = np.array([])
  globalBestSol = np.array([])
  randomVelocities = np.array([])
  paramBoundaries = np.array([])

  def getFuncParamError(self,paramValues):
    error=0
    #print(paramValues)
    for i in range(paramValues.size):
      error= error+(paramValues[i]*paramValues[i])
    #print(error)  
    return error

  def getFuncErrorForParamSols(self):
    for i in range(self.populationSize):
      error = self.getFuncParamError(self.paramSols[i])
      self.funcError = np.append(self.funcError,[error])

  def initialiseParamSols(self):
    paramSols2 = np.array([])
    for i in range(self.populationSize):
      randomArray = np.zeros((0,len(self.paramBoundaries)),int)
      for j in range(len(self.paramBoundaries)):
        randomValue = np.array([random.randint(self.paramBoundaries[j][0],self.paramBoundaries[j][1])])
        randomArray = np.append(randomArray,randomValue)
      if paramSols2.size==0: 
        paramSols2 = np.array([randomArray])
      else:
        paramSols2 = np.vstack((paramSols2,randomArray)) 
    return paramSols2

  def updateVelocities(self,rowNo):
    #Vinew = WVi + C1R1(Pbest-Xi) + C2R2(Gbest-Xi)
    randomVector1 = self.randomParamSols()
    randomVector2 = self.randomParamSols()
    self.randomVelocities[rowNo] = self.inertiaW * self.randomVelocities[rowNo] \
      + self.coefficient1 * randomVector1*(self.personalBestSols[rowNo]-self.paramSols[rowNo]) \
      + self.coefficient2 * randomVector2*(self.personalBestSols[np.argmax(self.funcError)] - self.paramSols[rowNo])

  def randomParamSols(self):
    randomArray = np.array([])
    for i in range(len(self.paramBoundaries)):
      randomValue = np.array([round(random.random(), 1)])
      randomArray = np.append(randomArray,randomValue)
    return randomArray

  def adjustBoundariesForNewSol(self,newSolution):
    for i in range(len(self.paramBoundaries)):
      if newSolution[i]>self.paramBoundaries[i][1]:
        newSolution[i]=self.paramBoundaries[i][1]
      elif newSolution[i] < self.paramBoundaries[i][0]:
        newSolution[i]=self.paramBoundaries[i][0]
      else:
        newSolution[i]=newSolution[i]
    return newSolution 

  def executePSO(self,weight,coeff1,coeff2,popSize,totalIterations,paramRange):
    self.populationSize = popSize
    self.inertiaW = weight
    self.coefficient1 = coeff1
    self.coefficient2 = coeff2
    self.noOfIterations = totalIterations
    self.paramBoundaries = paramRange

    if self.paramSols.size==0:
      self.paramSols = self.initialiseParamSols()
    
    self.getFuncErrorForParamSols()
    self.randomVelocities = self.initialiseParamSols()

    self.personalBestSols = np.copy(self.paramSols)
    self.personalBestError = np.copy(self.funcError)

    for i in range(self.noOfIterations):
      for j in range(0,self.populationSize):        
        #update velocities and get the latest vector
        self.updateVelocities(j)
        #updating parameter
        self.paramSols[j] = self.paramSols[j] + self.randomVelocities[j]
        #adjusting boundaries
        self.paramSols[j] = self.adjustBoundariesForNewSol(self.paramSols[j])
        #calculating fitness
        self.funcError[j] = self.getFuncParamError(self.paramSols[j])
        if self.funcError[j]<self.personalBestError[j]:
          #print("True")
          self.personalBestError[j] = self.funcError[j]
          self.personalBestSols[j] = self.paramSols[j] 



In [None]:
psoClass = PSO()
psoClass.executePSO(0.7,1.5,1.5,5,20,[[10,20],[20,100]])

True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True


In [None]:
psoClass.personalBestSols

array([[10, 20],
       [10, 20],
       [10, 20],
       [10, 20],
       [10, 20]])

In [None]:
psoClass.personalBestError

array([500., 500., 500., 500., 500.])