### References
- "Genetic Algorithm from Scratch in Python (tutorial with code)" - Kie Codes. https://www.youtube.com/watch?v=nhT56blfRpE.

In [28]:
import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt

import multiprocessing as mp

from numba import jit

from scipy.stats import skew

### Data Preparation

In [16]:
def compute_log_returns(df):
    log_df = np.log(df)
    log_returns = log_df.diff()

    return log_returns

In [17]:
# we load the datasets
ftse = pd.read_csv("C:\\Users\\Saverio\\Documents\\GitHub\\python_defi_project\\data\\ftse100.csv", dtype = float, parse_dates= True, index_col= 0)
hanseng = pd.read_csv("C:\\Users\\Saverio\\Documents\\GitHub\\python_defi_project\\data\\hanseng.csv", dtype = float, parse_dates= True, index_col= 0)
sp =  pd.read_csv("C:\\Users\\Saverio\\Documents\\GitHub\\python_defi_project\\data\\sp100.csv", dtype = float, parse_dates= True, index_col= 0)

# we calculate the log returns
ftse_rets = compute_log_returns(ftse)
hanseng_rets = compute_log_returns(hanseng)
sp_rets = compute_log_returns(sp)

### Genetic Algorithm Set-Up

In [None]:
class GA_PortfolioOptimization:

    class Chromosome:
        def __init__(self, assets, s_list):
            self.assets_dictionary = dict(zip(assets,s_list))
            self.fitness = None
            self.weights = None

            self.expected_return = None
            self.risk_factor = None


    def __init__(self, assets_returns_df, K, min_holding_constraint, max_holding_constraint):

        self.asset_returns = assets_returns_df
        self.assets_list = assets_returns_df.columns
        self.assets_indices = list(range(assets_returns_df.shape[1]))

        self.expected_returns = assets_returns_df.mean()
        self.var_cov_matrix = assets_returns_df.cov()
        self.K = K
    
        # We assume a minimum and maximum holding constraint
        self.min_holding_constraint = min_holding_constraint
        self.max_holding_constraint = max_holding_constraint

        self.trade_off_param = None
        self.weight_skewness = None

    def create_chromosome(self):                
        assets = np.random.choice(self.assets_list, size = self.K, replace= False)
        s_list = np.random.uniform(0,1, size = self.K)
  
        return self.Chromosome(assets = assets, s_list = s_list)

    def repair_weights(self, weights):

        repaired_weights = np.copy(weights)

        while True:

            violating_mask = repaired_weights > self.max_holding_constraint

            if not np.any(violating_mask):
                break

            total_excess = np.sum(repaired_weights[violating_mask] - self.max_holding_constraint)

            repaired_weights[violating_mask] = self.max_holding_constraint

            eligible_mask = ~violating_mask

            if not np.any(eligible_mask):
                break

            eligible_weights = repaired_weights[eligible_mask]
            
            # using inverse weighting to redistribute the excess weight

            # slight fix to prevent division by 0
            epsilon = 1e-9

            inverse_weights = 1/(eligible_weights+epsilon)

            redistribution = total_excess * (inverse_weights/np.sum(inverse_weights))
            repaired_weights[eligible_mask] += redistribution
        
        return repaired_weights/np.sum(repaired_weights)


    def fitness_function_mean_variance(self, chromosome):

        weights = np.array(chromosome.weights)
        assets = list(chromosome.assets_dictionary.keys())

        means = np.array(self.expected_returns[assets])
        covariances = self.var_cov_matrix.loc[assets, assets]

        weights = np.array(weights)
        
        var_term = weights.T@covariances@weights
        mean_term = weights@means

        chromosome.risk_factor = var_term

        fitness = self.trade_off_param*var_term-(1-self.trade_off_param)*mean_term

        return fitness
    
    def fitness_function(self, chromosome, risk_term):
        
        weights = np.array(chromosome.weights)
        assets = list(chromosome.assets_dictionary.keys())

        time_series = self.asset_returns[assets].dropna()
        
        portfolio_returns = time_series@weights

        portfolio_expected_return = portfolio_returns.mean()

        risk_metric = None
        fitness = None

        chromosome.expected_return = portfolio_expected_return

        if risk_term == "MAD":
            risk_ts = np.abs(portfolio_returns - portfolio_expected_return)
            risk_metric = np.mean(risk_ts)

            chromosome.risk_factor = risk_metric

            fitness = self.trade_off_param*risk_metric-(1-self.trade_off_param)*portfolio_expected_return

        elif risk_term == "semivariance":
            deviations = portfolio_returns - portfolio_expected_return
            downside_deviations = deviations[deviations < 0]
            squared_downside_deviations = downside_deviations**2
            risk_metric = np.sum(squared_downside_deviations) / len(portfolio_returns)

            chromosome.risk_factor = risk_metric

            fitness = self.trade_off_param*risk_metric-(1-self.trade_off_param)*portfolio_expected_return
        
        elif risk_term == "skewness":
            skewness = skew(portfolio_returns)
            variance = np.var(portfolio_returns)

            chromosome.risk_factor = variance

            fitness = self.trade_off_param*variance-(1-self.trade_off_param)*portfolio_expected_return - self.weight_skewness*skewness

        elif risk_term == "MV":
            fitness = self.fitness_function_mean_variance(chromosome)   

        else:
            print("Enter a valid argument for 'risk_term'.")
            return False    
            
        chromosome.fitness = fitness
    
    def compute_weights(self, chromosome):
        
        s_list_chromosome = list(chromosome.assets_dictionary.values())
        s_list_chromosome = np.array(s_list_chromosome)

        weights_vector = np.zeros_like(s_list_chromosome)
        weights_vector += self.min_holding_constraint

        # We assume a uniform holding constraint
        left_over_asset_share = 1- (self.K*self.min_holding_constraint)

        normalized_s_vector = s_list_chromosome/np.sum(s_list_chromosome)

        final_weights = weights_vector + left_over_asset_share*normalized_s_vector
        repaired_final_weights = self.repair_weights(final_weights)

        chromosome.weights = repaired_final_weights
    
    def evaluate_fitness(self, chromosome, fitness_type):
        self.compute_weights(chromosome = chromosome)
        self.fitness_function(chromosome, risk_term= fitness_type)
    
    def perform_crossover_and_mutation(self, chromosome_1, chromosome_2):

        chrom_1_dict = chromosome_1.assets_dictionary
        chrom_2_dict = chromosome_2.assets_dictionary

        chrom_1_asset_list = chrom_1_dict.keys()
        chrom_2_asset_list = chrom_2_dict.keys()
        
        common_assets = set(chrom_1_asset_list) & set(chrom_2_asset_list)
        common_assets_list = list(common_assets)

        s_value_common_assets_choice = np.random.binomial(n = 1,p = 0.5, size = len(common_assets_list)).astype(int)
        common_assets_s_values = []

        for i, asset in enumerate(common_assets_list):
            if s_value_common_assets_choice[i] == 0:
                common_assets_s_values.append(chrom_1_dict[asset])
            else:
                common_assets_s_values.append(chrom_2_dict[asset])

        crossover_dict = dict(zip(common_assets_list, common_assets_s_values))

        leftover_assets = (set(chrom_1_asset_list)| set(chrom_2_asset_list)) - common_assets
        leftover_assets_list = list(leftover_assets)

        leftover_assets_choice = np.random.binomial(n = 1, p = 0.5, size = len(leftover_assets_list))

        
        for i, leftover in enumerate(leftover_assets_list):
            if leftover_assets_choice[i] == 1:
                
                if leftover in chrom_1_dict.keys():
                     crossover_dict[leftover] = chrom_1_dict[leftover]
                else:
                     crossover_dict[leftover] = chrom_2_dict[leftover]
            else:
                continue
        
        # handle the cardinality constraint

        n_assets_in_chromosome = len(crossover_dict.keys())

        if n_assets_in_chromosome  > self.K:
            assets_to_remove_count = len(crossover_dict.keys()) - self.K

            leftover_assets_removable = []

            for asset in leftover_assets_list:
                if asset in crossover_dict.keys():
                    leftover_assets_removable.append(asset)
            
            if len(leftover_assets_removable) >= assets_to_remove_count:
                random_leftover_assets_to_remove = np.random.choice(a = leftover_assets_removable, 
                                                                    size = assets_to_remove_count,
                                                                    replace= False)

                for asset in random_leftover_assets_to_remove:
                    del crossover_dict[asset]
                        
            else:
                n_leftover_assets_removable = len(leftover_assets_removable)

                for asset in leftover_assets_removable:
                        del crossover_dict[asset]

                random_common_assets_to_remove = np.random.choice(a = common_assets_list, size = assets_to_remove_count - n_leftover_assets_removable, replace = False)
            
                for asset in random_common_assets_to_remove:
                    del crossover_dict[asset]                       
        
        elif n_assets_in_chromosome < self.K:
            assets_to_add = self.K - n_assets_in_chromosome

            possible_choices = set(self.assets_list) - set(crossover_dict.keys())
            possible_choices_list = list(possible_choices)

            assets_to_add_choice = np.random.choice(a = possible_choices_list, size = assets_to_add)
            s_list_new_assets = np.random.uniform(low = 0, high = 1, size = assets_to_add)

            new_assets_dict = dict(zip(assets_to_add_choice, s_list_new_assets))
            crossover_dict.update(new_assets_dict)

        ### We implement the mutation

        asset_to_mutate = np.random.choice(a = list(crossover_dict.keys()), size = 1)[0]

        increase_or_decrease_s_flip = np.random.binomial(n = 1, p = 0.5, size = 1)
        if increase_or_decrease_s_flip == 0:
            crossover_dict[asset_to_mutate] *= 1.1
        else:
            crossover_dict[asset_to_mutate] *= 0.9
        
        # clip the mutated s value to ensure it fulfills the constraints
        crossover_dict[asset_to_mutate] = np.clip(crossover_dict[asset_to_mutate], a_min= 0, a_max= 1)

        return self.Chromosome(assets = crossover_dict.keys(), s_list = crossover_dict.values())
    
    def determine_parent(self, chromosomes_list):
        potential_parent_1, potential_parent_2 = np.random.choice(a = chromosomes_list, size = 2, replace= True)

        if potential_parent_1.fitness < potential_parent_2.fitness:
            return potential_parent_1
        else:
            return potential_parent_2


    # something extra we've tried to implement which wasn't included in the original paper
    def perform_elitist_selection_generational(self, chromosomes_list, fitness_type, n_children):

        for choromosome in chromosomes_list:
            self.evaluate_fitness(chromosome= choromosome, fitness_type = fitness_type)

        chromosome_fitnesses = [chromosome.fitness for chromosome in chromosomes_list]

        print(chromosome_fitnesses)
 
        for child in range(n_children):
            parent1 = self.determine_parent(chromosomes_list)
            parent2 = self.determine_parent(chromosomes_list)

            child = self.perform_crossover_and_mutation(parent1, parent2)
            self.evaluate_fitness(child, fitness_type= fitness_type)

            if child.fitness < max(chromosome_fitnesses):
                index_max_values = chromosome_fitnesses.index(max(chromosome_fitnesses))

                chromosomes_list[index_max_values] = child
                chromosome_fitnesses[index_max_values] = child.fitness

            else:
                continue

        return chromosomes_list
    
    def run_optimization(self, fitness_type, n_iterations, n_population, trade_off_param = 0.5, weight_skewness = 0.01, generational = False):
        
        self.trade_off_param = trade_off_param

        if fitness_type == "skewness":
            self.weight_skewness = weight_skewness
        
        starting_chromosomes_list = [self.create_chromosome() for x in range(n_population)]
        chromosomes_list = starting_chromosomes_list

        if generational == True:
            for i in range(n_iterations):
                chromosomes_list = self.perform_elitist_selection(chromosomes_list = chromosomes_list, 
                                                                fitness_type= fitness_type,
                                                                n_children= n_population)
        else:
            for choromosome in chromosomes_list:
                self.evaluate_fitness(chromosome= choromosome, fitness_type = fitness_type)

            chromosome_fitnesses = [chromosome.fitness for chromosome in chromosomes_list]

            n_loops = n_iterations * len(self.assets_list)

            for i in range(n_loops):
                parent1 = self.determine_parent(chromosomes_list)
                parent2 = self.determine_parent(chromosomes_list)

                child = self.perform_crossover_and_mutation(parent1, parent2)
                self.evaluate_fitness(child, fitness_type= fitness_type)

                worst_member = max(chromosomes_list, key = lambda c: c.fitness)

                if child.fitness < worst_member.fitness:
                    worst_index = chromosomes_list.index(worst_member)
                    chromosomes_list[worst_index] = child
                else:
                    continue

        final_population_fitnesses = [chromosome.fitness for chromosome in chromosomes_list]
        
        indices_list = list(range(len(chromosomes_list)))
        indices_list = sorted(indices_list, key=lambda i: final_population_fitnesses[i], reverse=False)

        index_best_chromosome = indices_list[0]

        return chromosomes_list[index_best_chromosome]

