In [None]:
import torch
from torch import nn
from time import time_ns
import numpy as np
import copy
import pickle
from hashlib import sha256

In [None]:
class SimpleNetwork(nn.Module):
    def __init__(self, params_in: int, layer_sizes: list(), act_funs: list(), calc_deriv: bool):
        super(SimpleNetwork, self).__init__()
        
        self.in_out = layer_sizes
        self.in_out.insert(0, params_in)
        self.ins = self.in_out[0:-1]
        self.outs = self.in_out[1:]
        self.act_funs = act_funs
        self.layer_count = len(self.act_funs)
        self.calc_deriv = calc_deriv
        
        assert len(self.ins) == len(self.act_funs)
        assert len(self.outs) == len(self.act_funs)
        assert all([f in [nn.Sigmoid, nn.Tanh] for f in self.act_funs])
        
        for layer_idx, (n_in, n_out, act_fun) in enumerate(zip(self.ins, self.outs, self.act_funs), 1):
            layer = nn.Linear(n_in, n_out, bias=True)
            self.add_module(f'fc{layer_idx}', layer)
            self.add_module(f'act{layer_idx}', self.act_funs[layer_idx-1]())

        self.requires_grad_(not calc_deriv)
        if calc_deriv:
            self.fwd_activation = {}
            def get_fwd_activation(name):
                def hook(model, input, output):
                    self.fwd_activation[name] = output.detach()
                return hook
            for name, module in self.named_modules():
                if name != '':
                    _ = module.register_forward_hook(get_fwd_activation(name))
        self.x_hash = None
    
    def forward(self, x):
        x = x.view(-1, self.ins[0])
        if self.calc_deriv and self._recalculate_derivatives(x):
            self.layer_derivatives_1 = dict()
            self.layer_derivatives_2 = dict()
        for layer_idx in range(1, self.layer_count + 1):
            x = self._modules[f'fc{layer_idx}'](x)
            x = self._modules[f'act{layer_idx}'](x)
        return x
    
    def _recalculate_derivatives(self, x):
        new_hash = sha256(np.ascontiguousarray(x.detach().numpy()))
        if (not self.x_hash is None) and (self.x_hash.digest() == new_hash.digest()):
            return False
        else:
            self.x_hash = new_hash
            return True
    
    def _get_analytic_derivative_1(self, actfun, x):
        if type(actfun) == nn.Sigmoid:
            return actfun(x) * (1 - actfun(x))
        elif type(actfun) == nn.Tanh:
            return 1 - actfun(x)**2
        else:
            raise NotImplementedError()

    def _get_analytic_derivative_2(self, actfun, x):
        if type(actfun) == nn.Sigmoid:
            return actfun(x) * (1 - actfun(x)) * (1 - 2*actfun(x))
        elif type(actfun) == nn.Tanh:
            return -2*actfun(x)*(1 - actfun(x)**2)
        else:
            raise NotImplementedError()
    
    def _calculate_layer_derivatives_1(self):
        for layer_idx in range(1, self.layer_count + 1):
            layer_weights = self._modules[f'fc{layer_idx}'].weight
            act_d1 = self._modules[f'act{layer_idx}']
            act_input = self.fwd_activation[f'fc{layer_idx}']
            layer_act = self._get_analytic_derivative_1(act_d1, act_input)
            self.layer_derivatives_1.update(
                {layer_idx: layer_act.view(layer_act.shape[0], layer_act.shape[1], 1) * layer_weights})
    
    def _calculate_subnetwork_jacobian(self, p, q, x):
        if (len(self.layer_derivatives_1) == 0) or self._recalculate_derivatives(x):
            self._calculate_layer_derivatives_1()
        if ((p == 1) and (q == 0)):
            return torch.eye(self.ins[0]).repeat(self.layer_derivatives_1[1].shape[0], 1, 1)
        if ((p == self.layer_count + 1) and (q == self.layer_count)):
            return torch.eye(self.outs[-1]).repeat(self.layer_derivatives_1[1].shape[0], 1, 1)
        else:
            if not ((p >= 1) and (p <= self.layer_count) and (q >= p) and (q <= self.layer_count)):
                raise ValueError()
            subjac = self.layer_derivatives_1[q]
            for l in range(self.layer_count - q + 1, self.layer_count - p + 1):
                subjac = torch.matmul(subjac, self.layer_derivatives_1[self.layer_count - l])
            return subjac
    
    def get_network_jacobian(self, x):
        x = x.view(-1, self.ins[0])
        _ = self.forward(x)
        return self._calculate_subnetwork_jacobian(1, self.layer_count, x)

    def get_network_hessian_slice(self, x, j):
        x = x.view(-1, self.ins[0])
        _ = self.forward(x)
        hessian_slice = torch.zeros(x.shape[0], self.outs[-1], self.ins[0])
        for l in range(1, self.layer_count + 1):
            phi_pre = self._calculate_subnetwork_jacobian(1, l-1, x)
            phi_post = self._calculate_subnetwork_jacobian(l+1, self.layer_count, x)
            layer_weights = self._modules[f'fc{l}'].weight
            act_d2 = self._modules[f'act{l}']
            act_input = self.fwd_activation[f'fc{l}']
            layer_act = self._get_analytic_derivative_2(act_d2, act_input)
            m = torch.matmul(layer_weights, phi_pre)
            phi = (layer_act.unsqueeze(2) * m[:, :, j].unsqueeze(2)) * layer_weights
            hessian_slice += torch.matmul(torch.matmul(phi_post, phi), phi_pre)
        return hessian_slice
    
    def get_network_hessians(self, x):
        x = x.view(-1, self.ins[0])
        _ = self.forward(x)
        hessians = torch.zeros(x.shape[0], self.ins[0], self.outs[-1], self.ins[0])
        for l in range(1, self.layer_count + 1):
            phi_pre = self._calculate_subnetwork_jacobian(1, l-1, x)
            phi_post = self._calculate_subnetwork_jacobian(l+1, self.layer_count, x)
            layer_weights = self._modules[f'fc{l}'].weight
            act_d2 = self._modules[f'act{l}']
            act_input = self.fwd_activation[f'fc{l}']
            layer_act = self._get_analytic_derivative_2(act_d2, act_input)
            m = torch.matmul(layer_weights, phi_pre)
            phi = (layer_act.unsqueeze(2) * m).transpose(1, 2).unsqueeze(3) * layer_weights
            hessians += torch.matmul(torch.matmul(phi_post.unsqueeze(1), phi), phi_pre.unsqueeze(1))
        return hessians

