<a href="https://colab.research.google.com/github/JackDigout/MRP/blob/main/MRP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf

class DGMNet(tf.keras.Model):
    def __init__(self, n_layers, layer_width, input_dim):
        super(DGMNet, self).__init__()
        self.initial_layer = DenseLayer(input_dim + 1, layer_width, activation="tanh")
        self.n_layers = n_layers
        self.LSTMLayerList = [LSTMLayer(input_dim + 1, layer_width, activation="tanh") for _ in range(n_layers)]
        self.final_layer = DenseLayer(layer_width, 1, activation=None)

    def call(self, x, t):
        X = tf.concat([x, tf.cast(t, tf.float32)], 1)
        S = self.initial_layer(X)
        for lstm_layer in self.LSTMLayerList:
            S = lstm_layer(S, X)
        result = self.final_layer(S)
        return result

class LSTMLayer(tf.keras.layers.Layer):
    def __init__(self, n_inputs, n_outputs, activation):
        super(LSTMLayer, self).__init__()
        self.n_outputs = n_outputs
        self.n_inputs = n_inputs
        self.activation = _get_function(activation)

    def build(self, input_shape):
        self.Uz = self.add_weight(name="Uz", shape=[self.n_inputs, self.n_outputs], dtype=tf.float32,
                                  initializer=tf.keras.initializers.glorot_uniform())
        self.Ug = self.add_weight(name="Ug", shape=[self.n_inputs, self.n_outputs], dtype=tf.float32,
                                  initializer=tf.keras.initializers.glorot_uniform())
        self.Ur = self.add_weight(name="Ur", shape=[self.n_inputs, self.n_outputs], dtype=tf.float32,
                                  initializer=tf.keras.initializers.glorot_uniform())
        self.Uh = self.add_weight(name="Uh", shape=[self.n_inputs, self.n_outputs], dtype=tf.float32,
                                  initializer=tf.keras.initializers.glorot_uniform())
        self.Wz = self.add_weight(name="Wz", shape=[self.n_outputs, self.n_outputs], dtype=tf.float32,
                                  initializer=tf.keras.initializers.glorot_uniform())
        self.Wg = self.add_weight(name="Wg", shape=[self.n_outputs, self.n_outputs], dtype=tf.float32,
                                  initializer=tf.keras.initializers.glorot_uniform())
        self.Wr = self.add_weight(name="Wr", shape=[self.n_outputs, self.n_outputs], dtype=tf.float32,
                                  initializer=tf.keras.initializers.glorot_uniform())
        self.Wh = self.add_weight(name="Wh", shape=[self.n_outputs, self.n_outputs], dtype=tf.float32,
                                  initializer=tf.keras.initializers.glorot_uniform())
        self.bz = self.add_weight(name="bz", shape=[self.n_outputs], dtype=tf.float32)
        self.bg = self.add_weight(name="bg", shape=[self.n_outputs], dtype=tf.float32)
        self.br = self.add_weight(name="br", shape=[self.n_outputs], dtype=tf.float32)
        self.bh = self.add_weight(name="bh", shape=[self.n_outputs], dtype=tf.float32)
        super(LSTMLayer, self).build(input_shape)

    def call(self, S, X):
        Z = self.activation(tf.add(tf.add(tf.matmul(X, self.Uz), tf.matmul(S, self.Wz)), self.bz))
        G = self.activation(tf.add(tf.add(tf.matmul(X, self.Ug), tf.matmul(S, self.Wg)), self.bg))
        R = self.activation(tf.add(tf.add(tf.matmul(X, self.Ur), tf.matmul(S, self.Wr)), self.br))
        H = self.activation(tf.add(tf.add(tf.matmul(X, self.Uh), tf.matmul(tf.multiply(S, R), self.Wh)), self.bh))
        Snew = tf.add(tf.multiply(tf.subtract(tf.ones_like(G), G), H), tf.multiply(Z, S))
        return Snew

class DenseLayer(tf.keras.layers.Layer):
    def __init__(self, n_inputs, n_outputs, activation):
        super(DenseLayer, self).__init__()
        self.n_inputs = n_inputs
        self.n_outputs = n_outputs
        self.activation = _get_function(activation)

    def build(self, input_shape):
        self.W = self.add_weight(name="W", shape=[self.n_inputs, self.n_outputs], dtype=tf.float32,
                                 initializer=tf.keras.initializers.glorot_uniform())
        self.b = self.add_weight(name="b", shape=[self.n_outputs], dtype=tf.float32)
        super(DenseLayer, self).build(input_shape)

    def call(self, inputs):
        S = tf.add(tf.matmul(inputs, self.W), self.b)
        return self.activation(S)

def _get_function(name):
    if name == "tanh":
        return tf.nn.tanh
    elif name == "sigmoid":
        return tf.nn.sigmoid
    elif name == "relu":
        return tf.nn.relu
    elif not name:
        return tf.identity
    raise ValueError(f"Unsupported activation function: {name}")

In [None]:
class DomainNd:

    def __init__(self, range_vec, T):
        self.dim = len(range_vec)
        self.range_vec = range_vec
        self.lower_bound = range_vec.T[0]
        self.upper_bound = range_vec.T[1]
        self.T = T

    def get_discretization_size(self, n_vec, nt):
        return (self.range_vec[:, 1] - self.range_vec[:, 0]) / n_vec, self.T / nt

In [None]:
class SamplerNd:

    def __init__(self, domainNd, multi):
        self.domain = domainNd
        self.multi = multi

    def run(self, n_interior, n_terminal):
        # Sampler #1: domain interior
        dim, T = self.domain.dim, self.domain.T
        lower_bound, upper_bound = self.domain.lower_bound, self.domain.upper_bound * self.multi
        t_interior = np.random.uniform(low=0, high=T-1e-10, size=[n_interior, 1])
        S_interior = np.random.uniform(low=lower_bound, high=upper_bound, size=[n_interior, dim])



        # Sampler #2: initial/terminal condition
        t_terminal = T * np.ones((n_terminal, 1))
        S_terminal = np.random.uniform(low=lower_bound, high=upper_bound, size=[n_terminal, dim])

        return S_interior, t_interior, S_terminal, t_terminal


In [None]:
class SamplerNdV2:
    def __init__(self, domainNd, multi):
        self.domain = domainNd
        self.multi = multi

    def run(self, n_interior, n_terminal, n_boundary):
        # Sampler #1: domain interior
        dim, T = self.domain.dim, self.domain.T
        lower_bound, upper_bound = self.domain.lower_bound, self.domain.upper_bound * self.multi
        t_interior = np.random.uniform(low=0, high=T - 1e-10, size=[n_interior, 1])
        S_interior = np.random.uniform(low=lower_bound, high=upper_bound, size=[n_interior, dim])

        # Sampler #2: terminal condition
        t_terminal = T * np.ones((n_terminal, 1))
        S_terminal = np.random.uniform(low=lower_bound, high=upper_bound, size=[n_terminal, dim])

        # Sampler #3: boundary conditions
        # At S = 0
        S_boundary_zero = np.zeros((n_boundary, dim))  # S = 0
        t_boundary_zero = np.random.uniform(low=0, high=T - 1e-10, size=[n_boundary, 1])  # random times

        # At S = S_max (or some large value)
        S_boundary_max = np.ones((n_boundary, dim)) * upper_bound  # S = S_max
        t_boundary_max = np.random.uniform(low=0, high=T - 1e-10, size=[n_boundary, 1])  # random times

        return (S_interior, t_interior, S_terminal, t_terminal,
                S_boundary_zero, t_boundary_zero,
                S_boundary_max, t_boundary_max)

In [None]:
class AmerSamplerNdV2:
    def __init__(self, domainNd, multi):
        self.domain = domainNd
        self.multi = multi

    def run(self, n_interior, n_terminal, n_boundary):
        # Sampler #1: domain interior
        dim, T = self.domain.dim, self.domain.T
        lower_bound, upper_bound = self.domain.lower_bound, self.domain.upper_bound * self.multi
        t_interior = np.random.uniform(low=0, high=T - 1e-10, size=[n_interior, 1])
        S_interior = np.random.uniform(low=lower_bound, high=upper_bound, size=[n_interior, dim])

        # Sampler #2: terminal condition
        t_terminal = T * np.ones((n_terminal, 1))
        S_terminal = np.random.uniform(low=lower_bound, high=upper_bound, size=[n_terminal, dim])

        # Sampler #3: boundary conditions
        # At S = 0
        S_boundary_zero = 50 * np.ones((n_boundary, dim))  # S = 0
        t_boundary_zero = np.random.uniform(low=0, high=T - 1e-10, size=[n_boundary, 1])  # random times

        # # At S = S_max (or some large value)
        # S_boundary_infinity = np.ones((n_boundary, dim)) * upper_bound  # S = S_max
        # t_boundary_infinity = np.random.uniform(low=0, high=T - 1e-10, size=[n_boundary, 1])  # random times

        return S_interior, t_interior, S_terminal, t_terminal, S_boundary_zero, t_boundary_zero,


