In [1]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import KFold
from tensorflow.keras import regularizers

In [2]:
def gen_paths(S0, r, sigma, T, N, M):
    """Simulate geometric-Brownian-motion price paths on an even time grid.

    Args:
        S0: Value written into the first row (starting price of every path).
        r: Rate entering the drift of the log-return.
        sigma: Volatility of the log-return.
        T: Horizon; the step size is ``T / N``.
        N: Number of rows (time points) in the result.
        M: Number of paths (columns).

    Returns:
        ``float32`` array of shape ``(N, M)``. Row 0 equals ``S0``; every later
        row is the previous row times a log-normal growth factor, so the last
        row is ``N - 1`` steps away from the start.
    """
    step = float(T) / N
    # Both terms are the same for every step, so compute them once.
    drift = (r - 0.5 * sigma ** 2) * step
    diffusion = sigma * np.sqrt(step)

    grid = np.zeros((N, M), dtype=np.float32)
    grid[0] = S0
    for row in range(1, N):
        # One standard-normal shock per path, taken from NumPy's global RNG.
        shocks = np.random.standard_normal(M)
        grid[row] = grid[row - 1] * np.exp(drift + diffusion * shocks)
    return grid

In [3]:
class LSM_NN(tf.keras.Model):
    """Small feed-forward network with a shared trunk and two scalar heads.

    Three 50-unit ReLU layers feed two independent single-unit linear layers.
    ``call`` returns the two head outputs as a tuple
    ``(continuation_val, martingale_func)``.
    """

    def __init__(self):
        super().__init__()
        dense = tf.keras.layers.Dense
        # warm_start copies Dense weights positionally, so the layers are
        # created in a fixed order: trunk first, then the two heads.
        self.fc1 = dense(50, activation='relu')
        self.fc2 = dense(50, activation='relu')
        self.fc3 = dense(50, activation='relu')
        self.fc4 = dense(1, activation='linear')
        self.fc5 = dense(1, activation='linear')

    def call(self, x):
        """Run the trunk on ``x`` and return both head outputs."""
        hidden = x
        for trunk_layer in (self.fc1, self.fc2, self.fc3):
            hidden = trunk_layer(hidden)
        # fc4 is evaluated before fc5, as in a sequential assignment.
        return self.fc4(hidden), self.fc5(hidden)

In [4]:
def put_payoff(S, K):
    """Return the put exercise value ``max(K - S, 0)``, element-wise.

    Args:
        S: Underlying price(s); scalar or array.
        K: Strike.

    Returns:
        ``K - S`` floored at zero, with the shape of the broadcast inputs.
    """
    intrinsic = K - S
    return np.maximum(intrinsic, 0)

In [5]:
def warm_start(model1, model2):
    """Copy Dense-layer weights from ``model1`` into ``model2`` by position.

    Only layers whose exact type is ``tf.keras.layers.Dense`` take part; the
    k-th such layer of ``model2`` receives the weights of the k-th such layer
    of ``model1``. Extra Dense layers in ``model1`` are ignored.

    Args:
        model1: Source model (weights are read).
        model2: Target model (weights are overwritten in place).

    Returns:
        ``model2``.

    Raises:
        IndexError: If ``model2`` has more Dense layers than ``model1``.
    """
    dense_type = tf.keras.layers.Dense
    source_weights = [
        source.get_weights()
        for source in model1.layers
        if type(source) is dense_type
    ]

    cursor = 0
    for target in model2.layers:
        if type(target) is not dense_type:
            continue
        target.set_weights(source_weights[cursor])
        cursor += 1

    return model2

In [6]:
# --- Option contract and simulation settings ---
S0 = 36        # initial price written into the first row of every path
K = 40         # strike used by put_payoff
vol = 0.2      # volatility passed to gen_paths
rf = 0.06      # rate used for the path drift and for discounting
steps = 50     # number of time points simulated per path
paths = 10000  # number of simulated paths
T = 1          # horizon; one step spans T / steps
dt = T/steps
N = steps

# One-period discount factor applied at every backward step.
discount_factor = np.exp(-rf * dt)

