In [None]:
from numpy import *
from matplotlib.pyplot import *
from scipy.stats import norm
import time

In [None]:
set_printoptions(suppress = True)
#Clock
started = time.time()

#Neural network parameters 
epochs = 100
D = 3 #num dimensions incl.time(time, price, input)
L = 9 #layers (depth)
N = 20 #nodes per layer (width)


# constant parameters
r = 0.05        #interest rate
K = 3.0         #Strike price
sigma = 0.15    #Volatility


#Hyperparameters
M = 320                 #Num samples
learning_rate = 5e-4
m = 64                  #mini-batch size
reg = 0*1e-5            #L^2 regularisation
beta = 0.9              #constant of momentum
epsilon = 0.1           #step size of finite difference

In [None]:
# Input parameter X(Asset price in Black Scholes equation)
X = 7.0 * random.random_sample((M, D)) #data (input)
# print(X)

# boudary conditions
X[ : , 0] = 10 * random.random_sample (M)
X[0:10, 0] = 0.0
X[10:20, 0] = 10.0
X[20:30, 1] = 0.0
X[30:40, 1] = 7.0
X[40:50, 2] = 0.0
X[50:60, 2] = 7.0
# print(X)

# Initialisation (Xavier initialisation)
## Weights
The initialisation of the weight matrices:
- The elements of the weight matrices need to be random numbers to avoid them updating equally at every iteration. 
- The initial weight should not be too far away from their target value (The gradietn may be too large), so we use a normal distribution with mean zero and small variance so that the expect average weight value is zero. 
- The variance of neuron's output will increase following by the increasing number of examples. Theirfore, the inital weight is divided by $\sqrt{n}$, where n is the number of inputs to the specific neuron solves the problem.
- One of the assumptions is that the expectation of inputs to a neuron is zero, therefore, the initialisation should divided by $\sqrt{2/n}$.

### Why we need the initialisation?
It seemed impossible to have more than two hidden layers without getting the gradient to either blow up or be close to zero(dont know the reason). To compensate for only having two hidden layers we had to use 50 to 100
nodes. However, When the above suggested initialisations were implemented we could right away increase hidden layers to 20 and reduce nodes to 10 per layer without any immediate problems.

## Bias
Biases can be all set to zero, but from experience it seems here to work better
with a small positive number at the start to make sure every node is activated.

## Monmentum
Momentum can help the neural network escape from local optimisation, where a moving average of previous gradients is used in the calculation of the next moemntum. It can also helps in reducing overly stochastic convergence. Some other methods: esterov momentum where the gradient is calculated with the current weighted velocity, or Aadam (Adaptive momentum).

In [None]:
W_in = random.randn(D, N) * sqrt(2.0/D) #weights layer 1
b_in = ones((1, N)) * 0.01 #bias layer 1

W = random.randn(L-1, N, N) * sqrt(2.0/N) #weights
b = ones((L-1, 1, N)) * 0.01 #biases

W_out = random.randn(N, 1) * sqrt(2.0/N) #weights output
b_out = 0.01 #biases output

hidden_layers = zeros((L, M, N))

In [None]:
#momentum weight numpy array
VdW_in = zeros((D,N)) 
VdW = zeros((L-1,N,N))
VdW_out = zeros((N, 1 ))

# momentum bias numpy array
Vdb_in = zeros((1, N)) 
Vdb = zeros((L-1, 1, N))
Vdb_out = 0.0

# Functions
- Target function: Using the Feynman-Kac theorem to calculate the expected/target value and the loss(error).
- Activation function: Using ReLU as the activation function.

In [None]:
def target(out, dout, ddout, X=X, M=M):
    """
    Loss function
    Args:
        out (_type_): _description_
        dout (_type_): _description_
        ddout (_type_): _description_
        X (_type_, optional): _description_. Defaults to X.
        M (_type_, optional): _description_. Defaults to M.
    """
    t = copy(X[:,0]).reshape(M,1)
    Expectation = zeros_like(out)
    
    # calculate the expectation/target
    for m in range(100):
        # discretisize
        dt = (10 - t)/100
        x = copy(X[:, 1:D]).reshape(M, D-1)
        
        # Feynman-Kac theorem
        for n in range(100):
            x = x + x * r * dt + sigma * sqrt(dt) * x * random.randn(M, D-1)
            pass
        I = -r * (10 - t)
        psi = mean(maximum(0, x-K), axis=1).reshape(M,1)
        Expectation += exp(I)*psi
        pass
    Expectation /= 100
    
    # mean squared error
    target = (out - Expectation)**2
    dtarget = 2*(out - Expectation)
    
    return target, dtarget

