In [162]:
import random
import numpy as np

from scipy.stats import ranksums
from pymoo.optimize import minimize
from pymoo.problems import get_problem
from pymoo.core.problem import Problem
from pymoo.core.population import Population
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.visualization.scatter import Scatter
from pymoo.util.ref_dirs import get_reference_directions

In [84]:
dtlz1 = get_problem("dtlz1", n_var=7, n_obj=3)
dtlz2 = get_problem("dtlz2", n_var=12, n_obj=3)
dtlz5 = get_problem("dtlz5", n_var=12, n_obj=3)
dtlz7 = get_problem("dtlz7", n_var=22, n_obj=3)

In [94]:
class IDTLZ1(Problem):
  def __init__(self, n_var=7, n_obj=3):
    # Initialize the standard DTLZ1 problem parameters
    super().__init__(n_var=n_var, n_obj=n_obj, xl=0.0, xu=1.0)
    self.k = n_var - n_obj + 1

  def _evaluate(self, X, out, *args, **kwargs):
    # Calculate g based on the DTLZ1 definition
    g = 100 * (self.k + np.sum((X[:, self.n_obj - 1:] - 0.5) ** 2 - np.cos(20 * np.pi * (X[:, self.n_obj - 1:] - 0.5)), axis=1))

    # Calculate the standard DTLZ1 objectives
    f = 0.5 * (1 + g)[:, None] * np.fliplr(
      np.cumprod(np.hstack([np.ones((X.shape[0], 1)), X[:, :self.n_obj - 1]]), axis=1)
    )
    f[:, 1:] *= (1 - X[:, :self.n_obj - 1][:, ::-1])

    # Apply the inversion transformation: 0.5 * (1 + g) - f
    f = 0.5 * (1 + g)[:, None] - f

    # Set the inverted objectives in the output
    out["F"] = f

In [119]:
class IDTLZ2(Problem):
  def __init__(self, n_var=12, n_obj=3):
    super().__init__(n_var=n_var, n_obj=n_obj, xl=0.0, xu=1.0)
    self.k = n_var - n_obj + 1
    self.n_obj = n_obj  # Ensure the correct number of objectives is set

  def _evaluate(self, X, out, *args, **kwargs):
    M = self.n_obj

    # Calculate g for DTLZ2
    g = np.sum((X[:, M - 1:] - 0.5) ** 2, axis=1)

    # Calculate theta based on DTLZ2's requirements
    theta = np.pi / 2 * X[:, :M - 1]

    # Calculate the objectives with correct shape
    f = (1 + g)[:, None] * np.fliplr(np.cumprod(np.cos(theta), axis=1))

    # Ensure we generate M objectives by initializing the array correctly
    f = np.ones((X.shape[0], M)) * (1 + g)[:, None]  # initialize with the shape (N, M)
    f[:, :-1] = f[:, :-1] * np.cos(theta)
    f[:, 1:] *= np.sin(theta[:, ::-1])

    # Apply the inversion transformation
    f = (1 + g)[:, None] - f

    # Set the inverted objectives in the output
    out["F"] = f

In [129]:
class IDTLZ5(Problem):
  def __init__(self, n_var=12, n_obj=3):
    super().__init__(n_var=n_var, n_obj=n_obj, xl=0.0, xu=1.0)
    self.k = n_var - n_obj + 1
    self.n_obj = n_obj  # Ensure the correct number of objectives is set

  def _evaluate(self, X, out, *args, **kwargs):
    M = self.n_obj

    # Calculate g for DTLZ5
    g = np.sum((X[:, M - 1:] - 0.5) ** 2, axis=1)

    # Calculate theta based on DTLZ5's requirements
    theta = np.pi / (4 * (1 + g)[:, None]) * (1 + 2 * g[:, None] * X[:, :M - 1])
    theta[:, 0] = 0.5 * np.pi * X[:, 0]  # Special case for the first objective

    # Calculate the objectives
    f = (1 + g)[:, None] * np.ones((X.shape[0], M))  # Initialize with the correct shape
    f[:, :-1] *= np.cos(theta)
    f[:, 1:] *= np.sin(theta[:, ::-1])

    # Apply the inversion transformation
    f = (1 + g)[:, None] - f

    # Set the inverted objectives in the output
    out["F"] = f

In [133]:
class IDTLZ7(Problem):
  def __init__(self, n_var=12, n_obj=3):
    super().__init__(n_var=n_var, n_obj=n_obj, xl=0.0, xu=1.0)
    self.k = n_var - n_obj + 1
    self.n_obj = n_obj

  def _evaluate(self, X, out, *args, **kwargs):
    M = self.n_obj

    # Calculate g for DTLZ7
    g = 1 + (9 / self.k) * np.sum(X[:, M - 1:], axis=1)

    # Calculate the first m-1 objectives
    f = X[:, :M - 1]

    # Calculate the last objective h, based on the sum of the previous objectives
    h = M - np.sum(f / (1 + g)[:, None] * (1 + np.sin(3 * np.pi * f)), axis=1)
    f = np.column_stack([f, (1 + g) * h])

    # Apply the inversion transformation uniformly
    f = 1 - f

    # Set the inverted objectives in the output
    out["F"] = f


