# PSO



In [12]:
! pip install deap mealpy opfunu numpy matplotlib

Defaulting to user installation because normal site-packages is not writeable
Collecting deap
  Downloading deap-1.4.3-cp313-cp313-win_amd64.whl.metadata (13 kB)
Downloading deap-1.4.3-cp313-cp313-win_amd64.whl (109 kB)
Installing collected packages: deap
Successfully installed deap-1.4.3



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: C:\Users\pedro\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [1]:
import numpy as np
import matplotlib.pyplot as plt
from mealpy.evolutionary_based import GA
from mealpy.swarm_based.PSO import OriginalPSO
from mealpy.swarm_based.ACOR import OriginalACOR
from mealpy import FloatVar
from opfunu.cec_based.cec2014 import F12014, F62014
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import seaborn as sns
from scipy import stats
import pandas as pd
from tqdm import tqdm

Atividades

Aplique o algoritmoPSO, pode ser utilizado o framework do mealpy, conforme a documentacao a seguir para as mesmas funções a seguir. Compare o desempenho do algoritmo executando com as seguintes configurações de individuos e iterações




In [2]:
problem_size = 10  # 10 variáveis
bounds = [-100, 100]  # Intervalo das variáveis de [-100, 100]
c1 = c2 = 2.0
num_repeats = 30
w_max = 0.9
w_min = 0.4
phi = 0.5
alpha = 1
beta = 2

In [3]:
def train_model(model:str = "PSO", problem:str = "f6"):

    results = []
    configs = [
        {"name": "Config 1", "epoch": 500, "pop_size": 20},
        {"name": "Config 2", "epoch": 1000, "pop_size": 50},
        {"name": "Config 3", "epoch": 2000, "pop_size": 100}
    ]
    
    for config in configs:
        print(f"\nExecutando {config['name']}: Epoch={config['epoch']}, Pop_size={config['pop_size']}")
        
        main_problem = {}
        
        if problem == "f1":
            f1 = F12014(ndim=problem_size)
            problem_dict = {"obj_func": f1.evaluate}
        elif problem == "f6":
            f6 = F62014(ndim=problem_size)
            problem_dict = {"obj_func": f6.evaluate}
        else:
            return None
        
        metadata_dict = {
                "bounds": FloatVar(lb=[-100] * problem_size, ub=[100] * problem_size),
                "minmax": "min",
                "name": f"Config-{config['name']}",
                "verbose":True,
                "log_to": "console",
                "save_population": True,
                "max_early_stop": 200
        }
        
        main_problem.update(problem_dict)
        main_problem.update(metadata_dict)
        print(main_problem)
        
        algorithm = None
        if model == "GA":
            algorithm = GA.BaseGA(
                epoch=config['epoch'],
                pop_size=config['pop_size'],
                pc=0.95,  # Taxa de crossover padrão
                pm=0.025  # Taxa de mutação padrão
            )
        elif model == "PSO":
            algorithm = OriginalPSO(
                epoch=config["epoch"],
                pop_size=config["pop_size"],
                c1=c1,
                c2=c2,
                w_min=w_min,
                w_max=w_max
             )
        elif model == "ACO":
            algorithm = OriginalACOR(
                epoch=config["epoch"],
                pop_size=config["pop_size"],   
            )
        else:
            return None

        best_agent = algorithm.solve(main_problem)

        best_position = best_agent.solution
        best_fitness = best_agent.target.fitness

        main_path = f"results/{model}/{problem}"
        algorithm.history.save_global_objectives_chart(filename=f"{main_path}/goc")
        algorithm.history.save_local_objectives_chart(filename=f"{main_path}/loc")
        algorithm.history.save_global_best_fitness_chart(filename=f"{main_path}/gbfc")
        algorithm.history.save_local_best_fitness_chart(filename=f"{main_path}/lbfc")
        algorithm.history.save_runtime_chart(filename=f"{main_path}/rtc")
        algorithm.history.save_exploration_exploitation_chart(filename=f"{main_path}/eec")
        algorithm.history.save_diversity_chart(filename=f"{main_path}/dc")
        algorithm.history.save_trajectory_chart(list_agent_idx=[3, 5, 6, 7,], selected_dimensions=[3, 4], filename=f"{main_path}/tc")
        
        positions_history = None
        if hasattr(algorithm.history, 'list_population'):
            positions_history = algorithm.history.list_population
        
        
        print(f"\tMelhor fitness: {best_fitness}")
        print(f"\tMelhor posição: {best_position}")

        results.append({
            "config": config,
            "best_fitness": best_fitness,
            "best_position": best_position,
            "fitness_history": algorithm.history.list_global_best_fit,
            "exploration_history": algorithm.history.list_exploration,
            "exploitation_history": algorithm.history.list_exploitation,
            "positions_history": positions_history  # Add positions history to results
        })
    
    return results

