In [1]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from parallel_RLNN import *
import matplotlib.pyplot as plt 
import tensorflow as tf


In [2]:
S0 = 50
K = 50 * 1.2
r = 0.06
vol = 0.2
T = 1
no_mon = 4
sample_size = 25000
N = no_mon
style = 'put'

In [None]:

stock_paths = stock_price_simulator(S0, r, vol, T, no_mon, sample_size)
## Initialize the option price matrix
option_price = np.zeros(stock_paths.shape)
## Calculate the option price at the maturity date
option_price[:, N] = payoff_fun(stock_paths[:, N], K, style)
payoff_T = payoff_fun(stock_paths[:, N], K, style)
    
## Normalize the stock prices
normalizer = S0
normalized_stock_paths = stock_paths[:, N]/normalizer

In [None]:
no_hidden_units = 16
l_rate2 = 0.001
## X_train and y_train for the neural network
X = normalized_stock_paths.reshape(-1, 1)
y = payoff_T.reshape(-1, 1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Compile the model
es = EarlyStopping(monitor='val_loss', mode='min', patience=5)
tf.keras.utils.set_random_seed(42)
model2 = create_shallow_NN(1, no_hidden_units)
model2.compile(loss='mean_squared_error', optimizer=Adam(l_rate2))
## Train the model  
model2.fit(X_train, y_train, epochs=3000, batch_size=int(0.1 * X_train.shape[0]), 
            validation_data=(X_test, y_test), callbacks=[es], verbose = 1)

# Test the model
y_hat_val = model2.predict(X_test)

In [None]:
print("Error", (np.mean((y_test - y_hat_val)**2)))

In [None]:

X_test_sorted_indices = np.argsort(X_test.flatten())  # Get the indices that sort X_test
X_test_sorted = X_test.flatten()[X_test_sorted_indices]  # Sort X_test
y_test_sorted = y_test[X_test_sorted_indices]  # Reorder y_test accordingly
y_hat_val_sorted = y_hat_val[X_test_sorted_indices]  # Reorder y_hat_val accordingly

# Plot
plt.figure(figsize=(8, 6))
plt.plot(X_test_sorted * S0, y_test_sorted, color='blue', label='True payoff', linewidth=2)
plt.plot(X_test_sorted * S0, y_hat_val_sorted, color='red', label='Predicted payoff', linestyle='dashed', linewidth=2)
plt.xlabel('Stock price')
plt.ylabel('Option price')
plt.legend()
plt.title('True vs Predicted Option Payoff')
plt.grid(True)
plt.show()


In [None]:
### Store the weights of the model
weights = []
weights.append(model2.get_weights())

## Get the weights of the model
w1 = model2.layers[0].get_weights()[0].reshape(-1)
b1 = model2.layers[0].get_weights()[1].reshape(-1)
w2 = model2.layers[1].get_weights()[0].reshape(-1)
b2 = model2.layers[1].get_weights()[1].reshape(-1)

## Calculate the continuation value 
continuation_value = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-1], r, vol, T/N, sample_size, normalizer)
              # Payoff at the previous monitoring date   

In [None]:
fun_h = payoff_fun(stock_paths[:, N-1], K, style) 

In [None]:
target_variable = np.maximum(fun_h - continuation_value, 0)

In [None]:
plt.scatter(stock_paths[:, N-1], continuation_value, color = 'blue', label = 'continuation_value')
plt.scatter(stock_paths[:, N-1], fun_h, color = 'red', label = 'payoff')
plt.legend()


In [None]:
plt.figure(figsize=(6, 4))
plt.scatter(stock_paths[:, N-1], target_variable, color = 'blue', label = 'target_variable')
plt.xlabel('Stock price')
plt.ylabel('Pay off - continuation value')
plt.legend()
#plt.savefig('target_variable_static.jpeg')
plt.show()