Setting the options_case flag to True uses input parameter counts which correspond to the parameter count in the following options pricing models:

* 8: Heston model with no dividend yield: $P_{8} = \{\kappa, \theta, \sigma, \rho, v_{0}, K, \tau, r \}$;
* 9: Heston model with continuous dividend yield: $P_{9} = P_{8} \cup \{ q \}$;
* 11: Bates model: $P_{9} \cup \{\lambda_{v}, \mu_{v}\}$;
* 14: Duffie model with correlated jumps only (SVJJ): $P_{14} = P_{9} \cup \{\lambda_{c}, \mu_{cv}, \mu_{cy}, s_{cy}, \rho_{j} \}$;
* 19: Full Duffie model: $P_{19} = P_{14} \cup \{\lambda_{y}, \mu_{y}, s_{y}, \lambda_{v}, \mu_{v} \}$.

If the options_case flag is set to False, a more generic performance analysis is performed for 16, 32 and 128 input parameters, respectively.

In [None]:
num_eval = 10000
options_case = True

if options_case:
    params_in = [8, 9, 11, 14, 19]
else:    
    params_in = [16, 32, 128]

layer_sizes_jac = [
    [32, 32, 32, 32, 1],
    [64, 64, 64, 64, 1],
    [128, 128, 128, 128, 1],
    [256, 256, 256, 256, 1],
    [32, 32, 32, 32, 4],
    [64, 64, 64, 64, 4],
    [128, 128, 128, 128, 4],
    [256, 256, 256, 256, 4],
    [32, 32, 32, 32, 16],
    [64, 64, 64, 64, 16],
    [128, 128, 128, 128, 16],
    [256, 256, 256, 256, 16]
]
layer_sizes_hess = [
    [32, 32, 1],
    [64, 64, 1],
    [128, 128, 1],
    [256, 256, 1]
]
    
act_funs = [nn.Sigmoid, nn.Tanh, nn.Sigmoid, nn.Tanh, nn.Sigmoid]

times_jac_pt = np.zeros(shape=(num_eval, len(params_in), len(layer_sizes_jac)))
times_jac_mat = np.zeros(shape=(num_eval, len(params_in), len(layer_sizes_jac)))
jac_abs_diff_max = 0

times_hess_pt = np.zeros(shape=(num_eval, len(params_in), len(layer_sizes_hess)))
times_hess_mat = np.zeros(shape=(num_eval, len(params_in), len(layer_sizes_hess)))
hess_abs_diff_max = 0

### Comparison of the Jacobian calculations