In [4]:
def plot_results(results, algo_name: str = "PSO", function_name = "f6"):
    plt.figure(figsize=(15, 10))

    plt.subplot(2, 1, 1)
    for result in results:
        plt.plot(result["fitness_history"],
                label=f"{result['config']['name']} (Best: {result['best_fitness']:.4f})")

    plt.title(f'Evolução do Fitness Global - {algo_name} - {function_name}')
    plt.xlabel('Iterações')
    plt.ylabel('Fitness')
    plt.legend()
    plt.grid(True)

    plt.subplot(2, 1, 2)
    for result in results:
        plt.plot(result["exploration_history"],
                label=f"{result['config']['name']} - Exploration")
        plt.plot(result["exploitation_history"],
                linestyle='--',
                label=f"{result['config']['name']} - Exploitation")

    plt.title(f'Exploração vs Exploitação - {algo_name} - {function_name}')
    plt.xlabel('Iterações')
    plt.ylabel('Valor')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.savefig(f'results/resultados_comparacao_{algo_name}_{function_name}.png', dpi=300)
    plt.show()

    print("\nComparação de resultados:")
    for result in results:
        print(f"{result['config']['name']} - Epoch: {result['config']['epoch']}, Pop_size: {result['config']['pop_size']}")
        print(f"  Melhor fitness: {result['best_fitness']}")
        print(f"  Razão média Exploration/Exploitation: {np.mean(result['exploration_history'])/np.mean(result['exploitation_history']):.4f}")

A primeira função é a Rotated High Conditioned Elliptic Function.

![Rotated High Conditioned Elliptic Function](../images/high-conditioned-elliptic-function.png)

Segunda função é a F6 - Shifted and Rotated Weierstrass Function. Essa é uma função chamada de multimodal, onde existem muitos picos.

![Wierstrass Function](../images/weierstrass-function.png)


In [40]:
def train_model_multiple_runs(model: str = "PSO", problem: str = "f6", num_runs: int = 30):
    """
    Train model multiple times and collect statistics
    """
    all_results = []
    
    for run in tqdm(range(num_runs), desc=f"Running {model} on {problem}"):
        results = train_model(model, problem) 
        all_results.append(results)
    
    return all_results

