In [4]:
# importando as bibliotecas
import numpy as np
import pandas as pd
import random
from tqdm.auto import tqdm
import copy
import os
import multiprocessing
import itertools
from datetime import datetime
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import v_measure_score

In [5]:
# importando os modulos definidos
from src.Node import Node

In [6]:
# Seed Python's built-in `random` module with a fixed value so that the
# draws made through it in this kernel are repeatable across runs.
random.seed(42)

# 1. Carregar e tratar os dados

In [29]:
def get_data(file_name, header):
    """Load a train/test CSV pair from ``./data`` and standardise the features.

    The last column of each file is taken as the label column and every other
    column as a feature. A ``StandardScaler`` is fitted on the training
    features only and then applied to both splits.

    Args:
        file_name: Common file prefix; ``train.csv`` / ``test.csv`` is appended.
        header: Forwarded to ``pandas.read_csv`` as its ``header`` argument.

    Returns:
        ``(train_features, train_labels, test_features, test_labels)`` where the
        features are the output of ``StandardScaler.transform`` and the labels
        are the last column of the corresponding frame.
    """
    train_frame = pd.read_csv(f'./data/{file_name}train.csv', header=header)
    test_frame = pd.read_csv(f'./data/{file_name}test.csv', header=header)

    def split_features_labels(frame):
        # Everything but the last column is a feature; the last one is the label.
        return frame.iloc[:, :-1], frame.iloc[:, -1]

    train_raw, train_labels = split_features_labels(train_frame)
    test_raw, test_labels = split_features_labels(test_frame)

    # Fit on the training split only so no test statistics leak into scaling.
    scaler = StandardScaler().fit(train_raw)
    train_features = scaler.transform(train_raw)
    test_features = scaler.transform(test_raw)

    return train_features, train_labels, test_features, test_labels

# 2. Modelagem dos Indivíduos e da População

Gera um indivíduo modelado como uma árvore aleatória, usando o método grow.

Inicializa a população gerando uma lista de indivíduos aleatórios.

In [8]:
def generate_random_tree(max_depth: int, current_depth: int, terminals: list, variables:list) -> Node:
    """Recursively build a random expression tree.

    A leaf holding a random entry of ``variables`` is produced when the depth
    limit is reached, or - once ``current_depth`` is greater than 1 - with
    probability 0.5. Otherwise an internal node holding a random entry of
    ``terminals`` is created with two recursively generated children.

    Args:
        max_depth: Depth limit; nodes at ``max_depth - 1`` are always leaves.
        current_depth: Depth of the node being generated (0 for the root).
        terminals: Values used for internal nodes (the callers in this notebook
            pass the operator symbols here).
        variables: Values used for leaf nodes.

    Returns:
        The root ``Node`` of the generated subtree.
    """
    # `>=` instead of `==`: with max_depth < 1 the equality could never hold,
    # so the depth limit was silently ignored and the tree size was bounded
    # only by the coin flips below.
    at_depth_limit = current_depth >= max_depth - 1

    # The coin is only flipped when the limit has not been hit and we are past
    # depth 1 (short-circuit), so the random stream is consumed as before.
    if at_depth_limit or (current_depth > 1 and random.random() > 0.5):
        return Node(random.choice(variables))

    op = random.choice(terminals)
    left_subtree = generate_random_tree(max_depth, current_depth + 1, terminals, variables)
    right_subtree = generate_random_tree(max_depth, current_depth + 1, terminals, variables)
    return Node(op, left_subtree, right_subtree)

In [9]:
def initialize_population_grow(pop_size:int, max_depth:int, terminals:list, variables:list) -> list:
    """Create the initial population as a list of ``pop_size`` random trees.

    Each individual is produced by ``generate_random_tree`` starting at depth 0;
    a tqdm progress bar tracks the generation.
    """
    progress = tqdm(range(pop_size), desc='Initializing population')
    return [
        generate_random_tree(max_depth=max_depth, current_depth=0, terminals=terminals, variables=variables)
        for _ in progress
    ]

