### Load necessary libraries

In [1]:
import numpy as np
import neuralsens.partial_derivatives as ns
import pandas as pd
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
from sklearn import set_config
set_config(display='diagram')
%matplotlib qt

  from .autonotebook import tqdm as notebook_tqdm


### Create synthetic dataset to check behavior of functions

In [2]:
samples = 100000
n_columns = 8
sm = np.random.normal(size=(samples,n_columns))
df = pd.DataFrame(sm, columns=['X' + str(x) for x in range(1,n_columns+1)])

### Check behavior of Jacobian function

#### Create output Y as linear function of inputs with some non-linear relationship

In [3]:
df['Y'] = - 0.8 * df.X1 + 0.5 * df.X2 ** 2 - df.X3 * df.X4 + 0.1 * np.random.normal(size=(samples,)) 

#### Train MLP model using the data.frame created

In [4]:
## Create random 80/20 % split
X_train, X_test, y_train, y_test = train_test_split(df.loc[:, df.columns != 'Y'].to_numpy(), df['Y'], test_size = 0.2, random_state = 5)

In [5]:
### Create MLP model
model = MLPRegressor(solver='sgd', # Update function
                    hidden_layer_sizes=[10], # #neurons in hidden layers
                    learning_rate_init=0.001, # initial learning rate
                    activation='logistic', # Logistic sigmoid activation function
                    alpha=0.01, # L2 regularization term
                    learning_rate='adaptive', # Type of learning rate used in training
                    max_iter=500, # Maximum number of iterations
                    batch_size=10, # Size of batch when training
                    tol=1e-2, # Tolerance for the optimization
                    validation_fraction=0.0, # Percentage of samples used for validation
                    n_iter_no_change=10, # Maximum number of epochs to not meet tol improvement
                    random_state=150)

# Train model
model.fit(X_train, y_train)

In [6]:
# Predict values to check model performance
y_train_pred = model.predict(X_train)
y_test_pred = model.predict(X_test)

# Obtain performance metrics
print("Training set: R2", r2_score(y_train, y_train_pred), "MSE", mean_squared_error(y_train, y_train_pred))
print("Test set: R2", r2_score(y_test, y_test_pred), "MSE", mean_squared_error(y_test, y_test_pred))


Training set: R2 0.9627375736386207 MSE 0.0794574619071889
Test set: R2 0.9625435088618721 MSE 0.07916687680012464


In [7]:
import matplotlib.pyplot as plt
fig=plt.figure()
ax=fig.add_axes([0,0,1,1])
ax.scatter(y_train, y_train_pred, color='r')
plt.show()

#### Execute jacobian function and check sensitivity metrics

In [8]:
# Obtain parameters to perform jacobian
wts = model.coefs_
bias = model.intercepts_
actfunc = ['identity',model.get_params()['activation'],model.out_activation_]
X = pd.DataFrame(X_train, columns=df.columns[df.columns != 'Y'])
y = pd.DataFrame(y_train, columns=['Y'])
sens_end_layer = 'last'
sens_end_input = False
sens_origin_layer = 0
sens_origin_input = True

In [9]:
jacobian = ns.jacobian_mlp(wts, bias, actfunc, X, y)

In [10]:
# Check sensitivity metrics
# For X1, mean should be around -0.8
# For X2, X3, X4, std shall be much greater than their mean
# For X5, mean and std shall be near 0
jacobian.summary()

Sensitivity analysis of [8, 10, 1] MLP network.

Sensitivity measures of each output:

$Y 

        mean       std  mean_squared
X1 -0.790251  0.075959      0.630267
X2  0.000836  0.929067      0.863166
X3  0.005942  0.939730      0.883128
X4 -0.000940  0.939830      0.883282
X5 -0.000430  0.003722      0.000014
X6 -0.000956  0.003367      0.000012
X7 -0.001262  0.001220      0.000003
X8 -0.001185  0.002839      0.000009