In [None]:
# activation function
def ReLU(x):
    """
    Rectofoed Linear Unit
    Args:
        x (_type_): _description_
    """
    return maximum(0, x)

## Forward pass function 
The interior function of the node in the neural network:

A node in a feedforward neural network takes the sum of a weighted input vector x, adds a bias b and uses an activation function $\sigma(x)$. 

The output of this function is then sent to the next layer as input: $\mathcal{z} = \sigma \left(\sum_{n=1}^{N}x_n w_n + b\right)$, where N is the width of the previous layer.

In [None]:
def forwardPass(hidden_layers, X, W_in=W_in, W=W, W_out=W_out, b_in = b_in, b=b, b_out=b_out):
    """
    forward pass of neural network
    Args:
        hidden_layers (_type_): _description_
        X (_type_): _description_
        W_in (_type_, optional): _description_. Defaults to W_in.
        W (_type_, optional): _description_. Defaults to W.
        W_out (_type_, optional): _description_. Defaults to W_out.
        b_in (_type_, optional): _description_. Defaults to b_in.
        b (_type_, optional): _description_. Defaults to b.
        b_out (_type_, optional): _description_. Defaults to b_out.
    """
    L,M,N = shape(hidden_layers)
    hl = copy(hidden_layers)
    
    # input layer
    hl[0, :, :] = dot(X, W_in) + b_in
    z = dot(hl[0, :, :], W[0, :, :])+b[0, :, :]
    hl[1, :, :] = ReLU(z)
    
    # update layers one by one
    for j in range (2, L):
        z = dot(hl[j-1, :, :],W[j-1, :, :]) + b[j-1, :, :]
        hl[j, :, :] = ReLU(z)
        pass
    
    # output layer
    output = dot(hl[L-1], W_out) + b_out
    return output, hl

## Stochastic Gradient Process

Create to update parameters.


