<a href="https://colab.research.google.com/github/aderdouri/EiCNAM/blob/master/Tutorials/Notebooks/adjoint_mode_graph.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt

# Heston PDE Residual
def heston_pde_residual(model, s, v, t, kappa, theta, sigma, rho, r):
    """
    Compute the residual of the Heston PDE.
    """
    s.requires_grad = True
    v.requires_grad = True
    t.requires_grad = True

    # Forward pass
    V = model(torch.cat((s, v, t), dim=1))

    # Gradients
    V_t = torch.autograd.grad(V, t, grad_outputs=torch.ones_like(V), create_graph=True)[0]
    V_s = torch.autograd.grad(V, s, grad_outputs=torch.ones_like(V), create_graph=True)[0]
    V_v = torch.autograd.grad(V, v, grad_outputs=torch.ones_like(V), create_graph=True)[0]
    V_ss = torch.autograd.grad(V_s, s, grad_outputs=torch.ones_like(V_s), create_graph=True)[0]
    V_vv = torch.autograd.grad(V_v, v, grad_outputs=torch.ones_like(V_v), create_graph=True)[0]
    V_sv = torch.autograd.grad(V_s, v, grad_outputs=torch.ones_like(V_s), create_graph=True)[0]

    # Heston PDE
    term1 = V_t
    term2 = 0.5 * v * s**2 * V_ss
    term3 = rho * sigma * v * s * V_sv
    term4 = 0.5 * sigma**2 * v * V_vv
    term5 = r * s * V_s
    term6 = kappa * (theta - v) * V_v
    term7 = -r * V

    residual = term1 + term2 + term3 + term4 + term5 + term6 + term7
    return residual

# Define the PINN model
class PINN(nn.Module):
    def __init__(self, hidden_dim=50, num_layers=4):
        super(PINN, self).__init__()
        layers = []
        layers.append(nn.Linear(3, hidden_dim))  # Input: (S, v, t)
        layers.append(nn.Tanh())
        for _ in range(num_layers - 1):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.Tanh())
        layers.append(nn.Linear(hidden_dim, 1))  # Output: V(S, v, t)
        self.network = nn.Sequential(*layers)

        # Trainable parameters
        self.kappa = nn.Parameter(torch.tensor(2.0, requires_grad=True, dtype=torch.float32))
        self.theta = nn.Parameter(torch.tensor(0.2, requires_grad=True, dtype=torch.float32))
        self.sigma = nn.Parameter(torch.tensor(0.3, requires_grad=True, dtype=torch.float32))
        self.rho = nn.Parameter(torch.tensor(-0.7, requires_grad=True, dtype=torch.float32))
        self.r = nn.Parameter(torch.tensor(0.05, requires_grad=True, dtype=torch.float32))

    def forward(self, x):
        return self.network(x)

# Loss Function
def compute_loss(model, s, v, t, observed_s, observed_v, observed_t, observed_prices):
    # Extract trainable parameters
    kappa, theta, sigma, rho, r = model.kappa, model.theta, model.sigma, model.rho, model.r

    # PDE Residual Loss
    residual = heston_pde_residual(model, s, v, t, kappa, theta, sigma, rho, r)
    pde_loss = torch.mean(residual**2)

    # Observed Data Loss
    observed_inputs = torch.cat((observed_s, observed_v, observed_t), dim=1)
    observed_preds = model(observed_inputs)
    data_loss = torch.mean((observed_preds - observed_prices)**2)

    return pde_loss + data_loss

# Training Function
def train_pinn(model, s, v, t, observed_s, observed_v, observed_t, observed_prices, epochs=1000, lr=1e-3):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        optimizer.zero_grad()
        loss = compute_loss(model, s, v, t, observed_s, observed_v, observed_t, observed_prices)
        loss.backward()
        optimizer.step()

        if epoch % 100 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item():.6f}, kappa: {model.kappa.item():.6f}, "
                  f"theta: {model.theta.item():.6f}, sigma: {model.sigma.item():.6f}, "
                  f"rho: {model.rho.item():.6f}, r: {model.r.item():.6f}")