In [11]:
jacobian.info()

Sensitivity analysis of [8, 10, 1] MLP network.

80000 samples

Sensitivities of each output (only 5 first samples):

$Y 

         X1        X2        X3        X4        X5        X6        X7  \
0 -0.878113 -1.209858 -0.564912  1.432208 -0.000024  0.001642 -0.002341   
1 -0.895638 -1.584511 -0.922908 -0.666574  0.005098 -0.000307 -0.000148   
2 -0.899316 -1.382358 -0.983582 -0.486375  0.005185  0.001362 -0.001840   
3 -0.795823  0.831578  0.587179 -0.375461  0.000024 -0.001394 -0.000744   
4 -0.722826 -1.080295  1.062432 -1.100959  0.000715  0.002715 -0.002628   

         X8  
0 -0.001914  
1 -0.005624  
2 -0.003781  
3  0.001348  
4  0.001105  


In [12]:
jacobian.plot()

In [15]:
ns.alpha_sens_curves(jacobian, max_alpha = 300)

### Pytorch Functionality

In [None]:
from torch import manual_seed
import torch
manual_seed(1)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device = torch.device('cpu')
device

In [None]:
samples = 10000
n_columns = 8
sm = np.random.normal(size=(samples,n_columns))
df = pd.DataFrame(sm, columns=['X' + str(x) for x in range(1,n_columns+1)])

In [None]:
df['Y'] = - 0.8 * df.X1 + 0.5 * df.X2 ** 2 - df.X3 * df.X4 + 0.1 * np.random.normal(size=(samples,)) 

In [None]:
## Create random 80/20 % split
X_train, X_test, y_train, y_test = train_test_split(df.loc[:, df.columns != 'Y'].to_numpy(), df['Y'], test_size = 0.25, random_state = 5)

In [None]:
class MLP(torch.nn.Sequential):
    def __init__(self, input_size:int, output_size:int = 1, hidden_size:list = [10]):
        # Store layers to initiate sequential neural network
        layers           = []
        first = True
        for idx, neurons in enumerate(hidden_size):
            if first:
                layers += [torch.nn.Linear(input_size, neurons)]
                first = False
            else:
                layers += [torch.nn.Linear(hidden_size[idx-1], neurons)]
            layers += [torch.nn.Sigmoid()]
        layers += [torch.nn.Linear(hidden_size[idx-1], output_size)]
        super(MLP, self).__init__(*layers)

In [None]:
X_train_tch = torch.FloatTensor(X_train).requires_grad_(True).to(device)
X_test_tch = torch.FloatTensor(X_test).requires_grad_(True).to(device)
y_train_tch = torch.FloatTensor(y_train.to_numpy()).to(device)
y_test_tch = torch.FloatTensor(y_test.to_numpy()).to(device)

In [None]:
model = MLP(input_size=n_columns, output_size=1, hidden_size=[15,15])
model = model.to(device)

In [None]:
criterion = torch.nn.MSELoss()
lr = 0.1
optimizer = torch.optim.SGD(model.parameters(), lr = 0.1)

In [None]:
model.eval()
y_pred = model(X_test_tch)
before_train = criterion(y_pred.squeeze().to(device), y_test_tch)
print('Test loss before training' , before_train.item())    

In [None]:
model.train()
epoch = 0
lr = 0.8
loss = before_train
counter = 0
path=[]
while loss.item() > 0.1:
    optimizer.zero_grad() # Reset the gradient
    epoch += 1
    counter += 1
    # Forward pass
    y_pred = model(X_train_tch)
    # Update learning rate based on loss
    if loss < criterion(y_pred.squeeze().to(device), y_train_tch):
        counter = 0
        lr /= 2
        optimizer = torch.optim.SGD(model.parameters(), lr = lr)
    if counter > 100:
        counter = 0
        lr *= 1.2
        optimizer = torch.optim.SGD(model.parameters(), lr = lr)
    # Compute Loss
    loss = criterion(y_pred.squeeze().to(device), y_train_tch)
    print('Epoch {}: train loss: {} learning rate: {}'.format(epoch, loss.item(), lr))
    # Backward pass
    loss.backward()
    optimizer.step()

