## Deep Hedging with Quadratic Variation Penalty
- Suppose that we are hedging a short position in a European Call option using cash and underlying stock
- The marked-to-market value of the hedge portfolio is given by
\begin{equation}
    V_t = C_t+H_t^SS_t,
\end{equation}
where $C_t$ is the stock price (negative for short position) and $H_t^S$ is the hedge ratio (number of shares to hold).
- This formulation of hedging tries to solve the following stochastic optimization problem,
\begin{equation}
\min_{\text{strategies}}\mathbb{E}\left[-V_T+\frac{\lambda}{2}\langle V,V\rangle_T\right],
\end{equation}
where $\langle V, V\rangle_T$ is the quadratic variation up to time $T$.
- Considering the instantanuous changes, the objective can be written as,
\begin{equation}
\min_{\text{strategies}}\mathbb{E}\left[-\int_0^TdV_t+\frac{\lambda}{2}\int_0^Td\langle V, V\rangle_t \right],
\end{equation}
where $dV_t = dC_t+H_t^SdS_t$.

Using Ito Calculus we have and assuming Black-Scholes $dS_t=\mu S_tdt+\sigma S_tdW_t$, the objective takes the form,
\begin{align}
\min_{\text{strategies}}\mathbb{E}\left[-\int_0^T\partial_tC_t+\partial_SC_t\mu S_t+\frac{1}{2}\partial_{SS}C_t\sigma^2S^2+\mu S_tH_t+\right.\\
\left[+\frac{\lambda}{2}\left((\partial_SC_t)^2\sigma^2S_t^2+H_t^2\sigma^2S_t^2+2H_t\partial_SC_t\sigma^2S_t^2\right)\right]&
\end{align}

- By treating the integrand as a formal quadratic expression $\Phi(H_t)$, the optimal strategy is dervied as,
\begin{equation}
  H_t^* = \Delta_t^{\text{BS}}-\frac{1}{\lambda}\frac{\mu}{ \sigma^2}\frac{1}{S_t}
\end{equation}
- For approximating the optimal startategy, we use the following discretized objective,

\begin{equation}
\min_{\text{strategies}}\left[\frac{1}{N}\sum_{i=1}^N-P\&L_i+\frac{\lambda}{2}P\&L_i^2\right],
\end{equation}
where
\begin{equation}
P\&L_i = V_{t_i}-V_{t_{i-1}} = (C_{t_i}-C_{t_{i-1}})+H_{t_{i-1}}(S_{t_i}-S_{t_{i-1}}).
\end{equation}

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import sys
sys.path.insert(0,'/content/drive/MyDrive/deepHedge_QV')

In [3]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
import seaborn as sns


In [4]:
import numpy as np
import scipy.stats as stats
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm


In [5]:

def calculate_stock_prices(S_0,mu,vol,W): #maybe use Euler for stock
    S = torch.zeros(size=(W.shape[0],W.shape[1]))
    t = torch.arange(W.shape[1])
    for i in range(S.shape[0]):
        S[i,:] = S_0*np.exp(vol*W[i,:]+(mu-vol**2 /2)*t)
    return S

def calculate_d1(S, K, ir, vol, tau):
    x = np.log(S/K) + (ir + vol**2 / 2) * tau
    return x / (vol * np.sqrt(tau))

def calculate_d2(S, K, ir, vol, tau):
    x = np.log(S/K) + (ir - vol**2 / 2) * tau
    return x / (vol * np.sqrt(tau))

def calculate_deltas(S, K, ir, vol, T):

    dt = T / (S.shape[1]-1)

    # Convert to torch tensors
    K = torch.full_like(S, K)
    ir = torch.full_like(S, ir)
    vol = torch.full_like(S, vol)

    T = torch.full_like(S, T)
    t = torch.zeros_like(S)
    for j in range(t.shape[1]): t[:, j] = j*dt
    tau = T - t
    deltas = torch.zeros_like(S)

    # Calculate prices before expiry
    d1 = calculate_d1(S[:, :-1], K[:, :-1], ir[:, :-1], vol[:, :-1], tau[:, :-1])
    deltas[:, :-1] = torch.from_numpy(stats.norm.cdf(d1))

    # Calculate prices at expiry
    deltas[:, -1] = torch.from_numpy(np.where(S[:, -1] > K[:, -1], 1.0, 0.0))

    return deltas

In [6]:
def calculate_option_prices(S, K, ir, vol, T):#Peyman has thoughts here

    dt = T / (S.shape[1]-1)

    # Convert to np.ndarray
    K = torch.full_like(S, K)
    ir = torch.full_like(S, ir)
    vol = torch.full_like(S, vol)

    T = torch.full_like(S, T)
    t = torch.zeros_like(S)
    for j in range(t.shape[1]): t[:, j] = j*dt
    tau = T - t
    prices = torch.zeros_like(S)

    # Calc prices before expiry
    d1 = calculate_d1(S[:, :-1], K[:, :-1], ir[:, :-1], vol[:, :-1], tau[:, :-1])
    d2 = calculate_d2(S[:, :-1], K[:, :-1], ir[:, :-1], vol[:, :-1], tau[:, :-1])
    df = np.exp(-ir[:, :-1] * tau[:, :-1])
    prices[:, :-1] = S[:, :-1] * stats.norm.cdf(d1) - K[:, :-1] * df * stats.norm.cdf(d2)

    # Calc prices at expiry
    prices[:, -1] = torch.from_numpy(np.where(S[:, -1] > K[:, -1], S[:, -1] - K[:, -1], 0.0))

    return prices

In [7]:
def delta_pnl(S,C,D,kappa,TIME,time_step,n_samples):
    """Calculate the average loss for the batch
    S: Stock prices (BATCH_SIZE,TIME_STEP)
    C: Option prices (BATCH_SIZE,TIME_STEP)
    D: Option Deltas (BATCH_SIZE,TIME_STEP)
    PHI: Hedging strategy(BATCH_SIZE,TIME_STEP+1)- only PHI[:,0] used
    LAM: Risk aversion parameter
    TIME: Time horizon T"""


    PHI = torch.zeros((n_samples, time_step + 1))
    PHI_0=torch.ones(n_samples)*PHI_INITIAL
    PHI[:,0] = PHI_0.reshape((-1,))

    PHI_dot = torch.zeros((n_samples, time_step ))
    delta_t = TIME/time_step
    pnl_mat = torch.zeros((n_samples,time_step+1))
    pnl_mat[:,0] = -kappa*torch.abs(torch.mul(S[:,0],D[:,0]))
    print(pnl_mat.shape)
    for i in torch.arange(1,time_step):
        pnl_mat[:,i] = -1*(C[:,i]-C[:,i-1])+torch.mul(D[:,i-1],(S[:,i]-S[:,i-1]))-kappa*torch.abs(S[:,i]*(D[:,i]-D[:,i-1]))
    pnl_mat[:,-1] = pnl_mat[:,-1]-kappa*torch.abs(S[:,-1]*D[:,-1])

    return pnl_mat

In [8]:
def Training_Loss(S,C,PHI,PHI_dot,LAM,kappa,TIME,time_step,n_samples):
    """
    Formulate the optimization objective based on Kolm-Rietter formulation
    Calculate the average loss for the batch
    S: Stock prices (BATCH_SIZE,TIME_STEP)
    C: Option prices (BATCH_SIZE,TIME_STEP)
    PHI: Hedging strategy(BATCH_SIZE,TIME_STEP+1)
    LAM: Risk aversion parameter
    TIME: Time horizon T"""
    delta_t = TIME/time_step
    pnl_mat = torch.zeros((n_samples,time_step+1))
    # initial transaction cost to put on the hedge
    pnl_mat[:,0] = -kappa*torch.abs(torch.mul(S[:,0],PHI[:,0]))
    for i in torch.arange(1,time_step):
        pnl_mat[:,i] = -1*(C[:,i]-C[:,i-1])+PHI[:,i]*(S[:,i]-S[:,i-1])-kappa*torch.abs(S[:,i]*PHI_dot[:,i]*delta_t)
    #transaction cost at maturity to unwind the hedge
    pnl_mat[:,-1] = pnl_mat[:,-1]-kappa*S[:,-1]*(torch.abs(PHI[:,-1])+0.01*PHI[:,-1]**2)
    loss_mat = -1*pnl_mat+LAM/2*pnl_mat**2 #loss_mat.shape =(n_samples,time_step)
    return torch.mean(loss_mat), pnl_mat

In [9]:
criterion = nn.MSELoss() # MSE Loss

In [10]:
#initialization of parameters
def weights_init_uniform_rule(m):
        classname = m.__class__.__name__
        # for every Linear layer in a model..
        if classname.find('nn.Linear') != -1:
            # get the number of the inputs
            n = m.in_features
            y = 1.0/np.sqrt(n)
            m.weight.data.uniform_(-y, y)
            m.bias.data.fill_(0)

### Networks
class Phi_Dot_0(nn.Module):
    def __init__(self):
        super(Phi_Dot_0, self).__init__()
        self.phi_Dot_0 = nn.Linear(1, 1)
    def forward(self, x):
        return self.phi_Dot_0(x)


class RL_Net(nn.Module):
    def __init__(self,INPUT_DIM,OUTPUT_DIM,HIDDEN_DIM):
        super(RL_Net, self).__init__()
        self.input_dim = INPUT_DIM
        self.output_dim = OUTPUT_DIM
        self.hidden_dim = HIDDEN_DIM
        current_dim = self.input_dim
        self.layers = nn.ModuleList()
        self.bn = nn.ModuleList()
        self.droupout = nn.ModuleList() #drop out layer for regularization
        for hdim in self.hidden_dim:
            self.layers.append(nn.Linear(int(current_dim), int(hdim)))
            self.bn.append(nn.BatchNorm1d(int(hdim)))
            self.droupout.append(nn.Dropout(0.25)) # add a dropout layer
            current_dim = hdim
        self.layers.append(nn.Linear(int(current_dim), int(self.output_dim)))
    def forward(self, x):
        for i, layer in enumerate(self.layers[:-1]):
            x = layer(x)
            x = self.bn[i](x)
            x = F.relu(x)
        out = self.layers[-1](x)
        return out


In [11]:
def TRAIN_graph(S_0,vol,mu,ir,K,PHI_INITIAL,LAM,kappa,TIME,EPOCH,n_samples,time_step,HIDDEN_DIM_Utility,LR_Utility = 0.001,saving=0,LR_Adjust=dict(),OPT_Utility="ADAM",SEED_Utility1=0,SEED_Utility2=0):

    """
    n_samples: path size of the Brownian Motion
    time_step: discretization step
    LR_Utility: initial learning rate
    saving: list, to indicate at which epoch should have the models be saved to the path_temp
         saving=0 means no saving
    LR_Adjust: a dictoinary in which the value is the absolute DECAY,
          and the key is the epoch corresponding to the updated learning rate
    OPT_Utility: "ADAM" or "SGD"
    """
    model_list_Utility = []
    i = 0
    while i <= time_step:
        model = RL_Net(INPUT_DIM_Utility,OUTPUT_DIM,HIDDEN_DIM_Utility)
        model.apply(weights_init_uniform_rule)
        model_list_Utility.append(model)
        i += 1
    if OPT_Utility=="SGD":
        optimizer_Utility = torch.optim.SGD((par for model in model_list_Utility
                        for par in model.parameters()),
                                lr=LR_Utility)
    if OPT_Utility=="ADAM":
        optimizer_Utility = torch.optim.Adam((par for model in model_list_Utility
                        for par in model.parameters()),
                        lr=LR_Utility, betas=(0.9, 0.99))

    loss_arr_Utility = []
    PHI_0=torch.ones(n_samples)*PHI_INITIAL
    DUMMY_1 = torch.ones(n_samples).reshape((n_samples, 1))
    for epoch in tqdm(range(EPOCH)):
        ### tuning the learning rate
        if epoch in LR_Adjust.keys():
            DECAY = LR_Adjust[epoch]
            for g in optimizer_Utility.param_groups:
                g['lr'] = LR_Utility*DECAY

        ### W_t: (SAMPLE_SIZE,TIME_STEP+1)
        W=torch.cumsum(torch.normal(0, np.sqrt(TIME*1/time_step), size=(n_samples, time_step)), dim=1) # simukate Wiener process
        W=torch.cat((torch.zeros((n_samples,1)),W),dim=1)

        ### Stock process batch(SAMPLE_SIZE,TIME_STEP+1)
        S = calculate_stock_prices(S_0,mu,vol,W)

        ### Calculate option prices batch (SAMPLE_SIZE,TIME_STEP+1)
        C = calculate_option_prices(S, K, ir, vol, TIME)
        ### Calculate option deltas
        D = calculate_deltas(S, K, ir, vol, TIME)

        optimizer_Utility.zero_grad()
        PHI = torch.zeros((n_samples, time_step + 1))

        PHI[:,0] = PHI_0.reshape((-1,))

        PHI_dot = torch.zeros((n_samples, time_step ))

        for t in range(time_step):#t=0,...,time_step-1, X_utility.shape =(SAMPLE_SIZE,TIME_STEP)
            t_tensor=t/time_step*TIME*torch.ones(n_samples).reshape(-1,1)
            x_Utility=torch.cat((t_tensor,S[:,t].reshape(-1,1),PHI[:,t].reshape(-1,1)),dim=1)
            PHI_dot[:,t] = model_list_Utility[t](x_Utility).reshape(-1,)
            PHI[:,(t+1)] = PHI[:,t].reshape(-1)+PHI_dot[:,(t)].reshape(-1)*TIME/time_step
        loss_Utility, _ = Training_Loss(S,C,PHI,PHI_dot,LAM,kappa,TIME,time_step,n_samples)
        loss_arr_Utility.append(loss_Utility.data)
        loss_Utility.backward()   #compute gradient
        optimizer_Utility.step()  #update NN weights
        if np.isinf(loss_Utility.data.cpu().numpy()) or np.isnan(loss_Utility.data.cpu().numpy()):
            print("\nFAIL")
            break
        ### saving
        path_Q = './models/'
        if saving:
            if epoch%100==0:
                for i,model in enumerate(model_list_Utility):
                      torch.save(model.state_dict(),path_Q+'Utility_para{}.pkl'.format(i))
                torch.save(loss_arr_Utility,path_Q+"Utility_LOSS_arr.pt")
                torch.save(optimizer_Utility.state_dict(),path_Q+"Utility_optimizer.pt")
                print("\n saving models after {} Epochs".format(epoch+1))
    result={
        'loss':loss_arr_Utility,
        'model_list':model_list_Utility
      }
    return(result)

In [12]:
S_0 =100
K =100
vol = 0.20/np.sqrt(252)
mu = 0.05/252
ir = 0
PHI_INITIAL = 0.0
LAM = 0.1
TIME = 30
EPOCH =500
n_samples = 50000
time_step = 90
LR_Utility = 0.01
INPUT_DIM_Utility = 3 #The dimension of the input is 5:(t,S_t,phi_t)
OUTPUT_DIM = 1
HIDDEN_DIM_Utility = [100,150,100]
kappa = 0.01

In [None]:

result = TRAIN_graph(S_0,vol,mu,ir,K,PHI_INITIAL,LAM,kappa,TIME,EPOCH,n_samples,time_step,HIDDEN_DIM_Utility,LR_Utility = 0.001,saving=0,LR_Adjust=dict(),OPT_Utility="ADAM",SEED_Utility1=0,SEED_Utility2=0)

model_list_Utility = result['model_list']
### calulate pnl on test set
n_samples = 1000
  ### W_t: (SAMPLE_SIZE,TIME_STEP+1)
W=torch.cumsum(torch.normal(0, np.sqrt(TIME*1/time_step), size=(n_samples, time_step)), dim=1) # simukate Wiener process
W=torch.cat((torch.zeros((n_samples,1)),W),dim=1)

### Stock process batch(SAMPLE_SIZE,TIME_STEP+1)
S = calculate_stock_prices(S_0,mu,vol,W)

### Calculate option prices batch (SAMPLE_SIZE,TIME_STEP+1)
C = calculate_option_prices(S, K, ir, vol, TIME)
### Calculate option deltas
D = calculate_deltas(S, K, ir, vol, TIME)
PHI = torch.zeros((n_samples, time_step + 1))
PHI_0 = torch.ones(n_samples)*PHI_INITIAL
PHI[:,0] = PHI_0.reshape((-1,))

PHI_dot = torch.zeros((n_samples, time_step ))

for t in range(time_step):#t=0,...,time_step-1, X_utility.shape =(SAMPLE_SIZE,TIME_STEP)
    t_tensor=t/time_step*TIME*torch.ones(n_samples).reshape(-1,1)
    x_Utility=torch.cat((t_tensor,S[:,t].reshape(-1,1),PHI[:,t].reshape(-1,1)),dim=1)
    PHI_dot[:,t] = model_list_Utility[t](x_Utility).reshape(-1,)
    PHI[:,(t+1)] = PHI[:,t].reshape(-1)+PHI_dot[:,(t)].reshape(-1)*TIME/time_step

_,pnl_mat = Training_Loss(S,C,PHI,PHI_dot,LAM,kappa,TIME,time_step,n_samples)

  0%|          | 0/500 [00:00<?, ?it/s]

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(14,8))
plt.plot(result['loss'],color='black')
plt.xlabel('Training Epochs', fontsize=16)
plt.ylabel('Training Loss', fontsize=16)
plt.show()

