In [None]:
%%capture
# Notebook bootstrap; %%capture hides any output produced by the setup lines below.
# Reload edited modules automatically before each cell is executed.
%load_ext autoreload
%autoreload 2
%matplotlib inline
# Project extension; presumably the source of the %set_random_seed and
# %presentation_style magics used in this notebook — TODO confirm.
%load_ext training_ml_control
# Seed the random number generators (going by the magic's name) so the random
# draws further down are repeatable.
%set_random_seed 12

In [None]:
# Custom line magic (not an IPython built-in); presumably registered by the
# training_ml_control extension — TODO confirm.
%presentation_style

In [None]:
import warnings

# Keep the rendered notebook free of library chatter: ignore these warning
# categories for the rest of the session (registered in this order).
for _category in (UserWarning, FutureWarning):
    warnings.simplefilter("ignore", _category)

In [None]:
%autoreload

import casadi
import matplotlib.pyplot as plt
import numpy as np
import pykoopman as pk
import pysindy as ps
import seaborn as sns
from do_mpc.controller import MPC
from do_mpc.simulator import Simulator
from numpy.typing import NDArray
from scipy.fft import rfft, rfftfreq
from scipy.signal import periodogram
from sklearn.metrics import mean_squared_error

from training_ml_control.control import (
    ConstantController,
    SineController,
    SumOfSineController,
    PRBSController,
    SchroederSweepController,
    RandomController,
)
from training_ml_control.control import (
    build_mpc_controller,
)
from training_ml_control.environments import (
    create_pendulum_environment,
    simulate_environment,
)
from training_ml_control.nb_utils import show_video, display_array
from training_ml_control.plots import animate_pendulum_simulation
from training_ml_control.models import build_sindy_model

# Global plotting defaults for every figure in the notebook.
sns.set_theme()
plt.rcParams["figure.figsize"] = [12, 8]

# NOTE(review): `ExperimentalWarning` is not imported anywhere in this notebook,
# so referencing it directly raises NameError on a fresh kernel. The filter is
# therefore only installed when the class is already present in the notebook
# namespace. TODO: import it from its defining package once that is confirmed.
_experimental_warning = globals().get("ExperimentalWarning")
if _experimental_warning is not None:
    warnings.simplefilter("ignore", _experimental_warning)

```{figure} ./_static/images/aai-institute-cover.png
:width: 90%
:align: center
:name: aai-institute
```

# Practice

In this section, you will be tasked with solving a control problem from start to finish.

Feel free to proceed as you wish. You could use a mathematical model or learn a model from data and then attempt to control it.

