In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math
import pandas as pd
import tensorflow_probability as tfp
from scipy.interpolate import griddata
from scipy import stats
import timeit

In [None]:
from google.colab import drive
drive.mount("/content/gdrive")

In [None]:
class Sampler:
    def __init__(self, dim, coords, func, name = None):
        self.dim = dim
        self.coords = coords
        self.func = func
        self.name = name
    def sample(self, N):
        x = self.coords[0:1,:] + (self.coords[1:2,:]-self.coords[0:1,:])*np.random.rand(N, self.dim)
        y = self.func(x)
        return x, y

class Black_Scholes:

    # Disable eager execution
    tf.compat.v1.disable_eager_execution()

    def __init__(self, layers, operator, ic, bc, res, r, sigma, delta, K, model, stiff_ratio):

        self.operator = operator
        self.ic = ic
        self.bc = bc
        self.res = res

        self.model = model
        self.stiff_ratio = stiff_ratio

        # Constant
        self.sigma = tf.constant(sigma, dtype=tf.float32)
        self.r = tf.constant(r, dtype = tf.float32)
        self.delta = tf.constant(delta, dtype = tf.float32)
        self.K = tf.constant(K, dtype = tf.float32)

        # Decay rate = 0.9
        self.rate = 0.9
        self.adaptive_constant_ic_val = np.array(1.0) # konstanta ic
        self.adaptive_constant_bc_val = np.array(1.0) # konstanta bc

        # Initial weight + bias
        self.layers = layers
        self.weight, self.bias = self.initialize_NN(layers)

        if model in ['M3', 'M4']:
            self.encoder_weight_1 = self.xavier_init([2, layers[1]])
            self.encoder_bias_1 = self.xavier_init([1,layers[1]])

            self.encoder_weight_2 = self.xavier_init([2, layers[1]])
            self.encoder_bias_2 = self.xavier_init([1, layers[1]])

        self.sess = tf.compat.v1.Session(config = tf.compat.v1.ConfigProto(log_device_placement = True))

        # Define Placeholders and Computational Graph
        self.t_v_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))
        self.s_v_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))

        self.t_ic_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))
        self.s_ic_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))
        self.v_ic_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))

        self.t_bc1_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))
        self.s_bc1_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))
        self.v_bc1_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))

        self.t_r_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))
        self.s_r_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))
        self.res_tf = tf.compat.v1.placeholder(tf.float32, shape = (None,1))

        self.adaptive_constant_ic_tf = tf.compat.v1.placeholder(tf.float32, shape=self.adaptive_constant_ic_val.shape)
        self.adaptive_constant_bc_tf = tf.compat.v1.placeholder(tf.float32, shape=self.adaptive_constant_bc_val.shape)

        # Evaluate Predictions
        self.v_ic_pred = self.net_u(self.t_ic_tf, self.s_ic_tf)
        self.v_bc1_pred = self.net_u(self.t_bc1_tf, self.s_bc1_tf)

        self.v_pred = self.net_u(self.t_v_tf, self.s_v_tf)
        self.res_pred = self.net_r(self.t_r_tf, self.s_r_tf)

        # Initial and Boundary Loss
        self.loss_ic = tf.reduce_mean(tf.square(self.v_ic_tf - self.v_ic_pred))
        self.loss_bc1 = tf.reduce_mean(tf.square(self.v_bc1_pred - self.v_bc1_tf))

        self.loss_bcs = self.adaptive_constant_bc_tf * (self.loss_bc1)
        self.loss_ics = self.adaptive_constant_ic_tf * (self.loss_ic)
        self.loss_v = self.loss_ics + self.loss_bcs

        self.loss_res = tf.reduce_mean(tf.square(self.res_pred))
        # - self.res_tf
        # Total Loss
        self.loss = self.loss_res + self.loss_v

        # Define optimizer with learning rate schedule
        self.global_step = tf.Variable(0, trainable=False)
        initial_learning_rate = 1e-3
        self.learning_rate = tf.compat.v1.train.exponential_decay(initial_learning_rate, self.global_step,
                                                        1000, 0.9, staircase=False)

        # Passing global_step to minimize() will increment it at each step
        self.train_op = tf.compat.v1.train.AdamOptimizer(self.learning_rate).minimize(self.loss, global_step = self.global_step)

        # Logger
        self.loss_ic_log = []
        self.loss_bc_log = []
        self.loss_r_log = []
        self.saver = tf.compat.v1.train.Saver()

        # Generate dictionary for gradients storage
        self.dict_gradients_res_layers = self.generate_grad_dict(self.layers)
        self.dict_gradients_ic_layers = self.generate_grad_dict(self.layers)
        self.dict_gradients_bc_layers = self.generate_grad_dict(self.layers)

        # Gradients Storage
        self.grad_res = []
        self.grad_ic = []
        self.grad_bc = []

        for i in range(len(self.layers) - 1):
            self.grad_res.append(tf.gradients(self.loss_res, self.weight[i])[0])
            self.grad_bc.append(tf.gradients(self.loss_bcs, self.weight[i])[0])
            self.grad_ic.append(tf.gradients(self.loss_ics, self.weight[i])[0])

        # Store Adaptive Constant
        self.adaptive_constant_ic_log = []
        self.adaptive_constant_bc_log = []

        # Compute the adaptive constant
        self.adaptive_constant_ic_list = []
        self.adaptive_constant_bc_list = []

        self.max_grad_res_list = []
        self.mean_grad_bc_list = []
        self.mean_grad_ic_list = []

        for i in range(len(self.layers) - 1):
            self.max_grad_res_list.append(tf.reduce_max(tf.abs(self.grad_res[i])))
            self.mean_grad_bc_list.append(tf.reduce_mean(tf.abs(self.grad_bc[i])))
            self.mean_grad_ic_list.append(tf.reduce_mean(tf.abs(self.grad_ic[i])))

        self.max_grad_res = tf.reduce_max(tf.stack(self.max_grad_res_list))
        self.mean_grad_bc = tf.reduce_mean(tf.stack(self.mean_grad_bc_list))
        self.mean_grad_ic = tf.reduce_mean(tf.stack(self.mean_grad_ic_list))

        self.adaptive_constant_bc = self.max_grad_res / self.mean_grad_bc
        self.adaptive_constant_ic = self.max_grad_res / self.mean_grad_ic

        # Stiff Ratio
        if self.stiff_ratio:
            self.Hessian, self.Hessian_ic, self.Hessian_bc, self.Hessian_res = self.get_H_op()
            self.eigenvalue, _ = tf.linalg.eigh(self.Hessian)
            self.eigenvalue_ic, _ = tf.linalg.eigh(self.Hessian_ic)
            self.eigenvalue_bc, _ = tf.linalg.eigh(self.Hessian_bc)
            self.eigenvalue_res, _ = tf.linalg.eigh(self.Hessian_res)

            self.eigenvalue_log = []
            self.eigenvalue_ic_log = []
            self.eigenvalue_bc_log = []
            self.eigenvalue_res_log = []

        # Initialize tensorflow variables
        init = tf.compat.v1.global_variables_initializer()
        self.sess.run(init)

    # Create dictionary to store gradients
    def generate_grad_dict(self, layers):
        num = len(layers) - 1
        grad_dict = {}
        for i in range(num):
            grad_dict['layer_{}'.format(i + 1)] = []
        return grad_dict

    # Save gradients
    def save_gradients(self, tf_dict):
        num_layers = len(self.layers)
        for i in range(num_layers - 1):
            grad_ic_value, grad_bc_value, grad_res_value = self.sess.run([self.grad_ic[i], self.grad_bc[i], self.grad_res[i]], feed_dict = tf_dict)

            self.dict_gradients_ic_layers['layer_' + str(i + 1)].append(grad_ic_value.flatten())
            self.dict_gradients_bc_layers['layer_' + str(i + 1)].append(grad_bc_value.flatten())
            self.dict_gradients_res_layers['layer_' + str(i + 1)].append(grad_res_value.flatten())
        return None

    # Compute Hessian
    def flatten(self, vectors):
        return tf.concat([tf.reshape(v, [-1]) for v in vectors], axis = 0)

    def get_Hv(self, v):
        loss_gradients = self.flatten(tf.gradients(self.loss, self.weight))
        vprod = tf.math.multiply(loss_gradients, tf.stop_gradient(v))
        Hv_op = self.flatten(tf.gradients(vprod, self.weight))
        return Hv_op

    def get_Hv_ic(self, v):
        loss_gradients = self.flatten(tf.gradients(self.loss_ics, self.weight))
        vprod = tf.math.multiply(loss_gradients, tf.stop_gradient(v))
        Hv_op = self.flatten(tf.gradients(vprod, self.weight))
        return Hv_op

    def get_Hv_bc(self, v):
        loss_gradients = self.flatten(tf.gradients(self.loss_bcs, self.weight))
        vprod = tf.math.multiply(loss_gradients, tf.stop_gradient(v))
        Hv_op = self.flatten(tf.gradients(vprod, self.weight))
        return Hv_op

    def get_Hv_res(self, v):
        loss_gradients = self.flatten(tf.gradients(self.loss_res, self.weight))
        vprod = tf.math.multiply(loss_gradients, tf.stop_gradient(v))
        Hv_op = self.flatten(tf.gradients(vprod, self.weight))
        return Hv_op

    def get_H_op(self):
        self.P = self.flatten(self.weight).get_shape().as_list()[0]
        H = tf.map_fn(self.get_Hv, tf.eye(self.P, self.P), dtype = 'float32')
        H_ic = tf.map_fn(self.get_Hv_ic, tf.eye(self.P, self.P), dtype = 'float32')
        H_bc = tf.map_fn(self.get_Hv_bc, tf.eye(self.P, self.P), dtype = 'float32')
        H_res = tf.map_fn(self.get_Hv_res, tf.eye(self.P, self.P), dtype = 'float32')
        return H, H_ic, H_bc, H_res

    # Xavier initialization
    def xavier_init(self, size):
        in_dim = size[0]
        out_dim = size[1]
        xavier_stddev = 1. / np.sqrt((in_dim + out_dim) / 2.)
        return tf.Variable(tf.compat.v1.random_normal([in_dim, out_dim], dtype = tf.float32) * xavier_stddev, dtype=tf.float32)

    # Initialize network weights and biases using Xavier initialization
    def initialize_NN(self, layers):
        weight = []
        bias = []
        num_layers = len(layers)
        for l in range(0, num_layers - 1):
            W = self.xavier_init(size = [layers[l], layers[l+1]])
            b = tf.Variable(tf.zeros([1, layers[l+1]], dtype = tf.float32), dtype = tf.float32)
            weight.append(W)
            bias.append(b)
        return weight, bias

    # Evaluate the forward pass
    def forward_pass(self, H):
        if self.model in ['M1', 'M2']:
            num_layers = len(self.layers)
            for l in range(0, num_layers - 2):
                W = self.weight[l]
                b = self.bias[l]
                H = tf.tanh(tf.add(tf.matmul(H, W), b))
            W = self.weight[-1]
            b = self.bias[-1]
            H = tf.add(tf.matmul(H, W), b)
            return H

        if self.model in ['M3', 'M4']:
            num_layers = len(self.layers)
            encoder_1 = tf.tanh(tf.add(tf.matmul(H, self.encoder_weight_1), self.encoder_bias_1))
            encoder_2 = tf.tanh(tf.add(tf.matmul(H, self.encoder_weight_2), self.encoder_bias_2))

            for l in range(0, num_layers - 2):
                W = self.weight[l]
                b = self.bias[l]
                H = tf.math.multiply(tf.tanh(tf.add(tf.matmul(H, W), b)), encoder_1) + \
                    tf.math.multiply(1 - tf.tanh(tf.add(tf.matmul(H, W), b)), encoder_2)

            W = self.weight[-1]
            b = self.bias[-1]
            H = tf.add(tf.matmul(H, W), b)
            return H

    # Forward pass for u
    # This code is for initial and boundary condition
    def net_u(self, t, s):
        v = self.forward_pass(tf.concat([t, s], 1))
        return v

    # Forward pass for residual
    def net_r(self, t, s):
        v = self.net_u(t, s)
        residual = self.operator(v, t, s, self.r, self.sigma, self.delta, self.K)
        return residual

    def fetch_minibatch(self, sampler, N):
        X, Y = sampler.sample(N)
        return X, Y

    # Trains the model by minimizing the MSE loss
    def train(self, nIter = 10000):
        start_time = timeit.default_timer()

        # Fetch boundary mini-batches
        s_ic_batch, v_ic_batch = self.fetch_minibatch(self.ic, 250)
        s_bc1_batch, v_bc1_batch = self.fetch_minibatch(self.bc, 250)

        # Fetch residual mini-batch
        s_res_batch, f_res_batch = self.fetch_minibatch(self.res, 62500)

        # Define a dictionary for associating placeholder with data
        tf_dict = {self.t_ic_tf: s_ic_batch[:, 0:1], self.s_ic_tf: s_ic_batch[:, 1:2],
                    self.v_ic_tf: v_ic_batch,
                    self.t_bc1_tf: s_bc1_batch[:, 0:1], self.s_bc1_tf: s_bc1_batch[:, 1:2],
                    self.v_bc1_tf: v_bc1_batch,
                    self.t_r_tf: s_res_batch[:, 0:1], self.s_r_tf: s_res_batch[:, 1:2],
                    self.res_tf: f_res_batch,
                    self.adaptive_constant_ic_tf: self.adaptive_constant_ic_val,
                    self.adaptive_constant_bc_tf: self.adaptive_constant_ic_val}

        for it in range(nIter):

            # Run the tensorflow session to minimize losss
            self.sess.run(self.train_op, tf_dict)

            # Print
            if it % 10 == 0:
                elapsed = timeit.default_timer() - start_time
                loss_value = self.sess.run(self.loss, tf_dict)
                loss_ic_value, loss_bc_value, loss_r_value = self.sess.run([self.loss_ics, self.loss_bcs, self.loss_res], tf_dict)

                if self.model in ['M2', 'M4']:
                    # Compute the adaptive constant
                    adaptive_constant_ic_val, adaptive_constant_bc_val = self.sess.run([self.adaptive_constant_ic, self.adaptive_constant_bc], tf_dict)

                    self.adaptive_constant_ic_val = adaptive_constant_ic_val * (1.0 - self.rate) + self.rate * self.adaptive_constant_ic_val
                    self.adaptive_constant_bc_val = adaptive_constant_bc_val * (1.0 - self.rate) + self.rate * self.adaptive_constant_bc_val

                # Store loss and adaptive weights
                self.loss_ic_log.append(loss_ic_value)
                self.loss_bc_log.append(loss_bc_value)
                self.loss_r_log.append(loss_r_value)

                self.adaptive_constant_ic_log.append(self.adaptive_constant_ic_val)
                self.adaptive_constant_bc_log.append(self.adaptive_constant_bc_val)

                print('It: %d, Loss: %.3e, Loss_u0: %.3e, Loss_ub: %.3e, Loss_r: %.3e, Time: %.2f' %
                      (it, loss_value, loss_ic_value, loss_bc_value, loss_r_value, elapsed))
                print("constant_ic_val: {:.3f}, constant_bc_val: {:.3f}".format(
                    self.adaptive_constant_ic_val,
                    self.adaptive_constant_bc_val))
                start_time = timeit.default_timer()

            # Compute the eigenvalues of the Hessian of losses
            if self.stiff_ratio:
                if it % 1000 == 0:
                    print("Eigenvalues information stored...")
                    eigenvalue, eigenvalue_ic, eigenvalue_bc, eigenvalue_res = self.sess.run([self.eigenvalue, self.eigenvalue_ic, self.eigenvalue_bc, self.eigenvalue_res], tf_dict)

                    self.eigenvalue_log.append(eigenvalue)
                    self.eigenvalue_ic_log.append(eigenvalue_ic)
                    self.eigenvalue_bc_log.append(eigenvalue_bc)
                    self.eigenvalue_res_log.append(eigenvalue_res)

            if it % 10000 == 0:
                self.save_gradients(tf_dict)
                print("Gradients information stored...")

    # Evaluates predictions at test points
    def predict_v(self, s_star):
            tf_dict = {self.t_v_tf: s_star[:, 0:1], self.s_v_tf: s_star[:, 1:2]}
            v_star = self.sess.run(self.v_pred, tf_dict)
            return v_star

    def predict_r(self, s_star):
        tf_dict = {self.t_r_tf: s_star[:, 0:1], self.s_r_tf: s_star[:, 1:2]}
        r_star = self.sess.run(self.res_pred, tf_dict)
        return r_star

