In [None]:
# CLASS DEFINITIONS FOR NEURAL NETWORKS USED IN DEEP GALERKIN METHOD

#%% import needed packages
import tensorflow as tf

class DenseLayer(tf.keras.layers.Layer):
    
    # constructor/initializer function (automatically called when new instance of class is created)
    def __init__(self, output_dim, input_dim, transformation=None):
        '''
        Args:
            input_dim:       dimensionality of input data
            output_dim:      number of outputs for dense layer
            transformation:  activation function used inside the layer; using
                             None is equivalent to the identity map 
        
        Returns: customized Keras (fully connected) layer object 
        '''        
        
        # create an instance of a Layer object (call initialize function of superclass of DenseLayer)
        super(DenseLayer, self).__init__()
        self.output_dim = output_dim
        self.input_dim = input_dim
        self.W = self.add_weight(
            name="W",
            shape=[self.input_dim, self.output_dim],
            initializer=tf.initializers.GlorotUniform(),
            dtype=tf.float32
        )
        
        # Create the bias vector
        self.b = self.add_weight(
            name="b",
            shape=[1, self.output_dim],
            initializer=tf.zeros_initializer(),
            dtype=tf.float32)
        
        if transformation:
            if transformation == "tanh":
                self.transformation = tf.tanh
            elif transformation == "relu":
                self.transformation = tf.nn.relu
            elif transformation == "mish":
                self.transformation = lambda x: x * tf.math.tanh(tf.math.softplus(x))
            elif transformation == "swish":
                self.transformation = lambda x: x * tf.math.sigmoid(x)
            else:
                self.transformation = None
        else:
            self.transformation = None

    def call(self, X):
        '''Compute output of a dense layer for a given input X 
        
        Args:                        
            X: input to layer            
        '''
        
        # compute dense layer output
        S = tf.add(tf.matmul(X, self.W), self.b)
                
        if self.transformation:
            S = self.transformation(S)
        
        return S

def terminal_utility(x):
    return -x
  
class DGMNet(tf.keras.Model):
    def __init__(self, layer_width, n_layers, input_dim, final_trans=None, feedforward=False, output_dim=1, control_output=False):
        super(DGMNet, self).__init__()

        self.n_layers = n_layers
        self.feedforward = feedforward
        self.output_dim = output_dim

        # Define kernel layers
        self.kernel_layers = []
        for _ in range(n_layers):
            self.kernel_layers.append(DenseLayer(layer_width, layer_width, transformation='swish'))

        # Final output layer
        self.output_weight = tf.keras.layers.Dense(output_dim, activation=final_trans)
        self.control_output = control_output
        self.initial_layer = DenseLayer(layer_width, input_dim+1, transformation='swish')



    def call(self, t, x):
        # Concatenate time and space
        X = tf.concat([t, x], axis=1)
        S = self.initial_layer.call(X)
        for i in range(self.n_layers):
            S = self.kernel_layers[i](S)+S

        # Final output
        result = self.output_weight(S)
        if self.control_output == False:
            result = terminal_utility(x)+result*(1-t)

        return result

In [None]:
#%% import needed packages
import numpy as np
import matplotlib.pyplot as plt


#%% Parameters 
d = 1 # dimension
sigma = 1  # asset volatility
T = 1        # terminal time (investment horizon)
#Cbeta_low=0  # Lower bound for Beta case 1
#Cbeta_up=0.1   # Upper bound for Beta case 1
#CbetaZ_low=0    # Lower bound for Beta + Z case 1
#CbetaZ_up=0.5    # Upper bound for Beta + Z case 1

#Cbeta_low=0  # Lower bound for Beta case 2
#Cbeta_up=0.2   # Upper bound for Beta case 2

Cbeta_up=0.5   # Upper bound for Beta case 3
CbetaZ_up=1.2   # Upper bound for Beta + Z case 3


# Solution parameters (domain on which to solve PDE)
t_low = 0 - 1e-10    # time lower bound
X_low = 0- 1e-10  # wealth lower bound
X_high = 1.0           # wealth upper bound