In [None]:
model.eval()
y_pred = model(X_test_tch)
before_train = criterion(y_pred.squeeze().to(device), y_test_tch)
print('Test loss after training' , before_train.item())   

In [None]:
X = pd.DataFrame(X_train, columns=df.columns[df.columns != 'Y'])
y = pd.DataFrame(y_train, columns=['Y'])
sens_end_layer = 'last'
sens_end_input = False
sens_origin_layer = 0
sens_origin_input = True

In [None]:
wts_torch = []
bias_torch = []
for name, param in model.named_parameters():
    #print(name, ":", param)
    if "weight" in name:
        wts_torch.append(param.detach().T.to(device))
    if "bias" in name:
        bias_torch.append(param.detach().to(device))
actfunc_torch = ["identity", "logistic", "logistic", "identity"]

In [None]:
device

In [None]:
jacobian = ns.jacobian_mlp(wts_torch, bias_torch, actfunc_torch, X, y, use_torch=True, dev="cuda")

In [None]:
jacobian.summary()

### Check behavior of Hessian function

#### Create output Y as linear function of inputs with some non-linear relationships

In [None]:
df['Y'] = - 0.4 * df.X1 ** 3 - 0.5 * df.X2 ** 2 + 0.7 * df.X3 * df.X4 + 0.1 * np.random.normal(size=(samples,)) 

#### Train MLP model using the data.frame created

In [None]:
## Create random 80/20 % split
X_train, X_test, y_train, y_test = train_test_split(df.loc[:, df.columns != 'Y'].to_numpy(), df['Y'], test_size = 0.2, random_state = 5)

In [None]:
### Create MLP model
model = MLPRegressor(solver='sgd', # Update function
                    hidden_layer_sizes=[10], # #neurons in hidden layers
                    learning_rate_init=0.001, # initial learning rate
                    activation='logistic', # Logistic sigmoid activation function
                    alpha=0.01, # L2 regularization term
                    learning_rate='adaptive', # Type of learning rate used in training
                    max_iter=500, # Maximum number of iterations
                    batch_size=10, # Size of batch when training
                    tol=1e-2, # Tolerance for the optimization
                    validation_fraction=0.0, # Percentage of samples used for validation
                    n_iter_no_change=10, # Maximum number of epochs to not meet tol improvement
                    random_state=150)

# Train model
model.fit(X_train, y_train)

In [None]:
# Predict values to check model performance
y_train_pred = model.predict(X_train)
y_test_pred = model.predict(X_test)

# Obtain performance metrics
print("Training set: R2", r2_score(y_train, y_train_pred), "MSE", mean_squared_error(y_train, y_train_pred))
print("Test set: R2", r2_score(y_test, y_test_pred), "MSE", mean_squared_error(y_test, y_test_pred))


In [None]:
import matplotlib.pyplot as plt
fig=plt.figure()
ax=fig.add_axes([0,0,1,1])
ax.scatter(y_train, y_train_pred, color='r')
plt.show()

#### Execute hessian function and check sensitivity metrics

In [None]:
# Obtain parameters to perform hessian
wts = model.coefs_
bias = model.intercepts_
actfunc = ['identity',model.get_params()['activation'],model.out_activation_]
X = pd.DataFrame(X_train, columns=df.columns[df.columns != 'Y'])
y = pd.DataFrame(y_train, columns=['Y'])
sens_end_layer = 'last'
sens_end_input = False
sens_origin_layer = 0
sens_origin_input = True

In [None]:
hessian = ns.hessian_mlp(wts, bias, actfunc, X, y)

