In [24]:
import warnings
warnings.filterwarnings('ignore')

import math 
import pandas as pd 
from tqdm import tqdm 

from collections import namedtuple
from random import choices, randint, randrange, random, sample
from typing import List, Callable, Tuple 
from functools import partial

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from lightgbm import LGBMClassifier
from sklearn.neural_network import MLPClassifier

from sklearn import datasets 
from deslib.des.knora_e import KNORAE 
from deslib.des.des_p import DESP
from deslib.des.knop import KNOP

from sklearn.model_selection import train_test_split 
from sklearn.preprocessing import LabelEncoder

from scipy.io import arff
from sklearn.metrics import (accuracy_score, f1_score)

from ucimlrepo import fetch_ucirepo 

pd.set_option('display.max_columns', None)   

In [25]:
from diversity import * 

In [63]:
Genome = List[int]
Population = List[Genome]
FitnessFunc = Callable[[Genome], float]
PopulateFunc = Callable[[], Population]
SelectionFunc = Callable[[Population, FitnessFunc], Tuple[Genome, Genome]]
CrossoverFunc = Callable[[Genome, Genome], Tuple[Genome, Genome]]
MutationFunc = Callable[[Genome], Genome]
# Signature used by ``fitness``: metric(y_true, y_pred) -> score.
MetricFunc = Callable[..., float]
BaseClassifier = namedtuple('BaseClassifier', ['name', 'model'])