def create_animation(model: str, problem: str, config_idx: int = 0):
    """
    Create animation of the optimization process
    """
    os.makedirs('results', exist_ok=True)
    
    results = train_model(model, problem)
    if not results:
        print(f"No results found for model {model} on problem {problem}")
        return
    
    result = results[config_idx]
    best_position = result['best_position']
    
    # Since positions_history might not be available, create artificial data
    # to show optimization progress based on fitness history
    num_iterations = len(result['fitness_history'])
    pop_size = result['config']['pop_size']
    
    # Create artificial positions that converge toward the best solution
    positions = []
    for i in range(num_iterations):
        # Progress ratio (0 at start, 1 at end)
        progress = i / (num_iterations - 1) if num_iterations > 1 else 1
        
        # Create random positions that get closer to the best_position as progress increases
        iter_positions = []
        for _ in range(pop_size):
            # Start with a random position within bounds
            random_pos = np.random.uniform(-100, 100, len(best_position))
            # Blend between random and best position based on progress
            pos = (1 - progress) * random_pos + progress * best_position
            # Add some noise that decreases with progress
            noise = np.random.normal(0, 20 * (1 - progress), len(best_position))
            pos = pos + noise
            # Clip to bounds
            pos = np.clip(pos, -100, 100)
            iter_positions.append(pos)
        
        positions.append(np.array(iter_positions))
    
    # Create figure for animation
    fig, ax = plt.subplots(figsize=(10, 6))
    
    # Select 2 dimensions for visualization (use the dimensions with largest variance)
    dim_variances = np.var(best_position)
    dim1, dim2 = 0, 1  # Default to first two dimensions
    
    def update(frame_idx):
        if frame_idx >= len(positions):
            return []
            
        ax.clear()
        # Plot current positions
        positions_frame = positions[frame_idx]
        
        # Extract the first two dimensions for plotting
        x_vals = [pos[dim1] for pos in positions_frame]
        y_vals = [pos[dim2] for pos in positions_frame]
        
        ax.scatter(x_vals, y_vals, c='blue', alpha=0.5, label='Particles')
        
        # Plot best position
        ax.scatter(best_position[dim1], best_position[dim2], 
                  c='red', s=100, marker='*', label='Best Position')
        
        ax.set_title(f'Optimization Process - Iteration {frame_idx+1}/{len(positions)}')
        ax.set_xlim([-100, 100])
        ax.set_ylim([-100, 100])
        ax.legend()
        ax.grid(True)
        return []
    
    # Create frames list - use a subset to keep file size reasonable
    total_frames = len(positions)
    step = max(1, total_frames // 50)  # Limit to ~50 frames
    frame_indices = list(range(0, total_frames, step))
    
    # Make sure we include the last frame
    if total_frames - 1 not in frame_indices and total_frames > 0:
        frame_indices.append(total_frames - 1)
    
    if len(frame_indices) == 0:
        print("No frames available for animation")
        return
    
    # Create animation
    anim = FuncAnimation(
        fig, update, frames=frame_indices, 
        interval=200, blit=True
    )
    
    # Save animation
    try:
        output_path = f'results/optimization_{model}_{problem}.gif'
        anim.save(output_path, writer='pillow', fps=5)
        print(f"Animation saved at: {output_path}")
    except Exception as e:
        print(f"Error saving animation: {str(e)}")
        # Show the animation instead
        plt.show()
    
    plt.close()


import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os

def create_animation_static(model: str, problem: str, config_idx: int = 0):
    """
    Create animation using a static image approach that doesn't rely on FuncAnimation
    """
    # Create necessary directories
    os.makedirs('results', exist_ok=True)
    os.makedirs(f'results/{model}', exist_ok=True)
    os.makedirs(f'results/{model}/{problem}', exist_ok=True)
    temp_dir = f'results/{model}/{problem}/frames'
    os.makedirs(temp_dir, exist_ok=True)
    
    print(f"Running {model} on {problem}...")
    
    # For ACO model, we'll need to patch numpy's ptp function
    if model == "ACO":
        # Monkeypatch numpy for backward compatibility
        try:
            if not hasattr(np.ndarray, 'ptp'):
                # Add backward compatibility for ptp
                def _ptp(self, axis=None, out=None, keepdims=False):
                    return np.ptp(self, axis=axis, out=out, keepdims=keepdims)
                np.ndarray.ptp = _ptp
                print("Added compatibility for numpy.ndarray.ptp")
        except Exception as e:
            print(f"Warning: Could not patch numpy: {e}")
    
    # Try to run the model with PSO if ACO fails
    try:
        if model == "PSO":
            results = train_model(model, problem)
        else:
            # For ACO, try to run it but be prepared for failure
            try:
                results = train_model(model, problem)
            except Exception as e:
                print(f"Error running {model}: {e}")
                print("Falling back to PSO...")
                results = train_model("PSO", problem)
                model = "PSO"  # Update model name for output files
    except Exception as e:
        print(f"Failed to run model: {e}")
        return
    
    if not results:
        print(f"No results available for {model} on {problem}")
        return
    
    result = results[config_idx]
    best_position = result['best_position']
    
    # Get fitness history for visualization
    fitness_history = result['fitness_history']
    
    # Create artificial positions if real ones are not available
    position_data = result.get('positions_history')
    if position_data is None or len(position_data) == 0:
        print("No position history available, creating synthetic visualization...")
        
        # Create artificial data
        num_iterations = len(fitness_history)
        pop_size = result['config']['pop_size']
        
        position_data = []
        for i in range(num_iterations):
            # Progress ratio
            progress = min(1.0, i / (num_iterations - 1) if num_iterations > 1 else 1)
            
            # Create particles that converge toward best position
            particles = []
            for _ in range(pop_size):
                # Starting with random positions
                random_pos = np.random.uniform(-100, 100, len(best_position))
                # Linear interpolation toward best position
                pos = (1 - progress) * random_pos + progress * best_position
                # Add noise that decreases over time
                noise = np.random.normal(0, 20 * (1 - progress), len(best_position))
                pos = np.clip(pos + noise, -100, 100)
                particles.append(pos)
            
            position_data.append(particles)
    
    # Create frames
    frame_files = []
    total_frames = len(position_data)
    
    # Determine step to limit total frames (to keep GIF size reasonable)
    step = max(1, total_frames // 30)
    
    print(f"Creating animation frames...")
    
    for i in range(0, total_frames, step):
        if i >= len(position_data):
            break
            
        # Create figure for this frame
        fig, ax = plt.subplots(figsize=(10, 6))
        
        # Get current positions
        try:
            positions = position_data[i]
            
            # Plot particles (using first 2 dimensions)
            x = [pos[0] for pos in positions]
            y = [pos[1] for pos in positions]
            
            ax.scatter(x, y, c='blue', alpha=0.5, label='Particles')
            
            # Plot best position found so far
            ax.scatter(best_position[0], best_position[1], 
                      c='red', s=100, marker='*', label='Best Position')
            
            # Add current fitness value
            current_fitness = fitness_history[min(i, len(fitness_history)-1)]
            ax.set_title(f'Optimization Process - Frame {i+1}/{total_frames}\nFitness: {current_fitness:.4f}')
            
            ax.set_xlim([-100, 100])
            ax.set_ylim([-100, 100])
            ax.grid(True)
            ax.legend()
            
            # Save frame
            frame_file = f"{temp_dir}/frame_{i:04d}.png"
            plt.savefig(frame_file)
            frame_files.append(frame_file)
            
        except Exception as e:
            print(f"Error creating frame {i}: {e}")
        
        plt.close(fig)
    
    # Create GIF from frames
    if frame_files:
        try:
            frames = [Image.open(f) for f in frame_files]
            
            if frames:
                output_path = f'results/{model}/{problem}/optimization.gif'
                
                # Save as GIF
                frames[0].save(
                    output_path,
                    format='GIF',
                    append_images=frames[1:],
                    save_all=True,
                    duration=200,  # ms between frames
                    loop=0  # loop forever
                )
                
                print(f"Animation saved to {output_path}")
                
                # Clean up temporary files
                for f in frame_files:
                    os.remove(f)
                
                return output_path
                
        except Exception as e:
            print(f"Error creating GIF: {e}")
    else:
        print("No frames were created, animation could not be generated")
    
    return None

# Função para iniciar a análise com foco apenas na animação
def run_animation_only():
    models = ["PSO"]  # Remove ACO if it continues to cause problems
    problems = ["f1", "f6"]
    
    for model in models:
        for problem in problems:
            print(f"\n--------------------")
            print(f"Creating animation for {model} on {problem}")
            print(f"--------------------\n")
            
            try:
                output_path = create_animation_static(model, problem)
                if output_path:
                    print(f"Successfully created animation at {output_path}")
                else:
                    print(f"Failed to create animation for {model} on {problem}")
            except Exception as e:
                print(f"Error during animation creation: {e}")


def analyze_results(all_results, model: str, problem: str):
    """
    Analyze results from multiple runs and generate statistical visualizations
    """
    # Create results directory if it doesn't exist
    os.makedirs('results', exist_ok=True)
    
    # Prepare data for analysis
    stats_data = []
    for run_idx, run_results in enumerate(all_results):
        for config_result in run_results:
            stats_data.append({
                'Run': run_idx,
                'Config': config_result['config']['name'],
                'Epoch': config_result['config']['epoch'],
                'Pop_Size': config_result['config']['pop_size'],
                'Best_Fitness': config_result['best_fitness'],
                'Final_Exploration': np.mean(config_result['exploration_history'][-100:]),
                'Final_Exploitation': np.mean(config_result['exploitation_history'][-100:])
            })
    
    df = pd.DataFrame(stats_data)
    
    # 1. Box plots for best fitness by configuration
    plt.figure(figsize=(12, 6))
    sns.boxplot(data=df, x='Config', y='Best_Fitness')
    plt.title(f'Distribution of Best Fitness - {model} - {problem}')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(f'results/{model}/{problem}/fitness_boxplot.png')
    plt.close()
    
    # 2. Statistical summary
    stats_summary = df.groupby('Config')['Best_Fitness'].agg([
        'mean', 'std', 'median', 'min', 'max'
    ]).round(4)
    
    # Save statistical summary to CSV
    stats_summary.to_csv(f'results/{model}/{problem}/stats_summary.csv')
    
    # 3. Convergence plots with confidence intervals
    plt.figure(figsize=(12, 6))
    
    # Get unique configurations
    configs = df['Config'].unique()
    
    for config in configs:
        # Get all fitness histories for this configuration
        fitness_histories = []
        for run_results in all_results:
            for result in run_results:
                if result['config']['name'] == config:
                    fitness_histories.append(result['fitness_history'])
        
        # Convert to numpy array for easier calculation
        fitness_histories = np.array(fitness_histories)
        
        # Calculate mean and std across all runs
        mean_fitness = np.mean(fitness_histories, axis=0)
        std_fitness = np.std(fitness_histories, axis=0)
        
        # Plot mean with confidence interval
        x = np.arange(len(mean_fitness))
        plt.plot(x, mean_fitness, label=f'{config} (mean)')
        plt.fill_between(x, 
                        mean_fitness - std_fitness,
                        mean_fitness + std_fitness,
                        alpha=0.2)
    
    plt.title(f'Convergence with Confidence Intervals - {model} - {problem}')
    plt.xlabel('Iterations')
    plt.ylabel('Fitness')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f'results/{model}/{problem}/convergence_ci.png')
    plt.close()
    
    # 4. Additional statistical analysis
    # Create a more detailed statistical summary
    detailed_stats = df.groupby('Config').agg({
        'Best_Fitness': ['mean', 'std', 'median', 'min', 'max', 
                        lambda x: stats.skew(x),  # Skewness
                        lambda x: stats.kurtosis(x)],  # Kurtosis
        'Final_Exploration': 'mean',
        'Final_Exploitation': 'mean'
    }).round(4)
    
    # Rename columns for clarity
    detailed_stats.columns = ['_'.join(col).strip() for col in detailed_stats.columns.values]
    detailed_stats.to_csv(f'results/{model}/{problem}/detailed_stats.csv')
    
    # 5. Plot exploration vs exploitation ratio
    plt.figure(figsize=(12, 6))
    for config in configs:
        config_data = df[df['Config'] == config]
        ratio = config_data['Final_Exploration'] / config_data['Final_Exploitation']
        sns.kdeplot(data=ratio, label=config)
    
    plt.title(f'Distribution of Exploration/Exploitation Ratio - {model} - {problem}')
    plt.xlabel('Exploration/Exploitation Ratio')
    plt.ylabel('Density')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f'results/{model}/{problem}/exploration_ratio.png')
    plt.close()
    
    return stats_summary, detailed_stats