In [None]:
def SGD(step_size, hidden_layers, W_in=W_in,W=W, W_out=W_out, b_in=b_in, b=b, b_out=b_out, VdW_in=VdW_in, VdW=VdW, VdW_out=VdW_out, Vdb_in=Vdb_in, Vdb=Vdb, Vdb_out=Vdb_out):
    """
    Stochastic Gradient Descent

    Args:
        step_size (_type_): _description_
        hidden_layers (_type_): _description_
        W_in (_type_, optional): _description_. Defaults to W_in.
        W (_type_, optional): _description_. Defaults to W.
        W_out (_type_, optional): _description_. Defaults to W_out.
        b_in (_type_, optional): _description_. Defaults to b_in.
        b (_type_, optional): _description_. Defaults to b.
        b_out (_type_, optional): _description_. Defaults to b_out.
        VdW_in (_type_, optional): _description_. Defaults to VdW_in.
        VdW (_type_, optional): _description_. Defaults to VdW.
        VdW_out (_type_, optional): _description_. Defaults to VdW_out.
        Vdb_in (_type_, optional): _description_. Defaults to Vdb_in.
        Vdb (_type_, optional): _description_. Defaults to Vdb.
        Vdb_out (_type_, optional): _description_. Defaults to Vdb_out.
    """
    #Batches
    minibatches = []
    p = arange (M)
    random.shuffle(p)
    X_shuffled = X[p]
    hl_shuffled = hidden_layers[:, p]
    for k in range(0, M, m) :
        X_batch = X_shuffled [k:k+m]
        hl_batch = hl_shuffled[:, k:k+m]
        minibatches.append ((X_batch, hl_batch))
        pass
    for X_batch, hl_batch in minibatches:
        out_batch , hl_batch = forwardPass(hl_batch, X_batch)[0:2]
        dout_batch , ddout_batch = 0.0 , 0.0
        #dout_batch , ddout_batch = derivative(hl_batch, X_batch)
        #gradientat end layer
        dLoss_batch = target(out_batch, dout_batch, ddout_batch, X=X_batch, M=m)[1]
        dLoss_batch /= m
        #Backpropagation to find gradient w.r.t W and b
        dW_out = dot(hl_batch[L-1].T, dLoss_batch)
        db_out = sum(dLoss_batch, axis=0, keepdims=False)
        dhidden = dot(dLoss_batch, W_out.T)
        dhidden[hl_batch[L-1] <=0] = 0
        dW = zeros((L-1,N,N))
        db = zeros((L-1,1,N) )
        for j in reversed(range(0, L-1)):
            dW[j] = dot(hl_batch[j].T, dhidden)
            db[j] = sum(dhidden, axis=0, keepdims=False)
            dhidden = dot(dhidden, W[j].T)
            dhidden[hl_batch[j] <= 0] = 0
            pass
        pass
            
    dW_in = dot(X_batch.T, dhidden)
    db_in = sum(dhidden, axis=0, keepdims=True)
    
    #adding regularization
    dW_in += reg*W_in
    dW += reg*W
    dW_out += reg*W_out
    
    #momentum
    VdW_in = beta * VdW_in - step_size * dW_in
    VdW = beta * VdW - step_size * dW
    VdW_out = beta * VdW_out - step_size * dW_out
    Vdb_in = beta * Vdb_in - step_size * db_in
    Vdb = beta * Vdb - step_size * db
    Vdb_out = beta * Vdb_out - step_size * db_out
    
    #parameter update
    cap = 5.0
    W_in += VdW_in
    W_in [W_in > cap] = cap
    W_in [W_in < -cap] = -cap
    b_in += Vdb_in
    W += VdW
    W[W > cap] = cap
    W[W < -cap] = -cap
    b += Vdb
    W_out += VdW_out
    W_out [W_out > cap] = cap
    W_out [W_out < -cap] = -cap
    b_out += Vdb_out
    pass

## Derivative function: 

## Test function: 
Create to test the performance of the trained neural network.

## Learning schedule function 
Create to control the learning rate.

In [None]:
def derivative(hidden_layers ,X=X, order='first'):
    """ Finite differences of NN"""
    h = epsilon
    u = forwardPass(hidden_layers ,X)[0]
    h1 = zeros_like(X)
    h1 [:, 0] += h
    h2 = zeros_like(X)
    h2 [:, 1 ] += h
    u1p = forwardPass(hidden_layers, X+h1)[0]
    u1m = forwardPass(hidden_layers, X-h1)[0]
    du1 = (u1p - u1m)/(2*h)
    u2p = forwardPass(hidden_layers, X+h2)[0]
    u2m = forwardPass(hidden_layers, X-h2)[0]
    du2 = (u2p - u2m)/(2*h)
    if order == 'second':
        ddu1 = (u1p - 2*u + u1m) / h**2
        ddu2 = (u2p - 2*u + u2m) / h**2
    else: 
        ddu1, ddu2 = zeros_like(u), zeros_like(u)
    #du1 , du2 , ddu1 , ddu2 = 0 , 0 , 0 , 0
    return [du1, du2], [ddu1, ddu2]
    
    
def test(hidden_layers = hidden_layers):
    """ Find loss of NN on random validation set """
    test_points= 7.0 * random.random_sample((M,D))
    test_points[:, 0] = 10*random.random_sample(M)
    
    u,_ = forwardPass(hidden_layers, test_points)[0:2]
    #du, ddu = derivative(hidden_layers, test_points, 'second')
    du = ddu = 0.0
    return sum(target(u, du, ddu, X=test_points)[0])/M

def learning_rate_schedule(Loss, learning_rate):
    """ Learning rate schedule to reduce
    learning rate during training """
    if Loss <= 0.02:
        step_size = learning_rate/2
    else: 
        step_size = learning_rate
    return step_size