## Out of Sample Performance

In [None]:
np.random.seed(1234)
torch.manual_seed(1234)
model_list_Utility = result['model_list']
n_samples =10000
W=torch.cumsum(torch.normal(0, np.sqrt(TIME*1/time_step), size=(n_samples, time_step)), dim=1) # simulate Wiener process
W=torch.cat((torch.zeros((n_samples,1)),W),dim=1)
### Stock process batch(SAMPLE_SIZE,TIME_STEP+1)
S = calculate_stock_prices(S_0,mu,vol,W)

### Calculate option prices batch (SAMPLE_SIZE,TIME_STEP+1)
C = calculate_option_prices(S, K, ir, vol, TIME)
### Calculate option deltas
D = calculate_deltas(S, K, ir, vol, TIME)

PHI = torch.zeros((n_samples, time_step + 1))
PHI_0=torch.ones(n_samples)*PHI_INITIAL
PHI[:,0] = PHI_0.reshape((-1,))

PHI_dot = torch.zeros((n_samples, time_step ))

for t in range(time_step):#t=0,...,time_step-1, X_utility.shape =(SAMPLE_SIZE,TIME_STEP)
            t_tensor=t/time_step*TIME*torch.ones(n_samples).reshape(-1,1)
            x_Utility=torch.cat((t_tensor,S[:,t].reshape(-1,1),PHI[:,t].reshape(-1,1)),dim=1)
            PHI_dot[:,t] = model_list_Utility[t](x_Utility).reshape(-1,)
            PHI[:,(t+1)] = PHI[:,t].reshape(-1)+PHI_dot[:,(t)].reshape(-1)*TIME/time_step
_, pnl_mat = Training_Loss(S,C,PHI,PHI_dot,LAM,kappa,TIME,time_step,n_samples)

In [None]:
delta_pnl_result = delta_pnl(S,C,D,kappa,TIME,time_step,n_samples)

In [None]:
plt.figure(figsize=(14,8))
plt.hist(-1*pnl_mat.sum(axis=1).detach().numpy(),bins=50,color = 'midnightblue',edgecolor='black',alpha =0.8,label='deep hedge')
plt.hist(-1*delta_pnl_result.sum(axis=1).detach().numpy(),bins=20,color = 'gold',edgecolor='black',alpha =0.8,label = 'delta hedge')
plt.axvline(-1*pnl_mat.sum(axis=1).detach().numpy().mean(), color='midnightblue', linestyle='dashed', linewidth=3)
plt.axvline(-1*delta_pnl_result.sum(axis=1).detach().numpy().mean(), color='gold', linestyle='dashed', linewidth=3)

plt.xlabel('Total Hedge P\&L',fontsize=16)


# To-do: implement the Delta hedge with the correction term