# neural network parameters
num_layers = 3
nodes_per_layer = 64
starting_learning_rate = 0.001

# Training parameters
sampling_stages  = 5000  # number of times to resample new time-space domain points
steps_per_sample = 10    # number of SGD steps to take before re-sampling

# Sampling parameters
nSim_interior = 4000
nSim_terminal = 1

# multipliers for oversampling i.e. draw X from [X_low - X_oversample, X_high + X_oversample]
X_oversample = 0.5
t_oversample = 0.0

# Plot options
n_plot = 41  # Points on plot grid for each dimension

# Save options
saveOutput = False
saveName   = 'Problem'
saveFigure = False
figureName = 'Problem'


#%% Analytical Solution
def h(t, x):
    # return 3*(T-t)/8-x #case 1
    #return (T-t)/2-x #case 2
    return (T-t)/2-x #case 3

def sampler(nSim_interior, nSim_terminal):
    ''' Sample time-space points from the function's domain; points are sampled
        uniformly on the interior of the domain, at the initial/terminal time points
        and along the spatial boundary at different time points. 
    
    Args:
        nSim_interior: number of space points in the interior of the function's domain to sample 
        nSim_terminal: number of space points at terminal time to sample (terminal condition)
    '''
    # Sampler #1: domain interior    
    t_interior = np.random.uniform(low=t_low - t_oversample*(T-t_low), high=T, size=[nSim_interior, 1]).astype(np.float32)
    X_interior = np.random.uniform(low=X_low - X_oversample*(X_high-X_low), high=X_high + X_oversample*(X_high-X_low), size=[nSim_interior, d]).astype(np.float32)
    

    # Sampler #3: initial/terminal condition
    t_terminal = T * np.ones((nSim_terminal, 1)).astype(np.float32)
    X_terminal = np.random.uniform(low=X_low - X_oversample*(X_high-X_low), high=X_high + X_oversample*(X_high-X_low), size = [nSim_terminal, d]).astype(np.float32)


    return t_interior, X_interior, t_terminal, X_terminal

#%% Loss function for Merton Problem PDE

def loss(model,control, t_interior, X_interior, t_terminal, X_terminal):
    ''' Compute total loss for training.'''

    with tf.GradientTape(persistent=True, watch_accessed_variables=False) as gt:
        gt.watch(t_interior)
        gt.watch(X_interior)
        V = model(t_interior, X_interior)  # V shape: (?, 1)
        V_x = gt.gradient(V, X_interior)   # First-order gradients, shape: (?, 1)
        
    V_t = gt.gradient(V, t_interior)  # Time derivative, shape: (?, 1)
    out = control(t_interior, X_interior)
    Z = tf.expand_dims(out[:,0],axis=-1)  # Control function, shape: (?, 1)
    alpha=tf.expand_dims(out[:,1],axis=-1) # Control function, shape: (?, 1)
    beta=tf.expand_dims(out[:,2],axis=-1) # Control function, shape: (?, 1)
    V_xx = gt.batch_jacobian(V_x, X_interior)  # shape (?, 1, 1)
    V_xx = tf.expand_dims(V_xx[:, 0, 0], axis=-1)  # d²V/dx²

    # Compute the PDE residual
    diff_V = V_t+V_x*(1/2*Z*Z-1/2*beta*beta-alpha)+1/2*V_xx*Z*Z+(1-beta)*(beta+Z)-alpha

    L1 = tf.reduce_mean(tf.square(diff_V))  # Loss term for PDE

    target_terminal = terminal_utility(X_terminal)
    fitted_terminal = model(t_terminal, X_terminal)
    diff_terminal = fitted_terminal - target_terminal
    L3 = tf.reduce_mean(tf.square(diff_terminal))  # Loss term for terminal condition
    
    del gt
    return L1, L3, diff_V, diff_terminal

