In [21]:
#  Imports 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
from scipy.optimize import differential_evolution
from scipy.optimize import minimize, OptimizeResult  
from collections import deque   

# Model
def rastrigin(x):
    """Rastrigin function for optimization benchmark."""
    A = 10
    return A * len(x) + sum([(xi ** 2 - A * np.cos(2 * np.pi * xi)) for xi in x])

#  Evaluation Tracker 
class EvaluationTracker:
    def __init__(self, objective_function, max_evaluations):
        self.objective_function = objective_function
        self.max_evaluations = max_evaluations
        self.evaluation_count = 0
        self.history = []

    def evaluate(self, position):
        
        cost = self.objective_function(position)
        self.history.append({
            "step": self.evaluation_count,
            "position": position.copy(),
            "cost": cost,
        })
        self.evaluation_count += 1
        return cost

    def reset(self):
        self.evaluation_count = 0
        self.history = []

def run_asa_with_tracker(tracker, bounds, initial_solution, 
                         initial_temp=1.0, cooling_rate=0.95, 
                         maxiter=None):
    """
    Wraps the new ASA method so that it uses the tracker to count evaluations.
    """
    if maxiter is None:
        # We'll use tracker.max_evaluations as the ASA max iterations
        maxiter = tracker.max_evaluations
    
    # 1) Wrap the tracker's evaluate method so ASA calls it instead of the raw objective.
    def tracked_objective(x):
        return tracker.evaluate(x)

    # 2) Time the ASA run
    start_time = time.time()
    
    # 3) Call the new ASA function
    result = ASA(
        objective_function=tracked_objective,
        x0=initial_solution,
        bounds=bounds,
        maxiter=maxiter,
        initial_temp=initial_temp,
        cooling_rate=cooling_rate
    )
    
    end_time = time.time()
    total_time = end_time - start_time

    # The result is a scipy OptimizeResult, so extract best solution & cost
    best_solution = result.x
    best_cost = result.fun
    
    # Return in the same format as your other methods:
    return best_solution, best_cost, total_time

#  PSO 
class Particle:
    def __init__(self, bounds, dim):
        self.position = np.random.uniform(bounds[:, 0], bounds[:, 1], dim)
        self.velocity = np.random.uniform(-1, 1, dim)
        self.best_position = np.copy(self.position)
        self.best_cost = float('inf')

    def update_velocity(self, global_best_position, inertia, cognitive, social, bounds):
        r1, r2 = np.random.rand(), np.random.rand()
        cognitive_component = cognitive * r1 * (self.best_position - self.position)
        social_component = social * r2 * (global_best_position - self.position)
        self.velocity = inertia * self.velocity + cognitive_component + social_component
        self.velocity = np.clip(self.velocity, -np.abs(bounds[:, 1] - bounds[:, 0]), np.abs(bounds[:, 1] - bounds[:, 0]))

    def update_position(self, bounds):
        self.position += self.velocity
        self.position = np.clip(self.position, bounds[:, 0], bounds[:, 1])

def run_pso_with_tracker(tracker, bounds, dim, swarm_size=50, inertia=0.5, cognitive=1.5, social=1.5):
    bounds = np.array(bounds)
    particles = [Particle(bounds, dim) for _ in range(swarm_size)]
    global_best_position = None
    global_best_cost = float('inf')

    start_time = time.time()
    while tracker.evaluation_count < tracker.max_evaluations:
        for particle in particles:
            cost = tracker.evaluate(particle.position)
            if cost < particle.best_cost:
                particle.best_cost = cost
                particle.best_position = np.copy(particle.position)
            if cost < global_best_cost:
                global_best_cost = cost
                global_best_position = np.copy(particle.position)

        for particle in particles:
            particle.update_velocity(global_best_position, inertia, cognitive, social, bounds)
            particle.update_position(bounds)
    total_time = time.time() - start_time

    return global_best_position, global_best_cost, total_time

# Differential Evolution 
def run_de_with_tracker(tracker, bounds):
    def objective_wrapper(x):
        return tracker.evaluate(x)

    start_time = time.time()
    result = differential_evolution(objective_wrapper, bounds, popsize=int(10), maxiter=int(tracker.max_evaluations/10), strategy='best1bin', disp=False)
    total_time = time.time() - start_time

    return result.x, result.fun, total_time

#  Benchmarking
def run_algorithm(tracker, method, bounds, dim, initial_solution=None, **kwargs):
    if method == "ASA":
        return run_asa_with_tracker(tracker, bounds, initial_solution, **kwargs)
    elif method == "PSO":
        return run_pso_with_tracker(tracker, bounds, dim, **kwargs)
    elif method == "DE":
        return run_de_with_tracker(tracker, bounds)
    else:
        raise ValueError(f"Unknown method: {method}")

