In [1]:
import numpy as np
import matplotlib.pyplot as plt

In [2]:
import matplotlib as mpl

mpl.rcParams["figure.dpi"] = 1000
# mpl.rcParams['legend.fontsize'] = 15
# params = {'legend.fontsize': 20,
#           'legend.handlelength': 2}
# plot.rcParams.update(params)

In [3]:
from algorithms.moo.nsga2 import NSGA2
from constraints.as_obj import ConstraintsAsObjective
from operators.survival.rank_and_crowding.classes import (
    RankAndCrowding,
    MyConstrRankAndCrowding,
    ParallelConstrRankAndCrowding,
    MyConstrRankAndCrowding2,
)
from optimize import minimize
from visualization.scatter import Scatter
from pymoo.problems import get_problem

In [4]:
core_moea1 = NSGA2(pop_size=300, survival=RankAndCrowding())
core_moea2 = NSGA2(pop_size=300, survival=MyConstrRankAndCrowding())
core_moea3 = NSGA2(pop_size=300, survival=ParallelConstrRankAndCrowding())
core_moea4 = NSGA2(pop_size=300, survival=MyConstrRankAndCrowding2())

In [5]:
import time


def performance_timer(func):
    """
    A decorator to measure and print the execution time of a function.
    """

    def wrapper(*args, **kwargs):
        start_time = time.time()  # Capture the start time
        result = func(*args, **kwargs)  # Execute the function
        end_time = time.time()  # Capture the end time
        duration = end_time - start_time  # Calculate duration
        # print(f"{func.__name__} executed in {duration:.6f} seconds")
        return result

    return wrapper


@performance_timer
def minimize_(*args, **kwargs):
    return minimize(*args, **kwargs)

In [6]:
from pymoo.indicators.gd import GD
from pymoo.indicators.gd_plus import GDPlus
from pymoo.indicators.igd import IGD
from pymoo.indicators.igd_plus import IGDPlus
from pymoo.indicators.hv import HV

metric_classes = {
    "Generational Distance (GD)": GD,
    "Generational Distance Plus (GD+)": GDPlus,
    "Inverted Generational Distance (IGD)": IGD,
    "Inverted Generational Distance Plus (IGD+)": IGDPlus,
    # "Hypervolume": HV,
}


def performance_indicators(pf, results):
    # Pre-configuration for each metric, handle special cases like HV here.
    pf_calculators = [
        (
            metric_class(ref_point=np.array([0, 0]))
            if metric_name == "Hypervolume"
            else metric_class(pf)
        )
        for metric_name, metric_class in metric_classes.items()
    ]

    def cal_acc_metric(sols):
        results_matrix = np.empty(
            (len(sols), len(metric_classes)), dtype=object)

        for idx, sol in enumerate(sols):
            # Ensure `sol` is a 2D array with the correct shape [n_samples, n_features]
            sol = np.atleast_2d(sol)
            if sol.shape[0] == 1 and sol.shape[1] != len(pf):
                # This implies sol is a single sample with multiple features, which is the correct format
                pass
            elif sol.shape[1] == 1 and sol.shape[0] != len(pf):
                # This implies sol has the wrong orientation; it's many samples of a single feature
                sol = sol.T

            for metric_idx, calculator in enumerate(pf_calculators):
                try:
                    results_matrix[idx, metric_idx] = calculator(sol)
                except Exception as e:
                    # print(
                    #     f"Error calculating {list(metric_classes.keys())[metric_idx]}: {e}")
                    results_matrix[idx, metric_idx] = np.nan

        return [
            (np.mean(results_matrix[:, i]), np.median(results_matrix[:, i]))
            for i in range(len(pf_calculators))
        ]

    return [cal_acc_metric(rs) for rs in results]

In [7]:
from constraints.as_obj import CVRAsObjective, CDFAsObjective, CVRAsObjective2, CDFAsObjective2, CDFAsObjective3