class GeniticAlgorithm:
    """Genetic search over subsets of a classifier pool.

    A genome is a list of 0/1 genes aligned with ``pool``: gene ``i`` equal to 1
    means ``pool[i]`` takes part in the candidate ensemble.  (The class name
    keeps its original spelling so existing callers continue to work.)
    """

    def __init__(self, pool: List[BaseClassifier], metric_function: MetricFunc, ensemble_model,
                 X_dsel, y_dsel, X_test, y_test):
        """Store the search inputs and precompute pairwise diversity.

        Args:
            pool: ``BaseClassifier(name, model)`` entries; every ``model`` must
                already support ``predict`` because ``measure_diversity`` is
                called immediately.
            metric_function: Called as ``metric_function(y_test, predictions)``.
            ensemble_model: Callable taking a list of models and returning an
                object with ``fit`` and ``predict``.
            X_dsel, y_dsel: Data the ensemble is fitted on and on which
                diversity is measured.
            X_test, y_test: Data the fitness score is computed on.
        """
        self.pool = pool
        self.metric_function = metric_function
        self.ensemble_model = ensemble_model
        self.X_dsel = X_dsel
        self.y_dsel = y_dsel
        self.X_test = X_test
        self.y_test = y_test

        self.measure_diversity()

    def measure_diversity(self):
        """Fill ``self.diversity_dictionary`` with pairwise disagreement values.

        Keys have the form ``"<first name>-<second name>"`` and cover every
        ordered pair of pool members, including a member paired with itself.
        """
        # ``disagreement_measure`` is not defined in this block; presumably it
        # is provided by the notebook's ``from diversity import *``.
        # Accept pandas objects (the original path) as well as plain arrays.
        y_true = self.y_dsel.to_numpy() if hasattr(self.y_dsel, "to_numpy") else self.y_dsel

        # Predict once per model; the pairwise loop below only reuses these.
        predictions = {bc.name: bc.model.predict(self.X_dsel) for bc in self.pool}

        self.diversity_dictionary = {
            f"{first}-{second}": disagreement_measure(y_true, first_preds, second_preds)
            for first, first_preds in predictions.items()
            for second, second_preds in predictions.items()
        }

    def genome_to_pool(self, genome: Genome, pool: List[BaseClassifier]) -> List[str]:
        """Return the names of the ``pool`` members whose gene equals 1."""
        return [member.name for member, gene in zip(pool, genome) if gene == 1]

    def generate_genome(self, length: int) -> Genome:
        """Return a random 0/1 genome with ``length`` genes."""
        return choices([0, 1], k=length)

    def generate_population(self, size: int, genome_length: int) -> Population:
        """Return ``size`` random genomes of ``genome_length`` genes each."""
        return [self.generate_genome(genome_length) for _ in range(size)]

    def diversity_fitness(self, genome: Genome) -> float:
        """Mean stored diversity over all ordered pairs of selected models.

        The average runs over every ordered pair of selected names, self-pairs
        included, and is rounded to 4 decimals.

        Returns:
            ``0.0`` when fewer than two models are selected: there is no pair to
            disagree, and such genomes also score 0 in ``fitness``.  (Returning
            the maximum here would hand them the "most diverse" elite slot.)
        """
        model_names = [self.pool[i].name for i, gene in enumerate(genome) if gene == 1]

        if len(model_names) <= 1:
            return 0.0

        pair_values = [
            self.diversity_dictionary[f"{first}-{second}"]
            for first in model_names
            for second in model_names
        ]
        return round(sum(pair_values) / len(pair_values), 4)

    def fitness(self, genome: Genome, pool: List[BaseClassifier]) -> float:
        """Score the ensemble encoded by ``genome``.

        The selected models are wrapped with ``self.ensemble_model``, fitted on
        the DSEL data and scored with ``self.metric_function`` on the test data.

        Returns:
            The metric value, or ``0.0`` when fewer than two models are selected.
        """
        # NOTE(review): the score is computed on X_test/y_test, so the search is
        # tuned on the same split that is reported - confirm this is intended.
        models = [member.model for member, gene in zip(pool, genome) if gene == 1]

        if len(models) <= 1:
            return 0.0

        ensemble = self.ensemble_model(models)
        ensemble.fit(self.X_dsel, self.y_dsel)

        return self.metric_function(self.y_test, ensemble.predict(self.X_test))

    def selection_pair(self, population: Population, fitness_func: FitnessFunc) -> Population:
        """Draw two parents (with replacement), weighted by fitness.

        ``random.choices`` rejects weights that sum to zero, which happens when
        every genome selects fewer than two models; in that case the draw falls
        back to a uniform one instead of raising.
        """
        weights = [fitness_func(genome) for genome in population]
        if sum(weights) <= 0:
            return choices(population=population, k=2)
        return choices(population=population, weights=weights, k=2)

    def single_point_crossover(self, a: Genome, b: Genome) -> Tuple[Genome, Genome]:
        """Swap the tails of ``a`` and ``b`` after one random cut point.

        Raises:
            ValueError: If the genomes differ in length.
        """
        if len(a) != len(b):
            raise ValueError("Genome a and b must be the same length")

        length = len(a)
        if length < 2:
            # Copies, so that the in-place ``mutation`` cannot alter the parents.
            return list(a), list(b)

        p = randint(1, length - 1)
        return a[0:p] + b[p:], b[0:p] + a[p:]

    def two_points_crossover(self, a: Genome, b: Genome) -> Tuple[Genome, Genome]:
        """Swap the segment between two random cut points of ``a`` and ``b``.

        The first cut lies in ``[1, length // 2]`` and the second in
        ``[length // 2, length - 1]``.

        Raises:
            ValueError: If the genomes differ in length.
        """
        if len(a) != len(b):
            raise ValueError("Genome a and b must be the same length")

        length = len(a)
        if length < 2:
            # Copies, so that the in-place ``mutation`` cannot alter the parents.
            return list(a), list(b)

        half = length // 2
        p1 = randint(1, half)
        p2 = randint(half, length - 1)

        return a[0:p1] + b[p1:p2] + a[p2:], b[0:p1] + a[p1:p2] + b[p2:]

    def uniform_crossover(self, a: Genome, b: Genome,
                          swap_probability: float = 0.5) -> Tuple[Genome, Genome]:
        """Exchange each gene between ``a`` and ``b`` independently.

        Args:
            swap_probability: Chance that a given position is swapped.

        Raises:
            ValueError: If the genomes differ in length.
        """
        if len(a) != len(b):
            raise ValueError("Genome a and b must be the same length")

        child_a, child_b = list(a), list(b)
        for index in range(len(child_a)):
            if random() < swap_probability:
                child_a[index], child_b[index] = child_b[index], child_a[index]

        return child_a, child_b

    def mutation(self, genome: Genome, num: int = 2, probability: float = 0.5) -> Genome:
        """Flip up to ``num`` randomly chosen genes, each with ``probability``.

        The genome is modified in place and also returned.  The same index can
        be drawn more than once.  An empty genome is returned unchanged.
        """
        if not genome:
            return genome

        for _ in range(num):
            index = randrange(len(genome))
            if random() < probability:
                genome[index] = abs(genome[index] - 1)

        return genome

    def tournament_selection(self, population: Population, fitness_func: FitnessFunc,
                             tournament_size: int = 5) -> Genome:
        """Return the fittest genome among a random sample of the population.

        The sample size is capped at ``len(population)`` so small populations
        do not make ``random.sample`` fail.

        Raises:
            ValueError: If ``population`` is empty.
        """
        contenders = sample(population, k=min(tournament_size, len(population)))
        return max(contenders, key=fitness_func)

    def run_evolution(self,
        populate_func: PopulateFunc,
        fitness_func: FitnessFunc,
        fitness_limit: float,
        selection_func: SelectionFunc,
        crossover_func: CrossoverFunc,
        mutation_func: MutationFunc,
        generation_limit: int = 100,
        verbose: bool = True) -> Tuple[Population, int]:
        """Evolve a population until ``fitness_limit`` or ``generation_limit``.

        Each generation keeps the fittest genome and the genome with the highest
        ``diversity_fitness``, then fills the rest with mutated offspring of
        parents drawn by ``selection_func``.

        ``fitness_func`` is memoized per genome for the duration of the run, so
        it is assumed to be deterministic.

        Args:
            verbose: When true, print the best genome's model names and score
                once per generation.

        Returns:
            ``(population, generation)``: the population sorted best-first and
            the index of the last generation that was evaluated (0 when
            ``generation_limit`` is 0).
        """
        score_cache = {}

        def cached_fitness(genome: Genome) -> float:
            # Sorting, reporting, the stop test and every parent draw all need
            # the same scores; each score may cost a full ensemble fit.
            key = tuple(genome)
            if key not in score_cache:
                score_cache[key] = fitness_func(genome)
            return score_cache[key]

        population = populate_func()
        generation = 0

        for generation in range(generation_limit):
            population = sorted(population, key=cached_fitness, reverse=True)
            best = population[0]
            best_score = cached_fitness(best)

            if verbose:
                print(self.genome_to_pool(best, self.pool))
                print(f"Best Score: {best_score:.3f}")

            if best_score >= fitness_limit:
                break

            # Elitism: carry over the fittest and the most diverse genome.
            most_diverse = max(population, key=self.diversity_fitness)
            next_generation = [best, most_diverse]

            for _ in range(len(population) // 2 - 1):
                parents = selection_func(population, cached_fitness)
                offspring_a, offspring_b = crossover_func(parents[0], parents[1])
                offspring_a = mutation_func(offspring_a)
                offspring_b = mutation_func(offspring_b)
                next_generation += [offspring_a, offspring_b]

            population = next_generation
        else:
            # No early stop: rank the final population so index 0 is the best.
            population = sorted(population, key=cached_fitness, reverse=True)

        return population, generation

### Run 

In [64]:
# breast_cancer = datasets.load_breast_cancer() 

# breast_cancer_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names) 
# breast_cancer_df['target'] = breast_cancer.target  

# X = breast_cancer_df[breast_cancer.feature_names] 
# y = breast_cancer_df.target 

# Load the iris data set into one frame: the four feature columns plus `target`.
iris = datasets.load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names).assign(target=iris.target)

