# Tutorial session: nonlinear MPC
In this tutorial session, we will build and test an MPC controller for an autonomously driving vehicle.

We will first implement some simple, yet nonlinear vehicle dynamics as a 
system of ordinary differential equations. We will implement two widely used numerical integration schemes, which will result in a discrete-time dynamical system, which we can use to design our controller. 

Then, we will build the optimal control problem and experiment with some different variants -- first in a simple case without any nonlinear constraints, and then extending the setting to a full parking maneuver with obstacle avoidance constraints.

## About the solver 

Before we start with the tutorial, a quick word on the solver. 

You will not need to implement any optimization solver in this session. To solve our online optimization problems, we will be using an efficient C++ implementation of the **Augmented Lagrangian method** using **PANOC** as an inner solver. The software for this solver is *open source* and available on [Github](https://github.com/tttapa/PANOC-ALM). 

<div class="alert alert-info">
The solver is currently still in pre-release and the API is still subject to change! If you are interested in using the code in your own work, make sure to <i>watch</i> the <a href="https://github.com/tttapa/PANOC-ALM">Repository</a> to stay up-to-date with the latest developments.
</div>



### See also

If you are interested in using this or a similar solver for your own work, we can additionally refer you to some related pages. 

- [**OpEn**](https://alphaville.github.io/optimization-engine/) [Rust](https://www.rust-lang.org/)-based implementation of PANOC + ALM with an emphasis on MPC for embedded devices. See also [Opengen](https://alphaville.github.io/optimization-engine/docs/python-interface), a collection of Python bindings that make it easy to use OpEn in your Python prototyping environment.  
- [**OPTEC**](https://github.com/kul-optec) `OPTEC` is the collective name for our research group. The equally named Github organisation is where we release the code for custom solvers, which are focused on efficient, even real-time optimization, possibly under stringent hardware constraints. 
- [**CasADi**](https://web.casadi.org/) Optimization modelling tool/Automatic differentiation engine. This library is a available for Python, MATLAB, and C++ and is a very useful tool to quickly construct optimization problems using symbolic variables. It interfaces with a lot of commonly used optimization solvers, including our own. 

## Setting up the environment

Now it's time to get started with the actual tutorial

### Install and import dependencies
To get started, we will first install the pre-built version of the [`panocpy`](https://github.com/tttapa/PANOC-ALM) library which we will be using.

In [None]:
! pip install panocpy*.whl

With this, we should have all the necessary dependencies available for import: 

In [None]:
%matplotlib inline
import numpy as np
import casadi as cs 
import matplotlib.pyplot as plt 
import utilities

## Dynamics and integration
### Kinematic bicycle model

As a warm-up, we start by defining the vehicle dynamics and a numerical integration scheme, which will allow us to plan ahead the trajectories of the system under different control sequences. 

Let us fix the dimensions of the vehicle first. 

In [None]:
VEHICLE_LENGTH = 5 
VEHICLE_WIDTH = 2.5;
vehicle_dims = (VEHICLE_WIDTH, VEHICLE_LENGTH)

We will use a kinematic bicycle model for the vehicle [1].

<img src="bicycle_drawing.png"/>

The state of the vehicle is given as $x = (p_x, p_y, \theta, v)$ with $(p_x, p_y)$ the position of the vehicle center of mass in a fixed cartesian coordinate system, $\theta$ the heading angle of the vehicle and $v$ the longitudinal velocity. The inputs the vehicle are $u = (a, \delta)$ with $a$ the longitudinal acceleration and $\delta$ the steering angle in radians. 

The kinematic bicycle model is given as: 
$$
\begin{aligned}
    \begin{cases}
    \dot{p}_{x} &= v  \cos(\theta+\beta) \\ 
    \dot{p}_{y} &= v \sin(\theta+\beta) \\
    \dot{\theta} &= v \sin(\beta) \\
    \dot{v} &= a 
    \end{cases}
\end{aligned}
$$
with $\beta = \arctan(\frac{1}{2} \tan \delta)$. This version is slightly simplified compared to the more well-known formulation of the system, but for our purposes today, the given model will suffice. 

[1] R. Rajamani, *Vehicle Dynamics and Control*, 2nd ed. Springer US, 2012. doi: [10.1007/978-1-4614-1433-9](https://doi.org/10.1007/978-1-4614-1433-9).

### 🔧 Task: vehicle dynamics 

Complete the function below, implementing the dynamics, where `x` can be assumed to be a $4 \times 1$ array and `u` a $2 \times 1$ array. 

*Hint.* We will use this function in the construction of the optimal control problem later. To output a single vector containing the components of the state vector, suppose that `components` is a `list` containing the derivatives. The command `casadi.vertcat(*components)` will construct a single output vector. 

In [None]:
def kinematic_bicycle(x,u):
        """
        Implements the continuous-time bicycle dynamics
        The bicycle dynamics are a set of (differential) equations which model
        the motions of a vehicle by assuming that the left and right wheels stay
        perfectly parallel at all times.


        The input u consists of [acceleration, steering angle]

        Args:
            x: The current state vector, consisting of
                   - X: the x-position.
                   - Y: the y-position.
                   - psi: the angle of the vehicle with respect to the road.
                   - v: the speed of the vehicle (scalar).
            u: The input u consists of [acceleration, steering angle]

        Returns:
            the time derivative of the state vector
        """
        res = [] ## TO-DO! Res should be a list of expressions of `x` and `u` representing the dynamics!
        return cs.vertcat(*res)

#### Test block

In [None]:
x_test = np.array([1.,3.,4.,5.])
u_test = np.zeros(2)

try: 
    np.testing.assert_allclose(kinematic_bicycle(x_test, u_test).toarray(), np.array([-3.26822, -3.78401, 0, 0]).reshape((-1,1)), rtol=1e-4) 
    print("Looks like it works! Let's move on!")
except Exception as e:
    print(e)
    print("Seems like something is still wrong in your code... The exception above was thrown")

### Numerical integration
In order to use the system of ODEs presented above to make predictions, we need to be able to solve these differential equations numerically. There exist many different algorithms to do so, all somehow trading off accuracy with speed. We will not go into detail on these methods, but simply implement and compare two of the most widely used integrators for applications in optimal control. 

- Forward Euler: the simplest explicit integration scheme. Somewhat inaccurate, but very computationally inexpensive. For sufficiently fast sample speeds, this method will be sufficient in many control applications.  
$$
    x_{k+1} = x_k + f(x_k,u_k) T_{\mathrm{s}}
$$
- Runge-Kutta-order 4: a slightly more sophisticated explicit integration scheme. Often considered to have a good trade-off between computational load and accuracy
$$
    ...
$$

### 🔧 Task: implement numerical integrators

Complete the functions below. As mentioned in the docstring, you should output a (lambda) function which takes three inputs (array, array, float) and returns one output (array). 


In [None]:
def forward_euler(f, ts):
    """Given a function f: (x, u) -> xdot, return a function 
    (x, u, t) -> x+, where x+ is given by the Forward euler method. 
    """
    def dt_dyn(x,u,_):   # Hint, you don't need time (third argument)! 
        # TODO 
        pass 
    return dt_dyn 
    
def runge_kutta4(f, ts):
    """Given a function f: (x, u) -> xdot, return a function 
    (x, u, t) -> x+, where x+ is given by the Runge-kutta 4 method. 
    """
    def dt_dyn(x,u,_): 
        #TODO 
        pass 
    return dt_dyn 

#### Test block

In [None]:
x_test = np.array([1.,3.,4.,5.])
u_test = np.zeros(2)

test_fw_eul = forward_euler(kinematic_bicycle, 0.1) 
test_dt = runge_kutta4(kinematic_bicycle, 0.1) 

try: 
    np.testing.assert_allclose(test_fw_eul(x_test, u_test,0).toarray(), np.array([[0.673178, 2.6216, 4, 5]]).reshape((-1,1)), rtol=1e-4) 
    print("Looks like Forward Euler works!")
except Exception as e:
    print(e)
    print("Seems like something is still wrong in your code... The exception above was thrown")

try: 
    np.testing.assert_allclose(test_dt(x_test, u_test,0).toarray(), np.array([[0.673178, 2.6216, 4, 5]]).reshape((-1,1)), rtol=1e-4) 
    print("Looks like Runge kutta 4 works!")
except Exception as e:
    print(e)
    print("Seems like something is still wrong in your code... The exception above was thrown")

### Comparing numerical integrators
Let's try out our numerical integrators and see how well they perform.

#### 🔧 Task: build a simulator

Implement the function below, which runs a simulation of given dynamics with a given control policy, over a given number of steps. 

In [None]:
from typing import Callable, Tuple

def simulate(x0: np.ndarray,
             dynamics: Callable,
             policy: Callable,
             time_steps: int = 50) -> Tuple[np.ndarray, np.ndarray]:
    """Simulate the system for `time_steps` steps into the future. 

    Args: 
        x0 [np.ndarray]: the initial state 
        dynamics [Callable]: function state, input, time -> state: discrete-time dynamics
        policy [Callable]: function State, time -> input: time-varying policy
        time_steps [int]: Number of time steps to simulate
    Return: 
        state sequence [np.ndarray]: (4 x time steps+1)-dimensional array containing the state sequence over time
        input sequence [np.ndarray]: (2 x time steps)-dimensional array containing the input sequence over time
    """
    state_sequence = [x0]
    input_sequence = [] 
    
    # TODO: Loop over the time steps and update the state and input sequences! 
    pass 
    
    
    state = np.hstack(state_sequence)
    controls = np.hstack(input_sequence)
    return state, controls 

Now that we have a simulator, we can compare the outputs of the two integrators. To do so, we'll define an arbitrary policy first. We consider a policy to be a function $\pi : (x, t) \mapsto u$ which maps the state $x$ and possibly the discrete time $t$ to a control action $u$. Later, this policy will be determined by an MPC controller. 


In [None]:
# Define a policy to test the system
acceleration = 10 # Fix a constant longitudinal acceleration 
policy = lambda _, t: np.array([[acceleration],
                               [0.1 * np.sin(t)]])

time = np.arange(0,20)
input_trajectory = np.array([policy(None, t) for t in time])

fig, ax1 = plt.subplots()
plt.title("policy $\pi(t)$")
ax1.step(time, np.array([policy(None, t)[0] for t in time]), color="tab:orange")
ax1.set_ylabel("acceleration $a(t)$", color="tab:orange");
ax1.tick_params('y', colors='tab:orange')

ax2 = ax1.twinx()
ax2.step(time, np.array([policy(None, t)[1] for t in time]), color="tab:blue")
ax2.set_ylabel("steering angle $\delta(t)$", color="tab:blue")
ax2.tick_params('y', colors='tab:blue')
ax1.set_xlabel("time $t$")

plt.show()

Combining this policy with the previously implemented dynamics, we obtain a closed loop system, for which we can compare our integrators. 

Let us first set some **parameters** 

In [None]:
ts = 0.05 #0.05                    # Sample time
x = np.array([[0,0,0,0]]).T  # initial state: 4 x 1 vector 
steps=100                    # Number of steps to simulate [int]

Now, we construct a discrete time dynamics $x^+ = f_{\mathrm{d}}(x, u)$, which are supposed to approximate the solution to the ode:
$$
    f_{\mathrm{d}}(x,u) = \int_{t}^{t+T_{\mathrm{s}}} f(x(\tau),u) \mathrm{d} \tau, \quad x(t) = x
$$


In [None]:
# %matplotlib inline
from scipy.integrate import odeint  # Load scipy integrator as a ground truth

# Discrete-time dynamics under forward Euler 
dynamics_fw_euler = forward_euler(kinematic_bicycle, ts)   
# Discrete-time dynamics "ground truth"
def true_discretized_dynamics(x,u,t):
    f = lambda x,t: np.array(kinematic_bicycle(x, u)).reshape([x.size])
    y = odeint(f, x.reshape([x.size]), [t, t+ts])
    return y[-1].reshape((x.size, 1))

#### 🔧 Task: Simulate the "true" dynamics and compare with a simulation of your implementations of the integrators.

❓ Play around a bit with the value of `acceleration` in the policy defined above and compare the size of the integration errors at the end of the simulation. What do you observe? Can you explain this?

❓ What other parameter do you expect to play an important role in the accuracy (and therefore the choice) of your integration scheme? 

💡**Answer**: The error accumulates over time, since we are using the previous (erroneous) point to compute the next point from. The larger the acceleration, the more the velocity will change over the sampling period. As a result, the error in the Forward Euler method resulting from keeping the velocity constant will also be larger as the acceleration increases. This is of course relative to the size of the sampling period; the smaller the sampling period, the smaller the errors.

In [None]:
# Simulate the ground truth dynamics 
state_true, controls_true = simulate(x, dynamics=true_discretized_dynamics, policy=policy, time_steps=steps)

# Simulate the forward Euler dynamics
state, controls = simulate(x, dynamics=dynamics_fw_euler, 
                              policy=policy, time_steps=steps)


# Plot the results
fig, axes = plt.subplots(2,1, constrained_layout = True)
axes[0].plot(state[0,:], state[1,:], label="Forward Euler")
axes[0].plot(state_true[0,:], state_true[1,:], label="scipy odeint")
axes[0].set_xlabel("$p_{x}$")
axes[0].set_ylabel("$p_{y}$")
axes[0].legend()
axes[0].set_title("Position trajectories")

error_fw_euler = np.linalg.norm(state_true-state, axis=0)
axes[1].plot(error_fw_euler, label="Forward Euler")
axes[1].set_xlabel("time step")
axes[1].set_ylabel("error")
axes[1].set_title("Integration errors")
axes[1].legend();

We can conclude that for both methods, the approximation error accumulates over time, as the new state is computed based on an erroneous estimate in the previous time step. Fortunately, however, if the open loop prediction horizon is not too long, this error is not too severe and can be corrected by observing the actual new state online.

## Intermezzo: animation 
It's often convenient to represent a simulation in animated form. We provide a small piece of code for this (see `utilities.py`). To use it, simply, supply it a sequence states and inputs that it needs to plot. Furthermore, it needs the vehicle dimensions to draw a rectangle from (these were defined above and stored in `vehicle_dims`) and the sampling time `ts`. 

**Example**: The code block below demonstrates the usage of the given animation code on some dummy data.

🔧 **Task:** Try it out on the output of your simulation from the block above.
 
⚠ *NOTE: It can take a few seconds for the animation to generate!*

In [None]:
# %matplotlib inline
from utilities import Animation

# Generate some dummy data. Replace these arrays with the data generated above! 
datalen = 100 
state = np.vstack([np.linspace(0,5, datalen)**2, 
                   np.zeros(datalen), 
                   np.zeros(datalen),
                   np.linspace(0,5,datalen)]
                )
controls = np.vstack([np.zeros(datalen), 
                   np.zeros(datalen)])
#------------------------------------------------------
# Generate animation! 
from IPython.display import HTML
animator = Animation(state,controls, *vehicle_dims)
HTML(animator.build(ts).to_jshtml())

## Optimal control problem

We are now ready to start actually controlling our vehicle. To do so we will start by constructing an optimal control problem specifying our requirements.

### Building a nonlinear program in 📖 [CasADi](https://web.casadi.org/)
We will use CasADi as a modelling language for our problem. CasADi is 
an automatic differentiation library with powerful C-code generation capabilities,
which provides bindings to many popular optimization solvers. 
In today's tutorial, we will pass our optimization problem to a custom 
solver, but the modelling procedure for any other solver that is interfaced 
with CasADi is the same. 

CasADi expects an optimization problem of the form 
$$
    \begin{aligned}
        &\operatorname{Minimize}_{x}&& f(x) \\ 
        &\text{s.t.}&& \underline{x} \leq x \leq \bar{x}\\ 
        &&& \underline{g} \leq g(x) \leq \bar{g}
    \end{aligned}
$$
with $x \in \mathbb{R}^{n}$ the decision variable, which may be bounded above and below 
using $\underline{x}, \bar{x}$, and $g: \mathbb{R} \to \mathbb{R}^m$ a 
possibly nonlinear constraint function with given lower and upper bounds.

To represent this in the code, $x$ is an object of the CasADi `SX.sym`
class and $g$ and $f$ are expressions (which can be built from regular mathematical functions that are available in Python). 

📖 See [The CasADi documentation](https://web.casadi.org/docs/#nonlinear-programming) for more info. 


### The optimal control problem 
The goal of our controller will be to drive our vehicle from a given initial state to a desired setpoint. For convenience, we will put this set-point at the origin, but this could be any other reference state.
We fix a prediction horizon $N$ and consider the standard additive cost which penalizes at every time step the deviations of the current state from the given setpoint. 
This yields a cost function 
$$
    V(x,u) = \sum_{k=0}^{N-1} x_k^\top Q x_k + u_k^\top R u_k + x_N^\top Q_N x_N, 
$$
where $x_0 = x$ and $x_k = f(x_k, u_k)$ for $k = 0, \dots, N-1$.
For convenience, we'll take $Q_N = Q$.

❗*Note: a careful choice of $Q_N$ is crucial in proving certain properties of the closed-loop system, such as stability.
For this exercise in particular, we will not worry too much about this.*

We will assume there are physical limits on the actuation of the vehicle, as well as bounds on the system states: 
$$
    \begin{aligned}
        \underline{u}  &\leq u_k \leq \bar{u}, \quad k = 0, \dots, N-1\\
        \underline{x}  &\leq x_k \leq \bar{x}, \quad k = 0, \dots, N-1,
    \end{aligned}
$$ 
with $\underline{x} = (-30, -20, -\frac{\pi}{2}, -10)$, $\bar x = (30, 20, \frac{\pi}{2}, 10)$, and 
$\underline{u} = (-8, -\frac{\pi}{4})$, $\bar{u} = (3, \frac{\pi}{4})$.

We will implement our controller using a *sequential approach* (a.k.a. single-shooting) so that
$x_k$ will be represented as a nonlinear function of the initial state $x_0$ (which is a parameter) and the 
inputs up to time step $k$, namely $x_{k+1} = f(\,\cdot\,, u_{k-1}) \circ \dots \circ f(x_0, u_0)$.

As a result, $x_k$, $k=1,\dots,N$ will make up the output of the constraint function $g$. 



#### 🔧 Task: Construct an optimal control problem using [CasADi](https://web.casadi.org/)
Complete the function below, which takes as argument the prediction horizon (`int`), and returns a dictionary containing the information that describe the nlp above. See the docstring of the function for a more precise specification. 
Under the hood, CasADi will compute and store the required gradient (and Hessian) information that a solver would need. We will not use Hessian information, as we will be using an ALM solver with PANOC as an inner solver, which is designed to be matrix free. This is potentially a significant advantage in control applications, which are often run on underpowered embedded devices!

For the quadratic cost function, you can use the following parameter values
$$
Q = \operatorname{diag}(1,1,1,0.1), \quad 
R = \operatorname{diag}(0.1, 1). 
$$
❓ Suppose that we do not care about driving any state entry to zero particularly fast, what reason could we have for increasing certain values in the weight matrix?

*hint*: think about the units/the range of values that each state variable can take. 

To propagate the states, use your forward Euler method from before, i.e., `x = dynamics_fw_euler(x,u,t)`

In [None]:
from typing import Tuple 

def construct_ocp(horizon: int) -> Tuple[cs.nlpsol, np.ndarray, np.ndarray]: 
    """
    Given the prediction horizon, build a nonlinear program that represents the parametric optimization problem described above, with the initial state x as a parameter. 
    Use a single shooting formulation, i.e., do not define a new decision variable for the states, but rather write them as functions of the initial state and the control variables. Also return the lower bound and upper bound on the decision variables and constraint functions:

    Args: 
        horizon [int]: decision horizon  
    Returns: 
        solver [dict]: the nonlinear program as a dictionary: 
            {"f": [cs.SX] cost (as a function of the decision variables, built as an expression, e.g., x + y, where x and y are CasADi SX.sym objects),
             "g": [cs.Expression] nonlinear constraint function as 
             an expression of the variables and the parameters. 
             These constraints represent the bounds on the state. 
             "x": [cs.SX] decision_vars (all control actions over the prediction horizon (concatenated into a long vector)), 
             "p": [cs.SX] parameters (initial state vector)} 
        bounds [dict]: the bounds on the constraints 
            {"lbx": [np.ndarray] Lower bounds on the decision variables, 
             "ubx": [np.ndarray] Upper bounds on the decision variables, 
             "lbg": [np.ndarray] Lower bounds on the nonlinear constraint g, 
             "ubg": [np.ndarray] Upper bounds on the nonlinear constraint g 
            }
    """
    # Make decision variables 
    x0 = cs.SX.sym("x0", (4,1))
    x = x0
    u = [cs.SX.sym(f"u_{t}", (2,1)) for t in range(horizon)]

    # TODO state weights matrix
    
    # TODO controls weights matrix
    
    # TODO -- Define state bounds
    
    # TODO -- Define input bounds 
    
    # Initialize containers 
    lbx = []
    ubx = []
    g = [] 
    lbg = []
    ubg = []
    # Rollout the prediction horizon 
    for i in range(horizon):
        # TODO: augment cost and add constraints 
        ... 
    
    # Collect everything into dictionaries 
    
    variables = cs.vertcat(*u)
    nlp = {"f": cost,
           "x": variables,
           "g": cs.vertcat(*g),
           "p": x0}
    bounds = {"lbx": cs.vertcat(*lbx), 
              "ubx": cs.vertcat(*ubx), 
              "lbg": cs.vertcat(*lbg), 
              "ubg": cs.vertcat(*ubg), 
            }
 
    return nlp, bounds


We will now define a small utility function to build an ALM solver using panocpy. 

In [None]:
import panocpy as pa 
from datetime import timedelta

def get_ALM_solver(lbfgsmem = 20, max_iter=30, 
                   max_time_seconds=1) -> pa.ALMSolver: 
    
    tol = 1e-2
    
    verbose = False
    # Parameters about the inner solver 
    panocparams = {
        "max_iter": 1000,
        "max_time": timedelta(seconds=0.5),
        "print_interval": 0 if verbose else 0,
        "stop_crit": pa.PANOCStopCrit.FPRNorm,
        "update_lipschitz_in_linesearch": True,
    }

    innersolver = pa.PANOCSolver(
        pa.PANOCParams(**panocparams),
        pa.LBFGSParams(memory=lbfgsmem),
    )
    
    # Parameters about the outer solver 
    almparams = pa.ALMParams(
        max_iter=max_iter,
        max_time=timedelta(seconds=max_time_seconds),
        print_interval=1 if verbose else 0,
        preconditioning=False,
        ε=tol,
        δ=1e-4,
        Δ=5,
        Σ_0=1e3,
        Σ_max=1e13,
    )

    solver = pa.ALMSolver(almparams, innersolver)
    return solver

### Compiling the optimization problem 
Now that we have constructed a CasADi representation of the optimal control problem, we can generate efficient C-code to evaluate the cost and the gradient, which is used by the optimizer. `panocpy` provides useful utilities to do this easily:

Let's set our prediction horizon to some (rather large) number and compile the parametric optimal control problem.

In [None]:
from utilities import compile_ocp
prediction_horizon = 30                         # Set the prediction horizon 

nlp_description, bounds = construct_ocp(prediction_horizon)   # Build the optimizer
problem = compile_ocp(nlp_description, bounds)

#### 🔧 Task: simulate the open-loop trajectory 
Call the optimizer to obtain the optimal sequence of control actions and use these to simulate the state trajectory for $N$ steps.  

In [None]:
# Set initial state and some initial guess
x_init = np.array([-20, -5, 0, 0]).reshape((4,1))      # Initial state as column vector 

# -------------
# TODO 
# set x_init as the `param` of the problem
# make a solver and call it
# Reshape the solution into a 2xn_steps array 
# ------------

def policy(x,t):                                   # Write the solution as a function of time 
    return u_sequence[:,t]                         # At time t, use the t'th element in the sequence 

# We can only simulate forward as many steps as we have predicted in the MPC problem (in open loop).
n_steps = u_sequence.shape[1]
x_predicted, _ = simulate(x0=x_init, dynamics=dynamics_fw_euler, policy=policy, time_steps=n_steps) 

# Compare against a simulation with the true dynamics
x_true, _ = simulate(x0=x_init, dynamics=true_discretized_dynamics, policy=policy, time_steps=n_steps)

For convenience, we define a quick function to plot the control actions

In [7]:
def plot_input_sequence(u_sequence): 
    plt.subplot(2,2, (1,3))
    plt.title("Control actions")
    plt.plot(u_sequence[0,:], u_sequence[1,:], marker=".") 
    plt.xlabel("$a$")
    plt.ylabel("$\delta$");
    plt.subplot(2,2,2)
    plt.title("Steering angle")
    plt.plot(u_sequence[1,:].squeeze(), marker=".") 
    plt.ylabel("$\delta$");
    plt.subplot(2,2,4)
    plt.title("Acceleration")
    plt.plot(u_sequence[0,:].squeeze(), marker=".") 
    plt.ylabel("$a$");
    plt.xlabel("$t$")
    plt.tight_layout()


In [8]:
plt.figure() 
plot_input_sequence(u_sequence)

plt.figure() 
plt.title("Predicted positions")
plt.plot(x_predicted[0,:], x_predicted[1,:], label="predicted trajectory",marker=".") 
plt.plot(x_true[0,:], x_true[1,:], label="True trajectory",marker=".")
plt.legend()
plt.xlabel("$p_x$")
plt.ylabel("$p_y$") ; 

NameError: name 'plt' is not defined

## Model error 

So far, everything looks nice, but of course, this open-loop simulation only reveals what our controller predicted. 
Now, suppose that there is some model error and the true vehicle dynamics are given by the following function:

In [None]:
def kinematic_bicycle_true(x,u):
        """
        Implements the continuous-time bicycle dynamics
        The bicycle dynamics are a set of (differential) equations which model
        the motions of a vehicle by assuming that the left and right wheels stay
        perfectly parallel at all times.


        The input u consists of [acceleration, steering angle]

        Args:gewoon 
            x: The current state vector, consisting of
                   - X: the x-position.
                   - Y: the y-position.
                   - psi: the angle of the vehicle with respect to the road.
                   - v: the speed of the vehicle (scalar).
            u: The input u consists of [acceleration, steering angle]

        Returns:
            the time derivative of the state vector
        """
        lr = 0.3*VEHICLE_LENGTH 
        l = VEHICLE_LENGTH

        b = cs.arctan(lr/l * np.tan(u[1]))
        res = [x[3] * np.cos(x[2]+b),
               x[3] * np.sin(x[2]+b),
               x[3] * np.sin(b),
               u[0]]
        return cs.vertcat(*res)

def true_system(x,u,t):
    f = lambda x,t: np.array(kinematic_bicycle_true(x, u)).reshape([x.size])
    y = odeint(f, x.reshape([x.size]), [t, t+ts])
    return y[-1].reshape((x.size, 1))

Now, running the simulation again, using `true_system` for the dynamics... 

❓What do you observe? 

In [None]:
# Compare against a simulation with the true dynamics
x_with_error, _ = simulate(x0=x_init, dynamics=true_system, policy=policy, time_steps=n_steps)

plt.figure()
plt.plot(x_predicted[0,:], x_predicted[1,:], marker=".", label="predicted trajectory") 
plt.plot(x_true[0,:], x_true[1,:], label="Trajectory with accurate ODE solver",marker=".")
plt.plot(x_with_error[0,:], x_with_error[1,:], label="True trajectory",marker=".",)
plt.legend()
plt.xlabel("$p_x$")
plt.ylabel("$p_y$"); 


## Introducing feedback with MPC 

We can repeat the previous simulation by including the solution of the optimization problem into the definition of the control policy. 

#### 🔧 Task: redefine the policy to use receding horizon
Implement an MPC controller by repeatedly solving the optimization problem every time the policy is called.

You can even reduce the prediction horizon to 20 for this test. This means you'll need to compile a new optimal control problem! Use `construct_ocp` as before.


In [None]:
prediction_horizon = 20
nlp_description, bounds = construct_ocp(prediction_horizon)   # Build the optimizer 
problem = compile_ocp(nlp_description, bounds) 
problem.param = x_init

In [None]:
class WarmStartedPolicy: 
    
    def __init__(self, solver:pa.ALMSolver,
                 problem: pa.ProblemWithParam):  # STORING SOME PERSISTENT DATA 
        self.alm_solver=solver
        self.problem = problem 
        self.stats = None  # Useful for debugging purposes. 
        self.initial_guess_primal = np.zeros(problem.n)
        self.initial_guess_dual = np.zeros(problem.m)

    def __call__(self, x, t):  # CALL THE POLICY 
        # TODO
        # - Update the solver parameter,
        # - call the solver
        # - update the initial guess attributes with the obtained solution 
        
        return sol_primal[0:2]    

    
# Define the MPC policy.
policy = WarmStartedPolicy(solver, problem) 

Now, let's simulate the MPC controller, both for the dynamics used and for the true dynamics given above, and verify that the performance of the controller has improved.

In [None]:
n_steps = 200

x_simplified, _ = simulate(x0=x_init, dynamics=dynamics_fw_euler, policy=policy, time_steps=n_steps)
x_true, u_true = simulate(x0=x_init, dynamics=true_system, policy=policy, time_steps=n_steps)

plt.figure()
plt.plot(x_simplified[0,:], x_simplified[1,:], marker=".", label="MPC trajectory with model dynamics") 
plt.plot(x_true[0,:], x_true[1,:], marker=".", label="MPC trajectory with true dynamics")
plt.legend()
plt.xlabel("$p_x$")
plt.ylabel("$p_y$"); 

In [None]:
## Animation for the true dynamics 
%matplotlib inline
animator = Animation(x_true,u_true,*vehicle_dims);
HTML(animator.build(ts).to_jshtml())

## Extension: obstacle avoidance 

We can now repeat the previous example, but now extending it to a more interesting/realistic scenario. Suppose that 
we have a number of obstacles in the scene, which we represent by a location $o = (o_x, o_y)$ and a size $r$.

#### 🔧 Task: write down the collision avoidance constraint 

Given an circular obstacle with position $o = (o_x, o_y)$ and size $r$; write down the condition on the position such that for all heading angles $\theta$, 
the vehicle (of length $l$ and width $w$) does not intersect with the obstacle. 

Is the corresponding constraint convex or not?

#### 🔧 Task: Reimplement `construct_ocp` with collision avoidance constraints 

Implement a function to build a parametric optimizer like before, but now supporting collision avoidance constraints. 
Based on the argument `n_obstacles`, introduce a parameter vector containing all the positions $o_{i,x}, o_{i,y}$ of the obstacles $i = 1, \dots,n_o$ , and their radii $r_{i}$. That is, 
the parameter vector is of the form 
$$
    p = \begin{bmatrix}
        x_0 \\
        o_{1,x}\\
        o_{1,y}\\
        o_{2,x}\\
        o_{3,y}\\
        \vdots \\
        o_{n_o,x}\\
        o_{n_o,y}\\
        r_{1}\\ 
        \vdots \\ 
        r_{n_o}
    \end{bmatrix}
$$




In [None]:
from typing import Tuple 

def construct_ocp_constrained(horizon: int, n_obstacles: int) -> Tuple[dict, dict]:
    """
    Given the prediction horizon, build a CasADi nlp object that represents the parametric optimization problem described above, with the initial state x as a parameter. 
    Use a single shooting formulation, i.e., do not define a new decision variable for the states, but rather write them as functions of the initial state and the 
    control variables. Also return the lower bound and upper bound on the decision variables and constraint functions: 

    Args: 
        horizon [int]: decision horizon  
        n_obstacles [int]: number of obstacles in the scene
    Returns: 
        solver [dict]: the nonlinear program as a dictionary: 
            {"f": [cs.Expression] cost (as a function of the decision variables, built as an expression, e.g., x + y, where x and y are CasADi SX.sym objects),
             "g": [cs.Expression] nonlinear constraint function as 
             an expression of the variables and the parameters. 
             The constraints are both the original state bounds and 
             the collision avoidance constraints. 
             "x": [cs.SX] decision_vars (all control actions over the prediction horizon (concatenated into a long vector)),
             "p": [cs.SX] parameters (initial state vector, obstacle positions, obstacle radii) concatenated into one column vector} 
        bounds [dict]: the bounds on the constraints 
            {"lbx": [np.ndarray] Lower bounds on the decision variables, 
             "ubx": [np.ndarray] Upper bounds on the decision variables, 
             "lbg": [np.ndarray] Lower bounds on the nonlinear constraint g, 
             "ubg": [np.ndarray] Upper bounds on the nonlinear constraint g 
            }
    """

    #TODO: Define variables and parameters. 
    

    # TODO: Initialize the cost at zero and define the cost weights  
    # state weights matrix
    
    # TODO: controls weights matrix
    
    # TODO: Define the bounds for every time step 
    # -- State bounds
    
    # -- Input bounds 
    
    # Initialize containers for the constraints and constraint bounds 
    lbx = []
    ubx = []
    g = [] 
    lbg = []
    ubg = []

    # Initialize the current state to the initial value
    x = x0
    # TODO: Rollout the simulation horizon, accumulating costs and constraints 
    for i in range(horizon):
        # TODO: update cost
        
        # TODO: Update the state 
        
        # Bound the controls 
        
        # Bound the states  
        
        # [NEW] Impose collision avoidance 
        

    #TODO Add terminal costs/constraints 
    
    
    # Concatenate the variables and constraints, and package the problem into dicts 
    variables = cs.vertcat(*u)
    parameters = cs.vertcat(x0, *obs_pos, *obs_r)
    nlp = {"f": cost,
           "x": variables,
           "g": cs.vertcat(*g),
           "p": parameters}
    bounds = {"lbx": cs.vertcat(*lbx), 
              "ubx": cs.vertcat(*ubx), 
              "lbg": cs.vertcat(*lbg), 
              "ubg": cs.vertcat(*ubg), 
            }
    return nlp, bounds

### Obstacle settings 

We introduce two obstacles that need to be avoided during the maneuver. 

In [None]:
obstacle_pos = [np.array([[-15,4]]).T, np.array([[-5,-8]]).T]  # positions 
obstacle_rad = [3,5]                                           # sizes 
num_obstacles = len(obstacle_pos)
# Combine into parameter vector
if num_obstacles > 0:
    params_obst = np.vstack(obstacle_pos + obstacle_rad)  
else: 
    params_obst = [] 

#### 🔧 Task: Simulate open loop trajectory with model errors

Like before, but now with the constrained problem, simulate the solution under the model dynamics (forward Euler) and the true dynamics (using the given function `true_dynamics` with  scipy `odeint` for integration).

❓What do you observe? Are the obstacles correctly avoided for the true dynamics?

In [None]:
from utilities import compile_ocp
x_init = np.array([[-20, -5, 0, 0]]).T
horizon = 60

# Define the problem 
nlp_description, bounds = construct_ocp_constrained(horizon, num_obstacles)
# Compile the OCP
problem = compile_ocp(nlp_description, bounds)
if num_obstacles > 0: 
    params = np.vstack([x_init, params_obst])
else: 
    params = x_init 

problem.param = params

In [None]:
# Instantiate a solver. This time, we can increase the number of iterations etc. a bit to account for the larger horizon. 
solver = get_ALM_solver(max_iter=50, max_time_seconds=60)

2. Set the problem parameters and call the solver to obtain a solution

In [None]:
initial_guess_primal = np.zeros((problem.n,))
initial_guess_dual = np.zeros((problem.m,))

_, u_sequence, stats = solver(problem, initial_guess_dual, initial_guess_primal)
utilities.print_solver_stats(stats)
u_sequence = u_sequence.reshape((2,-1), order="f")         # Extract the control sequence back from the solution


3. Define the open-loop policy and simulate it on the model system and the true system

In [None]:
def policy(x,t):  # Write the solution as a function of time 
    return u_sequence[:,t]  # At time t, use the t'th element in the sequence 

n_steps = u_sequence.shape[1]                 # We can only simulate forward as many steps as we have predicted in the MPC problem. 
x_predicted, _ = simulate(x0=x_init, dynamics=dynamics_fw_euler, policy=policy, time_steps=n_steps)

# Compare against a simulation with the true dynamics
x_true, _ = simulate(x0=x_init, dynamics=true_system, policy=policy, time_steps=n_steps)


4. Plot the results

In [None]:
plt.figure() 
plot_input_sequence(u_sequence)

plt.figure()

def draw_scene(): 
    ax = plt.gca()
    ax.axis("square")

    for o_p, o_r in zip(obstacle_pos, obstacle_rad):
        c1 = plt.Circle(o_p, radius=o_r, ec="k", fc="red", alpha=0.5)
        c2 = plt.Circle(o_p, radius=o_r+0.5*np.sqrt(VEHICLE_LENGTH**2+VEHICLE_WIDTH**2), ec="k", fc=(0,0,0,0), linestyle="--")
        ax.add_artist(c1)
        ax.add_artist(c2)
    ax.scatter([0], [0], color="tab:green", marker="X")
    plt.xlim([-40, 10])
    plt.ylim([-10, 5])

draw_scene()
plt.plot(x_predicted[0,:], x_predicted[1,:], label="predicted trajectory") 
plt.plot(x_true[0,:], x_true[1,:], label="True trajectory")
plt.legend(bbox_to_anchor=(1,1), loc="upper left")
plt.xlabel("$p_x$")
plt.ylabel("$p_y$"); 


### MPC with obstacle avoidance 

To reduce the computation, we reduce the prediction horizon again to 20 steps. Now, define the policy that solves the optimal control problem in receding horizon. Compare again the performance on the model dynamics with the forward Euler integrator to the performance on the true system. 


In [9]:
horizon = 20
ocp, bounds = construct_ocp_constrained(horizon, num_obstacles)
problem = compile_ocp(ocp, bounds)

# Don't forget to add the obstacle information as parameters!
if num_obstacles > 0: 
    params = np.vstack([x_init, params_obst])
else: 
    params = x_init 
problem.param = params

NameError: name 'construct_ocp_constrained' is not defined

In [None]:
# Instantiate a solver 
solver = get_ALM_solver()
# Define the MPC policy.
pol = WarmStartedPolicy(solver, problem) 

def policy(x,t): 
    u = pol(x,t)
    return u 

# policy = ipopt_policy


In [None]:
n_steps = 150

x_simplified, u_simplified = simulate(x0=x_init, dynamics=dynamics_fw_euler, policy=policy, time_steps=n_steps)
x_true, u_true = simulate(x0=x_init, dynamics=true_system, policy=policy, time_steps=n_steps)

In [None]:
u_true = u_true.reshape((2,-1), order="F")
plt.figure() 
plot_input_sequence(u_true)

plt.figure()

draw_scene() 

plt.plot(x_simplified[0,:], x_simplified[1,:], label="MPC trajectory with model dynamics") 
plt.plot(x_true[0,:], x_true[1,:], label="MPC trajectory with true dynamics")

plt.legend(bbox_to_anchor=(1,1), loc="upper left")
plt.xlabel("$p_x$")
plt.ylabel("$p_y$");

In [None]:
from utilities import Animation 
from IPython.display import HTML

animator = Animation(x_true, u_true, *vehicle_dims)
for o_p, o_r in zip(obstacle_pos, obstacle_rad):
    c1 = plt.Circle(o_p, radius=o_r, ec="k", fc="red", alpha=0.5)
    c2 = plt.Circle(o_p, radius=o_r+1/2*np.sqrt(VEHICLE_LENGTH**2+VEHICLE_WIDTH**2), ec="k", fc=(0,0,0,0), linestyle="--")
    animator.ax.add_artist(c1)
    animator.ax.add_artist(c2)

HTML(animator.build(ts).to_jshtml())