## Function Approximation
### Using the PSO algorithm to optimise the ANN's parameters

**Import Resources**

In [1]:
# import resources

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

#import ipywidgets as widgets
#from ipywidgets import interact, interact_manual

**1. Create Neural Network Class**

In [2]:
class NeuralNetwork:    
    def __init__(self, func, hiddenLayerNeurons, activation, inputList, outputList):
        self.func = func # name of the function to be optimized
        self.layerArch = list.copy(hiddenLayerNeurons) #list of neurons in the hidden layers
        self.activation = activation # name of the activation fuction
          
        self.inputArray = inputList # array of given input dataset
        self.outputArray = outputList # array of the desired output dataset
        #print(self.inputArray,self.outputArray)
        
        # adding input and output layers to the hidden neurons list
        self.inputOutputNeurons()
        
        # calculate number of weights
        self.nWeights = self.numWeights()
        
        # set activation function
        self.actFunc = self.functionSelection()
        
        # set indices at which weight matrix to be split for matrix multiplication
        self.splitIndices = self.splitIndices()
        
    def getANN_Hyperparameters(self):
        return [self.func, self.actFunc, self.layerArch]
        
    def getnWeights(self): # getter methonds for number of weights
        return self.nWeights
        
    def inputOutputNeurons(self): # a private method to neuralNetwork class
        # Set number neurons in the Input layer
        if self.func in ('XOR','Complex'):
            self.layerArch.insert(0,2) # XOR & Complex functions are based on two input variables
        else:
            self.layerArch.insert(0,1) # Linear, Cubic, Sine, Tanh functions are based on single input variable
       
        # Set number of neurons in the Output layer
        self.layerArch.append(1) # only single output value is expected from the fuction
        
    def numWeights(self):# a private method to neuralNetwork class
        n_weights = sum(self.layerArch[i]*self.layerArch[i+1] for i in range(len(self.layerArch)-1))
        return n_weights
    
    def functionSelection(self):# a private method to neuralNetwork class
        return activation_funcs[self.activation]
    
    def splitIndices(self):
        indices = []
        for i in range(len(self.layerArch)-1):
            indices.append(self.layerArch[i]*self.layerArch[i+1])

        splitIndices = []
        splitIndices.append(indices[0])
        for j in range(1,len(indices)-1):
            splitIndices.append(splitIndices[j-1] + indices[j])

        return splitIndices
    
    def forward(self, weights):
        
        # to do: check the length of list of weigths matches the number weights
        
        wArray = np.split(weights,self.splitIndices)
        #print("splitIndices",self.splitIndices)
        #print("wArray",wArray)
        #print("layerArch", self.layerArch)
        
        wMatrix=[]
        for i in range(0,len(self.layerArch)-1):
            wMatrix.append(wArray[i].reshape(self.layerArch[i+1],self.layerArch[i]))
        #print("wArray after reshape", wMatrix)
    
        desiredArray = np.copy(self.outputArray)
        predictArray = []
        
        for i in range(len(self.inputArray)):
            ih = self.inputArray[i].reshape(-1,1)
            for j in range(0,len(self.layerArch)-1):
                #print("wMatrix = ", wMatrix[j], "ih = ", ih)
                ih = np.matmul(wMatrix[j],ih)
            
                if j != (len(self.layerArch)-1):
                    ih = self.actFunc(ih)
            predictArray.append(ih)
            
        #print(len(desiredArray), len(predictArray))

        squeezePredict = np.squeeze(predictArray)

        mse = ((desiredArray - squeezePredict)**2).mean(axis = None)
       
        return mse

**2. Create PSO Class**<br>
swarmsize - desired swarm size i.e. weight array<br>
$ \alpha \gets$ inertia weight i.e. proportion of velocity to be retained<br>
$ \beta \gets$ cognitive weight i.e. proportion of personal best to be retained<br>
$ \delta \gets$ social weight i.e. proportion of global best to be retained<br>
$ \epsilon \gets$ jump size of a particle<br><br>
**Enforcing Boundaries**: enforced boundaries by Reinitialising the position of out of bounds particles<br>