In [None]:
# Check sensitivity metrics
# For X1, X5 mean and std should be around 0
# For X2, mean should be around -1
# For X3, X4, mean should be around 0.7
hessian.summary()

In [None]:
hessian.info()

In [None]:
# hessian.plot()

### Pytorch Functionality

In [None]:
X_train_tch = torch.FloatTensor(X_train).requires_grad_(True).to(device)
X_test_tch = torch.FloatTensor(X_test).requires_grad_(True).to(device)
y_train_tch = torch.FloatTensor(y_train.to_numpy()).to(device)
y_test_tch = torch.FloatTensor(y_test.to_numpy()).to(device)

In [None]:
model = MLP(input_size=n_columns, output_size=1, hidden_size=[15,15])
model = model.to(device)

In [None]:
criterion = torch.nn.MSELoss()
lr = 0.1
optimizer = torch.optim.SGD(model.parameters(), lr = 0.1)

In [None]:
model.eval()
y_pred = model(X_test_tch)
before_train = criterion(y_pred.squeeze().to(device), y_test_tch)
print('Test loss before training' , before_train.item())    

In [None]:
model.train()
epoch = 0
lr = 0.8
loss = before_train
counter = 0
path=[]
while loss.item() > 0.1:
    optimizer.zero_grad() # Reset the gradient
    epoch += 1
    counter += 1
    # Forward pass
    y_pred = model(X_train_tch)
    # Update learning rate based on loss
    if loss < criterion(y_pred.squeeze().to(device), y_train_tch):
        counter = 0
        lr /= 2
        optimizer = torch.optim.SGD(model.parameters(), lr = lr)
    if counter > 100:
        counter = 0
        lr *= 1.2
        optimizer = torch.optim.SGD(model.parameters(), lr = lr)
    # Compute Loss
    loss = criterion(y_pred.squeeze().to(device), y_train_tch)
    print('Epoch {}: train loss: {} learning rate: {}'.format(epoch, loss.item(), lr))
    # Backward pass
    loss.backward()
    optimizer.step()

In [None]:
model.eval()
y_pred = model(X_test_tch)
before_train = criterion(y_pred.squeeze().to(device), y_test_tch)
print('Test loss after training' , before_train.item())   

In [None]:
device = "cpu"

In [None]:
X = pd.DataFrame(X_train, columns=df.columns[df.columns != 'Y'])
y = pd.DataFrame(y_train, columns=['Y'])
sens_end_layer = 'last'
sens_end_input = False
sens_origin_layer = 0
sens_origin_input = True

In [None]:
wts_torch = []
bias_torch = []
for name, param in model.named_parameters():
    #print(name, ":", param)
    if "weight" in name:
        wts_torch.append(param.detach().T.to(device))
    if "bias" in name:
        bias_torch.append(param.detach().to(device))
actfunc_torch = ["identity", "logistic", "logistic", "identity"]

In [None]:
hessian = ns.hessian_mlp(wts_torch, bias_torch, actfunc_torch, X, y, use_torch=True, dev="cpu")

In [None]:
# Check sensitivity metrics
# For X1, X5 mean and std should be around 0
# For X2, mean should be around -1
# For X3, X4, mean should be around 0.7
hessian.summary()

### Check behavior of Jerkian function

#### Create output Y as linear function of inputs with some non-linear relationships

In [None]:
# df['Y'] = - 1.0 * df.X4 + 0.8 * df.X2 ** 2
df['Y'] =  df.X3 * df.X4 * df.X5 + df.X1 * df.X2 #* df.X6 + df.X8 ** 3 #+ df.X7 ** 4

#### Train MLP model using the data.frame created

In [None]:
## Create random 80/20 % split
X_train, X_test, y_train, y_test = train_test_split(df.loc[:, df.columns != 'Y'].to_numpy(), df['Y'], test_size = 0.2, random_state = 5)

