In [1]:
import warnings
warnings.filterwarnings('ignore')
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
# from tqdm.auto import tqdm
from tqdm import tqdm
import pandas as pd
import torch
from scipy import stats
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [2]:
MSSV = 21521413

# Objective Functions

In [3]:
def Sphere(pop):
    """Sphere Function"""
    return (pop**2).sum(-1)


def Griewank(pop):
    "Griewank Function"
    pop_size, d = pop.shape

    sum1 = (pop**2).sum(-1) / 4000

    sqrt_indices = (
        torch.sqrt(torch.arange(1, d + 1, device=pop.device))
        .unsqueeze(0)
        .expand((pop_size, d))
    )

    cos1 = torch.cos(pop / sqrt_indices).prod(-1)

    return sum1 - cos1 + 1


def Rosenbrock(pop):
    """Rosenbrock Function"""

    pop0 = pop[:, :-1]
    pop1 = pop[:, 1:]

    return (100 * ((pop1 - pop0**2) ** 2) + (pop0 - 1) ** 2).sum(-1)


def Michalewicz(pop, m=10):
    """Michalewicz Function"""
    pop_size, d = pop.shape

    indices = (
        torch.arange(1, d + 1, device=pop.device).unsqueeze(0).expand((pop_size, d))
    )

    sin1 = torch.sin(indices * (pop**2) / np.pi) ** (2 * m)

    return -(torch.sin(pop) * sin1).sum(-1)


def Ackley(pop, a=20, b=0.2, c=2 * np.pi):
    """Ackley Function"""
    pop_size, d = pop.shape

    exp_mean_cos = torch.exp(torch.cos(pop * c).sum(-1) / d)

    exp_sqrt_mean_square = torch.exp(torch.sqrt((pop**2).sum(-1) / d) * (-b))

    return exp_sqrt_mean_square * (-a) - exp_mean_cos + a + np.e


objectives = [
    {
        "name": "Sphere Function",
        "domain": [-5.12, 5.12],
        "g_optimum_d2": [0, 0],
        "func": Sphere,
    },
    {
        "name": "Griewank Function",
        "domain": [-600, 600],
        "g_optimum_d2": [0, 0],
        "func": Griewank,
    },
    {
        "name": "Rosenbrock Function",
        "domain": [-5, 10],
        "g_optimum_d2": [1, 1],
        "func": Rosenbrock,
    },
    {
        "name": "Michalewicz Function",
        "domain": [0, np.pi],
        "g_optimum_d2": [2.2, 1.57],
        "func": Michalewicz,
    },
    {
        "name": "Ackley Function",
        "domain": [-32.768, 32.768],
        "g_optimum_d2": [0, 0],
        "func": Ackley,
    },
]


# Differential Evolution ( DE )

In [4]:
def bounding_pop(pop, low, high):
    """
    Clamp the value of 'pop' between 'low' and 'high' (inclusive).
    """
    return pop.clamp(low, high)


def initialize_population(num_individuals, d, low=0, high=1, device="cpu"):
    pop = torch.distributions.uniform.Uniform(low, high).sample((num_individuals, d)).to(device)
    return pop


def create_mutant_pop(pop, F=0.1):

  pop_size, d = pop.shape

  # Sample three unique indices for each individual (excluding itself)
  random_indices = torch.multinomial(torch.ones((pop_size, pop_size - 1), device=pop.device), num_samples=3, replacement=False)

  # Adjust for boundary cases (indices equal to current individual's index)
  random_indices = torch.where(
    random_indices >= torch.arange(pop_size, device=pop.device).unsqueeze(1),
    random_indices + 1,
    random_indices
  )

  # Create mutant vectors
  V = pop[random_indices[:, 0]] + F * (pop[random_indices[:, 1]] - pop[random_indices[:, 2]])

  return V


def create_trial_pop(pop, Cr, F=0.1):
  """
  This function creates a trial population using Differential Evolution (DE) strategy.
  """

  pop_size, d = pop.shape

  # Randomly pick an index to ensure offspring differs from its parent
  Jrand = torch.randint(d, size=(pop_size,), device=pop.device)

  # Convert random indices to actual flattened indices for selection
  Jrand += torch.arange(pop_size, device=pop.device) * d

  # Create a mask using crossover rate (Cr) for element-wise selection
  condition = (torch.rand((pop_size, d), device=pop.device) <= Cr).flatten()

  # Ensure at least one element from mutant is selected (replacing corresponding element in condition)
  condition[Jrand] = True

  # Reshape the mask back to the population dimensions
  condition = condition.reshape((pop_size, d))

  # Create mutant population using the provided function
  mutant_pop = create_mutant_pop(pop, F)

  # Create trial population by selecting elements based on the crossover mask
  trial_pop = torch.where(condition, mutant_pop, pop)

  return trial_pop



def de_selection(pop, trial_pop, pop_fitness, trial_pop_fitness):
  # Identify individuals with improved fitness in the trial population
  improved_individuals = trial_pop_fitness < pop_fitness

  # Create the new population by selecting based on fitness improvement
  new_pop = torch.where(improved_individuals.unsqueeze(1), trial_pop, pop)

  # Create the new fitness values based on the selected population
  new_fitness = torch.where(improved_individuals, trial_pop_fitness, pop_fitness)

  return new_pop, new_fitness