In [None]:
def run_jacobian_calculations(num_eval, params_in, layer_sizes, act_funs, calc_deriv):
    jacobians = []
    times = []
    for seed_idx in range(num_eval):
        torch.manual_seed(seed_idx)
        x = torch.rand(params_in).float().requires_grad_(not calc_deriv)
        model = SimpleNetwork(params_in, copy.deepcopy(layer_sizes), act_funs, calc_deriv)
        start = time_ns()
        if not calc_deriv:
            jac = torch.autograd.functional.jacobian(model, x)
        else:
            jac = model.get_network_jacobian(x)
        end = time_ns()
        times.append(end - start)
        jacobians.append(jac)
    return (jacobians, times)

In [None]:
for p in range(len(params_in)):
    for l in range(len(layer_sizes_jac)):
        print(f'Running Jacobian calculations with {params_in[p]} params in, and layer sizes {layer_sizes_jac[l]}')
        jacobians_pt, t_jac_pt = \
            run_jacobian_calculations(num_eval, params_in[p],
                                      layer_sizes_jac[l], act_funs[0:len(layer_sizes_jac[l])], False)
        jacobians_mat, t_jac_mat = \
            run_jacobian_calculations(num_eval, params_in[p],
                                      layer_sizes_jac[l], act_funs[0:len(layer_sizes_jac[l])], True)
        for i in range(num_eval):
            jac_abs_diff_iter = (jacobians_mat[i].squeeze() - jacobians_pt[i]).abs().max()
            assert jac_abs_diff_iter < 1e-7
            if (jac_abs_diff_iter > jac_abs_diff_max):
                jac_abs_diff_max = jac_abs_diff_iter
            
        print(f'Average time - Jacobian - Pytorch : {np.mean(t_jac_pt)} nanoseconds')
        print(f'Average time - Jacobian - Matrix : {np.mean(t_jac_mat)} nanoseconds')
        print(f'Median time - Jacobian - Pytorch : {np.median(t_jac_pt)} nanoseconds')
        print(f'Median time - Jacobian - Matrix : {np.median(t_jac_mat)} nanoseconds')
        times_jac_pt[:, p, l] = t_jac_pt
        times_jac_mat[:, p, l] = t_jac_mat

### Comparison of the Hessian calculations

In [None]:
def run_hessian_calculations(num_eval, params_in, layer_sizes, act_funs, calc_deriv):
    hessians = []
    times = []
    for seed_idx in range(num_eval):
        torch.manual_seed(seed_idx)
        x = torch.rand(params_in).float().requires_grad_(not calc_deriv)
        model = SimpleNetwork(params_in, copy.deepcopy(layer_sizes), act_funs, calc_deriv)
        start = time_ns()
        if not calc_deriv:
            hess = torch.autograd.functional.hessian(model, x)
        else:
            hess = model.get_network_hessians(x)
        end = time_ns()
        times.append(end - start)
        hessians.append(hess)
    return (hessians, times)

In [None]:
for p in range(len(params_in)):
    for l in range(len(layer_sizes_hess)):
        print(f'Running Hessian calculations with {params_in[p]} params in, and layer sizes {layer_sizes_hess[l]}')
        hessians_pt, t_hess_pt = \
            run_hessian_calculations(num_eval, params_in[p],
                                     layer_sizes_hess[l], act_funs[0:len(layer_sizes_hess[l])], False)
        hessians_mat, t_hess_mat = \
            run_hessian_calculations(num_eval, params_in[p],
                                     layer_sizes_hess[l], act_funs[0:len(layer_sizes_hess[l])], True)
        for i in range(num_eval):
            hess_abs_diff_iter = (hessians_mat[i].squeeze() - hessians_pt[i]).abs().max()
            assert hess_abs_diff_iter < 1e-7
            if (hess_abs_diff_iter > hess_abs_diff_max):
                hess_abs_diff_max = hess_abs_diff_iter
        print(f'Average time - Hessian - Pytorch : {np.mean(t_hess_pt)} nanoseconds')
        print(f'Average time - Hessian - Matrix : {np.mean(t_hess_mat)} nanoseconds')
        print(f'Median time - Hessian - Pytorch : {np.median(t_hess_pt)} nanoseconds')
        print(f'Median time - Hessian - Matrix : {np.median(t_hess_mat)} nanoseconds')
        times_hess_pt[:, p, l] = t_hess_pt
        times_hess_mat[:, p, l] = t_hess_mat

In [None]:
if options_case:
    suffix = 'options'
else:
    suffix = 'generic'

pickle.dump(times_jac_pt, open(f'rev_times_{suffix}_jac_pt.p', 'wb'))
pickle.dump(times_jac_mat, open(f'rev_times_{suffix}_jac_mat.p', 'wb'))

pickle.dump(times_hess_pt, open(f'rev_times_{suffix}_hess_pt.p', 'wb'))
pickle.dump(times_hess_mat, open(f'rev_times_{suffix}_hess_mat.p', 'wb'))