# 3. Cálculo do Fitness

Explicar o cálculo da fitness através da matriz de distâncias, do valor V (V-measure) e da clusterização.

In [10]:
def evaluate_fitness(individual, features, labels):
    """Score an individual by how well its induced distance clusters the data.

    The tree is evaluated on every pair of examples to build a symmetric
    distance matrix (absolute value of the tree output). The matrix is fed to
    average-linkage agglomerative clustering with as many clusters as there are
    distinct labels, and the V-measure between ``labels`` and the predicted
    clusters is returned.

    Args:
        individual: Root node of the expression tree to evaluate.
        features: Either a ``pandas.DataFrame`` (its column labels are used as
            variable names) or a 2-D array-like, in which case columns are
            named ``x0``, ``x1``, ...
        labels: Ground-truth labels, one per row of ``features``.

    Returns:
        The V-measure score of the clustering.
    """

    def evaluate_tree(node, example1, example2):
        # Leaves: a string value names a feature and yields the difference of
        # that feature between the two examples; anything else is a constant.
        if node.is_leaf():
            if isinstance(node.value, str):
                return example1[node.value] - example2[node.value]
            return float(node.value)

        # Internal nodes: look up the binary function and apply it to the
        # recursively evaluated children.
        # NOTE(review): whether the result is always finite depends on how
        # Node.operators implements e.g. division - confirm in src/Node.
        func = node.operators[node.value]
        left_val = evaluate_tree(node.left, example1, example2)
        right_val = evaluate_tree(node.right, example1, example2)
        return func(left_val, right_val)

    def compute_distance_matrix(tree, features):
        # Previously `X` was only bound for non-DataFrame input, so passing a
        # DataFrame raised NameError; now both input kinds are handled.
        if isinstance(features, pd.DataFrame):
            X = features
        else:
            X = pd.DataFrame(features, columns=[f'x{i}' for i in range(features.shape[1])])

        # Convert each row to a dict once instead of inside the pairwise loop.
        rows = X.to_dict('records')
        num_examples = len(rows)
        distance_matrix = np.zeros((num_examples, num_examples))

        for i in range(num_examples):
            for j in range(i + 1, num_examples):
                dist = abs(evaluate_tree(tree, rows[i], rows[j]))
                # The matrix is symmetric by construction; the diagonal stays 0.
                distance_matrix[i, j] = dist
                distance_matrix[j, i] = dist
        return distance_matrix

    distance_matrix = compute_distance_matrix(individual, features)

    num_clusters = len(np.unique(labels))

    clustering = AgglomerativeClustering(n_clusters=num_clusters, metric='precomputed', linkage='average')
    clustering.fit(distance_matrix)

    y_pred = clustering.labels_
    fitness = v_measure_score(labels, y_pred)

    return fitness

In [11]:
def calculate_fitness_population(population, features, labels):
    """Return the fitness of every individual, in population order.

    Each individual is scored with ``evaluate_fitness`` against the given
    features and labels; a tqdm progress bar tracks the evaluation.
    """
    progress = tqdm(population, total=len(population), desc='Calculating fitness')
    return [evaluate_fitness(candidate, features, labels) for candidate in progress]

# 4. Operadores Genéticos

Explicar o funcionamento do crossover.

In [12]:
def crossover(parent1: Node, parent2: Node) -> tuple:
    """Subtree crossover between two parents.

    Both parents are deep-copied, one node is picked uniformly at random from
    each copy, and the contents of the two picked nodes (value, left child,
    right child) are exchanged. The parents themselves are left untouched.

    Returns:
        A ``(child1, child2)`` tuple with the two modified copies.
    """
    offspring_a = copy.deepcopy(parent1)
    offspring_b = copy.deepcopy(parent2)

    candidates_a = offspring_a.get_all_nodes()
    candidates_b = offspring_b.get_all_nodes()

    # Draw the point in the first child before the one in the second child.
    point_a = random.choice(candidates_a)
    point_b = random.choice(candidates_b)

    # Swapping the three attributes in place grafts each subtree onto the
    # other tree without having to touch the parents of the chosen nodes.
    for attr in ('value', 'left', 'right'):
        from_a = getattr(point_a, attr)
        from_b = getattr(point_b, attr)
        setattr(point_a, attr, from_b)
        setattr(point_b, attr, from_a)

    return offspring_a, offspring_b