In [27]:
optimizer = GA_PortfolioOptimization(sp_rets, 30, 0.05, 0.5)
optimal_result = optimizer.run_optimization(fitness_type= "MV", n_iterations= 1000, n_population = 100, trade_off_param= 0)

In [142]:
def compute_chromosome_mean_variance(chromosome, dataset):
    
    weights = np.array(chromosome.weights)
    assets = list(chromosome.assets_dictionary.keys())

    means = dataset.mean()[assets]
    variance_covariance_matrix = dataset.cov().loc[assets, assets]

    mean_term = weights@means
    var_term = weights.T@variance_covariance_matrix@weights

    return [mean_term, var_term]

In [170]:
def compute_charts_table(K_ranges, dataset, fitness_type, max_holding_constr = 0.2, min_holding_constr = 0.01):

    lambda_values = np.linspace(start= 0, stop= 1, num= 500)
    K_ranges = K_ranges

    for K in K_ranges:

        results_expected_ret = []
        results_risk_metric = []

        optimizer = GA_PortfolioOptimization(dataset, K = K, min_holding_constraint= min_holding_constr, max_holding_constraint= max_holding_constr)

        for lambda_val in lambda_values:
            optimal_result = optimizer.run_optimization(fitness_type= fitness_type, n_iterations= 100, n_population = 100, trade_off_param= lambda_val)

            results_expected_ret.append(optimal_result.expected_return)
            results_risk_metric.append(optimal_result.risk_factor)

        plt.plot(x = results_risk_metric, y = results_expected_ret, label = f"K = {K}")

        print(f"{K} chart completed")
    
    plt.legend()
    plt.ylabel("Return")
    plt.xlabel("Risk")

In [10]:
K_ranges_HS = list(range(5,35,5))

In [11]:
compute_charts_table(K_ranges= K_ranges_HS, dataset= hanseng_rets, fitness_type= "MV")

NameError: name 'compute_charts_table' is not defined