# Seed NumPy's global generator so that the simulated paths, and every later
# np.random draw in the notebook, are reproducible on "Restart & Run All".
SEED = 123
np.random.seed(SEED)

S = gen_paths(S0, rf, vol, T, steps, paths)

# Payoff on the last simulated row; both price estimates start from it.
payoff = put_payoff(S[-1], K)
upper_price = payoff.copy()
lower_price = payoff.copy()

# Contiguous (unshuffled) 5-way split of the path indices, used by train_model.
kf = KFold(n_splits=5, shuffle=False)

In [7]:
# Per-time-step result lists filled by the backward loop: network continuation
# output, network martingale output, lower estimate and upper estimate.
cont_store, mart_store, lower_store, upper_store = [], [], [], []

In [None]:


def create_and_initialize_model(paths, model=None):
    """Build a fresh ``LSM_NN`` and optionally warm-start it from ``model``.

    Args:
        paths: Leading dimension of the ``(paths, 1)`` input shape used to
            build the network.
        model: Optional existing network. When given, its Dense-layer weights
            are copied into the new network through ``warm_start``.

    Returns:
        The newly built network.
    """
    fresh = LSM_NN()
    fresh.build(input_shape=(paths, 1))
    if model is None:
        return fresh
    return warm_start(model, fresh)

def train_model(model, X, y, dW, kf, optimizer, loss_fn, max_epochs=50, patience=5):
    """Train ``model`` fold by fold with early stopping on the mean fold loss.

    Each epoch walks through every split produced by ``kf``: the model takes
    one gradient step on the training part and is then scored on the held-out
    part. The epoch score is the average of those held-out losses.

    Note that a single model is updated across all folds, so the samples held
    out in one fold have been trained on in the others; the reported loss is
    therefore not an out-of-sample estimate.

    Args:
        model: Network to train (updated in place).
        X: Inputs, indexed along axis 0 by the split indices.
        y: Regression targets aligned with ``X``.
        dW: Increments aligned with ``X``, forwarded to the prediction.
        kf: Splitter exposing ``split(X)`` and ``get_n_splits()``.
        optimizer: Optimizer handed to ``train_step``.
        loss_fn: Loss callable handed to ``train_step`` / ``validate_step``.
        max_epochs: Upper bound on the number of epochs.
        patience: Number of consecutive non-improving epochs that stops training.

    Returns:
        ``(model, epoch_val_loss)`` where the loss is that of the last epoch
        run (not the best one), or ``inf`` if no epoch was run.
    """
    best_val_loss = float('inf')
    # Bound up front so the return value is defined even when max_epochs is 0.
    epoch_val_loss = float('inf')
    epochs_no_improve = 0
    n_splits = kf.get_n_splits()

    for _ in range(max_epochs):
        fold_loss_total = 0.0
        for train_idx, val_idx in kf.split(X):
            # Splitting dataset
            X_train, X_val = tf.gather(X, train_idx), tf.gather(X, val_idx)
            y_train, y_val = tf.gather(y, train_idx), tf.gather(y, val_idx)
            dW_train, dW_val = tf.gather(dW, train_idx), tf.gather(dW, val_idx)

            # Training step (its returned loss is not needed here)
            train_step(model, X_train, y_train, dW_train, optimizer, loss_fn)

            # Validation step
            loss_val = validate_step(model, X_val, y_val, dW_val, loss_fn)
            fold_loss_total += loss_val.numpy()

        epoch_val_loss = fold_loss_total / n_splits

        # Early stopping: reset the counter on improvement, otherwise count up
        # and stop once the patience budget is used.
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                break

    return model, epoch_val_loss

def train_step(model, X_train, y_train, dW_train, optimizer, loss_fn):
    """Apply one gradient update to ``model`` and return the batch loss.

    The forward pass goes through ``model_prediction`` and the loss compares
    it with the squeezed targets; gradients are taken with respect to
    ``model.trainable_variables`` and applied through ``optimizer``.
    """
    with tf.GradientTape() as tape:
        y_hat = model_prediction(model, X_train, dW_train)
        target = tf.squeeze(y_train)
        batch_loss = loss_fn(y_true=target, y_pred=y_hat)

    weights = model.trainable_variables
    gradients = tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(gradients, weights))
    return batch_loss