In [None]:
K = 10
r = 0.05
sigma = 0.3
delta = 0.03
s_min = 0.0
s_max = 20.0
t_min = 0.0
t_max = 1.0

In [None]:
def initial(v):
  return np.maximum(K - v[:,1:2],0)

def boundary_1(v):
    return K * np.exp(-1* r * (t_max - v[:,0:1]))

In [None]:
def u(v):
    ans = []
    n_d1 = []
    n_d2 = []
    for i in range(len(v[:,0:1])):
        if v[i,0:1] == 1:
            ans = np.append(ans,max(K - v[i, 1:2], 0))
        elif v[i, 1:2] == 0:
            ans = np.append(ans, K * np.exp(-1* r * (t_max - v[i,0:1])))
        else:
            d1 = (math.log(v[i,1:2]/K) + (r - delta + 0.5 * sigma**2)*(t_max - v[i,0:1]))/(sigma * math.sqrt(t_max - v[i,0:1]))
            d2 = d1 - (sigma * math.sqrt(t_max - v[i,0:1]))
            n_d1 = stats.norm.cdf(-1 * d1)
            n_d2 = stats.norm.cdf(-1 * d2)
            temp = K* np.exp(-1* r * (t_max - v[i,0:1])) * n_d2 - v[i,1:2]* np.exp(-1*delta*(t_max-v[i,0:1]))* n_d1
            ans = np.append(ans, max(temp, max(0, K - v[i, 1:2])))
    return ans

