In [37]:
import numpy as np

# Problem setup
T = 1.0          # Terminal time
N = 100          # Number of time steps
dt = T / N       # Time step size
d = 1            # Dimension of Brownian motion
M = 10000        # Number of Monte Carlo samples


In [38]:
def sigma(t, x):
    return np.ones((M, d))  # Example: constant volatility

def sigma_inv(t, x):
    return np.ones((M, d))  # Inverse of sigma (identity for constant sigma)

def H(t, x, z):
    return -0.5 * z**2  # Example Hamiltonian

def g(x):
    return x**2-x  # Terminal condition


In [39]:
def conditional_expectation(Y,X):
    # Simple Monte Carlo average as a placeholder for conditional expectation
    learning_rate = 1e-6
    num_epochs = 5_000
    # Precompute powers of x out of the loop for gaining efficiency
    x2 = X ** 2
    #x3 = X ** 3
    loss_history =[]
    for t in range(num_epochs):
        # Forward pass: compute predicted y
        # y = a + b x + c x^2 + d x^3
        y_pred = a + b * X + c * x2 #+ d * x3 #+ e * x ** 4

        # Compute and print loss
        loss = np.square(y_pred - Y).mean()
        loss_history.append(loss)
        if t % 5000 == 5999:
            print(t, loss)

        # Backpropagation to compute gradients of a, b, c, d with respect to loss
        grad_y_pred = 2.0 * (y_pred - y)
        grad_a = grad_y_pred.sum()
        grad_b = (grad_y_pred * X).sum()
        grad_c = (grad_y_pred * x2).sum()
        #grad_d = (grad_y_pred * x3).sum()

        # Update weights
        a -= learning_rate * grad_a
        b -= learning_rate * grad_b
        c -= learning_rate * grad_c
        #d -= learning_rate * grad_d
    return a,b,c #,d

In [None]:
class BSDE_Solver(object):
    def __init__(self, T, N, d, M):
        self.T = T
        self.N = N
        self.dt = T / N
        self.d = d
        self.M = M
        self.X = np.zeros((N + 1, M, d))
        self.Y = np.zeros((N + 1, M))
        self.Z = np.zeros((N, M, d))

    def simulate_brownian_motion(self):
        self.dW = np.sqrt(self.dt) * np.random.randn(self.M, self.N, self.d)
        W = np.cumsum(self.dW, axis=1)
        for n in range(self.N):
            self.X[n + 1,:,:] = self.X[n,:,:] + sigma(n * self.dt, self.X[n,:,:]) * self.dW[n,:,:]

    def solve(self):
        # Initial condition
        self.Y[:, -1] = g(self.X[:, -1, :])

        for n in reversed(range(self.N)):
            t_n = n * self.dt
            a,b,c = conditional_expectation(self.Y[:, n + 1] * self.dW[:, n + 1], self.X[:, n, :])
            self.Z[:, n, :] = sigma_inv(t_n, self.X[:, n, :]) * (a + b * self.X[:, n, :] + c * self.X[:, n, :]**2) / self.dt
            a, b, c = conditional_expectation(self.Y[:, n + 1], self.X[:, n, :])
            self.Y[:, n] =  a + b * self.X[:, n, :] + c * self.X[:, n, :]**2 + H(t_n, self.X[:, n, :], self.Z[:, n, :]) * self.dt - self.Z[:, n, :] * self.dW[:, n, :]


$\begin{cases}
    \hat{X}_{t_{n+1}}=\hat{X}_{t_{n}} + \sigma(t_n,\hat{X}_{t_{n}})\Delta W_{t_{n+1}}\\
    \hat{Z}_{t_n}= \frac{1}{\Delta t}\sigma^{-1}(t_n,\hat{X}_{t_n}) \mathbb{E}[\hat{Y}_{t_{n+1}}\Delta W_{t_{n+1}}|\hat{X}_{t_n}] \\
        \hat{Y}_{t_n} = \mathbb{E}[\hat{Y}_{t_{n+1}}|\hat{X}_{t_n}] +H(t_n,\hat{X}_{t_n},\hat{Z}_{t_n})\Delta t\\
        \hat{Y}_T=g(\hat{X}_T)
    \end{cases}$

In [45]:
my_bsde = BSDE_Solver(T, N, d, M)
my_bsde.simulate_brownian_motion()  
my_bsde.solve()
print("Estimated initial value Y_0:", np.mean(my_bsde.Y[0]))

ValueError: operands could not be broadcast together with shapes (10000,1) (100,1) 