
# Data Set Preparation



In [None]:
import numpy as np
import scipy.stats as si
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# Black-Scholes formula
def black_scholes(S, K, T, r, sigma):
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    call_price = S * si.norm.cdf(d1, 0.0, 1.0) - K * np.exp(-r * T) * si.norm.cdf(d2, 0.0, 1.0)
    return call_price

# Random input parameters
np.random.seed(0)
n_samples = 1000000  # Number of samples
S = np.random.uniform(50, 150, n_samples)  # Stock price
K = np.random.uniform(50, 150, n_samples)  # Strike price
T = np.random.uniform(0.2, 1.1, n_samples) # Time to maturity (years)
r = np.random.uniform(0.02, 0.1, n_samples) # Risk-free rate
sigma = np.random.uniform(0.01, 1, n_samples) # Volatility

# Calculating option prices
call_prices = black_scholes(S, K, T, r, sigma)

# Creating the dataset
dataset = np.column_stack((S, K, T, r, sigma, call_prices))



In [None]:
dataset[:5]

array([[1.04881350e+02, 1.18955605e+02, 9.10557749e-01, 5.30603584e-02,
        1.73128213e-01, 3.72740192e+00],
       [1.21518937e+02, 1.47451698e+02, 4.69116943e-01, 6.73872152e-02,
        3.44594694e-01, 4.53477562e+00],
       [1.10276338e+02, 1.30805917e+02, 1.03438276e+00, 2.77411087e-02,
        2.11638099e-01, 3.88259831e+00],
       [1.04488318e+02, 6.80819179e+01, 9.92908637e-01, 7.00928051e-02,
        9.80072000e-01, 5.55653126e+01],
       [9.23654799e+01, 1.35541154e+02, 2.45040874e-01, 9.12885547e-02,
        2.08587992e-01, 6.73470194e-04]])

# Splitt Testing

In [None]:
X = dataset[:, :-1]  # Other than Option Prices
y = dataset[:, -1]   # Option prices

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0)

