In [2]:
# Author: Bian Hanzhang

"""import packages"""
import random as rand
from copy import deepcopy
# import time
# from math import floor
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.pyplot import MultipleLocator

In [3]:
# read file

FILE_NAME = "/Users/bianhanzhang/Desktop/data/asset pool/SP500_100_assets_pool.csv"
data = pd.read_csv(FILE_NAME)

SP_mean_rtn = np.reshape(np.array(data.iloc[:,0]),(100,1))
SP_cov_rtn = np.array(data.iloc[:,1:])

In [4]:
# Parameter Class

class ParameterInformation:
    """
    store algorithm's parameter:
    population: amount of bees = employed bees = onlooking bees
    max_bound: upper bound of one gene on chromosome
    min_bound: lower bound of one gene on chromosome
    max_iter_num: max iteration times of each nectar
    algo_iter_num: current iteration times of the algorithm 
    max_algo_iter_num: max iteration times of the algorithm
    risk_averse_para: risk aversion parameter of investor
    """
    population = 0
    dimension = 0
    max_bound = 0
    min_bound = 0
    max_iter_num = 0
    algo_iter_num = 0
    algo_max_iter_num = 0
    risk_averse_para = 0

    def __init__(self, population, dimension, min_bound, max_bound, max_iter_num, algo_max_iter_num, risk_averse_para):
        self.population = population
        self.dimension = dimension
        self.max_bound = max_bound
        self.min_bound = min_bound
        self.max_iter_num = max_iter_num
        self.algo_max_iter_num = algo_max_iter_num
        self.risk_averse_para = risk_averse_para

    def set_algo_iter_num(self, value):
        """set algorithm's iteration number"""
        self.algo_iter_num = value

    def show_parameter(self):
        """show parameter"""
        print("Parameter Information:")
        print(f"population: {self.population} ")
        print(f"dimension: {self.dimension}")
        print(f"bounds: [{self.min_bound},{self.max_bound}]")
        print("max_iter_num: %s " % self.max_iter_num)
        print("algo_max_iter_num: %s " % self.algo_max_iter_num)

In [5]:
class Nectar:
    """nectar(chromosome) information"""

    chromosome = np.array([])
    fitness = 0
    test_func_value = 0
    iter_num = 0
    select_probability = 0

    def __init__(self, dimension) -> None:
        """initialize chromosome"""
        self.chromosome = np.array([float for i in range(dimension)])

    def set_chromosome(self, i, value):
        """set one position of the chromosome"""
        self.chromosome[i] = value

    def random_chromosome(self, min_bound, max_bound, dimension):
        """randomly generate a chromosome"""
        self.chromosome = np.random.uniform(min_bound, max_bound, dimension)

    def set_fitness(self, value):
        """set fitness of the nectar"""
        self.fitness = value

    def set_test_func_value(self, value):
        """set value of test function"""
        self.test_func_value = value

    def set_iter_num(self, value):
        """set how many times this nectar has been explored"""
        self.iter_num = value

    def set_select_probability(self, value):
        """set probability calculated by roulette"""
        self.select_probability = value

In [6]:
def mean_variance(x, mean_rtn, cov_rtn, risk_averse_para):
    """calculate objective function value"""
    obj_func = risk_averse_para * np.dot(np.dot(x.T, cov_rtn), x) - \
        (1 - risk_averse_para) * np.dot(x.T, mean_rtn)
    return obj_func


def cal_fitness(obj_value):
    """calculate fitness value by obj_value"""
    if obj_value >= 0:
        fitness = 1/(1+obj_value)
    else:
        fitness = 1 + abs(obj_value)
    return fitness


def cal_violation(chromosome):
    """calculate violation of solution vector"""
    violation = 0
    # constraint 1: sum of weight equals to 1
    violation += abs(sum(chromosome) - 1)
    return violation


def deb_method(chromosome1, chromosome2, fit1, fit2):
    """selection method after exploring"""
    violation1 = cal_violation(chromosome1)
    violation2 = cal_violation(chromosome2)
    if violation1 != violation2:
        return violation1 > violation2
    else:
        return fit1 < fit2