In [None]:
from scipy.stats import qmc
import math
import numpy as np
from scipy.stats.qmc import Sobol
class SobolSamplerNd:
    def __init__(self, domainNd):
        self.domain = domainNd

    def run(self, n_interior, n_terminal, n_boundary):
        # Sobol sampler for domain interior
        dim, T = self.domain.dim, self.domain.T
        lower_bound, upper_bound = self.domain.lower_bound, self.domain.upper_bound * 1.3
        sobol_sampler = Sobol(d=dim + 1, scramble=True)

        sobol_interior = sobol_sampler.random(n_interior)
        t_interior = sobol_interior[:, 0].reshape(- 1, 1) * (T - 1e-10)
        S_interior = sobol_interior[:, 1:] * (upper_bound - lower_bound) + lower_bound

        S_boundary_zero = np.zeros((n_boundary, dim))  # S = 0
        t_boundary_zero = np.random.uniform(low=0, high=T - 1e-10, size=[n_boundary, 1])

        S_boundary_max = np.ones((n_boundary, dim)) * upper_bound  # S = S_max
        t_boundary_max = np.random.uniform(low=0, high=T - 1e-10, size=[n_boundary, 1])  # random times

        # Sobol sampler for initial/terminal condition
        sobol_terminal = sobol_sampler.random(n_terminal)
        t_terminal = T * np.ones((n_terminal, 1))
        # Extract the spatial dimensions for terminal points, ensuring consistency with t_terminal
        S_terminal = sobol_terminal[:, 1:] * (upper_bound - lower_bound) + lower_bound

        return S_interior, t_interior, S_terminal, t_terminal, S_boundary_zero, t_boundary_zero, S_boundary_max, t_boundary_max




In [None]:
import math
from scipy.stats import norm
T = 1
def BSM(S, K, r, sigma, t, q):
    ''' Analytical solution for European call option price under Black-Scholes model

    Args:
        S:     spot price
        K:     strike price
        r:     risk-free interest rate
        sigma: volatility
        t:     time
    '''

    d1 = (np.log(S/K) + (r - q + sigma**2 / 2) * (T-t))/(sigma * np.sqrt(T-t))
    d2 = d1 - (sigma * np.sqrt(T-t))

    callPrice = S * np.exp(-q * (T-t)) * norm.cdf(d1) - K * np.exp(-r * (T-t)) * norm.cdf(d2)
    putPrice = K * np.exp(-r * (T-t)) * norm.cdf(-d2) - S * np.exp(-q * (T-t)) * norm.cdf(-d1)

    return callPrice, putPrice

In [None]:
import numpy as np
from scipy.stats import norm
T = 1
def margrabe(S1, S2, vol1, vol2, rho, q1, q2, t):
      """
      Calculates the price of an option to exchange one asset for another,
      considering dividend yields.

      Args:
        S1: Price of asset 1.
        S2: Price of asset 2.
        vol1: Volatility of asset 1.
        vol2: Volatility of asset 2.
        rho: Correlation between the returns of asset 1 and asset 2.
        q1: Dividend yield of asset 1.
        q2: Dividend yield of asset 2.
        T: Time to maturity.

      Returns:
        The price of the Margrabe option with dividends.
      """

      sigma = np.sqrt(vol1**2 + vol2**2 - 2 * rho * vol1 * vol2)
      d1 = (np.log(S1/S2) + (q1 - q2 + sigma**2 / 2) * (T-t))/(sigma * np.sqrt(T-t))
      d2 = d1 - (sigma * np.sqrt(T-t))

      callPrice = S1 * np.exp(-q1 * (T-t)) * norm.cdf(d1) - S2 * np.exp(-q2 * (T-t)) * norm.cdf(d2)

      return callPrice

In [None]:
import numpy as np

class SamplerHeston:
    def __init__(self, domainNd, v0, kappa, theta, xi, rho):
        self.domain = domainNd
        self.v0 = v0  # Initial variance
        self.kappa = kappa
        self.theta = theta
        self.xi = xi
        self.rho = rho

    def run(self, n_interior, n_terminal):
        dim, T = self.domain.dim, self.domain.T
        lower_bound, upper_bound = self.domain.lower_bound, self.domain.upper_bound*1.3

        # Sample times
        t_interior = np.random.uniform(low=0, high=T-1e-10, size=[n_interior, 1])
        t_terminal = T * np.ones((n_terminal, 1))

        # Sample asset prices
        S_interior = np.random.uniform(low=lower_bound, high=upper_bound, size=[n_interior, dim])
        S_terminal = np.random.uniform(low=lower_bound, high=upper_bound, size=[n_terminal, dim])

        # Sample variances
        V_interior = self.sample_variance(n_interior)
        V_terminal = self.sample_variance(n_terminal)

        return S_interior, V_interior, t_interior, S_terminal, V_terminal, t_terminal

    def sample_variance(self, n_samples):
        # Sample variances according to the Heston model dynamics
        # Here, we use a simple mean-reverting process for illustration
        variances = self.theta + (self.v0 - self.theta) * np.exp(-self.kappa * np.random.uniform(size=n_samples)) \
                    + self.xi * np.sqrt((1 - np.exp(-2 * self.kappa * np.random.uniform(size=n_samples))) / (2 * self.kappa))
        return np.maximum(variances, 0).reshape(-1, 1)  # Ensure non-negative variances

In [None]:
import sys, os, time
DIR_LOC = os.getcwd()
sys.path.append(DIR_LOC+"/..")
import tensorflow as tf
import numpy as np
import pickle
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D