In [None]:
## Normalize the stock prices
normalizer = S0
normalized_stock_paths = stock_paths[:, N-1]/normalizer
X = normalized_stock_paths.reshape(-1, 1)
y = target_variable.reshape(-1, 1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state = 42)
# Compile the model


In [None]:
def create_shallow_NN2(input_dim, hidden_units):
    
    model = Sequential()
    # Use Input layer for specifying input shape
    model.add(Input(shape=(input_dim,)))
    model.add(Dense(hidden_units, activation='relu', kernel_initializer='random_uniform', bias_initializer='random_uniform'))
    model.add(Dense(1, activation='linear', kernel_initializer='random_normal', bias_initializer='random_uniform'))
    
    return model

# Keep existing weights but train with smaller learning rate and batch size
#tf.keras.utils.set_random_seed(42)
model3 = create_shallow_NN2(1, no_hidden_units)
model3.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
model3.optimizer.learning_rate.assign(5e-4)  # Reduced from 5e-3

history = model3.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=3000,
    batch_size=50,  # Reduced from 8
    callbacks=[EarlyStopping(monitor='val_loss', mode='min', patience=20, restore_best_weights=True, min_delta=1e-7)],
    verbose=1
)

In [None]:
X_test_sorted_indices = np.argsort(X_test.flatten())  # Get the indices that sort X_test
X_test_sorted = X_test.flatten()[X_test_sorted_indices]  # Sort X_test
y_hat = model3.predict(X_test)
y_test_sorted = y_test[X_test_sorted_indices]  # Reorder y_test accordingly
  # Reorder y_hat_val accordingly
y_hat_sorted = y_hat[X_test_sorted_indices]
# Plot
plt.figure(figsize=(6, 4), dpi=200)
plt.scatter(X_test * S0, y_test, color='blue', label='True payoff', linewidth=2)
plt.plot(X_test_sorted * S0, y_hat_sorted, color='red', label='Predicted payoff', linestyle='dashed', linewidth=2)
plt.xlabel('Stock price')
plt.ylabel('Target Value')
plt.legend()
#plt.grid(True)
#plt.rc('font', size=14)
plt.savefig('static_hedge_prevDate.jpeg')
plt.show()


In [None]:
w1 = model3.layers[0].get_weights()[0].reshape(-1)
b1 = model3.layers[0].get_weights()[1].reshape(-1)
w2 = model3.layers[1].get_weights()[0].reshape(-1)
b2 = model3.layers[1].get_weights()[1].reshape(-1)
dt = 2 * T/N
new_continuation_val = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-2], r, vol, dt , sample_size, normalizer)
dt = 1 * T/N
w1 = weights[0][0].reshape(-1)
b1 = weights[0][1].reshape(-1)
w2 = weights[0][2].reshape(-1)
b2 = weights[0][3].reshape(-1)
cont_value2 = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-2], r, vol, dt, sample_size, normalizer)

In [None]:
weights.append(model3.get_weights())

In [None]:
payoff_h = payoff_fun(stock_paths[:, N-2], K, style)

In [None]:
new_target_variable = np.maximum(payoff_h - new_continuation_val - cont_value2, 0)

In [None]:
plt.scatter(stock_paths[:, N-2], new_continuation_val + cont_value2, color = 'blue', label = 'Continuation value')
plt.scatter(stock_paths[:, N-2], payoff_h, color = 'red', label = 'Payoff')
plt.legend()
plt.show()


In [None]:
plt.scatter(stock_paths[:, N-2], new_target_variable, color = 'green', label = 'Target variable')

In [None]:
new_continuation_val2 = new_continuation_val + cont_value2

In [None]:
fun_h2 = payoff_fun(stock_paths[:, N-2], K, style)               # Payoff at the previous monitoring date

In [None]:
target_variable = np.maximum(fun_h2, new_continuation_val2) - new_continuation_val2

In [None]:
plt.scatter(stock_paths[:, N-2], target_variable, color = 'red', label = 'Target variable')