def validate_step(model, X_val, y_val, dW_val, loss_fn):
    """Return the loss of ``model`` on a held-out batch without updating it.

    The prediction comes from ``model_prediction`` and is compared with the
    squeezed targets.
    """
    y_hat = model_prediction(model, X_val, dW_val)
    return loss_fn(y_true=tf.squeeze(y_val), y_pred=y_hat)

def model_prediction(model, X, dW):
    """Combine the two network outputs into ``continuation + martingale * dW``.

    The model is always called with ``training=False``, including when this
    function is used inside a training step.
    """
    cont_head, mart_head = model(X, training=False)
    scaled_increment = mart_head * dW
    return tf.add(cont_head, scaled_increment)

# Main simulation loop
optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=0.01)
loss_fn = tf.keras.losses.MeanSquaredError()

# Empty the result stores first so that re-running this cell does not stack a
# second backward sweep on top of the previous one.
for _store in (cont_store, mart_store, lower_store, upper_store):
    _store.clear()

# No previous network exists at the first processed step. Tracking this
# explicitly (instead of comparing t with a literal 49) keeps the loop correct
# when `steps` is changed.
model = None

for t in range(N - 1, -1, -1):
    tf.random.set_seed(123)
    print(f"Time step {t}")

    # Regression target: the value carried over from the previously processed
    # step, discounted by one period.
    if t == N - 1:
        y_value = discount_factor * payoff
    else:
        y_value = discount_factor * lower_price

    # New network for this step, warm-started from the previous step's network
    # whenever one exists.
    model = create_and_initialize_model(paths, model)

    # Preparing data
    X = tf.convert_to_tensor(S[t], dtype=tf.float32)[:, tf.newaxis]
    y = tf.convert_to_tensor(y_value, dtype=tf.float32)[:, tf.newaxis]
    # NOTE(review): these increments are drawn independently of the shocks used
    # inside gen_paths (which are not returned) - confirm that is intended.
    dW = np.sqrt(dt) * np.random.normal(size=paths)
    dW = tf.convert_to_tensor(dW, dtype=tf.float32)[:, tf.newaxis]

    # Training the model
    model, epoch_val_loss = train_model(model, X, y, dW, kf, optimizer, loss_fn)
    print(f"Val Loss for time step {t}: {epoch_val_loss}")

    # Post-training calculations: evaluate both heads once and reuse the
    # resulting NumPy arrays below.
    continuation_value, martingale_function = model(X, training=False)
    continuation_np = tf.squeeze(continuation_value).numpy()
    martingale_np = tf.squeeze(martingale_function).numpy()
    martingale_increment = martingale_np * tf.squeeze(dW).numpy()

    if t == N - 1:
        upper_price = y_value - martingale_increment
    else:
        upper_price = discount_factor * upper_store[-1] - martingale_increment
    lower_price = y_value - martingale_increment

    # Updating prices: the upper estimate is floored at the exercise value; the
    # lower estimate takes the exercise value where it beats the fitted
    # continuation value.
    exercise_value = put_payoff(S[t], K)
    upper_price = np.where(exercise_value > upper_price, exercise_value, upper_price)
    lower_price = np.where(exercise_value > continuation_np, exercise_value, lower_price)

    # Storing values
    cont_store.append(continuation_np)
    mart_store.append(martingale_np)
    lower_store.append(lower_price)
    upper_store.append(upper_price)


In [None]:
# The backward loop appends one array per time step and finishes with t = 0,
# so the time-0 estimate is the path average of the last stored array.
# Averaging the whole store would mix values from every time step.
upper_price = np.mean(upper_store[-1])
upper_price

In [None]:
# The backward loop appends one array per time step and finishes with t = 0,
# so the time-0 estimate is the path average of the last stored array.
# Averaging the whole store would mix values from every time step.
lower_price = np.mean(lower_store[-1])
lower_price