def differential_evolution(
    objective,  # The objective function to be optimized
    num_individuals=16,  # Population size (N)
    dimensionality=2,  # Number of variables (d)
    crossover_rate=0.1,  # Crossover rate (Cr)
    scaling_factor=0.3,  # Scaling factor (F)
    max_evaluations=None,  # Maximum number of function evaluations
    seed=21521413,
    device="cpu",  # Device to use (CPU or GPU)
):
  """
  This function implements the Differential Evolution (DE) algorithm for optimization.
  """

  torch.manual_seed(seed)

  # Set maximum evaluations if not provided
  if max_evaluations is None:
      max_evaluations = dimensionality * 10000

  # Get lower and upper bounds from the objective function
  low, high = objective["domain"]

  # Initialize population and fitness values
  pop = initialize_population(num_individuals, dimensionality, low, high, device)
  pop_fitness = objective["func"](pop)
  num_evaluations = num_individuals

  # List to store generation history
  generation_log = []

  while num_evaluations < max_evaluations:
      # Identify the best individual in the current population
      best_idv = pop_fitness.argmin()

      # Log generation data (generation number, best fitness, best solution)
      generation_log.append([
          num_evaluations,
          pop_fitness[best_idv].cpu().item(),
          pop[best_idv].cpu().tolist(),
      ])

      # Create trial population using crossover and mutation
      trial_pop = create_trial_pop(pop, crossover_rate, scaling_factor)

      # Clip trial population values to stay within bounds
      trial_pop = bounding_pop(trial_pop, low, high)

      # Evaluate the trial population
      trial_pop_fitness = objective["func"](trial_pop)
      num_evaluations += num_individuals

      # Perform selection based on fitness
      pop, pop_fitness = de_selection(pop, trial_pop, pop_fitness, trial_pop_fitness)

  # Identify the best individual in the final population
  best_idv = pop_fitness.argmax()

  # Log final generation data
  generation_log.append([
      num_evaluations,
      pop_fitness[best_idv].cpu().item(),
      pop[best_idv].cpu().tolist(),
  ])

  return generation_log

In [5]:
differential_evolution(objective=objectives[3], scaling_factor=0.2)