In [3]:
class ParticleSwarmOptimizer:    
    def __init__(self, swarm_size,pso_alpha, pso_beta, pso_delta, max_epochs, bound_max, bound_min):
        # PSO parameters
        self.swarmSize = swarm_size # number of particles in swarm
        self.psoAlpha = pso_alpha # inertia weight
        self.psoBeta = pso_beta # cognitive weight 
        self.psoDelta = pso_delta # social weight
        self.maxEpochs = max_epochs # maximum number of iterations

        self.bound_max = bound_max
        self.bound_min = bound_min
        
        self.data =[] # list to record data

    def getSwarmSize(self):
        return self.swarmSize # no of particles in swarm
    
    def getRunData(self):
        return self.data
    
    def getPSO_Hyperparameter(self):
        return [self.swarmSize, self.psoAlpha, self.psoBeta, self.psoDelta, self.maxEpochs, self.bound_max, self.bound_min]
   
    def forward(self,swarm):
        
        # initialise global best position and fittness/error
        best_swarm_pos = np.ones((swarm[0].getDim()))
        best_swarm_err = 500
        
        data=[]
        epoch = 0
        while epoch < self.maxEpochs: # run until fixed number of iterations
            
            #Update global best
            for ii in range(self.swarmSize): # for each particle in swarm/population
                
                position_ii = swarm[ii].getPosition()
                err_ii = (swarm[ii].getError())
  
                # update Global/swarm best error and position if found comparing fitness/error
                if err_ii < best_swarm_err:
                    best_swarm_err = err_ii
                    best_swarm_pos = position_ii
                

            for i in range(self.swarmSize): # for each particle in swarm/population
                
                # Gather information            
                dim_i = swarm[i].getDim()
                position_i = swarm[i].getPosition()
                velocity_i = swarm[i].getVelocity()
                best_part_error_i = swarm[i].getBestPartErr()
                best_part_position_i = swarm[i].getBestPartPos()
                err_i = swarm[i].getError()
                
                print('epoch:{:03d} ,particle:{:02d}, par err:{:0.5f}, best part err:{:0.5f}, best swarm err:{:0.2f}'.
                      format(epoch+1, i+1, err_i, best_part_error_i ,best_swarm_err))
                
                #self.data.append([('epoch',epoch+1),('particle',i+1),('Particle Current Quality',err_i),('Particle Best Quality',best_part_error_i), ('Global Best Quality',best_swarm_err)])
                self.data.append([epoch+1,i+1,err_i,,best_part_error_i,best_swarm_err])
                  
                
                # compute particle new velocity
                for k in range(dim_i): # for each dimension/weight in particle i
                    a = self.psoAlpha*velocity_i[k]
                    b = self.psoBeta*(best_part_position_i[k] - position_i[k])
                    c = self.psoDelta*(best_swarm_pos[k] - position_i[k])
                    velocity_i[k] = a+b+c

                    
                # compute particle new position
                position_f = np.add(position_i,velocity_i)
                
                
                # enforcing boundaries by Reinitialising the position of out of bounds particles
                kk=0
                while kk < dim_i:
                    if position_f[kk] >= self.bound_max or position_f[kk] <= self.bound_min:
                        position_f = (np.random.rand((dim_i)).astype(np.float32)-0.5)/2
                        velocity_i = (np.random.rand((dim_i)).astype(np.float32)-0.5)/2
                        print('weights breached at weights#',kk)
                        break
                    kk +=1
                
                              
                # compute new fitness
                new_err = swarm[i].ann.forward(position_f)
                
                # update particle best error and position if found comparing fitness/error
                if new_err < best_part_error_i:
                    swarm[i].setBestPartErr(new_err)
                    swarm[i].setBestPartPos(position_f)
                
                # update particle class parameters              
                swarm[i].setPosition(position_f) # update particle position
                swarm[i].setVelocity(velocity_i) #update particle velocity
                swarm[i].setError(new_err) #update particle current error
                
                #print('epoch:{:03d} ,particle:{:02d}, par err:{:0.5f}, best part err:{:0.5f}, best swarm err:{:0.2f}'.
                      #format(epoch+1, i+1, new_err, swarm[i].getBestPartErr() ,best_swarm_err))
            
            
            print('\n--------------------------------------------------------------------------------------\n')
            epoch += 1
        return best_swarm_pos 


SyntaxError: invalid syntax (<ipython-input-3-946cbb445c4e>, line 57)

**3. Create Particles Class**

In [None]:
class Particles:
    def __init__(self, ann):
        self.ann = ann # neural network
        self.dim = self.ann.getnWeights() # no. of weights in nurol network = dimension of particle
        
        # initialise particle best position and velocity with random values 
        self.position = (np.random.rand((self.dim)).astype(np.float64)-0.5)/2 #current position; initialized with random
        self.velocity = (np.random.rand((self.dim)).astype(np.float64)-0.5)/2 #current velocity; initialized with random
        
        self.err = self.ann.forward(self.position)  #compute error for initial position
        
        self.best_part_pos = np.copy(self.position) # intialize particle best position as intial position
        self.best_part_err = np.copy(self.err)  #intialized particle best error as self error from intial position
        
    # getter methods    
    def getDim(self): return self.dim
    def getPosition(self):return self.position
    def getVelocity(self):return self.velocity
    def getError(self):return self.err
    def getBestPartErr(self):return self.best_part_err
    def getBestPartPos(self):return self.best_part_pos

    
    # setter methods    
    def setPosition(self,pos):self.position = np.copy(pos)
    def setVelocity(self,vel):self.velocity = np.copy(vel)
    def setError(self,err): self.err = np.copy(err)
    def setBestPartErr(self,best_err): self.best_part_err = np.copy(best_err)
    def setBestPartPos(self,best_pos): self.best_part_pos = np.copy(best_pos)

