In [12]:
# Importação de bibliotecas externas necessárias
from deap import base, creator, tools, benchmarks
import deap.cma as cma

import copy
import random
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from scipy.integrate import solve_ivp
import math
import time
import pandas as pd

# Importação de módulos personalizados
from Modules.Helper import Helper
from Modules.Solvers import Solvers
from Modules.Plotters import Plotters
from Modules.Equation import Equation

import time
import os

In [13]:
# Carrega os dados do sistema GRN5 e define as condições iniciais
labels = ['A', 'B', 'C', 'D', 'E']
df, max_data = Helper.load_data(filename='./Data/GRN5_DATA.txt', labels=labels)
initial_conditions = np.array([df[label].iloc[0] for label in labels])
t_span = (df['t'].iloc[0], df['t'].iloc[-1])  # Intervalo de tempo para simulações
t_eval = np.array(df['t'])  # Ponto de avaliação dos dados temporais
original = np.array(df[labels]).T  # Dados originais para cálculo de erro

# Limites nos valores dos coeficientes tau, k e n 
bounds = {
    'tau': (0.1, 5.0),
    'k': (0.1, 2.0),
    'n': (0.1, 30.0)
}

IND_SIZE = 19  # Tamanho do indivíduo (número total de coeficientes)
GENERATION_LIMIT = 5000
NO_IMPROVEMENT_LIMIT = 50
HARD_SIGMA_INCREASE_FACTOR = 10
TOLERANCE = 1E-4

In [14]:
# Representa um coeficiente com valor e limites
class Coefficient:
    def __init__(self, bounds_val):
        self.val = random.uniform(*bounds_val)  # Inicializa com valor aleatório dentro dos limites
        self.bounds_val = bounds_val
    
    def __repr__(self):
        return f"val={self.val}"

# Representa um coeficiente usado no CMA-ES com limites.
class CMACoefficient:
    def __init__(self, val, bounds_val):
        self.bounds_val = bounds_val
        self.val = self.limit_val(val)  # Ajusta o valor aos limites
    
    # Garante que o valor esteja dentro dos limites
    def limit_val(self, val):
        return max(self.bounds_val[0], min(val, self.bounds_val[1]))
    
    def __repr__(self):
        return f"val={self.val}"