class American:
    def __init__(self, payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, sampler=None):
        self.payoff_func = payoff_func
        self.domain = domain
        self.dim = domain.dim
        self.vol_vec = vol_vec
        self.ir = ir
        self.dividend_vec = dividend_vec
        self.corr_mat = corr_mat
        self.strike = strike
        self.cov_mat = np.outer(vol_vec, vol_vec) * corr_mat
        self.sampler = sampler if sampler is not None else SamplerNd(domain)
        self.model = None

    def run(self, n_samples, steps_per_sample, n_layers=3, layer_width=50, n_interior=16,
            n_boundary=16, n_terminal=32):




        model = DGMNet(n_layers, layer_width, input_dim=self.dim)
        self.model = model

        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=1e-2, decay_steps=1000, decay_rate=0.9)
        optimizer = tf.keras.optimizers.Adam(learning_rate=0.005)

        self.loss_vec, self.L1_vec, self.L2_vec, self.L3_vec, self.L4_vec = [], [], [], [], []

        @tf.function
        def train_step(S_interior, t_interior, S_terminal, t_terminal):
            with tf.GradientTape(persistent=True) as tape:
                tape.watch([S_interior, t_interior])
                loss, L1, L2, L3, L4 = self.compute_loss(model, S_interior, t_interior, S_terminal, t_terminal)
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            return loss, L1, L2, L3, L4

        for i in range(n_samples):
            S_interior, t_interior, S_terminal, t_terminal = self.sampler.run(n_interior, n_terminal)
            S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
            t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
            S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
            t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)

            for _ in range(steps_per_sample):
                loss, L1, L2, L3, L4 = train_step(S_interior, t_interior, S_terminal, t_terminal)

            self.loss_vec.append(loss.numpy())
            self.L1_vec.append(L1.numpy())
            self.L2_vec.append(L2.numpy())
            self.L4_vec.append(L3.numpy())
            self.L4_vec.append(L4.numpy())

            if i % 10 == 0:
              print(f"Iteration {i}: Loss: {loss.numpy()}; L1: {L1.numpy()}; L2: {L2.numpy()}; L3: {L3.numpy()}; L4: {L4.numpy()}")





        if self.dim == 1:
            self.plot_results(self.model)

        if self.dim == 2:
            self.checkExchange(model)

        plt.plot(np.log(self.loss_vec))
        plt.xlabel('Sampling Stage')
        plt.ylabel('Loss')
        plt.title('Training Loss over Time')
        plt.show()


        S0 = np.full((1, self.dim), 25, dtype=np.float32)
        t0 = np.array([[0]])  # Time to maturity
        value_at_S0 = model(S0, t0).numpy()
        print("Option Value at S0 = 1.5 for all assets:", value_at_S0)


    @tf.function
    def compute_loss(self, model, S_interior, t_interior, S_terminal, t_terminal):
        S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
        t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
        S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
        t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)

        with tf.GradientTape() as tape1:
            tape1.watch([S_interior, t_interior])
            with tf.GradientTape() as tape2:
                tape2.watch([S_interior, t_interior])
                V = model(S_interior, t_interior)
            V_s = tape2.gradient(V, S_interior)
        V_ss = tape1.batch_jacobian(V_s, S_interior)

        with tf.GradientTape() as tape3:
            tape3.watch([S_interior, t_interior])
            V = model(S_interior, t_interior)
        V_t = tape3.gradient(V, t_interior)

        sec_ord = 0
        for i in range(self.dim):
            for j in range(self.dim):
                S_coli = tf.reshape(S_interior[:, i], [-1, 1])
                S_colj = tf.reshape(S_interior[:, j], [-1, 1])
                V_ss_column = tf.reshape(V_ss[:, i, j], [-1, 1])
                sec_ord += self.vol_vec[i] * self.vol_vec[j] * self.corr_mat[i, j] * S_coli * S_colj * V_ss_column

        fir_ord = 0
        for i in range(self.dim):
            V_s_column = tf.reshape(V_s[:, i], [-1, 1])
            S_col1 = tf.reshape(S_interior[:, i], [-1, 1])
            fir_ord += (self.ir - self.dividend_vec[i]) * S_col1 * V_s_column

        diff_V = V_t + 0.5 * sec_ord + fir_ord - self.ir * V

        payoff = tf.nn.relu(self.strike - S_interior)
        value = model(S_interior, t_interior)
        L1 = tf.reduce_mean(tf.square( diff_V * (value - payoff) ))

        temp = tf.nn.relu(diff_V)                   # Minimizes -min(-f,0) = max(f,0)
        L2 = tf.reduce_mean(tf.square(temp))

        # V_boundary_zero = model(self.strike, t_interior)
        # L_boundary_zero = tf.reduce_mean(tf.square(V_boundary_zero))

        V_ineq = tf.nn.relu(-(value-payoff))      # Minimizes -min(-f,0) = max(f,0)
        L3 = tf.reduce_mean(tf.square(V_ineq))

        target_payoff = tf.nn.relu(self.strike - S_terminal)
        fitted_payoff = model(S_terminal, t_terminal)

        L4 = tf.reduce_mean( tf.square(fitted_payoff - target_payoff) )
        total_loss = L1 + L2 + L3 + L4
        return total_loss, L1, L2, L3, L4

    def plot_results(self, model):
        S_plot = np.linspace(self.domain.lower_bound, self.domain.upper_bound, 100).reshape(-1, 1)
        S_plot1 = np.linspace(self.domain.lower_bound, self.domain.upper_bound, 100)
        fig, axs = plt.subplots(2, 2, figsize=(12, 10))
        fig_error, axs_error = plt.subplots(2, 2, figsize=(12, 10))
        error = np.zeros((100, 4))
        total_error = np.zeros((1, 4))

        # Time values at which to examine density
        valueTimes = [0, 1/3, 2/3, 1]

        for i, curr_t in enumerate(valueTimes):
            # Specify subplot
            ax = axs[i//2, i%2]

            # Simulate process at current t
            S = np.zeros((100, 1))
            for j in range(100):
                if i < 3:
                    S[j, 0] = AmericanBinom(j, self.strike, 1.0 - curr_t, self.ir, self.vol_vec[0], self.dividend_vec[0])
                else:
                    S[j, 0] = np.maximum((self.strike - S_plot1[j]), 0)


            # for j in range(100):
            #     k = j+1.0
            #     S[j, 0] = AmericanOptionsLSMC('put', k, 50., 1., 50, 0.04, 0.01, 0.15, 1000).price

            # Compute normalized density at all x values to plot and current t value
            t_plot = np.full(S_plot.shape, curr_t, dtype=np.float32)
            value = model(S_plot, t_plot).numpy()

            # Calculate error
            current_error = np.abs(value - S)
            error[:, i] = current_error.flatten()  # Store the error for each point
            total_error[0, i] = np.sum(current_error)  # Compute the mean error for current t


            # Plot analytical solution and DGM estimate
            ax.plot(S_plot, S, color='b', label='Binomial tree estimate', linewidth=3, linestyle=':')
            ax.plot(S_plot, value, color='r', label='Network estimate')

            # Subplot options
            ax.set_xlabel("Spot Price", fontsize=15, labelpad=10)
            ax.set_ylabel("Option Price", fontsize=15, labelpad=20)

            ax.tick_params(axis='both', which='major', labelsize=13)
            ax.grid(linestyle=':')

            if i == 0:
                ax.legend(loc='upper left', prop={'size': 16})

            # Plot error in separate subplot
            ax_error = axs_error[i//2, i%2]
            ax_error.plot(S_plot, current_error, color='g', label='Error', linestyle='--')

            # Subplot options for error
            ax_error.set_xlabel("Spot Price", fontsize=15, labelpad=10)
            ax_error.set_ylabel("Error", fontsize=15, labelpad=20)
            ax_error.set_title("Absolute Error", fontsize=18, y=1.03)

            ax_error.tick_params(axis='both', which='major', labelsize=13)
            ax_error.grid(linestyle=':')

            if i == 0:
                ax_error.legend(loc='upper left', prop={'size': 16})

        # Adjust space between subplots
        fig.subplots_adjust(wspace=0.3, hspace=0.4)
        fig_error.subplots_adjust(wspace=0.3, hspace=0.4)
        print(total_error)
        # Optionally, print or return the error arrays if needed


        plt.show()













In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D


class CEV:
    def __init__(self, payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, beta, strike, sampler=None):
        self.payoff_func = payoff_func
        self.domain = domain
        self.dim = domain.dim
        self.vol_vec = vol_vec
        self.ir = ir
        self.dividend_vec = dividend_vec
        self.corr_mat = corr_mat
        self.beta = beta
        self.strike = strike
        self.cov_mat = np.outer(vol_vec, vol_vec) * corr_mat
        self.sampler = sampler if sampler is not None else SamplerNd(domain)

    def run(self, n_samples, steps_per_sample, n_layers=3, layer_width=50, n_interior=16,
            n_boundary=16, n_terminal=32):




        model = DGMNet(n_layers, layer_width, input_dim=self.dim)
        self.model = model

        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=1e-2, decay_steps=1000, decay_rate=0.9)
        optimizer = tf.keras.optimizers.Adam(learning_rate=0.005)

        self.loss_vec, self.L1_vec, self.L3_vec = [], [], []

        @tf.function
        def train_step(S_interior, t_interior, S_terminal, t_terminal):
            with tf.GradientTape(persistent=True) as tape:
                tape.watch([S_interior, t_interior])
                loss, L1, L3 = self.compute_loss(model, S_interior, t_interior, S_terminal, t_terminal)
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            return loss, L1, L3

        for i in range(n_samples):
            S_interior, t_interior, S_terminal, t_terminal = self.sampler.run(n_interior, n_terminal)
            S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
            t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
            S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
            t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)

            for _ in range(steps_per_sample):
                loss, L1, L3 = train_step(S_interior, t_interior, S_terminal, t_terminal)

            self.loss_vec.append(loss.numpy())
            self.L1_vec.append(L1.numpy())
            self.L3_vec.append(L3.numpy())

            if i % 10 == 0:
              print(f"Iteration {i}: Loss: {loss.numpy()}; L1: {L1.numpy()}; L3: {L3.numpy()}")

            if loss.numpy() < 0.01:
                break




        if self.dim == 1:
            self.plot_CEV(self.model)



        plt.plot(np.log(self.loss_vec))
        plt.xlabel('Sampling Stage')
        plt.ylabel('Loss')
        plt.title('Training Loss over Time')
        plt.show()


        S02 = np.full((1, self.dim), 50, dtype=np.float32)
        S03 = np.full((1, self.dim), 55, dtype=np.float32)
        S04 = np.full((1, self.dim), 60, dtype=np.float32)
        S01 = np.full((1, self.dim), 45, dtype=np.float32)

        t0 = np.array([[0]])  # Time to maturity
        value_at_S01 = model(S01, t0).numpy()
        value_at_S02 = model(S02, t0).numpy()
        value_at_S03 = model(S03, t0).numpy()
        value_at_S04 = model(S04, t0).numpy()

        print("Option Value at S0 = 45 for all assets:", value_at_S01)
        print("Option Value at S0 = 50 for all assets:", value_at_S02)
        print("Option Value at S0 = 55 for all assets:", value_at_S03)
        print("Option Value at S0 = 60 for all assets:", value_at_S04)

    def trainAgain(self, saved_name, n_samples, steps_per_sample, n_layers=3, layer_width=50, n_interior=32, n_boundary=32, n_terminal=32):

        self.model = DGMNet(n_layers, layer_width, input_dim=self.dim)

        self.model_path = os.path.join(DIR_LOC, "saved_models", saved_name, saved_name + ".ckpt")
        self.model.load_weights(self.model_path)

        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=1e-2, decay_steps=1000, decay_rate=0.9)
        optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

        self.loss_vec, self.L1_vec, self.L3_vec = [], [], []

        @tf.function
        def train_step(S_interior, t_interior, S_terminal, t_terminal):
            with tf.GradientTape(persistent=True) as tape:
                tape.watch([S_interior, t_interior])
                loss, L1, L3 = self.compute_loss(self.model, S_interior, t_interior, S_terminal, t_terminal)
            grads = tape.gradient(loss, self.model.trainable_variables)
            optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
            return loss, L1, L3

        for i in range(n_samples):
            S_interior, t_interior, S_terminal, t_terminal = self.sampler.run(n_interior, n_terminal)
            S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
            t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
            S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
            t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)

            for _ in range(steps_per_sample):
                loss, L1, L3 = train_step(S_interior, t_interior, S_terminal, t_terminal)

            self.loss_vec.append(loss.numpy())
            self.L1_vec.append(L1.numpy())
            self.L3_vec.append(L3.numpy())

            print(f"Iteration {i}: Loss: {loss.numpy()}; L1: {L1.numpy()}; L3: {L3.numpy()}")

            if loss.numpy() < 0.02:
                break

        #if self.dim == 1:
            #self.plot_results(self.model)

        #if self.dim == 0:
            #self.checkExchange(self.model)

        epochs = range(1, len(self.loss_vec) + 1)
        plt.plot(epochs, self.loss_vec)
        plt.yscale('log')  # Set y-axis to logarithmic scale
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training Loss')
        plt.show()



    def restore(self, S, t, saved_name, n_layers=3, layer_width=50):
        self.model = DGMNet(n_layers, layer_width, input_dim=self.dim)
        sample_S = tf.zeros((1000, self.dim))
        sample_t = tf.ones(sample_S.shape)  # Replace with appropriate shapes
        _ = self.model(sample_S, sample_t)
        model_path = os.path.join(DIR_LOC, "saved_models", saved_name, saved_name + ".ckpt")
        self.model.load_weights(model_path)
        # Prepare input tensors
        S_interior_tnsr = tf.convert_to_tensor(S, dtype=tf.float64)
        t_interior_tnsr = tf.convert_to_tensor(t, dtype=tf.float64)

        # Predict using the loaded model
        fitted_optionValue = self.model(S_interior_tnsr, t_interior_tnsr)

        print("Model {}: {}".format(saved_name, fitted_optionValue.numpy().T))
        return fitted_optionValue.numpy().T



    @tf.function
    def compute_loss(self, model, S_interior, t_interior, S_terminal, t_terminal):
        S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
        t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
        S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
        t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)


        with tf.GradientTape() as tape1:
            tape1.watch([S_interior, t_interior])
            with tf.GradientTape() as tape2:
                tape2.watch([S_interior, t_interior])
                V = model(S_interior, t_interior)
            V_s = tape2.gradient(V, S_interior)
        V_ss = tape1.batch_jacobian(V_s, S_interior)



        with tf.GradientTape() as tape3:
            tape3.watch([S_interior, t_interior])
            V = model(S_interior, t_interior)
        V_t = tape3.gradient(V, t_interior)

        S_interiorbeta = tf.pow(S_interior, self.beta)
        sec_ord = 0
        for i in range(self.dim):
            for j in range(self.dim):
                S_coli = tf.reshape(S_interiorbeta[:, i], [-1, 1])
                S_colj = tf.reshape(S_interiorbeta[:, j], [-1, 1])
                V_ss_column = tf.reshape(V_ss[:, i, j], [-1, 1])
                sec_ord += self.vol_vec[i] * self.vol_vec[j] * self.corr_mat[i, j] * S_coli * S_colj * V_ss_column

        fir_ord = 0
        for i in range(self.dim):
            V_s_column = tf.reshape(V_s[:, i], [-1, 1])
            S_col1 = tf.reshape(S_interior[:, i], [-1, 1])
            fir_ord += (self.ir - self.dividend_vec[i]) * S_col1 * V_s_column



        diff_V = V_t + 0.5 * sec_ord + fir_ord - self.ir * V

        L1 = tf.reduce_mean(tf.square(diff_V))

        # Payoff evaluation at terminal time
        target_payoff = tf.convert_to_tensor(self.payoff_func(S_terminal), dtype=tf.float32)
        fitted_payoff = tf.reshape(model(S_terminal, t_terminal), [-1])
        L3 = tf.reduce_mean(tf.square(fitted_payoff - target_payoff))


        # Combining all components of the loss function
        total_loss = L1 + L3


        return total_loss, L1, L3

    def plot_CEV(self, model):
        S_plot = np.linspace(self.domain.lower_bound, self.domain.upper_bound, 100).reshape(-1, 1)
        t_plot = np.full(S_plot.shape, self.domain.T)
        plt.figure()
        plt.figure(figsize = (12,10))

        optionValue = CEVclosed(S_plot, self.strike, self.ir, self.dividend_vec[0], 0.5, self.vol_vec[0], 1.0)


        #BS, _ = BSM(S_plot, self.strike, self.ir, self.vol_vec[0], 0, self.dividend_vec[0])
        # compute normalized density at all x values to plot and current t value
        t_plot = np.full(S_plot.shape, 0, dtype=np.float32)
        value = model(S_plot, t_plot).numpy()

        # plot histogram of simulated process values and overlay estimated density
        plt.plot(S_plot, optionValue, color = 'b', label='Analytical Solution', linewidth = 3, linestyle=':')
        plt.plot(S_plot, value, color = 'r', label='Network estimate')
        #plt.plot(S_plot, BS, color = 'g', label='Black-Scholes Solution', linewidth = 3, linestyle=':')


        # subplot options

        plt.xlabel("Spot Price", fontsize=20, labelpad=10)
        plt.ylabel("Option Price", fontsize=20, labelpad=20)

        plt.xticks(fontsize=13)
        plt.yticks(fontsize=13)
        plt.grid(linestyle=':')


        plt.legend(loc='upper left', prop={'size': 16})
        # adjust space between subplots
        # plt.subplots_adjust(wspace=0.3, hspace=0.4)

        error = np.abs(value - optionValue)
        #error = np.abs(value - BS)

        total_error = np.sum(error)
        print("Total Error at t = {}: {}".format(0, total_error))

        plt.show()


        plt.figure()
        plt.figure(figsize = (12,10))

        #plt.subplot(2,2,i+1)
        plt.plot(S_plot, error)
        plt.ylim(0, 0.5)
        plt.xlabel("Spot Price", fontsize = 20)
        plt.ylabel("Absolute Error", fontsize = 20)
        plt.title("Absolute Error", fontsize = 20)
        #plt.text(0.5, 0.9, f'Total Error: {total_error:.4f}', ha='center', va='center', transform=plt.gca().transAxes)
        plt.show()