In [160]:
idtlz1 = IDTLZ1(n_var=7, n_obj=3)
idtlz2 = IDTLZ2(n_var=12, n_obj=3)
idtlz5 = IDTLZ5(n_var=12, n_obj=3)
idtlz7 = IDTLZ7(n_var=22, n_obj=3)

In [20]:
class NSDE2(NSGA2):
  def __init__(self, F=0.5, CR=0.9, **kwargs):
    super().__init__(**kwargs)
    self.F = F  # Scaling factor
    self.CR = CR  # Crossover probability

  def _infill(self):
    # Get the current population
    pop = self.pop
    n_pop = len(pop)
    n_var = pop.get('X').shape[1]

    # Prepare offspring population
    off_X = np.empty((n_pop, n_var))

    X = pop.get('X')

    for i in range(n_pop):
      x_i = X[i]

      # Select three unique individuals from the population
      idxs = np.arange(n_pop)
      idxs = np.delete(idxs, i)

      if len(idxs) < 3:
          raise ValueError("Population size is too small for DE mutation.")
      
      r1, r2, r3 = np.random.choice(idxs, 3, replace=False)
      x_r1 = X[r1]
      x_r2 = X[r2]
      x_r3 = X[r3]

      # Generate trial vector
      v = x_r1 + self.F * (x_r2 - x_r3)

      # Crossover
      u = np.copy(x_i)
      j_rand = np.random.randint(n_var)
      for j in range(n_var):
        if np.random.rand() <= self.CR or j == j_rand:
          u[j] = v[j]

      # Ensure variables are within bounds
      u = np.clip(u, self.problem.xl, self.problem.xu)
      off_X[i] = u

    # Create offspring population
    off = Population.new("X", off_X)

    # Evaluate offspring
    self.evaluator.eval(self.problem, off)

    return off

In [None]:
# Normalize objectives
def normalize_objectives(F):
    F_min = np.min(F, axis=0)
    F_max = np.max(F, axis=0)
    return (F - F_min) / (F_max - F_min + 1e-6)

# Calculate the ASF (Achievement Scalarizing Function)
def achievement_scalarizing_function(F_normalized, ref_dirs):
    asf_values = np.max(F_normalized[:, None, :] / ref_dirs[None, :, :], axis=2)
    r2_values = np.min(asf_values, axis=1)
    return np.mean(r2_values)

# Calculate Riesz s-energy for diversity measurement
def riesz_s_energy(F, s=1):
    n = F.shape[0]
    energy = 0.0
    for i in range(n):
        for j in range(i + 1, n):
            dist = np.linalg.norm(F[i] - F[j])
            if dist > 1e-6:
                energy += 1.0 / (dist ** s)
    return energy