In [None]:
def f(v):
    return v[:,1:2]*0

def operator(v, t, s, r, sigma, delta, K):
    v_t = tf.gradients(v,t)[0]
    v_s = tf.gradients(v,s)[0]
    v_ss = tf.gradients(v_s,s)[0]
    pdp = v_t + 0.5* sigma**2 * s**2 * v_ss + (r - delta)*s*v_s - r*v
    d1 = (tf.math.log(s/K) + (r - delta + 0.5 * sigma**2)*(t_max - t))/(sigma * tf.math.sqrt(t_max - t))
    d2 = d1 - (sigma * tf.math.sqrt(t_max - t))
    n_d1 = tfp.distributions.Normal(0,1).cdf(-1 * d1)
    n_d2 = tfp.distributions.Normal(0,1).cdf(-1 * d2)
    ans = K* tf.exp(-1* r * (t_max - t)) * n_d2 - s* tf.exp(-1*delta*(t_max-t))* n_d1
    residual = (ans - tf.maximum(K - s,0))* pdp
    return residual

In [None]:
# Domain Boundaries
ic_coords = np.array([[t_max,s_min],[t_max,s_max]])
bc1_coords = np.array([[t_min,s_min],[t_max, s_min]])
dom_coords = np.array([[t_min,s_min],[t_max,s_max]])

