In [1]:
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import torch
import matplotlib.pyplot as plt
#from src.dataloader import DataloaderEuropean1D, DataloaderAmerican1D
from PINN import PINNforwards
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.set_default_device(DEVICE)

In [2]:
from torch.distributions import Normal
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
import torch
import numpy as np
from PINN import PINNforwards
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.set_default_device(DEVICE)

import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

mpl.rcParams["figure.titlesize"] = 18
mpl.rcParams["axes.labelsize"] = 15
mpl.rcParams["axes.titlesize"] = 14
mpl.rcParams["legend.fontsize"] = "medium"
mpl.rcParams["xtick.labelsize"] = 12
mpl.rcParams["ytick.labelsize"] = 12
#mpl.rcParams["figure.dpi"] = 500

In [5]:
def make_training_plot(filename : str, for_validation : bool = False):
    types_of_loss = ["total_loss", "loss_boundary",
                     "loss_pde", "loss_expiry", "loss_lower", "loss_upper"]
    if for_validation:
        X_loss = np.loadtxt("results/validation_" + filename + ".txt")
    else:
        X_loss = np.loadtxt("results/loss_" + filename + ".txt")
    
    df_loss = pd.DataFrame(columns = types_of_loss, data = X_loss)
    df_loss["epoch"] = np.arange(1, len(df_loss) + 1)
    df_melted = df_loss.melt(id_vars = "epoch", value_vars=types_of_loss, var_name='Category', value_name = "Loss")

    y_name = "Validation loss " if for_validation else "Loss"
    fig = px.line(df_melted, x = "epoch", y = "Loss", color = "Category", log_y = True)
    fig.show()

In [None]:
filename = "multi_1_560"
make_training_plot(filename)


In [None]:
filename = "multi_1_560"
make_training_plot(filename, True)

In [24]:
def make_heat_map(layers : list[int], nodes : list[int], filename : str):
    data = np.zeros((len(layers), len(nodes)))
    
    for i, layer in enumerate(layers):
        for j, node in enumerate(nodes):
            X_loss = np.loadtxt("results/" + filename + f"_{layer}_{node}" ".txt")
            data[i,j] = np.min(X_loss[:,1])
    
    #np.savetxt("results/"+filename + "_lowest_MSE" ".txt", data)
    df = pd.DataFrame(data, index=layers, columns=nodes)
    formatted_df = df.applymap(lambda x: f'{x:.2e}')

    fig = px.imshow(df, 
                labels=dict(x="Columns", y="Rows", color="Value"), 
                text_auto=True,  # This automatically adds values in the cells
                aspect="auto")

    fig.update_traces(text=formatted_df.values, texttemplate="%{text}")
    # Show the plot
    fig.show()

In [None]:
make_heat_map(["1","2","3","4","5","6","7"], ["10","20","40","80","160","320","560"], "loss_multi")

In [None]:
make_heat_map(["1","2","3","4","5","6","7"], ["10","20","40","80","160","320","560"], "validation_multi")

In [5]:
def make_heat_map_activation(filename : str, activation_functions : list[str], layers : list[str]):
    data = np.loadtxt(filename)

    df = pd.DataFrame(data, index=activation_functions, columns=layers)
    formatted_df = df.applymap(lambda x: f'{x:.2e}')

    fig = px.imshow(df, 
                labels=dict(x="Layers", y="Activation function", color="Value"), 
                text_auto=True,  # This automatically adds values in the cells
                aspect="auto")

    fig.update_traces(text=formatted_df.values, texttemplate="%{text}")
    # Show the plot
    fig.show()


In [None]:
make_heat_map_activation("important_results/different_activation.txt", ["ReLU", "LeakyReLU", "Sigmoid", "Tanh"], ["1", "2", "4", "8", "16"])

In [2]:
def make_heat_map_learning_rate(filename : str, learning_rates : list[str], batch_sizes : list[str], format : str):
    data = np.loadtxt(filename)

    df = pd.DataFrame(data, index=learning_rates, columns=batch_sizes)
    formatted_df = df.applymap(format)

    fig = px.imshow(df, 
                labels=dict(x="Batch size", y="Learning rate", color="Batch size"), 
                text_auto=True,  # This automatically adds values in the cells
                aspect="auto")

    fig.update_traces(text=formatted_df.values, texttemplate="%{text}")
    # Show the plot
    fig.show()

In [None]:
make_heat_map_learning_rate("important_results/MSE_lr_1d_european.txt", ["1e-2", "5e-3", "1e-3", "5e-4", "1e-4", "5e-5"], ["100", "1_000", "5_000", "10_000"], lambda x: f'{x:.2e}')

In [None]:
make_heat_map_learning_rate("important_results/epoch_lr_1D_european.txt", ["1e-2", "5e-3", "1e-3", "5e-4", "1e-4", "5e-5"], ["100", "1_000", "5_000", "10_000"], lambda x: f'{x:.0f}')

