# Calibrate MGLU Pricer with CMA-ES

In [1]:
import numpy as np
# import pandas as pd

import torch 

import sys
sys.path.append('../')
sys.path.append('../../')

# 导入模型
from Networks.nn import NN_pricing_MGLU
from Pricing.rBergomi.rBergomi_utils import *
from calibrate_utils import *

# 评估使用 cpu
device = torch.device('cpu')

# 设置参数为 4 个
hyperparams = { 
    'input_dim': 4, 
    'hidden_dim': 64, 
    'hidden_nums': 4,
    'output_dim': 88
}

model = NN_pricing_MGLU(hyperparams=hyperparams).to(device=device, dtype=torch.float64)


model_state = torch.load( 
    r'../../Results/models/MGLU_rBergomi_flat_forward_variance.pth'
)
model.load_state_dict(model_state)

# 设置为 eval mode
model.eval()
model.to(device=device, dtype=torch.float64)

NN_pricing_MGLU(
  (layer_lst): ModuleList(
    (0): MGLUBlock(
      (mglu_projection): MGLU(in_features=4, out_features=64, bias=False)
    )
    (1-3): 3 x MGLUBlock(
      (mglu_projection): MGLU(in_features=64, out_features=64, bias=False)
      (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
    )
  )
  (output_layer): Linear(in_features=64, out_features=88, bias=True)
)

In [2]:
# 数据集
import gzip
f = gzip.GzipFile(
    filename = r"../../Data/rBergomiTrainSet.txt.gz", 
    mode = "r"
)

data = np.load(f)
xx, yy = data[:, :4], data[:, 4:]

strikes=np.array([0.5,0.6,0.7,0.8,0.9,1.0,1.1,1.2,1.3,1.4,1.5 ])
maturities=np.array([0.1,0.3,0.6,0.9,1.2,1.5,1.8,2.0 ])

# xx: 参数
## 前 4 列代表网格所对应的参数
xx = data[:, :4]
print(f"参数形状：{xx.shape}")

# yy: 隐含波动率曲面 
# 后 88 列表示隐含波动率曲面 8 * 11 = 88
yy = data[:, 4:]
print(f"隐含波动率曲面形状：{yy.shape}")

# 参数
print(f"参数上界: {np.max(xx, axis=0)}")
print(f"参数下界: {np.min(xx, axis=0)}")


from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import sys
sys.path.append('../')

# from NN_Training.rBergomi_nn_pricer import x_transform, x_inv_transform, y_transform, y_inv_transform, params_scaler, params_inv_scaler


x_train, x_test, y_train, y_test = train_test_split(xx, yy, test_size=0.15, random_state=42)

scale_x, scale_y = StandardScaler(), StandardScaler()


# 工具函数——数据标准化
def x_transform(train_data, test_data): 
    return scale_x.fit_transform(train_data), scale_x.transform(test_data)

def x_inv_transform(x):
    return scale_x.inverse_transform(x)

def y_transform(train_data, test_data): 
    return scale_y.fit_transform(train_data), scale_y.transform(test_data)

def y_inv_transform(y):
    return scale_y.inverse_transform(y)


# 训练集的 Upper and Lower Bounds
upper_bound = np.array([0.16,4,-0.1,0.5])
lower_bound = np.array([0.01,0.3,-0.95,0.025])

def params_scaler(x): 
    return (x - (upper_bound+lower_bound) / 2 ) * 2 / (upper_bound-lower_bound)

def params_inv_scaler(x):
    return x * (upper_bound-lower_bound) / 2 + (upper_bound+lower_bound) / 2


x_train_transform = params_scaler(x_train)
x_test_transform = params_scaler(x_test)

y_train_transform, y_test_transform = y_transform(y_train, y_test)


train_data = (torch.from_numpy(x_train_transform).to(device=device),torch.from_numpy(y_train_transform).to(device=device))

test_data = (torch.from_numpy(x_test_transform).to(device=device),torch.from_numpy(y_test_transform).to(device=device))


print(f"训练集形状：{train_data[0].shape}")
print(f"测试集形状：{test_data[0].shape}")

参数形状：(40000, 4)
隐含波动率曲面形状：(40000, 88)
参数上界: [ 0.16      4.       -0.100011  0.499998]
参数下界: [ 0.0100133  0.300028  -0.949934   0.0250066]
训练集形状：torch.Size([34000, 4])
测试集形状：torch.Size([6000, 4])


## 优化

In [3]:
import cma
import numpy as np

def calibrate_with_cmaes(model, market_iv, device, bounds=None):
    """
    使用CMA-ES算法校准模型参数。
    bounds: 参数上下界，例如 [(0.01,0.5), (0.1,5.0), (-0.95,-0.05), (0.005,0.5)]
    """
    model.eval()
    market_iv_tensor = torch.tensor(market_iv, device=device, dtype=torch.float32).flatten()
    
    # 定义CMA-ES优化的目标函数（期望输入为numpy数组）
    def objective(params_np):
        params_torch = torch.tensor(params_np, dtype=torch.float32, device=device)
        with torch.no_grad():
            pred = model(params_torch.unsqueeze(0)).squeeze(0).flatten()
        loss = torch.nn.functional.mse_loss(pred, market_iv_tensor).item()
        return loss
    
    # 设置初始点、初始步长和边界
    x0 = np.array([0.1, 2.0, -0.7, 0.04])  # 合理初始猜测
    sigma0 = 0.5  # 初始搜索步长（建议覆盖约1/2的参数范围）
    
    # 配置CMA-ES选项
    opts = cma.CMAOptions()
    opts.set("bounds", bounds)  # 重要：设置参数边界防止物理无效值
    opts.set("maxfevals", 5000)  # 最大函数评估次数
    opts.set("ftarget", 1e-8)    # 目标损失值，达到可提前停止
    opts.set("verbose", -9)      # 控制输出详细程度
    
    # 运行CMA-ES优化
    es = cma.CMAEvolutionStrategy(x0, sigma0, opts)
    es.optimize(objective)
    
    # 获取结果
    result = es.result
    best_params = result.xbest
    best_loss = result.fbest
    
    print(f"CMA-ES校准完成。最佳损失: {best_loss:.6e}")
    print(f"最佳参数: {best_params}")
    print(f"函数评估次数: {result.evaluations}")
    
    return best_params, best_loss

# 定义参数边界（根据rBergomi模型合理范围）
param_bounds = [
    (0.05, 0.15),   # H: 粗糙指数
    (1.5, 3.0),     # eta: 波动率的波动率
    (-0.9, -0.3),   # rho: 相关性
    (0.02, 0.2)     # v0: 初始方差
]

# 对单个样本进行校准
best_params, best_loss = calibrate_with_cmaes(model, y_test_transform[0], device, param_bounds)

ValueError: bounds must be None, empty, or a list of length 2 where each element may be a scalar, list, array, or None; type(bounds) was: <class 'list'>