<a href="https://colab.research.google.com/github/claudia-viaro/optimal_stopping-switching/blob/main/opt_switching_V3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Suppose we fix the final regime of the process

In [None]:
import numpy as np
import torch
import torch.nn as nn
np.random.seed(234198)
import itertools
import random
import time
import scipy.stats

import math
import matplotlib.pyplot as plt
import torch.optim as optim
import torch.utils.data as tdata

In [None]:
class BlackScholes:
  """Geometric-Brownian-motion (Black-Scholes) path simulator.

  The constructor stores the market/simulation parameters; `simulate_process`
  draws independent Gaussian increments for every (period, path, asset) cell
  from NumPy's global random state.
  """

  def __init__(self, drift, sigma, delta, spot, assets, paths, periods,
               maturity, strike, dividend=0):
    # Simulation grid.
    self.assets = assets
    self.paths = paths
    self.periods = periods
    self.maturity = maturity
    self.dt = self.maturity / self.periods

    # Dynamics; the stored drift is net of `dividend`.
    self.drift = drift - dividend
    self.sigma = sigma
    self.delta = delta
    self.spot = spot

    # Contract data and the one-period discount factor.
    self.strike = strike
    self.df = math.exp(-self.drift * self.dt)

  def drift_fct(self, x, t):
    """Return the drift coefficient `drift * x` (the time argument is unused)."""
    del t
    return self.drift * x

  def diffusion_fct(self, x, t, v=0):
    """Return the diffusion coefficient `sigma * x` (`t` and `v` are unused)."""
    del t
    return self.sigma * x

  def simulate_process(self):
    """Simulate prices; returns an array of shape (periods + 1, paths, assets).

    Row 0 holds the spot; row n holds the spot times the exponential of the
    cumulated log-increments of the first n periods.
    """
    step_shape = (self.periods, self.paths, self.assets)

    trajectories = np.empty((self.periods + 1, self.paths, self.assets))
    trajectories[0, :, :] = self.spot

    gaussians = np.random.normal(0, 1, step_shape)
    brownian_steps = gaussians * np.sqrt(self.dt)

    # Expand the drift to one value per (period, path, asset) cell.
    rate = np.reshape(self.drift, (-1, 1, 1))
    for axis, reps in enumerate(step_shape):
      rate = np.repeat(rate, reps, axis=axis)
    vol = np.ones(step_shape) * self.sigma

    log_steps = ((rate - self.delta) * self.dt
                 - (vol ** 2) * self.dt / 2
                 + vol * brownian_steps)
    growth = np.exp(np.cumsum(log_steps, axis=0))

    # Broadcasting the spot row over the period axis scales every step.
    trajectories[1:, :, :] = trajectories[0:1, :, :] * growth
    return trajectories


'''
PLOT
'''

def draw_stock_model(stockmodel):
    """Plot one simulated price path against its date index.

    Args:
        stockmodel: 3-D array of simulated prices; the path plotted is
            ``stockmodel[:, 0, 0]``, so axis 0 is used as the time axis
            (presumably laid out as [date, path, asset] -- confirm with the
            simulator that produced it).

    Returns:
        The return value of ``plt.show()``.
    """
    one_path = stockmodel[:, 0, 0]
    dates = np.arange(len(one_path))

    plt.plot(dates, one_path, label='stock path')
    # The x axis carries the date index and the y axis the price; the
    # previous version set the y label twice and never labelled the x axis.
    plt.xlabel('Time')
    plt.ylabel('Stock price')
    plt.legend()
    return plt.show()