In [7]:
def init_nectar_sources(nectar_sources, best_nectar_source, parameter, mean_rtn, cov_rtn):
    """initialize nectars sources"""
    best_nectar_source.random_chromosome(
        parameter.min_bound, parameter.max_bound, parameter.dimension)
    best_nectar_source.set_test_func_value(
        mean_variance(best_nectar_source.chromosome, mean_rtn, cov_rtn, parameter.risk_averse_para))
    best_nectar_source.set_fitness(
        cal_fitness(best_nectar_source.test_func_value))
    best_nectar_source.set_select_probability(0)
    best_nectar_source.set_iter_num(0)
    for i in range(parameter.population):
        nectar_sources[i].random_chromosome(
            parameter.min_bound, parameter.max_bound, parameter.dimension)
        nectar_sources[i].set_test_func_value(mean_variance(
            nectar_sources[i].chromosome, mean_rtn, cov_rtn, parameter.risk_averse_para))
        nectar_sources[i].set_fitness(
            cal_fitness(nectar_sources[i].test_func_value))
        nectar_sources[i].set_select_probability(0)
        nectar_sources[i].set_iter_num(0)


def regenerate_nectar(nectar_sources, i, parameter, mean_rtn, cov_rtn):
    """regenerate a new nectar"""
    nectar_sources[i].random_chromosome(
        parameter.min_bound, parameter.max_bound, parameter.dimension)
    nectar_sources[i].set_test_func_value(
        mean_variance(nectar_sources[i].chromosome, mean_rtn, cov_rtn, parameter.risk_averse_para))
    nectar_sources[i].set_fitness(
        cal_fitness(nectar_sources[i].test_func_value))
    nectar_sources[i].set_select_probability(0)
    nectar_sources[i].set_iter_num(0)


def copy(nectar1, nectar2):
    """copy information from nectar2 to nectar1"""
    nectar1.chromosome = deepcopy(nectar2.chromosome)
    nectar1.set_test_func_value(nectar2.test_func_value)
    nectar1.set_fitness(nectar2.fitness)


def save_best_nectar_source(nectar_sources, best_nectar_source, parameter):
    """save the best nectar source into best_nectar_source"""
    for i in range(parameter.population):
        if nectar_sources[i].test_func_value < best_nectar_source.test_func_value:
            copy(best_nectar_source, nectar_sources[i])

In [8]:
def employed_bee_behavior(nectar_sources, parameter, mean_rtn, cov_rtn, MR):
    """employed bee behavior"""
    for i in range(parameter.population):
        for j in range(parameter.dimension):
            if np.random.rand() > MR:
                # randomly select another nectar k to crossover, k!=i
                while True:
                    # parameter.population-1: since random.randint includes both boundary
                    k = rand.randint(0, parameter.population - 1)
                    if k != i:
                        break
                neighbor_chromosome = deepcopy(nectar_sources[i].chromosome)
                neighbor_chromosome[j] = nectar_sources[i].chromosome[j] + \
                    rand.uniform(-1, 1) * \
                    (nectar_sources[i].chromosome[j] -
                     nectar_sources[k].chromosome[j])

                # preventing from exceeding boundary
                if neighbor_chromosome[j] > parameter.max_bound:
                    neighbor_chromosome[j] = parameter.max_bound
                elif neighbor_chromosome[j] < parameter.min_bound:
                    neighbor_chromosome[j] = parameter.min_bound

                # calculate fitness of neighbor_chromosome and plus iter
                neighbor_test_func_value = mean_variance(
                    neighbor_chromosome, mean_rtn, cov_rtn, parameter.risk_averse_para)
                neighbor_fitness = cal_fitness(neighbor_test_func_value)

                # deb's method
                if deb_method(nectar_sources[i].chromosome, neighbor_chromosome,
                              nectar_sources[i].fitness, neighbor_fitness):
                    nectar_sources[i].chromosome = deepcopy(
                        neighbor_chromosome)
                    nectar_sources[i].set_test_func_value(
                        neighbor_test_func_value)
                    nectar_sources[i].set_fitness(neighbor_fitness)
                    nectar_sources[i].set_iter_num(0)
                else:
                    nectar_sources[i].set_iter_num(
                        nectar_sources[i].iter_num + 1)

    # algorithms's iteration times +1
    parameter.set_algo_iter_num(parameter.algo_iter_num + 1)

