In [1]:
## Libraries
from scipy.integrate import odeint, solve_ivp
import numpy as np

In [2]:
## libraries for NN driven model
# Tensorflow / Keras
from tensorflow import keras # for building Neural Networks
print('Tensorflow/Keras: %s' % keras.__version__) # print version
from keras.models import Sequential # for creating a linear stack of layers for our Neural Network
from keras import Input # for instantiating a keras tensor
from keras.layers import Dense # for creating regular densely-connected NN layers.
import keras.metrics as metrics

# # Data manipulation
# import pandas as pd # for data manipulation
# print('pandas: %s' % pd.__version__) # print version
# import numpy as np # for data manipulation
# print('numpy: %s' % np.__version__) # print version

# Sklearn
import sklearn # for model evaluation
print('sklearn: %s' % sklearn.__version__) # print version
# from sklearn.model_selection import train_test_split # for splitting data into train and test samples
# from sklearn.metrics import classification_report # for model evaluation metrics

2023-08-16 12:49:50.541075: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


Tensorflow/Keras: 2.11.0
sklearn: 1.3.0


# Define the LWR models

## Lin and Log models

In [3]:
## Traffic dynamic with LWR-model

def TD_LWR_model(t, X, N, v0, L, flag):
    """
    Right-hand side of the 1D Lighthill-Whitham-Richards (LWR) traffic flow
    model, written as an ODE system for the positions of N vehicles.

    For i = 0, ..., N-2 the velocity of vehicle i is W((X[i+1] - X[i]) / L);
    the velocity of the last vehicle (index N-1) is the constant v0.

    Parameters
    ----------
    t : float
        Current time. Not used in the computation; it is part of the
        signature so the function can be called as f(t, X, ...).
    X : sequence of float
        Vehicle positions; entries 0..N-1 are read.
    N : int
        Number of vehicles (length of the returned array).
    v0 : float
        Velocity scale of W and velocity assigned to the last vehicle.
    L : float
        Length by which every gap X[i+1] - X[i] is divided before applying W.
    flag : str
        Choice of W: "Lin" -> W(z) = v0*(1 - 1/z), "Log" -> W(z) = v0*log(z).

    Returns
    -------
    numpy.ndarray
        Array of shape (N,) with the velocities dX/dt.

    Raises
    ------
    ValueError
        If flag is neither "Lin" nor "Log".
    """

    # W function
    if flag == "Lin":
        W = lambda z: v0*(1-1/z)
    elif flag == "Log":
        W = lambda z: v0*np.log(z)
    else:
        # Fail loudly: returning a message string here would be handed to the
        # ODE solver as if it were the derivative vector.
        raise ValueError(f'No match for {flag}, you can only choose between "Lin" and "Log"')

    # ode sys
    d_x = np.zeros(N)

    for i in range(N-1):
        # gap to the vehicle ahead, rescaled by L
        scaled_gap = (X[i+1] - X[i])/L
        d_x[i] = W(scaled_gap)

    # the last vehicle has nobody ahead: it travels at v0
    d_x[N-1] = v0

    return d_x

## NN driven model

### Define the Traffic Dynamic model driven by a NN

In [None]:
def havesametype(A,B):

    """
    Return True when A and B are of the same kind, i.e. np.isscalar gives
    the same answer for both (both scalars or both non-scalars such as
    lists); return False otherwise.
    """

    return np.isscalar(A) == np.isscalar(B)

In [4]:
## Traffic dynamic with ANN

def TD_ANN_model(t, X, vel):

    """
    Right-hand side of the NN-driven traffic dynamic ODE system.

    The velocities are supplied ready-made in vel, and this function turns
    them into a function of (t, X) by returning vel unchanged: neither t
    nor X is used. No check is made that X and vel have matching lengths.
    """

    return vel

### ODE solver for the NN driven model

In [8]:
def odesolver_ann(x0, vel, t0, tend, deltat = 0.05):

    """
    odesolver_ann solves the TD_ANN_model ode system:
    * in [t0, tend] with timestep as deltat,
    * starting from x0
    * vel is the rhs passed to TD_ANN_model to create the model

    The time grid is np.linspace(t0, tend, Nt) with
    Nt = (number of whole deltat steps that fit in [t0, tend]) + 1, so the
    grid always ends exactly at tend.

    Returns
    -------
    tspan_ann : numpy.ndarray
        The Nt time points.
    sol_ann : numpy.ndarray
        Transposed odeint output: one row per component of x0, one column
        per time point.
    """

    # (tend - t0)/deltat is often an integer up to round-off
    # (e.g. 0.15/0.05 == 2.9999999999999996); a plain int() would truncate
    # that to 2 and lose one grid point, hence the small tolerance.
    n_steps = int((tend - t0)/deltat + 1e-9)
    Nt = n_steps + 1                        # number of discretization points
    tspan_ann = np.linspace(t0, tend, Nt)   # timespan

    sol_ann = odeint(TD_ANN_model, x0, tspan_ann, args=(vel,), tfirst = True).T

    return tspan_ann, sol_ann