class Ftheta_NN(nn.Module):
  """Small feed-forward network producing one value in (0, 1) per input row.

  The forward pass is bn0 -> layer1 -> ReLU -> bn2 -> layer3 -> sigmoid.
  The other registered modules (bn1, layer2, bn3 and the extra activations)
  are constructed but not called by `forward`.
  """

  def __init__(self, assets):
    super().__init__()
    hidden = assets + 40  # width of the hidden layer

    # Input normalisation and first affine map.
    self.bn0 = nn.BatchNorm1d(num_features=assets)
    self.layer1 = nn.Linear(assets, hidden)

    # Activation functions made available on the module.
    self.leakyReLU = nn.LeakyReLU(0.5)
    self.Softplus = nn.Softplus()
    self.sigmoid = nn.Sigmoid()
    self.tanh = nn.Tanh()
    self.relu = nn.ReLU()

    # Hidden and output layers with their batch-norm companions.
    self.bn1 = nn.BatchNorm1d(num_features=hidden)
    self.layer2 = nn.Linear(hidden, hidden)
    self.bn2 = nn.BatchNorm1d(num_features=hidden)
    self.layer3 = nn.Linear(hidden, 1)
    self.bn3 = nn.BatchNorm1d(num_features=1)

  def forward(self, x):
    """Map a (batch, assets) tensor to a (batch, 1) tensor of sigmoid outputs."""
    activated = self.relu(self.layer1(self.bn0(x)))
    return self.sigmoid(self.layer3(self.bn2(activated)))



'''
class Ftheta_NN(nn.Module):
  def __init__(self, assets, hidden_size):
    super(Ftheta_NN, self).__init__()
    self.l1 = nn.Linear(assets, hidden_size) 
    self.relu = nn.ReLU()
    self.l2 = nn.Linear(hidden_size, hidden_size)
    self.l3 = nn.Linear(hidden_size, 1)  
    self.sigmoid=nn.Sigmoid()
    
  def forward(self, x):
    out = self.l1(x)
    out = self.relu(out)
    out = self.l2(out)
    out = self.relu(out)
    out = self.l3(out)
    out = self.sigmoid(out)
    return out
'''
def init_weights(m):
  """Initialise a linear layer: Xavier-uniform weights and bias 0.01.

  Meant to be passed to ``module.apply``; any module that is not a
  ``torch.nn.Linear`` is left untouched. Layers built with ``bias=False``
  only get their weights initialised.

  Side effect: the global torch RNG is re-seeded with 42 before every layer,
  so the initialisation is reproducible and two layers of identical shape
  receive identical weights.
  """
  if not isinstance(m, torch.nn.Linear):
    return

  torch.manual_seed(42)
  # Alternative kept from the original: torch.nn.init.zeros_(m.weight)
  torch.nn.init.xavier_uniform_(m.weight)
  if m.bias is not None:
    torch.nn.init.constant_(m.bias, 0.01)



class OptimizationPart(object):
  """Owns one `Ftheta_NN` and trains / evaluates it on a batch of paths.

  The same network instance is reused by every `train_network` call, so
  successive calls continue from the previously fitted weights.
  """

  def __init__(self, assets, paths, epochs=50, batch_size=2000):
    """Build the double-precision network and apply `init_weights` to it.

    Args:
      assets: number of input features of the network.
      paths: number of simulated paths (stored; no longer needed for training).
      epochs: number of Adam steps performed per `train_network` call.
      batch_size: stored only; training below always uses the full batch.
    """
    self.assets = assets
    self.paths = paths
    self.epochs = epochs
    self.batch_size = batch_size
    self.network = Ftheta_NN(self.assets).double()
    self.network.apply(init_weights)

  def train_network(self, stock_values, current_payoff, future_payoff):
    """Fit the network by maximising the mean reward with Adam.

    reward = current_payoff * p + future_payoff * (1 - p), where p is the
    network output for each row of `stock_values`.

    Args:
      stock_values: numpy array with one row per path.
      current_payoff: tensor or array broadcastable against the flattened
        network output.
      future_payoff: same convention as `current_payoff`.
    """
    optimizer = optim.Adam(self.network.parameters())
    X_inputs = torch.from_numpy(stock_values).double()
    # Accept numpy arrays as well as tensors for the payoffs.
    current_payoff = torch.as_tensor(current_payoff, dtype=torch.double)
    future_payoff = torch.as_tensor(future_payoff, dtype=torch.double)

    self.network.train(True)
    for _ in range(self.epochs):
      optimizer.zero_grad()
      outputs = self.network(X_inputs).reshape(-1)  # probabilities
      # A scalar 1 broadcasts to any batch size; the earlier
      # torch.ones(self.paths) failed when the batch had a different length.
      reward = current_payoff * outputs + future_payoff * (1.0 - outputs)
      loss = -torch.mean(reward)
      loss.backward()
      optimizer.step()

  def evaluate_network(self, X_inputs):
    """Return (probabilities as a 1-D numpy array, the network itself).

    The network is switched to evaluation mode. The returned module is the
    live instance held by this object, not a snapshot: later training calls
    change it in place.
    """
    self.network.train(False)
    X_inputs = torch.from_numpy(X_inputs).double()
    with torch.no_grad():  # inference only, no autograd graph needed
      outputs = self.network(X_inputs)
    return outputs.view(X_inputs.size()[0]).numpy(), self.network