explicar mutação

In [13]:
def mutate(individual, terminals, variables, max_depth):
    """Subtree mutation: replace a random node with a freshly generated tree.

    The individual is deep-copied, a node of the copy is chosen uniformly at
    random, and its value and children are overwritten with those of a new
    random tree built by ``generate_random_tree`` (depth limit ``max_depth``).

    Returns:
        The mutated copy; the input individual is not modified.
    """
    mutant = copy.deepcopy(individual)

    # Pick the mutation point first, then generate the replacement subtree.
    target = random.choice(mutant.get_all_nodes())
    replacement = generate_random_tree(
        max_depth=max_depth,
        current_depth=0,
        terminals=terminals,
        variables=variables,
    )

    target.value, target.left, target.right = (
        replacement.value,
        replacement.left,
        replacement.right,
    )

    return mutant

explicar seleção por torneio

In [14]:
def selection(population, fitness_scores, tournament_size=0):
    """Tournament selection.

    Runs ``len(population)`` independent tournaments. In each one,
    ``tournament_size`` distinct (individual, fitness) pairs are sampled and
    the individual with the highest fitness wins.

    Args:
        population: Sequence of individuals.
        fitness_scores: Fitness of each individual, aligned with ``population``.
        tournament_size: Number of contenders per tournament. Must be at least
            1; values larger than the number of candidates are clamped to it.
            The default of 0 is kept for signature compatibility but is not a
            usable value and must be overridden.

    Returns:
        A list with one tournament winner per individual in ``population``
        (empty when the population is empty).

    Raises:
        ValueError: If the population is non-empty and ``tournament_size < 1``.
    """
    if len(population) == 0:
        return []

    if tournament_size < 1:
        # Previously this surfaced as an opaque "max() arg is an empty sequence".
        raise ValueError(f'tournament_size must be >= 1, got {tournament_size}')

    # Build the candidate list once instead of on every tournament.
    contenders = list(zip(population, fitness_scores))
    # random.sample rejects a sample larger than the pool; a tournament cannot
    # have more distinct contenders than there are candidates anyway.
    k = min(tournament_size, len(contenders))

    selected = []
    for _ in range(len(population)):
        tournament = random.sample(contenders, k)
        winner = max(tournament, key=lambda pair: pair[1])[0]
        selected.append(winner)
    return selected

# 5. Algoritmo de GP

explicar o algoritmo de GP