### Create and train a neural network

In [5]:
def create_model(network):

    """
    Create the neural network model to define our TD_ANN_model ode system.

    network is a sequence of layer sizes:
    * network[0] is the size of the input,
    * network[1:-1] are the sizes of the hidden Dense layers (softplus),
    * network[-1] is the size of the output Dense layer (linear).

    Every kernel and every bias is initialized with RandomUniform in [-1, 1].
    Returns the (not yet compiled) Sequential model.
    """

    def _uniform_init():
        # A brand-new initializer object for every kernel/bias: no instance
        # is shared between weights.
        return keras.initializers.RandomUniform(minval=-1., maxval=1.)

    model = Sequential()

    # Input Layer
    model.add(Input(shape=(network[0],), name='Input-Layer'))

    # Hidden Layers
    ## softplus(x) = log(exp(x) + 1)
    for i, units in enumerate(network[1:-1], start=1):
        model.add(Dense(units, activation='softplus',
                        use_bias=True,
                        bias_initializer=_uniform_init(),    # init biases in U[-1,1]
                        kernel_initializer=_uniform_init(),  # init weights in U[-1,1]
                        name="Hidden-Layer-" + str(i)))

    # Output Layer
    ## linear(x) = x
    model.add(Dense(network[-1], activation='linear',
                    use_bias=True,
                    bias_initializer=_uniform_init(),    # init biases in U[-1,1]
                    kernel_initializer=_uniform_init(),  # init weights in U[-1,1]
                    name='Output-Layer'))

    return model

In [6]:
def train_nn(model, X, y, batch_size=10, epochs=3, verbose='auto'):

    """
    Compile the neural network model and fit it on the data.

    * model: the keras model; it is (re)compiled at every call with the
      adam optimizer, mean squared error loss and mean absolute error metric
    * X, y: input data and target data
    * batch_size: number of samples per gradient update
    * epochs: number of iterations over the entire X and y data provided
    * verbose: verbosity passed on to model.fit

    The same model object is returned after fitting.
    """

    ## Compile keras model
    compile_options = {
        'optimizer': 'adam',
        'loss': 'mean_squared_error',   # loss function to be optimized
        'metrics': [metrics.mae],       # metrics evaluated during training and testing
    }
    model.compile(**compile_options)

    ## Fit keras model on the dataset
    model.fit(X, y, batch_size=batch_size, epochs=epochs, verbose=verbose)

    return model

In [7]:
def get_stats(model,flag_summary=True):
    """
    Print the stats of our model: optionally the model summary, then name,
    kernels (weights) and biases of every layer in model.layers.

    * model: object exposing summary() and layers; every layer exposes
      name and get_weights()
    * flag_summary: when truthy, model.summary() is printed first

    The first array returned by layer.get_weights() is reported as the
    kernels and the second one as the biases. Layers that return fewer
    arrays (e.g. no bias, or no weights at all) are reported with what they
    have instead of raising an IndexError.

    Returns None.
    """
    if flag_summary:
        print("")
        print('-------------------- Model Summary --------------------')
        model.summary() # print model summary
        print("")
    print('-------------------- Weights and Biases --------------------')
    for layer in model.layers:
        print("Layer: ", layer.name) # print layer name
        layer_weights = layer.get_weights() # fetched once per layer
        if len(layer_weights) == 0:
            print("  --No weights")
            continue
        print("  --Kernels (Weights): ", layer_weights[0]) # kernels (weights)
        if len(layer_weights) > 1:
            print("  --Biases: ", layer_weights[1]) # biases

    print("")

    return

### Solve the NN-driven model in a scene