# Feature matrix and label vector used by the splits below.
X = iris_df.loc[:, iris.feature_names]
y = iris_df['target']

In [73]:
# First split: hold out 25% of all rows as the test set.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.25,
    stratify=y,
    random_state=46,
)

# Second split: carve 25% of the remaining rows out as the DSEL set;
# what is left stays as the training set.
X_train, X_dsel, y_train, y_dsel = train_test_split(
    X_train, y_train,
    test_size=0.25,
    stratify=y_train,
    random_state=42,
)

In [74]:
# Candidate base classifiers as (name, unfitted estimator) pairs; the order here
# is the gene order of every genome.
pool = [
    BaseClassifier(name=label, model=estimator)
    for label, estimator in [
        ('XGB', XGBClassifier(random_state=42)),
        ('XGB2', XGBClassifier(learning_rate=0.1, n_estimators=250, random_state=42)),
        ('RF', RandomForestClassifier(random_state=42)),
        ('RF2', RandomForestClassifier(n_estimators=230, random_state=42)),
        ('LR', LogisticRegression(random_state=42)),
        ('SVC', SVC(random_state=42)),
        ('DT', DecisionTreeClassifier(max_depth=10, random_state=42)),
        ('DT2', DecisionTreeClassifier(max_depth=5, random_state=42)),
        ('LGBM', LGBMClassifier(random_state=42)),
        ('LGBM2', LGBMClassifier(learning_rate=0.05, n_estimators=250, random_state=42)),
        ('KNN', KNeighborsClassifier()),
        ('MLP', MLPClassifier(random_state=42)),
    ]
]