In [None]:
## Normalize the stock prices
normalizer = S0
normalized_stock_paths = stock_paths[:, N-2]/normalizer
X = normalized_stock_paths.reshape(-1, 1)
y = target_variable.reshape(-1, 1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [None]:
plt.scatter(X_train, y_train, color = 'blue', label = 'Training data')
plt.scatter(X_test, y_test, color = 'red', label = 'Test data')
plt.legend()
plt.show()

In [None]:
history = model3.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=3000,
    batch_size=50,  # Reduced from 8
    callbacks=[EarlyStopping(monitor='val_loss', mode='min', patience=20, restore_best_weights=True, min_delta=1e-6)],
    verbose=1
)

In [None]:
y_hat_val = model3.predict(X_test)
plt.scatter(X_test*S0, y_test, color = 'blue', label = 'True payoff')
plt.scatter(X_test*S0, y_hat_val, color = 'red', label = 'Predicted payoff')
plt.legend()
plt.show()

In [None]:
weights.append(model3.get_weights())

In [None]:
dt = 3 * T/N
w1 = weights[0][0].reshape(-1)
b1 = weights[0][1].reshape(-1)
w2 = weights[0][2].reshape(-1)
b2 = weights[0][3].reshape(-1)
cont_value3 = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-3], r, vol, dt, sample_size, normalizer)
dt = 2 * T/N
w1 = weights[1][0].reshape(-1)
b1 = weights[1][1].reshape(-1)
w2 = weights[1][2].reshape(-1)
b2 = weights[1][3].reshape(-1)
cont_value2 = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-3], r, vol, dt, sample_size, normalizer)
dt = 1 * T/N
w1 = weights[2][0].reshape(-1)
b1 = weights[2][1].reshape(-1)
w2 = weights[2][2].reshape(-1)
b2 = weights[2][3].reshape(-1)
cont_value1 = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-3], r, vol, dt, sample_size, normalizer)

continuation_value_3 = cont_value3 + cont_value2 + cont_value1

In [None]:
fun_h3 = payoff_fun(stock_paths[:, N-3], K, style)               # Payoff at the previous monitoring date

In [None]:
plt.scatter(stock_paths[:, N-3], continuation_value_3, color = 'blue', label = 'Continuation value')
plt.scatter(stock_paths[:, N-3], fun_h3, color = 'red', label = 'Payoff')
plt.legend()
plt.show()

In [None]:
target_variable3 = np.maximum(fun_h3, continuation_value_3) - continuation_value_3

In [None]:
plt.figure(figsize=(12, 6))
plt.scatter(stock_paths[:, N-3], target_variable3, color = 'red', label = 'Target variable')
plt.xlabel('Stock price')
plt.ylabel('Pay off - continuation value')
plt.legend()
plt.savefig('target_variable_static.jpeg')
plt.show()

In [None]:
normalized_stock_paths = stock_paths[:, N-3]/normalizer
X = normalized_stock_paths.reshape(-1, 1)
y = target_variable3.reshape(-1, 1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state = 42)

In [None]:
plt.scatter(X_train, y_train, color = 'blue', label = 'Training data')
plt.scatter(X_test, y_test, color = 'red', label = 'Test data')
plt.legend()
plt.show()

In [None]:
#model3 = create_shallow_NN2(1, no_hidden_units)
#model3.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
#model3.optimizer.learning_rate.assign(5e-4)  # Reduced from 5e-3
#tf.keras.utils.set_random_seed(42)
history = model3.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=3000,
    batch_size=50,  # Reduced from 8
    callbacks=[EarlyStopping(monitor='val_loss', mode='min', patience=20, restore_best_weights=True, min_delta=1e-5)],
    verbose=1
)

In [None]:
weights.append(model3.get_weights())