In [None]:
import numpy as np
import tensorflow as tf
import os
import time
import pickle
import matplotlib.pyplot as plt

class EuroHeston:
    def __init__(self, payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, kappa, theta, xi, rho, sampler=None):
        self.payoff_func = payoff_func
        self.domain = domain
        self.dim = domain.dim
        self.vol_vec = vol_vec
        self.ir = ir
        self.dividend_vec = dividend_vec
        self.corr_mat = corr_mat
        self.strike = strike
        self.kappa = kappa
        self.theta = theta
        self.xi = xi
        self.rho = rho
        self.sampler = sampler if sampler is not None else SamplerHeston(domain, vol_vec[0], kappa, theta, xi, rho)
        self.model = None

    def run(self, n_samples, steps_per_sample, n_layers=3, layer_width=50, n_interior=16,
            n_boundary=16, n_terminal=32, saved_name=None, new_training=True):
        if new_training:
            if not saved_name:
                pickle_dir = DIR_LOC+"/saved_models/{}_Euro".format(time.strftime("%Y%m%d"))
                saved_name = "{}_Euro.ckpt".format(time.strftime("%Y%m%d"))
            else:
                pickle_dir = DIR_LOC+"/saved_models/{}_{}".format(time.strftime("%Y%m%d"), saved_name)
                saved_name = time.strftime("%Y%m%d") + "_" + saved_name + ".ckpt"

            if not os.path.exists(pickle_dir):
                os.makedirs(pickle_dir)

        model = DGMNet(n_layers, layer_width, input_dim=self.dim + 1)  # Additional dimension for variance
        self.model = model

        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=1e-2, decay_steps=1000, decay_rate=0.9)
        optimizer = tf.keras.optimizers.Adam(learning_rate=0.005)

        self.loss_vec, self.L1_vec, self.L3_vec = [], [], []

        @tf.function
        def train_step(S_interior, V_interior, t_interior, S_terminal, V_terminal, t_terminal):
            with tf.GradientTape(persistent=True) as tape:
                tape.watch([S_interior, V_interior, t_interior])
                loss, L1, L3 = self.compute_loss(model, S_interior, V_interior, t_interior, S_terminal, V_terminal, t_terminal)
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            return loss, L1, L3

        for i in range(n_samples):
            S_interior, V_interior, t_interior, S_terminal, V_terminal, t_terminal = self.sampler.run(n_interior, n_terminal)
            S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
            V_interior = tf.convert_to_tensor(V_interior, dtype=tf.float32)
            t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
            S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
            V_terminal = tf.convert_to_tensor(V_terminal, dtype=tf.float32)
            t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)

            for _ in range(steps_per_sample):
                loss, L1, L3 = train_step(S_interior, V_interior, t_interior, S_terminal, V_terminal, t_terminal)

            self.loss_vec.append(loss.numpy())
            self.L1_vec.append(L1.numpy())
            self.L3_vec.append(L3.numpy())

            print(f"Iteration {i}: Loss: {loss.numpy()}; L1: {L1.numpy()}; L3: {L3.numpy()}")

            # if loss.numpy() < 0.1:
            #     break

        self.plot_option_value(model)


        if new_training:
            self.model.save_weights(saved_name)

            with open(pickle_dir + "_lossvec.pickle", 'wb') as f:
                pickle.dump(self.loss_vec, f)
            with open(pickle_dir + "_l1vec.pickle", 'wb') as f:
                pickle.dump(self.L1_vec, f)

            with open(pickle_dir + "_l3vec.pickle", 'wb') as f:
                pickle.dump(self.L3_vec, f)

    @tf.function
    def compute_loss(self, model, S_interior, V_interior, t_interior, S_terminal, V_terminal, t_terminal):
        S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
        V_interior = tf.convert_to_tensor(V_interior, dtype=tf.float32)
        t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
        S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
        V_terminal = tf.convert_to_tensor(V_terminal, dtype=tf.float32)
        t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)

        with tf.GradientTape(persistent=True) as tape:
            tape.watch([S_interior, V_interior, t_interior])
            V = model(tf.concat([S_interior, V_interior], axis=1), t_interior)
            V_s = tape.gradient(V, S_interior)
            V_v = tape.gradient(V, V_interior)

        V_ss = tape.gradient(V_s, S_interior)
        V_vv = tape.gradient(V_v, V_interior)
        V_sv = tape.gradient(V_s, V_interior)
        V_t = tape.gradient(V, t_interior)


        sec_ord = 0.5 * (S_interior**2 * V_interior * V_ss + self.xi**2 * V_interior * V_vv + 2 * self.rho * self.xi * S_interior * V_interior * V_sv)
        first_ord = (self.ir - self.dividend_vec) * S_interior * V_s + self.kappa * (self.theta - V_interior) * V_v
        diff_V = V_t + sec_ord + first_ord - self.ir * V
        L1 = tf.reduce_mean(tf.square(diff_V))

        target_payoff = tf.convert_to_tensor(self.payoff_func(S_terminal), dtype=tf.float32)
        fitted_payoff = tf.reshape(model(tf.concat([S_terminal, V_terminal], axis=1), t_terminal), [-1])
        L3 = tf.reduce_mean(tf.square(fitted_payoff - target_payoff))

        return L1 + L3, L1, L3

    def plot_training_loss(self):
        plt.figure(figsize=(10, 6))
        plt.plot(self.loss_vec, label='Total Loss')
        plt.plot(self.L1_vec, label='L1 Loss (PDE Residual)')
        plt.plot(self.L3_vec, label='L3 Loss (Payoff)')
        plt.xlabel('Iteration')
        plt.ylabel('Loss')
        plt.title('Training Loss Over Time')
        plt.legend()
        plt.show()

    def plot_option_value(self, model):
        S_plot = np.linspace(self.domain.lower_bound, self.domain.upper_bound, 100).reshape(-1, 1)
        V_plot = np.linspace(0, 1, 100).reshape(-1, 1)
        plt.figure()
        plt.figure(figsize = (12,10))
        error = []
        total_error = []
        # time values at which to examine density
        valueTimes = [0, 1/3, 2/3, 1]
        for i, curr_t in enumerate(valueTimes):

            S = np.c_[S_plot, np.full(S_plot.shape, 0.15)]
           #S = np.c_[S_plot, V_plot]
            t_vals = np.full((S.shape[0], 1), curr_t)

            # specify subplot
            plt.subplot(2,2,i+1)

            if curr_t == 1:
                prices = [heston_call_price(S0, self.strike, 0.01, self.ir, self.dividend_vec[0], self.vol_vec[0], self.kappa, self.theta, self.xi, self.rho) for S0 in S_plot]
            else:
                prices = [heston_call_price(S0, self.strike, self.domain.T - curr_t, self.ir, self.dividend_vec[0], self.vol_vec[0], self.kappa, self.theta, self.xi, self.rho) for S0 in S_plot]
            prices = np.nan_to_num(prices).flatten()

            # simulate process at current t



            # compute normalized density at all x values to plot and current t value
            t_plot = np.full(S_plot.shape, curr_t, dtype=np.float32)
            value = model(S, t_vals).numpy().flatten()

            # plot histogram of simulated process values and overlay estimated density
            plt.plot(S_plot, prices, color = 'b', label='Analytical Solution', linewidth = 3, linestyle=':')
            plt.plot(S_plot, value, color = 'r', label='Network estimate')


            # subplot options

            plt.xlabel("Spot Price", fontsize=15, labelpad=10)
            plt.ylabel("Option Price", fontsize=15, labelpad=20)

            plt.xticks(fontsize=13)
            plt.yticks(fontsize=13)
            plt.grid(linestyle=':')

            if i == 0:
                plt.legend(loc='upper left', prop={'size': 16})
            # adjust space between subplots
            plt.subplots_adjust(wspace=0.3, hspace=0.4)

            error.append(np.abs(value - prices))

            total_error.append(np.sum(error[i]))
            print("Total Error at t = {}: {}".format(curr_t, total_error[i]))

        plt.show()

        plt.figure()
        plt.figure(figsize = (12,10))
        for i in range(len(valueTimes)):
            plt.subplot(2,2,i+1)
            plt.plot(S_plot, error[i])

            plt.xlabel("Spot Price")
            plt.ylabel("Absolute Error")
            plt.title("Absolute Error")
            #plt.text(0.5, 0.9, f'Total Error: {total_error[i]:.4f}', ha='center', va='center', transform=plt.gca().transAxes)
        plt.show()



