In [None]:
import itertools
import pandas as pd
import matplotlib.pyplot as plt
import polars as pl
import numpy as np

from tqdm.auto import tqdm
from pathlib import Path
from cocoviz import ProblemDescription, Result, ResultSet, Indicator, rtpplot


# Locate the folder holding the per-run CSV files. It is resolved relative to
# the parent of the current working directory.
project_dir = Path.cwd()
parent_dir = project_dir.parent
files_dir = parent_dir.joinpath("EARS comparison", "Algorithm results", "Runs")
print(files_dir)

# Directory passed to load_results() as `data_dir`.
DATA_DIR = Path(files_dir)

In [None]:
def load_results(algorithms, functions, vars, runs, data_dir, max_evaluations=15000):
    """
    Read the per-run CSV files and collect every usable one into a ResultSet.

    One file is looked up for each (algorithm, function, dimension, run)
    combination, named ``{alg}_{fn}_vars={var}_run={run}.csv`` inside
    ``data_dir``. A file is reported on stdout and skipped when it is missing,
    lacks the "Fitness" or "Evaluations" column, has no rows left after the
    evaluation cut-off, contains NaN in "Fitness", or raises while being
    processed.

    Parameters:
    -----------
    algorithms : list
        Algorithm names (first component of the file name)
    functions : list
        Function names (second component of the file name)
    vars : list
        Variable dimensions
    runs : list
        Run numbers
    data_dir : Path
        Directory containing the CSV files
    max_evaluations : int, optional
        Rows with "Evaluations" above this value are dropped (default: 15000)

    Returns:
    --------
    ResultSet
        All results that passed the checks above

    Raises:
    -------
    ValueError
        If not a single result could be loaded
    """
    combinations = list(itertools.product(algorithms, functions, vars, runs))
    loaded = ResultSet()

    for alg, fn, var, run in tqdm(combinations):
        file_path = data_dir / f"{alg}_{fn}_vars={var}_run={run}.csv"

        if not file_path.exists():
            print(f"Warning: CSV file not found: {file_path}")
            continue

        try:
            frame = pd.read_csv(file_path)

            # Both columns are required; only the first missing one is reported.
            if "Fitness" not in frame.columns:
                print(f"Error: 'Fitness' column missing in {file_path}")
            elif "Evaluations" not in frame.columns:
                print(f"Error: 'Evaluations' column missing in {file_path}")
            else:
                # Keep only the rows within the evaluation budget.
                within_budget = frame[frame["Evaluations"] <= max_evaluations]

                if within_budget.empty:
                    print(f"Warning: Empty data after filtering for {file_path}")
                elif within_budget["Fitness"].isna().any():
                    print(f"Warning: NaN values in 'Fitness' column for {file_path}")
                else:
                    loaded.append(
                        Result(alg, ProblemDescription(fn, 1, var, 3), within_budget, "Evaluations")
                    )
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole load.
            print(f"Error processing {file_path}: {e}")

    if not loaded:
        raise ValueError("No valid results were loaded. Check CSV files and data.")

    return loaded

In [None]:
# Algorithm lists, one per algorithm family. Each entry is used as the
# `{alg}` component of a result file name by load_results().
PSO = [
    "PSO-EARS",
    "PSO-pymoo",
    "PSO-jMetal",
    "PSO-PlatEMO",
    "PSO-pagmo2",
    "PSO-YPEA",
    "PSO-EvoloPy",
    "PSO-MEALPY",
    "PSO-NiaPy",
    "PSO-DEAP",
    "PSO-metaheuristicOpt",
    "PSO-Nevergrad",
]
ABC = [
    "ABC-EARS",
    "ABC-MEALPY",
    "ABC-PlatEMO",
    "ABC-NiaPy",
    "ABC-Author-Matlab",
    "ABC-YPEA",
    "ABC-metaheuristicOpt",
    "ABC-pagmo2",
]
GA = [
    "GA-PlatEMO",
    "GA-MEALPY",
    "GA-jMetal",
    "GA-MOEA",
    "GA-NiaPy",
    "GA-pagmo2",
    "GA-pymoo",
    "GA-metaheuristicOpt",
    "GA-YPEA",
    "GA-EvoloPy",
    "GA-DEAP",
]
DE = [
    "DE-EARS",
    "DE-jMetal",
    "DE-MEALPY",
    "DE-MOEA",
    "DE-NiaPy",
    "DE-PlatEMO",
    "DE-pagmo2",
    "DE-Author-Java",
    "DE-pymoo",
    "DE-metaheuristicOpt",
    "DE-YPEA",
    "DE-EvoloPy",
    "DE-Nevergrad",
    "DE-DEAP",
]
GWO = [
    "GWO-EARS",
    "GWO-NiaPy",
    "GWO-MEALPY",
    "GWO-pagmo2",
    "GWO-Author-Matlab",
    "GWO-PlatEMO",
    "GWO-metaheuristicOpt",
    "GWO-EvoloPy",
]
CMAES = [
    "CMA-ES-jMetal",
    "CMA-ES-pagmo2",
    "CMA-ES-Author-Python",
    "CMA-ES-pymoo",
    "CMA-ES-DEAP",
]