In [None]:
### Create MLP model
model = MLPRegressor(solver='sgd', # Update function
                    hidden_layer_sizes=[8], # #neurons in hidden layers
                    learning_rate_init=0.01, # initial learning rate
                    activation='logistic', # Logistic sigmoid activation function
                    alpha=0.01, # L2 regularization term
                    learning_rate='adaptive', # Type of learning rate used in training
                    max_iter=2000, # Maximum number of iterations
                    batch_size=10, # Size of batch when training
                    tol=1e-2, # Tolerance for the optimization
                    validation_fraction=0.0, # Percentage of samples used for validation
                    n_iter_no_change=10, # Maximum number of epochs to not meet tol improvement
                    random_state=1)

# Train model
model.fit(X_train, y_train)

In [None]:
# Predict values to check model performance
y_train_pred = model.predict(X_train)
y_test_pred = model.predict(X_test)

# Obtain performance metrics
print("Training set: R2", r2_score(y_train, y_train_pred), ", MSE ", mean_squared_error(y_train, y_train_pred))
print("Test set: R2", r2_score(y_test, y_test_pred), ", MSE ", mean_squared_error(y_test, y_test_pred))

In [None]:
import matplotlib.pyplot as plt
fig=plt.figure()
ax=fig.add_axes([0,0,1,1])
ax.scatter(y_train, y_train_pred, color='r')
plt.show()

#### Execute hessian function and check sensitivity metrics

In [None]:
# Obtain parameters to perform hessian
wts = model.coefs_
bias = model.intercepts_
actfunc = ['identity',model.get_params()['activation'],model.out_activation_]
X = pd.DataFrame(X_train, columns=df.columns[df.columns != 'Y'])
y = pd.DataFrame(y_train, columns=['Y'])
sens_end_layer = 'last'
sens_end_input = False
sens_origin_layer = 0
sens_origin_input = True

In [None]:
hessian = ns.hessian_mlp(wts, bias, actfunc, X, y)

In [None]:
hessian.sens[0]['mean']

In [None]:
from neuralsens.activation_functions import activation_function, der_activation_function, der_2_activation_function, der_3_activation_function  
### Initialize all the necessary variables
# Structure of the mlp model
mlpstr = [wts[0].shape[0]] + [lyr.shape[1] for lyr in wts] 

# Derivative and activation functions for each neuron layer
der3actfunc = [der_3_activation_function(af) for af in actfunc]
der2actfunc = [der_2_activation_function(af) for af in actfunc]
deractfunc = [der_activation_function(af) for af in actfunc]
actfunc = [activation_function(af) for af in actfunc]

In [None]:
# Number of samples to be cached (it is used several times)
n_samples = 10000#X.shape[0]
r_samples = range(n_samples)
n_inputs = X.shape[1]

# Weights of input layer
W = [np.identity(X.shape[1])]

# Input of input layer 
# inputs = [np.hstack((np.ones((len(X_train),1), dtype=int), X_train))]
Z = [np.dot(X, W[0])]

# Output of input layer
O = [actfunc[0](Z[0])]

# First Derivative of input layer
D = [np.array([deractfunc[0](Z[0][irow,]) for irow in r_samples])]

# Second derivative of input layer
D2 = [np.array([der2actfunc[0](Z[0][irow,]) for irow in r_samples])]

# Third derivative of input layer
D3 = [np.array([der3actfunc[0](Z[0][irow,]) for irow in r_samples])]

In [None]:
# Let's go over all the layers calculating each variable
for lyr in range(1,len(mlpstr)):
    # Calculate weights of each layer
    W.append(np.vstack((bias[lyr-1], wts[lyr-1])))
    
    # Calculate input of each layer
    # Add columns of 1 for the bias
    aux = np.ones((O[lyr-1].shape[0],O[lyr-1].shape[1]+1))
    aux[:,1:] = O[lyr-1]
    Z.append(np.dot(aux,W[lyr]))
    
    # Calculate output of each layer
    O.append(actfunc[lyr](Z[lyr]))
    
    # Calculate first derivative of each layer
    D.append(np.array([deractfunc[lyr](Z[lyr][irow,]) for irow in r_samples]))
    
    # Calculate second derivative of each layer
    D2.append(np.array([der2actfunc[lyr](Z[lyr][irow,]) for irow in r_samples]))
    
    # Calculate third derivative of each layer
    D3.append(np.array([der3actfunc[lyr](Z[lyr][irow,]) for irow in r_samples]))

