<a href="https://colab.research.google.com/github/DrZee24/PINN/blob/main/PINNs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install numpy torch matplotlib


In [None]:
import torch

import torch.nn as nn
import torch.optim as optim
import numpy as np
from scipy.stats import gamma, beta, uniform
import matplotlib.pyplot as plt
import torch.nn.functional as F
from sklearn.model_selection import ParameterGrid
from tqdm import tqdm
!pip install py_vollib
from py_vollib import black_scholes_merton as bsm
from progressbar import ProgressBar
from sklearn.model_selection import train_test_split

PINN class: Defines the neural network architecture with specified input dimension, hidden layers, hidden units, and output dimension. The activation function used is hyperbolic tangent (nn.Tanh()).

In [None]:
def thisS(q):
    return gamma.ppf(q, a=100, scale=1)

def thisK(q):
    return uniform.ppf(q, 50, 200)

def thisR(q):
    return uniform.ppf(q, 0.01, 0.18)

def thisD(q):
    return uniform.ppf(q, 0.01, 0.18)

def thisSigma(q):
    return beta.ppf(q, a=2, b=5) + 0.001

In [None]:
import pandas as pd
def this_extremes_S(q):
    return uniform.ppf(q, 90, 110)
num_increment = 12
percentiles = pd.Series(np.linspace(0, 0.99, num_increment))
S = percentiles.apply(thisS).tolist()
K = percentiles.apply(thisK).tolist()
q = percentiles.apply(thisD).tolist()
t = np.array([.25, .5, .75, 1]).tolist()
r = percentiles.apply(thisR).tolist()
sigma = percentiles.apply(thisSigma).tolist()

param_grid = {'S': S, 'K': K, 'q': q, 't': t, 'r': r, 'sigma': sigma}
grid = ParameterGrid(param_grid)

data = []
for params in tqdm(grid, desc='Calcul des prix'):
    price = bsm.black_scholes_merton(flag='c', S=params['S'], K=params['K'],
                                     q=params['q'], t=params['t'], r=params['r'], sigma=params['sigma'])
    # Ajoutez le prix au dictionnaire des paramètres
    params['price'] = price
    # Ajoutez le dictionnaire à la liste
    data.append(params)
# Convertissez la liste de dictionnaires en DataFrame
fullDF = pd.DataFrame(data)
# Enregistrez le DataFrame dans un fichier CSV
fullDF.to_csv('dataFull.csv', index=False)
print(fullDF.head())
print(fullDF.tail())
print(fullDF.columns)


Calcul des prix: 100%|██████████| 995328/995328 [00:05<00:00, 177036.05it/s]


      K    S     q     r    sigma     t  price
0  50.0  0.0  0.01  0.01  0.00100  0.25    0.0
1  50.0  0.0  0.01  0.01  0.00100  0.50    0.0
2  50.0  0.0  0.01  0.01  0.00100  0.75    0.0
3  50.0  0.0  0.01  0.01  0.00100  1.00    0.0
4  50.0  0.0  0.01  0.01  0.08819  0.25    0.0
            K           S       q       r     sigma     t     price
995323  248.0  124.722561  0.1882  0.1882  0.511316  1.00  3.015377
995324  248.0  124.722561  0.1882  0.1882  0.706686  0.25  0.575184
995325  248.0  124.722561  0.1882  0.1882  0.706686  0.50  3.027893
995326  248.0  124.722561  0.1882  0.1882  0.706686  0.75  5.940339
995327  248.0  124.722561  0.1882  0.1882  0.706686  1.00  8.688295
Index(['K', 'S', 'q', 'r', 'sigma', 't', 'price'], dtype='object')


In [None]:
import torch
import torch.nn as nn