@performance_timer
def compare_on(problem, n_gen=300):
    # Configuration for each optimization strategy
    configurations = [
        # (problem, core_moea1, ("n_gen", n_gen), {"seed": 1, "verbose": False}),
        # (ConstraintsAsObjective(problem),
        #  core_moea1, ("n_gen", n_gen), {"seed": 1}),
        # (CVRAsObjective(problem),
        #  core_moea1, ("n_gen", n_gen), {"seed": 1}),
        # (CDFAsObjective(problem),
        #  core_moea1, ("n_gen", n_gen), {"seed": 1}),
        # (
        #     CVRAsObjective2(problem),
        #     core_moea1,
        #     ("n_gen", n_gen),
        #     {"seed": 1, "verbose": False},
        # ),
        (
            CDFAsObjective2(problem, alpha=0.001),
            core_moea1,
            ("n_gen", n_gen),
            {"seed": 1, "verbose": False},
        ),
        (
            CDFAsObjective2(problem, alpha=0.005),
            core_moea1,
            ("n_gen", n_gen),
            {"seed": 1, "verbose": False},
        ),
        (
            CDFAsObjective3(problem, alpha=0.001),
            core_moea1,
            ("n_gen", n_gen),
            {"seed": 1, "verbose": False},
        ),
        (
            CDFAsObjective3(problem, alpha=0.005),
            core_moea1,
            ("n_gen", n_gen),
            {"seed": 1, "verbose": False},
        ),
        # (ConstraintsAsObjective(problem),
        #  core_moea2, ("n_gen", n_gen), {"seed": 1}),
        # (
        #     CVRAsObjective(problem),
        #     core_moea2,
        #     ("n_gen", n_gen),
        #     {"seed": 1, "verbose": False},
        # ),
        # (
        #     CVRAsObjective2(problem),
        #     core_moea4,
        #     ("n_gen", n_gen),
        #     {"seed": 1, "verbose": False},
        # ),
        # (
        #     CDFAsObjective(problem),
        #     core_moea2,
        #     ("n_gen", n_gen),
        #     {"seed": 1, "verbose": False},
        # ),
        # (
        #     CDFAsObjective2(problem),
        #     core_moea4,
        #     ("n_gen", n_gen),
        #     {"seed": 1, "verbose": False},
        # ),
    ]

    results = []
    for config in configurations:
        result = minimize_(*config[:-1], **config[-1])
        results.append(result)
    return results


def test(problem_name, n_gen=300, *args):
    if "dascmop" in problem_name:
        problem = get_problem(problem_name, args[0])
    elif "modact" in problem_name:
        problem = get_problem(problem_name, args[0])
    else:
        problem = get_problem(problem_name)
    pf = problem.pareto_front()
    # print(
    #     f"Problem {problem.name}: n_var={problem.n_var} n_obj={problem.n_obj} n_ieq_constr={problem.n_ieq_constr} n_eq_constr={problem.n_eq_constr}"
    # )

    results = compare_on(problem, n_gen=n_gen)

    # Check if any of the results is None
    if any(result is None for result in results):
        print("One or more optimization runs failed to return a result.")
        # Return None in place of results to indicate failure
        return pf, [None] * len(results)

    # Prepare data for performance indicators, safely handling cases where result.F might be None or incorrectly shaped
    prepared_data = []
    for i, result in enumerate(results):
        if result and hasattr(result, "F") and result.F is not None:
            # i == 0 this NGSA without additional objective
            if i in [0, 1, 2]:
                # if i == 1 or i == 3:
                # if True:
                prepared_data.append(result.F[:, :-1])
            else:
                # prepared_data.append(result.F[:, :-1])
                prepared_data.append(result.F[:, :-2])
        else:
            # Use None or an appropriate placeholder if result is invalid
            print(f'algo {i} not converge')
            prepared_data.append(None)

    # Ensure all data is valid before calculating performance indicators
    if all(data is not None for data in prepared_data):
        performance_results = performance_indicators(pf, prepared_data)
        for i, metric in enumerate(metric_classes.keys()):
            pass
            # print(
            #     f"{metric}: " +
            #     " | ".join(f"{perf[i]}" for perf in performance_results)
            # )
    else:
        print("Performance indicators could not be calculated due to invalid data.")

    return pf, *results