# Create initial conditions random data
ic = Sampler(2, ic_coords, lambda v: initial(v), name = 'Initial Condition')

# Create boundary conditions random data
bc = Sampler(2, bc1_coords, lambda v: boundary_1(v), name = 'Boundary 1 Condition')

# Create residual conditions random data
res = Sampler(2, dom_coords, lambda v: f(v), name = 'Residual')

# Define model
layers = [2,50,50,50,50,1]
model = 'M1'
stiff_ratio = False
model = Black_Scholes(layers, operator, ic, bc, res, r, sigma, delta, K, model, stiff_ratio)

# Train model
model.train(nIter = 40000)

In [None]:
# Test data
nn = 100
t = np.linspace(dom_coords[0,0], dom_coords[1,0], nn)[:, None]
s = np.linspace(dom_coords[0,1], dom_coords[1,1], nn)[:, None]
t, s = np.meshgrid(t, s)
X_star = np.hstack((t.flatten()[:, None], s.flatten()[:, None]))

# Predictions
v_pred = model.predict_v(X_star)

In [None]:
# Plot
V_pred = griddata(X_star, v_pred.flatten(), (t, s), method='cubic')

# untuk mencari nilai min dan max
V_min , V_max = np.amin(V_pred), np.amax(V_pred)

fig_1 = plt.figure(1, figsize=(18, 5))
plt.subplot(1, 3, 1)
plt.pcolor(t, s, V_pred, cmap='jet', vmin = V_min, vmax = V_max)
plt.colorbar()
plt.xlabel('$t$')
plt.ylabel('$S$')
plt.title('Predicted V(S,t)')