[[16, -0.7953729629516602, [2.22196364402771, 0.41545283794403076]],
 [32, -1.4555213451385498, [2.2675139904022217, 1.6580137014389038]],
 [48, -1.4555213451385498, [2.2675139904022217, 1.6580137014389038]],
 [64, -1.4555213451385498, [2.2675139904022217, 1.6580137014389038]],
 [80, -1.4555213451385498, [2.2675139904022217, 1.6580137014389038]],
 [96, -1.4555213451385498, [2.2675139904022217, 1.6580137014389038]],
 [112, -1.4934601783752441, [2.07273006439209, 1.5233427286148071]],
 [128, -1.7399693727493286, [2.209916830062866, 1.6095693111419678]],
 [144, -1.7979357242584229, [2.1885221004486084, 1.571691632270813]],
 [160, -1.7979357242584229, [2.1885221004486084, 1.571691632270813]],
 [176, -1.7979357242584229, [2.1885221004486084, 1.571691632270813]],
 [192, -1.7979357242584229, [2.1885221004486084, 1.571691632270813]],
 [208, -1.801252841949463, [2.2018587589263916, 1.571691632270813]],
 [224, -1.801252841949463, [2.2018587589263916, 1.571691632270813]],
 [240, -1.80125284194946

# Cross-Entropy Method ( CEM )

In [6]:
def sample_search_points(mean, covariance, num_samples):
  """
  This function samples points from a multivariate normal distribution.
  """

  # Create a multivariate normal distribution sampler
  sampler = torch.distributions.multivariate_normal.MultivariateNormal(mean, covariance)

  # Sample points from the distribution
  return sampler.sample((num_samples,))

def calc_cov(samples, u, e):
    # Wi = 1/Ne
    d = u.shape[0]

    u = u.unsqueeze(0).expand_as(samples)
    diff = samples - u
    return torch.einsum("ki,kj->ij", diff, diff) + torch.eye(d, device=samples.device) * e

def calculate_covariance(samples, mean, epsilon):
  """
  This function calculates the covariance matrix based on a set of samples and a global mean.
  """

  # Dimensionality of the samples
  dimensionality = mean.shape[0]

  # Expand the mean vector to match the dimensions of the samples
  mean = mean.unsqueeze(0).expand_as(samples)

  # Calculate centered differences
  centered_diff = samples - mean

  # Calculate covariance using Einstein summation
  covariance = torch.einsum("ki,kj->ij", centered_diff, centered_diff)

  # Add a small value to the diagonal for stability
  identity_matrix = torch.eye(dimensionality, device=samples.device)
  covariance += identity_matrix * epsilon

  return covariance

def CEM(
    objective,  # The objective function to be optimized
    initial_stddev=1.0,  # Initial standard deviation (o_init)
    mean=None,  # Initial mean vector (u)
    epsilon=0.001,  # Small value for numerical stability
    population_size=16,  # Number of samples to draw (N)
    elite_count=4,  # Number of top samples to use for update (Ne)
    dimensionality=2,  # Number of variables in the problem (d)
    max_evaluations=None,  # Maximum number of function evaluations
    seed=21521413,  # Random seed for reproducibility
    device="cpu",  # Device to use for computations (CPU or GPU)
):
  """
  This function implements the Covariance Matrix Adapted Evolution Method (CEM) for optimization.
  """

  torch.manual_seed(seed)

  # Set maximum evaluations if not provided
  if max_evaluations is None:
      max_evaluations = dimensionality * 10000

  # Get lower and upper bounds from the objective function
  low, high = objective["domain"]

  # Initialize covariance matrix and mean (if not provided)
  covariance = torch.eye(dimensionality, device=device) * initial_stddev
  if mean is None:
      mean = torch.rand(dimensionality, device=device)
  else:
      mean = mean.to(device)

  num_evaluations = 0
  generation_log = []

  while num_evaluations < max_evaluations:

      # Sample search points from the multivariate normal distribution
      samples = sample_search_points(mean, covariance, population_size)

      # Fix samples outside the domain bounds
      samples = bounding_pop(samples, low, high)
      # samples = prepair_pop(samples, low, high)  # Uncomment if needed

      # Evaluate the sampled points
      samples_fitness = objective["func"](samples)
      num_evaluations += len(samples)

      # Identify top Ne samples based on fitness
      top_elite_indices = torch.topk(samples_fitness, k=elite_count, largest=False).indices

      # Extract the top Ne elite samples
      elite_samples = samples[top_elite_indices]

      # Update covariance matrix based on the elite samples
      covariance = calculate_covariance(elite_samples, mean, epsilon)

      # Update mean vector as the average of the elite samples
      mean = elite_samples.mean(0)

      # Identify the best individual in the current generation
      best_idv = top_elite_indices[0]

      # Log generation data (generation number, best fitness, best solution)
      generation_log.append([
          num_evaluations,
          samples_fitness[best_idv].cpu().item(),
          samples[best_idv].cpu().tolist(),
      ])

  return generation_log


In [7]:
CEM(objective=objectives[2])[-1]

[20000, 0.11683531105518341, [1.300178050994873, 1.706811785697937]]

# Experimental Process

In [8]:
N = [16, 32, 64, 128, 256]
dimensions = [2, 10]


In [11]:
results = {}
for objective in objectives:
  for d in dimensions:
    for n in N:
      for i in range(10):
        de = differential_evolution(objective=objective, num_individuals=n, dimensionality=d, seed=MSSV + i, device="cuda")
        cem = CEM(objective=objective, population_size=n, dimensionality=d, seed=MSSV+i, device="cuda")
        key = (objective["name"], d, n, MSSV+i)
        results[key] = {
            'DE': de,
            'CEM': cem,
        }

In [12]:
np.save("results.npy", results)

In [24]:
results[('Michalewicz Function', 2, 256, 21521413)]["CEM"][-1]

[20224, -1.801183819770813, [2.202080011367798, 1.5691542625427246]]

In [26]:
DElogfiles = {}
CEMlogfiles = {}
for objective in objectives:
  for d in dimensions:
    for n in N:
      for i in range(10):
        key = (objective["name"], d, n, MSSV+i)
        DElogfiles[key] = results[key]['DE'][-1]
        CEMlogfiles[key] = results[key]['CEM'][-1]


In [27]:
np.save("DELogFiles.npy", DElogfiles)
np.save("CEMLogfiles.npy", CEMlogfiles)

In [28]:
DElogfiles

{('Sphere Function', 2, 16, 21521413): [20000,
  0.0,
  [-2.617061041427906e-23, -2.1471347069215766e-23]],
 ('Sphere Function', 2, 16, 21521414): [20000,
  0.0,
  [-1.710580501921898e-23, 2.195368344793867e-23]],
 ('Sphere Function', 2, 16, 21521415): [20000,
  0.0,
  [1.88067279703948e-24, -5.5659430120016416e-24]],
 ('Sphere Function', 2, 16, 21521416): [20000,
  0.0,
  [-1.776191483358759e-23, 1.9993790083353275e-23]],
 ('Sphere Function', 2, 16, 21521417): [20000,
  0.0,
  [5.754124992081209e-24, 1.324283836332945e-23]],
 ('Sphere Function', 2, 16, 21521418): [20000,
  0.0,
  [-6.944688858597959e-24, -8.969934616015483e-24]],
 ('Sphere Function', 2, 16, 21521419): [20000,
  0.0,
  [-1.7985205068254018e-23, 1.7575848578455826e-24]],
 ('Sphere Function', 2, 16, 21521420): [20000,
  0.0,
  [-1.6092445344424742e-23, 8.273289184032092e-24]],
 ('Sphere Function', 2, 16, 21521421): [20000,
  0.0,
  [-2.436560988843381e-23, 2.2533149114477817e-24]],
 ('Sphere Function', 2, 16, 21521422): 