# BNH


In [8]:
for i in range(1, 100, 10):
    # print(f"\n I = {i}")
    pf, *results = test("bnh", n_gen=i)
    print(len(results[0].F[results[0].F[:, -2] == 0]),
          len(results[1].F[results[1].F[:, -2] == 0]),
          len(results[2].F[results[2].F[:, -2] == 0]),
          len(results[3].F[results[3].F[:, -2] == 0]),
        #   len(results[4].F[results[4].F[:, -2] == 0])
          )

70 70 70 70
300 300 300 300
300 300 300 300
300 300 300 300
299 299 299 299
299 299 299 299
300 300 300 300
300 300 300 300


KeyboardInterrupt: 

In [None]:
for i in range(1, 100, 10):
    # print(f"\n I = {i}")
    pf, *results = test("carside", n_gen=i)
    print(len(results[0].F[results[0].F[:, -2] == 0]),
          len(results[1].F[results[1].F[:, -2] == 0]),
          len(results[2].F[results[2].F[:, -2] == 0]),
          len(results[3].F[results[3].F[:, -2] == 0]),
        #   len(results[4].F[results[4].F[:, -2] == 0])
          )

In [10]:
for i in range(1, 401, 100):
    # print(f"\n I = {i}")
    pf, *results = test("tnk", n_gen=i)
    print(len(results[0].F[results[0].F[:, -2] == 0]),
          len(results[1].F[results[1].F[:, -2] == 0]),
          len(results[2].F[results[2].F[:, -2] == 0]),
          len(results[3].F[results[3].F[:, -2] == 0]),
        #   len(results[4].F[results[4].F[:, -2] == 0])
          )

6 6 6 6
7 10 10 14
14 6 6 10
6 8 10 16


In [12]:
for i in range(1, 100, 10):
    # print(f"\n I = {i}")
    pf, *results = test("osy", n_gen=i)
    print(len(results[0].F[results[0].F[:, -2] == 0]),
          len(results[1].F[results[1].F[:, -2] == 0]),
          len(results[2].F[results[2].F[:, -2] == 0]),
          len(results[3].F[results[3].F[:, -2] == 0]),
        #   len(results[4].F[results[4].F[:, -2] == 0])
          )

1 1 1 1
5 5 4 6
4 4 2 8
2 5 5 5
2 4 6 5
2 3 5 5
2 3 4 4
4 6 3 4
4 2 5 3
3 2 4 4


In [None]:
for j in range(1, 10):
    print(f"\nDASCMOP{j}-1")
    for i in range(1, 350, 50):
        # print(f"\n I = {i}")
        print(len(results[0].F[results[0].F[:, -2] == 0]),
          len(results[1].F[results[1].F[:, -2] == 0]),
        #   len(results[2].F[results[2].F[:, -1] == 0]),
        #   len(results[0].F[results[3].F[:, -2] == 0]),
        #   len(results[11].F[results[4].F[:, -2] == 0])
          )

In [14]:
for j in range(1, 10):
    print(f"\nDASCMOP{j}-16")
    for i in range(1, 350, 50):
        # print(f"\n I = {i}")
        pf, *results = test(f"dascmop{j}", i, 16)
        print(len(results[0].F[results[0].F[:, -2] == 0]),
            len(results[1].F[results[1].F[:, -2] == 0]),
            len(results[2].F[results[2].F[:, -2] == 0]),
            len(results[3].F[results[3].F[:, -2] == 0]),
            #   len(results[4].F[results[4].F[:, -2] == 0])
        )


