<a href="https://colab.research.google.com/github/HakureiPOI/Modeling_Allowance/blob/main/SimulatedAnnealing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from concurrent.futures import ThreadPoolExecutor

In [2]:
class SimulatedAnnealing():
    def __init__(self, obj_function, dim, bounds, max_iter=1000, initial_temp=100, cooling_rate=0.95, stop_temp=1e-3):
        """
        初始化高效模拟退火算法类。

        参数：
        - obj_function: 目标函数，需要最小化。
        - dim: 问题的维度。
        - bounds: 每个维度的边界，格式为 [(lower_bound1, upper_bound1), ...]。
        - max_iter: 最大迭代次数。
        - initial_temp: 初始温度。
        - cooling_rate: 温度衰减率。
        - stop_temp: 停止温度。
        """
        self.obj_function = obj_function
        self.dim = dim
        self.bounds = bounds
        self.max_iter = max_iter
        self.initial_temp = initial_temp
        self.cooling_rate = cooling_rate
        self.stop_temp = stop_temp

        # 初始化随机解
        self.current_solution = np.random.uniform(
            [b[0] for b in bounds], [b[1] for b in bounds]
        )
        self.best_solution = self.current_solution.copy()
        self.current_temp = initial_temp
        self.best_score = self.obj_function(self.current_solution)
        self.history = [self.best_score]

    def _generate_neighbor(self):
        """
        生成一个邻域解，通过在当前解附近动态调整扰动范围实现。
        """
        if np.random.rand() < 0.05:  # 5% 的概率进行全局搜索
            neighbor = np.random.uniform([b[0] for b in self.bounds], [b[1] for b in self.bounds])
        else:
            perturbation = np.random.uniform(-1, 1, self.dim) * self.current_temp / self.initial_temp
            neighbor = self.current_solution + perturbation
            neighbor = np.clip(neighbor, [b[0] for b in self.bounds], [b[1] for b in self.bounds])
        return neighbor

    def optimize(self):
        """
        执行模拟退火算法。
        """
        restart_limit = 5  # 重启次数限制
        restarts = 0

        for iteration in range(self.max_iter):
            if self.current_temp < self.stop_temp:
                if restarts < restart_limit:
                    # 重新随机初始化
                    self.current_solution = np.random.uniform(
                        [b[0] for b in self.bounds], [b[1] for b in self.bounds]
                    )
                    self.current_temp = self.initial_temp
                    restarts += 1
                    print(f"触发第 {restarts} 次重启。")
                else:
                    print(f"温度降至停止阈值，优化提前结束。")
                    break

            # 生成新解并计算目标函数值
            neighbor = self._generate_neighbor()
            current_score = self.obj_function(self.current_solution)
            neighbor_score = self.obj_function(neighbor)

            # 接受新解的概率计算
            delta_score = neighbor_score - current_score
            if delta_score < 0 or np.random.rand() < np.exp(-delta_score / self.current_temp):
                self.current_solution = neighbor

                # 更新最优解
                if neighbor_score < self.best_score:
                    self.best_solution = neighbor
                    self.best_score = neighbor_score

            # 降温（慢速冷却策略）
            self.current_temp = self.initial_temp / (1 + iteration)  # 非线性冷却策略
            self.history.append(self.best_score)

            print(f"第 {iteration + 1}/{self.max_iter} 代，当前最优适应度: {self.best_score}")

        return self.best_solution, self.best_score

    def plot_optimization_history(self):
        """
        可视化优化过程中的最优值变化。
        """
        sns.set(style="whitegrid")
        plt.figure(figsize=(10, 6))
        plt.plot(self.history, label="Best Fitness", color="b")
        plt.xlabel("Iteration")
        plt.ylabel("Best Fitness")
        plt.title("Optimization History")
        plt.legend()
        plt.show()


In [3]:
class ParallelSimulatedAnnealing():
    def __init__(self, num_starts, *args, **kwargs):
        """
        初始化并行模拟退火算法类。

        参数：
        - num_starts: 并行的起点数量。
        - args/kwargs: 传递给 SimulatedAnnealing 的参数。
        """
        self.num_starts = num_starts
        self.sa_instances = [SimulatedAnnealing(*args, **kwargs) for _ in range(num_starts)]
        self.best_solution = None
        self.best_score = float('inf')

    def optimize(self):
        """
        并行执行多个模拟退火实例。
        """
        def run_instance(sa):
            return sa.optimize()

        with ThreadPoolExecutor(max_workers=self.num_starts) as executor:
            results = list(executor.map(run_instance, self.sa_instances))

        for solution, score in results:
            if score < self.best_score:
                self.best_solution = solution
                self.best_score = score

        return self.best_solution, self.best_score

In [4]:
# 测试函数（Rosenbrock函数）
def rosenbrock(x):
    return sum((1 - x[:-1])**2 + 100 * (x[1:] - x[:-1]**2)**2)

In [5]:
dim = 10
bounds = [(-5, 5)] * dim

In [6]:
# 实例化模拟退火算法类
parallel_sa = ParallelSimulatedAnnealing(num_starts=4, obj_function=rosenbrock, dim=dim, bounds=bounds, max_iter=1000, initial_temp=100, cooling_rate=0.85, stop_temp=1e-3)

In [7]:
best_position, best_score = parallel_sa.optimize()

第 1/1000 代，当前最优适应度: 89846.58549642496
第 1/1000 代，当前最优适应度: 58804.28908000988
第 2/1000 代，当前最优适应度: 58804.28908000988
第 3/1000 代，当前最优适应度: 52038.3839056561
第 4/1000 代，当前最优适应度: 52038.3839056561
第 5/1000 代，当前最优适应度: 52038.3839056561
第 6/1000 代，当前最优适应度: 47229.9264677951
第 7/1000 代，当前最优适应度: 47229.9264677951
第 8/1000 代，当前最优适应度: 47229.9264677951
第 9/1000 代，当前最优适应度: 46422.43045453261
第 10/1000 代，当前最优适应度: 45159.32375450573
第 11/1000 代，当前最优适应度: 44632.94637238947
第 12/1000 代，当前最优适应度: 42175.6942549261
第 13/1000 代，当前最优适应度: 42175.6942549261
第 14/1000 代，当前最优适应度: 42175.6942549261
第 15/1000 代，当前最优适应度: 40702.92622718364
第 16/1000 代，当前最优适应度: 40702.92622718364
第 17/1000 代，当前最优适应度: 40702.92622718364
第 18/1000 代，当前最优适应度: 39922.933243096486
第 19/1000 代，当前最优适应度: 39371.80676637627
第 20/1000 代，当前最优适应度: 38957.03478271488
第 21/1000 代，当前最优适应度: 38957.03478271488
第 22/1000 代，当前最优适应度: 38502.18916965515
第 2/1000 代，当前最优适应度: 89846.58549642496
第 1/1000 代，当前最优适应度: 57491.30922329375
第 2/1000 代，当前最优适应度: 57491.30922329375
第 3/100

In [8]:
print(f"最优位置: {best_position}")
print(f"最优适应度: {best_score}")

最优位置: [ 1.16970827  1.80435825  0.3681351  -1.22385914 -0.7514677   0.95552525
  1.97994894  1.66974704  0.99581339  2.35236213]
最优适应度: 2695.9830398420213