def loss_control(model, control, t_interior, X_interior, t_terminal, X_terminal):
    ''' Compute total loss for training.'''
    with tf.GradientTape(persistent=True, watch_accessed_variables=False) as gt:
        gt.watch(t_interior)
        gt.watch(X_interior)
        V = model(t_interior, X_interior)  # V shape: (?, 1)
        V_x = gt.gradient(V, X_interior)   # First-order gradients, shape: (?, 1)

    out = control(t_interior, X_interior)
    Z = tf.expand_dims(out[:,0],axis=-1)  # Control function, shape: (?, 1)
    alpha=tf.expand_dims(out[:,1],axis=-1) # Control function, shape: (?, 1)
    beta=tf.expand_dims(out[:,2],axis=-1) # Control function, shape: (?, 1)
    # Compute second-order derivatives
    V_xx = gt.batch_jacobian(V_x, X_interior)  # shape (?, 1, 1)
    V_xx = tf.expand_dims(V_xx[:, 0, 0], axis=-1)  # d²V/dx²

    # Compute the PDE residual
    f=V_x*(1/2*Z*Z-1/2*beta*beta-alpha)+1/2*V_xx*Z*Z+(1-beta)*(beta+Z)-alpha # shape (?, 1)
    dfdalpha= -V_x -1

    #penalty=tf.nn.relu(beta-Cbeta_up)+tf.nn.relu(Cbeta_low-beta)+tf.nn.relu(beta+Z-CbetaZ_up)+tf.nn.relu(CbetaZ_low-Z-beta) #case 1
    #penalty=tf.nn.relu(beta-Cbeta_up)+tf.nn.relu(Cbeta_low-beta) #case 2
    penalty = tf.nn.relu(beta-Cbeta_up)+tf.nn.relu(beta+Z-CbetaZ_up)#case 3
    controlbound=tf.reduce_mean(penalty) 
    
    L2 = tf.reduce_mean(f)-controlbound  # Loss term for control
    dfdalpha = tf.reduce_mean(dfdalpha) # derivative of f(L2) on control

    del gt
    return -L2, dfdalpha, controlbound, penalty

#%% Set up network

# initialize DGM model
model = DGMNet(nodes_per_layer, num_layers, input_dim=d, output_dim=1)
control = DGMNet(nodes_per_layer, num_layers, input_dim=d,output_dim=3, control_output=True)
# controlalpha=DGMNet(nodes_per_layer, num_layers, input_dim=d,output_dim=1, control_output=True)
# controlbeta=DGMNet(nodes_per_layer, num_layers, input_dim=d,output_dim=1, control_output=True)


lr_schedule = tf.keras.optimizers.schedules.PolynomialDecay(
    initial_learning_rate=0.001, decay_steps=10000, end_learning_rate=0.00001, power=1.2
)


# Optimizers
optimizer_value = tf.keras.optimizers.Adam(lr_schedule)
optimizer_control = tf.keras.optimizers.Adam(lr_schedule)
# optimizer_control_alpha = tf.keras.optimizers.Adam(lr_schedule)
# optimizer_control_beta = tf.keras.optimizers.Adam(lr_schedule)


#%% Train network

# Define the train_step function
@tf.function
def train_step(model, control,  t_interior, X_interior, t_terminal, X_terminal, optimizer_value, optimizer_control):
    ''' Perform of single training step.'''
    # Compute loss for the value function (L1 + L3)
    t_interior = tf.convert_to_tensor(t_interior, dtype=tf.float32)
    X_interior = tf.convert_to_tensor(X_interior, dtype=tf.float32)
    t_terminal = tf.convert_to_tensor(t_terminal, dtype=tf.float32)
    X_terminal = tf.convert_to_tensor(X_terminal, dtype=tf.float32)

    with tf.GradientTape() as tape1:
        L1, L3, diff_V, diff_terminal = loss(model, control, t_interior, X_interior, t_terminal, X_terminal)
        total_loss = L1+L3
    grads = tape1.gradient(total_loss, model.trainable_variables)

    optimizer_value.apply_gradients(zip(grads, model.trainable_variables))  # Update the value model
    

    with tf.GradientTape(persistent=True) as tape:
        L2, _, _ ,_= loss_control(
            model, control,
            t_interior, X_interior,
            t_terminal, X_terminal
        )


    grads_control = tape.gradient(L2, control.trainable_variables)


    optimizer_control.apply_gradients(zip(grads_control, control.trainable_variables))

    
    return L1, L3, diff_V, diff_terminal, L2, total_loss
