# Learning the CBF for the Nonlinear System
This Notebooks contains the learning pipeline used to train the model

In [None]:
""" Copyright (c) 2023, ETH Zurich, 
Alexandre Didier*, Robin C. Jacobs*, Jerome Sieber*, Kim P. Wabersich°, Prof. Dr. Melanie N. Zeilinger*, 
*Institute for Dynamic Systems and Control, D-MAVT
°Corporate Research of Robert Bosch GmbH
All rights reserved."""

In [1]:
import numpy as np
import torch
import pickle
import matplotlib.pyplot as plt
from apcbf.custom_dataset import *
from apcbf.approximator_nonlinear import *
from apcbf.dynamic import *
from apcbf.pcbf_nonlinear import *
from torch.utils.data.sampler import RandomSampler
from torch.utils.data.dataloader import DataLoader
from tqdm.notebook import tqdm
from pytope import Polytope

In [2]:
# Plotting settings
plt.rcParams['figure.figsize'] = [15, 10]
plt.rc('font', **{'family': 'serif', 'serif': ['Computer Modern'], 'size': 24})
plt.rc('text', usetex=True)

In [3]:
if torch.cuda.is_available():
    # device = 'cuda'
    device = torch.device('cuda:0')
else :
    device = 'cpu'
print(device)

cuda:0


### Load datasets

In [4]:
train_data_set_1 = torch.load('../data/train_data_rand_2000.0k_nonlinear_paper.pt')
train_data_set_2 = torch.load('../data/train_data_geom_sampl_thresh_100_8941.995k_nonlinear_paper.pt')
val_data = torch.load('../data/val_data_geom_sampl_thresh_100_13.155k_nonlinear_paper.pt')

# concat two datasets
x_train = torch.cat([train_data_set_1.X, torch.from_numpy(train_data_set_2.X)]).float()
y_train = torch.cat([train_data_set_1.y, torch.from_numpy(train_data_set_2.y).reshape(-1,1)]).float()
train_data_set = SimpleData(x_train, y_train)

x_val = val_data.X
y_val = val_data.y

In [5]:
len(train_data_set)

10941995

In [6]:
x_val = torch.from_numpy(x_val).float()
y_val = torch.from_numpy(y_val).float().reshape(-1,1)

In [7]:
train_data_set.X = train_data_set.X.to(device)
train_data_set.y = train_data_set.y.to(device)
x_val = x_val.to(device)
y_val = y_val.to(device)

In [8]:
train_data_set.y.shape

torch.Size([10941995, 1])

### Data transformation

In [9]:
use_log_space = True

In [10]:
if use_log_space :
    with torch.no_grad():
        train_data_set.y = torch.log(1 + train_data_set.y)
        y_val = torch.log(1 + y_val)

### Load NN Model

In [11]:
LOAD_CHECKPOINT = False
LOAD_PATH = '../models/ext_model_nlup_NNplus_100_02_06_11_35.pt'
LOG_NAME = LOAD_PATH[17:-5]
if not LOAD_CHECKPOINT :
    LOG_NAME = 'UNSPECIFIED_NONLINEAR'
print(LOG_NAME)

UNSPECIFIED_NONLINEAR


### Model Selection

In [12]:
model = HpbApproximatorNonLinGplus()
model.to(device)

HpbApproximatorNonLinGplus(
  (first_lin): Linear(in_features=4, out_features=64, bias=True)
  (second_lin): Linear(in_features=64, out_features=64, bias=True)
  (third_lin): Linear(in_features=64, out_features=64, bias=True)
  (fourth_lin): Linear(in_features=64, out_features=1, bias=True)
)

## Training 

In [13]:
batch_size = 512
epochs = 1

In [14]:
epoch_list = []
train_data_set_loader = DataLoader(train_data_set, batch_size=batch_size, sampler=RandomSampler(train_data_set))
# loss_fn = nn.MSELoss()
loss_fn = nn.HuberLoss(delta=2)

In [15]:
optimizer = torch.optim.Adam(model.parameters())

In [16]:
if LOAD_CHECKPOINT :
    #model = torch.load(PATH)
    checkpoint = torch.load(LOAD_PATH)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch_list += checkpoint['epoch']
    loss_values_combined = checkpoint['loss']
    val_loss_values_combined = checkpoint['valloss']
    print("Loaded checkpoint")
else : # Start fresh
    print("Fresh start")
    loss_values_combined = []
    val_loss_values_combined = []

Fresh start


Start training

In [17]:
loss_values = []
val_loss_values = []
epoch_list.append(epochs)
# Some code snippets taken from pytorch tutorial : 
# https://pytorch.org/tutorials/beginner/basics/optimization_tutorial.html

val_loss = 0 
#if len(val_loss_values_combined) > 0 :
#    val_loss = val_loss_values_combined[-1]