# Function names, grouped by the dimension they are paired with below.
FUNCTIONS_D60 = [
    "ShiftedSphere",
    "ShiftedSumOfSquares",
    "ShiftedSchwefel",
    "ShiftedRastrigin",
    "ShiftedAckley",
    "ShiftedGriewank",
    "Rosenbrock",
]

FUNCTIONS_D3 = ["Hartman"]

FUNCTIONS_D2 = [
    "ShekelsFoxholes",
    "SixHumpCamelBack",
    "Branin",
    "GoldsteinPrice",
]

# Dimension lists matching the function groups above.
VARS_D60 = [60]
VARS_D3 = [3]
VARS_D2 = [2]

# Run numbers 1..50 (inclusive).
RUN = list(range(1, 51))

In [None]:
# Algorithm list analysed by the cells below.
algorithms = DE

# One ResultSet per dimension group; all three share the same runs, data
# directory and evaluation budget.
results_D60 = load_results(algorithms, FUNCTIONS_D60, VARS_D60, RUN, DATA_DIR, max_evaluations=15_000)

results_D3 = load_results(algorithms, FUNCTIONS_D3, VARS_D3, RUN, DATA_DIR, max_evaluations=15_000)

results_D2 = load_results(algorithms, FUNCTIONS_D2, VARS_D2, RUN, DATA_DIR, max_evaluations=15_000)

# Number of targets handed to rtpplot in the per-problem figures.
number_of_targets = 1001

# Indicator read from the "Fitness" column; smaller values are better.
INDICATOR = Indicator("Fitness", display_name="Fitness", larger_is_better=False)

In [None]:
from cocoviz import indicator as ind

def generate_log_targets(results: ResultSet, indicator: ind.Indicator, min: float | str, number_of_targets: int = 101):
    """
    Build log-spaced target values for every problem in ``results``.

    For each problem group, the upper bound is the largest observed value of
    the indicator column and the lower bound is ``min``. Targets are placed at
    ``min + (max - min) * f`` for ``f`` in ``np.logspace(-16, 0, number_of_targets)``.
    The array is reversed when the indicator is not larger-is-better. If both
    bounds coincide, a single-element array is returned for that problem.

    NOTE(review): the annotation admits ``str`` for ``min``, but the value is
    used directly in arithmetic here -- confirm what a string is meant to do.

    Parameters:
    -----------
    results : ResultSet
        Results to derive the targets from
    indicator : Indicator
        Indicator (passed through ``ind.resolve``) whose column is inspected
    min : float | str
        Lower bound of the target range
    number_of_targets : int, optional
        Number of log-spaced targets per problem (default: 101)

    Returns:
    --------
    dict
        Maps each problem description to its array of targets
    """
    indicator = ind.resolve(indicator)

    per_problem = {}
    for desc, problem_results in results.by_problem():
        frames = [r._data for r in problem_results]
        observed = pl.concat(frames)[indicator.name]
        lower, upper = min, observed.max()
        span = upper - lower

        fractions = np.logspace(-16, 0, number_of_targets)
        if lower == upper:
            # Degenerate range: a single target at that value.
            per_problem[desc] = np.linspace(lower, upper, 1)
            continue

        if indicator.larger_is_better:
            per_problem[desc] = lower + span * fractions
        else:
            per_problem[desc] = np.flip(lower + span * fractions)
    return per_problem