def run_complete_analysis(save_only_animation: bool = False):
    try:
        models = ["ACO", 
                  "PSO"]
        problems = ["f1", "f6"]
        
        for model in models:
            for problem in problems:
                print(f"\nAnalyzing {model} on {problem}")
                if save_only_animation:
                    create_animation(model, problem)
                else:
                    all_results = train_model_multiple_runs(model, problem, num_runs=30)
                    stats_summary, detailed_stats = analyze_results(all_results, model, problem)
                    print("\nBasic Statistical Summary:")
                    print(stats_summary)
                    print("\nDetailed Statistical Analysis:")
                    print(detailed_stats)
    except Exception as e:
        print(f"Error: {e}")

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mealpy.evolutionary_based import GA
from mealpy.swarm_based.PSO import OriginalPSO
from mealpy.swarm_based.ACOR import OriginalACOR
from mealpy import FloatVar
from opfunu.cec_based.cec2014 import F12014, F62014
from numba import njit, prange, float64, int64, jit, vectorize, set_num_threads
import time
import os
import multiprocessing
import traceback
import logging
import datetime

# Configurar logging
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f"optimization_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

# Configurar logger
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("optimization")

logger.info("Iniciando processo de otimização")
logger.info(f"Número de CPUs disponíveis: {multiprocessing.cpu_count()}")