In [None]:
class Profit_training:
  """Payoff helpers used during the training phase.

  `model` must expose `strike`, `drift` and `dt`.
  """

  def __init__(self, model):
    self.strike = model.strike
    self.model = model

  def terminal(self, X):
    """Return max(max over axis 1 of X - strike, 0) for every row of X."""
    payoff = np.max(X, axis=1) - self.strike
    return payoff.clip(0, None)

  def g(self, date, path, X):
    """Discounted max-call payoff of one path at one date.

    Args:
      date: date index (also used as the number of discounting periods).
      path: path index.
      X: numpy array indexed as X[date, path, asset].

    Returns:
      A float32 tensor of shape (1,) holding
      exp(-drift * dt * date) * max(max_asset(X[date, path]) - strike, 0).
    """
    # Convert only the slice that is needed; converting all of X on every
    # call made each evaluation cost as much as copying the whole array.
    prices = torch.from_numpy(X[int(date), path, :]).float()
    excess = torch.max(prices - self.strike)
    discount = np.exp(-self.model.drift * self.model.dt * date)
    return discount * torch.max(excess, torch.tensor([0.0]))

  def running(self, Y, X):
    """Return payoff + Y - gamma as a tensor with one row per switching cost.

    gamma has two rows: the first for \\gamma_{0,1} (= -payoff), the second
    for \\gamma_{1,0} (= payoff + 0.7), where payoff = terminal(X).
    """
    payoff = self.terminal(X)  # computed once and reused below
    gamma = np.array([-payoff, payoff + 0.7])
    return torch.from_numpy(payoff + Y - gamma)
 
class Profit_testing:
  """Payoff helpers used during the testing phase.

  `model` must expose `strike`, `drift` and `dt`.
  """

  def __init__(self, model):
    self.strike = model.strike
    self.model = model

  def terminal(self, X):
    """Return max(max over axis 1 of X - strike, 0) for every row of X."""
    terminal = np.max(X, axis=1) - self.strike
    return terminal.clip(0, None)

  def g(self, date, path, X):
    """Discounted max-call payoff of one path at one date.

    Args:
      date: date index (also used as the number of discounting periods).
      path: path index.
      X: numpy array indexed as X[date, path, asset].

    Returns:
      A float32 tensor of shape (1,) holding
      exp(-drift * dt * date) * max(max_asset(X[date, path]) - strike, 0).
    """
    # Convert only the slice that is needed; converting all of X on every
    # call made each evaluation cost as much as copying the whole array.
    prices = torch.from_numpy(X[int(date), path, :]).float()
    excess = torch.max(prices - self.strike)
    discount = np.exp(-self.model.drift * self.model.dt * date)
    return discount * torch.max(excess, torch.tensor([0.0]))

  # switch is F_theta_train
  def running(self, Y, date, path, S, X, switch, gamma):
    """Return (Y[date+1, path] - gamma) * switch[date, path] + g(date, path, X).

    Args:
      Y: array indexed as Y[date, path].
      date, path: indices of the cell being valued.
      S: unused; kept so existing call sites keep working.
      X: price array forwarded to `g`.
      switch: array of 0/1 switching decisions indexed as switch[date, path].
      gamma: switching cost subtracted from the continuation value.

    Returns:
      A numpy array of shape (1,).
    """
    val = Y[date + 1, path] - gamma
    r_benefit = self.g(date, path, X)
    return val * int(switch[date, path]) + r_benefit.numpy()

