# Nonlinear 1D Verified

This notebook provides a tutorial on how to build a dynamically defined consolidated control barrier function controller for a 1-dimensional nonlinear dynamical system. The tutorial consists of 3 sections: 1) constructing the control law, 2) building the simulator, and 3) parameter tuning via simulation-based testing.

Before proceeding with the first section, some generic import statements are required.

In [1]:
from typing import List, Callable, Tuple
from jax import jit, Array, jacfwd, jacrev
import jax.numpy as jnp

### I. Constructing the Control Law

#### I.A Background

In this section, the dynamically-defined consolidated control barrier function (C-CBF) based control law will be defined. A dynamically-defined C-CBF controller is used in the context of a control-affine, nonlinear dynamical system of the form
$$\dot x = f(x) + g(x)u, \quad x(t_0) = x_0,$$
where $x \in \mathbb R^n$ is the state vector, $u \in \mathcal U \subset \mathbb R^m$ is the control input vector with input constraint set $\mathcal U = \{-u_{max} \leq u \leq u_{max}\}, and where $f: \mathbb R^n \mapsto \mathbb R^n$ and $g: \mathbb R^n \mapsto \mathbb R^{n \times m}$ are known, Lipschitz continuous functions such that for a Lipschitz continuous state-feedback control law the system has a unique solution over the maximal interval of existence $\mathcal T = [t_0, \infty)$. The system is subject to a collection of (possibly time-varying) state constraints, each of which admits a set of states $\mathcal S_i$ over which the constraint is satisfied, i.e., 
$$\mathcal S_i(t) = \{x \in \mathbb R^n \mid h_i(t, x) \geq 0\},$$
for continuously differentiable functions $h_i \in \mathcal H, i \in [c]$. The objective of the controller is to contain the state of the system inside the intersection of all the constraints sets, in other words to ensure that
$$x(t) \in \mathcal S(t) = \bigcap_{i=1}^c \mathcal S_i(t), \; \forall t \in \mathcal T.$$

#### I.B State Dynamics

The state dynamics will now be generated using a procedure from the ```cbfkit``` package.

In [2]:
from cbfkit.utils.user_types import DynamicsCallable
from cbfkit.codegen.create_new_system import generate_model

# Define dynamics
drift_dynamics = "x[0] * (exp(k * x[0] ** 2) - 1)"  # drift dynamics f(x)
control_matrix = "(4 - x[0] ** 2)"  # control matrix g(x)

# Define dynamics parameters for code-gen
params = {"dynamics": {"k: float": 1.0}}

# Define target dir and name for code-gen
target_directory = "../src/ccbf/systems/nonlinear_1d/models/"
model_name = "black2024consolidated"

# Generate new model code
generate_model.generate_model(
    directory=target_directory,
    model_name=model_name,
    drift_dynamics=drift_dynamics,
    control_matrix=control_matrix,
    params=params,
)


Now, a function to compute the state dynamics ($f(x)$ and $g(x)$) will be defined.

In [4]:
from src.ccbf.systems.nonlinear_1d.models import black2024consolidated
from examples.nonlinear_1d import setup

controlled_x_dynamics = black2024consolidated.plant(k=setup.K_DYNAMICS)
modified_x_dynamics = lambda s: (
    controlled_x_dynamics(s)[0] + jnp.matmul(controlled_x_dynamics(s)[1], s[setup.IDX_U]),
    jnp.zeros((setup.N_STATES, setup.N_CONTROLS))
)

ModuleNotFoundError: No module named 'src'

#### I.B Adaptive Controller Dynamics

In [None]:
from cbfkit.controllers.model_based.cbf_clf_controllers.dynamically_defined_consolidated_cbf_controller import (
    generate_compute_w2dot_udot,
)
from cbfkit.modeling.augmented_dynamics import (
    generate_w_dynamics,
    generate_augmented_dynamics,
)