In [None]:
# Now, let's calculate the derivatives of interest
if sens_end_layer == 'last':
    sens_end_layer = len(actfunc)
# Initialize cross derivatives
D_ = [np.identity(mlpstr[sens_origin_layer]) for irow in r_samples]
if sens_origin_input:
    D_ = [D[sens_origin_layer]]
Q = [np.zeros((n_samples,n_inputs,n_inputs,n_inputs))]
H = [D2[0]]

M = [np.zeros((n_samples,n_inputs,n_inputs,n_inputs,n_inputs))]
N = [np.zeros((n_samples,n_inputs,n_inputs,n_inputs,n_inputs))]
J = [D3[0]]

In [None]:
counter = 0
for layer in range(sens_origin_layer + 1, sens_end_layer):
    counter += 1

    # First derivatives
    z_ = D_[counter - 1] @ W[layer][1:,:] 
    D_.append(z_ @ D[layer])

    # Second derivatives
    q = np.matmul(np.matmul(H[counter-1], W[layer][1:,:]).swapaxes(0,1), D[counter]).swapaxes(0,1)
    h = np.matmul(z_, np.matmul(z_, D2[layer].swapaxes(0,1)).swapaxes(0,2)).swapaxes(0,2).swapaxes(0,1)
    Q.append(q)
    H.append(h + q)

    # Third derivatives
    m = np.matmul(np.matmul(J[counter-1], W[layer][1:,:]).swapaxes(0,2), D[counter]).swapaxes(0,2)
    hx = np.matmul(np.matmul(H[counter-1], W[layer][1:,:]).swapaxes(0,1),np.array([np.diagonal(D2[layer][i,:,:,:]) for i in range(D2[layer].shape[0])])).swapaxes(0,1)
    n = np.matmul(z_,np.broadcast_to(np.expand_dims(hx, axis=4), list(hx.shape) + [hx.shape[-1]]).swapaxes(0,2)).swapaxes(0,2)
    j = np.matmul(z_, np.matmul(z_, np.matmul(z_, D3[layer].swapaxes(0,2)).swapaxes(0,3)).swapaxes(1,3)).swapaxes(1,3).swapaxes(0,3).swapaxes(0,2)
    M.append(m)
    N.append(n)
    J.append(j + m + n + n.swapaxes(2, 3).swapaxes(1, 2) + n.swapaxes(1, 2).swapaxes(2, 3))


In [None]:
W[layer][1:,:]

In [None]:
J[counter-1][0,:,:,:,:]

In [None]:
m = np.matmul(J[counter-1], W[layer][1:,:])

In [None]:
meanSens = np.mean(m, axis=0)
stdSens = np.std(m, axis=0)
meansquareSens = np.mean(np.square(m), axis=0)

In [None]:
print(meanSens[2,3,4,0],stdSens[2,3,4,0])

In [None]:
if sens_end_input:
    raw_sens = M[counter] 
else:
    raw_sens = J[counter]

# Calculate sensitivity measures for each input and output 
meanSens = np.mean(raw_sens, axis=0)
stdSens = np.std(raw_sens, axis=0)
meansquareSens = np.mean(np.square(raw_sens), axis=0)

In [None]:
print(meanSens[2,3,4,0],stdSens[2,3,4,0])

In [None]:
print(meanSens[0,1,5,0],stdSens[0,1,5,0])

In [None]:
print(meanSens[7,7,7,0],stdSens[7,7,7,0])