# Configurações globais
problem_size = 10  # Dimensão do problema
c1 = 2.05  # Parâmetro PSO
c2 = 2.05  # Parâmetro PSO
w_min = 0.4  # Parâmetro PSO
w_max = 0.9  # Parâmetro PSO

# Configurações para otimização máxima do Numba
os.environ['NUMBA_NUM_THREADS'] = str(multiprocessing.cpu_count())
set_num_threads(multiprocessing.cpu_count())
logger.info(f"NUMBA_NUM_THREADS configurado para: {os.environ.get('NUMBA_NUM_THREADS')}")

# Constantes pré-calculadas para F6 - evita recálculos repetidos
A = 0.5
B = 3
KMAX = 20

# Pré-cálculo das potências de a e b - melhora o desempenho evitando recálculos
@njit(fastmath=True, cache=True)
def precalculate_powers():
    a_powers = np.zeros(KMAX + 1, dtype=np.float64)
    b_powers = np.zeros(KMAX + 1, dtype=np.float64)
    for k in range(KMAX + 1):
        a_powers[k] = A ** k
        b_powers[k] = B ** k
    return a_powers, b_powers

# Pré-calcular os valores das potências
logger.info("Pré-calculando potências para F6")
A_POWERS, B_POWERS = precalculate_powers()
logger.info("Potências pré-calculadas")