# Define CBF constraint function
def cbf_constraint(z: Array) -> Array:
    return jnp.array(
        [
            jnp.matmul(
                cbf_grad(z)[setup.IDX_X],
                controlled_x_dynamics(z)[0]
                + jnp.matmul(controlled_x_dynamics(z)[1], z[setup.IDX_U]),
            )
            + jnp.matmul(cbf_grad(z)[setup.IDX_X], z[setup.IDX_WDOT])
            + setup.LINEAR_CLASS_K * cbf(z)
            for cbf, cbf_grad in zip(setup.cbfs, setup.cbf_grads)
        ]
    )


# Define pure adaptation weight dynamics
w_dynamics = generate_w_dynamics(setup.N_STATES, setup.N_CBFS, setup.N_CONTROLS)


# Define adaptation optimization problem
def cost_wdot(z):
    w0 = 1.0
    wdot_des = -1.0 * (z[setup.IDX_W] - w0)
    return 0.5 * jnp.linalg.norm(wdot_des - z[setup.IDX_WDOT] + setup.EPS)


#! Change setup.cbfs to imported from code-gen
def adaptation_lower_bound(z: Array) -> Array:
    return -1 * (z[setup.IDX_WDOT] + (z[setup.IDX_W] - setup.W_MIN))


def adaptation_upper_bound(z: Array) -> Array:
    return -1 * (z[setup.IDX_WDOT] + (setup.W_MAX - z[setup.IDX_W]))


w_constraint_funcs = [
    adaptation_lower_bound,
    adaptation_upper_bound,
    cbf_constraint,
]


# Define control optimization problem constraint functions
def cost_u(z):
    u_nom, _ = setup.nominal_controller(z[setup.IDX_T], z)
    return 0.5 * jnp.linalg.norm(u_nom - z[setup.IDX_U] + setup.EPS)


def actuation_lower_bound(z: Array) -> Array:
    return -1 * (z[setup.IDX_U] + setup.ACTUATION_LIMITS)


def actuation_upper_bound(z: Array) -> Array:
    return -1 * (setup.ACTUATION_LIMITS - z[setup.IDX_U])


u_constraint_funcs = [
    actuation_lower_bound,
    actuation_upper_bound,
    cbf_constraint,
]


# Define augmented cost functions
def generate_augmented_cost(
    cost: Callable[[Array], Array], constraints: Callable[[Array], Array], scale: float
) -> Callable[[Array], Array]:
    def compute_augmented_cost(z: Array) -> Array:
        return cost(z) + 1 / scale * jnp.sum(
            jnp.array([jnp.log(constraint(z)) for constraint in constraints])
        )

    return compute_augmented_cost


adaptation_cost_function = generate_augmented_cost(
    cost_wdot, w_constraint_funcs, setup.S_VAL
)
control_cost_function = generate_augmented_cost(cost_u, u_constraint_funcs, setup.S_VAL)

# Define augmented dynamics for states and adaptation weights
x_and_w_augmented_dynamics = generate_augmented_dynamics(
    setup.N_STATES, [controlled_x_dynamics, w_dynamics]
)

# Define interconnected adaptation and control dynamics
wdot_and_u_dynamics = generate_compute_w2dot_udot(
    adaptation_cost_function,
    control_cost_function,
    x_and_w_augmented_dynamics,
)

#### I.C Control Dynamics

#### I.D Time Dynamics

In [None]:
from cbfkit.modeling.augmented_dynamics import generate_t_dynamics
t_dynamics = generate_t_dynamics(setup.N_CONTROLS)

#### I.E Augmented Dynamics

In [None]:
def augmented_dynamics(params):
    return generate_augmented_dynamics(setup.N_STATES, [modified_x_dynamics, w_dynamics, wdot_and_u_dynamics(params), t_dynamics])

### II. Building the Simulator

This section is organized into X subsections: 