In [164]:
def experiment(pop_size, problems, problem_names, F=0.5, CR=0.9, n_gens=500):
  nsga_r2_inds = []
  nsga_rieszs = []
  nsde_r2_inds = []
  nsde_rieszs = []

  nsga_problem_Fs = []
  nsde_problem_Fs = []

  for problem in problems:
    nsga_problem_r2_inds = []
    nsga_problem_rieszs = []
    nsde_problem_r2_inds = []
    nsde_problem_rieszs = []
    
    nsga_problem_F = []
    nsde_problem_F = []

    for i in range(30):
      # Initialize algorithms
      nsga_ii = NSGA2(
        pop_size=pop_size,
        eliminate_duplicates=True
      )

      nsde_ii = NSGA2(
        pop_size=pop_size,
        F=F,
        CR=CR,
        eliminate_duplicates=True
      )

      # Solve problems
      nsga_ii_res = minimize(problem, nsga_ii, ('n_gen', n_gens), seed=1, verbose=False)
      nsde_ii_res = minimize(problem, nsde_ii, ('n_gen', n_gens), seed=1, verbose=False)

      # Generate weight vectors
      ref_dirs = get_reference_directions("das-dennis", problem.n_obj, n_partitions=problem.n_var)

      # Normalize objectives
      nsga_normalized = normalize_objectives(nsga_ii_res.F)
      nsde_normalized = normalize_objectives(nsde_ii_res.F)
      
      # Discrete R2 indicator and Riesz s-energy
      nsga_r2_ind = achievement_scalarizing_function(nsga_normalized, ref_dirs)
      nsga_riesz = riesz_s_energy(nsga_normalized, s=1)
      nsde_r2_ind = achievement_scalarizing_function(nsde_normalized, ref_dirs)
      nsde_riesz = riesz_s_energy(nsde_normalized, s=1)

      nsga_problem_r2_inds.append(nsga_r2_ind)
      nsga_problem_rieszs.append(nsga_riesz)
      nsde_problem_r2_inds.append(nsde_r2_ind)
      nsde_problem_rieszs.append(nsde_riesz)  

      nsga_problem_F.append(nsga_ii_res.F)
      nsde_problem_F.append(nsde_ii_res.F)
    
    nsga_r2_inds.append(nsga_problem_r2_inds)
    nsga_rieszs.append(nsga_problem_rieszs)
    nsde_r2_inds.append(nsde_problem_r2_inds)
    nsde_rieszs.append(nsde_problem_rieszs)

    nsga_problem_Fs.append(nsga_problem_F)
    nsde_problem_Fs.append(nsde_problem_F)

  print(f'NSGA-II R2 indicators: {nsga_r2_inds}')
  print(f'NSGA-II Riesz S-energy: {nsga_rieszs}')
  print(f'NSDE-II R2 indicators: {nsde_r2_inds}')
  print(f'NSDE-II Riesz S-energy: {nsde_rieszs}')
  print()

  nsga_r2_means = [np.mean(problem_r2) for problem_r2 in nsga_r2_inds]
  nsga_r2_std = [np.std(problem_r2) for problem_r2 in nsga_r2_inds]
  nsga_riesz_means = [np.means(problem_riesz) for problem_riesz in nsga_rieszs]
  nsga_riesz_std = [np.std(problem_riesz) for problem_riesz in nsga_rieszs]
  nsde_r2_means = [np.mean(problem_r2) for problem_r2 in nsde_r2_inds]
  nsde_r2_std = [np.std(problem_r2) for problem_r2 in nsde_r2_inds]
  nsde_riesz_means = [np.means(problem_riesz) for problem_riesz in nsde_rieszs]
  nsde_riesz_std = [np.std(problem_riesz) for problem_riesz in nsde_rieszs]

  print(f'NSGA-II R2 indicator - Means: {nsga_r2_means} Standard deviations: {nsga_r2_std}')
  print(f'NSGA-II Riesz S-Energy - Means: {nsga_riesz_means} Standard deviations: {nsga_riesz_std}')
  print(f'NSDE-II R2 indicator - Means: {nsde_r2_means} Standard deviations: {nsde_r2_std}')
  print(f'NSGA-II Riesz S-energy - Means: {nsde_riesz_means} Standard deviations: {nsde_riesz_std}')

  # Wilcoxon test
  for problem_idx, (nsga_r2, nsde_r2, nsga_riesz, nsde_riesz) in enumerate(zip(nsga_r2_inds, nsde_r2_inds, nsga_rieszs, nsde_rieszs)):
    # R2 indicator Wilcoxon test
    _, r2_p_value = ranksums(nsga_r2, nsde_r2)
    better_r2 = "NSGA-II" if np.mean(nsga_r2) < np.mean(nsde_r2) else "NSDE-II"
    print(f"Problem {problem_names[problem_idx]} - R2 Indicator Wilcoxon p-value: {r2_p_value}")
    print(f"Better R2 Indicator: {better_r2} (mean comparison)")
    
    # Riesz S-energy Wilcoxon test
    _, riesz_p_value = ranksums(nsga_riesz, nsde_riesz)
    better_riesz = "NSGA-II" if np.mean(nsga_riesz) < np.mean(nsde_riesz) else "NSDE-II"
    print(f"Problem {problem_names[problem_idx]} - Riesz S-Energy Wilcoxon p-value: {riesz_p_value}")
    print(f"Better Riesz S-Energy: {better_riesz} (mean comparison)")

  # Plotting random problem
  for problem_idx, problem in enumerate(problems):
    problem_name = problem_names[problem_idx]

    nsga_ii_aprox = random.choice(nsga_problem_Fs[problem_idx])
    nsde_ii_aprox = random.choice(nsde_problem_Fs[problem_idx])

    pf = 0.5 - problem.pareto_front() if problem_name.startswit('Inverse') else problem.pareto_front()

    plot = Scatter(figsize=(16, 12), legend='True', title=f'NSGA-II Pareto Front estimation for {problem_name}')
    plot.add(pf, color="black", alpha=0.7, label='Pareto front')
    plot.add(nsga_ii_aprox, facecolor="none", edgecolor="red", label='NSGA-II approximated Pareto Front')
    plot.show()

    plot = Scatter(figsize=(16, 12), legend='True', title=f'NSDE-II Pareto Front estimation for {problem_name}')
    plot.add(pf, color="black", alpha=0.7, label='Pareto front')
    plot.add(nsde_ii_aprox, facecolor="none", edgecolor="blue", label='NSDE-II approximated Pareto Front')
    plot.show()

In [None]:
experiment(
  100,
  [dtlz1, dtlz2, dtlz5, dtlz7, idtlz1, idtlz2, idtlz5, idtlz7]
  ['DTLZ1', 'DTLZ2', 'DTLZ5', 'DTLZ7', 'Inverse DTLZ1', 'Inverse DTLZ2', 'Inverse DTLZ5', 'Inverse DTLZ7']
)