# Building the ANN with updated parameters
model = Sequential()
model.add(Dense(400, input_dim=X.shape[1], activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(400, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(400, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(400, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(1, kernel_initializer='glorot_uniform'))  # Output layer

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Fit the model
model.fit(X_train, y_train, epochs=3000, batch_size=1024, verbose=1)

# Evaluate the model
loss = model.evaluate(X_test, y_test, verbose=0)
print(f"Test loss: {loss}")


Epoch 1/3000
Epoch 2/3000

# Replicate the Process of BS-ANN paper
Latin Sampler

In [None]:
import numpy as np
import scipy.stats as si
from scipy.stats.qmc import LatinHypercube

# Black-Scholes formula
def black_scholes(S, K, T, r, sigma):
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    call_price = S * si.norm.cdf(d1, 0.0, 1.0) - K * np.exp(-r * T) * si.norm.cdf(d2, 0.0, 1.0)
    return call_price

# Parameters for Latin Hypercube Sampling
n_samples = 1000000

# Define the parameter ranges
S_over_K_range = (0.4, 1.6)  # Stock price over Strike price ratio
T_range = (0.2, 1.1)         # Time to maturity
r_range = (0.02, 0.1)        # Risk-free rate
sigma_range = (0.01, 1)      # Volatility

# Create a Latin Hypercube Sampler and generate samples
sampler = LatinHypercube(d=4)
samples = sampler.random(n=n_samples)

# Scale the samples to the specified ranges
S_over_K = samples[:, 0] * (S_over_K_range[1] - S_over_K_range[0]) + S_over_K_range[0]
T = samples[:, 1] * (T_range[1] - T_range[0]) + T_range[0]
r = samples[:, 2] * (r_range[1] - r_range[0]) + r_range[0]
sigma = samples[:, 3] * (sigma_range[1] - sigma_range[0]) + sigma_range[0]

# Set Strike Price (K) to 1 and calculate Stock Price (S) based on S/K ratio
K = np.ones(n_samples)
S = S_over_K * K

# Calculate option prices using the Black-Scholes formula
call_prices = black_scholes(S, K, T, r, sigma)

# Creating the dataset
dataset = np.column_stack((S, K, T, r, sigma, call_prices))

# First few rows of the dataset
dataset[:5]


array([[9.48790484e-001, 1.00000000e+000, 9.36974631e-001,
        2.79031499e-002, 9.16185627e-001, 3.16784142e-001],
       [1.17398731e+000, 1.00000000e+000, 8.82109548e-001,
        5.31436720e-002, 5.56918188e-001, 3.45991129e-001],
       [5.36001806e-001, 1.00000000e+000, 1.00100086e+000,
        8.87807540e-002, 1.42511682e-002, 1.04012958e-311],
       [8.66020926e-001, 1.00000000e+000, 1.08420816e+000,
        9.89218918e-002, 3.53224779e-001, 1.13203195e-001],
       [1.21093630e+000, 1.00000000e+000, 2.96766198e-001,
        9.50260334e-002, 7.07770928e-002, 2.38742894e-001]])

# Normal Model For Replicate


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler
from sklearn.model_selection import train_test_split

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(dataset[:, :-1], dataset[:, -1], test_size=0.1, random_state=0)

# Building the ANN with updated parameters
model = Sequential()
model.add(Dense(400, input_dim=X_train.shape[1], activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(400, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(400, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(400, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(400, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(1, kernel_initializer='glorot_uniform'))  # Output layer

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Learning rate decay function
def scheduler(epoch, lr):
    if epoch < 1000:
        return lr
    else:
        return lr * tf.math.exp(-0.1)

callback = LearningRateScheduler(scheduler)

# Fit the model with learning rate decay
model.fit(X_train, y_train, epochs=10, batch_size=1024, verbose=1, callbacks=[callback])

# Evaluate the model
loss = model.evaluate(X_test, y_test, verbose=0)
print(f"Test loss: {loss}")



Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test loss: 7.842352829356969e-07


# Experiment Changing Activaiton Function

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler
from sklearn.model_selection import train_test_split
import time
from tensorflow_model_optimization.sparsity import keras as sparsity


# Assuming 'dataset' is defined and available
# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(dataset[:, :-1], dataset[:, -1], test_size=0.9, random_state=0)

# Building the ANN with updated parameters
model = Sequential()
model.add(Dense(400, input_dim=X_train.shape[1], kernel_initializer='glorot_uniform'))
model.add(LeakyReLU(alpha=0.01))
model.add(Dense(400, kernel_initializer='glorot_uniform'))
model.add(LeakyReLU(alpha=0.01))
model.add(Dense(400, kernel_initializer='glorot_uniform'))
model.add(LeakyReLU(alpha=0.01))
model.add(Dense(400, kernel_initializer='glorot_uniform'))
model.add(LeakyReLU(alpha=0.01))
model.add(Dense(400, kernel_initializer='glorot_uniform'))
model.add(LeakyReLU(alpha=0.01))
model.add(Dense(400, kernel_initializer='glorot_uniform'))
model.add(LeakyReLU(alpha=0.01))
model.add(Dense(1, kernel_initializer='glorot_uniform'))  # Output layer

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Learning rate decay function
def scheduler(epoch, lr):
    if epoch < 1000:
        return lr
    else:
        return lr * tf.math.exp(-0.1)

callback = LearningRateScheduler(scheduler)

# Fit the model with learning rate decay
model.fit(X_train, y_train, epochs=10, batch_size=1024, verbose=1, callbacks=[callback])

# Evaluate the model
loss = model.evaluate(X_test, y_test, verbose=0)
print(f"Test loss: {loss}")

# Speed Calculation
start_time = time.time()
predictions = model.predict(X_test)
end_time = time.time()
total_time = end_time - start_time
print(f"Time taken for predictions: {total_time} seconds")


NameError: ignored

In [None]:
 pip install -q tensorflow-model-optimization

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/241.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━[0m [32m122.9/241.2 kB[0m [31m3.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m241.2/241.2 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h

# Model Compression

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler
from sklearn.model_selection import train_test_split
import time
from tensorflow_model_optimization.sparsity import keras as sparsity
import tensorflow_model_optimization as tfmot
from tensorflow.keras.callbacks import EarlyStopping



# Assuming dataset is predefined
# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(dataset[:, :-1], dataset[:, -1], test_size=0.1, random_state=0)

# Building the ANN with updated parameters
model = Sequential()
model.add(Dense(563, input_dim=X_train.shape[1], activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(563, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(563, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(563, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(1, kernel_initializer='glorot_uniform'))  # Output layer

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Learning rate decay function
def scheduler(epoch, lr):
    if epoch < 1000:
        return lr
    else:
        return lr * tf.math.exp(-0.065)

callback = LearningRateScheduler(scheduler)
early_stopping = EarlyStopping(monitor='loss', min_delta=1e-20, patience=80, verbose=1, mode='min')


# early stoping

# Fit the model with learning rate decay
model.fit(X_train, y_train, epochs=3000, batch_size=1035, verbose=1, callbacks=[callback, early_stopping])

# Evaluate the original model
original_loss = model.evaluate(X_test, y_test, verbose=0)
print(f"Test loss before pruning: {original_loss}")

# Speed Calculation for the original model
start_time = time.time()
original_predictions = model.predict(X_test)
end_time = time.time()
original_total_time = end_time - start_time
print(f"Time taken for predictions with the original model: {original_total_time} seconds")


# Model Pruning
pruning_params = {
    'pruning_schedule': sparsity.PolynomialDecay(initial_sparsity=0.50,
                                                 final_sparsity=0.80,
                                                 begin_step=2000,
                                                 end_step=4000,
                                                 frequency=100)
}

model_for_pruning = sparsity.prune_low_magnitude(model, **pruning_params)

# compile prunning model
model_for_pruning.compile(optimizer='adam', loss='mean_squared_error')

# Ensure UpdatePruningStep callback
pruning_callbacks = [callback, tfmot.sparsity.keras.UpdatePruningStep()]

# Fit the model with the necessary callback for pruning
model_for_pruning.fit(X_train, y_train, epochs=1000, batch_size=1035, verbose=1, callbacks=[pruning_callbacks, early_stopping])
# Strip the pruning wrappers
final_model = sparsity.strip_pruning(model_for_pruning)

# Compile the model again after stripping
final_model.compile(optimizer='adam', loss='mean_squared_error')

# Evaluate the model
loss = final_model.evaluate(X_test, y_test, verbose=0)
print(f"Test loss after pruning: {loss}")

# Speed Calculation
start_time = time.time()
predictions = final_model.predict(X_test)
end_time = time.time()
total_time = end_time - start_time
print(f"Time taken for predictions: {total_time} seconds")

# Comparison
print("\nModel Comparison:")
print(f"Original Model - Test Loss: {original_loss}, Inference Time: {original_total_time} seconds")
print(f"Pruned Model - Test Loss: {loss}, Inference Time: {total_time} seconds")



Epoch 1/3000
Epoch 2/3000
Epoch 3/3000
Epoch 4/3000
Epoch 5/3000
Epoch 6/3000
Epoch 7/3000
Epoch 8/3000
Epoch 9/3000
Epoch 10/3000
Epoch 11/3000
Epoch 12/3000
Epoch 13/3000
Epoch 14/3000
Epoch 15/3000
Epoch 16/3000
Epoch 17/3000
Epoch 18/3000
Epoch 19/3000
Epoch 20/3000
Epoch 21/3000
Epoch 22/3000
Epoch 23/3000
Epoch 24/3000
Epoch 25/3000
Epoch 26/3000
Epoch 27/3000
Epoch 28/3000
Epoch 29/3000
Epoch 30/3000
Epoch 31/3000
Epoch 32/3000
Epoch 33/3000
Epoch 34/3000
Epoch 35/3000
Epoch 36/3000
Epoch 37/3000
Epoch 38/3000
Epoch 39/3000
Epoch 40/3000
Epoch 41/3000
Epoch 42/3000
Epoch 43/3000
Epoch 44/3000
Epoch 45/3000
Epoch 46/3000
Epoch 47/3000
Epoch 48/3000
Epoch 49/3000
Epoch 50/3000
Epoch 51/3000
Epoch 52/3000
Epoch 53/3000
Epoch 54/3000
Epoch 55/3000
Epoch 56/3000
Epoch 57/3000
Epoch 58/3000
Epoch 59/3000
Epoch 60/3000
Epoch 61/3000
Epoch 62/3000
Epoch 63/3000
Epoch 64/3000
Epoch 65/3000
Epoch 66/3000
Epoch 67/3000
Epoch 68/3000
Epoch 69/3000
Epoch 70/3000
Epoch 71/3000
Epoch 72/3000
E

# Generate Implied Volatility Sampler

In [None]:
import numpy as np
import scipy.stats as si
from scipy.stats.qmc import LatinHypercube

# Black-Scholes formula
def black_scholes(S, K, T, r, sigma):
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    call_price = S * si.norm.cdf(d1, 0.0, 1.0) - K * np.exp(-r * T) * si.norm.cdf(d2, 0.0, 1.0)
    return call_price

# Parameters for Latin Hypercube Sampling
np.random.seed(0)
n_samples = 1100000

# Define the parameter ranges according to Table 6
S_over_K_range = (0.5, 1.4)  # Stock price over Strike price ratio
T_range = (0.05, 1.0)        # Time to maturity
r_range = (0.0, 0.1)         # Risk-free rate
sigma_range = (0.05, 1.0)    # Volatility

# Create a Latin Hypercube Sampler and generate samples
sampler = LatinHypercube(d=4)
samples = sampler.random(n=n_samples)

# Scale the samples to the specified ranges
S_over_K = samples[:, 0] * (S_over_K_range[1] - S_over_K_range[0]) + S_over_K_range[0]
T = samples[:, 1] * (T_range[1] - T_range[0]) + T_range[0]
r = samples[:, 2] * (r_range[1] - r_range[0]) + r_range[0]
sigma = samples[:, 3] * (sigma_range[1] - sigma_range[0]) + sigma_range[0]

# Set Strike Price (K) to 1 and calculate Stock Price (S) based on S/K ratio
K = np.ones(n_samples)
S = S_over_K * K

# Calculate option prices using the Black-Scholes formula
call_prices = black_scholes(S, K, T, r, sigma)

# Calculate scaled time value and filter out extremely small values
V_tilde = call_prices - np.maximum(S - np.exp(-r * T), 0)
log_V_tilde_over_K = np.log(V_tilde / K)
filtered_indices = log_V_tilde_over_K > -16.12

# Creating the dataset with the filtered values
dataset = np.column_stack((S[filtered_indices], T[filtered_indices], r[filtered_indices], log_V_tilde_over_K[filtered_indices], sigma[filtered_indices]))

# First few rows of the dataset
print(dataset[:5])
print(len(dataset))


[[ 6.32876831e-01  9.53087224e-01  4.07612524e-02 -4.59645886e+00
   3.19902894e-01]
 [ 8.11720933e-01  8.56502946e-01  4.06222382e-04 -1.83048989e+00
   7.43159228e-01]
 [ 1.03015625e+00  7.56263609e-01  3.40255528e-02 -4.36159872e+00
   9.62461521e-02]
 [ 1.16866209e+00  7.70959427e-01  2.33344408e-02 -3.30367481e+00
   2.79051260e-01]
 [ 8.38806994e-01  1.73690694e-01  8.30234761e-02 -2.85771211e+00
   7.72374170e-01]]
1040736


  log_V_tilde_over_K = np.log(V_tilde / K)
  log_V_tilde_over_K = np.log(V_tilde / K)


# Implied Volatolity traning

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler
from sklearn.model_selection import train_test_split
import time
from tensorflow_model_optimization.sparsity import keras as sparsity
import tensorflow_model_optimization as tfmot
from tensorflow.keras.callbacks import EarlyStopping



# Assuming dataset is predefined
# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(dataset[:, :-1], dataset[:, -1], test_size=0.1, random_state=0)

# Building the ANN with updated parameters
model = Sequential()
model.add(Dense(563, input_dim=X_train.shape[1], activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(563, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(563, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(563, activation='relu', kernel_initializer='glorot_uniform'))
model.add(Dense(1, kernel_initializer='glorot_uniform'))  # Output layer

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error')

# Learning rate decay function
def scheduler(epoch, lr):
    if epoch < 1000:
        return lr
    else:
        return lr * tf.math.exp(-0.065)

callback = LearningRateScheduler(scheduler)


# early stoping

# Fit the model with learning rate decay
model.fit(X_train, y_train, epochs=1500, batch_size=1035, verbose=1, callbacks=[callback])

# Evaluate the original model
original_loss = model.evaluate(X_test, y_test, verbose=0)
print(f"Test loss before pruning: {original_loss}")

# Speed Calculation for the original model
start_time = time.time()
original_predictions = model.predict(X_test)
end_time = time.time()
original_total_time = end_time - start_time
print(f"Time taken for predictions with the original model: {original_total_time} seconds")


# Model Pruning
pruning_params = {
    'pruning_schedule': sparsity.PolynomialDecay(initial_sparsity=0.50,
                                                 final_sparsity=0.80,
                                                 begin_step=2000,
                                                 end_step=4000,
                                                 frequency=100)
}

model_for_pruning = sparsity.prune_low_magnitude(model, **pruning_params)

# compile prunning model
model_for_pruning.compile(optimizer='adam', loss='mean_squared_error')

# Ensure UpdatePruningStep callback
pruning_callbacks = [callback, tfmot.sparsity.keras.UpdatePruningStep()]

# Fit the model with the necessary callback for pruning
model_for_pruning.fit(X_train, y_train, epochs=1500, batch_size=1035, verbose=1, callbacks=[pruning_callbacks])
# Strip the pruning wrappers
final_model = sparsity.strip_pruning(model_for_pruning)

# Compile the model again after stripping
final_model.compile(optimizer='adam', loss='mean_squared_error')

# Evaluate the model
loss = final_model.evaluate(X_test, y_test, verbose=0)
print(f"Test loss after pruning: {loss}")

# Speed Calculation
start_time = time.time()
predictions = final_model.predict(X_test)
end_time = time.time()
total_time = end_time - start_time
print(f"Time taken for predictions: {total_time} seconds")

# Comparison
print("\nModel Comparison:")
print(f"Original Model - Test Loss: {original_loss}, Inference Time: {original_total_time} seconds")
print(f"Pruned Model - Test Loss: {loss}, Inference Time: {total_time} seconds")



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epoch 506/1500
Epoch 507/1500
Epoch 508/1500
Epoch 509/1500
Epoch 510/1500
Epoch 511/1500
Epoch 512/1500
Epoch 513/1500
Epoch 514/1500
Epoch 515/1500
Epoch 516/1500
Epoch 517/1500
Epoch 518/1500
Epoch 519/1500
Epoch 520/1500
Epoch 521/1500
Epoch 522/1500
Epoch 523/1500
Epoch 524/1500
Epoch 525/1500
Epoch 526/1500
Epoch 527/1500
Epoch 528/1500
Epoch 529/1500
Epoch 530/1500
Epoch 531/1500
Epoch 532/1500
Epoch 533/1500
Epoch 534/1500
Epoch 535/1500
Epoch 536/1500
Epoch 537/1500
Epoch 538/1500
Epoch 539/1500
Epoch 540/1500
Epoch 541/1500
Epoch 542/1500
Epoch 543/1500
Epoch 544/1500
Epoch 545/1500
Epoch 546/1500
Epoch 547/1500
Epoch 548/1500
Epoch 549/1500
Epoch 550/1500
Epoch 551/1500
Epoch 552/1500
Epoch 553/1500
Epoch 554/1500
Epoch 555/1500
Epoch 556/1500
Epoch 557/1500
Epoch 558/1500
Epoch 559/1500
Epoch 560/1500
Epoch 561/1500
Epoch 562/1500
Epoch 563/1500
Epoch 564/1500
Epoch 565/1500
Epoch 566/1500
Epoch 567/1500
Epoch