In [9]:
def roulette(nectar_sources, parameter):
    """calculate each nectar's selection probability"""
    prob_list = []
    fitness_list = [nectar_sources[i].fitness
                    for i in range(parameter.population)]
    violation_list = [cal_violation(nectar_sources[i].chromosome)
                    for i in range(parameter.population)]
    nectar_list = list(range(parameter.population))
    prob_list = [
    (1 - v / sum(violation_list)) * 0.5 if v > 0 else 0.5 + f / sum(fitness_list) * 0.5
    for i, (v, f) in enumerate(zip(violation_list, fitness_list))]

    # add [0] : rand.choices returns a list -> turns list into int
    return rand.choices(nectar_list, prob_list)[0]

In [10]:
def onlooking_bee_behavior(nectar_sources, parameter, mean_rtn, cov_rtn, MR):
    """onlooking bee behavior"""
    i = 0
    while i < parameter.population:
        nectar_selected = roulette(nectar_sources, parameter)
        # randomly choose a position of selected chromosome to explore
        neighbor_chromosome = deepcopy(
            nectar_sources[nectar_selected].chromosome)
        for j in range(parameter.dimension):
            if np.random.rand() > MR:
                # randomly select another nectar k to crossover, k!=nectar_selected
                while True:
                    k = rand.randint(0, parameter.population - 1)
                    if k != nectar_selected:
                        break

                neighbor_chromosome[j] = nectar_sources[nectar_selected] \
                    .chromosome[j] + rand.uniform(-1, 1) * \
                    (nectar_sources[nectar_selected]
                     .chromosome[j]-nectar_sources[k].chromosome[j])

                # preventing from exceeding boundary
                if neighbor_chromosome[j] > parameter.max_bound:
                    neighbor_chromosome[j] = parameter.max_bound
                elif neighbor_chromosome[j] < parameter.min_bound:
                    neighbor_chromosome[j] = parameter.min_bound

            # calculate fitness of neighbor_chromosome and plus iter
            neighbor_test_func_value = mean_variance(
                neighbor_chromosome, mean_rtn, cov_rtn, parameter.risk_averse_para)
            neighbor_fitness = cal_fitness(neighbor_test_func_value)

            # deb's method
            if deb_method(nectar_sources[nectar_selected].chromosome, neighbor_chromosome,
                          nectar_sources[nectar_selected].fitness, neighbor_fitness):
                nectar_sources[nectar_selected].chromosome = deepcopy(
                    neighbor_chromosome)
                nectar_sources[nectar_selected].set_test_func_value(
                    neighbor_test_func_value)
                nectar_sources[nectar_selected].set_fitness(neighbor_fitness)
                nectar_sources[nectar_selected].set_iter_num(0)
            else:
                nectar_sources[nectar_selected].set_iter_num(
                    nectar_sources[nectar_selected].iter_num+1)
            i += 1

In [11]:
def scout_bee_behavior(nectar_sources, parameter, mean_rtn, cov_rtn):
    """scout bee behavior"""
    for i in range(parameter.population):
        if nectar_sources[i].iter_num > parameter.max_iter_num:
            regenerate_nectar(nectar_sources, i, parameter, mean_rtn, cov_rtn)