Currently, the parameters $\theta$ of $F^{\theta}$ are found by maximizing a reward function that takes as inputs:
- current payoff: a payoff that considers both possibilities, switching or not switching, at that time step (as we don't know $\theta$ yet)
- future payoff: built in the same way


This is possibly not a good idea. Instead:
- the future payoff could consider both cases only at $N-1$
- the current payoff should probably stay as it is

In [None]:
class Training:
  """Backward-in-time training of the switching-decision network.

  `price` simulates paths from `model`, walks the dates from the last
  pre-maturity date down to date 1, fits the network at each date and
  records a snapshot of it for the testing phase.
  """

  def __init__(self, model, training, testing, nb_epochs=50):
    """Store the model and build the optimiser and payoff helpers.

    Args:
      model: path simulator exposing `assets`, `paths`, `periods`,
        `maturity`, `drift`, `strike` and `simulate_process()`.
      training: unused; `Profit_training` is instantiated directly.
      testing: unused; `Profit_testing` is instantiated directly.
      nb_epochs: number of gradient steps per date. It is now forwarded to
        the optimiser; previously it was silently ignored and the default
        of 50 was always used.
    """
    self.model = model  # argument is S
    self.neural_stopping = OptimizationPart(
        model.assets, model.paths, epochs=nb_epochs)
    self.profit_training = Profit_training(self.model)
    self.profit_testing = Profit_testing(self.model)

  def price(self):
    """Train date by date and return the per-date network snapshots.

    Returns:
      A list of length `model.periods`; entry `date` holds an independent
      copy of the network as fitted at that date. Entry 0 stays `None`
      because the backward loop stops at date 1.
    """
    import copy  # local import: only needed for the per-date snapshots

    model = self.model
    stock_paths = model.simulate_process()
    # math.exp replaces np.math.exp: the np.math alias no longer exists in
    # current NumPy releases.
    disc_factor = math.exp((-model.drift) * model.maturity / model.periods)

    regimes = [0, 1]
    regime_path = np.zeros((model.periods + 1, model.paths))    # regime at each n
    Y_train = np.zeros((model.periods + 1, model.paths))
    F_theta_train = np.zeros((model.periods + 1, model.paths))  # switching events
    mods = [None] * model.periods                               # networks for testing

    # At maturity N: payoff of the last date for each path, one row per regime.
    terminal_payoff = self.profit_training.terminal(stock_paths[-1, :, :])
    final_payoff = np.array([terminal_payoff, terminal_payoff])
    future_payoff = torch.from_numpy(final_payoff * disc_factor).double()
    Y_train[model.periods, :] = final_payoff[0]
    F_theta_train[model.periods, :] = 1  # at maturity we switch (does it matter?)
    regime_path[model.periods, :] = random.sample(regimes, 1)[0]
    # Copy, so that updating `values` does not overwrite Y_train's last row.
    values = Y_train[model.periods, :].copy()

    # Recursive calculation before maturity.
    for date in range(stock_paths.shape[0] - 2, 0, -1):
      current_payoff = self.profit_training.running(
          Y_train[date + 1, :], stock_paths[date, :, :])
      stopping_probability, networks = self.stop(
          stock_paths[date, :, :], current_payoff, future_payoff)

      # Turn the probabilities into 0-1 decisions.
      which = stopping_probability > 0.5
      F_theta_train[date, :] = which * 1.0

      # Every path is visited; the previous range(0, paths - 1) left the
      # last path at zero.
      for m in range(model.paths):
        old_regime = regime_path[date + 1, m]
        regime_path[date, m] = int(which[m])
        if which[m]:
          # .item() turns the one-element result into a plain float, so the
          # cost arithmetic below runs in double precision.
          payoff_now = self.profit_testing.g(date, m, stock_paths).item()
          # NOTE(review): inside this branch int(which[m]) is 1 and the
          # regimes are 0 or 1, so the "+ 0.7" case can never be selected.
          # Confirm the intended switching-cost rule.
          if int(old_regime) - int(which[m]) > 0:  # gamma 0-1
            gamma = -payoff_now + 0.7
          else:                                    # gamma 1-0
            gamma = -payoff_now
        else:
          gamma = 0
        Y_train[date, m] = Y_train[date + 1, m] - gamma

      immediate_exercise_value = Y_train[date, :]
      values[which] = immediate_exercise_value[which]  # switch: take current profit
      values[~which] *= disc_factor                    # no switch: discount
      # Y_train[date, :] = values

      # Store a snapshot: the optimiser keeps training the same network
      # object, so storing the object itself would make every entry of
      # `mods` point at the weights of the last trained date.
      mods[date] = copy.deepcopy(networks)

    return mods

  def stop(self, stock_values, current_payoff,
           future_payoff):
    """Train on one date and return (stopping probabilities, network)."""
    self.neural_stopping.train_network(
        stock_values, current_payoff, future_payoff)
    return self.neural_stopping.evaluate_network(stock_values)


In [None]:
# Training phase: simulate the paths Y and fit the network, so that the
# switching decisions f_theta_n can be evaluated afterwards.
hyperparam_training = {
    'drift': 0.2,
    'sigma': 0.05,
    'delta': 0.1,
    'paths': 1024,
    'periods': 9,
    'maturity': 3.,
    'strike': 100,
    'assets': 2,
    'spot': 90,
}
S_train = BlackScholes(**hyperparam_training)

pricing = Training(
    S_train,
    Profit_training,
    Profit_testing,
    nb_epochs=3000,
)
'''
arguments are:
- path process
- Profit training and profit testing classes
- number of epochs to be used for the gradient descent algorithm

'''

# One network entry per date, consumed by the testing phase.
mods = pricing.price()


#Lower bound

The stopping time $\tau^{\Theta}$ gives a lower bound $L=\mathbb{E}\, g(\tau^{\Theta}, X_{\tau^{\Theta}})$ for the optimal value $V_0= \sup_{\tau \in \mathcal{T}}\mathbb{E}\, g(\tau, X_{\tau})$.

Simulate 
- $K_L = 1024$ paths $(y_n^k)_{n=0}^N$, $k=1, \ldots, K_L$, of $(X_n)_{n=0}^N$ and assume these are drawn independently from the realizations $(x_n^k)_{n=0}^N$, $k=1, \ldots, K$.

The unbiased estimate of the lower bound $L$ is given by
\begin{equation}
\hat{L}=\frac{1}{K_L} \sum_{k=1}^{K_L} g(l^k, y_{l^k}^k)
\end{equation}
where $l^k = l(y_0^k, \ldots, y_{N-1}^k)$

In [None]:
# Testing phase - lower bound.
# Sample y from the process (Y) using a separate simulator instance.
hyperparam_testing_L = {
    'drift': 0.2,
    'sigma': 0.05,
    'delta': 0.1,
    'paths': 4,
    'periods': 9,
    'maturity': 3.,
    'strike': 100,
    'assets': 2,
    'spot': 90,
}
S_test_L = BlackScholes(**hyperparam_testing_L)

# With the test paths defined, all stopping times can be computed recursively.

In [None]:
# Testing

class Testing_Lower:
  """Lower-bound estimate obtained by replaying trained networks on new paths.

  `price` simulates paths from `model`, applies the stored network of each
  date to decide whether to switch, and averages the resulting values.
  """

  def __init__(self, model, training, testing, mods):
    """Store the model, the payoff helpers and the per-date networks.

    Args:
      model: path simulator exposing `assets`, `paths`, `periods`,
        `maturity`, `drift`, `strike` and `simulate_process()`.
      training: unused; `Profit_training` is instantiated directly.
      testing: unused; `Profit_testing` is instantiated directly.
      mods: sequence indexed by date holding the network to use at that date.
    """
    self.model = model  # argument is S
    self.neural_stopping = OptimizationPart(model.assets, model.paths)
    self.profit_training = Profit_training(self.model)
    self.profit_testing = Profit_testing(self.model)
    self.mods = mods

  def price(self):
    """Replay the decisions backwards in time and return the estimate.

    Prints the running mean of the values at every date.

    Returns:
      (estimate, Y_train): the mean of the final values, discounted by one
      more period and rounded to 3 decimals, and the (periods + 1, paths)
      array of per-date values.
    """
    model = self.model
    # math.exp replaces np.math.exp: the np.math alias no longer exists in
    # current NumPy releases.
    disc_factor = math.exp((-model.drift) * model.maturity / model.periods)
    stock_paths = model.simulate_process()

    regimes = [0, 1]
    regime_path = np.zeros((model.periods + 1, model.paths))    # regime at each n
    Y_train = np.zeros((model.periods + 1, model.paths))
    F_theta_train = np.zeros((model.periods + 1, model.paths))  # switching events

    # At maturity N: payoff of the last date for each path.
    Y_train[model.periods, :] = self.profit_training.terminal(
        stock_paths[-1, :, :])
    F_theta_train[model.periods, :] = 1  # at maturity we switch (does it matter?)
    regime_path[model.periods, :] = random.sample(regimes, 1)[0]
    # Copy, so that updating `values` does not overwrite the terminal row of
    # the Y_train array that is returned to the caller.
    values = Y_train[model.periods, :].copy()

    # Recursive calculation before maturity.
    for date in range(stock_paths.shape[0] - 2, 0, -1):
      mod_curr = self.mods[date]
      with torch.no_grad():  # inference only
        probs = mod_curr(torch.from_numpy(stock_paths[date]))
      np_probs = probs.numpy().reshape(model.paths)

      # Turn the probabilities into 0-1 decisions.
      which = np_probs > 0.5
      F_theta_train[date, :] = which * 1.0

      # Every path is visited; the previous range(0, paths - 1) left the
      # last path at zero.
      for m in range(model.paths):
        old_regime = regime_path[date + 1, m]
        regime_path[date, m] = int(which[m])
        if which[m]:
          # .item() turns the one-element result into a plain float, so the
          # cost arithmetic below runs in double precision.
          payoff_now = self.profit_testing.g(date, m, stock_paths).item()
          # NOTE(review): inside this branch int(which[m]) is 1 and the
          # regimes are 0 or 1, so the "+ 0.7" case can never be selected.
          # Confirm the intended switching-cost rule.
          if int(old_regime) - int(which[m]) > 0:  # gamma 0-1
            gamma = -payoff_now + 0.7
          else:                                    # gamma 1-0
            gamma = -payoff_now
        else:
          gamma = 0
        Y_train[date, m] = Y_train[date + 1, m] - gamma

      immediate_exercise_value = Y_train[date, :]
      values[which] = immediate_exercise_value[which]  # switch: take current profit
      values[~which] *= disc_factor                    # no switch: discount

      # Y_train[date, :] = values
      print("date", date, np.mean(values))

    return round(np.mean(values) * disc_factor, 3), Y_train



In [None]:
# Replay the trained networks `mods` on the test-path model S_test_L.
price_testing = Testing_Lower(S_test_L, Profit_training, Profit_testing, mods)

# price() prints the running mean for each date and returns the rounded
# estimate together with the per-date value array.
Y_test_mean, Y_train = price_testing.price()
print(Y_test_mean)

date 8 30.638559341430664
date 7 38.668416023254395
date 6 45.94314098358154
date 5 52.68890953063965
date 4 57.284738540649414
date 3 58.2815055847168
date 2 58.2815055847168
date 1 58.2815055847168
54.523