In [None]:
'''
a = np.random.rand(3,3).astype(np.float32)-0.5
print(a)

a = np.random.rand(3,3).astype(np.float32)-0.5
print(a)
'''

**Select function from User input**

In [None]:
'''
# Set function from User input
func = input('Choose the function name \n(Options: Linear, Cubic, Sine, TanH, XOR, Complex) = ')
'''

**Set Hyperparameters from User input**

In [None]:
'''
# Set Hyperparameters from User input

# set number of hidden layers
hidden_layers = int(input('Enter number of hidden layers = '))


# set number of neurons in the hidden layers
if(hidden_layers > 0):
    hidden_layer_neurons = []
    for layer in range(hidden_layers):
        hidden_layer_neurons.append(int(input('Enter number of nodes in hidden layer {}= '.format(layer+1))))
  
# set activation function
activation = input('Enter the activation function \n(Options: Null, Sigmoid, Hyperbolic Tangent, Cosine, Gaussian) = ')
'''

**Setting ANN Hyperparameters**

In [None]:
func = 'Cubic' #(Options: Linear, Cubic, Sine, TanH, XOR, Complex)
hidden_layer_neurons = [5,5]
activation = 'Sigmoid' # (Options: Null, Sigmoid, Hyperbolic Tangent, Cosine, Gaussian)

In [None]:
# to do: handling errors & exceptions

In [None]:
'''
#check only
for i in range(len(hidden_layer_neurons)):
    print("number of neurons in hidden layer {} is {}".format(i+1,hidden_layer_neurons[i]))
'''

In [None]:
activation_funcs = {
    'Null': lambda x: 0,
    'Sigmoid': lambda x: 1/(1 + np.exp(-x)),
    'Hyperbolic Tangent': lambda x: np.tanh(x),
    'Cosine': lambda x: np.cos(x),
    'Gaussian': lambda x: np.exp(-x**2/2),
        }

In [None]:
'''
#check only
activate = activation_funcs[activation]
print ('Output: Selected activation -',activate(.5))

print('Output: Null -',0)
print('Output: Sigmoid -',1/(1 + np.exp(-.5)))
print('Output: Hyperbolic Tangent -',np.tanh(.5))
print('Output: Cosine -',np.cos(.5))
print('Output: Gaussian -',np.exp(-.5**2/2))
'''

**Reading data from the .txt file**

In [None]:
# load the training data from .txt file into a list
txt_address = {'Linear':'Data/1in_linear.txt',
               'Cubic':'Data/1in_cubic.txt',
               'Sine':'Data/1in_sine.txt',
               'TanH':'Data/1in_tanh.txt',
               'XOR':'Data/2in_xor.txt',
               'Complex':'Data/2in_complex.txt'}

train_data_file = open(txt_address[func],'r')
line_data = train_data_file.readlines()
train_data_file.close()

train_list = []
for line in line_data:
    line = line.strip().split()
    for d in line:
        train_list.append(float(d))
        
#check     
#print(train_list)

if func in ('XOR','Complex'):
    train_array = np.asarray(train_list, dtype=np.float32).reshape(-1,3)
    X,Y = train_array[:,0:2], train_array[:,-1]

elif func in ('Linear', 'Cubic', 'Sine', 'TanH'):
    train_array = np.asarray(train_list, dtype=np.float32).reshape(-1,2)
    X,Y = train_array[:,0], train_array[:,-1]
else:
    "error reading data"
    
#print(train_array)
#print(X)
#print(Y)

In [None]:
#create instance of neural network
ann1 = NeuralNetwork(func, hidden_layer_neurons, activation, X,Y)


**Setting PSO Hyperparameters**

In [None]:
swarm_size = 20
pso_alpha = 0.7 #0.729
pso_beta = 1.8 #1.49
pso_delta = 1.8 #1.49

max_epochs = 100
bound =50

bound_max = bound
bound_min = -bound


pso1 = ParticleSwarmOptimizer(swarm_size,pso_alpha,pso_beta,pso_delta,max_epochs, bound_max, bound_min)

In [None]:
#create swarm with swarm_size particles
swarm = [Particles(ann1) for i in range(pso1.getSwarmSize())]

In [None]:
final_weights = pso1.forward(swarm)

In [None]:
print(final_weights)

In [None]:
np.savetxt('final_weights01.csv',final_weights,delimiter=',')

In [None]:
print(pso1.getRunData()[1])
#data in format:
#[('epoch',epoch+1),('particle',i+1),('Particle Current Quality',err_i),('Particle Best Quality',best_part_error_i), ('Global Best Quality',best_swarm_err)])
                  