In [15]:
def gp_algortihm(pop_size: int,max_depth: int,terminals: list,variables: list,generations: int,mutation_rate: float,crossover_rate: float,tournament_size: int, elitism: bool, train_labels: list,test_labels: list,train_features,test_features) -> dict:
    """Run the genetic-programming loop and report the best individual.

    For each generation the population is scored on the training data, summary
    statistics are appended to the history, and a new population is bred via
    tournament selection, crossover and mutation (optionally keeping the best
    individual unchanged).

    Args:
        pop_size: Number of individuals per generation (must be >= 1).
        max_depth: Depth limit for the initial trees; mutation subtrees use
            ``max_depth // 2``.
        terminals: Values for internal nodes, forwarded to tree generation.
        variables: Values for leaf nodes, forwarded to tree generation.
        generations: Number of generations to evaluate (must be >= 1).
        mutation_rate: Probability that a pair of children is mutated.
        crossover_rate: Probability that a pair of parents is crossed over.
        tournament_size: Contenders per selection tournament.
        elitism: When true, the generation's best individual is copied into
            the next population.
        train_labels, test_labels: Ground-truth labels of each split.
        train_features, test_features: Feature matrices of each split.

    Returns:
        A dict with keys ``best_train_v_measure``, ``best_test_v_measure``,
        ``best_individual`` (its expression string) and ``history`` (one dict
        of fitness statistics per generation). The best individual is the one
        with the highest training fitness in the last evaluated generation.

    Raises:
        ValueError: If ``pop_size`` or ``generations`` is smaller than 1.
    """
    # Without at least one evaluated generation there is no best individual
    # to report (this used to fail later with an unbound-variable error).
    if generations < 1:
        raise ValueError(f'generations must be >= 1, got {generations}')
    if pop_size < 1:
        raise ValueError(f'pop_size must be >= 1, got {pop_size}')

    history = []

    population = initialize_population_grow(pop_size=pop_size, max_depth=max_depth, terminals=terminals, variables=variables)

    for generation in tqdm(range(generations), desc='Generations'):
        population_fitness = calculate_fitness_population(population, train_features,train_labels)

        best_fitness = max(population_fitness)

        best_individual = population[population_fitness.index(best_fitness)]

        history.append({
                'generation': generation,
                'best_fitness': best_fitness,
                'min_fitness': min(population_fitness),
                'average_fitness': np.mean(population_fitness)
        })

        new_population = []

        if elitism:
            new_population.append(best_individual)

        # `selection` returns one tournament winner per individual, so it is
        # called once per generation and consecutive winners are paired up.
        # (Unpacking its result into two parents only worked for pop_size == 2.)
        mating_pool = selection(population, population_fitness, tournament_size)
        next_parent = 0

        while len(new_population) < pop_size:
            parent1 = mating_pool[next_parent % len(mating_pool)]
            parent2 = mating_pool[(next_parent + 1) % len(mating_pool)]
            next_parent += 2

            if random.random() < crossover_rate:
                child1, child2 = crossover(parent1, parent2)
            else:
                # No crossover: the parents pass through unchanged. Sharing the
                # references is safe because crossover and mutate both work on
                # deep copies. Previously the children were unbound (or stale
                # from the previous iteration) on this path.
                child1, child2 = parent1, parent2

            if random.random() < mutation_rate:
                child1 = mutate(child1, terminals, variables, max_depth//2)
                child2 = mutate(child2, terminals, variables, max_depth//2)

            new_population.extend([child1, child2])

        # Children are added in pairs, so trim any overshoot.
        population = new_population[:pop_size]

    result = {
        'best_train_v_measure': evaluate_fitness(best_individual, train_features, train_labels),
        'best_test_v_measure': evaluate_fitness(best_individual, test_features, test_labels),
        'best_individual': best_individual.view_expression(),
        'history': history
    }

    return result

# 6. Experimentação

In [16]:
def save_result_as_csv(result):
    """Write one experiment result to ``results/experiment_<id>_rep_<n>.csv``.

    ``result`` is normally the dict built by ``run_experiment``. Its
    ``'history'`` list becomes the rows of the CSV (one row per generation) and
    every other entry is repeated as a constant column; non-scalar values such
    as the config dict are stored as their ``str()`` representation. Objects
    that already expose ``to_csv`` are written as they are.

    Args:
        result: Mapping with at least ``'experiment_id'`` and ``'repetition'``,
            and optionally ``'history'`` (list of per-generation dicts).
    """
    results_dir = 'results'
    os.makedirs(results_dir, exist_ok=True)

    filename = f"experiment_{result['experiment_id']}_rep_{result['repetition']}.csv"
    filepath = os.path.join(results_dir, filename)

    if hasattr(result, 'to_csv'):
        frame = result
    else:
        # A plain dict has no `to_csv` (the original call raised
        # AttributeError), so build a frame from it. `[{}]` yields a single
        # row so the summary columns survive even when there is no history.
        frame = pd.DataFrame(result.get('history') or [{}])
        for key, value in result.items():
            if key == 'history':
                continue
            # Assigning a dict/list directly would be aligned against the row
            # index instead of being broadcast, so stringify non-scalars.
            is_plain = value is None or np.isscalar(value)
            frame[key] = value if is_plain else str(value)

    frame.to_csv(filepath, index=False)

In [17]:
def run_experiment(experiment_id, repetition, pop_size, max_depth, terminals, variables, generations, mutation_rate, crossover_rate, tournament_size, elitism, train_features, train_labels, test_features, test_labels, config):
    """Run one GP repetition and persist its result.

    Calls ``gp_algortihm`` with the given hyper-parameters and data, tags the
    returned result with the experiment id, repetition number and config, and
    hands it to ``save_result_as_csv``. Nothing is returned.
    """
    gp_arguments = dict(
        pop_size=pop_size,
        max_depth=max_depth,
        terminals=terminals,
        variables=variables,
        generations=generations,
        mutation_rate=mutation_rate,
        crossover_rate=crossover_rate,
        tournament_size=tournament_size,
        elitism=elitism,
        train_labels=train_labels,
        test_labels=test_labels,
        train_features=train_features,
        test_features=test_features,
    )
    result = gp_algortihm(**gp_arguments)

    # Attach the bookkeeping fields used to name and describe the output file.
    result.update(experiment_id=experiment_id, repetition=repetition, config=config)

    save_result_as_csv(result)

In [18]:
def test_population(config):
    """Grid over population size and number of generations.

    For every (population size, generations) combination found in ``config``
    the experiment is repeated ``n_repetitions`` times through
    ``run_experiment``, with crossover probability 0.9, mutation probability
    0.05 and tournament size 2 held fixed. The experiment id starts at
    ``config['experiment_id']`` and is incremented after each combination.

    Args:
        config: Dict with the keys ``population_size`` and ``num_generations``
            (iterables of values to try), ``max_individual_size``, ``elitism``,
            ``n_repetitions``, ``experiment_id``, ``terminals``, ``variables``,
            ``train_features``, ``train_labels``, ``test_features`` and
            ``test_labels``.
    """
    population_sizes = config['population_size']
    max_depth = config['max_individual_size']
    num_generations = config['num_generations']
    elitism = config['elitism']
    n_repetitions = config['n_repetitions']
    experiment_id = config['experiment_id']
    terminals = config['terminals']
    variables = config['variables']
    train_features = config['train_features']
    train_labels = config['train_labels']
    test_features = config['test_features']
    test_labels = config['test_labels']

    # Held constant while population size and generations vary.
    crossover_prob = 0.9
    mutation_prob = 0.05
    tournament_size = 2

    for population_size in population_sizes:
        for num_generation in num_generations:
            for repetition in range(n_repetitions):
                # Fresh dict per run so stored configs never alias each other.
                run_config = {
                    'population_size': population_size,
                    'num_generations': num_generation,
                    'crossover_prob': crossover_prob,
                    'mutation_prob': mutation_prob,
                    'tournament_size': tournament_size,
                    'experiment_id': experiment_id,
                    'repetition': repetition
                }
                # Pass the current population size, not the whole list of
                # candidate sizes (which is what was forwarded before).
                run_experiment(experiment_id, repetition, population_size, max_depth, terminals, variables, num_generation, mutation_prob, crossover_prob, tournament_size, elitism, train_features, train_labels, test_features, test_labels, run_config)

            experiment_id += 1



# 7. Rodar Experimentos

In [19]:
# GENETIC ALGORITHM PARAMETERS

### FIXED ###
TAMANHO_MAXIMO_INDIVIDUO = 7  # maximum individual size; used as 'max_individual_size' in the experiment config
ELITISMO = True  # elitism flag; used as 'elitism' in the experiment config
REPETICOES = 10  # repetitions per configuration; used as 'n_repetitions'
TERMINAIS = ['+', '-', '*', '/']  # operator symbols; passed as the `terminals` argument (internal-node values)

### VARIED ###
# NOTE: the name is misspelled ("TAMAHO"), but the experiment config cell
# refers to it under this exact spelling.
TAMAHO_POPULACAO = [30, 50, 100, 500]  # population sizes to try
NUMERO_GERACOES = [30, 50, 100, 500]  # numbers of generations to try
K_TORNEIO = [2,3,5,7]  # tournament sizes (k)
PROB_OPERADORES = [
        {'CROSSOVER': 0.9, 'MUTATION': 0.05},
        {'CROSSOVER': 0.6, 'MUTATION': 0.3},
    ]  # crossover / mutation probability pairs

# DATASET PARAMETERS
# File-name prefixes; get_data appends 'train.csv' / 'test.csv' to them.
WINE_FILE_NAME = 'wineRed-'
BREAST_CANCER_FILE_NAME = 'breast_cancer_coimbra_'

In [31]:
# Load and standardise the red-wine train/test splits (the CSVs are read without a header row).
wine_train_features, wine_train_labels, wine_test_features, wine_test_labels = get_data(WINE_FILE_NAME, header=None)
# One variable name per feature column: 'x0', 'x1', ... Reading the column
# count from `.shape[1]` avoids depending on row 1 existing and on how the
# feature container handles integer indexing.
wine_variables = [f'x{i}' for i in range(wine_train_features.shape[1])]

### 7.1 Teste População

In [None]:
# Leave one core free, but never ask for fewer than one worker
# (Pool rejects processes=0, which cpu_count() - 1 yields on a single core).
num_processes = max(1, multiprocessing.cpu_count() - 1)
experiment_id = 1

config = {
    'population_size': TAMAHO_POPULACAO,
    'max_individual_size': TAMANHO_MAXIMO_INDIVIDUO,
    'num_generations': NUMERO_GERACOES,
    'elitism': ELITISMO,
    'n_repetitions': REPETICOES,
    'experiment_id': experiment_id,
    'terminals': TERMINAIS,
    'variables': wine_variables,
    'train_features': wine_train_features,
    'train_labels': wine_train_labels,
    'test_features': wine_test_features,
    'test_labels': wine_test_labels
}

# Operator probabilities and tournament size are held at their first
# configured values while population size and generations are varied.
crossover_prob = PROB_OPERADORES[0]['CROSSOVER']
mutation_prob = PROB_OPERADORES[0]['MUTATION']
tournament_size = K_TORNEIO[0]

# Build one positional-argument tuple per run_experiment call. Mapping the
# pool over `config` itself would iterate the dict's string keys and call
# run_experiment with a single argument.
tasks = []
current_experiment_id = config['experiment_id']
for population_size, num_generation in itertools.product(config['population_size'], config['num_generations']):
    for repetition in range(config['n_repetitions']):
        run_config = {
            'population_size': population_size,
            'num_generations': num_generation,
            'crossover_prob': crossover_prob,
            'mutation_prob': mutation_prob,
            'tournament_size': tournament_size,
            'experiment_id': current_experiment_id,
            'repetition': repetition
        }
        tasks.append((
            current_experiment_id,
            repetition,
            population_size,
            config['max_individual_size'],
            config['terminals'],
            config['variables'],
            num_generation,
            mutation_prob,
            crossover_prob,
            tournament_size,
            config['elitism'],
            config['train_features'],
            config['train_labels'],
            config['test_features'],
            config['test_labels'],
            run_config,
        ))
    current_experiment_id += 1

# NOTE(review): functions defined in notebook cells can only be sent to worker
# processes when the platform's start method can resolve them (e.g. fork);
# confirm this for the target environment.
with multiprocessing.Pool(processes=num_processes) as pool:
    pool.starmap(run_experiment, tasks)