# Representa um indivíduo contendo coeficientes e funções para manipulação
class Individual:
    def __init__(self, method='RK45', error='abs'):
        # Estrutura do indivíduo contendo os coeficientes para cada variável
        self.coeffs = {
            'A': {
                'E': {'n': None, 'k': None, '-': True},
                'tau': None
            },
            'B': {
                'A': {'n': None, 'k': None, '-': False},
                'tau': None
            },
            'C': {
                'B': {'n': None, 'k': None, '-': False},
                'tau': None,
            },
            'D': {
                'C': {'n': None, 'k': None, '-': False},
                'tau': None,
            },
            'E': {
                'D': {'n': None, 'k': None, '-': False},
                'B': {'n': None, 'k': None, '-': False},
                'E': {'n': None, 'k': None, '-': False},
                'tau': None,
            }
        }
        
        # Tamanho do indivíduo
        self.ind_size = IND_SIZE 
        
        # Fitness inicializado como infinito
        self.fitness = np.inf
        self.method = method
        self.error = error
     
    @staticmethod
    def list_to_ind(list_ind, method, error):
        i = 0
        ind = Individual(method=method, error=error)
        for key, label in ind.coeffs.items():
            label['tau'] = CMACoefficient(list_ind[i], bounds['tau'])
            i += 1
            for key, coeffs in label.items():
                if key != 'tau':
                    coeffs['n'] = CMACoefficient(list_ind[i], bounds['n'])
                    coeffs['k'] = CMACoefficient(list_ind[i+1], bounds['k'])
                    i += 2
        return ind
    
    def ind_to_list(self):
        ind_list = []
        for key, label in self.coeffs.items():
            ind_list.append(label['tau'].val)
            for key, coeffs in label.items():
                if key != 'tau':
                    ind_list.append(coeffs['n'].val)
                    ind_list.append(coeffs['k'].val)
        return ind_list
    
    @staticmethod
    def apply_bounds(population, method, error):
        for ind in population:
            list_ind = Individual.list_to_ind(ind, method, error)
            ind[:] = Individual.ind_to_list(list_ind)
    
    @staticmethod    
    def cma_evaluate(method, error, list_ind):
        ind = Individual.list_to_ind(list_ind, method, error)
        ind.calc_fitness()
        return ind.fitness,
        # return ind
    
    # Calcula o fitness do indivíduo simulando o sistema de ODEs
    def calc_fitness(self):
        try:
            equation = Equation(self.numerical_coeffs, labels)
            y = solve_ivp(self.system, t_span, initial_conditions, method=self.method, t_eval=t_eval, args=(equation, )).y
            
            if self.error == 'abs':
                self.fitness = self.abs_error(original, y)
            elif self.error == 'mse':
                self.fitness = self.MSE_error(original, y)
            elif self.error == 'mabs':
                self.fitness = self.mean_abs_error(original, y)
            elif self.error == 'squared':
                self.fitness = self.squared_error(original, y)
            
            self.fitness = min(self.fitness, 1e6)
        except:
            # Trata exceções relacionadas ao solver
            print("Overflow")
            self.fitness = 1e6
            
    def calc_all_fitness(self):
        equation = Equation(self.numerical_coeffs, labels)
        y = solve_ivp(self.system, t_span, initial_conditions, method=self.method, t_eval=t_eval, args=(equation, )).y
        
        return {
            'abs_error': self.abs_error(original, y), 
            'squared_error': self.squared_error(original, y),
            'MSE_error': self.MSE_error(original, y), 
            'mean_abs_error': self.mean_abs_error(original, y), 
        }
     
       
       
    @staticmethod
    def system(t, y, equation):
        vals = [Solvers.norm_hardcoded(val, max_data[label]) for val, label in zip(y, labels)]
        N_A, N_B, N_C, N_D, N_E = vals
        
        dA = equation.full_eq(vals, 'A', 'E')
        dB = equation.full_eq(vals, 'B', 'A')
        dC = equation.full_eq(vals, 'C', 'B')
        dD = equation.full_eq(vals, 'D', 'C')
        dE = equation.complex_eqs(vals, 'E', [['+B', '+D'], ['+D', '+E']])

        return [dA, dB, dC, dD, dE]
    
    @staticmethod
    def abs_error(original, pred):
        return sum(sum(abs(original-pred)))
    
    @staticmethod
    def squared_error(original, pred):
        return sum(sum( (original-pred)**2 ))**(1/2)
    
    @staticmethod
    def MSE_error(original, pred):
        return np.mean((original-pred)**2)
    
    @staticmethod
    def mean_abs_error(original, pred):
        return np.mean(abs(original-pred))
            
    @staticmethod
    def initialize_ind(bounds):
        ind = Individual()
        for key, label in ind.coeffs.items():
            label['tau'] = Coefficient(bounds['tau'])
            for key, coeffs in label.items():
                if key != 'tau':
                    coeffs['n'] = Coefficient(bounds['n'])
                    coeffs['k'] = Coefficient(bounds['k'])
                    
        ind.calc_fitness()
        return ind
    
    @property
    def numerical_coeffs(self,):
        
        numerical_coeffs = copy.deepcopy(self.coeffs)
        for key, label in numerical_coeffs.items():
            label['tau'] = label['tau'].val
            for key, coeffs in label.items():
                if key != 'tau':
                    coeffs['n'] = int(coeffs['n'].val)
                    coeffs['k'] = coeffs['k'].val
                    
        return numerical_coeffs
    
    # Métodos: RK45, DOP853, LSODA, BDF, 
    def plot(self, method='RK45', comparison=True):
        methods = [self.method]
        results = {}
        equation = Equation(self.numerical_coeffs, labels)
        results[method] = solve_ivp(self.system, t_span, initial_conditions, method=method, t_eval=t_eval, args=(equation, )).y
        Plotters.plot_methods(results=results,t=t_eval, methods=methods, labels=labels)
        if comparison:
            Plotters.plot_comparison(results=results, t=t_eval, df=df, methods=methods, labels=labels)
            
    @staticmethod        
    def initialize_average_bounds(bounds, ind_size):
        averages = [np.mean(value) for value in bounds.values()]
        array = np.resize(averages, ind_size)
        return array
        
        
    def __repr__(self):
        coeffs_repr = {k: v for k, v in self.coeffs.items()}
        return f"Individual(fitness={self.fitness}, coeffs={coeffs_repr}, ind_size={self.ind_size})"

In [15]:

# Criação do tipo de indivíduo e toolbox
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)

toolbox = base.Toolbox()

# Inicializa estratégia CMA-ES
centroids = Individual.initialize_average_bounds(bounds, IND_SIZE)
strategy = cma.Strategy(centroid=centroids, sigma=10, lambda_=int(4+(3*np.log(IND_SIZE))))

toolbox.register("generate", strategy.generate, creator.Individual)
toolbox.register("update", strategy.update)

# Estatísticas e parâmetros do algoritmo
hof = tools.HallOfFame(1)
stats = tools.Statistics(lambda individual: individual.fitness.values)
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("min", np.min)
stats.register("max", np.max)



In [16]:
if os.path.exists('results.csv'):
    results_df = pd.read_csv('results.csv')
    existing_results = results_df[['seed', 'error_type', 'method']]
else:
    # Create an empty DataFrame if the file does not exist
    results_df = pd.DataFrame(columns=[
        'best_ind', 'error_type', 'method', 'seed', 'abs_error',
        'squared_error', 'MSE_error', 'mean_abs_error', 'execution_time'
    ])
    existing_results = pd.DataFrame(columns=['seed', 'error_type', 'method'])