In [None]:
fig = plt.figure(figsize = (9,6))
ax = fig.add_subplot(111, projection = '3d')
ax.plot_surface(t, s, V_pred, cmap = "viridis")
ax.view_init(40,40)
ax.set_xlabel('$t$')
ax.set_ylabel('$S$')
ax.set_zlabel('$V(S,t)$')
ax.set_title('Predicted call option price')

In [None]:
# Loss
loss_r = model.loss_r_log
loss_ic = model.loss_ic_log
loss_bc = model.loss_bc_log


fig_2 = plt.figure(2)
ax = fig_2.add_subplot(1, 1, 1)
ax.plot(loss_r, label='$\mathcal{L}_{r}$')
ax.plot(loss_ic, label='$\mathcal{L}_{u_0}$')
ax.plot(loss_bc, label='$\mathcal{L}_{u_b}$')
ax.plot()
ax.set_yscale('log')
ax.set_xlabel('iterations')
ax.set_ylabel('Loss')
plt.legend()
plt.tight_layout()
plt.show()

np.save('/content/gdrive/My Drive/Colab Notebooks/ap_loss_ic_1.npy',loss_ic)
np.save('/content/gdrive/My Drive/Colab Notebooks/ap_loss_bc_1.npy',loss_bc)
np.save('/content/gdrive/My Drive/Colab Notebooks/ap_loss_res_1.npy',loss_r)

