In [None]:
# Execute this cell and reassure you're using python 3.9. Swich to 3.9 otherwise using "Command palette" in the right bottom corner of Colab.
!python --version

# EXECUTE THE CELL BELOW ONLY IF YOU'RE WORKING IN GOOGLE COLAB! 

In [None]:
# Execute the cell only ONCE

!git clone https://gitflic.ru/project/aidynamicaction/classedu2023-advctrl.git
%cd /content/classedu2023-advctrl/assignments/asgn-4

<h1 style="color:#333333; text-align:center; line-height: 0;"> <img style="right;" src="logo.png" width=18% height=18%> Advanced Control Methods | Assignment 4 
</h1>
<br/><br/>

**First, familiarize yourself with [Rcognita](https://github.com/AIDynamicAction/rcognita) if you have not already done so. This assignment is based on this framework, so it's better to have an intuition about what's going on behind the scenes.**

The goal of this assignment is to implement a classic **MPC** controller, described in the section 1.2, namely, `_actor_cost` and `_actor_optimizer` methods.

___Total points:___ 100

<h2 style="color:#A7BD3F;"> Section 1: Introduction to model predictive control (MPC) </h2>

***
###  <font color="blue"> 1.1 Intuition behind MPC </font>
**MPC** is one of the most popular methods of obtaining the optimal controller and, in fact, is an industry standard.
Let us start from the most reasonable question. Why should we care about **MPC**? Why do we even need that thing? Are there any problems that naturally lead to the development of such a model?

In fact, yes. There are infinitely many applications of **MPC**, including petroleum extraction, agricultural facilities, automatic assembly and so on. But let us take a look at the development of such a type of machines that are impressive not only to an expert in the field - humanoid robots. If you've ever seen a video of Boston Dynamics' humanoid robots, you've probably wondered, how do the engineers made it work? How does such a complex machine make those precise movements so that it is capable of performing acrobatics that already exceeds the abilities of a typical human in some aspects (i.e. backflip)?

The answer is, you guessed it, **MPC**. 

The performance of the modern robots comes with its cost. The problem of generating control for any robot requires:

- real time performance, as there are in general almost no stable states in the robot's movement, and all the calculations should be performed quickly
- optimizing a complex composite cost to a certain time horizon in order to follow the high-level plan
- (in some cases) discrete-continuous optimization, which is difficult
- taking into account various types of constraints, i.e.
    1. torque and angle limits for the servomotors (in a form of inequality)
    2. functional constraints following from the problem statement, that do not always allow for an analytical solution 
    3. also there could be constraints on foot placement, body placement, slippery surfaces, etc.
    4. constraints of the limbs non-intersection (if this is a case)

All in all, such a problem could lack a closed form solution, like $\boldsymbol u = \boldsymbol f(\text{state}, \text{target})$.

One of the most fundamental important ways to obtain an optimal controller is __MPC__. 

* __MPC-like algorithms__ are ones of the few that can handle very complex constraints, including functional, nonlinear, nonconvex, etc.

Informally speaking, algorithms that generate control for a complex dynamical system typically have to be predictive. And taking the system model into account helps along the way. In order to optimize an objective along the trajectory, we could try to estimate the system behaviour in the future. This is litearlly what **MPC** does. With that in hand, let us state the problem and the **MPC** more formally.

###  <font color="blue"> 1.2 MPC mathematical description </font>
<a id='2.2'></a>

First of all, we consider a controlled physical system described by the system of ordinary differential equations 
<a id='System'></a>
$$
\begin{equation}
\begin{cases}
\dot{\boldsymbol x} = \boldsymbol f(\boldsymbol x, \boldsymbol u)\\
\boldsymbol y = h(\boldsymbol x) \\
\boldsymbol x(0)=\boldsymbol x_{0}\\
\end{cases}
\end{equation}$$ 
where $x_{0}$ is the **initial state**.In the following table we introduce some basic notation. From now on, we will write vectors in **bold**.
<a id='Notation'></a>

| Notation &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;| Description |
|:-----------------------:|-------------|
| $\boldsymbol f(\cdot, \cdot) : \mathbb{R}^{n} \times \mathbb{R}^{m} \rightarrow \mathbb{R}^{n}$ |A **state dynamic function** or, more informally, **righ-hand-side** of a system <br /> of ordinary differential equations $\dot{\boldsymbol x} = \boldsymbol f(\boldsymbol x, \boldsymbol u)$|
| $\boldsymbol x \in \mathbb{R}^{n} $ | An element of the **state space** of a controlled system of dimensionality $n$ |
| $\boldsymbol u \in \mathbb{R}^{m}$ | An element of the **action space** of a controlled system of dimensionality $m$ |
| $\boldsymbol y \in \mathbb{R}^{k}$ | An **observartion**|
| $\mathbb{X}\subset \mathbb{R}^{n} $| **State constraint set**|
| $\mathbb{U}\subset \mathbb{R}^{m} $| **Action constraint set**|
| $\boldsymbol h(\cdot): \mathbb{R}^{n} \rightarrow \mathbb{R}^{k}$ | **Observation function**  |
| $\kappa(\cdot) : \mathbb{R}^{n} \rightarrow \mathbb{R}^{n}$ | **Policy** function |
| $\rho(\cdot) : \mathbb{R}^n \rightarrow \mathbb{R}$ | **Stage cost** function  |
| $J_N(\cdot, \cdot): \mathbb{R}^n \times \mathcal{K} \rightarrow \mathbb{R}$| **N-step cost** (might be refered as **actor cost** either), <br /> where $\mathcal{K}$ is some functional space of admissible policies |
| $\delta$ | **Sampling time** |
| $\boldsymbol x_k$   | A **state** at time $k\delta$: $\qquad\boldsymbol x (k \delta)$   |

Here we would like to make a few clarifications on the notation introduced.
* For **MPC** N-step cost has the following form  $J_N(\boldsymbol x_{k}, \kappa(\cdot)):=\int_{k\delta}^{(k+N)\delta}\rho(\boldsymbol x(t), \kappa(\boldsymbol x(t))) dt$
* In the following problems we consider $\mathbb{X} = \mathbb{R}^{n}$ without any remarks but in general we might want to introduce some reasonable **state constraint set**.
* When working with the system, we cannot know the exact value of the characteristics we are interested in. What we do is **make a measurement** calculatung **observation function** $\boldsymbol h(\cdot)$. The result we call an **observation** $\boldsymbol y$. For now we consider $\boldsymbol y = \boldsymbol h(\boldsymbol x) := \boldsymbol x$ further but in general it might not be so! For example, if we control a body motion on a plane $(x,y)$, we could measure a distance to the origin: $\boldsymbol h(\boldsymbol x) = \lVert \boldsymbol x \rVert$. 
* Most modern controllers are digital, therefore the control signal is generated by sampling with some **sampling time** $\delta$. In that case we call it **digital control setting**. The following table gives a comparison between mathematical description of the original setting of the controlled system and its **digital control setting**.
<a id='Comparison_table'></a>

|Original setting|Digital control setting|
|:-----------------------|:--------------------------|
|1) $ \qquad \dot{\boldsymbol x}=\boldsymbol f\left(\boldsymbol x, \boldsymbol u\right) $ | $\text{1*)} \quad \dot{\boldsymbol x}=\boldsymbol f\left(\boldsymbol x, \boldsymbol u^{\delta}\right)$|
| $\text{2)} \quad \boldsymbol u=\boldsymbol u(t), t \in[0, T]$  | $\text{2*)} \quad \boldsymbol u_k(t)=\boldsymbol u(k\delta) , t \in[k \delta,(k+1) \delta]$  | 
| $$\text{3)} \quad \boldsymbol x(t):=\boldsymbol x_{t_0}+\int_{t_0}^{t}f\left(\boldsymbol x(\tau), \boldsymbol u(\tau) \right) d \tau$$| $$\text{3*)} \quad \boldsymbol x^{\boldsymbol u_{k}}(t):=\boldsymbol x_{k}+\int_{k \delta}^{t}f\left(\boldsymbol x(\tau), \boldsymbol u_{k}\right) d \tau$$|     

