# Calibration with GRU Network

## 导入模型

`'../Data/Models/nn_gru_rBergomi.pth'`

In [8]:
import numpy as np
import pandas as pd

import torch 
import scipy 
import time

import sys
sys.path.append('../')

# 导入模型
from NN_Training.NN.nn import NN_pricing_GRU

# 评估使用 cpu
device = torch.device('cpu')

# 设置参数为 4 个
hyperparams = { 
    "input_dim": 4,
    'hidden_dim': 64, 
    'hidden_nums': 2, 
    'output_dim': 88, 
    'seq_len': 8, 
    'feature_per_step': 11
}

model = NN_pricing_GRU(hyperparams=hyperparams).to(device=device, dtype=torch.float64)


model_state = torch.load( 
    '../Data/Models/nn_gru_rBergomi.pth'
)
model.load_state_dict(model_state)

# 设置为 eval mode
model.eval()
model.to(device=device, dtype=torch.float64)

NN_pricing_GRU(
  (mlp_encoder): Sequential(
    (0): Sequential(
      (0): Linear(in_features=4, out_features=64, bias=True)
      (1): ELU(alpha=1.0)
    )
    (1): Sequential(
      (0): Linear(in_features=64, out_features=64, bias=True)
      (1): ELU(alpha=1.0)
    )
  )
  (gru): GRU(64, 64, num_layers=2, batch_first=True)
  (fc_out): Linear(in_features=64, out_features=11, bias=True)
)

## 数据集导入

In [9]:
# 数据集
import gzip
f = gzip.GzipFile(
    filename = r"../Data/rBergomiTrainSet.txt.gz", 
    mode = "r"
)

data = np.load(f)
xx, yy = data[:, :4], data[:, 4:]

strikes=np.array([0.5,0.6,0.7,0.8,0.9,1.0,1.1,1.2,1.3,1.4,1.5 ])
maturities=np.array([0.1,0.3,0.6,0.9,1.2,1.5,1.8,2.0 ])

# xx: 参数
## 前 4 列代表网格所对应的参数
xx = data[:, :4]
print(f"参数形状：{xx.shape}")

# yy: 隐含波动率曲面 
# 后 88 列表示隐含波动率曲面 8 * 11 = 88
yy = data[:, 4:]
print(f"隐含波动率曲面形状：{yy.shape}")

# 参数
print(f"参数上界: {np.max(xx, axis=0)}")
print(f"参数下界: {np.min(xx, axis=0)}")

参数形状：(40000, 4)
隐含波动率曲面形状：(40000, 88)
参数上界: [ 0.16      4.       -0.100011  0.499998]
参数下界: [ 0.0100133  0.300028  -0.949934   0.0250066]


## 数据集划分

In [10]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import sys
sys.path.append('../')

# from NN_Training.rBergomi_nn_pricer import x_transform, x_inv_transform, y_transform, y_inv_transform, params_scaler, params_inv_scaler


x_train, x_test, y_train, y_test = train_test_split(xx, yy, test_size=0.15, random_state=42)

scale_x, scale_y = StandardScaler(), StandardScaler()


# 工具函数——数据标准化
def x_transform(train_data, test_data): 
    return scale_x.fit_transform(train_data), scale_x.transform(test_data)

def x_inv_transform(x):
    return scale_x.inverse_transform(x)

def y_transform(train_data, test_data): 
    return scale_y.fit_transform(train_data), scale_y.transform(test_data)

def y_inv_transform(y):
    return scale_y.inverse_transform(y)


# 训练集的 Upper and Lower Bounds
upper_bound = np.array([0.16,4,-0.1,0.5])
lower_bound = np.array([0.01,0.3,-0.95,0.025])

def params_scaler(x): 
    return (x - (upper_bound+lower_bound) / 2 ) * 2 / (upper_bound-lower_bound)

def params_inv_scaler(x):
    return x * (upper_bound-lower_bound) / 2 + (upper_bound+lower_bound) / 2


x_train_transform = params_scaler(x_train)
x_test_transform = params_scaler(x_test)

y_train_transform, y_test_transform = y_transform(y_train, y_test)


train_data = (torch.from_numpy(x_train_transform).to(device=device),torch.from_numpy(y_train_transform).to(device=device))

test_data = (torch.from_numpy(x_test_transform).to(device=device),torch.from_numpy(y_test_transform).to(device=device))


print(f"训练集形状：{train_data[0].shape}")
print(f"测试集形状：{test_data[0].shape}")


训练集形状：torch.Size([34000, 4])
测试集形状：torch.Size([6000, 4])


In [11]:
import torch
import torch.nn as nn

loss_MSE = nn.MSELoss()

vol_model = y_inv_transform(model(test_data[0]).detach().numpy())
vol_real = y_test

error_real = np.abs(vol_model-vol_real)
error_relative = error_real/vol_real

# np.mean(error_relative)

print(f"均方误差：{np.mean(error_real)}")

均方误差：0.0011304029245992817


------

> **注意: 以下的优化都太慢了, 或许是基于 LSTM / GRU 的神经网络调起来非常慢.**

可能优化一个样本需要 0.2s 不到. 

再算上程序里其他必要的 `Python` 运算, 一轮得跑个把小时. 