for t in tqdm(range(epochs)):
    #print(f"Epoch {t+1}\n-------------------------------")
    
    running_loss = 0
    # Training loop
    size = len(train_data_set_loader.dataset)
    for batch, (X, y) in tqdm(enumerate(train_data_set_loader), total=size//batch_size):
        pred = model(X)
#       with torch.no_grad() :
#             ymod = torch.log(y+1)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() 

        if batch % 50 == 0 and False:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
        
    loss_values.append(running_loss/len(train_data_set_loader))
    
    if t%10 == 0 :
        print(f"training loss: {loss_values[t]:>7f} epoch: [{t:>5d}/{epochs}]")        
        
   
    # Test set validaton 
    with torch.no_grad() :
        validation_pred = model(x_val.reshape(-1,x_val.shape[1]).float())
        #y_val_mod = torch.log(y_val+1)
        val_loss = loss_fn(validation_pred, y_val)
        if t%10 == 0 :
            print(f'validation loss: {val_loss:>7f}')
    
    val_loss_values.append(val_loss)
    
loss_values_combined += loss_values
val_loss_values_combined += val_loss_values

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/21371 [00:00<?, ?it/s]

training loss: 0.045791 epoch: [    0/1]
validation loss: 0.002265


In [18]:
loss_values_combined[-1]

0.0457912093896606

In [19]:
val_loss_values_combined[-1]

tensor(0.0023, device='cuda:0')

In [20]:
# Computing moving average validation loss (less noise)
vloss = torch.FloatTensor(val_loss_values_combined)
moving_avg = []
window = 15
for k in range(len(vloss)-1):
    moving_avg.append((torch.mean(vloss[(k-window):(k)])))

In [21]:
#loss_values_combined = [value.cpu() for value in loss_values_combined]
val_loss_values_combined = [value.cpu() for value in val_loss_values_combined]
moving_avg = [value.cpu() for value in moving_avg]

In [23]:
model.to('cpu')

HpbApproximatorNonLinGplus(
  (first_lin): Linear(in_features=4, out_features=64, bias=True)
  (second_lin): Linear(in_features=64, out_features=64, bias=True)
  (third_lin): Linear(in_features=64, out_features=64, bias=True)
  (fourth_lin): Linear(in_features=64, out_features=1, bias=True)
)

### Extract error from trained model

In [None]:
non_lin_cont = NonLinearContinuousDynamics(None, 4, 2) #placeholder
# Input constraints
constraint_dict = pickle.load(open("../params/non_linear_constraints_params.p", "rb"))
X = constraint_dict["X"]
U = constraint_dict["U"]                   
params_dict = pickle.load(open( "../params/non_linear_termset_params.p", "rb" ))
delta_i = lambda i : i*0.004 
print(params_dict['gamma_x'])
pcbf = NONLINEAR_PCBF_OPT(non_lin_cont, X, U, delta_i, params_dict, N=50, verbose=False)

In [None]:
# Evaluate learned model on 813k data set
test_data = torch.load('../data/val_data_813k_nonlinear.pt')
hpb_learned = torch.from_numpy(np.zeros_like(test_data.y))
for idx, point in tqdm(enumerate(test_data)) :
    x = point[0]
    y_true = point[1]
    
    with torch.no_grad() :
        hpb_temp = model(torch.tensor(x).reshape(-1,4).float())
        if use_log_space :
            hpb_learned[idx] = torch.exp(hpb_temp) - 1
        else :
            hpb_learned[idx] = hpb_temp
hpb_optim = torch.from_numpy(test_data.y)
hpb_optim.max()

In [None]:
#np.save(f"data/true_nonlinear_temp_hpb_{n_points}_gammax_{params_dict['gamma_x']: .4f}_scale_{scaling_factor}", hpb_optim) 
#hpb_optim = np.load('data/.npy')

In [None]:
# Compute Error
h_abs_err = np.abs(hpb_learned - hpb_optim)
sqr_err = np.square(hpb_learned - hpb_optim)
rms_err = np.sqrt(sqr_err.mean())
norm_rms_err = rms_err / (hpb_learned.max()-hpb_learned.min())
print('min hpb learned ', hpb_learned.min().numpy())
print('max_error',h_abs_err.max().numpy())
print('min_error',h_abs_err.min().numpy())
print('mse ', sqr_err.mean().numpy())
print('average_error', h_abs_err.mean().numpy())
print('median_error', h_abs_err.median().numpy())
print('variance error', h_abs_err.var().numpy())
print('std ', h_abs_err.std().numpy())
print('RMS error', rms_err.numpy())
print('Normalized RMS error', norm_rms_err.numpy())
print('---')
print(f'{h_abs_err.max().numpy():.3f}, {h_abs_err.mean().numpy():.3f}/{h_abs_err.median().numpy():.3f}, {h_abs_err.var().numpy():.3f}/{h_abs_err.std().numpy():.3f}, {rms_err.numpy():.3f}/{norm_rms_err.numpy():.3f} ')

In [None]:
from tqdm.notebook import tqdm
#%matplotlib widget
n_points = 100
x1 = np.linspace(-2.3,2.3,n_points)
x2 = np.linspace(-2,2,n_points)
x1,x2 = np.meshgrid(x1,x2)
#pcbf =  NONLINEAR_PCBF_OPT(non_lin_disc, X, U, delta_i, param_dict)
hpb = np.zeros((len(x1), len(x2)))
h_ind = np.zeros((len(x1), len(x2)))
#x_safe, u_safe = sim.simulate_discrete(x0, lin_sys, controller_object=algo, Nsteps=N_steps)

numb_infeasible = 0
failure_pts = []

for j in tqdm(range(len(x1))):
    for k in tqdm(range(len(x2)), leave = False):
        #_, _, hpb[j,k], _ ,  _ = pcbf.solve(np.array([x1[j,k],x2[j,k], 0, 0])) 
        with torch.no_grad():
            hpb[j,k] = model(torch.tensor([x1[j,k],x2[j,k], 0, 0]).float())
            if use_log_space :
                hpb[j,k] = np.exp(hpb[j,k]) - 1
        if hpb[j,k] < 0.001 :
            h_ind[j,k] = 1


### Save model + some training data/statistics