# Example Usage
if __name__ == "__main__":
    # Observed data (mock data for simplicity)
    observed_s = torch.tensor([[50.0], [100.0], [150.0]])
    observed_v = torch.tensor([[0.2], [0.2], [0.2]])
    observed_t = torch.tensor([[0.5], [0.5], [0.5]])
    observed_prices = torch.tensor([[10.0], [15.0], [20.0]])

    # Collocation points for PDE residual computation
    s = torch.rand(100, 1) * 200  # Stock prices in [0, 200]
    v = torch.rand(100, 1) * 1.0  # Variance in [0, 1]
    t = torch.rand(100, 1) * 1.0  # Time in [0, 1]

    # Initialize and train the model
    model = PINN(hidden_dim=50, num_layers=4)
    train_pinn(model, s, v, t, observed_s, observed_v, observed_t, observed_prices)

    # Validate the model
    s_test = torch.linspace(10, 200, 100).view(-1, 1)
    v_test = torch.full_like(s_test, 0.2)
    t_test = torch.full_like(s_test, 0.5)
    inputs = torch.cat((s_test, v_test, t_test), dim=1)
    predictions = model(inputs)

    # Plot the results
    plt.plot(s_test.detach().numpy(), predictions.detach().numpy(), label="PINN Predictions")
    plt.scatter(observed_s.detach().numpy(), observed_prices.detach().numpy(), color='red', label="Observed Data")
    plt.xlabel("Stock Price (S)")
    plt.ylabel("Option Price (V)")
    plt.legend()
    plt.show()


In [None]:
import numpy as np

In [None]:
0.01/np.pi

In [None]:
!pip install deepxde

In [None]:
import deepxde as dde
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

# Define the Black-Scholes PDE
def pde(x, y):
    V = y
    dV_dt = dde.grad.jacobian(y, x, i=0, j=0)  # ∂V/∂t
    dV_dS = dde.grad.jacobian(y, x, i=0, j=1)  # ∂V/∂S
    d2V_dS2 = dde.grad.hessian(y, x, component=0, i=1, j=1)  # ∂²V/∂S²
    sigma = 0.2
    r = 0.05
    return dV_dt + 0.5 * sigma**2 * x[:, 1:2]**2 * d2V_dS2 + r * x[:, 1:2] * dV_dS - r * V

# Define the domain and boundary conditions
geom = dde.geometry.Interval(0, 1)  # Stock price interval [0, 1]
timedomain = dde.geometry.TimeDomain(0, 1)  # Time interval [0, 1]
geomtime = dde.geometry.GeometryXTime(geom, timedomain)  # Space-time domain

# Initial condition: V(S, 0) = max(S - K, 0) for a call option
def initial_condition(x):
    K = 0.5
    return np.maximum(x[:, 1:2] - K, 0)

# Boundary condition: V(0, t) = 0 and V(S, t) -> S as S -> infinity
def boundary_left(x, on_boundary):
    return on_boundary and np.isclose(x[0], 0)  # Left boundary (S=0)

def boundary_right(x, on_boundary):
    return on_boundary and np.isclose(x[0], 1)  # Right boundary (S=1)

bc_left = dde.DirichletBC(geomtime, lambda x: 0, boundary_left)  # V(0, t) = 0
bc_right = dde.DirichletBC(geomtime, lambda x: x[:, 1:2], boundary_right)  # V(S, t) -> S as S → infinity

# Define the data for the PDE
data = dde.data.TimePDE(
    geomtime,
    pde,
    [bc_left, bc_right],
    num_domain=2540,
    num_boundary=80,
    num_initial=160,
    anchors=None,
    solution=initial_condition,  # Solution for the initial condition
)

# Define the neural network
net = dde.maps.FNN([2] + [50] * 3 + [1], "tanh", "Glorot uniform")

# Define the model
model = dde.Model(data, net)

# Train the model
model.compile("adam", lr=1e-3)
losshistory, train_state = model.train(epochs=10000)

In [None]:
# Analytical Black-Scholes solution for comparison
def black_scholes_call(S, K, T, r, sigma):
    # Ensure valid inputs
    S = np.maximum(S, 1e-8)  # Prevent S = 0
    K = np.maximum(K, 1e-8)  # Prevent K = 0
    T = np.maximum(T, 1e-8)  # Prevent T = 0
    sigma = np.maximum(sigma, 1e-8)  # Prevent sigma = 0

    # Black-Scholes formula
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)


# Generate test data
S_test = np.linspace(0, 1, 100)
T_test = np.linspace(0, 1, 100)
S_test, T_test = np.meshgrid(S_test, T_test)
X_test = np.vstack((T_test.flatten(), S_test.flatten())).T

# Predict using the trained model
y_pred = model.predict(X_test)

# Calculate the analytical solution
K = 0.5
r = 0.05
sigma = 0.2
y_true = black_scholes_call(S_test.flatten(), K, T_test.flatten(), r, sigma)