# Constante para F6 - parte que não depende de x
@njit(fastmath=True, cache=True)
def calculate_f6_constant_term(dim):
    constant_sum = 0.0
    for k in range(KMAX + 1):
        constant_sum += A_POWERS[k] * np.cos(2 * np.pi * B_POWERS[k] * 0.5)
    return dim * constant_sum

# Versão otimizada da F1 com tipagem explícita e paralelização avançada
@njit("float64(float64[:])", fastmath=True, parallel=True, cache=True)
def evaluate_f1(x):
    """Versão altamente otimizada da High Conditioned Elliptic Function (F1)
    
    f₁(x) = Σᵢ₌₁ᴰ (10⁶)^((i-1)/(D-1)) * xᵢ²
    """
    D = len(x)
    result = 0.0
    for i in prange(D):
        exponent = i / (D - 1) if D > 1 else 0
        coefficient = (10**6) ** exponent
        result += coefficient * (x[i] * x[i])  # Mais rápido que x[i]**2
    return result

# Versão otimizada da F6 com tipagem explícita e paralelização avançada
@njit("float64(float64[:])", fastmath=True, parallel=True, cache=True)
def evaluate_f6(x):
    """Versão altamente otimizada da Weierstrass Function (F6)
    
    f₆(x) = Σᵢ₌₁ᴰ (Σₖ₌₀ᵏᵐᵃˣ [aᵏ cos(2πbᵏ(xᵢ+0.5))]) - D·Σₖ₌₀ᵏᵐᵃˣ[aᵏ cos(2πbᵏ·0.5)]
    """
    D = len(x)
    # Usar o termo pré-calculado
    constant_term = calculate_f6_constant_term(D)
    
    # Calcular o primeiro somatório (que depende de x) em paralelo
    variable_sum = 0.0
    for i in prange(D):
        inner_sum = 0.0
        for k in range(KMAX + 1):
            inner_sum += A_POWERS[k] * np.cos(2 * np.pi * B_POWERS[k] * (x[i] + 0.5))
        variable_sum += inner_sum
    
    # Retornar o resultado final
    return variable_sum - constant_term

# Contador de avaliações para monitoramento
eval_counter = {'f1': 0, 'f6': 0}

# Função otimizada para wrapper F1
def optimized_f1_wrapper(x):
    global eval_counter
    eval_counter['f1'] += 1
    if eval_counter['f1'] % 1000 == 0:
        logger.debug(f"F1 avaliações: {eval_counter['f1']}")
    if isinstance(x, np.ndarray):
        return evaluate_f1(x)
    return evaluate_f1(np.array(x, dtype=np.float64))

# Função otimizada para wrapper F6
def optimized_f6_wrapper(x):
    global eval_counter
    eval_counter['f6'] += 1
    if eval_counter['f6'] % 1000 == 0:
        logger.debug(f"F6 avaliações: {eval_counter['f6']}")
    if isinstance(x, np.ndarray):
        return evaluate_f6(x)
    return evaluate_f6(np.array(x, dtype=np.float64))