In [None]:
def compare_model_with_analytical_1D(model_name, dataloader):
    S_range, t_range = dataloader.S_range, dataloader.time_range

    model = PINN(2, 1, 300, 5).to(DEVICE)
    model.load_state_dict(torch.load(model_name, weights_only=True))

    s = np.linspace(S_range[0], S_range[1], 100)
    t = np.linspace(t_range[0], t_range[1], 100)
    s_grid, t_grid = np.meshgrid(s, t)
    s_flat = s_grid.flatten()
    t_flat = t_grid.flatten()

    # Create a 2D tensor from the flattened arrays
    X_test = torch.tensor(np.column_stack((t_flat, s_flat)), dtype=torch.float)
    X_test_scaled = dataloader.normalize(X_test)
    # X_test_norm = euro_call_data.normalize(X_test)
    y_analytical_test = dataloader.get_analytical_solution(X_test[:,1],X_test[:,0])
    model.eval()
    with torch.no_grad():
        y_pinn_test = model(X_test_scaled)
    
    err = (y_pinn_test-y_analytical_test.reshape(y_analytical_test.shape[0],-1)).cpu().numpy()
    loss_mse = np.square(err).sum()/(len(err))
    print(f"MSE net: {loss_mse:.3f}")

    fig = plt.figure(figsize=(14,7))
    ax = fig.add_subplot(121, projection='3d')
    ax.plot_surface(s_grid, t_grid, y_analytical_test.cpu().numpy().reshape(s_grid.shape), cmap = "viridis")
    ax.set_title("Analytical Soln")
    ax.set_xlabel("Spot Price")
    ax.set_ylabel(f"current time (t), $t \in {[t_range]}$")
    ax.set_zlabel("Call price")
    ax.view_init(elev=20, azim=-120)
    ax = fig.add_subplot(122, projection='3d')
    ax.plot_surface(s_grid, t_grid, y_pinn_test.cpu().numpy().reshape(s_grid.shape), cmap = "viridis")
    ax.set_title("PINN prediction")
    ax.set_xlabel("Spot Price")
    ax.set_ylabel(f"current time (t), $t \in {[t_range]}$")
    ax.set_zlabel("Call price")
    ax.view_init(elev=20, azim=-120)
    plt.show()
    #plt.savefig(experiment_dir+f"/true_vs_pred_final.jpg")
    fig = plt.figure(figsize=(14,7))
    ax = fig.add_subplot(121, projection='3d')
    err = y_pinn_test.cpu().numpy().reshape(s_grid.shape) - y_analytical_test.cpu().numpy().reshape(s_grid.shape)
    ax.plot_surface(s_grid, t_grid, err, cmap = "viridis")
    ax.plot_surface(s_grid, t_grid, np.zeros(err.shape), color = "grey")
    ax.set_title("Analytical Soln - Predicted")
    ax.set_xlabel("Spot Price")
    ax.set_ylabel("Time to expiry")
    ax.set_zlabel("Call price")
    ax.view_init(elev=20, azim=-120)
    plt.show()


In [None]:
dataloader = DataloaderEuropean1D([0,1], [0, 100], 20, 0.03, 0.25, DEVICE)
compare_model_with_analytical_1D("models/epoch_9985_test_analytical.pth", dataloader)

In [None]:
dataloader = DataloaderAmerican1D([0, 1], [0, 100], 40, 0.04, 0.25, DEVICE)
compare_model_with_analytical_1D("models/epoch_9941_america_test.pth", dataloader)

In [23]:
#X_analytical = np.loadtxt("results/analytical_test.txt")
#X_validation_data = np.loadtxt("results/validation_data.txt")

In [8]:
#df_analytical = pd.DataFrame(X_analytical, columns = ["value"])
#df_validation_data = pd.DataFrame(X_validation_data, columns = ["t", "S"])

In [38]:
def make_std_plot(filename):
    loss = np.loadtxt("results/average_loss_" + filename)
    std_loss = np.loadtxt("results/std_loss_" + filename)

    loss_upper = loss + std_loss
    loss_lower = loss - std_loss
    x_loss = np.arange(1, len(loss) + 1)

    val = np.loadtxt("results/average_validation_" + filename)
    std_val = np.loadtxt("results/std_validation_" + filename)

    val_upper = val + std_val
    val_lower = val - std_val
    x_val = np.arange(1, len(loss) + 1, 30)

    fig = make_subplots(rows=2, cols=1, subplot_titles=("Plot 1", "Plot 2"))

    fig.add_trace(go.Scatter(x=x_loss[1000::10], y=loss[1000::10], mode='lines+markers', name='Data 1', line=dict(color='red')), row=1, col=1)
    fig.add_trace(go.Scatter(x=x_loss[1000::10], y=loss_upper[1000::10], fill=None, mode='lines', line=dict(color='rgba(255,0,0,0)')), row=1, col=1)
    fig.add_trace(go.Scatter(x=x_loss[1000::10], y=loss_lower[1000::10], fill='tonexty', mode='lines', line=dict(color='rgba(255,0,0,0.2)')), row=1, col=1)

    # Second plot (y2 with shaded error band)
    fig.add_trace(go.Scatter(x=x_val, y=val, mode='lines+markers', name='Data 2', line=dict(color='blue')), row=2, col=1)
    fig.add_trace(go.Scatter(x=x_val, y=val_upper, fill=None, mode='lines', line=dict(color='rgba(0,0,255,0)')), row=2, col=1)
    fig.add_trace(go.Scatter(x=x_val, y=val_lower, fill='tonexty', mode='lines', line=dict(color='rgba(0,0,255,0.2)')), row=2, col=1)

    #fig.update_yaxes(type='log', row=1, col=1)  # Log scale for first plot
    #fig.update_yaxes(type='log', row=2, col=1)  # Log scale for second plot
    # Update layout
    fig.update_layout(height=600, width=800, title_text="2x1 Plot with Standard Deviation")

    # Show the plot
    fig.show()

In [None]:
for cur in ["lr_1", "lr_2", "lr_3", "lr_4"]:
    make_std_plot(cur)

In [5]:
X = np.load("results/average_lambdas_test.npy")

In [None]:
X

In [None]:
plt.plot(X[:, 0])
plt.plot(X[:, 1])
plt.plot(X[:, 2])
#plt.plot(X[0, ::1000] + X[1, ::1000])
#plt.yscale("log")

In [None]:
np.min(X[0, ::1000] - X[1, ::1000])

