In this notebook we'll implement Particle Swarm Optimization for Restricted Portfolio Optimization 

PSO Algorithm

In [None]:
import os
import random
import numpy as np
import pandas as pd
from functools import reduce

import matplotlib.pyplot as plt
import seaborn as sns
from scipy.signal import savgol_filter
import matplotlib.ticker as mtick

In [None]:
class Particle:
    def __init__(self, num_stocks):
        self.position =  Particle.initial_swarm(num_stocks)
        self.pbest_position = self.position
        self.pbest_value = -10**3
        self.velocity = np.zeros(num_stocks)

    def move(self):
        new_pos = self.position + self.velocity
        new_pos = np.where(new_pos>1, 1, new_pos) # upper bound
        new_pos = np.where(new_pos<0, 0, new_pos) # lower bound
        # print(new_pos)
        self.position = new_pos/np.sum(new_pos)

    @staticmethod
    def initial_swarm(x):
        array = np.array([random.random() for i in range(x)])
        sum1 = np.sum(array)
        return array/sum1 # normalized weights

class Space:
    def __init__(self, n_particles, num_stocks, returns, vol_matrix, alpha, w, c1, c2): #add alpha for penalty term
        self.n_particles = n_particles
        self.particles = []
        self.gbest_value = -10**3
        self.alpha = alpha #sensitivity to sharpe
        self.num_stocks = num_stocks        
        self.gbest_position = Space.initial_swarm(num_stocks)
        self.returns = returns
        self.vol_matrix = vol_matrix #vol_matrixatility matrix (variance)
        self.w = w
        self.c1 = c1
        self.c2 = c2

    @staticmethod
    def initial_swarm(x):
        array = np.array([random.random() for i in range(x)])
        sum1 = np.sum(array)
        return array/sum1 # normalized weights

    # fitness function for portfolio optimization
    def fitness(self, particle):
        res = np.dot(particle.position,np.transpose(self.returns))
        vol_matrix = np.dot(np.dot(particle.position,self.vol_matrix),np.transpose(particle.position))

        # w\cdot covariance\:matrix\times w^{T}
        vol_matrixs = np.dot(particle.position,self.vol_matrix)*particle.position
        
        # vol_matrixatility\:dispersion=\sum (stock\:risk\:contibution_{i} -average\:risk)
        self.vol_matrix_dis = np.sum(abs(np.sqrt(vol_matrixs)-np.mean(np.sqrt(vol_matrixs))))
      
       
        result = res - self.alpha*self.vol_matrix_dis
        return result

    def set_pbest(self):
        for particle in self.particles:
            fitness_candidate = self.fitness(particle)
            if (particle.pbest_value < fitness_candidate):
                particle.pbest_value = fitness_candidate
                particle.pbest_position = particle.position

    def set_gbest(self):
        for particle in self.particles:
            best_fitness_candidate = self.fitness(particle)
            if(self.gbest_value < best_fitness_candidate):
                self.gbest_value = best_fitness_candidate
                self.gbest_position = particle.position

    def move_particles(self):
        for particle in self.particles:
            new_velocity = (self.w*particle.velocity) + (self.c1*random.random()) * (particle.pbest_position - particle.position) + \
                            (random.random()*self.c2) * (self.gbest_position - particle.position)
            particle.velocity = new_velocity
            particle.move()

In [None]:
#Return & covariance matrix has to be provided by the user.
dic_pos = {}
best_values, sharpes, disp, al = [], [], [], []

for a in range(0,100,1):
    alpha = a/100
    search_space = Space(50, 34, returns, cov_matrix, alpha, w=0.5, c1=0.8, c2=0.9)
    particles_vector = [Particle(search_space.num_stocks) for _ in range(search_space.n_particles)]
    search_space.particles = particles_vector
    num_iterations = 1000
        # algo
    iteration = 0
    prev_gbest_value = -10**4
    while(iteration < num_iterations):
        search_space.set_pbest()    
        search_space.set_gbest()

        if(abs(search_space.gbest_value - prev_gbest_value) < 0.0001) and (iteration>3):
            break
        prev_gbest_value = search_space.gbest_value
        print('Iteration: {0} Position {1}, Value: {2:.3f}'.format(iteration, 
                                                                search_space.gbest_position, search_space.gbest_value))
        search_space.move_particles()
        iteration += 1
    print('finished {} ---------'.format(alpha))
    al.append(alpha)    
    best_values.append(search_space.gbest_value)
    sharpes.append(search_space.sharpe)
    disp.append(search_space.vol_matrix_dis)
    dic_pos[a] = search_space.gbest_position