def benchmark_methods(tracker, methods, bounds, dim, initial_solution, max_evaluations, **kwargs):
    results = []
    histories = {}
    for method in methods:
        print(f"Running {method}...")
        tracker.reset()
        result = run_algorithm(tracker, method, bounds, dim, initial_solution, **kwargs)
        results.append({
            "Algorithm": method,
            "Best Cost": result[1],
            "Time (s)": result[2]
        })
        histories[method] = tracker.history
    return results, histories

def visualize_paths(histories, methods):
    plt.figure(figsize=(10, 6))
    for method in methods:
        history = histories[method]
        costs = [h["cost"] for h in history]
        plt.plot(range(len(costs)), costs, label=method)
    plt.xlabel("Evaluation Step")
    plt.ylabel("Cost")
    plt.title("Cost Evolution by Optimization Algorithm")
    plt.legend()
    plt.grid()
    plt.show()

if __name__ == "__main__":
    # Configuration
    dim = 10
    bounds = [(-5.12, 5.12) for _ in range(dim)]
    max_evaluations = 2500
    methods = ["ASA", "PSO"]

    tracker = EvaluationTracker(rastrigin, max_evaluations)
    initial_solution = np.random.uniform(*zip(*bounds), dim)

    # Run benchmarks
    results, histories = benchmark_methods(tracker, methods, bounds, dim, initial_solution, max_evaluations)
    results_df = pd.DataFrame(results)
    print("Benchmark Results:")
    print(results_df)

    # Visualize results
    visualize_paths(histories, methods)


Running ASA...


  log_lb, log_ub = np.log10(lower_bounds), np.log10(upper_bounds)


ValueError: Bounds must be finite numbers

In [20]:
def vfsa_accprob(curr_cost, new_cost, temp_acc):
    exponent = np.clip((new_cost - curr_cost) / temp_acc, -500, 500)
    return 1 / (1 + np.exp(exponent))

def vfsa_gen_step(dim, log_lb, log_ub, temp_gen, rng=None):
    if rng is None: 
        rng = np.random.default_rng()

    uni = rng.random(dim)
    base = 1 + 1 / (temp_gen + 1e-10)  # Small value added to avoid division by zero
    exponent = 2 * uni - 1
    rnd = np.sign(uni - 0.5) * temp_gen * (base**np.abs(exponent) - 1)
    return (log_ub - log_lb) * rnd

def vfsa_gen_params(curr_params, dim, log_lb, log_ub, temp_gen, rng=None):
    if rng is None: 
        rng = np.random.default_rng()

    log_params = np.log10(curr_params)
    flag1 = True

    while flag1:
        # Generate a log step
        log_step = vfsa_gen_step(dim, log_lb, log_ub, temp_gen, rng)
        new_log_params = log_params + log_step

        # Check if all new parameters are within bounds
        if np.all(new_log_params >= log_lb) and np.all(new_log_params <= log_ub):
            par = 10 ** new_log_params
            flag1 = False
        else:
            # If any parameter is out of bounds, handle them individually
            for i in range(dim):
                if new_log_params[i] < log_lb[i] or new_log_params[i] > log_ub[i]:
                    flag2 = True
                    while flag2:
                        log_step = vfsa_gen_step(dim, log_lb, log_ub, temp_gen, rng)
                        new_log_params[i] = log_params[i] + log_step[i]
                        if log_lb[i] <= new_log_params[i] <= log_ub[i]:
                            flag2 = False
            par = 10 ** new_log_params
            flag1 = False

    return par
        
def vfsa_generinitpoint(dim, log_lb, log_ub, rng=None):
    if rng is None:
        rng = np.random.default_rng()
    
    if not (np.all(np.isfinite(log_lb)) and np.all(np.isfinite(log_ub))):
        raise ValueError("Bounds must be finite numbers")
    if np.any(log_lb > log_ub):
        raise ValueError("Lower bounds must be <= upper bounds")
    
    flag = True
    while flag:
        uni = rng.random(dim)
        log_initpoints = log_lb + (log_ub - log_lb) * uni
        if np.all(log_initpoints >= log_lb) and np.all(log_initpoints <= log_ub):
            flag = False
    return 10 ** log_initpoints