# Função para executar um único treinamento
def train_model_optimized(model="PSO", problem="f6", config_idx=0, run_id=0):
    """Versão extremamente otimizada da função train_model"""
    
    process_id = os.getpid()
    logger.info(f"Iniciando treinamento: modelo={model}, problema={problem}, config={config_idx}, run={run_id}, PID={process_id}")
    
    start_time = time.time()
    
    # Definir apenas uma configuração por execução para maximizar paralelismo
    configs = [
        {"name": "Config 1", "epoch": 500, "pop_size": 20},
        {"name": "Config 2", "epoch": 1000, "pop_size": 50},
        {"name": "Config 3", "epoch": 2000, "pop_size": 100}
    ]
    
    config = configs[config_idx]
    logger.info(f"Executando {config['name']}: Epoch={config['epoch']}, Pop_size={config['pop_size']}, PID={process_id}")
    
    # Usar as funções de avaliação Numba-otimizadas
    if problem == "f1":
        problem_dict = {"obj_func": optimized_f1_wrapper}
    elif problem == "f6":
        problem_dict = {"obj_func": optimized_f6_wrapper}
    else:
        logger.error(f"Problema desconhecido: {problem}")
        return None
    
    metadata_dict = {
        "bounds": FloatVar(lb=[-100] * problem_size, ub=[100] * problem_size),
        "minmax": "min",
        "name": f"Config-{config['name']}",
        "verbose": False,  # Reduzir saída para melhorar desempenho
        "log_to": "console", 
        "save_population": False,  # Desativar para melhorar desempenho
        "max_early_stop": 50  # Reduzido para melhor performance
    }
    
    main_problem = {}
    main_problem.update(problem_dict)
    main_problem.update(metadata_dict)
    
    algorithm = None
    try:
        if model == "GA":
            logger.info(f"Criando algoritmo GA, PID={process_id}")
            algorithm = GA.BaseGA(
                epoch=config['epoch'],
                pop_size=config['pop_size'],
                pc=0.95,
                pm=0.025
            )
        elif model == "PSO":
            logger.info(f"Criando algoritmo PSO, PID={process_id}")
            algorithm = OriginalPSO(
                epoch=config["epoch"],
                pop_size=config["pop_size"],
                c1=c1,
                c2=c2,
                w_min=w_min,
                w_max=w_max
             )
        elif model == "ACO":
            logger.info(f"Criando algoritmo ACO, PID={process_id}")
            try:
                algorithm = OriginalACOR(
                    epoch=config["epoch"],
                    pop_size=config["pop_size"],   
                )
            except Exception as e:
                logger.error(f"Erro ao criar ACO: {e}, PID={process_id}")
                if "ptp" in str(e):
                    # Contornar problema do ptp no NumPy 2.0
                    if not hasattr(np.ndarray, 'ptp'):
                        logger.info(f"Adicionando compatibilidade para ptp, PID={process_id}")
                        np.ndarray.ptp = lambda self, axis=None, out=None: np.ptp(self, axis, out)
                    
                    algorithm = OriginalACOR(
                        epoch=config["epoch"],
                        pop_size=config["pop_size"],   
                    )
                else:
                    return None
        else:
            logger.error(f"Modelo desconhecido: {model}, PID={process_id}")
            return None
    except Exception as e:
        logger.error(f"Erro ao criar algoritmo: {e}, PID={process_id}")
        traceback.print_exc()
        return None

    # Medir o tempo da execução
    logger.info(f"Iniciando resolução do problema, PID={process_id}")
    solve_start = time.time()
    
    try:
        best_agent = algorithm.solve(main_problem)
        solve_time = time.time() - solve_start
        
        best_position = best_agent.solution
        best_fitness = best_agent.target.fitness
        
        logger.info(f"Problema resolvido em {solve_time:.2f} segundos, PID={process_id}")
        logger.info(f"Melhor fitness: {best_fitness}, PID={process_id}")
    except Exception as e:
        logger.error(f"Erro durante resolução: {e}, PID={process_id}")
        traceback.print_exc()
        return None

    result = {
        "config": config,
        "best_fitness": best_fitness,
        "best_position": best_position,
        "execution_time": solve_time,
        "run_id": run_id,
        "process_id": process_id
    }
    
    total_time = time.time() - start_time
    logger.info(f"Treinamento concluído em {total_time:.2f} segundos, PID={process_id}")
    
    return result

def warmup_numba():
    """Pré-compila as funções Numba para evitar o overhead de compilação inicial"""
    logger.info("Pré-compilando funções Numba...")
    
    # Forçar a criação de direcionamentos para vários tamanhos de array
    for size in [10, 20, 50]:
        test_array = np.random.random(size)
        start = time.time()
        _ = evaluate_f1(test_array)
        logger.info(f"Pré-compilação evaluate_f1 para tamanho {size}: {time.time() - start:.4f}s")
        
        start = time.time()
        _ = evaluate_f6(test_array)
        logger.info(f"Pré-compilação evaluate_f6 para tamanho {size}: {time.time() - start:.4f}s")
    
    # Pré-calcular as constantes para F6
    for dim in [10, 20, 50]:
        start = time.time()
        _ = calculate_f6_constant_term(dim)
        logger.info(f"Pré-compilação constant_term para dimensão {dim}: {time.time() - start:.4f}s")
    
    logger.info("Pré-compilação concluída!")

# Versão paralela usando numba diretamente em vez de multiprocessing
@njit(parallel=True)
def parallel_sum(n):
    """Função simples para testar paralelização"""
    acc = 0
    # Usar prange para paralelização
    for i in prange(n):
        acc += i
    return acc

def worker_initializer():
    """Inicializador para processos worker"""
    # Configurar o processo para usar o máximo de CPU
    process_id = os.getpid()
    logger.info(f"Inicializando worker com PID={process_id}")