DASCMOP1-16
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0

DASCMOP2-16
0 0 0 0
2 2 2 2
2 2 2 3
2 2 2 2
2 2 2 2
2 2 2 2
3 2 2 2

DASCMOP3-16
0 0 0 0
0 0 1 0
0 1 1 1
1 1 1 1
1 1 1 1
1 1 1 2
1 1 1 2

DASCMOP4-16
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0

DASCMOP5-16
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
1 2 1 0

DASCMOP6-16
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0

DASCMOP7-16
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 2 0 0
0 2 0 0

DASCMOP8-16
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 0 0 0
0 3 0 0
0 2 0 0

DASCMOP9-16
0 0 0 0
2 2 2 2
2 2 2 2
2 2 2 2
2 2 2 2
2 2 2 2
2 2 2 2


In [None]:
for j in range(1, 17):
    print(f"\nLevel = {j}")
    for i in range(1, 360, 50):
        # print(f"\n I = {i}")
        pf, *results = test("dascmop1", i, j)
        print(len(results[0].F[results[0].F[:, -1] == 0]),
            len(results[1].F[results[1].F[:, -1] == 0]),
            len(results[2].F[results[2].F[:, -1] == 0]),
            len(results[3].F[results[3].F[:, -2] == 0]),
            len(results[4].F[results[4].F[:, -2] == 0])
            )

In [None]:
for i in range(1, 100, 10):
    # print(f"\n I = {i}")
    pf, *results = test("dascmop1", i, 16)
    print(len(results[0].F[results[0].F[:, -1] == 0]),
          len(results[1].F[results[1].F[:, -1] == 0]),
          len(results[2].F[results[2].F[:, -1] == 0]),
          len(results[3].F[results[3].F[:, -2] == 0]),
          len(results[4].F[results[4].F[:, -2] == 0])
          )

In [None]:
for i in range(1, 100, 10):
    # print(f"\n I = {i}")
    pf, *results = test("dascmop1", i, 1)
    print(len(results[0].F[results[0].F[:, -2] == 0]),
          len(results[1].F[results[1].F[:, -2] == 0]),
          len(results[2].F[results[2].F[:, -2] == 0]),
          len(results[3].F[results[3].F[:, -2] == 0]),
        #   len(results[4].F[results[4].F[:, -2] == 0])
          )

In [11]:
for i in range(1, 100, 10):
    # print(f"\n I = {i}")
    pf, *results = test("dascmop1", i, 16)
    print(len(results[0].F[results[0].F[:, -1] == 0]),
          len(results[1].F[results[1].F[:, -1] == 0]),
          len(results[2].F[results[2].F[:, -1] == 0]),
          len(results[3].F[results[3].F[:, -2] == 0]),
          len(results[4].F[results[4].F[:, -2] == 0])
          )

IndexError: list index out of range

In [None]:
plot = Scatter()
plot.add(pf, marker="*", color="orange", label="Pareto front")
plot.add(results[0].F[results[0].F[:, -1] == 0][:, :-1], marker="d",
         color="yellow", label="UMOP + Baseline")
# plot.add(results[2].F[:, :-1], marker="+",
#  color="green", label="UMOP + Baseline + Our CDP")
plot.add(
    results[1].F[results[1].F[:, -1] == 0][:, :-1], marker="1", color="blue", label="UMOP + Our generalized framework"
)
plot.add(results[2].F[results[2].F[:, -1] == 0][:, :-1], marker="2",
         color="purple", label="UMOP + Our framework + CDF")
plot.add(results[3].F[results[3].F[:, -1] == 0][:, :-2], marker="2",
         color="green", label="UMOP + Our framework + CDF")
plot.add(results[4].F[results[4].F[:, -1] == 0][:, :-2], marker="2",
         color="black", label="UMOP + Our framework + CDF")
plot.show()

In [None]:
results[4].F