In [None]:
import pandas as pd
import numpy as np
import sqlite3
import sklearn
import csv
import matplotlib.pyplot as plt
import matplotlib as mpl
from csv import writer
from datetime import datetime, timedelta
from sklearn.covariance import LedoitWolf
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.core.problem import ElementwiseProblem
from pymoo.optimize import minimize
from pymoo.core.callback import Callback
from pymoo.factory import get_selection
from pymoo.core.selection import Selection
from pymoo.factory import get_sampling, get_crossover, get_mutation, get_termination
from pymoo.indicators.hv import Hypervolume

In [None]:
# Define callback operator to return necessary data from optimisation run to calculate hypervolume

class MyCallback(Callback):
    """Callback that logs data needed to compute a hypervolume trace after the run.

    Two parallel lists are kept in ``self.data``:

    * ``"n_evals"`` - the value of ``algorithm.evaluator.n_eval`` at each notification
    * ``"objective_values"`` - the objective values (``"F"``) of ``algorithm.opt`` at each notification

    NOTE(review): presumably pymoo calls ``notify`` once per generation - confirm
    against the installed pymoo version.
    """

    def __init__(self) -> None:
        super().__init__()
        # Start both logs empty; they grow by one entry per notification.
        for log_name in ("n_evals", "objective_values"):
            self.data[log_name] = []

    def notify(self, algorithm):
        """Append the current evaluation count and the objective values of ``algorithm.opt``."""
        evaluation_count = algorithm.evaluator.n_eval
        self.data["n_evals"].append(evaluation_count)

        optimum_objectives = algorithm.opt.get("F")
        self.data["objective_values"].append(optimum_objectives)

# Load the mean return vector and covariance matrix for the chosen historical data

mreturn = np.loadtxt("mean_return.csv", delimiter = ',')
cvm = np.loadtxt("cvm.csv", delimiter = ',')

# Fail early, with a readable message, if the two files do not describe the same set of
# assets; otherwise the mismatch only surfaces as a dot-product error inside the optimiser.
if mreturn.ndim != 1:
    raise ValueError(f"mean_return.csv must load as a 1-D vector, got shape {mreturn.shape}")
if cvm.shape != (mreturn.shape[0], mreturn.shape[0]):
    raise ValueError(f"cvm.csv must load as a {mreturn.shape[0]}x{mreturn.shape[0]} matrix "
                     f"to match mean_return.csv, got shape {cvm.shape}")

# Constraint settings
cardinality_constraint = 5      # K: maximum number of assets counted as held in a portfolio
cardinality_threshold = 1e-10   # an asset counts as held when its normalised weight exceeds this
max_asset_weight = 0.3          # upper limit on any single normalised asset weight

# NSGA-II settings
population_size = 100           # rows of the initial population and pop_size of the algorithm
number_of_generations = 100000  # "n_gen" termination criterion of the run

# Create initial population for cardinality constrained only test
# initial_pop = np.zeros((population_size, len(mreturn)))

# for i in range(population_size):
#     initial_pop[i,:cardinality_constraint] = np.random.rand(cardinality_constraint)
#     initial_pop[i] = initial_pop[i] / np.sum(initial_pop[i])
#     np.random.shuffle(initial_pop[i])

# Create initial population for cardinality and max weighting test

# The rejection sampling below draws K weights that sum to 1 and keeps a draw only when every
# weight is <= max_asset_weight. That can only happen (with non-zero probability) when
# K * max_asset_weight > 1; otherwise the while-loop would never terminate.
if cardinality_constraint * max_asset_weight <= 1:
    raise ValueError("cardinality_constraint * max_asset_weight must exceed 1, otherwise no "
                     "portfolio of weights summing to 1 can satisfy the maximum asset weight")

# Each individual places K non-zero weights among len(mreturn) assets.
if cardinality_constraint > len(mreturn):
    raise ValueError("cardinality_constraint cannot exceed the number of assets")

