In [4]:
import math
import matplotlib.pyplot as plt
import numpy as np
import torch
import tensorflow as tf

USE_GPU = True

dtype = torch.float64 # we will be using float throughout this tutorial
device = torch.device('cuda') if (USE_GPU and torch.cuda.is_available()) else torch.device('cpu')
print('using device:', device)

using device: cuda


In [15]:
def pay_off(S,r,K,dt):
  return torch.maximum(K- S,torch.zeros_like(S))


def simulate_path(M, T, n, r, vol, S, K, dt):
    nudt = (r - 0.5 * vol**2) * dt
    lnS = np.log(S)

    # Méthode de Monte Carlo
    Z = np.random.normal(size=(M,n))
    L = np.random.normal(size=(M,n)) #new random variable
    dW = np.sqrt(dt) * Z  #it's the simulation of the dWt_i we'll need in each iteration
    delta_lnSt = nudt + vol*dW

    LnS_s = np.zeros([M, n + 1])

    # Set the initial values
    LnS_s[:, 0] = lnS
    # Compute cumulative sums using cumsum
    LnS_s[:, 1:] = np.cumsum(delta_lnSt, axis=1)
    LnS_s[:,1:] +=lnS

    S = np.exp(LnS_s)
    S_tensor = torch.tensor(S, device = device ,dtype=dtype)
    dW_tensor = torch.tensor(dW, device = device ,dtype=dtype)
    L_tensor = torch.tensor(L, device = device ,dtype=dtype)

    return S_tensor,dW_tensor , L_tensor

#Parametres

T = 1
n=50
dt = T/n  #les t_i seront donc les i*dt.


S = 36          # Prix de l'action
K = 40           # Prix d'exercice
vol = 0.2       # Volatilité (%)
r = 0.06            # Taux sans risque (%)
M = 100000  # Nombre de simulations


S,dW ,L =simulate_path(M, T, n, r, vol, S, K, dt)
X=torch.zeros([M,n+1], device = device ,dtype=dtype)
Y=torch.zeros_like(X, device = device ,dtype=dtype)


X[ :, n]=pay_off(S[:,n],r,K,dt)
Y[ :, n]=X[ :, n]

beta_dt=math.exp(-r*dt)



In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

class SpecificModel(nn.Module):
    def __init__(self):
        super(SpecificModel, self).__init__()
        self.branch1 = nn.Sequential(
            nn.Linear(1, 20),
            # nn.BatchNorm1d(50),
            nn.ReLU(),
            nn.Linear(20, 20),
            # nn.BatchNorm1d(50),
            nn.ReLU(),
            nn.Linear(20, 1)
        )
        self.branch2 = nn.Sequential(
            nn.Linear(1, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 1)
        )
        self.branch3 = nn.Sequential(
            nn.Linear(1, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 1)
        )
        self.branch4 = nn.Sequential(
            nn.Linear(1, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 20),
            nn.ReLU(),
            nn.Linear(20, 1)
        )

        self.optimizer = optim.Adam(self.parameters(), lr=0.001)

    def forward(self, x):
        x_branch1 = self.branch1(x)
        x_branch2 = self.branch2(x)
        x_branch3 = self.branch3(x)
        x_branch4 = self.branch4(x)
        concatenated_output = torch.cat((x_branch1, x_branch2, x_branch3, x_branch3), dim=1)
        return concatenated_output

    def custom_loss(self, Y_true, dW_true , dW_true_2 , L_true ,  y_pred):
        return torch.mean(torch.square(Y_true - (y_pred[:, 0] + dW_true * y_pred[:, 1]+(dW_true_2-dt)*y_pred[:, 2]+L_true*y_pred[:, 3])))

    def train_model(self, S_tensor, Y_tensor, dW_tensor, L_tensor , epochs=5, batch_size=32, validation_split=0.2):
        torch.cuda.empty_cache()
        combined_labels = torch.cat((Y_tensor.unsqueeze(1), dW_tensor.unsqueeze(1), L_tensor.unsqueeze(1)), dim=1)
        dataset = TensorDataset(S_tensor, combined_labels)
        train_size = int((1 - validation_split) * len(dataset))
        val_size = len(dataset) - train_size
        train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

        for epoch in range(epochs):
            self.train()
            total_loss = 0
            for S_batch, labels_batch in train_dataloader:
                self.optimizer.zero_grad()
                outputs = self(S_batch)
                loss = self.custom_loss(labels_batch[:, 0], labels_batch[:, 1] , labels_batch[:, 1]*labels_batch[:, 1] ,labels_batch[:, 2],  outputs)
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()


            val_loss = 0

            if epoch % 3 == 0:
                self.eval()
                with torch.no_grad():
                    for S_batch, labels_batch in val_dataloader:
                        outputs = self(S_batch)
                        val_loss += self.custom_loss(labels_batch[:, 0], labels_batch[:, 1], labels_batch[:, 1]*labels_batch[:, 1], labels_batch[:, 2] , outputs).item()

                print(f"Epoch {epoch+1}/{epochs}, Training Loss: {total_loss/len(train_dataset)}, Validation Loss: {val_loss/val_size}")


        self.eval()
        with torch.no_grad():
              outputs = self(S_tensor)
        print("Training complete .")

        return outputs

model = SpecificModel().to(device = device ,dtype=dtype)


In [None]:

# The main loop :

for i in range(n-1,0,-1):
  #determination of phi and psi in one step
  print( "In the ", i ," th step :")
  outputs =  model.train_model(S[:,i].reshape(-1,1), beta_dt*Y[:,i+1].reshape(-1,1), dW[:,i].reshape(-1,1),L[:,i].reshape(-1,1))

  psi_S=outputs[:,1]
  phi_S=outputs[:,0]

  X[ :, i]=beta_dt*X[:,i+1]-psi_S *dW[:,i] #to change it into (M,)
  Y[ :, i]=beta_dt*Y[:,i+1]-psi_S *dW[:,i]

  Z=pay_off(S[:,i],r,K,dt)

  Y[ :, i] = torch.where(Z> phi_S, Z, Y[ :, i])
  X[ :, i] = torch.where(Z> X[:,i], Z, X[ :, i])


#Pour i =0 :
#determination of phi and psi in two steps
i=0
outputs = model.train_model(S[:,i].reshape(-1,1), beta_dt*Y[:,i+1].reshape(-1,1), dW[:,i].reshape(-1,1),L[:,i].reshape(-1,1))

psi_S=outputs[:,1]
phi_S=outputs[:,0]

X[ :, 0]=beta_dt*X[:,1]-psi_S *dW[:,0]
Y[ :, 0]=beta_dt*Y[:,1]-psi_S *dW[:,0]




In [18]:
#Monté Carlo
u0=torch.mean(X[:,0]).cpu().numpy()
l0=torch.mean(Y[:,0]).cpu().numpy()
print(u0)
print(l0)
print(np.abs(u0-l0))

5.851455488990885
4.203136286169285
1.6483192028215994


In [19]:
print(4.4887-4.4762)
print(np.abs(np.abs(u0-l0)-(4.4887-4.4762)))

0.01249999999999929
1.6358192028216