@tf.function
def eval_step(model, control, t_interior_val, X_interior_val, t_terminal_val, X_terminal_val):
    L1_val, L3_val, diff_V_val, diff_terminal_val = loss(model, control, t_interior_val, X_interior_val, t_terminal_val, X_terminal_val)
    L2_val, dL2dalpha_val, controlbound_val, penalty_val = loss_control(model, control, t_interior_val, X_interior_val, t_terminal_val, X_terminal_val)
    return L1_val, L3_val, diff_V_val, diff_terminal_val, L2_val, dL2dalpha_val, controlbound_val, penalty_val

# Initialize lists for storing loss values
loss_list = []
L2_list = []
L1_list = []
L3_list = []
value_list = []
control_list = []
penalty_list = []
maxdiffV_list = []
cb_list=[]

maxdiff_V = 100
maxdiff_terminal = 100

# dataset = dataset.prefetch(tf.data.AUTOTUNE)
t_eval = tf.constant([[0.0]], dtype=tf.float32)  # shape (1, 1)
X_eval = tf.constant([[0.0]], dtype=tf.float32)  # shape (1, 1)

# Main training loop
for i in range(sampling_stages):
    t_interior, X_interior, t_terminal, X_terminal = sampler(nSim_interior, nSim_terminal)
    for _ in range(steps_per_sample):
        L1, L3, diff_V, diff_terminal, L2, total_loss = train_step(
                model, control, t_interior, X_interior,
                t_terminal, X_terminal,
                optimizer_value, optimizer_control
            )
    loss_list.append(total_loss)
    L2_list.append(L2)
    L1_list.append(L1)
    L3_list.append(L3)

    # val
    t_interior_val, X_interior_val, t_terminal_val, X_terminal_val = sampler(nSim_interior, nSim_terminal)
    t_interior_val = tf.convert_to_tensor(t_interior_val, dtype=tf.float32)
    X_interior_val = tf.convert_to_tensor(X_interior_val, dtype=tf.float32)
    t_terminal_val = tf.convert_to_tensor(t_terminal_val, dtype=tf.float32)
    X_terminal_val = tf.convert_to_tensor(X_terminal_val, dtype=tf.float32)

    L1_val, L3_val, diff_V_val, diff_terminal_val, L2_val, dL2dalpha_val, controlbound_val, penalty_val = eval_step(
        model, control, t_interior_val, X_interior_val, t_terminal_val, X_terminal_val
    )

    maxdiff_V = np.max(np.abs(diff_V_val.numpy()))
    maxdiff_terminal = np.max(np.abs(diff_terminal_val.numpy()))
    maxdL2dalpha = np.max(np.abs(dL2dalpha_val.numpy()))
    maxControlBound= np.max(np.abs(controlbound_val.numpy()))
    maxPenalty = np.max(np.abs(penalty_val.numpy()))
    penalty_list.append(maxPenalty)
    maxdiffV_list.append(maxdiff_V)
    cb_list.append(maxControlBound)

    print(f"Stage {i}:{maxdiff_V},{maxdiff_terminal},{maxdL2dalpha},{maxControlBound},{maxPenalty}, L1 = {L1.numpy()}, L3 = {L3.numpy()}, L2 = {L2.numpy()}")

    # Evaluate at (t=0, x=0)
    V_pred = model(t_eval, X_eval)
    value_list.append(V_pred.numpy()[0, 0])
    print("V(0, 0) =", V_pred.numpy()[0, 0])
    out_pred = control(t_eval, X_eval)
    control_pred = tf.expand_dims(out_pred[:,0],axis=-1)  # Control function, shape: (?, 1)
    control_alpha_pred = tf.expand_dims(out_pred[:,1],axis=-1) # Control function, shape: (?, 1)
    control_beta_pred = tf.expand_dims(out_pred[:,2],axis=-1) # Control function, shape
    control_list.append(control_pred.numpy()[0, 0])
    print("z(0, 0) =", control_pred.numpy()[0, 0])
    print("alpha(0, 0) =", control_alpha_pred.numpy()[0, 0])
    print("beta(0, 0) =", control_beta_pred.numpy()[0, 0])
    print("beta+z =",control_pred.numpy()[0, 0]+control_beta_pred.numpy()[0, 0])

    if maxdiff_V < 1e-2 and maxdL2dalpha < 1e-3 and maxPenalty==0:
        break