In [None]:
# Log-spaced targets for ShiftedRastrigin. They are not consumed by the plots
# in this cell. Looking the subset up with next() avoids a NameError (or the
# silent reuse of a stale variable) when the problem was not loaded.
shifted_rastrigin_results = next(
    (subset for desc, subset in results_D60.by_problem() if desc.name == "ShiftedRastrigin"),
    None,
)
if shifted_rastrigin_results is None:
    print("Warning: no ShiftedRastrigin results loaded; skipping target generation")
    targets = None
else:
    targets = generate_log_targets(shifted_rastrigin_results, INDICATOR, 0.0)

# Combine all functions into a single list
FUNCTIONS_ALL = FUNCTIONS_D60 + FUNCTIONS_D3 + FUNCTIONS_D2

# Merge the (problem description, result subset) pairs of all three result sets
problem_results = []
for result_set in [results_D60, results_D3, results_D2]:
    problem_results.extend(result_set.by_problem())

# Split FUNCTIONS_ALL column-major into two columns; the first column gets the
# extra function when the count is odd.
n_functions = len(FUNCTIONS_ALL)
mid = (n_functions + 1) // 2
col1_functions = FUNCTIONS_ALL[:mid]
col2_functions = FUNCTIONS_ALL[mid:]
n_rows = mid

# One colour per algorithm, sampled evenly from tab20. This changes the global
# property cycle, so it also applies to figures created by later cells.
colors = plt.cm.tab20(np.linspace(0, 1, len(algorithms)))
plt.rcParams['axes.prop_cycle'] = plt.cycler(color=colors)

# squeeze=False keeps `axes` two-dimensional even when there is a single row.
fig, axes = plt.subplots(
    n_rows, 2, figsize=(10, n_rows * 4), constrained_layout=True, squeeze=False
)

# Fill the grid column by column.
for col, column_functions in enumerate((col1_functions, col2_functions)):
    for row, func in enumerate(column_functions):
        ax = axes[row, col]
        # First loaded problem whose name matches; the panel stays empty if none does.
        match = next(((r, subset) for r, subset in problem_results if r.name == func), None)
        if match is None:
            continue
        r, result_subset = match
        rtpplot(result_subset, INDICATOR, number_of_targets=number_of_targets, ax=ax)
        ax.set_title(f"{r.name}, {r.number_of_variables}-D")
        ax.legend(fontsize=8, title='')

# Hide the unused panel at the bottom of the second column (odd function count).
for row in range(len(col2_functions), n_rows):
    axes[row, 1].set_visible(False)

# No plt.tight_layout() here: the figure uses constrained_layout, and calling
# tight_layout would replace that layout engine (matplotlib warns about it).
# NOTE: the three-column cell below saves to the same file name.
plt.savefig("profiles-per-problem-DE.pdf", bbox_inches="tight")

In [None]:
# Combine all functions into a single list
FUNCTIONS_ALL = FUNCTIONS_D60 + FUNCTIONS_D3 + FUNCTIONS_D2

# Merge the (problem description, result subset) pairs of all three result sets
problem_results = []
for result_set in [results_D60, results_D3, results_D2]:
    problem_results.extend(result_set.by_problem())