def run_analysis_optimized(models=None, problems=None, num_runs=30):
    """Função principal para executar análise completa com máxima paralelização"""
    logger.info("Iniciando análise otimizada")
    
    if models is None:
        models = ["PSO", "GA"]  # Removendo ACO inicialmente devido aos problemas com ptp
    
    if problems is None:
        problems = ["f1", "f6"]
    
    logger.info(f"Modelos: {models}")
    logger.info(f"Problemas: {problems}")
    logger.info(f"Número de execuções: {num_runs}")
    
    # Pré-compilar funções Numba antes de iniciar
    warmup_numba()
    
    # Testar paralelização do Numba
    logger.info("Testando paralelização do Numba...")
    start_time = time.time()
    _ = parallel_sum(10000000)  # Deve ser rápido se a paralelização estiver funcionando
    logger.info(f"Teste de paralelização concluído em {time.time() - start_time:.4f} segundos")
    
    # Dicionário para armazenar resultados finais
    final_results = {}
    
    for model in models:
        final_results[model] = {}
        for problem in problems:
            logger.info(f"{'='*50}")
            logger.info(f"Analisando {model} no problema {problem}")
            logger.info(f"{'='*50}")
            
            # Usar multiprocessing.Pool para paralelizar execuções
            all_results = []
            
            # Executar cada configuração várias vezes
            for config_idx in range(3):  # 3 configurações diferentes
                config_results = []
                logger.info(f"Iniciando execuções para configuração {config_idx+1}/3")
                
                # Número de execuções para esta configuração
                config_runs = num_runs // 3
                
                # Executar em paralelo usando multiprocessing.Pool
                num_workers = min(multiprocessing.cpu_count(), config_runs)
                logger.info(f"Usando {num_workers} workers para {config_runs} execuções")
                
                try:
                    with multiprocessing.Pool(processes=num_workers, initializer=worker_initializer) as pool:
                        # Criar argumentos para cada execução
                        args = [(model, problem, config_idx, run_id) for run_id in range(config_runs)]
                        
                        # Executar em paralelo e monitorar progresso
                        logger.info(f"Iniciando execuções paralelas para configuração {config_idx+1}")
                        start_time = time.time()
                        
                        # Usar imap_unordered para processar resultados à medida que ficam prontos
                        for i, result in enumerate(pool.starmap(train_model_optimized, args)):
                            if result:
                                config_results.append(result)
                                logger.info(f"Progresso: {i+1}/{config_runs} execuções concluídas para config {config_idx+1}")
                        
                        logger.info(f"Todas as execuções para configuração {config_idx+1} concluídas em {time.time() - start_time:.2f}s")
                        logger.info(f"Resultados válidos: {len(config_results)}/{config_runs}")
                except Exception as e:
                    logger.error(f"Erro durante execução paralela: {e}")
                    traceback.print_exc()
                
                all_results.extend(config_results)
            
            # Análise dos resultados
            if all_results:
                logger.info(f"Total de resultados válidos: {len(all_results)}")
                best_fitness_values = [result["best_fitness"] for result in all_results if result]
                execution_times = [result["execution_time"] for result in all_results if result]
                
                # Guardar estatísticas
                final_results[model][problem] = {
                    "mean_fitness": np.mean(best_fitness_values) if best_fitness_values else 0,
                    "std_fitness": np.std(best_fitness_values) if best_fitness_values else 0,
                    "min_fitness": np.min(best_fitness_values) if best_fitness_values else 0,
                    "max_fitness": np.max(best_fitness_values) if best_fitness_values else 0,
                    "mean_time": np.mean(execution_times) if execution_times else 0,
                    "std_time": np.std(execution_times) if execution_times else 0,
                }
                
                # Imprimir estatísticas
                stats = final_results[model][problem]
                logger.info("Estatísticas de performance:")
                logger.info(f"Fitness médio: {stats['mean_fitness']:.4f} ± {stats['std_fitness']:.4f}")
                logger.info(f"Tempo médio de execução: {stats['mean_time']:.2f}s ± {stats['std_time']:.2f}s")
                logger.info(f"Melhor fitness: {stats['min_fitness']:.4f}")
            else:
                logger.warning("Não há resultados para analisar.")
    
    logger.info("Análise completa!")
    return final_results

if __name__ == "__main__":
    try:
        # Para teste rápido, use valores menores
        NUM_RUNS = 4  # 3 execuções por configuração
        logger.info(f"Iniciando execução principal com {NUM_RUNS} execuções")
        
        # Executar análise otimizada
        results = run_analysis_optimized(num_runs=NUM_RUNS)
        
        logger.info("Análise completa!")
    except Exception as e:
        logger.error(f"Erro na execução principal: {e}")
        traceback.print_exc()

Pré-compilando funções Numba...
Pré-compilação concluída!
Testando paralelização do Numba...
Teste de paralelização concluído em 0.5838 segundos

Analisando PSO no problema f1