# Save output if needed
if saveOutput:
    model.save(f'./SavedNets/{saveName}')

In [None]:
# this plot beta
# vector of t and X values for plotting
X_plot = np.linspace(X_low, X_high, n_plot)
t_plot = np.linspace(t_low, T, n_plot)
    
# compute model-implied optimal control for each (t,X) pair
t_mesh, X_mesh = np.meshgrid(t_plot, X_plot)

t_plot = np.reshape(t_mesh, [n_plot**2,1])
X_plot = np.reshape(X_mesh, [n_plot**2,1])

t_plot = tf.convert_to_tensor(t_plot, dtype=tf.float32)
X_plot = tf.convert_to_tensor(X_plot, dtype=tf.float32)

fitted_optimal_control_out = control(t_plot, X_plot)
fitted_optimal_control_beta = tf.expand_dims(fitted_optimal_control_out[:,2],axis=-1)
# fitted_optimal_control_beta = controlbeta(t_plot, X_plot)
fitted_optimal_control_beta_mesh = np.reshape(fitted_optimal_control_beta, [n_plot, n_plot])

print(fitted_optimal_control_beta_mesh)

plt.rcParams['axes.spines.right'] = True
plt.rcParams['axes.spines.top'] = True
# PLOT optimal beta
c=np.min(fitted_optimal_control_beta_mesh)
d=np.max(fitted_optimal_control_beta_mesh)
fig, ax = plt.subplots(figsize = (12,12),dpi=500)
#plt.figure(figsize = (8,8),dpi=500)
levels = np.arange(c,d,0.1)

#CB=plt.pcolormesh(t_mesh, X_mesh,A, cmap = "rainbow",vmin=c ,vmax=c)

CS=plt.contour(t_mesh, X_mesh,fitted_optimal_control_beta_mesh, levels=levels)
plt.clabel(CS, inline=True,fontsize=20)
# plot options
#a=plt.colorbar(CS)
#for t in a.ax.get_yticklabels():
     #t.set_fontsize(10)
#plt.title("numerical control for c=0", fontsize=20)
plt.ylabel("x", fontsize=25, labelpad=10)
plt.xlabel("t", fontsize=25, labelpad=20)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
plt.savefig('beta.png')

In [None]:
# this plot beta+z
# vector of t and X values for plotting
X_plot = np.linspace(X_low, X_high, n_plot)
t_plot = np.linspace(t_low, T, n_plot)
    
# compute model-implied optimal control for each (t,X) pair
t_mesh, X_mesh = np.meshgrid(t_plot, X_plot)

t_plot = np.reshape(t_mesh, [n_plot**2,1])
X_plot = np.reshape(X_mesh, [n_plot**2,1])

t_plot = tf.convert_to_tensor(t_plot, dtype=tf.float32)
X_plot = tf.convert_to_tensor(X_plot, dtype=tf.float32)


fitted_optimal_control_out = control(t_plot, X_plot)
fitted_optimal_control = tf.expand_dims(fitted_optimal_control_out[:,0],axis=-1)
fitted_optimal_control_mesh = np.reshape(fitted_optimal_control, [n_plot, n_plot])

print(fitted_optimal_control_mesh+fitted_optimal_control_beta_mesh)