A) Load Simulation Parameters: loads simulation parameters like timestep size, time interval
B) Configure Simulation: instantiates required objects like sensors, estimators, numerical integration schemes, etc.
C) Construct executable: defines the function responsible for executing simulation

#### II.A Configure Simulation

In [None]:


# Load CBFkit dependencies
import cbfkit.simulation.simulator as sim
from cbfkit.sensors import perfect as sensor
from cbfkit.estimators.naive import naive as estimator
from cbfkit.utils.numerical_integration import forward_euler as integrator
from cbfkit.optimization.dynamically_defined.optimization_interior_point import (
    generate_predictor_corrector_dynamical_solution,
)
from cbfkit.utils.matrix_vector_operations import invert_array
from staliro.models import blackbox
from staliro.core import Trace, BasicResult, best_eval, best_run
from staliro.specifications import RTAMTDense
from staliro.options import Options
from staliro.optimizers import DualAnnealing, UniformRandom
from staliro.staliro import simulate_model, staliro


# Load setup and barrier modules
from examples.nonlinear_1d import setup






















# Generate dynamics functions
w_dynamics = lambda z: (
    z[N_STATES + N_CBFS : N_STATES + 2 * N_CBFS],
    jnp.zeros((N_CBFS, N_CONTROLS)),
    jnp.zeros((N_CBFS, N_STATES)),
)
wdot_and_u_dynamics = generate_compute_w2dot_udot(
    generate_augmented_cost(cost_wdot, w_constraint_funcs, S_VAL),
    generate_augmented_cost(cost_u, u_constraint_funcs, S_VAL),
    generate_augmented_dynamics(N_STATES, [x_dynamics, w_dynamics]),
)
t_dynamics = lambda z: (
    jnp.array([1.0]),
    jnp.zeros((1, N_CONTROLS)),
    jnp.zeros((1, N_STATES)),
)


# Augmented System
augmented_initial_state = jnp.hstack(
    [
        setup.INITIAL_STATE,
        setup.W0 * jnp.ones((N_CBFS,)),
        jnp.zeros((N_CBFS,)),
        jnp.zeros((N_CONTROLS,)),
        jnp.zeros((1,)),
    ]
)
modified_x_dynamics = lambda s: (
    x_dynamics(s)[0] + jnp.matmul(x_dynamics(s)[1], s[IDX_U]),
    jnp.zeros((N_STATES, N_CONTROLS)),
    jnp.zeros((N_STATES, N_STATES)),
)
augmented_dynamics = lambda params: generate_augmented_dynamics(
    N_STATES, [modified_x_dynamics, w_dynamics, wdot_and_u_dynamics(params), t_dynamics]
)

import numpy as np


def execute(params: Array) -> List[Array]:
    """_summary_

    Args:
        int (ii): _description_

    Returns:
        List[Array]: _description_
    """
    x, u, z, p, dkeys, dvalues = sim.execute(
        x0=augmented_initial_state,
        dynamics=augmented_dynamics(jnp.diag(jnp.array(params))),
        sensor=sensor,
        estimator=estimator,
        integrator=integrator,
        dt=setup.DT,
        num_steps=setup.N_STEPS,
    )

    # Reformat results as numpy arrays
    x = np.array(x)
    u = np.array(u)
    z = np.array(z)
    p = np.array(p)

    return x, u, z, p, dkeys, dvalues


@blackbox
def execute_for_staliro(params, _signal_times, _signal_traces):
    states, _, _, _, _, _ = execute(params)
    times = jnp.arange(setup.T0, setup.TF + setup.DT, setup.DT)
    trace = Trace(times, list(states.T))

    return BasicResult(trace)