In [9]:
def create_data_ann_scene(scn):

    """
    create_data_ann_scene gives the X and y for an entire scene

    X_arr, y_arr = create_data_ann_scene(scn)

    * X_arr is scn['Cons Dis'], returned as it is
    * y_arr holds the velocities obtained from scn['Xarr'] and scn['Tarr']
      as forward differences (x_(i+1)-x_i)/deltaT along axis 1, with the
      last row (the last vehicle) dropped
    """

    ## X: taken straight from the scene
    inputs = scn['Cons Dis']

    ## y: discretized velocity at the timestamps
    displacements = np.diff(scn['Xarr'], axis=1)
    time_steps = np.diff(scn['Tarr'])
    speeds = displacements/time_steps

    # drop the last vehicle
    return inputs, speeds[:-1]

In [12]:
def odesolver_ann_scene(nn_model, scn, batch_size, epochs, v0, tout="end", verbose="auto", deltat=0.05):

    """
    Solve the NN-driven model over a whole scene, one time interval
    [Tarr[i], Tarr[i+1]] at a time. In every interval:
    1. nn_model is trained (train_nn) on column i of the data given by
       create_data_ann_scene and then evaluated on the same inputs;
    2. the predicted velocities, with v0 appended for the last vehicle, are
       used as rhs in odesolver_ann, starting from the last computed
       positions (not from the true ones).

    Parameters:
    * nn_model: the neural network model
    * scn: the scene; the keys 'N. vehicles', 'Tarr' and 'Xarr' are read
      here, plus those read by create_data_ann_scene
    * batch_size, epochs, verbose: passed on to train_nn
    * v0: velocity assigned to the last vehicle
    * tout = "end" for the solution at final t
    * tout = "all" for the solution over all the time interval
    * deltat: timestep passed on to odesolver_ann

    Returns t_ann_list, x_ann_list, vel_ann_list:
    * t_ann_list: list of the output times
    * x_ann_list: one path per vehicle (lists if tout = "end", numpy arrays
      if tout = "all")
    * vel_ann_list: one list of velocities per time interval

    Raises ValueError if tout is neither "end" nor "all".
    """

    # Check tout before any training is done: an unknown value would
    # otherwise leave the paths stuck at the initial positions.
    if tout not in ("end", "all"):
        raise ValueError(f'No match for {tout}, you can only choose between "end" and "all"')

    N = scn['N. vehicles']
    tstamps = scn['Tarr']

    vel_ann_list = []
    t_ann_list = [tstamps[0]]
    x_ann_list = [[x_init] for x_init in scn['Xarr'][:,0]]

    X_arr, y_arr = create_data_ann_scene(scn)

    for_end = len(tstamps)-1

    print(f"We have {for_end} time intervals inside [{tstamps[0]},{tstamps[-1]}]\n")

    for i in range(0,for_end):

        print("--"*50)
        print(f"Time interval n.{i}: [{tstamps[i]},{tstamps[i+1]}]\n")

        ## STEP 1: Neural Network
        # Create the dataset and train the nn model
        X, y = X_arr[:,i], y_arr[:,i]
        train_nn(nn_model, X, y, batch_size, epochs, verbose)

        # Predict the rhs of the TD_ANN_model
        y_pred = nn_model(X).numpy().flatten().tolist() # To avoid that the function is retraced at every call.
        vel_ann = np.append(y_pred,v0).tolist()         # the last vehicle moves at v0
        vel_ann_list.append(vel_ann)

        print(f"\
        * y_true: {y}\n\
        * y_pred: {y_pred}\n")

        ## STEP 2: Solve the ODE sys in this time interval
        # start from the last values computed, not from the true ones
        x0 = [float(path[-1]) for path in x_ann_list]
        t0, tend = tstamps[i], tstamps[i+1]
        tspan_ann, sol_ann = odesolver_ann(x0, vel_ann, t0, tend, deltat = deltat)

        if tout == "end":
            ## Only last value
            x_ann = sol_ann[:,-1].tolist()
            t_ann = tspan_ann[-1]

            # add the solution to the correct vehicle path
            for j in range(0,N):
                x_ann_list[j].append(x_ann[j])
            t_ann_list.append(t_ann)

        else:
            ## In all the time interval (tout == "all")
            x_ann = sol_ann.tolist()
            t_ann = tspan_ann[1:]

            # add the solution to the correct vehicle path
            for j in range(0,N):
                # drop the first recording, because it is already the last
                # value stored in the path
                x_ann_list[j] = np.concatenate([x_ann_list[j],x_ann[j][1:]])
            t_ann_list = np.concatenate([t_ann_list,t_ann]).tolist()

        print("--"*50)

    return t_ann_list, x_ann_list, vel_ann_list