In [None]:
import numpy as np
from scipy.stats import qmc

class SobolSamplerHeston:
    def __init__(self, domainNd, v0, kappa, theta, xi, rho):
        self.domain = domainNd
        self.v0 = v0  # Initial variance
        self.kappa = kappa
        self.theta = theta
        self.xi = xi
        self.rho = rho

    def run(self, n_interior, n_terminal):
        dim, T = self.domain.dim, self.domain.T
        lower_bound, upper_bound = self.domain.lower_bound, self.domain.upper_bound * 1.3

        # Sobol sequence sampler
        sampler_interior = qmc.Sobol(d=dim + 1)
        sampler_terminal = qmc.Sobol(d=dim + 1)

        # Generate Sobol sequence for interior and terminal times and asset prices
        sobol_interior = sampler_interior.random(n=n_interior)
        sobol_terminal = sampler_terminal.random(n=n_terminal)

        # Map Sobol sequence to the desired ranges
        t_interior = T * sobol_interior[:, 0].reshape(-1, 1)
        S_interior = lower_bound + (upper_bound - lower_bound) * sobol_interior[:, 1:]

        t_terminal = T * np.ones((n_terminal, 1))
        S_terminal = lower_bound + (upper_bound - lower_bound) * sobol_terminal[:, 1:]

        # Sample variances
        V_interior = self.sample_variance(n_interior)
        V_terminal = self.sample_variance(n_terminal)

        return S_interior, V_interior, t_interior, S_terminal, V_terminal, t_terminal

    def sample_variance(self, n_samples):
        # Sobol sequence for variances
        sampler_variance = qmc.Sobol(d=1)
        sobol_variance = sampler_variance.random(n=n_samples)

        # Sample variances according to the Heston model dynamics
        variances = self.theta + (self.v0 - self.theta) * np.exp(-self.kappa * sobol_variance[:, 0]) \
                    + self.xi * np.sqrt((1 - np.exp(-2 * self.kappa * sobol_variance[:, 0])) / (2 * self.kappa))
        return np.maximum(variances, 0).reshape(-1, 1)  # Ensure non-negative variances

In [None]:
import sys, os, time
DIR_LOC = os.getcwd()
sys.path.append(DIR_LOC+"/..")
import tensorflow as tf
import numpy as np
import pickle
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.ticker import LogFormatter
from matplotlib.ticker import FuncFormatter
import time