plt.rcParams['axes.spines.right'] = True
plt.rcParams['axes.spines.top'] = True
# PLOT optimal beta
c=np.min(fitted_optimal_control_mesh+fitted_optimal_control_beta_mesh)
d=np.max(fitted_optimal_control_mesh+fitted_optimal_control_beta_mesh)
fig, ax = plt.subplots(figsize = (12,12),dpi=500)
#plt.figure(figsize = (8,8),dpi=500)
levels = np.arange(c,d,0.004)

#CB=plt.pcolormesh(t_mesh, X_mesh,A, cmap = "rainbow",vmin=c ,vmax=c)

CS=plt.contour(t_mesh, X_mesh,fitted_optimal_control_mesh+fitted_optimal_control_beta_mesh, levels=levels)
plt.clabel(CS, inline=True,fontsize=20)
# plot options
#a=plt.colorbar(CS)
#for t in a.ax.get_yticklabels():
     #t.set_fontsize(10)
#plt.title("numerical control for c=0", fontsize=20)
plt.ylabel("x", fontsize=25, labelpad=10)
plt.xlabel("t", fontsize=25, labelpad=20)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
plt.savefig('beta+Z.png')

In [None]:
def lolt(loss):
    res = []
    for i in range (len(loss)):
        if (loss[i]<10**-8 or i%5!=0):
            res.append(float("NaN"))
        else:
            res.append(loss[i])
    return res

def lola(loss):
    res = []
    for i in range (len(loss)):
        if (loss[i]<10**-8 and i%5==0):
            res.append(10**-8)
        else:
            res.append(float("NaN"))
    return res

In [None]:
#import matplotlib.ticker as ticker
plt.rcParams['axes.spines.right'] = False
plt.rcParams['axes.spines.top'] = False
fig, ax = plt.subplots(figsize = (12,12),dpi=500)
#fig,ax = plt.figure()
#plt.figure(figsize = (8,8),dpi=500)
#fig, ax = plt.subplots()
x=np.linspace(0,10*len(L1_list),len(L1_list))
plt.plot(x,L1_list,label='$||L_{int}||_2$',color='blue')
plt.plot(x,maxdiffV_list,label='$||L_{int}||_{\infty}$',color='blue',linestyle='dashed')
plt.plot(x,lolt(cb_list),'g^',label='$||P(\\beta,Z)||_{1}$',color='green')
plt.plot(x,lola(cb_list),'g^',color='green',linewidth=5)
plt.plot(x,lolt(penalty_list),'x',color='green',label='$||P(\\beta,Z)||_{\infty}$')
plt.plot(x,lola(penalty_list),'x',color='green',linestyle='dashed')
#plt.title("L2", fontsize=20)
plt.ylabel("Loss function", fontsize=25, labelpad=20)
plt.xlabel("Training step n", fontsize=25, labelpad=20)
plt.xticks(fontsize=20)
#fig, ax = plt.subplots()
plt.yticks(fontsize=20)
plt.yscale('log')
#plt.xlim([0, 5*len(L1_list)+10])
plt.ylim([10**-8, 10])
#_labels = ax.get_yticks()
#ax.yaxis.set_major_formatter(ticker.FormatStrFormatter('%0.0e'))
#plt.ticklabel_format(axis='y', style='scientific')
#fig.get_axes()[0].ticklabel_format(axis='y', style='scientific')
#ax.ticklabel_format(axis='y', style='sci',useMathText=True)
plt.yticks(np.array([0.0000001,0.000001,0.00001,0.0001,0.001,0.01,0.1,1,10**1]),fontsize=20)
ax.set_yticklabels(['$10^{-7}$','$10^{-6}$','$10^{-5}$','$10^{-4}$','$10^{-3}$','$10^{-2}$','$10^{-1}$','$10^0$','$10^1$'])
#locmaj = ticker.LogLocator(base=10,numticks=12) 
#ax.yaxis.set_major_locator(locmaj)
#plt.gca().set_yticklabels()
plt.legend(fontsize="25", loc ="upper right",framealpha=0.2)
#plt.box(False)
plt.savefig('loss function.png')

In [None]:
# save models weights .h5
model.save_weights('model_value.weights.h5')
control.save_weights('model_control_z.weights.h5')
