In [1]:
import numpy as np

In [None]:
class CMA_ES:
    def __init__(self, func, dim, pop_size=50, sigma=0.1, max_iter=1000, tol=1e-6):
        self.func = func
        self.dim = dim
        self.pop_size = pop_size
        self.sigma = sigma
        self.max_iter = max_iter
        self.tol = tol

        # 初始化种群
        self.mean = np.random.randn(dim)
        self.cov = np.eye(dim)
        self.inv_cov = np.linalg.inv(self.cov)
        self.best_solution = None
        self.best_fitness = float('inf')

    def _sample_population(self):
        """从多元正态分布中生成一个新的种群"""
        return np.random.multivariate_normal(self.mean, self.sigma**2 * self.cov, self.pop_size)

    def _update(self, population, fitness):
        """根据当前种群和适应度更新均值和协方差矩阵"""
        sorted_idx = np.argsort(fitness)
        population = population[sorted_idx]
        fitness = fitness[sorted_idx]

        # 选择最好的半数个体
        selected_pop = population[:self.pop_size // 2]

        # 更新均值
        new_mean = np.mean(selected_pop, axis=0)

        # 更新协方差矩阵
        z = (selected_pop - new_mean)
        new_cov = np.cov(z.T)

        return new_mean, new_cov, fitness[0]

    def optimize(self):
        """执行 CMA-ES 优化"""
        for iteration in range(self.max_iter):
            # 生成新的种群
            population = self._sample_population()
            fitness = np.array([self.func(ind) for ind in population])

            # 记录最优解
            best_idx = np.argmin(fitness)
            if fitness[best_idx] < self.best_fitness:
                self.best_fitness = fitness[best_idx]
                self.best_solution = population[best_idx]

            # 如果收敛则提前停止
            if self.best_fitness < self.tol:
                break

            # 更新均值和协方差矩阵
            self.mean, self.cov, _ = self._update(population, fitness)

        return self.best_solution, self.best_fitness


# 测试代码
def test_func(x):
    """目标函数（可以换成任何需要优化的函数）"""
    return np.sum(x**2)  # 简单的平方和

# 设置CMA-ES优化器
optimizer = CMA_ES(func=test_func, dim=10, pop_size=100, sigma=0.5, max_iter=1000, tol=1e-8)

# 执行优化
best_solution, best_fitness = optimizer.optimize()

print(f"最优解: {best_solution}")
print(f"最优适应度: {best_fitness}")


最优解: [ 0.7597218   0.81693852  0.51618524 -0.85598918 -1.34118632  0.25611217
 -0.10391063 -1.12190899  0.08417165  0.48249647]
最优适应度: 5.617469559206125


(5_w,10)-aCMA-ES (mu_w=3.2,w_1=45%) in dimension 10 (seed=813280, Thu Apr  3 15:57:25 2025)


{}

In [19]:
import nevergrad as ng
import numpy as np

# 目标函数（平方和）
def test_func(x):
    return np.sum(x**2)

# 创建优化器
optimizer = ng.optimizers.CMA(parametrization=10,budget=1000)  # 设置初始均值和步长，设置优化预算

# 使用 CMA-ES 进行优化
recommendation = optimizer.minimize(test_func)

# 输出最优解
print(f"最优解: {recommendation.value}")
print(f"最优适应度: {test_func(recommendation.value)}")


最优解: [-4.66946630e-05  6.64809494e-05  1.14007539e-04  1.88257915e-05
  1.17464916e-04  8.21061846e-05 -1.00001221e-04  1.06315998e-04
 -5.29837798e-05  7.74970416e-05]
最优适应度: 7.060807759310412e-08


In [45]:
import numpy as np
import cma
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
from sklearn.model_selection import train_test_split


def true_objective_function(x):
    """
    复杂目标函数，包含Rastrigin函数、高斯噪声和正弦变化。
    :param x: 输入参数，假设是一个N维的向量。
    :return: 目标函数值
    """
    # Rastrigin函数的标准形式
    A = 10
    n = len(x)
    rastrigin = A * n + np.sum(x**2 - A * np.cos(2 * np.pi * x))
    
    # 添加高斯噪声项
    # noise = np.random.normal(0, 0.1, 1)  # 均值为0，标准差为0.1的高斯噪声
    
    # # 添加正弦变化项，增加一些周期性变化
    # sine_variation = np.sin(0.5 * np.sum(x)) * 5  # 基于输入向量的和来生成周期性变化
    
    # 综合计算
    return rastrigin #+ noise + sine_variation

# 代理模型：高斯过程回归
class SurrogateModel:
    def __init__(self, kernel=None):
        if kernel is None:
            # kernel = C(1.0, (1e-4, 1e1)) * RBF(1.0, (1e-4, 1e1))
            kernel = C(1.0, (1e-4, 1e10)) * RBF(1.0, (1e-4, 1e1))  # 增大上限
        self.model = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10)

    def fit(self, X, y):
        self.model.fit(X, y)

    def predict(self, X):
        return self.model.predict(X, return_std=True)

    def update(self, X, y):
        self.fit(X, y)

# 代理模型辅助CMA-ES的目标函数评估
def surrogate_evaluation(x, surrogate_model, real_evaluation, X_train, y_train):
    # 首先使用代理模型进行评估
    pred, std = surrogate_model.predict(np.array([x]))
    
    # 如果代理模型的预测不确定性较大，则使用真实目标函数进行评估
    if std > 0.1:  # 可调阈值，决定何时使用真实目标函数
        true_val = real_evaluation(x)
        X_train = np.vstack([X_train, x])  # 添加新解
        y_train = np.append(y_train, true_val)  # 添加真实目标值
        surrogate_model.update(X_train, y_train)  # 更新代理模型
        return true_val
    else:
        return pred[0]  # 否则使用代理模型的预测值

# 使用CMA-ES进行优化的主函数
def optimize_with_cma_es(dimension=6, popsize=10,max_iter=10, max_evals=500):
    # CMA-ES的初始配置
    es = cma.CMAEvolutionStrategy(np.random.randn(dimension), 0.33, {'popsize': popsize,'maxiter': max_iter, 'maxfevals': max_evals})

    # 训练数据初始化
    X_train = np.random.uniform(-5, 5, (10, dimension))  # 初始训练数据（随机生成）
    y_train = np.array([true_objective_function(x) for x in X_train])  # 真实目标函数值

    # 创建代理模型
    surrogate_model = SurrogateModel()

    # 用初始数据训练代理模型
    surrogate_model.fit(X_train, y_train)

    # 开始优化
    while not es.stop():
        solutions = es.ask()  # 获取新的候选解
        # 对每个候选解进行代理模型评估或实际评估
        fitness_values = [surrogate_evaluation(x, surrogate_model, true_objective_function, X_train, y_train) for x in solutions]
        es.tell(solutions, fitness_values)  # 更新CMA-ES的种群
        es.logger.add()  # 记录优化数据
        print(f"Best fitness so far: {es.result.fbest}, Best solution: {es.result.xbest}")
        es.disp()  # 显示当前状态

    return es.result.xbest, es.result.fbest

# 运行优化
best_solution, best_fitness = optimize_with_cma_es()
print(f"Optimized solution: {best_solution}")
print(f"Best fitness value: {best_fitness}")


(5_w,10)-aCMA-ES (mu_w=3.2,w_1=45%) in dimension 6 (seed=793819, Thu Apr  3 17:06:47 2025)




KeyboardInterrupt: 