maxweight_cardinality_list = []

while len(maxweight_cardinality_list) < population_size:
    # Dirichlet draw: K non-negative weights summing to 1 (shape (1, K) because size = 1)
    x = np.random.dirichlet(np.ones(cardinality_constraint), size = 1)
    if np.all(x <= max_asset_weight):
        maxweight_cardinality_list.append(x)

initial_pop = np.zeros((population_size, len(mreturn)))

for i in range(population_size):
    # Put the K accepted weights in the leading positions, then shuffle the row in place so
    # the non-zero weights land on random assets.
    initial_pop[i,:cardinality_constraint] = maxweight_cardinality_list[i]
    np.random.shuffle(initial_pop[i])


class MyProblem(ElementwiseProblem):
    """Two-objective portfolio problem with cardinality and maximum-weight constraints.

    Decision variables are one weight per asset, bounded to [0, 1]; they are normalised to
    sum to 1 before evaluation.

    Objectives (both minimised):
        f1 = w' * cvm * w       portfolio variance
        f2 = -(w . mreturn)     negated mean return

    Inequality constraints (satisfied when <= 0):
        g1 = (number of weights > threshold) - K
        g2 = max(w) - max_weight

    Parameters
    ----------
    mreturn : 1-D array of mean returns, one entry per asset.
    cvm : covariance matrix of the asset returns.
    cardinality : maximum number of held assets (K). Defaults to the module-level
        ``cardinality_constraint``.
    max_weight : maximum normalised weight of a single asset. Defaults to the module-level
        ``max_asset_weight``.
    threshold : weight above which an asset counts as held. Defaults to the module-level
        ``cardinality_threshold``.

    The defaults are read once, when the problem is constructed.
    """

    def __init__(self, mreturn, cvm, cardinality = None, max_weight = None, threshold = None):
        n_assets = len(mreturn)
        super().__init__(n_var = n_assets,
                         n_obj = 2,
                         n_constr = 2,
                         xl = np.zeros(n_assets),
                         xu = np.ones(n_assets))

        self.mreturn = mreturn
        self.cvm = cvm
        self.K = cardinality_constraint if cardinality is None else cardinality
        self.max_weight = max_asset_weight if max_weight is None else max_weight
        self.threshold = cardinality_threshold if threshold is None else threshold

    def _evaluate(self, x, out, *args, **kwargs):
        """Evaluate one candidate ``x`` and write objectives to ``out["F"]``, constraints to ``out["G"]``."""
        total = np.sum(x)

        if not total > 0:
            # An all-zero (or NaN) vector cannot be normalised: dividing would fill the
            # objectives and constraints with NaN. Report finite objectives and violate
            # both constraints so the candidate is treated as infeasible.
            out["F"] = [0.0, 0.0]
            out["G"] = [1.0, 1.0]
            return

        weights = x / total # Normalise asset weightings to collectively sum to 1

        variance = np.dot(weights, np.dot(self.cvm, weights)) # Risk/variance objective function

        negated_return = -np.dot(weights, self.mreturn) # Mean return objective function (negated for minimisation)

        held_assets = np.sum(weights > self.threshold)
        g_cardinality = held_assets - self.K # Cardinality constraint function

        g_max_weight = weights.max() - self.max_weight # Maximum asset weight constraint function

        out["F"] = [variance, negated_return]
        out["G"] = [g_cardinality, g_max_weight]

# Problem instance built from the loaded mean returns and covariance matrix
problem = MyProblem(mreturn, cvm)

# NSGA-II started from the hand-built initial population rather than a sampling operator
algorithm = NSGA2(sampling = initial_pop,
                  pop_size = population_size)

# Run for a fixed number of generations; the callback logs what the hypervolume trace needs.
# No fixed seed is supplied (seed = None) and no per-generation history is stored.
res = minimize(
    problem,
    algorithm,
    ("n_gen", number_of_generations),
    seed = None,
    verbose = False,
    save_history = False,
    callback = MyCallback(),
)