We will be using the [Pendulum](https://gymnasium.farama.org/environments/classic_control/pendulum/) environment from [gymnasium](https://gymnasium.farama.org/).

The system consists of a pendulum attached at one end to a fixed point, with the other end free. The pendulum starts in a random position and we can apply torque to rotate the free end.

As seen below, the pendulum is represented in red and the joint is represented in black.

In [None]:
# Create the pendulum environment and roll it out once; no controller is passed,
# so the helper's default behaviour is used.
env = create_pendulum_environment()
result = simulate_environment(env)
# Play back the recorded frames at the frame rate declared in the environment metadata.
show_video(result.frames, fps=env.metadata["render_fps"])

The environment allows the following control input (action):

| Index | Action                              | Unit      | Min | Max |
|-----|---------------------------------------|-----------|-----|-----|
| 0   | apply torque to the actuated joint | torque (N m) |-2   | 2   |

and the following measurements (observation):
	

| Index | Observation                  | Min               | Max               |
|-----|--------------------------------|---------------------|-------------------|
| 0   | $\cos(\theta)$                   | $-1$           | $1$          |
| 1   | $\sin(\theta)$                   | $-1$           | $1$          |
| 2   | $\dot{\theta}$             | $-8$ | $8$ |


## First Goal

The first goal is to apply torques on the actuated joint to swing the pendulum into an upright position and keep it there.

## Second Goal

The second goal is to apply torques on the actuated joint to swing the pendulum as fast as possible.

# Exercise 1

::::{exercise} Pendulum Model
:label: pendulum-model-exercise

Use one of the previously seen methods to learn a model of the system from data.

:::{hint}
If you would like to use a mathematical model of the system either for the control or just as help when learning a model, then please refer to [the following page](https://en.wikipedia.org/wiki/Pendulum_(mechanics)) for the equations.
:::
::::

:::{solution} pendulum-model-exercise
:::

In [None]:
# Your solution here

## Solution

:::{solution-start} pendulum-model-exercise
:class: dropdown

**Work in Progress**
:::

**Data**

In [None]:
# Number of steps per rollout. The environment's step budget and the length of
# the Schroeder sweep were both hard-coded to 1000; a single constant keeps
# them in sync.
N_STEPS = 1000

env = create_pendulum_environment(max_steps=N_STEPS)

# One excitation signal per entry; the keys are reused as legend labels in the
# plots that follow.
controllers = {
    "Sinusoid": SineController(env, np.asarray([0.5 * env.max_torque]), frequency=1),
    "Schroeder Sweep": SchroederSweepController(
        env,
        n_time_steps=N_STEPS,
        n_harmonics=10,
        frequency=10,
    ),
    "PRBS": PRBSController(np.asarray([env.max_torque])),
}

# Roll the environment out once per controller and store each trajectory under
# the controller's name.
observations = {}
actions = {}
frames = {}
for controller_name, controller in controllers.items():
    result = simulate_environment(env, controller=controller)
    observations[controller_name] = result.observations
    actions[controller_name] = result.actions
    frames[controller_name] = result.frames

In [None]:
# One panel per observed quantity, one line per excitation signal.
state_labels = [r"$\cos(\theta)$", r"$\sin(\theta)$", r"$\dot{\theta}$"]

fig, axes = plt.subplots(2, 2, sharex=True)
axes = axes.ravel()
for i, label in enumerate(state_labels):
    ax = axes[i]
    for controller_name, obs in observations.items():
        t = np.arange(len(obs)) * env.dt
        ax.plot(t, obs[:, i], label=controller_name)
    # Panel decoration does not depend on the controller, so it is applied once
    # per panel instead of once per plotted line.
    ax.set_xlabel("Time")
    ax.set_title(label)
    ax.legend()
fig.tight_layout()
# `fig.show()` only emits a warning on the inline backend; `plt.show()` matches
# the other plotting cells of this notebook.
plt.show()

In [None]:
# Periodogram of every observed quantity, one line per excitation signal, to
# compare which frequencies each controller excites.
state_labels = [r"$\cos(\theta)$", r"$\sin(\theta)$", r"$\dot{\theta}$"]

fig, axes = plt.subplots(2, 2, sharex=True)
axes = axes.ravel()
for i, label in enumerate(state_labels):
    ax = axes[i]
    for controller_name, obs in observations.items():
        # Sampling frequency is the inverse of the environment's time step.
        freqs, psd = periodogram(obs[:, i], 1 / env.dt)
        ax.semilogy(freqs, psd, label=controller_name)
    # Panel decoration does not depend on the controller, so it is applied once
    # per panel instead of once per plotted line.
    ax.set_xlabel("frequency [Hz]")
    ax.set_ylabel("Power Spectral Density [V**2/Hz]")
    ax.set_title(label)
    ax.legend()
fig.tight_layout()
plt.show()

In [None]:
# Fit on one excitation signal and evaluate on a different one.
training_controller_name, testing_controller_name = "PRBS", "Sinusoid"

# The last observation of each rollout is dropped before pairing states with
# actions (presumably a rollout holds one more observation than actions — TODO confirm).
X_train, U_train = (
    observations[training_controller_name][:-1].copy(),
    actions[training_controller_name].copy(),
)
X_test, U_test = (
    observations[testing_controller_name][:-1].copy(),
    actions[testing_controller_name].copy(),
)

# Uniform time grids, one entry per retained state.
t_train = env.dt * np.arange(len(X_train))
t_test = env.dt * np.arange(len(X_test))

**SINDYc**

In [None]:
# STLSQ optimizer; `threshold` is the cut-off it is configured with.
optimizer = ps.STLSQ(threshold=0.9)
# Candidate terms: polynomials up to degree 2.
feature_library = ps.PolynomialLibrary(degree=2)
sindy_model = ps.SINDy(optimizer=optimizer, feature_library=feature_library)
# Fit on the training rollout with the applied actions as control input.
sindy_model.fit(X_train, u=U_train, t=t_train)

In [None]:
# Show the fitted model in its printed form.
sindy_model.print()

In [None]:
# Score the fitted model on the held-out rollout with mean squared error as metric.
sindy_test_score = sindy_model.score(
    X_test,
    u=U_test,
    t=env.dt,
    metric=mean_squared_error,
)
print("Model score: %f" % sindy_test_score)

In [None]:
# `simulate` already returns the initial state as its first row, and because the
# control input is an array that has to be interpolated, pysindy drops the last
# time point. Prepending X_test[0] to that output duplicated the initial state
# and delayed the whole model trajectory by one sample.
# Instead, extend the time grid and the inputs by one sample (holding the last
# input) so that, after the drop, there is exactly one row per entry of t_test.
t_sim = np.append(t_test, t_test[-1] + env.dt)
U_sim = np.concatenate([U_test, U_test[-1:]], axis=0)
X_sindy = sindy_model.simulate(X_test[0], t_sim, u=U_sim)
# The comparison plot relies on one simulated row per test time stamp.
assert len(X_sindy) == len(t_test), "simulated trajectory is not aligned with t_test"

In [None]:
# Measured vs. SINDYc-simulated trajectory, one panel per state.
n_states = X_test.shape[1]
# Two rows with enough columns to hold every state (ceil(n_states / 2)).
fig, axes = plt.subplots(2, n_states // 2 + n_states % 2, sharex=True)
axes = axes.ravel()
for i in range(n_states):
    ax = axes[i]
    ax.plot(t_test, X_test[:, i], "k", label="Measured")
    ax.plot(t_test, X_sindy[:, i], "r--", label="Model")
    ax.legend()
    ax.set(xlabel="t", ylabel="$x_{}$".format(i + 1))

fig.tight_layout()
# `fig.show()` only emits a warning on the inline backend; `plt.show()` matches
# the power-spectrum cell of this notebook.
plt.show()

The results seem to be good, at least for up to a certain number of steps which should be good enough for our purpose. We could also use hyper-parameter optimization to find the best model. However, we have to be careful with overfitting.

**EDMDc**

Let's also fit an EDMDc model so that we can compare the two methods and select the better model.

In [None]:
# EDMD with control as regressor, on polynomial observables up to degree 2.
regressor = pk.regression.EDMDc()
observables = pk.observables.Polynomial(degree=2)
dmd_model = pk.Koopman(observables=observables, regressor=regressor)
# Fit on the same training rollout as the SINDYc model, sampled every env.dt.
dmd_model.fit(X_train, u=U_train, dt=env.dt)

In [None]:
# Roll the model forward from the first test state under the recorded inputs and
# put that initial state in front of the result (presumably `simulate` returns
# only the n_steps successor states — TODO confirm).
n_steps = X_test.shape[0] - 1
X_dmd = np.vstack(
    [X_test[:1], dmd_model.simulate(X_test[0], U_test, n_steps=n_steps)]
)

In [None]:
# Mean squared error between the measured and the simulated test trajectory.
print("Model score: %f" % mean_squared_error(X_test, X_dmd))

In [None]:
# Measured vs. EDMDc-simulated trajectory, one panel per state.
n_states = X_test.shape[1]
# Two rows with enough columns to hold every state (ceil(n_states / 2)).
fig, axes = plt.subplots(2, n_states // 2 + n_states % 2, sharex=True)
axes = axes.ravel()
for i in range(n_states):
    ax = axes[i]
    ax.plot(t_test, X_test[:, i], "k", label="Measured")
    ax.plot(t_test, X_dmd[:, i], "r--", label="Model")
    ax.legend()
    ax.set(xlabel="t", ylabel="$x_{}$".format(i + 1))

fig.tight_layout()
# `fig.show()` only emits a warning on the inline backend; `plt.show()` matches
# the power-spectrum cell of this notebook.
plt.show()

The results don't seem good enough. We could also use hyper-parameter optimization to find the best model. However, we have to be careful with overfitting.

We will use the SINDYc model for the second exercise.

:::{solution-end}
:::

# Exercise 2

::::{exercise} Pendulum Control
:label: pendulum-control-exercise

Use the learned model and synthesize a controller to achieve the goals described above.

::::

:::{solution} pendulum-control-exercise
:::

In [None]:
# Your solution here

## Solution

:::{solution-start} pendulum-control-exercise
:class: dropdown
:::

For this exercise, we will use the SINDYc model along with an MPC controller to achieve our control objectives.

For that we have to first convert the SINDYc model to a CasADi model in order to use it with do-mpc.

**Model**

In [None]:
# Convert the fitted SINDYc model into a model that do-mpc can work with.
mpc_model = build_sindy_model(sindy_model)

To make sure that the converted model is correct, we simulate the system with it using the test inputs.

In [None]:
# Build a do-mpc simulator around the converted model.
simulator = Simulator(mpc_model)
params_simulator = {
    "integration_tool": "idas",
    "abstol": 1e-8,
    "reltol": 1e-8,
    # Use the environment's time step so simulator and environment advance alike.
    "t_step": env.dt,
}
simulator.set_param(**params_simulator)
simulator.setup()

In [None]:
%%capture
# Replay the recorded test inputs through the simulator, starting from the
# first test state.
x0 = X_test[0]

simulator.reset_history()
simulator.x0 = x0

for u in U_test:
    # Each input is handed over as a column vector.
    simulator.make_step(u.reshape((-1, 1)))

In [None]:
# Animate the trajectory recorded by the simulator during the open-loop replay.
animate_pendulum_simulation(simulator.data)

**Controller**

In [None]:
# Target state: cos(theta) = 1, sin(theta) = 0 and zero angular velocity
# (presumably the upright position asked for by the first goal — TODO confirm).
setpoint = np.array([1.0, 0.0, 0.0])
# Distance to the setpoint, minus a heavily weighted reward on the first state
# (x0, i.e. cos(theta) in the observation ordering used for fitting).
cost = casadi.norm_2(mpc_model.x.cat - setpoint) - 100 * mpc_model.x["x0"]

# The same expression is used along the horizon and at its end.
terminal_cost = cost
stage_cost = cost
print(f"Stage Cost = {stage_cost}")
print(f"Terminal Cost = {terminal_cost}")

In [None]:
# Bounds taken from the action and observation tables above: torque in [-2, 2],
# cos/sin in [-1, 1], angular velocity in [-8, 8].
u_limits = {"u0": np.array([-2, 2])}
# No penalty on the control input.
u_penalty = {"u0": 0.00}
x_limits = {"x0": np.array([-1, 1]), "x1": np.array([-1, 1]), "x2": np.array([-8, 8])}

In [None]:
# Assemble the MPC controller from the model, the costs and the limits defined
# above; it plans 50 steps ahead at the environment's time step.
mpc_controller = build_mpc_controller(
    model=mpc_model,
    t_step=env.dt,
    n_horizon=50,
    stage_cost=stage_cost,
    terminal_cost=terminal_cost,
    x_limits=x_limits,
    u_penalty=u_penalty,
    u_limits=u_limits,
)

**Simulation**

In [None]:
%%capture

# Closed-loop test against the learned model: start both the controller and the
# simulator from the same random state.
mpc_controller.reset_history()
simulator.reset_history()

x = np.zeros((3, 1))
# random angle
theta0 = np.random.uniform(low=-np.pi, high=np.pi)
# cosine and sine
x[0] = np.cos(theta0)
x[1] = np.sin(theta0)
# angular velocity
x[2] = np.random.uniform(low=-8, high=8)

simulator.x0 = x
mpc_controller.x0 = x
mpc_controller.set_initial_guess()

# 100 control steps: the controller picks an input, the simulator applies it.
for k in range(100):
    u = mpc_controller.make_step(x)
    x = simulator.make_step(u)

In [None]:
# Animate the trajectory recorded by the MPC controller during the closed-loop run.
animate_pendulum_simulation(mpc_controller.data)

**Environment**

In [None]:
class MPCController:
    """Adapter that exposes a do-mpc MPC controller through ``act(observation)``.

    The annotations are written as strings (forward references) so that defining
    the class never evaluates ``MPC`` or ``NDArray`` at runtime.
    """

    def __init__(self, mpc: "MPC") -> None:
        """Reset the controller and initialise it from a random pendulum state.

        Args:
            mpc: Configured MPC controller. Its history is cleared and its
                ``x0`` and initial guess are overwritten.
        """
        self.mpc = mpc
        self.mpc.reset_history()
        # Random state [cos(theta), sin(theta), theta_dot] as a column vector;
        # the angle is drawn first and the angular velocity second.
        theta0 = np.random.uniform(low=-np.pi, high=np.pi)
        theta_dot0 = np.random.uniform(low=-8, high=8)
        x0 = np.array([[np.cos(theta0)], [np.sin(theta0)], [theta_dot0]])
        self.mpc.x0 = x0
        self.mpc.set_initial_guess()

    def act(self, observation: "NDArray") -> "NDArray":
        """Return the control action for ``observation`` as a flat array.

        Args:
            observation: Current measurement; any array-like is accepted and
                reshaped to a column vector before it is handed to the controller.

        Returns:
            The controller's output flattened to one dimension.
        """
        column = np.asarray(observation).reshape(-1, 1)
        return self.mpc.make_step(column).ravel()

In [None]:
%%capture
# Wrap the MPC controller and run it in closed loop on the environment for 100 steps.
controller = MPCController(mpc_controller)
results = simulate_environment(env, max_steps=100, controller=controller)

In [None]:
# Play back the frames of the controlled rollout at the environment's frame rate.
show_video(results.frames, fps=env.metadata["render_fps"])

In [None]:
# Animate what the MPC controller recorded while it was driving the environment.
animate_pendulum_simulation(mpc_controller.data)

:::{solution-end}
:::