In [None]:
"""

Title: "Neural Barrier Certificates for Monotone Systems"
Code for: Learing a mixed-monotone neural barrier certificate for a traffic networks model.
Author: Amirreza Alavi
Contact: seyedamirreza.alavi@colorado.edu
Affiliation: University of Colorado Boulder
Date: 03/2025

"""

In [None]:
import tensorflow as tf 
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from numpy import linalg as LA
from scipy import stats
from functools import partial
%matplotlib inline
import numpy as np
from scipy.optimize import fsolve
from tensorflow.keras.layers import Input, Dense, Concatenate, Add
from tensorflow.keras import Model
from tensorflow.keras import regularizers
from tensorflow.keras.models import load_model
import time
from mpl_toolkits import mplot3d
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from mpl_toolkits.mplot3d import Axes3D  
from math import inf

In [None]:
def phi_cr(q):
    
    return 10.0 * (1 - np.exp(-0.138 * q)) 

def sigma_cr(q):

    return 10.0 * (1 - (q / 50.0)**4) * (q < 50.0)


def phi(q):
    
    return 15.0 * (1 - np.exp(-0.038 * q))

def sigma(q):

    return 15.0 * (1 - (q / 100.0)**4) * (q < 100.0)

def phi_wrap(x):
    return np.array(phi(x), dtype=np.float64)

def sigma_wrap(x):
    return np.array(sigma(x), dtype=np.float64)

def phi_cr_wrap(x):
    return np.array(phi_cr(x), dtype=np.float64)

def sigma_cr_wrap(x):
    return np.array(sigma_cr(x), dtype=np.float64)

def split_hypercubenewmethod(lower, upper, split):

    lower = np.asarray(lower, dtype=np.float64)
    upper = np.asarray(upper, dtype=np.float64)
    split = np.asarray(split, dtype=np.float64)
    D = lower.shape[0]

    qund_list = []
    qbar_list = []
    
    
    for i in reversed(range(D)):
        new_lower = lower.copy()
        new_lower[i] = split[i]
        qund_list.append(new_lower)
        qbar_list.append(upper.copy())
        
    
    qund_list.append(lower.copy())
    qbar_list.append(split.copy())
    
    return np.array(qund_list, dtype=np.float64), np.array(qbar_list, dtype=np.float64)

def generate_q0(N):
    if N < 2:
        raise ValueError("N must be at least 2 to accommodate the fixed values.")
    
    q0bar = np.full((1, N), 5)
    
    q0bar[0, 1] = 18
    
    q0und = np.zeros((1, N), dtype=int)
    
    return q0bar, q0und

def generate_qu(N):
    if N < 2:
        raise ValueError("N must be at least 2 to accommodate the fixed second element.")
    
    qubar = np.full((1, N), 50)
    
    quund = np.zeros((1, N), dtype=int)
    
    quund[0, 1] = 30
    
    return qubar, quund

def generate_bounds(N):
    if N < 2:
        raise ValueError("N must be at least 2 to accommodate the fixed first two elements.")

    lower = [0] * N

    upper = [50] * N
    upper[1] = 30

    split = [5, 18]
    pattern = [5, 5, 5, 5]
    remaining = N - 2  
    
    for i in range(remaining):
        split.append(pattern[i % len(pattern)])
    
    return lower, upper, split

In [None]:
N = 20 

q0bar, q0und = generate_q0(N)
qubar, quund = generate_qu(N)
lower, upper, split = generate_bounds(N)

qund, qbar = split_hypercubenewmethod(lower, upper, split)

h=0.1

qbars = tf.split(qbar, num_or_size_splits=qbar.shape[1], axis=1)
qunds = tf.split(qund, num_or_size_splits=qbar.shape[1], axis=1)


phi_qbar = []
sigma_qbar = []

for i, q in enumerate(qbars):
    
    if i == 1:
        phi_qbar.append(tf.numpy_function(phi_cr_wrap, [q], tf.float64))
    else:
        phi_qbar.append(tf.numpy_function(phi_wrap, [q], tf.float64))
    
    if i == 0:
        sigma_qbar.append(None)
    else:
        if i == 1:
            sigma_qbar.append(tf.numpy_function(sigma_cr_wrap, [q], tf.float64))
        else:
            sigma_qbar.append(tf.numpy_function(sigma_wrap, [q], tf.float64))