# Plot the results
plt.figure(figsize=(10, 6))
plt.plot(S_test.flatten(), y_true, label="Analytical Solution", linestyle='--')
plt.scatter(S_test.flatten(), y_pred, label="Predicted Solution", color='r', s=10)
plt.xlabel("Stock Price")
plt.ylabel("Option Price")
plt.legend()
plt.title("Comparison of Predicted vs Analytical Black-Scholes Option Prices")
plt.show()

In [None]:
import deepxde as dde
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

# Define the Black-Scholes PDE
def pde(x, y):
    V = y
    dV_dt = dde.grad.jacobian(y, x, i=0, j=0)  # ∂V/∂t
    dV_dS = dde.grad.jacobian(y, x, i=0, j=1)  # ∂V/∂S
    d2V_dS2 = dde.grad.hessian(y, x, component=0, i=1, j=1)  # ∂²V/∂S²
    sigma = 0.2  # Volatility
    r = 0.05     # Risk-free interest rate
    return dV_dt + 0.5 * sigma**2 * x[:, 1:2]**2 * d2V_dS2 + r * x[:, 1:2] * dV_dS - r * V

# Define the domain and geometry
geom = dde.geometry.Interval(0, 1)  # Stock price interval [0, 1]
timedomain = dde.geometry.TimeDomain(0, 1)  # Time interval [0, 1]
geomtime = dde.geometry.GeometryXTime(geom, timedomain)  # Space-time domain

# Initial condition: V(S, 0) = max(S - K, 0) for a call option
def initial_condition(x):
    K = 0.5
    return np.maximum(x[:, 1:2] - K, 0)

# Boundary conditions: V(0, t) = 0 and V(S, t) → S as S → ∞
def boundary_left(x, on_boundary):
    return on_boundary and np.isclose(x[0], 0)  # Left boundary (S = 0)

def boundary_right(x, on_boundary):
    return on_boundary and np.isclose(x[0], 1)  # Right boundary (S = 1)

bc_left = dde.DirichletBC(geomtime, lambda x: 0, boundary_left)  # V(0, t) = 0
bc_right = dde.DirichletBC(geomtime, lambda x: x[:, 1:2], boundary_right)  # V(S, t) → S

# Define the data for the PDE
data = dde.data.TimePDE(
    geomtime,
    pde,
    [bc_left, bc_right],
    num_domain=2540,
    num_boundary=80,
    num_initial=160,
    solution=initial_condition,  # Solution for the initial condition
)

# Define the neural network
net = dde.maps.FNN([2] + [50] * 3 + [1], "tanh", "Glorot uniform")

# Define the model
model = dde.Model(data, net)

# Train the model with Adam optimizer
model.compile("adam", lr=1e-3)
losshistory, train_state = model.train(epochs=5000)

# Fine-tune with LBFGS optimizer
model.compile("lbfgs", lr=1e-3) # Added a learning rate
losshistory, train_state = model.train()

# Analytical Black-Scholes solution for comparison
def black_scholes_call(S, K, T, r, sigma):
    S = np.maximum(S, 1e-8)  # Prevent divide-by-zero issues
    K = np.maximum(K, 1e-8)
    T = np.maximum(T, 1e-8)
    sigma = np.maximum(sigma, 1e-8)
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

# Generate test data
S_test = np.linspace(0, 1, 100)  # Normalized stock prices
T_test = np.linspace(0, 1, 100)  # Normalized times
S_test, T_test = np.meshgrid(S_test, T_test)
X_test = np.vstack((T_test.flatten(), S_test.flatten())).T

# Predict using the trained model
y_pred = model.predict(X_test)

# Calculate the analytical solution
K = 0.5
r = 0.05
sigma = 0.2
y_true = black_scholes_call(S_test.flatten(), K, T_test.flatten(), r, sigma)

# Plot the results
plt.figure(figsize=(10, 6))
plt.plot(S_test.flatten(), y_true, label="Analytical Solution", linestyle="--", color="blue")
plt.scatter(S_test.flatten(), y_pred, label="Predicted Solution", color="red", s=10)
plt.xlabel("Stock Price (S)")
plt.ylabel("Option Price (V)")
plt.legend()
plt.title("Comparison of Predicted vs Analytical Black-Scholes Option Prices")
plt.show()


In [None]:
import deepxde as dde
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