In [75]:
# Fit every pool member on the training split only; the DSEL and test splits
# are not passed to `fit` here.
for base_classifier in tqdm(pool):
    estimator = base_classifier.model
    estimator.fit(X_train, y_train)

 75%|█████████████████████████████████           | 9/12 [00:00<00:00, 18.13it/s]

[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000140 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 70
[LightGBM] [Info] Number of data points in the train set: 84, number of used features: 4
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000161 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 70
[LightGBM] [Info] Number of data points in the train set: 84, number of used features: 4
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612


100%|███████████████████████████████████████████| 12/12 [00:00<00:00, 12.34it/s]






In [76]:
# Search for the pool subset that maximizes DES-P accuracy.
ga = GeniticAlgorithm(
    pool=pool,
    metric_function=accuracy_score,
    ensemble_model=DESP,
    X_dsel=X_dsel,
    y_dsel=y_dsel,
    X_test=X_test,
    y_test=y_test,
)

# 10 genomes (one gene per pool member), at most 30 generations, stopping early
# once a genome reaches a score of 1.0.
population, generations = ga.run_evolution(
    populate_func=partial(ga.generate_population, size=10, genome_length=len(pool)),
    fitness_func=partial(ga.fitness, pool=pool),
    fitness_limit=1.0,
    selection_func=ga.selection_pair,
    crossover_func=ga.two_points_crossover,
    mutation_func=ga.mutation,
    generation_limit=30,
)

['XGB2', 'RF', 'SVC', 'DT2', 'LGBM', 'LGBM2', 'MLP']
Best Score: 0.947
['XGB', 'XGB2', 'LR', 'SVC', 'DT2', 'LGBM2', 'KNN']
Best Score: 0.974
['XGB', 'XGB2', 'LR', 'SVC', 'DT2', 'LGBM2', 'KNN']
Best Score: 0.974
['XGB', 'XGB2', 'LR', 'SVC', 'DT2', 'LGBM2', 'KNN']
Best Score: 0.974
['XGB', 'XGB2', 'LR', 'SVC', 'DT2', 'LGBM2', 'KNN']
Best Score: 0.974
['XGB', 'XGB2', 'LR', 'SVC', 'DT2', 'LGBM2', 'KNN']
Best Score: 0.974
['XGB', 'XGB2', 'LR', 'LGBM', 'LGBM2']
Best Score: 1.000