# Lay the functions out column-major on a three-column grid. The row count is
# derived from the number of functions (ceiling division) instead of being
# hard-coded, so the grid also fits when the count is not exactly 12.
n_functions = len(FUNCTIONS_ALL)
n_cols = 3
n_rows = max(1, -(-n_functions // n_cols))
col_size = n_rows  # functions per column; the last column may hold fewer
columns = [FUNCTIONS_ALL[c * col_size:(c + 1) * col_size] for c in range(n_cols)]
col1_functions, col2_functions, col3_functions = columns

# squeeze=False keeps `axes` two-dimensional even when there is a single row.
fig, axes = plt.subplots(
    n_rows, n_cols, figsize=(15, n_rows * 4), constrained_layout=True, squeeze=False
)

for col, column_functions in enumerate(columns):
    for row, func in enumerate(column_functions):
        ax = axes[row, col]
        # First loaded problem whose name matches; the panel stays empty if none does.
        match = next(((r, subset) for r, subset in problem_results if r.name == func), None)
        if match is None:
            continue
        r, result_subset = match
        rtpplot(result_subset, INDICATOR, number_of_targets=number_of_targets, ax=ax)
        ax.set_title(f"{r.name}, {r.number_of_variables}-D")
        ax.legend(fontsize=8, title='')

    # Hide the panels of this column that received no function.
    for row in range(len(column_functions), n_rows):
        axes[row, col].set_visible(False)

# NOTE: this is the same file name the two-column cell above saves to, so
# running both cells leaves only this figure on disk.
plt.savefig("profiles-per-problem-DE.pdf", bbox_inches="tight")

In [None]:
# Profiles for results_D2 grouped by number of variables: one panel per group.
nvar = len(results_D2.number_of_variables)
fig, axes = plt.subplots(nrows=nvar, ncols=1, figsize=(6, 4), squeeze=False)

# squeeze=False keeps `axes` two-dimensional, so take its only column.
for ax, (d, result_subset) in zip(axes[:, 0], results_D2.by_number_of_variables()):
    rtpplot(result_subset, INDICATOR, number_of_targets=10, ax=ax)
    ax.set_title(f"{INDICATOR.display_name} ({d}-D)")

plt.tight_layout()
#plt.savefig("profiles-aggregated.pdf", bbox_inches="tight")

In [None]:
# Distribute the FUNCTIONS_D60 entries column-major over two columns; the first
# column takes the extra function when the count is odd.
n_functions = len(FUNCTIONS_D60)
mid = n_functions // 2 + n_functions % 2
col1_functions = FUNCTIONS_D60[:mid]
col2_functions = FUNCTIONS_D60[mid:]
n_rows = mid

# Grid with one row per function of the longer (first) column
fig, axes = plt.subplots(nrows=n_rows, ncols=2, figsize=(10, 3 * n_rows), constrained_layout=True)

# Materialise the (problem description, result subset) pairs once
problem_results = list(results_D60.by_problem())

# Fill the first column top to bottom, then the second
for col, column_functions in enumerate((col1_functions, col2_functions)):
    for row, func in enumerate(column_functions):
        ax = axes[row, col]
        # Plot the first problem whose name matches this function, if any
        for r, result_subset in problem_results:
            if r.name != func:
                continue
            rtpplot(result_subset, INDICATOR, number_of_targets=number_of_targets, ax=ax)
            ax.set_title(f"Fitness ({r.name}, {r.number_of_variables}-D)")
            break

# Hide whatever is left unused at the bottom of the second column
for unused_ax in axes[len(col2_functions):n_rows, 1]:
    unused_ax.set_visible(False)

plt.show()

In [None]:
# One row per function in FUNCTIONS_D2, single column.
# squeeze=False makes plt.subplots always return a 2-D array; without it a
# one-entry FUNCTIONS_D2 would yield a bare Axes and the zip below would fail.
n_functions = len(FUNCTIONS_D2)
fig, axes = plt.subplots(
    n_functions, 1, figsize=(6, n_functions * 3), constrained_layout=True, squeeze=False
)
column_axes = axes[:, 0]

# Pair the panels with the problems in the order by_problem() yields them
plotted = 0
for ax, (r, result_subset) in zip(column_axes, results_D2.by_problem()):
    rtpplot(result_subset, INDICATOR, number_of_targets=number_of_targets, ax=ax)
    ax.set_title(f"Fitness ({r.name}, {r.number_of_variables}-D)")
    plotted += 1

# Hide panels left over when fewer problems were loaded than functions listed
for ax in column_axes[plotted:]:
    ax.set_visible(False)

plt.show()

In [None]:
# Grid with one row per FUNCTIONS_D60 entry and one column per VARS_D60 entry.
# The original cell iterated over `results`, a name that is never defined at
# notebook level (it only exists inside load_results), so it failed on a fresh
# kernel. results_D60 is the result set that matches this grid.
# squeeze=False guarantees an array of axes even for a 1x1 grid.
fig, axes = plt.subplots(len(FUNCTIONS_D60), len(VARS_D60), figsize=(10, 12), squeeze=False)
for ax, (r, result_subset) in zip(axes.flatten(), results_D60.by_problem()):
    rtpplot(result_subset, INDICATOR, number_of_targets=number_of_targets, ax=ax)
    ax.set_title(f"Fitness ({r.name}, {r.number_of_variables}-D)")
plt.tight_layout()