##### Record necessary data from each optimisation

In [None]:
# Snapshot of the optimisation currently held in `res`, stored under the "1" suffix.
# NOTE(review): the following cell reads the very same `res`; presumably the optimisation
# is re-run with the other sampling procedure between the two cells - confirm, otherwise
# both snapshots are identical.
result1 = res.F
approx_ideal1, approx_nadir1 = result1.min(axis=0), result1.max(axis=0)  # per-objective min / max of the final front
objective_values1 = res.algorithm.callback.data["objective_values"]
n_evals1 = res.algorithm.callback.data["n_evals"]

In [None]:
# Snapshot of the optimisation currently held in `res`, stored under the "2" suffix.
# NOTE(review): the preceding cell reads the very same `res`; presumably the optimisation
# is re-run with the other sampling procedure before this cell - confirm, otherwise
# both snapshots are identical.
result2 = res.F
approx_ideal2, approx_nadir2 = result2.min(axis=0), result2.max(axis=0)  # per-objective min / max of the final front
objective_values2 = res.algorithm.callback.data["objective_values"]
n_evals2 = res.algorithm.callback.data["n_evals"]

##### Hypervolume

In [None]:
# Hypervolume reference point, one coordinate per objective.
# NOTE(review): presumably set just beyond 1 because the Hypervolume metric in the
# following cells is configured with zero_to_one=True - confirm.
reference_point = np.array([1.01] * 2)

In [None]:
# One metric scores both runs so their hypervolumes share the same normalisation
# (ideal / nadir taken from the second recorded run) and the same reference point.
metric = Hypervolume(ref_point= reference_point,
                     norm_ref_point=False,
                     zero_to_one=True,
                     ideal=approx_ideal2,
                     nadir=approx_nadir2)

# hv_k is the hypervolume trace of recorded run k, one value per logged set of objective
# values. The suffixes must match because hv_1 is plotted against n_evals1 and hv_2
# against n_evals2 (previously the two were computed from each other's data).
hv_1 = [metric.do(value) for value in objective_values1]
hv_2 = [metric.do(value) for value in objective_values2]

##### Plot Hypervolume and Objective Values

In [None]:
# Register font / axes styling before any axes exist: rc settings are only picked up by
# artists created afterwards, so setting them between the two panels would leave the left
# panel with different label and tick sizes than the right one.
plt.rc('font', size = 10)
plt.rc('axes', titlesize = 10, labelsize = 8)
plt.rc('axes', facecolor = 'white')
plt.rc('xtick', labelsize = 6)
plt.rc('ytick', labelsize = 6)

fig, (ax_hv, ax_front) = plt.subplots(1, 2, figsize=(6.5, 4), facecolor = 'white')

# Left panel: hypervolume against function evaluations, one line per recorded run
ax_hv.plot(n_evals1, hv_1,  color='goldenrod', lw=2)
ax_hv.plot(n_evals2, hv_2,  color='firebrick', lw=2)
ax_hv.set_xlabel("Function Evaluations")
ax_hv.set_ylabel("Hypervolume")
ax_hv.grid()

# Right panel: final fronts; the second objective is stored negated, so flip its sign
# to show the mean return itself
ax_front.scatter((result1[:, 0]), (-result1[:, 1]), s = 5, marker = "o", facecolors = 'goldenrod', label = 'Random')
ax_front.scatter((result2[:, 0]), (-result2[:, 1]), s = 5, marker = "o", facecolors = 'firebrick', label = 'Initial')
ax_front.set_xlabel("Variance of Returns")
ax_front.set_ylabel("Mean Return")
ax_front.legend(fancybox = True, shadow = True, title = "Sampling Procedure", fontsize = 8, markerscale = 2, loc = 4)
ax_front.grid()

fig.tight_layout()
fig.savefig("HyperVolume_ObjectiveValues_Subplot.png", dpi = 500)