phi_qund = []
sigma_qund = []

for i, q in enumerate(qunds):
    if i == 1:
        phi_qund.append(tf.numpy_function(phi_cr_wrap, [q], tf.float64))
    else:
        phi_qund.append(tf.numpy_function(phi_wrap, [q], tf.float64))
        
    if i == 0:
        sigma_qund.append(None)
    else:
        if i == 1:
            sigma_qund.append(tf.numpy_function(sigma_cr_wrap, [q], tf.float64))
        else:
            sigma_qund.append(tf.numpy_function(sigma_wrap, [q], tf.float64))



F_qbar = []

F_qbar.append(tf.constant(9.0, shape=(N+1, 1), dtype=tf.float64))

for i in range(len(phi_qbar) - 1):
    F_qbar.append(tf.minimum(phi_qbar[i], sigma_qbar[i + 1]))

F_qbar.append(phi_qbar[-1])


F_qund = []

F_qund.append(tf.constant(9.0, shape=(N+1, 1), dtype=tf.float64))

for i in range(len(phi_qbar) - 1):
    F_qund.append(tf.minimum(phi_qund[i], sigma_qund[i + 1]))

F_qund.append(phi_qund[-1])


qbarsn_list = []
for i in range(len(qbars)):
    qbarsn = qbars[i] + h * (F_qbar[i] - F_qbar[i + 1])
    qbarsn_list.append(qbarsn)

fqbar = tf.concat(qbarsn_list, axis=1)


qundsn_list = []
for i in range(len(qunds)):
    qundsn = qunds[i] + h * (F_qund[i] - F_qund[i + 1])
    qundsn_list.append(qundsn)

fqund = tf.concat(qundsn_list, axis=1)

In [None]:
# H1

modelh1 = tf.keras.Sequential()
modelh1.add(layers.Dense(40, use_bias=True, activation = 'tanh' ,input_shape=(N,),kernel_constraint=tf.keras.constraints.NonNeg()))
modelh1.add(layers.Dense(40, use_bias=True, activation = 'tanh' , kernel_constraint=tf.keras.constraints.NonNeg()))
modelh1.add(layers.Dense(40, use_bias=True, activation = 'tanh' ,kernel_constraint=tf.keras.constraints.NonNeg()))
modelh1.add(layers.Dense(40, use_bias=True, activation = 'tanh' ,kernel_constraint=tf.keras.constraints.NonNeg()))
modelh1.add(layers.Dense(40, use_bias=True, activation = 'tanh' ,kernel_constraint=tf.keras.constraints.NonNeg()))
modelh1.add(layers.Dense(1,use_bias=True, activation = 'linear',kernel_constraint=tf.keras.constraints.NonNeg()))

# H2

modelh2 = tf.keras.Sequential()
modelh2.add(layers.Dense(40, use_bias=True, activation = 'tanh' , input_shape=(N,),kernel_constraint=tf.keras.constraints.NonNeg()))
modelh2.add(layers.Dense(40, use_bias=True, activation = 'tanh' , kernel_constraint=tf.keras.constraints.NonNeg()))
modelh2.add(layers.Dense(40, use_bias=True, activation = 'tanh' , kernel_constraint=tf.keras.constraints.NonNeg()))
modelh2.add(layers.Dense(40, use_bias=True, activation = 'tanh' , kernel_constraint=tf.keras.constraints.NonNeg()))
modelh2.add(layers.Dense(40, use_bias=True, activation = 'tanh' , kernel_constraint=tf.keras.constraints.NonNeg()))
modelh2.add(layers.Dense(1,use_bias=True, activation = 'linear',kernel_constraint=tf.keras.constraints.NonNeg()))

lr1_initial = 1e-2
lr2_initial = 1e-2

opt1 = tf.keras.optimizers.Adam(lr1_initial)
opt2 = tf.keras.optimizers.Adam(lr2_initial)