# Define the Black-Scholes PDE
def pde(x, y):
    V = y
    dV_dt = dde.grad.jacobian(y, x, i=0, j=0)  # ∂V/∂t
    dV_dS = dde.grad.jacobian(y, x, i=0, j=1)  # ∂V/∂S
    d2V_dS2 = dde.grad.hessian(y, x, component=0, i=1, j=1)  # ∂²V/∂S²
    sigma = 0.2  # Volatility
    r = 0.05     # Risk-free interest rate
    return dV_dt + 0.5 * sigma**2 * x[:, 1:2]**2 * d2V_dS2 + r * x[:, 1:2] * dV_dS - r * V

# Define the domain and geometry
geom = dde.geometry.Interval(0, 1)  # Stock price interval [0, 1]
timedomain = dde.geometry.TimeDomain(0, 1)  # Time interval [0, 1]
geomtime = dde.geometry.GeometryXTime(geom, timedomain)  # Space-time domain

# Initial condition: V(S, 0) = max(S - K, 0) for a call option
def initial_condition(x):
    K = 0.5
    return np.maximum(x[:, 1:2] - K, 0)

# Boundary conditions: V(0, t) = 0 and V(S, t) → S as S → ∞
def boundary_left(x, on_boundary):
    return on_boundary and np.isclose(x[0], 0)  # Left boundary (S = 0)

def boundary_right(x, on_boundary):
    return on_boundary and np.isclose(x[0], 1)  # Right boundary (S = 1)

bc_left = dde.DirichletBC(geomtime, lambda x: 0, boundary_left)  # V(0, t) = 0
bc_right = dde.DirichletBC(geomtime, lambda x: x[:, 1:2], boundary_right)  # V(S, t) → S

# Define the data for the PDE
data = dde.data.TimePDE(
    geomtime,
    pde,
    [bc_left, bc_right],
    num_domain=2540,
    num_boundary=80,
    num_initial=160,
    solution=initial_condition,  # Solution for the initial condition
)

# Define the neural network
net = dde.maps.FNN([2] + [50] * 3 + [1], "tanh", "Glorot uniform")

# Define the model
model = dde.Model(data, net)

# Train the model with Adam optimizer
model.compile("adam", lr=1e-3)
losshistory, train_state = model.train(epochs=5000)

# # Fine-tune with LBFGS optimizer - Removed the problematic lines
model.compile("L-BFGS") # Added a learning rate
# losshistory, train_state = model.train()

# Analytical Black-Scholes solution for comparison
def black_scholes_call(S, K, T, r, sigma):
    S = np.maximum(S, 1e-8)  # Prevent divide-by-zero issues
    K = np.maximum(K, 1e-8)
    T = np.maximum(T, 1e-8)
    sigma = np.maximum(sigma, 1e-8)
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

# Generate test data
S_test = np.linspace(0, 1, 100)  # Normalized stock prices
T_test = np.linspace(0, 1, 100)  # Normalized times
S_test, T_test = np.meshgrid(S_test, T_test)
X_test = np.vstack((T_test.flatten(), S_test.flatten())).T

# Predict using the trained model
y_pred = model.predict(X_test)

# Calculate the analytical solution
K = 0.5
r = 0.05
sigma = 0.2
y_true = black_scholes_call(S_test.flatten(), K, T_test.flatten(), r, sigma)

# Plot the results
plt.figure(figsize=(10, 6))
plt.plot(S_test.flatten(), y_true, label="Analytical Solution", linestyle="--", color="blue")
plt.scatter(S_test.flatten(), y_pred, label="Predicted Solution", color="red", s=10)
plt.xlabel("Stock Price (S)")
plt.ylabel("Option Price (V)")
plt.legend()
plt.title("Comparison of Predicted vs Analytical Black-Scholes Option Prices")
plt.show()

In [None]:
"""Backend supported: tensorflow.compat.v1, tensorflow, pytorch, paddle"""
import deepxde as dde
import numpy as np


def heat_eq_exact_solution(x, t):
    """Returns the exact solution for a given x and t (for sinusoidal initial conditions).

    Parameters
    ----------
    x : np.ndarray
    t : np.ndarray
    """
    return np.exp(-(n**2 * np.pi**2 * a * t) / (L**2)) * np.sin(n * np.pi * x / L)