In [None]:
plt.plot(X[1,::1000])
plt.yscale("log")

In [3]:
X = np.load("important_results_backwards/sigmas_learning_test.npy")

In [None]:
scale = [1_000, 5_000]
lr = [1e-3, 5e-4]
for i in range(0, X.shape[0], 2):
    for j in range(2):
        plt.plot(X[i + j, :], label = f"{scale[j]}")
    plt.title(f"lr = {lr[i // 5] : .1e}")
    plt.legend()
    plt.show()

In [52]:
n = 50
S_range = [0,200]
t_range = [0,1]
s = np.linspace(*S_range, n)
t = np.linspace(*t_range, n)
S, T = np.meshgrid(s, t)

In [53]:

X = torch.tensor(np.column_stack((T.flatten(), S.flatten())), dtype=torch.float)
min_values = torch.tensor(
            [t_range[0], S_range[0]]).to(DEVICE)
max_values = torch.tensor(
    [t_range[1], S_range[1]]).to(DEVICE)

X_scaled = (X - min_values) / (max_values - min_values)
#Z = model.forward()

In [54]:
from torch.distributions import Normal
def get_analytical_solution(S, t, t_range, sigma, r, K):
        T = t_range[-1]
        t2m = T-t  # Time to maturity
        d1 = (torch.log(S / K) + (r + 0.5 * sigma**2)
              * t2m) / (sigma * torch.sqrt(t2m))

        d2 = d1 - sigma * torch.sqrt(t2m)

        # Normal cumulative distribution function (CDF)
        standard_normal = Normal(0, 1)

        Nd1 = standard_normal.cdf(d1)
        Nd2 = standard_normal.cdf(d2)

        # Calculate the option price
        F = S * Nd1 - K * Nd2 * torch.exp(-r * t2m)
        return F

In [55]:
model = PINNforwards(2,1,256, 4)
model.load_state_dict(torch.load("models/greeks.pth", weights_only=True))
model = model.to(DEVICE)

Z_model = model(X_scaled).to("cpu").detach().numpy().reshape((n,n))
Z_analytical = get_analytical_solution(X[:,1], X[:,0], t_range, 0.5, 0.04, 40).to("cpu").detach().numpy().reshape((n,n))

In [None]:
fig = make_subplots(
    rows=1, cols=3,
    specs=[[{'type': 'surface'}, {'type': 'surface'}, {'type': 'surface'}]],  # Specify 3D plots
    subplot_titles=("Predicted", "Analytical", "Predicted subtracted Analytical"),
    #horizontal_spacing=0.5
)

# Add the first 3D surface plot
fig.add_trace(
    go.Surface(z=Z_model, x=S, y=T, colorscale='Cividis', showscale = True, colorbar=dict(title="Option price", x= 0.65)), 
    row=1, col=1
)

# Add the second 3D surface plot
fig.add_trace(
    go.Surface(z=Z_analytical, x=S, y=T, showscale = False,colorscale='Cividis'),
    row=1, col=2
)

fig.add_trace(
    go.Surface(z=Z_model - Z_analytical, x=S, y=T, colorscale='Viridis', showscale=True, colorbar=dict(title="Difference", x = 1.0)),
    row=1, col=3
)