class Euro:
    def __init__(self, payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, multi, sampler=None):
        self.payoff_func = payoff_func
        self.domain = domain
        self.dim = domain.dim
        self.vol_vec = vol_vec
        self.ir = ir
        self.dividend_vec = dividend_vec
        self.corr_mat = corr_mat
        self.strike = strike
        self.multi = multi
        self.cov_mat = np.outer(vol_vec, vol_vec) * corr_mat
        self.sampler = sampler if sampler is not None else SamplerNdV2(domain, multi)
        self.model = None

    def run(self, n_samples, steps_per_sample, n_layers=3, layer_width=50, n_interior=16,
            n_boundary=16, n_terminal=32, saved_name = None, new_training = False):

        if new_training:
            if not saved_name:
                pickle_dir = DIR_LOC+"/saved_models/{}_Euro".format(time.strftime("%Y%m%d"))
                saved_name = "{}_Euro.ckpt".format(time.strftime("%Y%m%d"))
            else:
                pickle_dir = DIR_LOC+"/saved_models/{}_{}".format(time.strftime("%Y%m%d"), saved_name)
                saved_name = saved_name + ".weights.h5"

            if not os.path.exists(pickle_dir):
                os.makedirs(pickle_dir)


        model = DGMNet(n_layers, layer_width, input_dim=self.dim)
        self.model = model

        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=1e-2, decay_steps=1000, decay_rate=0.9)
        optimizer = tf.keras.optimizers.Adam(learning_rate= 0.001)

        self.loss_vec, self.L1_vec, self.L2_vec, self.L3_vec = [], [], [], []

        @tf.function
        def train_step(S_interior, t_interior, S_terminal, t_terminal, S_boundary_zero, t_boundary_zero, S_boundary_infinity, t_boundary_infinity):
            with tf.GradientTape(persistent=True) as tape:
                tape.watch([S_interior, t_interior, S_terminal, t_terminal, S_boundary_zero, t_boundary_zero, S_boundary_infinity, t_boundary_infinity])
                loss, L1, L2, L3 = self.compute_loss(model, S_interior, t_interior, S_terminal, t_terminal, S_boundary_zero, t_boundary_zero, S_boundary_infinity, t_boundary_infinity)
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            return loss, L1, L2, L3

        cumulative_time = 0.0
        for i in range(n_samples):
        # Sample points including boundary conditions
            S_interior, t_interior, S_terminal, t_terminal, S_boundary_zero, t_boundary_zero, S_boundary_infinity, t_boundary_infinity = self.sampler.run(n_interior, n_terminal, n_boundary)
            S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
            t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
            S_boundary_zero = tf.convert_to_tensor(S_boundary_zero, dtype=tf.float32)
            t_boundary_zero = tf.convert_to_tensor(t_boundary_zero, dtype=tf.float32)
            S_boundary_infinity = tf.convert_to_tensor(S_boundary_infinity, dtype=tf.float32)
            t_boundary_infinity = tf.convert_to_tensor(t_boundary_infinity, dtype=tf.float32)
            S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
            t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)

            for _ in range(steps_per_sample):
                loss, L1, L2, L3 = train_step(S_interior, t_interior, S_terminal, t_terminal, S_boundary_zero, t_boundary_zero, S_boundary_infinity, t_boundary_infinity)

            self.loss_vec.append(loss.numpy())
            self.L1_vec.append(L1.numpy())
            self.L2_vec.append(L2.numpy())
            self.L3_vec.append(L3.numpy())




            if i % 10 == 0:
                print(f"Iteration {i}: Loss: {loss.numpy()}; L1: {L1.numpy()}; L2: {L2.numpy()}; L3: {L3.numpy()}")
            if loss.numpy() < 0.1:
                break


        if new_training:
            self.model.save_weights(saved_name)

            with open(pickle_dir + "_lossvec.pickle", 'wb') as f:
                pickle.dump(self.loss_vec, f)
            with open(pickle_dir + "_l1vec.pickle", 'wb') as f:
                pickle.dump(self.L1_vec, f)

            with open(pickle_dir + "_l3vec.pickle", 'wb') as f:
                pickle.dump(self.L3_vec, f)

        S03 = np.full((1, self.dim), 50, dtype=np.float32)
        S04 = np.full((1, self.dim), 55, dtype=np.float32)
        S05 = np.full((1, self.dim), 60, dtype=np.float32)
        S02 = np.full((1, self.dim), 45, dtype=np.float32)
        S01 = np.full((1, self.dim), 40, dtype=np.float32)
        t0 = np.array([[0]])  # Time to maturity
        value_at_S01 = model(S01, t0).numpy()
        value_at_S02 = model(S02, t0).numpy()
        value_at_S03 = model(S03, t0).numpy()
        value_at_S04 = model(S04, t0).numpy()
        value_at_S05 = model(S05, t0).numpy()
        print("Option Value at S0 = 40 for all assets:", value_at_S01)
        print("Option Value at S0 = 45 for all assets:", value_at_S02)
        print("Option Value at S0 = 50 for all assets:", value_at_S03)
        print("Option Value at S0 = 55 for all assets:", value_at_S04)
        print("Option Value at S0 = 60 for all assets:", value_at_S05)

        if self.dim == 1:
            self.plot_results(self.model)

        if self.dim == 2:
            self.checkExchange(model)

        # Custom formatter function
        def log_formatter(y, pos):
            exponent = int(np.log10(y))
            return f'$10^{{{exponent}}}$'

        # Create a range of epochs based on the length of self.loss_vec
        epochs = range(1, len(self.loss_vec) + 1)

        # Plot the loss values
        plt.plot(epochs, self.loss_vec)

        # Set y-axis to logarithmic scale
        plt.yscale('log')

        # Use a custom formatter for the y-axis
        formatter = FuncFormatter(log_formatter)
        plt.gca().yaxis.set_major_formatter(formatter)

        # Set labels and title
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training Loss')

        # Display the plot
        plt.show()



    @tf.function
    def compute_loss(self, model, S_interior, t_interior, S_terminal, t_terminal, S_boundary_zero, t_boundary_zero, S_boundary_infinity, t_boundary_infinity):
        S_interior = tf.convert_to_tensor(S_interior, dtype=tf.float32)
        t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
        S_terminal = tf.convert_to_tensor(S_terminal, dtype=tf.float32)
        t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)

        with tf.GradientTape() as tape1:
            tape1.watch([S_interior, t_interior])
            with tf.GradientTape() as tape2:
                tape2.watch([S_interior, t_interior])
                V = model(S_interior, t_interior)
            V_s = tape2.gradient(V, S_interior)
        V_ss = tape1.batch_jacobian(V_s, S_interior)



        with tf.GradientTape() as tape3:
            tape3.watch([S_interior, t_interior])
            V = model(S_interior, t_interior)
        V_t = tape3.gradient(V, t_interior)

        sec_ord = 0
        for i in range(self.dim):
            for j in range(self.dim):
                S_coli = tf.reshape(S_interior[:, i], [-1, 1])
                S_colj = tf.reshape(S_interior[:, j], [-1, 1])
                V_ss_column = tf.reshape(V_ss[:, i, j], [-1, 1])
                sec_ord += self.vol_vec[i] * self.vol_vec[j] * self.corr_mat[i, j] * S_coli * S_colj * V_ss_column

        fir_ord = 0
        for i in range(self.dim):
            V_s_column = tf.reshape(V_s[:, i], [-1, 1])
            S_col1 = tf.reshape(S_interior[:, i], [-1, 1])
            fir_ord += (self.ir - self.dividend_vec[i]) * S_col1 * V_s_column



        diff_V = V_t + 0.5 * sec_ord + fir_ord - self.ir * V

        L1 = tf.reduce_mean(tf.square(diff_V))

        # Boundary Condition 1: C(t, 0) = 0
        V_boundary_zero = model(S_boundary_zero, t_boundary_zero)
        L_boundary_zero = tf.reduce_mean(tf.square(V_boundary_zero))

        # # Boundary Condition 2: C(t, S) ∼ S as S → ∞
        # V_boundary_infinity = model(S_boundary_infinity, t_boundary_infinity)
        # L_boundary_infinity = tf.reduce_mean(tf.square(V_boundary_infinity - S_boundary_infinity))


        L2 = L_boundary_zero
        # Payoff evaluation at terminal time
        target_payoff = tf.convert_to_tensor(self.payoff_func(S_terminal), dtype=tf.float32)
        fitted_payoff = tf.reshape(model(S_terminal, t_terminal), [-1])
        L3 = tf.reduce_mean(tf.square(fitted_payoff - target_payoff))


        # Combining all components of the loss function
        total_loss = L1 + L2 + L3


        return total_loss, L1, L2, L3

    def plot_results(self, model):
        S_plot = np.linspace(self.domain.lower_bound, self.domain.upper_bound, 100).reshape(-1, 1)
        plt.figure()
        plt.figure(figsize = (12,10))
        error = []
        total_error = []
        # time values at which to examine density
        valueTimes = [0, 1/3, 2*1/3, 1]
        for i, curr_t in enumerate(valueTimes):

            # specify subplot
            plt.subplot(2,2,i+1)

            # simulate process at current t
            optionValue, _ =  BSM(S_plot, self.strike, self.ir, self.vol_vec[0], curr_t, self.dividend_vec[0])

            # compute normalized density at all x values to plot and current t value
            t_plot = np.full(S_plot.shape, curr_t, dtype=np.float32)
            value = model(S_plot, t_plot).numpy()

            # plot histogram of simulated process values and overlay estimated density
            plt.plot(S_plot, optionValue, color = 'b', label='Analytical Solution', linewidth = 3, linestyle=':')
            plt.plot(S_plot, value, color = 'r', label='Network Estimate')


            # subplot options

            plt.xlabel("Spot Price", fontsize=15, labelpad=10)
            plt.ylabel("Option Price", fontsize=15, labelpad=20)
            #plt.title("t = " + str(curr_t), fontsize=18, y=1.03)
            plt.xticks(fontsize=13)
            plt.yticks(fontsize=13)
            plt.grid(linestyle=':')

            if i == 0:
                plt.legend(loc='upper left', prop={'size': 16})
            # adjust space between subplots
            plt.subplots_adjust(wspace=0.3, hspace=0.4)

            error.append(np.abs(value - optionValue))

            total_error.append(np.sum(error[i]))
            print("Total Error at t = {}: {}".format(curr_t, total_error[i]))

        plt.show()


        plt.figure()
        plt.figure(figsize = (12,10))
        for i in range(len(valueTimes)):
            plt.subplot(2,2,i+1)
            plt.plot(S_plot, error[i])
            plt.ylim(0, 0.5)
            plt.xlabel("Spot Price")
            plt.ylabel("Absolute Error")
            plt.title("Absolute Error")
            #plt.text(0.5, 0.9, f'Total Error: {total_error[i]:.4f}', ha='center', va='center', transform=plt.gca().transAxes)
        plt.show()

        print("Total Error: {}".format(total_error))


    def checkExchange(self, model):
        S1_range = np.linspace(self.domain.lower_bound, self.domain.upper_bound, 50)
        S2_range = np.linspace(20, 80, 50)

        S1, S2 = np.meshgrid(S1_range, S2_range)
        S1_flat = S1.flatten()
        S2_flat = S2.flatten()
        S_combined = np.vstack((S1_flat, S2_flat)).T
        t_value = 0
        t_flat = np.full(S1_flat.shape, 0, dtype=np.float32)

        optionValue = margrabe(S1_flat, S2_flat, self.vol_vec[0], self.vol_vec[1], self.corr_mat[0, 1], self.dividend_vec[0], self.dividend_vec[1], t_value)
        value = model(S_combined, t_flat.reshape(-1, 1)).numpy()

        optionValue = optionValue.reshape(S1.shape)
        value = value.reshape(S1.shape)

        fig = plt.figure(figsize=(14, 10))
        ax = fig.add_subplot(111, projection='3d')
        surf = ax.plot_surface(S1, S2, value, cmap='viridis', edgecolor='none', alpha=0.7)
        ax.plot_wireframe(S1, S2, optionValue, color='r', linestyle=':', linewidth=1)

        ax.view_init(elev=30, azim=220)
        ax.set_xlabel('S1')
        ax.set_ylabel('S2')
        ax.set_zlabel('Option Value')

        fig.colorbar(surf, ax=ax, shrink=0.5, aspect=5, label='Option Value')
        plt.tight_layout()
        plt.show()

        error = np.abs(value - np.nan_to_num(optionValue))
        total_error = np.sum(error)

        print("Total Error:", total_error)

        fig_error = plt.figure(figsize=(14, 10))
        error_surface = np.abs(value - np.nan_to_num(optionValue)).reshape(S1.shape)

        ax_error = fig_error.add_subplot(111, projection='3d')  # Create subplots for error surfaces
        surf_error = ax_error.plot_surface(S1, S2, error_surface, cmap='coolwarm', edgecolor='none', alpha=0.7)

        ax_error.view_init(elev=30, azim=220)
        ax_error.set_xlabel('S1')
        ax_error.set_ylabel('S2')
        ax_error.set_zlabel('Error Value')
          # Set title for each subplot
        fig_error.colorbar(surf_error, ax=ax_error, shrink=0.5, aspect=5, label='Error')

        total_error = np.sum(error_surface)
        print("Total Error:", total_error)
        #ax_error.text2D(0.6, 1, f'Total Error: {total_error:.4f}', ha='center', va='center', transform=ax_error.transAxes, fontsize = 14)

        plt.tight_layout()
        plt.show()