all_methods = ['RK45', 'DOP853', 'LSODA']
errors = ['abs', 'squared', 'mse', 'mabs']
seeds = 10
results = []

# Main loops
for seed in range(seeds):
    for error in errors:
        for method in all_methods:
            # Check if this combination is already processed
            if not existing_results[
                (existing_results['seed'] == seed) &
                (existing_results['error_type'] == error) &
                (existing_results['method'] == method)
            ].empty:
                print(f"Skipping already processed: seed={seed}, error={error}, method={method}")
                continue

            print("Solver: {method}, Error: {error}")
            # Register evaluation function
            toolbox.register("evaluate", Individual.cma_evaluate, method, error)
            population = toolbox.generate()
            Individual.apply_bounds(population, method, error)

            # Evolution loop
            best_fitness = None
            no_improvement_counter = 0
            best_individual = None
            start_time = time.time()

            for gen in range(GENERATION_LIMIT):
                # Evaluate population
                for i, ind in enumerate(population):
                    ind.fitness.values = toolbox.evaluate(ind)

                # Compile statistics
                record = stats.compile(population)

                # Update best fitness
                current_best_fitness = min(ind.fitness.values[0] for ind in population)
                hof.update(population)

                if best_fitness is None or current_best_fitness < best_fitness - TOLERANCE:
                    best_fitness = current_best_fitness
                    no_improvement_counter = 0
                    best_individual = hof[0]
                else:
                    no_improvement_counter += 1

                if no_improvement_counter >= (NO_IMPROVEMENT_LIMIT + gen / 100):
                    strategy.sigma = min(max(strategy.sigma * HARD_SIGMA_INCREASE_FACTOR, 1), 15)
                    print(f"Sigma increased due to no improvement. New sigma: {strategy.sigma}")
                    no_improvement_counter = 0

                # Update strategy and generate next population
                toolbox.update(population)
                print(f"Generation {gen}: {record}", end='\r')

                if gen % 500 == 0:
                    best_ind = Individual.list_to_ind(best_individual, method, error)
                    best_ind.plot(comparison=False, method=method)

                population = toolbox.generate()
                Individual.apply_bounds(population, method, error)

            # End timer
            end_time = time.time()
            execution_time = end_time - start_time

            # Calculate errors and append results
            errors_result = best_ind.calc_all_fitness()
            results.append({
                'best_ind': best_ind,
                'error_type': error,
                'method': method,
                'seed': seed,
                'abs_error': errors_result['abs_error'],
                'squared_error': errors_result['squared_error'],
                'MSE_error': errors_result['MSE_error'],
                'mean_abs_error': errors_result['mean_abs_error'],
                'execution_time': execution_time,
            })

            # Save updated results
            results_df = pd.concat([results_df, pd.DataFrame(results)], ignore_index=True)
            results_df.to_csv('results.csv', index=False)


Skipping already processed: seed=0, error=abs, method=RK45
Skipping already processed: seed=0, error=abs, method=DOP853
Skipping already processed: seed=0, error=abs, method=LSODA
Skipping already processed: seed=0, error=squared, method=RK45
Skipping already processed: seed=0, error=squared, method=DOP853
Skipping already processed: seed=0, error=squared, method=LSODA
Skipping already processed: seed=0, error=mse, method=RK45
Skipping already processed: seed=0, error=mse, method=DOP853
Skipping already processed: seed=0, error=mse, method=LSODA
Skipping already processed: seed=0, error=mabs, method=RK45
Skipping already processed: seed=0, error=mabs, method=DOP853
Skipping already processed: seed=0, error=mabs, method=LSODA
Skipping already processed: seed=1, error=abs, method=RK45
Skipping already processed: seed=1, error=abs, method=DOP853
Skipping already processed: seed=1, error=abs, method=LSODA
Skipping already processed: seed=1, error=squared, method=RK45
Skipping already proce

In [17]:
results_df

Unnamed: 0,best_ind,error_type,method,seed,abs_error,squared_error,MSE_error,mean_abs_error,execution_time
0,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",abs,RK45,0,69.041488,5.481817,0.120201,0.276166,513.271485
1,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",abs,RK45,0,69.041488,5.481817,0.120201,0.276166,513.271485
2,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",abs,DOP853,0,41.579371,3.535421,0.049997,0.166317,1587.294716
3,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",abs,RK45,0,69.041488,5.481817,0.120201,0.276166,513.271485
4,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",abs,DOP853,0,41.579371,3.535421,0.049997,0.166317,1587.294716
...,...,...,...,...,...,...,...,...,...
300,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",mse,DOP853,9,26.130717,2.000103,0.016002,0.104523,972.757312
301,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",mse,LSODA,9,26.016128,1.994873,0.015918,0.104065,255.351094
302,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",mabs,RK45,9,26.013185,1.995423,0.015927,0.104053,1596.658751
303,"Individual(fitness=inf, coeffs={'A': {'E': {'n...",mabs,DOP853,9,26.011750,1.995093,0.015922,0.104047,1085.493791