In [None]:
# Adaptive Constant
adaptive_constant_ic = model.adaptive_constant_ic_log
adaptive_constant_bc = model.adaptive_constant_bc_log

fig_3 = plt.figure(3)
ax = fig_3.add_subplot(1, 1, 1)
ax.plot(adaptive_constant_ic, label='$\lambda_{u_0}$')
ax.plot(adaptive_constant_bc, label='$\lambda_{u_b}$')
plt.legend()
plt.tight_layout()
plt.show()

np.save('/content/gdrive/My Drive/Colab Notebooks/ap_adaptive_constant_ic_1.npy',adaptive_constant_ic)
np.save('/content/gdrive/My Drive/Colab Notebooks/ap_adaptive_constant_bc_1.npy',adaptive_constant_bc)

In [None]:
# Gradients at the end of training
data_gradients_ic = model.dict_gradients_ic_layers
data_gradients_bc = model.dict_gradients_bc_layers
data_gradients_res = model.dict_gradients_res_layers

np.save('/content/gdrive/My Drive/Colab Notebooks/ap_gradients_ic_1.npy',data_gradients_ic)
np.save('/content/gdrive/My Drive/Colab Notebooks/ap_gradients_bc_1.npy',data_gradients_bc)
np.save('/content/gdrive/My Drive/Colab Notebooks/ap_gradients_res_1.npy',data_gradients_res)

In [None]:
num_hidden_layers = len(layers) - 1
for j in range(num_hidden_layers):
  data = {'gradient_ic': data_gradients_ic['layer_' + str(j + 1)][-1],
          'gradient_bc': data_gradients_bc['layer_' + str(j + 1)][-1],
          'gradient_res': data_gradients_res['layer_' + str(j + 1)][-1]}
  data = pd.DataFrame(data)

  sns.displot(data, kind = "kde", common_norm = False)

  plt.title("Gradients Layer" + str(j + 1))
  plt.tight_layout()
  plt.show()