# Training

In [None]:
#main part
test_array = zeros(epochs) #To plot loss
loss_array = zeros(epochs) #To plot loss
Loss = 1.0
i = 0
step_size = learning_rate_schedule(Loss, learning_rate)
while Loss >= 0.001 and i < epochs:
    out, hidden_layers = forwardPass(hidden_layers, X)[0:2]
    #dout, ddout = derivative(hidden_layers, X)
    #ddout = derivative(hidden_layers, X, 'second')
    dout, ddout = 0, 0
    
    #Loss
    dataLoss = sum(target(out, dout, ddout)[0])/M
    regLoss = 0.5 * reg * sum(W_in*W_in) + 0.5 * reg *sum(W_out*W_out) + 0.5 * reg * sum(W*W)
    Loss = dataLoss + regLoss
    
    #parameter update
    SGD(step_size, hidden_layers)
    
    #store loss evolution
    test_array[i] = test()
    loss_array[i] = Loss
    
    #continously print loss
    if(i%10 == 0):
        print('Loss at epoch %d = %f, data loss:%f'%(i, Loss, dataLoss))
    i += 1
    step_size = learning_rate_schedule(Loss, learning_rate)
    pass 
print('Loss at end = %f'%(Loss))
print('Data Loss:', dataLoss, 'Reg Loss:' , regLoss)
print('Test loss:', test())
elapsed = time.time()- started
print('Time taken: %.2f seconds'%(elapsed))

In [None]:
#Plots
times = linspace(0, 9.99, M)
dt = times[1] - times[0]
tau = (10 - times).reshape(M, 1)

In [None]:
#first plot
X1 = 2.5 * ones((M,D))
X1 [:, 0] = times

for i in range(M-1):
    X1[i+1 ,1:D] = X1[i, 1:D] + X1[i, 1:D] * r * dt + sigma * sqrt(dt) * X1[i, 1:D] * random.randn(D-1)

Y1 = forwardPass(hidden_layers, X1)[0]
meanX = mean(X1[:, 1:D], axis=1).reshape(M, 1)
dp = (log(meanX/K) + (r + 0.5 * sigma**2) * tau) / (sigma * sqrt(tau))
dm = dp - sigma * sqrt(tau)
C1 = norm.cdf(dp) * meanX - norm.cdf(dm)*K*exp(-r * tau)

plot(times, meanX, '-', label = 'Asset Price')
plot(times, C1, 'r--', label = 'Analytic', lw =2.0)
plot( times, Y1, '-', label = 'Deep Learning', lw =1.0)
tick_params (labelsize =16)
xlabel ('Time', fontsize =16)
ylabel('Value', fontsize =16)
legend()
show()

plot(arange(epochs), test_array, 'r-', label= 'Validation set', lw =1.0)
plot(arange(epochs), loss_array, '-', label= 'Training set', lw =1.0)
tick_params(labelsize =16)
xlabel('Epochs', fontsize =16)
ylabel('loss', fontsize =16)
axis([0, epochs, 0, 1])
legend()
show()

In [None]:
#second plot
X2 = 2.5 * ones((M,D))
X2 [:, 0] = times

for i in range(M-1):
    X2[i+1 ,1:D] = X2[i, 1:D] + X2[i, 1:D] * r * dt + sigma * sqrt(dt) * X2[i, 1:D] * random.randn(D-1)

Y2 = forwardPass(hidden_layers, X2)[0]
meanX = mean(X2[:, 1:D], axis=1).reshape(M, 1)
dp = (log(meanX/K) + (r + 0.5 * sigma**2) * tau) / (sigma * sqrt(tau))
dm = dp - sigma * sqrt(tau)
C2 = norm.cdf(dp) * meanX - norm.cdf(dm)*K*exp(-r * tau)


plot(times, meanX, '-', label = 'Asset Price')
plot(times, C2, 'r--', label = 'Analytic', lw =2.0)
plot( times, Y2, '-', label = 'Deep Learning', lw =1.0)
tick_params (labelsize =16)
xlabel ('Time', fontsize =16)
ylabel('Value', fontsize =16)
legend()
show()