def vfsa_reannealing(best_cost, best_params, curr_cost, dim, x0, tmax, tscat, data, c, temp_gen, temp0_gen, objective_function):
    # Provided for completeness but not strictly used in the minimal ASA example
    log_orig_best_params = np.log10(best_params)
    log_par_delta = log_orig_best_params + 0.01 * log_orig_best_params
    par_delta = 10 ** log_par_delta
    
    cost_delta = np.array([
        objective_function(par_delta if i == j else best_params)
        for j in range(dim)
    ])
    
    par_diff = np.clip(par_delta - best_params, 1e-10, None)
    s = np.abs((cost_delta - best_cost) / par_diff) * (best_params / best_cost)
    smax = np.max(s)
    
    temp_gen = np.clip(temp_gen * (smax / np.clip(s, 1e-10, None)), 1e-10, None)
    
    k_gen = (-1/c * np.log(np.clip(temp_gen / temp0_gen, 1e-10, None))) ** dim
    k_gen = np.clip(k_gen, 0, None)
    
    temp0_acc = curr_cost
    temp_acc = best_cost
    k_acc = (-1/c * np.log(np.clip(temp_acc / temp0_acc, 1e-10, None))) ** dim
    
    return temp_gen, k_gen, temp0_acc, temp_acc, k_acc

def vfsa_temp(temp_gen0, c, k_gen, dim, min_temp=1e-10):
    exponent = -c * np.power(k_gen, 1/dim)
    temp = temp_gen0 * np.exp(exponent)
    return np.clip(temp, min_temp, None)

def simulated_annealing(objective_function, initial_solution, lower_bounds, upper_bounds, 
                        initial_temperature, cooling_rate, max_iterations, neighborhood_function, 
                        log_lb, log_ub, temp_gen, 
                        M=10, eps=0.0, min_temp=1e-10, verbose=False):
    
    current_solution = initial_solution
    current_cost = objective_function(current_solution)
    best_solution = current_solution
    best_cost = current_cost
    temperature = initial_temperature

    reanneal_cost_vec = [best_cost]
    diff = deque(maxlen=M)
    best_diff = deque(maxlen=M)

    for iteration in range(max_iterations):
        # Generate a new solution
        new_solution = neighborhood_function(current_solution, len(current_solution), 
                                             log_lb, log_ub, temp_gen)
        new_cost = objective_function(new_solution)
        delta_cost = new_cost - current_cost
        
        # Acceptance
        if delta_cost < 0 or np.random.rand() < np.exp(-delta_cost / temperature):
            current_solution = new_solution
            current_cost = new_cost
            if current_cost < best_cost:
                best_solution = current_solution
                best_cost = current_cost

            reanneal_cost_vec.append(best_cost)
            if len(reanneal_cost_vec) > 1:
                diff.append(abs(reanneal_cost_vec[-1] - reanneal_cost_vec[-2]))
                best_diff.append(abs(reanneal_cost_vec[-1] - best_cost))

                if (len(diff) == M 
                    and all(d <= eps for d in diff) 
                    and all(bd <= eps for bd in best_diff)):
                    print('ASA converged, terminating at iteration', iteration)
                    break
        
        temperature = max(min_temp, temperature * cooling_rate)
        
        if iteration % (max_iterations // 10) == 0:
            print(f"ASA iteration {iteration}, current cost={current_cost}, best cost={best_cost}")
        
    return best_solution, best_cost

def ASA(objective_function, x0, bounds, maxiter=100, initial_temp=1.0, cooling_rate=0.95, 
        neighborhood_function=vfsa_gen_params, init_function=vfsa_generinitpoint, 
        **kwargs):
    
    dim = len(x0)
    lower_bounds, upper_bounds = np.array(bounds).T
    log_lb, log_ub = np.log10(lower_bounds), np.log10(upper_bounds)

    # Generate initial solution on a log scale, then exponentiate back
    initial_solution = init_function(dim, log_lb, log_ub)
    
    # Run ASA optimization
    best_solution, best_cost = simulated_annealing(
        objective_function, 
        initial_solution, 
        lower_bounds, 
        upper_bounds,
        initial_temp, 
        cooling_rate, 
        maxiter, 
        neighborhood_function,
        log_lb, 
        log_ub, 
        initial_temp,  # passing `initial_temp` as `temp_gen` for simplicity
        verbose=False,
        **kwargs
    )
    print('ASA best cost:', best_cost)
    print('ASA best solution:', best_solution)

    # Optionally, do a local refinement with SciPy's minimize (L-BFGS-B)
    minimizer_kwargs = {
        'method': 'L-BFGS-B',
        'bounds': bounds,
        'options': {
            'disp': True,
            'maxiter': 250,
        }
    }
    local_result = minimize(objective_function, x0=best_solution, **minimizer_kwargs)
    
    final_solution = local_result.x
    final_cost = local_result.fun
    nfev = local_result.nfev
    success = local_result.success

    return OptimizeResult(x=final_solution, fun=final_cost, nfev=nfev, success=success)