TUNE = True
if TUNE:
    initial_params = [(0, 5), (0, 5)]
    # phi = r"(always (a >= -2 and a <= 2))"
    phi = r"(always (c >= -100 and c <= 100 and d >= -100 and d <= 100))"
    specification = RTAMTDense(phi, {"a": 0, "b": 1, "c": 2, "d": 3})
    options = Options(
        runs=1,
        iterations=20,
        interval=(setup.T0, setup.TF),
        static_parameters=initial_params,
    )
    # optimizer = DualAnnealing()
    optimizer = UniformRandom()
    logging.basicConfig(level=logging.DEBUG)

    result = staliro(execute_for_staliro, specification, optimizer, options)

    best_run_ = best_run(result)
    best_sample = best_eval(best_run_).sample
    best_result = simulate_model(execute_for_staliro, options, best_sample)

    success_sample_pd_wdot = []
    success_sample_pd_u = []
    failure_sample_pd_wdot = []
    failure_sample_pd_u = []
    max_cost = 0
    min_cost = jnp.inf
    for evaluation in best_run_.history:
        if evaluation.cost > 0:
            if evaluation.cost > max_cost:
                best_x, best_y = evaluation.sample[0], evaluation.sample[1]
                max_cost = evaluation.cost
            if evaluation.cost < min_cost:
                worst_x, worst_y = evaluation.sample[0], evaluation.sample[1]
                min_cost = evaluation.cost
            success_sample_pd_wdot.append(evaluation.sample[0])
            success_sample_pd_u.append(evaluation.sample[1])
        else:
            failure_sample_pd_wdot.append(evaluation.sample[0])
            failure_sample_pd_u.append(evaluation.sample[1])

    figure = go.Figure()
    figure.add_trace(
        go.Scatter(
            name="Parameter Region",
            x=[0, 0, 5, 5, 0],
            y=[0, 5, 5, 0, 0],
            fill="toself",
            fillcolor="lightsteelblue",
            line_color="steelblue",
            mode="lines+markers",
        )
    )
    figure.add_trace(
        go.Scatter(
            name="Successes",
            x=success_sample_pd_wdot,
            y=success_sample_pd_u,
            mode="markers",
            marker=go.scatter.Marker(color="green", symbol="circle"),
        )
    )
    figure.add_trace(
        go.Scatter(
            name="Failures",
            x=failure_sample_pd_wdot,
            y=failure_sample_pd_u,
            mode="markers",
            marker=go.scatter.Marker(color="black", symbol="x"),
        )
    )
    figure.add_trace(
        go.Scatter(
            name="Most Conservative Result",
            x=[best_x],
            y=[best_y],
            mode="markers",
            marker=go.scatter.Marker(color="lemonchiffon", symbol="square"),
        )
    )
    figure.add_trace(
        go.Scatter(
            name="Most Aggressive Result",
            x=[worst_x],
            y=[worst_y],
            mode="markers",
            marker=go.scatter.Marker(color="darkcyan", symbol="star"),
        )
    )
    figure.update_layout(xaxis_title=r"Gain $\dot w$", yaxis_title=r"Gain $\dot u$")
    # figure.add_hline(y=0, line_color="red")
    # figure.write_image("examples/nonlinear_1d/nonlinear_1d.jpeg")
    figure.show()

    figure2 = go.Figure()
    figure2.add_trace(
        go.Scatter(
            x=best_result.trace.times,
            y=best_result.trace.states[0],
            mode="lines",
            line_color="green",
            name="x",
        )
    )
    figure2.update_layout(xaxis_title="time (s)", yaxis_title="x")
    figure2.add_hline(y=0, line_color="red")
    figure2.write_image("examples/nonlinear_1d/nonlinear_1d.jpeg")

    values = best_sample.values
else:
    values = [1.0, 1.0]

    #! To Do: download lyznet source and modify to be compatible with JAX

# # Verify with Lyapunov function
# dynamics_to_verify = augmented_dynamics(jnp.diag(jnp.array(values)))
# from examples.nonlinear_1d.setup.learn_lyapunov_nn import model, test_lnn

# mod = model(dynamics_to_verify)
# test_lnn(mod)