In [None]:
dt = 4 * T/N
w1 = weights[0][0].reshape(-1)
b1 = weights[0][1].reshape(-1)
w2 = weights[0][2].reshape(-1)
b2 = weights[0][3].reshape(-1)
cont_value_41 = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-4], r, vol, dt, sample_size, normalizer)
dt = 3 * T/N
w1 = weights[1][0].reshape(-1)
b1 = weights[1][1].reshape(-1)
w2 = weights[1][2].reshape(-1)
b2 = weights[1][3].reshape(-1)
cont_value_42 = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-4], r, vol, dt, sample_size, normalizer)
dt = 2 * T/N
w1 = weights[2][0].reshape(-1)
b1 = weights[2][1].reshape(-1)
w2 = weights[2][2].reshape(-1)
b2 = weights[2][3].reshape(-1)
cont_value_43 = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-4], r, vol, dt, sample_size, normalizer)
dt = 1 * T/N
w1 = weights[3][0].reshape(-1)
b1 = weights[3][1].reshape(-1)
w2 = weights[3][2].reshape(-1)
b2 = weights[3][3].reshape(-1)
cont_value_44 = cal_continuation_value(w1, b1, w2, b2, no_hidden_units, stock_paths[:, N-4], r, vol, dt, sample_size, normalizer)

In [None]:
option_price = cont_value_41 + cont_value_42 + cont_value_43 + cont_value_44

In [None]:
option_price

In [None]:
weights

In [None]:
def cal_continuation_value2(weights, no_hidden_units, stock_prices, r, sigma, dt, normalizer):
    """ Calculate the continuation value of the Bermudan option

    Args:
        w1 (array): Weights of the first layer
        b1 (array): Biases of the first layer
        w2 (array): Weights of the second layer
        b2 (float): Biases of the second layer
        no_hidden_units (int): Number of hidden units
        stock_prices (float): Stock prices at time
        r (float): Risk free rate
        sigma (float): volatility
        dt (_type_): time step
        M (_type_): number of simulations(samples)
        normalizer (_type_): normalizer for the stock prices

    Returns:
        _type_: _description_
    """
     ## Get the weights of the model
    w1 = weights[0].reshape(-1)
    b1 = weights[1].reshape(-1)
    w2 = weights[2].reshape(-1)
    b2 = weights[3].reshape(-1)
    normalized_stock_values = stock_prices/normalizer
    continuation_value = 0
    for i in range(no_hidden_units):
        continuation_value += expected_value(w1[i], b1[i], normalized_stock_values, r, sigma, dt) * w2[i]
        #print(continuation_value)
    continuation_value += b2
    
    return continuation_value * np.exp(-r*dt)

In [None]:
def cal_continuation_value_static(weights_, stock_price, r, vol, dt, S0, no_hidden_units):
    conti_val = 0
    for i in range(1, len(weights_)+1):
        #print("i", i, dt)
        conti_val += cal_continuation_value2(weights_[-i], no_hidden_units, stock_price, r, vol, dt, S0)
        dt += T/N
    return conti_val

In [None]:
def check_payoffs(stock_price, weights, S0):
    """
    Given a single stock_price, compute the total 
    payoff implied by the 2-layer NN with piecewise logic.
    
    :param stock_price:  current price of the underlying (float)
    :param weights:      a tuple/list containing 
                         [weights_layer1, bias_layer1, weights_layer2, bias_layer2]
    :param S0:           normalizing constant (e.g. initial stock price)
    :return:             total payoff (float)
    """
    payoffs = 0.0
    
    # Extract layers
    weights_layer1 = weights[0].flatten()  # array of w1_i
    bias_layer1    = weights[1]  # array of b1_i
    weights_layer2 = weights[2].flatten()  # array of w2_i
    bias_layer2    = weights[3][0]  # float or array of length 1

    # Normalize
    normalized_stock = stock_price / S0
    
    # Start from second-layer bias
    # (if bias_layer2 is an array, do bias_layer2[0])
    payoffs = bias_layer2

    # Loop over all first-layer neurons
    for i in range(len(weights_layer1)):
        w1i = weights_layer1[i]
        b1i = bias_layer1[i]
        w2i = weights_layer2[i]

        # Case 1: w1 >= 0, b1 >= 0 => Forward contract
        if w1i >= 0 and b1i >= 0:
            payoffs += w2i * (w1i * normalized_stock + b1i)

        # Case 2: w1 > 0, b1 < 0 => call-like payoff
        elif w1i > 0 and b1i < 0:
            strike_call = -b1i / w1i
            call_payoff = max(normalized_stock - strike_call, 0)
            payoffs += w2i * w1i * call_payoff

        # Case 3: w1 < 0, b1 > 0 => put-like payoff
        elif w1i < 0 and b1i > 0:
            strike_put = -b1i / w1i
            put_payoff = max(strike_put - normalized_stock, 0)
            payoffs -= w2i * w1i * put_payoff

        else:
            # The "else" might be w1 <= 0, b1 <= 0 => payoff is effectively 0
            0.0

    return payoffs