基于 `SciPy` 的优化使用了 3 种方法: `L-BFGS-B`, `SLSQP`, `BFGS`. 直接跑了 4 小时. 

------

## L-BFGS (基于 `Torch.optim.LBFGS`)

### 反向优化

In [6]:
import torch.nn as nn

loss_MSE = nn.MSELoss()

vol_model = y_inv_transform(model(test_data[0]).detach().numpy())
vol_real = y_test

error_real = np.abs(vol_model-vol_real)
error_relative = error_real/vol_real

np.mean(error_relative)


# 使用 LBFGS 优化器
from optimization_utils import calibrate_with_torch_lbfgs

Approx_torch, Timing_torch = calibrate_with_torch_lbfgs(model, y_test_transform, device='cpu')
Approx_torch = params_inv_scaler(Approx_torch)


print(f"LBFGS 优化结果 (前 10 个): {Approx_torch[:10]}")
print(f"LBFGS 优化时间 (前 10 个): {Timing_torch[:10]}")

459/6000

KeyboardInterrupt: 

## 3 种拟牛顿法调优 (基于 `SciPy`)

### 反向优化

注意 `init` 需要初始化为 `np.zeros(4)` 因为 `scipy.optimize.minimize` 中 `x0` 必须传入 1 维数组

然而 神经网络接受 2 维数组 所以使用 `np.array([x])` 来把其形状变成 $1 \times 4$

In [6]:
# 损失函数
def CostFunc(x,sample_ind):
    return np.sum( 
        np.power( 
            (model(torch.from_numpy( np.array([x]) )).detach().numpy() - y_test_transform[sample_ind]), 
            2
        )
    )


import time
import scipy
Approx_scipy, Timing_scipy = [], [] 
solutions = np.zeros([3,4])
times = np.zeros(3)
# 注意 这个神经网络接受的是 2 维数组
init = np.zeros(4)

for i in range(x_test_transform.shape[0]):
    disp=str(i+1)+f"/{x_test_transform.shape[0]}"
    print (disp,end="\r")
    #L-BFGS-B
    start= time.time()
    I=scipy.optimize.minimize(CostFunc,x0=init,args=i, method='L-BFGS-B',tol=1E-10,options={"maxiter":10000})
    end= time.time()
    solutions[0,:]=params_inv_scaler(I.x)
    times[0]=end-start
    #SLSQP
    start= time.time()
    I=scipy.optimize.minimize(CostFunc,x0=init,args=i,method='SLSQP',tol=1E-10,options={"maxiter":10000})
    end= time.time()
    solutions[1,:]=params_inv_scaler(I.x)
    times[1]=end-start
    #BFGS
    start= time.time()
    I=scipy.optimize.minimize(CostFunc,x0=init,args=i,method='BFGS',tol=1E-10,options={"maxiter":10000})
    end= time.time()
    solutions[2,:]=params_inv_scaler(I.x)
    times[2]=end-start
    
    Approx_scipy.append(np.copy(solutions))
    Timing_scipy.append(np.copy(times))

Approx_scipy = np.array(Approx_scipy)


print(f"SciPy 优化结果 (前 3 轮):\n {Approx_scipy[:3]}")
print(f"SciPy 优化时间 (前 3 轮):\n {Timing_scipy[:3]}")

104/6000

KeyboardInterrupt: 

In [13]:
# 损失函数
def CostFunc(x,sample_ind):
    return np.sum( 
        np.power( 
            (model(torch.from_numpy( np.array([x]) )).detach().numpy() - y_test_transform[sample_ind]), 
            2
        )
    )


import time
import scipy
Approx_scipy, Timing_scipy = [], [] 
solutions = np.zeros([3,4])
times = np.zeros(3)
# 注意 这个神经网络接受的是 2 维数组
init = np.zeros(4)

for i in range(10):
    disp=str(i+1)+f"/{x_test_transform.shape[0]}"
    print (disp,end="\r")
    #L-BFGS-B
    start= time.time()
    I=scipy.optimize.minimize(CostFunc,x0=init,args=i, method='L-BFGS-B',tol=1E-10,options={"maxiter":10000})
    end= time.time()
    solutions[0,:]=params_inv_scaler(I.x)
    times[0]=end-start
    #SLSQP
    start= time.time()
    I=scipy.optimize.minimize(CostFunc,x0=init,args=i,method='SLSQP',tol=1E-10,options={"maxiter":10000})
    end= time.time()
    solutions[1,:]=params_inv_scaler(I.x)
    times[1]=end-start
    #BFGS
    start= time.time()
    I=scipy.optimize.minimize(CostFunc,x0=init,args=i,method='BFGS',tol=1E-10,options={"maxiter":10000})
    end= time.time()
    solutions[2,:]=params_inv_scaler(I.x)
    times[2]=end-start
    
    Approx_scipy.append(np.copy(solutions))
    Timing_scipy.append(np.copy(times))

Approx_scipy = np.array(Approx_scipy)


print(f"SciPy 优化结果 (前 3 轮):\n {Approx_scipy[:3]}")
print(f"SciPy 优化时间 (前 3 轮):\n {Timing_scipy[:3]}")

5/6000

KeyboardInterrupt: 

### 可视化