def gen_exact_solution():
    """Generates exact solution for the heat equation for the given values of x and t."""
    # Number of points in each dimension:
    x_dim, t_dim = (256, 201)

    # Bounds of 'x' and 't':
    x_min, t_min = (0, 0.0)
    x_max, t_max = (L, 1.0)

    # Create tensors:
    t = np.linspace(t_min, t_max, num=t_dim).reshape(t_dim, 1)
    x = np.linspace(x_min, x_max, num=x_dim).reshape(x_dim, 1)
    usol = np.zeros((x_dim, t_dim)).reshape(x_dim, t_dim)

    # Obtain the value of the exact solution for each generated point:
    for i in range(x_dim):
        for j in range(t_dim):
            usol[i][j] = heat_eq_exact_solution(x[i], t[j])

    # Save solution:
    np.savez("heat_eq_data", x=x, t=t, usol=usol)


def gen_testdata():
    """Import and preprocess the dataset with the exact solution."""
    # Load the data:
    data = np.load("heat_eq_data.npz")
    # Obtain the values for t, x, and the excat solution:
    t, x, exact = data["t"], data["x"], data["usol"].T
    # Process the data and flatten it out (like labels and features):
    xx, tt = np.meshgrid(x, t)
    X = np.vstack((np.ravel(xx), np.ravel(tt))).T
    y = exact.flatten()[:, None]
    return X, y


# Problem parameters:
a = 0.4  # Thermal diffusivity
L = 1  # Length of the bar
n = 1  # Frequency of the sinusoidal initial conditions

# Generate a dataset with the exact solution (if you dont have one):
gen_exact_solution()


def pde(x, y):
    """Expresses the PDE residual of the heat equation."""
    dy_t = dde.grad.jacobian(y, x, i=0, j=1)
    dy_xx = dde.grad.hessian(y, x, i=0, j=0)
    return dy_t - a * dy_xx


# Computational geometry:
geom = dde.geometry.Interval(0, L)
timedomain = dde.geometry.TimeDomain(0, 1)
geomtime = dde.geometry.GeometryXTime(geom, timedomain)

# Initial and boundary conditions:
bc = dde.icbc.DirichletBC(geomtime, lambda x: 0, lambda _, on_boundary: on_boundary)
ic = dde.icbc.IC(
    geomtime,
    lambda x: np.sin(n * np.pi * x[:, 0:1] / L),
    lambda _, on_initial: on_initial,
)

# Define the PDE problem and configurations of the network:
data = dde.data.TimePDE(
    geomtime,
    pde,
    [bc, ic],
    num_domain=2540,
    num_boundary=80,
    num_initial=160,
    num_test=2540,
)
net = dde.nn.FNN([2] + [20] * 3 + [1], "tanh", "Glorot normal")
model = dde.Model(data, net)

# Build and train the model:
model.compile("adam", lr=1e-3)
model.train(iterations=20000)
model.compile("L-BFGS")
losshistory, train_state = model.train()

# Plot/print the results
dde.saveplot(losshistory, train_state, issave=True, isplot=True)
X, y_true = gen_testdata()
y_pred = model.predict(X)
f = model.predict(X, operator=pde)
print("Mean residual:", np.mean(np.absolute(f)))
print("L2 relative error:", dde.metrics.l2_relative_error(y_true, y_pred))
#np.savetxt("test.dat", np.hstack((X, y_true, y_pred)))

In [None]:
# Generate the test dataset (observed values)
X, y_true = gen_testdata()  # X: (t, x), y_true: observed solution
y_pred = model.predict(X)  # Predicted solution

# Print the first few observed and predicted values
print("Observed (True) vs Predicted:")
for i in range(10):  # Print first 10 values
    print(f"Observed: {y_true[i][0]:.6f}, Predicted: {y_pred[i][0]:.6f}")

# Visualize Observed vs Predicted
plt.figure(figsize=(10, 6))
plt.plot(y_true, label="Observed (True)", linestyle="--", color="blue", alpha=1)
plt.plot(y_pred, label="Predicted", linestyle="-", color="red", alpha=0.7)
plt.xlabel("Index")
plt.ylabel("Solution")
plt.title("Comparison of Observed vs Predicted Values")
plt.legend()
plt.show()


In [None]:
!pip install pygraphviz

In [None]:
!apt-get install -y graphviz libgraphviz-dev

In [None]:
!pip install pygraphviz

In [None]:
!pip install networkx

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# Define the computational graph for the Black formula for a call option
G = nx.DiGraph()

# Inputs
G.add_node("F", label="F (Forward Price)", color="lightgray")  # Forward price
G.add_node("K", label="K (Strike Price)", color="lightgray")  # Strike price
G.add_node("T", label="T (Time to Expiry)", color="lightgray")  # Time to expiry
G.add_node("sigma", label="σ (Volatility)", color="lightgray")  # Volatility