In [None]:
import numpy as np
import tensorflow as tf

def test_euro10d():
    dim = 10
    T = 1.0
    domain = DomainNd(np.tile(np.array([35, 70], dtype=np.float32), (dim, 1)), T)
    vol_vec, ir, dividend_vec, strike = 0.15*np.ones(dim, dtype=np.float32), 0.04, 0.01*np.ones(dim, dtype=np.float32), 50
    multi = 1
    # payoff_func = lambda x: tf.nn.relu(tf.math.reduce_sum(x, axis=1) - strike)
    payoff_func = lambda x: tf.nn.relu(tf.pow(tf.math.reduce_prod(x, axis=1), 1/dim) - strike)
    #payoff_func = lambda x: tf.nn.relu(tf.reduce_max(x, axis=1) - strike)

    corr_mat = 0.25 * np.ones((dim, dim), dtype=np.float32)
    np.fill_diagonal(corr_mat, 1.0)
    solver = Euro(payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, multi, sampler = SobolSamplerNd(domain))
    solver.run(n_samples=1000, steps_per_sample=10, n_interior = 2048, n_boundary = 128, n_terminal = 128)


def testExchange():
    dim = 2
    T = 1
    domain = DomainNd(np.array([[0, 100], [20, 100]], dtype=np.float32), T)
    vol_vec, ir, dividend_vec, strike = [0.15, 0.1], 0.03, [0.03, 0.02], 50
    payoff_func = lambda x: tf.nn.relu(x[:,0] - x[:,1])
    multi = 1.3
    corr_mat = 0.25 * np.ones((dim, dim), dtype=np.float32)
    np.fill_diagonal(corr_mat, 1)
    solver = Euro(payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, multi, sampler = SobolSamplerNd(domain))
    solver.run(n_samples=500, steps_per_sample=10, n_interior=2048, n_boundary = 128, n_terminal=128, saved_name = "Exchange", new_training= False)
    #solver.trainAgain(saved_name = "Exchange", n_samples=500, steps_per_sample=10, n_interior=2000, n_terminal=100)


def test_American():
    dim = 1
    T = 1
    multi = 1
    domain = DomainNd(np.array([[0, 100]], dtype=np.float32), T)
    vol_vec, ir, dividend_vec, strike = 0.15*np.ones(dim, dtype=np.float32), 0.04, 0.01*np.ones(dim, dtype=np.float32), 50
    payoff_func = lambda x: tf.nn.relu(strike - tf.math.reduce_sum(x, axis=1))
    #payoff_func = lambda x: tf.nn.relu(tf.math.reduce_sum(x, axis=1) - strike)
    corr_mat = 0.25 * np.ones((dim, dim), dtype=np.float32)
    np.fill_diagonal(corr_mat, 1)
    solver = American(payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, sampler = SobolSamplerNd(domain))
    solver.run(n_samples=500, steps_per_sample=10, n_interior=2048, n_boundary = 100 n_terminal=512)

def test_heston1():
    dim = 1
    T = 1
    domain = DomainNd(np.array([[0, 100]], dtype=np.float32), T)
    vol_vec, ir, dividend_vec, strike = 0.15*np.ones(dim, dtype=np.float32), 0.04, 0.01*np.ones(dim, dtype=np.float32), 50
    kappa, theta, xi, rho = 0.5, 0.1, 0.2, 0.3
    payoff_func = lambda x: tf.nn.relu(tf.math.reduce_sum(x, axis=1) - strike)
    corr_mat = 0.25 * np.ones((dim, dim), dtype=np.float32)
    np.fill_diagonal(corr_mat, 1)

    solver = EuroHeston(payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, kappa, theta, xi, rho, sampler = SamplerHeston(domain, vol_vec[0], kappa, theta, xi, rho))
    solver.run(n_samples=500, steps_per_sample=10, n_interior=2000, n_terminal=250, new_training = False)


def test_CEV1():
    dim = 1
    T = 1
    multi = 1.3
    domain, multi = DomainNd(np.tile(np.array([0, 100], dtype=np.float32), (dim, 1)), T), 1.3
    vol_vec, ir, dividend_vec, strike, beta = 0.15*np.ones(dim, dtype=np.float32), 0.04, 0.01*np.ones(dim, dtype=np.float32), 50, 1.0
    payoff_func = lambda x: tf.nn.relu(tf.math.reduce_sum(x, axis=1) - strike)
    #payoff_func = lambda x: tf.nn.relu(tf.pow(tf.math.reduce_prod(x, axis=1), 1/dim) - strike)
    corr_mat = 0.25 * np.ones((dim, dim), dtype=np.float32)
    np.fill_diagonal(corr_mat, 1)
    solver = CEV(payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, beta, strike, sampler= SamplerNd(domain, multi))
    solver.run(n_samples=5000, steps_per_sample=10, n_interior=2000, n_terminal=100)