class PINN(nn.Module):
    def __init__(self, input_dim, batch_size,hidden_layers, hidden_units, output_dim):
        super(PINN, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        # Create a list to store the layers
        layers = []

        # Add the first hidden layer
        layers.append(nn.Linear(input_dim, hidden_units))
        layers.append(nn.ReLU())

        # Add additional hidden layers
        for _ in range(hidden_layers):
            layers.append(nn.Linear(hidden_units, hidden_units))
            layers.append(nn.Tanh())

        # Add the output layer
        layers.append(nn.Linear(hidden_units, output_dim))

        # Create the sequential model
        self.model = nn.Sequential(nn.Linear(64,64))

    def forward(self, x):
        # Ensure the input shape is correct
        x = x.reshape(batch_size, input_dim)
        # x should have shape (batch_size, input_dim), no need for unsqueeze here
        return self.model(x)


black_scholes_pde function: Calculates the residual of the Black-Scholes equation using the model's predictions. It calculates the time and spatial derivatives using torch.autograd.grad and the input model, S (price of the underlying asset), and t (time).

In [None]:
def black_scholes_pde(model, S, t, sigma, r):
    # Calculer les prédictions du modèle
    V = model(torch.cat((S, t), dim=1))

    # Créer un tenseur de grad_outputs de la même forme que V
    grad_outputs = torch.ones_like(V)

    # Calculer les gradients partiels
    V_t = torch.autograd.grad(V, t, grad_outputs=grad_outputs, create_graph=True)[0]
    V_S = torch.autograd.grad(V, S, grad_outputs=grad_outputs, create_graph=True)[0]

    # Calculer le second gradient pour obtenir V_SS
    grad_outputs_V_S = torch.ones_like(V_S)
    V_SS = torch.autograd.grad(V_S, S, grad_outputs=grad_outputs_V_S, create_graph=True)[0]

    # Calculer le résidu basé sur l'équation de Black-Scholes
    residual = V_t + 0.5 * sigma ** 2 * S ** 2 * V_SS + r * S * V_S - r * V

    return residual



In [None]:
def calculate_loss(model, S_init, t_init, u_init, S_boundary, t_boundary, u_boundary, collocation_points, sigma, r):
    # Calculate residual error
    residual_error = black_scholes_pde(model, collocation_points['S'], collocation_points['t'], sigma, r)
    loss_residual = torch.mean(residual_error ** 2)  # Mean squared error (MSE) of residuals

    inputs_init = torch.cat((t_init.unsqueeze(1), S_init.unsqueeze(1)), dim=1)



    # Calculate initial condition error
    # Create the input tensor for initial conditions
    u_pred_init = model(inputs_init)
    loss_initial = torch.mean((u_pred_init - u_init) ** 2)  # Mean squared error of initial conditions

    # Calculate boundary condition error
    # Create the input tensor for boundary conditions
    u_boundary = torch.zeros((len(t_boundary), len(S_boundary)))
    inputs_boundary = torch.cat((t_boundary.unsqueeze(1), S_boundary), dim=1)

    u_pred_boundary = model(inputs_boundary).squeeze()

    loss_boundary = torch.mean((u_pred_boundary - u_boundary) ** 2, dim=1)  # Mean squared error of boundary conditions

    # Combine all losses
    total_loss = loss_residual + loss_initial + loss_boundary
    loss= total_loss.sum()
    return loss


train_pinn function: Trains the model using stochastic gradient descent (Adam optimizer) over a specified number of epochs. It samples random batches of inputs from the data (S and t), calculates the residual, and minimizes the mean squared error loss.

In [None]:
def train_pinn(model, optimizer, fullDF, S_init, t_init, u_init, S_boundary, t_boundary, u_boundary, sigma, r, epochs, batch_size=64):
    model.train()

    if not isinstance(fullDF, pd.DataFrame):
        fullDF = pd.DataFrame(fullDF)

    # Convert the Pandas DataFrame to a NumPy array
    fullDF = fullDF.values

    S_collocation = fullDF[:, 0].reshape(-1, 1)
    t_collocation = fullDF[:, 5].reshape(-1, 1)
    u_collocation = fullDF[:, 6].reshape(-1, 1)

    for epoch in range(epochs):
        # Sample random batch points for S and t
        idx = np.random.choice(len(fullDF), size=batch_size, replace=False)
        S_batch = S_collocation[idx]
        t_batch = t_collocation[idx]
        u_batch = u_collocation[idx]

        # Convertir les données en tenseurs PyTorch du bon type
        S_batch = torch.tensor(S_batch, dtype=torch.float32)
        t_batch = torch.tensor(t_batch, dtype=torch.float32)
        u_batch = torch.tensor(u_batch, dtype=torch.float32)

        # Concaténer les tenseurs d'entrée
        input_tensor = torch.cat((t_batch.unsqueeze(1), S_batch.unsqueeze(1)), dim=1)

        # Passer les données au modèle
        u_pred = model(input_tensor)
        u_pred = u_pred.view(-1, 1)
        u_pred = u_pred[:batch_size]

        # Calculer la perte
        loss = torch.mean((u_pred - u_batch) ** 2)

        # Réinitialiser les gradients
        optimizer.zero_grad()

        # Rétropropagation
        loss.backward()

        # Mettre à jour les poids du modèle
        optimizer.step()

        # Afficher les informations sur l'entraînement
        if epoch % 1000 == 0:
            print(f"Epoch: {epoch+1}/{epochs}, Loss: {loss.item():.4f}")


In [None]:
def exact_solution(S, t, sigma, r, K):
    d1 = (torch.log(S / K) + (r + 0.5 * sigma ** 2) * t) / (sigma * torch.sqrt(t))
    d2 = d1 - sigma * torch.sqrt(t)
    C = S * torch.distributions.normal.Normal(0, 1).cdf(d1) - K * torch.exp(-r * t) * torch.distributions.normal.Normal(0, 1).cdf(d2)
    return C

main function: Defines the problem domain (range of S and t values), initializes the model and optimizer, trains the model, and then plots the predicted option prices at a specific time (t=0.5).

In [None]:
def main():
    # Define parameters
    S_min, S_max = 0, 200
    t_min, t_max = 0, 1
    sigma = 0.2  # Volatility
    r = 0.05    # Risk-free rate
    K = 100  # Strike price (used in exact_solution)

    # Create a grid of S and t values
    S = torch.linspace(S_min, S_max, 1000)
    t = torch.linspace(t_min, t_max, 1000)

    # Prepare data for initial and boundary conditions
    S_init = torch.linspace(S_min, S_max, 100)
    t_init = torch.full((len(S_init),), t_min)
    u_init = torch.zeros_like(S_init)

    # Prepare data for collocation points
    collocation_points = {'S': torch.linspace(S_min, S_max, 1000), 't': torch.linspace(t_min, t_max, 1000)}

    # Prepare data for boundary conditions
    S_boundary = torch.tensor([S_min, S_max])
    t_boundary = torch.linspace(t_min, t_max, len(S_boundary))
    S_boundary = S_boundary.view(2, 1)

    u_boundary = torch.zeros((len(t_boundary), len(S_boundary)))


    # Define model hyperparameters
    input_dim = 2
    hidden_layers = 10
    hidden_units = 64
    output_dim = 1

    # Initialize the model
    model = PINN(input_dim, hidden_layers, hidden_units, output_dim)
    print(model.parameters())
    # Define optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train the model
    epochs = 10000
    batch_size = 64
    train_pinn(model, optimizer, S_init, t_init, u_init, S_boundary, t_boundary, u_boundary, collocation_points, sigma, r, epochs, batch_size)

    # Visualize the results
    model.eval()
    with torch.no_grad():
        # Test the model on the entire domain for visualization
        S_test = torch.linspace(S_min, S_max, 1000)
        t_test = torch.full((1000,), 0.5)
        inputs_test = torch.cat((t_test.unsqueeze(1), S_test.unsqueeze(1)), dim=1)

        # Predict option prices using the trained model
        V_pred = model(inputs_test).squeeze().numpy()
        print(V_pred)
        # Calculate exact solution for comparison
        V_exact = exact_solution(S_test, t_test, sigma, r, K).numpy()
    errors = V_pred - V_exact
         # Plot the predicted and exact option prices
    plt.figure()
# Tracer les prédictions du modèle
    plt.plot(S_test.numpy(), V_pred, label='Predicted V at t=0.5', color='blue')
# Tracer la solution exacte
    plt.plot(S_test.numpy(), V_exact, label='Exact V at t=0.5', color='red', linestyle='--')

    plt.xlabel('S')  # Étiquette de l'axe x
    plt.ylabel('Option Price')  # Étiquette de l'axe y
    plt.title('Predicted vs. Exact Option Prices')  # Titre du graphique
    plt.legend()  # Ajouter une légende
    plt.plot(S_test.numpy(),V_pred , label='Error (V_pred )')
    plt.xlabel('S')
    plt.ylabel('Error')
    plt.title('Errors between Predicted and Exact Option Prices')
    plt.legend()

# Afficher le graphique
    plt.show()


if __name__ == "__main__":
    main()


In [None]:
def main():
    # Charger le dataset à partir du fichier CSV
    data = pd.read_csv('dataFull.csv')

    # Extraire les données d'entrée et de sortie du dataset
    X = data[['S', 'K', 'q', 'r', 'sigma', 't']].values
    y = data['price'].values

    # Fractionner les données en ensembles d'entraînement et de test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=7)

    # Définir les valeurs initiales et aux bords
    S_init = torch.tensor(X_train[:, 0]).float()
    t_init = torch.tensor(X_train[:, 5]).float()
    u_init = torch.tensor(y_train).float()
    S_boundary = torch.tensor([X_train[:, 0].min(), X_train[:, 0].max()]).float()
    t_boundary = torch.tensor([X_train[:, 5].min(), X_train[:, 5].max()]).float()
    u_boundary = torch.zeros(2)  # À définir en fonction de votre problème

    # Définir les hyperparamètres du modèle
    input_dim = 64
    hidden_layers = 2
    hidden_units = 64
    output_dim = 1
    epochs = 10000

    lr = 0.001

    # Initialiser le modèle PINN
    model = PINN(input_dim, hidden_layers, hidden_units, output_dim)

    # Définir l'optimiseur
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Entraîner le modèle PINN
    fullDF = pd.DataFrame(np.column_stack((X_train, y_train)), columns=['S', 'K', 'q', 'r', 'sigma', 't', 'price'])
    train_pinn(model, optimizer, fullDF, S_init, t_init, u_init, S_boundary, t_boundary, u_boundary, sigma, r, epochs, batch_size=64)

    # Évaluer le modèle sur l'ensemble de test
    model.eval()
    with torch.no_grad():
        y_pred = model(torch.tensor(X_test).float()).squeeze().numpy()

    # Calculer l'erreur quadratique moyenne (MSE) sur l'ensemble de test
    mse = np.mean((y_test - y_pred) ** 2)
    print(f'Mean Squared Error (MSE) on test set: {mse}')

    # Tracer les prédictions par rapport aux valeurs réelles
    plt.figure(figsize=(8, 6))
    plt.scatter(y_test, y_pred)
    plt.xlabel('True Prices')
    plt.ylabel('Predicted Prices')
    plt.title('Comparison of True and Predicted Prices')
    plt.show()

if __name__ == "__main__":
    main()

TypeError: PINN.__init__() missing 1 required positional argument: 'output_dim'