In [None]:
sample_size = 10000
stock_prices_hedging = stock_price_simulator(S0, r, vol, T, no_mon, sample_size)
stock_prices_hedging.shape

In [None]:
M = sample_size
hedging_errors = np.zeros(sample_size)
for i in range(sample_size):
    cash_account = option_price[0]
    # Initial portfolio setup
    weights_now = weights
    dt = T/no_mon
    cash_account -= cal_continuation_value_static(weights_now, stock_prices_hedging[i][0], r, vol, dt, S0, no_hidden_units)
    #print(cash_account)
    for j in range(1, no_mon):
        cash_account *= np.exp(r * dt)
        Exercise_value = payoff_fun(stock_prices_hedging[i, j], K, "put")
        weights_next = weights_now[:no_mon-j]
        #print("weights_next", len(weights_next))
        cont_value = cal_continuation_value_static(weights_next, stock_prices_hedging[i, j], r, vol, dt, S0, no_hidden_units)  
        
        if Exercise_value > cont_value:
            #print("Exercise", weights_now, "at time", j)
            weights_excercise = weights_now[-1]
            payoff_j = check_payoffs(stock_prices_hedging[i, j], weights_excercise, S0) + cont_value
            cash_account += payoff_j
            weights_now = weights_next
            #print("Exercise", cash_account, Exercise_value, "Hedge error at", i, cash_account - Exercise_value)
            hedging_errors[i] = cash_account - Exercise_value
            break
        weights_now = weights_next
    else:
        cash_account *= np.exp(r * dt)
        #print("weights_now", weights_now)
        portfolio_t = check_payoffs(stock_prices_hedging[i, j], weights_now[0], S0)
        cash_account += portfolio_t
        payoff_T = payoff_fun(stock_prices_hedging[i, j], K, "put")
        hedging_errors[i] = cash_account - payoff_T

In [None]:
hedging_errors_ATM = hedging_errors

In [None]:
len(hedging_errors)

In [None]:
dict_hedging_errors = {"ATM": hedging_errors_ATM}

In [None]:
df = pd.DataFrame(dict_hedging_errors)
df.to_csv("hedging_errors_static_ATM.csv")

In [None]:
hedging_errors_12 = pd.read_csv("hedging_errors_static_12.csv")

In [None]:
hedging_errors_ATM = pd.read_csv("hedging_errors_static_ATM.csv")

In [None]:
hedging_errors_08 = pd.read_csv("hedging_errors_static_08.csv")

In [None]:
hedge_error_OTM = hedging_errors_08["08"] 

In [None]:
#hedge_error_OTM = hedge_error_OTM[:9999]

In [None]:
hedging_errors_12["12"]

In [None]:
import numpy as np
import matplotlib.pyplot as plt 
from scipy import stats

In [None]:
# Helper function to calculate confidence intervals
def calculate_confidence_interval(data, confidence=0.95):
    mean = np.mean(data)
    std_error = stats.sem(data)  # Standard error of the mean
    margin = std_error * stats.t.ppf((1 + confidence) / 2, len(data) - 1)
    return mean, mean - margin, mean + margin