In [None]:
epochs = 300000

t1 = time.perf_counter()
eta= 0.001
etaa=0.0000001

for i in range(epochs):

    if i == 500:
        new_lr1 = 1e-3 
        new_lr2 = 1e-3 
        opt1.learning_rate.assign(new_lr1)
        opt2.learning_rate.assign(new_lr2)

    
    with tf.GradientTape() as tape0, tf.GradientTape() as tape1:
        
        
        H1_q0bar = tf.reshape(modelh1(q0bar, training=True), [q0bar.shape[0], 1])
        H1_q0und = tf.reshape(modelh1(q0und, training=True), [q0und.shape[0], 1])
        H1_qubar = tf.reshape(modelh1(qubar, training=True), [qubar.shape[0], 1])
        H1_quund = tf.reshape(modelh1(quund, training=True), [quund.shape[0], 1])
        H1_qund = tf.reshape(modelh1(qund, training=True), [qund.shape[0], 1])
        H1_qbar = tf.reshape(modelh1(qbar, training=True), [qbar.shape[0], 1])
        
        H2_q0und = tf.reshape(modelh2(q0und, training=True), [q0und.shape[0], 1])
        H2_q0bar = tf.reshape(modelh2(q0bar, training=True), [q0bar.shape[0], 1])
        H2_quund = tf.reshape(modelh2(quund, training=True), [quund.shape[0], 1])
        H2_qubar = tf.reshape(modelh2(qubar, training=True), [qubar.shape[0], 1])
        H2_qund = tf.reshape(modelh2(qund, training=True), [qund.shape[0], 1])
        H2_qbar = tf.reshape(modelh2(qbar, training=True), [qbar.shape[0], 1])

        H1fqbar = tf.reshape(modelh1(fqbar, training=True), [fqbar.shape[0], 1])
        H2fqund = tf.reshape(modelh2(fqund, training=True), [fqund.shape[0], 1])

   
        loss_h3 = tf.reduce_sum(tf.nn.relu(tf.subtract(eta,eta)))
        loss_h4 = tf.reduce_sum(tf.nn.relu(tf.subtract(eta,eta)))


        loss_h3 += tf.reduce_sum(tf.nn.relu(tf.subtract(H1_q0bar-H2_q0und,-etaa))) 
        loss_h3 += tf.reduce_sum(tf.nn.relu(etaa-(H1_quund-H2_qubar)))
        loss_h3 += tf.reduce_sum(tf.math.sign(tf.nn.relu(-(H1_qund-H2_qbar)))* tf.nn.relu(H1fqbar-H2fqund))

        
        loss_h4 += tf.reduce_max(tf.square(tf.nn.relu(H1_q0bar-H2_q0und + 30))) 
        loss_h4 += tf.reduce_max(tf.square(tf.nn.relu(eta-H1_quund+H2_qubar)))
        loss_h4 += tf.reduce_max(tf.math.sign(tf.nn.relu(-(H1_qund-H2_qbar)))* tf.square(tf.nn.relu(H1fqbar-H2fqund)))


        if tf.reduce_sum(loss_h3) == 0 and i > 11:
            print(f"Training terminated at epoch {i} as loss reached exactly zero.")
            break
        
        grad_h1 = tape0.gradient(loss_h4, modelh1.trainable_variables)
        grad_h2 = tape1.gradient(loss_h4, modelh2.trainable_variables)
        
        
        if (i // 10) % 2 == 0:
            clipgrad_h1 = [tf.clip_by_norm(g, 2) for g in grad_h1]
            opt1.apply_gradients(zip(clipgrad_h1, modelh1.trainable_variables))
        else:
            if grad_h2 is not None:
                clipgrad_h2 = [tf.clip_by_norm(g, 2) for g in grad_h2]
                opt2.apply_gradients(zip(clipgrad_h2, modelh2.trainable_variables))

        if i % 50 == 0:
            print(f"Epoch {i}: loss_h3 = {loss_h3.numpy()}, loss_h4 = {loss_h4.numpy()}")
            
    
t2 = time.perf_counter()
tot = t2 - t1
print(f'Total runtime: {tot}s')