In [12]:
def artificial_bee_colony(parameter, mean_rtn, cov_rtn, MR):
    """main function: control algorithm's process"""
    parameter.show_parameter()
    result_vector = np.array([None])
    nectar_sources = np.array([Nectar(parameter.dimension)
                              for i in range(parameter.population)])
    best_nectar_source = Nectar(parameter.dimension)

    # run algorithm max_test_num times and calculate mean result

    parameter.set_algo_iter_num(0)
    init_nectar_sources(nectar_sources, best_nectar_source,
                        parameter, mean_rtn, cov_rtn)
    save_best_nectar_source(nectar_sources, best_nectar_source, parameter)

    while parameter.algo_iter_num < parameter.algo_max_iter_num:
        employed_bee_behavior(nectar_sources, parameter, mean_rtn, cov_rtn, MR)
        onlooking_bee_behavior(nectar_sources, parameter,
                               mean_rtn, cov_rtn, MR)
        save_best_nectar_source(
            nectar_sources, best_nectar_source, parameter)
        scout_bee_behavior(nectar_sources, parameter, mean_rtn, cov_rtn)
        result_vector[0] = best_nectar_source.chromosome

    # print(result_by_iter)
    # draw(result_by_iter,parameter.algo_max_iter_num)

    return result_vector

In [13]:
def draw(func_value, max_algo_iter_num):
    '''画出收敛图'''
    x = np.linspace(0, max_algo_iter_num, max_algo_iter_num)
    plt.figure(num=1, figsize=(8, 5))  # num表示的是编号，figsize表示的是图表的长宽
    plt.yscale('log')  # 设置纵坐标的缩放
    x_major_locator = MultipleLocator(
        max_algo_iter_num/10)  # 把x轴的刻度间隔设置为100，并存在变量里
    ax = plt.gca()  # ax为两条坐标轴的实例
    ax.xaxis.set_major_locator(x_major_locator)  # 把x轴的主刻度设置为100的倍数

    l1, = plt.plot(x, func_value, color='black',  # 线条颜色
                   linewidth=1.5,  # 线条宽度
                   linestyle='-',  # 线条样式
                   label='algorithm1')
    plt.show()

In [14]:
def cal_ann_return_risk(weight, mean_rtn, cov_rtn):
    """calculate annual return and risk"""
    ann_rtn = np.dot(weight, mean_rtn) * 251  # trading days: 251
    ann_risk = np.sqrt(np.dot(np.dot(weight, cov_rtn),weight.T) * 251)
    result = np.array([ann_rtn.item(),ann_risk])
    return result

In [15]:
def main(parameter,MR):
    """main function"""
    print("------Artificial Bee Colony algorithms starts------")
    #starting_time = time.time()
    result = artificial_bee_colony(parameter, SP_mean_rtn, SP_cov_rtn, MR)

    return result

In [16]:
# ABC parameter 100 assets
my_population = int(20 * np.sqrt(100))
my_MCN = int(1000 * 100 / my_population)
my_parameter = ParameterInformation(my_population,100,0,1,30,my_MCN,0.5)

# set extra parameter and mean_rtn/cov_rtn
my_MR = 0.8

# ABC POP
pop_result = main(my_parameter,my_MR)
pop_result = pop_result[0]

#(pd.DataFrame.from_dict(data=result_dict, orient="index")).to_csv(
#   "test1.csv", header=False)

------Artificial Bee Colony algorithms starts------
Parameter Information:
population: 200 
dimension: 100
bounds: [0,1]
max_iter_num: 30 
algo_max_iter_num: 500 


In [17]:
# pop_result = np.reshape(pop_result[0], (15,1))
pop_result = [ x/sum(pop_result) for x in pop_result]
pop_result = np.array(pop_result)
print(pop_result)

# calculate annual return and risk
print(cal_ann_return_risk(pop_result,SP_mean_rtn,SP_cov_rtn))

[0.         0.0245545  0.         0.         0.0322782  0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.00113572 0.22842548 0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.00730473 0.         0.         0.         0.         0.06952951
 0.         0.         0.         0.         0.         0.
 0.         0.         0.03232193 0.         0.         0.08218407
 0.         0.         0.15540551 0.         0.         0.
 0.         0.         0.         0.         0.1260746  0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.00135717 0.2272949
 0.         0.         0.         0.         0.         0.01213367
 0.         0.         0.

In [18]:
ann_rtn = np.dot(pop_result, SP_mean_rtn) * 251

In [19]:
ann_rtn.flatten

<function ndarray.flatten>

In [20]:
ann_risk = np.sqrt(np.dot(np.dot(pop_result, SP_cov_rtn),pop_result.T) * 251)

In [21]:
ann_risk

13.264256620094724