# Intermediate computations
# Compute d1
G.add_node("v1", label="log(F / K)", color="white")
G.add_edge("F", "v1")
G.add_edge("K", "v1")

G.add_node("v2", label="(1/2) * σ^2 * T", color="white")
G.add_edge("sigma", "v2")
G.add_edge("T", "v2")

G.add_node("v3", label="log(F / K) + (1/2) * σ^2 * T", color="white")
G.add_edge("v1", "v3")
G.add_edge("v2", "v3")

G.add_node("v4", label="σ * sqrt(T)", color="white")
G.add_edge("sigma", "v4")
G.add_edge("T", "v4")

G.add_node("d1", label="d1 = [log(F / K) + (1/2) * σ^2 * T] / (σ * sqrt(T))", color="white")
G.add_edge("v3", "d1")
G.add_edge("v4", "d1")

# Compute d2
G.add_node("d2", label="d2 = d1 - σ * sqrt(T)", color="white")
G.add_edge("d1", "d2")
G.add_edge("v4", "d2")

# Normal CDF computations
G.add_node("N(d1)", label="N(d1)", color="white")
G.add_edge("d1", "N(d1)")

G.add_node("N(d2)", label="N(d2)", color="white")
G.add_edge("d2", "N(d2)")

# Final output (call option price)
G.add_node("C", label="C = exp(-rT) * [F * N(d1) - K * N(d2)]", color="lightblue")
G.add_edge("F", "C")
G.add_edge("K", "C")
G.add_edge("N(d1)", "C")
G.add_edge("N(d2)", "C")
G.add_edge("T", "C")

# Draw the graph
pos = nx.nx_agraph.graphviz_layout(G, prog="dot")
colors = [G.nodes[n].get("color", "white") for n in G.nodes()]
labels = nx.get_node_attributes(G, "label")

plt.figure(figsize=(12, 8))
nx.draw(G, pos, with_labels=True, labels=labels, node_color=colors, node_size=3000, font_size=8, font_color="black")
plt.title("Computational Graph for Black Formula (Call Option)")

# Save as PDF
output_path = "Black_Formula_Call_Option_Graph01.pdf"
plt.savefig(output_path, format="pdf", bbox_inches="tight")
plt.close()

output_path


In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# Define the computational graph for the given function
G = nx.DiGraph()

# Inputs
G.add_node("y", label="y", color="lightgray")  # Observation
G.add_node("mu", label="μ", color="lightgray")  # Mean
G.add_node("sigma", label="σ", color="lightgray")  # Standard deviation

# Intermediate computations
G.add_node("v1", label="y - μ", color="white")  # Difference
G.add_edge("y", "v1") # Changed g to G
G.add_edge("mu", "v1")

G.add_node("v2", label="(y - μ) / σ", color="white")  # Division
G.add_edge("v1", "v2") # Changed g to G
G.add_edge("sigma", "v2")

G.add_node("v3", label="((y - μ) / σ)^2", color="white")  # Square
G.add_edge("v2", "v3") # Changed g to G

G.add_node("v4", label="-1/2 * ((y - μ) / σ)^2", color="white")  # Multiply by -1/2
G.add_edge("v3", "v4") # Changed g to G

G.add_node("v5", label="log(σ)", color="white")  # Log of sigma
G.add_edge("sigma", "v5") # Changed g to G

G.add_node("v6", label="- log(σ)", color="white")  # Negative log of sigma
G.add_edge("v5", "v6") # Changed g to G

G.add_node("v7", label="-1/2 * log(2π)", color="white")  # Constant term

# Final output
G.add_node("f", label="f(y, μ, σ)", color="lightblue")
G.add_edge("v4", "f") # Changed g to G
G.add_edge("v6", "f") # Changed g to G
G.add_edge("v7", "f") # Changed g to G

# Draw the graph
pos = nx.nx_agraph.graphviz_layout(G, prog="dot")
colors = [G.nodes[n].get("color", "white") for n in G.nodes()]
labels = nx.get_node_attributes(G, "label")

plt.figure(figsize=(12, 8))
nx.draw(G, pos, with_labels=True, labels=labels, node_color=colors, node_size=3000, font_size=8, font_color="black")
plt.title("Computational Graph for f(y, μ, σ)")

# Save as PDF
output_path = "Function_Computational_Graph.pdf"
plt.savefig(output_path, format="pdf", bbox_inches="tight")
plt.close()

output_path