In [None]:
# Calculate mean and confidence intervals for each dataset
otm_mean, otm_lower, otm_upper = calculate_confidence_interval(hedge_error_OTM)
atm_mean, atm_lower, atm_upper = calculate_confidence_interval(hedging_errors_ATM["ATM"])
itm_mean, itm_lower, itm_upper = calculate_confidence_interval(hedging_errors_12["12"])


In [None]:
# Plotting
plt.figure(dpi=500, figsize=(6, 4))
plt.hist(hedge_error_OTM, edgecolor='black', bins=50, label='OTM', alpha=0.7)
plt.axvline(otm_mean, linestyle='--')
plt.axvline(otm_lower, linestyle=':')
plt.axvline(otm_upper, linestyle=':')
plt.hist(hedging_errors_ATM["ATM"], edgecolor='black', bins=50, label='ATM', alpha=0.7)
plt.axvline(atm_mean, linestyle='--')
plt.axvline(atm_lower, linestyle=':')
plt.axvline(atm_upper, linestyle=':')
plt.hist(hedging_errors_12["12"], edgecolor='black', bins=50, label='ATM', alpha=0.7)
plt.axvline(itm_mean, color='orange', linestyle='--')
plt.axvline(itm_lower,color='orange',  linestyle=':')
plt.axvline(itm_upper,color='orange', linestyle=':')

In [None]:
#plt.hist(hedging_errors_ATM, bins=60, alpha=0.7, label='ATM', edgecolor='black')
#plt.hist(hedging_errors_08, bins=60, alpha=0.75, label='K=0.8*S0', edgecolor='black')
#plt.hist(hedging_errors_12, bins=60, alpha=0.75, label='K=1.2*S0')
#plt.legend()
#plt.xlabel('Hedging error')
#plt.ylabel('Frequency')
#plt.savefig('Static_hedging_error.jpeg')
#plt.show()

In [None]:

# Plot function for individual axes
def plot_with_percentile_ci(ax, data_rlnn, data_bin, data_3, xlabel):
    mean_otm = np.mean(data_rlnn)
    ci_otm = (np.percentile(data_rlnn, 2.5), np.percentile(data_rlnn, 97.5))
    mean_atm = np.mean(data_bin)
    ci_atm = (np.percentile(data_bin, 2.5), np.percentile(data_bin, 97.5))
    mean_itm = np.mean(data_3)
    ci_itm = (np.percentile(data_3, 2.5), np.percentile(data_bin, 97.5))

    # Plot data
    ax.hist(data_rlnn, bins=50, alpha=0.5, color='blue', edgecolor='black', label='OTM')
    ax.hist(data_bin, bins=50, alpha=0.5, color='orange', edgecolor='black', label='ATM')
    ax.hist(data_3, bins=50, alpha=0.5, color='green', edgecolor='black', label='ITM')
    ax.axvline(x=mean_otm, color='blue', linestyle='--', label='OTM Mean')
    ax.axvline(x=mean_atm, color='orange', linestyle='--', label='ATM Mean')
    ax.axvline(x=mean_itm, color='green', linestyle='--', label='ITM Mean')
    ax.axvspan(ci_otm[0], ci_otm[1], color='blue', alpha=0.2)
    ax.axvspan(ci_atm[0], ci_atm[1], color='orange', alpha=0.2)
    ax.axvspan(ci_itm[0], ci_itm[1], color='green', alpha=0.2)
    ax.set_xlabel(xlabel)
    ax.set_ylabel("Frequency")
    ax.legend()



In [None]:
# Generate individual subplots
# Plot histograms comparing RLNN vs. Binomial for each strike ratio with 95% Percentile CI
fig, axes = plt.subplots(1, 1, figsize=(6, 4), sharey=True, dpi=500)  # Set high DPI here

plot_with_percentile_ci(axes, hedge_error_OTM, hedging_errors_ATM['ATM'],hedging_errors_12['12'], "Hedge Error")
fig.savefig("static_hedge.jpeg")