def test_euro1d():
    dim = 1
    T = 1

    domain, multi = DomainNd(np.array([[0, 100]], dtype=np.float32), T), 1.3
    vol_vec, ir, dividend_vec, strike = 0.1*np.ones(dim, dtype=np.float32), 0.03, 0.02*np.ones(dim, dtype=np.float32), 60
    payoff_func = lambda x: tf.nn.relu(tf.math.reduce_sum(x, axis=1) - strike)
    corr_mat = 0.25 * np.ones((dim, dim), dtype=np.float32)
    np.fill_diagonal(corr_mat, 1)
    solver = Euro(payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, multi, sampler = SamplerNdV2(domain, multi))
    solver.run(n_samples=30, steps_per_sample=10, n_interior=2000, n_boundary = 100, n_terminal=100, saved_name = 'Euro1d', new_training = False)

def test_euro1dAgain():
    dim = 1
    T = 1
    multi = 1.3
    domain, multi = DomainNd(np.array([[0, 100]], dtype=np.float32), T), 1.3
    vol_vec, ir, dividend_vec, strike = 0.15*np.ones(dim, dtype=np.float32), 0.04, 0.01*np.ones(dim, dtype=np.float32), 50
    payoff_func = lambda x: tf.nn.relu(tf.math.reduce_sum(x, axis=1) - strike)
    corr_mat = 0.25 * np.ones((dim, dim), dtype=np.float32)
    np.fill_diagonal(corr_mat, 1)
    solver = Euro(payoff_func, domain, vol_vec, ir, dividend_vec, corr_mat, strike, sampler = SamplerNd(domain, multi))
    solver.trainAgain(saved_name = "Euro1d", n_samples=5000, steps_per_sample=10, n_interior=2000, n_terminal=100)






test_American()


TypeError: SobolSamplerNd.run() missing 1 required positional argument: 'n_boundary'

In [None]:
!find . -name "*.h5"

./Euro1d.weights.h5


In [None]:
import numpy as np
import scipy.integrate as integrate
import matplotlib.pyplot as plt

# Characteristic function for the Heston model
def heston_cf(u, S0, K, T, r, q, v0, kappa, theta, sigma, rho):
    i = 1j
    d = np.sqrt((rho * sigma * i * u - kappa)**2 + (u**2 + i * u) * sigma**2)
    g = (kappa - rho * sigma * i * u - d) / (kappa - rho * sigma * i * u + d)

    C = r * i * u * T + (kappa * theta) / (sigma**2) * ((kappa - rho * sigma * i * u - d) * T - 2 * np.log((1 - g * np.exp(-d * T)) / (1 - g)))
    D = (kappa - rho * sigma * i * u - d) / (sigma**2) * ((1 - np.exp(-d * T)) / (1 - g * np.exp(-d * T)))

    return np.exp(C + D * v0 + i * u * np.log(S0 * np.exp(-q * T)))

# Integrand functions for P1 and P2
def integrand_P1(u, S0, K, T, r, q, v0, kappa, theta, sigma, rho):
    return np.real(np.exp(-1j * u * np.log(K)) * heston_cf(u - 1j, S0, K, T, r, q, v0, kappa, theta, sigma, rho) / (1j * u * heston_cf(-1j, S0, K, T, r, q, v0, kappa, theta, sigma, rho)))

def integrand_P2(u, S0, K, T, r, q, v0, kappa, theta, sigma, rho):
    return np.real(np.exp(-1j * u * np.log(K)) * heston_cf(u, S0, K, T, r, q, v0, kappa, theta, sigma, rho) / (1j * u))

# Numerical integration for P1 and P2
def P1(S0, K, T, r, q, v0, kappa, theta, sigma, rho):
    return 0.5 + (1 / np.pi) * integrate.quad(lambda u: integrand_P1(u, S0, K, T, r, q, v0, kappa, theta, sigma, rho), 0, np.inf)[0]

def P2(S0, K, T, r, q, v0, kappa, theta, sigma, rho):
    return 0.5 + (1 / np.pi) * integrate.quad(lambda u: integrand_P2(u, S0, K, T, r, q, v0, kappa, theta, sigma, rho), 0, np.inf)[0]

# Heston European call price
def heston_call_price(S0, K, T, r, q, v0, kappa, theta, sigma, rho):
    return S0 * np.exp(-q * T) * P1(S0, K, T, r, q, v0, kappa, theta, sigma, rho) - K * np.exp(-r * T) * P2(S0, K, T, r, q, v0, kappa, theta, sigma, rho)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import ncx2

# Call price under CEV Model

def CEVclosed(S0, K, r, q, beta, sigma, t):
    z = 2 + 1/(1-beta)
    kappa = 2*r/(sigma**2*(1-beta)*(np.exp(2*r*(1-beta)*t)-1))
    x = kappa*S0**(2*(1-beta))*np.exp(2*r*(1-beta)*t)
    y = kappa*K**(2*(1-beta))
    return S0*np.exp(-q*t)*(1-ncx2.cdf(y,z,x))-K*np.exp(-r*t)*ncx2.cdf(x,z-2,y)


In [None]:
import numpy as np

def AmericanBinom(S0, K, T, r, sig, delta):
    N = 2500  # number of periods or number of time steps
    payoff = "put"  # payoff

    dT = float(T) / N  # Delta t
    u = np.exp(sig * np.sqrt(dT))  # up factor
    d = 1.0 / u  # down factor

    V = np.zeros(N + 1)  # initialize the price vector
    S_T = np.array([S0 * u**j * d**(N - j) for j in range(N + 1)])  # price S_T at time T

    a = np.exp((r - delta) * dT)  # adjusted risk-free compound return
    p = (a - d) / (u - d)  # risk-neutral up probability
    q = 1.0 - p  # risk-neutral down probability

    if payoff == "call":
        V[:] = np.maximum(S_T - K, 0.0)
    elif payoff == "put":
        V[:] = np.maximum(K - S_T, 0.0)

    for i in range(N - 1, -1, -1):
        V[:-1] = np.exp(-r * dT) * (p * V[1:] + q * V[:-1])  # the price vector is overwritten at each step
        S_T = S_T * u  # it is a tricky way to obtain the price at the previous time step
        if payoff == "call":
            V = np.maximum(V, S_T - K)
        elif payoff == "put":
            V = np.maximum(V, K - S_T)
    return V[0]


In [None]:
import numpy as np

def monte_carlo_basket_option(S0, K, T, r, sigma, q, correlation_matrix, n_simulations, option_type='call'):
    n_assets = len(S0)

    # Cholesky decomposition for correlation
    L = np.linalg.cholesky(correlation_matrix)

    # Simulate asset prices at expiration
    prices = np.zeros((n_simulations, n_assets))
    for i in range(n_simulations):
        z = np.random.normal(size=n_assets)
        correlated_randoms = L @ z

        for j in range(n_assets):
            # Simulate asset price at expiration using geometric Brownian motion
            prices[i, j] = S0[j] * np.exp((r - q - 0.5 * sigma**2) * T + sigma * correlated_randoms[j] * np.sqrt(T))

    # Calculate the geometric mean of the final prices
    geometric_means = np.prod(prices, axis=1) ** (1/n_assets)

    # Calculate payoffs
    if option_type == 'call':
        payoffs = np.maximum(geometric_means - K, 0)
    elif option_type == 'put':
        payoffs = np.maximum(K - geometric_means, 0)

    # Discount payoffs back to present value
    discounted_payoffs = np.exp(-r * T) * payoffs

    # Calculate option price and standard error
    option_price = np.mean(discounted_payoffs)
    standard_error = np.std(discounted_payoffs) / np.sqrt(n_simulations)

    return option_price, standard_error

# Parameters for the 10 asset case
n_assets = 10
S0 = [60] * n_assets  # Initial prices of the assets
K = 50                 # Strike price
T = 1                   # Time to maturity in years
r = 0.04               # Risk-free interest rate
q = 0.01               # Dividend yield
sigma = 0.15           # Volatility of the assets
rho = 0.25             # Correlation

# Constructing the correlation matrix
correlation_matrix = np.full((n_assets, n_assets), rho)
np.fill_diagonal(correlation_matrix, 1)

n_simulations = 100000  # Number of simulations

# Pricing the basket option
price, se = monte_carlo_basket_option(S0, K, T, r, sigma, q, correlation_matrix, n_simulations, option_type='call')
print(f'Basket Option Price: {price}')
print(f'Standard Error: {se}')

Basket Option Price: 10.915120641804958
Standard Error: 0.015815959670967