<!--- $$\boldsymbol x_{i \mid k}:=\boldsymbol x((k+i-1) \delta), k \in \mathbb{N}$$
$\boldsymbol u^{\delta}(t) \equiv \boldsymbol u_{i \mid k}=\kappa\left(\boldsymbol x_{i \mid k}\right), t \in[k \delta,(k+i) \delta]$ 
One can see that for any $k \in \mathbb{N}$, the state $x^{u_{k}}(t)$ at $t \geq k \delta$ under $u_{k}$ satisfies
-->
<p style="text-align: center;">

</p>

From now and on by the system we mean a system in the **digital control setting**, which is illustrated on the figure below .
<img src="digital_control_setting.svg" width=40% height=40% />

**Objective:**

In general, our optimal control problem is written as $\min_{\kappa(\cdot)}\int_{k\delta}^{(k+N)\delta}\rho(\boldsymbol x(t), \kappa(\boldsymbol x(t))) dt$, where $N$ is **prediction horizon**. But in **MPC** we switch to a discrete sum instead of an integral: $$\min _{\boldsymbol u_{k + i}: i=0, \ldots, N-1} \left(\sum_{i=1}^{N-1} \rho \left(\hat{\boldsymbol x}_{k + i}, \boldsymbol u_{k + i}\right)\delta \right) $$ Wich is obviously equivalent to the following form:
$$\min _{ \boldsymbol u_{k + i}: i=0, \ldots, N-1} \left(\sum_{i=1}^{N-1} \rho \left(\hat{\boldsymbol x}_{k + i}, \boldsymbol u_{k + i}\right)\right)$$  
And now we see that it fits conviniently in our **digital control setting**: we will just find a minimizing sequence of actions for our digital model predictive control for stage costs predicted for N steps forward.  Notice that under digital control our state evolves according to [3*)](#Comparison_table) . Now we need to somehow numerically evaluate the interal $\int_{k \delta}^{t}f\left(\boldsymbol x(\tau), \boldsymbol u_{k}\right) d \tau$. In order to do this one might use any numerical integration scheme. For example the **Euler scheme** :
<a id='Euler'></a>
$$
\boldsymbol x_{k+1}=\boldsymbol x_{k}+\delta \boldsymbol f\left(\boldsymbol x_{k}, \boldsymbol u_{k}\right) \text {, }
$$
#### Algorithm <sup>[1]</sup>:
<a id='Objective'></a>
Let us describe the **MPC** algorithm for the problem just described above.  
At the current state $\boldsymbol x_{k}$ :

(a) **MPC** solves an $N$-step lookahead  problem: $\min _{\boldsymbol u_{k + i}: i=0, \ldots, N-1} \left(\sum_{i=0}^{N-1} \rho \left(\hat{\boldsymbol x}_{k + i}, \boldsymbol u_{k + i}\right)\right)$

(b) If $\left\{\boldsymbol u^{*}_{k}, \ldots, \boldsymbol u^{*}_{k+N-1}\right\}$ is the optimal control sequence of this problem, **MPC** applies $\boldsymbol u^{*}_{k}$ and discards the other controls $\boldsymbol u^{*}_{k+1}, \ldots, \boldsymbol u^{*}_{k+N-1}$. 

(c) At the next stage, **MPC** repeats this process, once the next state $\boldsymbol x_{k}$ is revealed.


<img src="MPC.svg" width=35% height=35% />

***

<h2 style="color:#A7BD3F;"> Section 2: Problems </h2>

In the following, for testing, we will use a system representing a three-wheeled robot described by a system of equations
$$
\begin{cases}
\dot{x}_c=v \cos \alpha \\
\dot{y}_c=v \sin \alpha \\
\dot{\alpha}=\omega
\end{cases}
$$ where $x_c$ and $y_c$ are coordinates of the center of mass, $v$ and $\omega$ are velocity of the center of mass and angular velocity respectively and these are components of the control $\boldsymbol u := (v, \omega)$ as well.  

***

In [None]:
%pip install -e src

In [None]:
%load_ext autoreload
%autoreload 2

from rcognita.__utilities import rc, Clock
import rcognita.__utilities as utilities
from rcognita.systems import System, Sys3WRobotNI
from rcognita.controllers import Controller
from rcognita.testing_pipeline import create_testing_pipeline
from rcognita.visualization import dashboards
from rcognita.models import ModelQuadForm
import rcognita
import numpy as np
from matplotlib import rc as rc_params
from scipy.optimize import minimize, Bounds
rc_params("animation", html="jshtml")


In [None]:
def apply_action_bounds(method):
    def wrapper(self, *args, **kwargs):
        self.action = method(self, *args, **kwargs)
        if hasattr(self, "action_bounds") and self.action_bounds != []:
            action = np.clip(
                self.action, self.action_bounds[:, 0], self.action_bounds[:, 1]
            )
            self.action = action
        return self.action

    return wrapper

### <font color="blue"> 2.1 Problem 1 | Euler predictor </font>
Euler predictor uses a simple Euler discretization scheme.

It does predictions by increments scaled by a sampling time times the velocity evaluated at each successive node.

You already have an experience in implementing such predictor. Now you're intended to wrap this into a corresponding `EulerPredictor` class.

* `predict` here is a method that performs only one step: $x_{k+1} = \delta f(x_k, u_k)$ given some state $x_k$ and action $u_k$
* `predict_sequence` performs computations of `predict` method successively and returns `observation_sequence_predicted` of `len = self.prediction_horizon`  

In [None]:
class EulerPredictor:

    def __init__(
        self,
        pred_step_size: float,
        system: System,
        dim_input: int,
        prediction_horizon: int,
    ):
        self.system = system
        self.pred_step_size = pred_step_size
        self.compute_state_dynamics = system.compute_dynamics
        self.sys_out = system.out
        self.dim_input = dim_input
        self.prediction_horizon = prediction_horizon

    def predict(self, current_state, action):
    
        ####################
        #YOUR CODE GOES HERE
        ####################
        next_state = 
        ####################
        #YOUR CODE ENDS HERE
        ####################
        return next_state

    def predict_sequence(self, state, action_sequence):
        ####################
        #YOUR CODE GOES HERE
        ####################

        ####################
        #YOUR CODE ENDS HERE
        ####################
        
        return observation_sequence_predicted

### <font color="blue"> 2.2 Problem 2 | MPC-based Controller implementation </font>
Implement MPC-based controller using predictor from the previous task.

1. Implement `mpc_objective` that gets an action sequence as decision variable for the main optimization problem $$\min _{ \boldsymbol u_{k + i}: i=0, \ldots, N-1} \left(\sum_{i=1}^{N-1} \rho \left(\hat{\boldsymbol x}_{k + i}, \boldsymbol u_{k + i}\right)\right)$$ 
2. Implement the mpc algorithm itself in `compute_action` method using scipy optimizer.

In [None]:
class ControllerMPC(Controller):
    def __init__(self, predictor, *args, action_bounds=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.predictor = predictor
        self.dim_input = self.predictor.system.dim_input
        self.dim_output = self.predictor.system.dim_output
        self.action_bounds = action_bounds

    def compute_action_sampled(self, time, state, observation, observation_target=[]):
        """
        Compute sampled action.

        """

        is_time_for_new_sample = self.clock.check_time(time)

        if is_time_for_new_sample:  # New sample
            action = self.compute_action(state, observation)
            self.times.append(time)
            self.action_old = action


            return action

        else:
            return self.action_old


    def mpc_objective(
        self,
        action_sequence, 
        observation, ## Here state and observation are equivalent
    ):
        ####################
        #YOUR CODE GOES HERE
        ####################

        ####################
        #YOUR CODE ENDS HERE
        ####################

    @apply_action_bounds
    def compute_action(
        self, state, observation, time=0 ## Here state and observation are equivalent
    ):

        ####################
        #YOUR CODE GOES HERE
        ####################

        ####################
        #YOUR CODE ENDS HERE
        ####################

### <font color="blue"> Testing of the implemented controller </font>

In [None]:
state_init = np.array([5, 5, 3 * np.pi / 4])
action_init = np.ones(2)
prediction_horizon = 5
sampling_time=0.01

system = Sys3WRobotNI(
    dim_input=2,
    dim_output=3,
    sys_type="diff_eqn",
    dim_state=3,
    dim_disturb=3,
    pars=[]
)
predictor = EulerPredictor(
    pred_step_size=sampling_time,
    system=system,
    dim_input=system.dim_input,
    prediction_horizon=prediction_horizon,
)
controller = ControllerMPC(
 action_bounds=np.array([[-25, 25],[-5, 5]]), time_start=0, sampling_time=sampling_time
)

dashboard = dashboards.RobotTrackingDasboard

scenario, animator = create_testing_pipeline(
    system, 
    controller, 
    state_init=state_init, 
    action_init=action_init, 
    model_of_running_objective = ModelQuadForm(weights=np.diag([5, 5, 1, 0, 0])),
    observation_namings=["x", "y", "angle"],
    action_namings=["lin. speed", "ang. speed"],
    system_visualization_dashboard=dashboard,
    system_visualization_dashboard_params={
        "time_start": 0, 
        "x_max": 10, 
        "x_min": -10, 
        "y_max": 10, 
        "y_min": -10
        },
    time_final=15,
    )

### Here you can debug your code. When the code is changed, rerun previous cells to update the scenario cache.

In [None]:
scenario.run() # <--- takes some time. (In our case about 30 seconds)
trajectory = []
for i , (k, v) in enumerate(scenario.cache.items()):
    trajectory.append(v[3])

trajectory[-30:]

### Run the animation to finalize your results

In [None]:
%%capture
animation = animator.get_animation()

In [None]:
animation

## Sources

 ***
 **<sup>[1]</sup> Bertsekas, D. , Reinforcement Learning and Optimal Control**