fig.update_layout(
    scene=dict(
        xaxis_title="Stock price",
        yaxis_title="Time",
        zaxis_title="Option price",
        camera=dict(eye=dict(x=-1, y=2, z=2)),  # Adjust view for both plots
        zaxis=dict(
            tickvals=[np.max(Z_analytical) // 4, np.max(Z_analytical) // 8*5,  np.round(np.max(Z_analytical))],  # Specify the ticks to display
        ),
        xaxis = dict(
            tickvals=[np.min(S), np.max(S) // 2,  np.round(np.max(S))],  # Specify the ticks to display
        ),
    ),
    scene2=dict(  # Layout for the second plot
        xaxis_title="Stock price",
        yaxis_title="Time",
        zaxis_title="Option price",
        camera=dict(eye=dict(x=-1, y=2, z=2)),  # Adjust view for both plots
        zaxis=dict(
            tickvals=[np.max(Z_analytical) // 4, np.max(Z_analytical) // 8*5,  np.round(np.max(Z_analytical))],  # Specify the ticks to display
        ),
        xaxis = dict(
            tickvals=[np.min(S), np.max(S) // 2,  np.round(np.max(S))],  # Specify the ticks to display
        ),
    ),
    scene3=dict(  # Layout for the second plot
        xaxis_title="Stock price",
        yaxis_title="Time",
        zaxis_title="Option price",
        camera=dict(eye=dict(x=-1, y=2, z=2)),  # Adjust view for both plots
        xaxis = dict(
            tickvals=[np.min(S), np.max(S) // 2,  np.round(np.max(S))],  # Specify the ticks to display
        ),
    ),
    margin=dict(l=0, r=0, t=40, b=0),
    font=dict(size=17),
    
)
fig.update_annotations(font_size=30)

# Show the plot
fig.write_image("plots/one_dim_european.pdf", width=2200, height=800, scale=1)
fig.show()

In [46]:
n = 1_000
S_range = [0,200]
t_range = [0,1]
S = np.linspace(*S_range, n)
T = np.full(n, 0.5)

X = torch.tensor(np.column_stack((T, S)), dtype=torch.float, requires_grad=True)
min_values = torch.tensor(
            [t_range[0], S_range[0]]).to(DEVICE)
max_values = torch.tensor(
    [t_range[1], S_range[1]]).to(DEVICE)

X_scaled = (X - min_values) / (max_values - min_values)


In [47]:
model = PINNforwards(2,1,256, 4)
model.load_state_dict(torch.load("models/greeks.pth", weights_only=True))
model = model.to(DEVICE)

delta, gamma, theta, nu, rho = model.estimate_greeks_call(X_scaled, X, 0.5, t_range[1])

In [48]:
delta = delta.to("cpu").detach().numpy().flatten()
gamma = gamma.to("cpu").detach().numpy().flatten()
theta = theta.to("cpu").detach().numpy().flatten()
nu = nu.to("cpu").detach().numpy().flatten()
rho = rho.to("cpu").detach().numpy().flatten()

In [None]:
fig = make_subplots(
    rows=2, cols=3,
    specs=[
        [{'type': 'xy'}, {'type': 'xy'}, {'type': 'xy'}],  # Row 1: 3 columns
        [{'type': 'xy'}, {'type': 'xy'}, None]            # Row 2: 2 columns
    ],
    subplot_titles=(r"$\Delta$", r"$\Gamma$", r"$\Theta$", r"$\nu$", r"$\rho$"),
    horizontal_spacing=0.1,  # Adjust horizontal spacing
    vertical_spacing=0.2     # Adjust vertical spacing
)

# Add line plots to the first row
fig.add_trace(go.Scatter(x=S, y=delta, mode='lines', name="Sine", line=dict(width=2)), row=1, col=1)
fig.add_trace(go.Scatter(x=S, y=gamma, mode='lines', name="Cosine", line=dict(width=2)), row=1, col=2)
fig.add_trace(go.Scatter(x=S, y=theta, mode='lines', name="Tangent", line=dict(width=2)), row=1, col=3)

# Add line plots to the second row
fig.add_trace(go.Scatter(x=S, y=nu, mode='lines', name="Exponential", line=dict(width=2)), row=2, col=1)
fig.add_trace(go.Scatter(x=S, y=rho, mode='lines', name="Quadratic", line=dict(width=2)), row=2, col=2)

# Update layout
fig.update_layout(
    title="Different Greeks at t = 0.5",
    height=800,  # Adjust height to fit all subplots
    width=1000,  # Adjust width for better aspect ratio
    font=dict(size=20),  # Adjust font size
    margin=dict(l=20, r=20, t=50, b=20),  # Tight margins
    showlegend = False
)
fig.update_annotations(font_size=20)
fig.update_xaxes(title_text="Stock Price", row=1, col=1)
fig.update_xaxes(title_text="Stock Price", row=1, col=2)
fig.update_xaxes(title_text="Stock Price", row=1, col=3)
fig.update_xaxes(title_text="Stock Price", row=2, col=1)
fig.update_xaxes(title_text="Stock Price", row=2, col=2)
# Show the plot
fig.write_image("plots/one_dim_european_greeks.pdf", width=2200, height=800, scale=1)
fig.show()

In [None]:
X_lambda = np.load("results/average_loss_with_lambda.npy")
X = np.load("results/average_loss_no_lambda.npy")

X.shape

In [None]:
import numpy as np
import matplotlib.pyplot as plt

X_lambda = np.load("results/average_loss_no_fourier.npy")
X = np.load("results/average_loss_with_fourier.npy")
plt.plot(X_lambda[:len(X_lambda) // 2:100, 0], label = "No fourier")
plt.plot(X[:len(X)//2:100, 0], label = "Fourier")

plt.legend()
plt.yscale("log")

In [None]:
X_lambda = np.load("results/average_validation_with_lambda.npy")
X = np.load("results/average_validation_no_lambda.npy")
plt.plot(X_lambda[:, 1], label = "with_lambda")
plt.plot(X[1], label = "no lambda")

plt.legend()
plt.yscale("log")

In [None]:
X_lambda = np.load("results/average_lambdas_with_lambda.npy")
X = np.load("results/average_lambdas_no_lambda.npy")
plt.plot(X_lambda[0], label = "pde")
plt.plot(X_lambda[1], label = "boundary")
plt.plot(X_lambda[2], label = "expiry")
#plt.plot(X_lambda[0] + X_lambda[1] + X_lambda[2], label = "sum")
plt.legend()
plt.yscale("log")

In [None]:


X_loss_fourier.shape

In [38]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

mpl.rcParams["figure.titlesize"] = 18
mpl.rcParams["axes.labelsize"] = 15
mpl.rcParams["axes.titlesize"] = 14
mpl.rcParams["legend.fontsize"] = "medium"
mpl.rcParams["xtick.labelsize"] = 12
mpl.rcParams["ytick.labelsize"] = 12
mpl.rcParams["figure.dpi"] = 500

In [None]:
X_loss_fourier = np.load("results/average_loss_with_fourier.npy")
X_loss = np.load("results/average_loss_no_fourier.npy")

n_loss = X_loss_fourier.shape[0]

X_validation_fourier = np.load("results/average_validation_with_fourier.npy")
X_validation = np.load("results/average_validation_no_fourier.npy")

n_val = X_validation_fourier.shape[0]


skip = 0
plot_every = 2_000
x_loss = np.arange(skip, n_loss//2, plot_every)

fig, ax = plt.subplots(1, 2, figsize=(12, 6))

ax[0].plot(x_loss, X_loss_fourier[skip:n_loss//2:plot_every, 0], label = "Fourier", color = "midnightblue")
ax[0].plot(x_loss, X_loss[skip:n_loss//2:plot_every, 0], label = "No Fourier", color = "red")
ax[0].set_yscale("log")
ax[0].set_xlabel("epochs")
ax[0].legend()
ax[0].set_title("Training loss")
ax[0].grid()


plot_every = 100
x_val = np.arange(skip, n_loss//2, 30*plot_every)

ax[1].plot(x_val, X_validation_fourier[skip:n_val//2:plot_every, 0], label = "Fourier", color = "midnightblue")
ax[1].plot(x_val, X_validation[skip:n_val//2:plot_every, 0], label = "No Fourier", color = "red")
ax[1].set_yscale("log")
ax[1].set_xlabel("epochs")
#as[1].legend()
ax[1].set_title("Validation loss")
ax[1].grid()

fig.tight_layout()
fig.show()
#plt.savefig("plots/fourier_loss.pdf")

In [2]:
from torch.distributions import Normal
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
import torch
import numpy as np
from PINN import PINNforwards
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.set_default_device(DEVICE)

import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

mpl.rcParams["figure.titlesize"] = 18
mpl.rcParams["axes.labelsize"] = 15
mpl.rcParams["axes.titlesize"] = 14
mpl.rcParams["legend.fontsize"] = "medium"
mpl.rcParams["xtick.labelsize"] = 12
mpl.rcParams["ytick.labelsize"] = 12
mpl.rcParams["figure.dpi"] = 500

In [None]:
def standard_normal_pdf(x):
    return (1 / torch.sqrt(torch.tensor(2 * torch.pi))) * torch.exp(-x.pow(2) / 2)

def plots_greeks(n: int, time: float, path_to_weights: str, name_of_plot: str):
    #n = 1_000
    S_range = [0, 400]
    t_range = [0, 1]
    S = np.linspace(*S_range, n)
    T = np.full(n, time)

    X = torch.tensor(np.column_stack((T, S)),
                     dtype=torch.float, requires_grad=True)
    min_values = torch.tensor(
        [t_range[0], S_range[0]]).to(DEVICE)
    max_values = torch.tensor(
        [t_range[1], S_range[1]]).to(DEVICE)

    X_scaled = (X - min_values) / (max_values - min_values)

    model = PINNforwards(2, 1, 128, 4, use_fourier_transform=True, sigma_FF=5.0, encoded_size=128)
    model.load_state_dict(torch.load(path_to_weights, weights_only=True))
    model = model.to(DEVICE)
    
    r = 0.04
    sigma = 0.5
    K = 40

    t2m = 1.0-T
    t2m = torch.from_numpy(t2m).to(DEVICE)
    S_torch = torch.from_numpy(S).to(DEVICE)

    d1 = (torch.log(S_torch / K) + (r + 0.5 * sigma**2)
          * t2m) / (sigma * torch.sqrt(t2m))

    d2 = d1 - sigma * torch.sqrt(t2m)

    standard_normal = Normal(0, 1)

    analytical_delta = standard_normal.cdf(d1).cpu().detach().numpy()
    analytical_gamma = (standard_normal_pdf(
        d1) / (S_torch * sigma * t2m)).cpu().detach().numpy()
    analytical_rho = (t2m * K * torch.exp(-r * t2m) *
                      standard_normal.cdf(d2)).cpu().detach().numpy()

    analytical_theta = (-S_torch * sigma / (2 * torch.sqrt(t2m)) * standard_normal_pdf(
        d1) - r * K * torch.exp(- r * t2m) * standard_normal.cdf(d2)).cpu().detach().numpy()

    analytical_nu = (S_torch * torch.sqrt(t2m) *
                     standard_normal_pdf(d1)).cpu().detach().numpy()

    delta, gamma, theta, nu, rho = model.estimate_greeks_call(
        X_scaled, X, 0.5, t_range[1])

    Nd1 = standard_normal.cdf(d1)
    Nd2 = standard_normal.cdf(d2)

    # Calculate the option price
    analytical_price = (S_torch * Nd1 - K * Nd2 * torch.exp(-r * t2m)).to("cpu").detach().numpy().flatten()

    price = model(X_scaled)

    delta = delta.to("cpu").detach().numpy().flatten()
    gamma = gamma.to("cpu").detach().numpy().flatten()
    theta = theta.to("cpu").detach().numpy().flatten()
    nu = nu.to("cpu").detach().numpy().flatten()
    rho = rho.to("cpu").detach().numpy().flatten()
    price = price.to("cpu").detach().numpy().flatten()

    fig, ax = plt.subplots(2,3, figsize = (10,5))
    linestyle = (0, (4, 4))
    linewidth = 3
    # Add line plots to the first row
    ax[0, 0].plot(S, delta, color = "midnightblue", linewidth = linewidth)
    ax[0, 0].plot(S, analytical_delta, color = "red", linestyle = linestyle, linewidth = linewidth)
    ax[0,0].set_title(r"$\Delta$")

    ax[0,1].plot(S, gamma, color = "midnightblue", label = "Predicted",linewidth = linewidth)
    ax[0,1].plot(S, analytical_gamma, color = "red", linestyle = linestyle, label = "Analytical", linewidth = linewidth)
    ax[0,1].set_title(r"$\Gamma$")
    ax[0,1].legend()
    
    ax[0,2].plot(S, theta, color = "midnightblue", linewidth = linewidth)
    ax[0,2].plot(S, analytical_theta, color = "red", linestyle = linestyle, linewidth = linewidth)
    ax[0,2].set_title(r"$\Theta$")

    ax[1,0].plot(S, nu, color = "midnightblue", linewidth = linewidth)
    ax[1,0].plot(S, analytical_nu,  color = "red", linestyle = linestyle, linewidth = linewidth)
    ax[1,0].set_title(r"$\nu$")
    ax[1,0].set_xlabel("Asset price")

    ax[1,1].plot(S, rho, color = "midnightblue", linewidth = linewidth)
    ax[1,1].plot(S, analytical_rho, color = "red", linestyle = linestyle, linewidth = linewidth)
    ax[1,1].set_title(r"$\rho$")
    ax[1,1].set_xlabel("Asset price")
    
    ax[1,2].plot(S, price, color = "midnightblue", linewidth = linewidth)
    ax[1,2].plot(S, analytical_price, color = "red", linestyle = linestyle, linewidth = linewidth)
    ax[1,2].set_title(r"Pricing function")
    ax[1,2].set_xlabel("Asset price")
    
    for i in range(2):
        for j in range(3):
            ax[i,j].grid()
    
    fig.tight_layout()
    #plt.savefig(name_of_plot)
    plt.show()

plots_greeks(10_000, 0.5, "../models/greeks.pth",
                 "plots/one_dim_european_greeks.pdf")

In [None]:
def standard_normal_pdf(x):
    return (1 / torch.sqrt(torch.tensor(2 * torch.pi))) * torch.exp(-x.pow(2) / 2)

def plots_greeks(n: int, time: float, path_to_weights: str, name_of_plot: str):
    #n = 1_000
    S_range = [0, 400]
    t_range = [0, 1]
    S = np.linspace(*S_range, n)
    T = np.full(n, time)

    X = torch.tensor(np.column_stack((T, S)),
                     dtype=torch.float, requires_grad=True)
    min_values = torch.tensor(
        [t_range[0], S_range[0]]).to(DEVICE)
    max_values = torch.tensor(
        [t_range[1], S_range[1]]).to(DEVICE)

    X_scaled = (X - min_values) / (max_values - min_values)

    model = PINNforwards(2, 1, 128, 4, use_fourier_transform=True, sigma_FF=5.0, encoded_size=128)
    model.load_state_dict(torch.load(path_to_weights, weights_only=True))
    model = model.to(DEVICE)
    
    r = 0.04
    sigma = 0.5
    K = 40

    t2m = 1.0-T
    t2m = torch.from_numpy(t2m).to(DEVICE)
    S_torch = torch.from_numpy(S).to(DEVICE)

    d1 = (torch.log(S_torch / K) + (r + 0.5 * sigma**2)
          * t2m) / (sigma * torch.sqrt(t2m))

    d2 = d1 - sigma * torch.sqrt(t2m)

    standard_normal = Normal(0, 1)

    analytical_delta = standard_normal.cdf(d1).cpu().detach().numpy()
    analytical_gamma = (standard_normal_pdf(
        d1) / (S_torch * sigma * t2m)).cpu().detach().numpy()
    analytical_rho = (t2m * K * torch.exp(-r * t2m) *
                      standard_normal.cdf(d2)).cpu().detach().numpy()

    analytical_theta = (-S_torch * sigma / (2 * torch.sqrt(t2m)) * standard_normal_pdf(
        d1) - r * K * torch.exp(- r * t2m) * standard_normal.cdf(d2)).cpu().detach().numpy()

    analytical_nu = (S_torch * torch.sqrt(t2m) *
                     standard_normal_pdf(d1)).cpu().detach().numpy()

    delta, gamma, theta, nu, rho = model.estimate_greeks_call(
        X_scaled, X, 0.5, t_range[1])

    Nd1 = standard_normal.cdf(d1)
    Nd2 = standard_normal.cdf(d2)

    # Calculate the option price
    analytical_price = (S_torch * Nd1 - K * Nd2 * torch.exp(-r * t2m)).to("cpu").detach().numpy().flatten()

    price = model(X_scaled)

    delta = delta.to("cpu").detach().numpy().flatten()
    gamma = gamma.to("cpu").detach().numpy().flatten()
    theta = theta.to("cpu").detach().numpy().flatten()
    nu = nu.to("cpu").detach().numpy().flatten()
    rho = rho.to("cpu").detach().numpy().flatten()
    price = price.to("cpu").detach().numpy().flatten()

    fig = make_subplots(
        rows=2, cols=3,
        specs=[
            [{'type': 'xy'}, {'type': 'xy'}, {'type': 'xy'}],  # Row 1: 3 columns
            [{'type': 'xy'}, {'type': 'xy'}, {'type': 'xy'}]            # Row 2: 2 columns
        ],
        subplot_titles=(r"$\Delta$", r"$\Gamma$",
                        r"$\Theta$", r"$\nu$", r"$\rho$", "Pricing function"),
        horizontal_spacing=0.1,  # Adjust horizontal spacing
        vertical_spacing=0.2     # Adjust vertical spacing
    )

    # Add line plots to the first row
    fig.add_trace(go.Scatter(x=S, y=analytical_delta, mode='lines',
                  line=dict(width=4, color = 'black', dash = 'dot')), row=1, col=1)
    
    fig.add_trace(go.Scatter(x=S, y=delta, mode='lines',
                  line=dict(width=2)), row=1, col=1)
    
    fig.add_trace(go.Scatter(x=S, y=analytical_gamma, mode='lines',
                  line=dict(width=4, color = 'black', dash = 'dot')), row=1, col=2)
    
    fig.add_trace(go.Scatter(x=S, y=gamma, mode='lines',
                  line=dict(width=2)), row=1, col=2)
    
    fig.add_trace(go.Scatter(x=S, y=analytical_theta, mode='lines',
                  line=dict(width=4, color = 'black', dash = 'dot')), row=1, col=3)
    
    fig.add_trace(go.Scatter(x=S, y=theta, mode='lines',
                  line=dict(width=2)), row=1, col=3)

    # Add line plots to the second row
    fig.add_trace(go.Scatter(x=S, y=analytical_nu, mode='lines',
                  line=dict(width=4, color = 'black', dash = 'dot')), row=2, col=1)

    fig.add_trace(go.Scatter(x=S, y=nu, mode='lines',
                  line=dict(width=2)), row=2, col=1)
    
    fig.add_trace(go.Scatter(x=S, y=analytical_rho, mode='lines',
                  line=dict(width=4, color = 'black', dash = 'dot')), row=2, col=2)
    
    fig.add_trace(go.Scatter(x=S, y=rho, mode='lines',
                  line=dict(width=2)), row=2, col=2)
    
    fig.add_trace(go.Scatter(x=S, y=analytical_price, mode='lines',
                  line=dict(width=4, color = 'black', dash = 'dot')), row=2, col=3)
    
    fig.add_trace(go.Scatter(x=S, y=price, mode='lines',
                  line=dict(width=2)), row=2, col=3)

    # Update layout
    fig.update_layout(
        #title=f"Different Greeks at t = {time}",
        height=800,  # Adjust height to fit all subplots
        width=1000,  # Adjust width for better aspect ratio
        font=dict(size=20),  # Adjust font size
        margin=dict(l=20, r=20, t=50, b=20),  # Tight margins
        showlegend=False
    )
    fig.update_annotations(font_size=20)
    fig.update_xaxes(title_text="Stock Price", row=1, col=1)
    fig.update_xaxes(title_text="Stock Price", row=1, col=2)
    fig.update_xaxes(title_text="Stock Price", row=1, col=3)
    fig.update_xaxes(title_text="Stock Price", row=2, col=1)
    fig.update_xaxes(title_text="Stock Price", row=2, col=2)
    # Show the plot
    #fig.write_image(name_of_plot, width=2200, height=800, scale=1)
    fig.update_annotations(font_size=24)
    fig.show()

plots_greeks(10_000, 0.5, "../models/greeks.pth",
                 "plots/one_dim_european_greeks.pdf")

In [None]:
def plot_timing_vs_RMSE_binomial(filename):
    rmse = np.loadtxt("important_results/american_1D/RMSE_binomial.txt")
    timings = np.loadtxt("important_results/american_1D/timings_binomial.txt")

    M_values = np.array([32, 64, 128, 256, 384, 512, 768, 1024, 1280, 1536, 1792, 2048])

    fig, ax = plt.subplots(1, 2, figsize=(10, 4))

    ax[0].plot(M_values, rmse, label = "RMSE", color = "midnightblue")
    ax[0].plot(M_values, 1 / M_values, label = r"$O(n^{-1})$", color = "red")
    ax[0].set_yscale('log')
    ax[0].set_xscale("log")
    ax[0].set_ylabel("RMSE")
    ax[0].set_xlabel("M")
    ax[0].legend()
    ax[0].grid()

    ax[1].plot(M_values, timings, label = "Run time")
    ax[1].plot(M_values, M_values, label = r"$O(n)$", color = "red")
    ax[1].set_yscale('log')
    ax[1].set_xscale("log")
    ax[1].set_ylabel("Run time [s]")
    ax[1].set_xlabel("M")
    ax[1].legend()
    ax[1].grid()
    fig.tight_layout()
    plt.show()

plot_timing_vs_RMSE_binomial("")

In [3]:
torch.manual_seed(2026)
np.random.seed(2026)
from train import create_validation_data
from data_generator import DataGeneratorAmerican1D

config = {}

config["K"] = 40
config["t_range"] = [0, 1]
config["S_range"] = [0, 400]
config["sigma"] = 0.5
config["r"] = 0.04
config["w_expiry"] = 1
config["w_lower"] = 1
config["w_upper"] = 1

dataloader = DataGeneratorAmerican1D(
    time_range=config["t_range"], S_range=config["S_range"], K=config["K"], r=config["r"], sigma=config["sigma"], DEVICE=DEVICE)

validation_data = create_validation_data(
    dataloader=dataloader, N_validation=5_000, config=config)

test_data = create_validation_data(
    dataloader=dataloader, N_validation=20_000, config=config)

In [None]:
X = test_data["X1_validation"].cpu().detach().numpy()
y = np.load("../data/test_data_american_1D.npy")

print(X[:,1].shape, y.shape)

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # This import registers the 3D projection
import numpy as np


# Create a new figure and add a 3D subplot
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection='3d')

# Create a 3D line plot
ax.plot_trisurf(X[::1, 0], X[::1,1], y[::1].ravel(), cmap='viridis', edgecolor='none')

# Optionally, you can also create a scatter plot if you want discrete points:
# ax.scatter(x, y, z, color='r', marker='o', label='3D points')

# Label the axes
ax.set_xlabel('Time')
ax.set_ylabel('Asset price')
ax.set_zlabel('Price function')

# Add a title and legend
#ax.set_title('3D Plot Example')
ax.view_init(elev=30, azim=30)
ax.legend()

fig.tight_layout()
# Display the plot
plt.savefig("../plots/american_3D.png")

In [8]:
config = {}

config["N_INPUT"] = 2
config["use_fourier_transform"] = True
config["sigma_fourier"] = 5.0
config["fourier_encoded_size"] = 128

config["w_expiry"] = 1
config["w_lower"] = 1
config["w_upper"] = 1

config["weight_decay"] = 0
config["gamma"] = 0.99
config["scheduler_step"] = 1_000

config["N_sample"] = 1024
config["lambda_pde"] = 1
config["lambda_boundary"] = 1
config["lambda_expiry"] = 1
# config["update_lambda"] = 500
# config["alpha_lambda"] = 0.9

config["K"] = 40
config["t_range"] = [0, 1]
config["S_range"] = [0, 400]
config["sigma"] = 0.5
config["r"] = 0.04

In [None]:
from data_generator import DataGeneratorEuropean1D
from train import create_validation_data


torch.manual_seed(2025)
np.random.seed(2025)
dataloader = DataGeneratorEuropean1D(
    time_range=[0,1], S_range=[0,400], K=40, r=0.04, sigma=0.5, DEVICE=DEVICE)

validation_data = create_validation_data(
    dataloader=dataloader, N_validation=5_000, config=config)

test_data = create_validation_data(
    dataloader=dataloader, N_validation=20_000, config=config)

In [None]:
model = PINNforwards(2, 1, 128, 8, use_fourier_transform=True,
                     sigma_FF=5.0, encoded_size=128)
model.load_state_dict(torch.load("../models/large_model.pth", weights_only=True))

In [None]:
X_loss = np.load("../results/average_loss_american_multiple.npy")
X_validation = np.load("../results/average_validation_american_multiple.npy")

X_loss = X_loss[: X_loss.shape[0] // 2]
X_validation = X_validation[: X_validation.shape[0] // 2]

print(X_loss.shape, X_validation.shape)
print(np.arange(1, 600_000, 600).shape)

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(12, 5))

x_values = np.arange(1, 600_000 + 1, 600)

ax[0].plot(x_values[100::5], X_loss[100::5,0], label = "Loss", color = "midnightblue")
ax[0].plot(x_values[100::5], X_validation[100::5, 0], linestyle = (0, (4, 4)), label = "Validation", color = "red")
ax[0].set_yscale("log")
#ax[0].set_xscale("log")
ax[0].set_xlabel("epoch")
ax[0].legend()
ax[0].grid()

ax[1].plot(x_values[100::5], X_loss[100::5, 4], label = "loss_lower")
ax[1].plot(x_values[100::5], X_loss[100::5, 5], label = "loss_upper")
ax[1].plot(x_values[100::5], X_loss[100::5, 2], label = "loss_pde")
ax[1].plot(x_values[100::5], X_loss[100::5, 3], label = "loss_expiry")
ax[1].grid()
ax[1].set_xlabel("epoch")
#plt.plot(X_loss[100::5, 0], label = "Loss")

ax[1].legend()
ax[1].set_yscale("log")

In [39]:
X1_test = test_data["X1_validation"]
X1_test_scaled = test_data["X1_validation_scaled"]

expiry_x_tensor_test = test_data["expiry_x_tensor_validation"]
expiry_x_tensor_test_scaled = test_data["expiry_x_tensor_validation_scaled"]
expiry_y_tensor_test = test_data["expiry_y_tensor_validation"]

lower_x_tensor_test = test_data["lower_x_tensor_validation"]
lower_x_tensor_test_scaled = test_data["lower_x_tensor_validation_scaled"]
lower_y_tensor_test = test_data["lower_y_tensor_validation"]

upper_x_tensor_test = test_data["upper_x_tensor_validation"]
upper_x_tensor_test_scaled = test_data["upper_x_tensor_validation_scaled"]
upper_y_tensor_test = test_data["upper_y_tensor_validation"]


analytical_solution = dataloader.get_analytical_solution(
    X1_test[:, 1], X1_test[:, 0]).cpu().detach().numpy()


analytical_solution = analytical_solution.reshape(
    analytical_solution.shape[0], -1)

tmp = dataloader.get_analytical_solution(upper_x_tensor_test[:,1], upper_x_tensor_test[:,0]).cpu().detach().numpy()
tmp = tmp.reshape(tmp.shape[0], -1)

with torch.no_grad():
    predicted_pde = model(X1_test_scaled).cpu().detach().numpy()
    predicted_expiry = model(expiry_x_tensor_test_scaled).cpu().detach().numpy()
    predicted_lower = model(lower_x_tensor_test_scaled).cpu().detach().numpy()
    predicted_upper = model(upper_x_tensor_test_scaled).cpu().detach().numpy()


In [40]:
X1_test = X1_test.cpu().detach().numpy()
expiry_x_tensor_test = expiry_x_tensor_test.cpu().detach().numpy()
lower_x_tensor_test = lower_x_tensor_test.cpu().detach().numpy()
upper_x_tensor_test = upper_x_tensor_test.cpu().detach().numpy()

expiry_y_tensor_test = expiry_y_tensor_test.cpu().detach().numpy()
lower_y_tensor_test = lower_y_tensor_test.cpu().detach().numpy()
upper_y_tensor_test = upper_y_tensor_test.cpu().detach().numpy()


In [None]:
all_points = np.vstack([X1_test, expiry_x_tensor_test, lower_x_tensor_test, upper_x_tensor_test])
all_preds = np.vstack([predicted_pde, predicted_expiry, predicted_lower, predicted_upper])
all_y = np.vstack([analytical_solution, expiry_y_tensor_test, lower_y_tensor_test, upper_y_tensor_test])


In [None]:
plt.scatter(all_points[:,0], all_points[:,1], c = all_preds)
plt.colorbar()

In [None]:
from matplotlib.colors import LogNorm

fig, ax = plt.subplots(1, 2, figsize=(12, 5))

scatter1 = ax[0].scatter(all_points[::100,0], all_points[::100,1], c = all_preds[::100], cmap = "viridis")
cbar1 = plt.colorbar(scatter1, ax=ax[0])
ax[0].set_title("Predicted values")

scatter2 = ax[1].scatter(all_points[::100,0], all_points[::100,1], c = np.abs(all_y - all_preds)[::100], cmap='plasma',norm=LogNorm())
cbar2 = plt.colorbar(scatter2, ax=ax[1])
ax[1].set_title("Absolute error")

plt.tight_layout()
#plt.savefig("../plots/large_model.jpg", format = "jpg", dpi=120, pil_kwargs={"quality": 100}, bbox_inches='tight')