In [None]:
# Load IPython's autoreload extension; mode 2 re-imports edited modules
# automatically before each cell is executed.
%load_ext autoreload
%autoreload 2

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy as sp

In [None]:
from datafold.appfold import EDMD, EDMDControl
from datafold.dynfold import DMDControl, DMDFull
from datafold.dynfold.transform import TSCIdentity, TSCRadialBasis
from datafold.pcfold import (
    InitialCondition,
    InverseMultiquadricKernel,
    PCManifold,
    TSCDataFrame,
)

## Inverted pendulum Physics

In [None]:
from datafold.utils.kmpc import InvertedPendulum

## Data generation

In [None]:
# Settings that control how the training trajectories are simulated
training_size = 20  # number of simulated training trajectories
sim_num_steps = 1_000  # simulation steps per trajectory
sim_time_step = 0.01  # simulation time step in seconds

There are several ways to stack the simulated trajectories. We start by concatenating them into one very long trajectory. (An option still to consider is reusing the final state of each run as the next initial state; currently the pendulum is reset to 0 before every run.)

In [None]:
# Simulator instance that is reused (and reset) for every training trajectory.
invertedPendulum = InvertedPendulum()

# Pre-allocated snapshot matrices with one column per simulated step:
# X holds the four states, Y the states one step later, U the control input.
X = np.zeros((4, training_size * sim_num_steps))
Y = np.zeros((4, training_size * sim_num_steps))
U = np.zeros((1, training_size * sim_num_steps))
# Per-trajectory data frames; they are merged into TSCDataFrame objects later.
Xlist, Ulist = [], []
xycols = ["x", "xdot", "theta", "thetadot"]
# Seed NumPy's global generator so the sampled control signals are reproducible.
np.random.seed(42)
for i in range(training_size):
    # Random sinusoidal control signal: amplitude in [0.1, 1.0), angular
    # frequency in [pi, 3*pi) and phase in [0, 2*pi). The order of the three
    # draws determines the values obtained for a given seed.
    control_amplitude = 0.1 + 0.9 * np.random.random()
    control_frequency = np.pi + 2 * np.pi * np.random.random()
    control_phase = 2 * np.pi * np.random.random()
    # The control depends on time only; the state argument y is ignored.
    control_func = lambda t, y: control_amplitude * np.sin(
        control_frequency * t + control_phase
    )
    # NOTE(review): reset() presumably restores the simulator's initial state
    # before each run -- confirm in InvertedPendulum.
    invertedPendulum.reset()
    traj = invertedPendulum.trajectory(sim_time_step, sim_num_steps, control_func)
    # Abort when the solver result stored on the simulator reports a failure.
    assert (
        invertedPendulum.sol.success
    ), f"Divergent solution for amplitude={control_amplitude}, frequency={control_frequency}"
    t = invertedPendulum.sol.t
    # State frame: one row per time value in t, control column filled with zeros.
    dfx = pd.DataFrame(data=traj.T, index=t, columns=xycols)
    dfx["u"] = 0.0
    Xlist.append(dfx)
    # Control frame with the same column layout: the state columns are zero and
    # "u" holds the control signal evaluated at the time values in t.
    control_input = control_func(t, traj)
    dfu = pd.DataFrame(data=control_input, index=t, columns=("u",))
    for col in xycols:
        dfu[col] = 0.0
    dfu = dfu[xycols + ["u"]]
    Ulist.append(dfu)

    # Fill this trajectory's slice of the snapshot matrices: X gets all samples
    # but the last, Y the same samples shifted by one step, U the control
    # evaluated at the time values belonging to X.
    X[:, i * sim_num_steps : (i + 1) * sim_num_steps] = traj[:, :-1]
    Y[:, i * sim_num_steps : (i + 1) * sim_num_steps] = traj[:, 1:]
    U[:, i * sim_num_steps : (i + 1) * sim_num_steps] = control_func(
        invertedPendulum.sol.t[:-1], traj[:, :-1]
    )

# After the loop, t, dfx and control_input still refer to the last simulated
# trajectory; later cells use them as the reference when plotting predictions.
# XU collects the state frames followed by the control frames.
XU = TSCDataFrame.from_frame_list(Xlist + Ulist)

In [None]:
# First state row of X (column "x" of the trajectories) over all training
# samples; labelled so the figure can be read on its own.
fig, ax = plt.subplots(figsize=(16, 3))
ax.plot(X[0].ravel())
ax.set_xlabel("sample index (training trajectories concatenated)")
ax.set_ylabel("x")
ax.set_title("Training data: state x");

In [None]:
# Control input stored in U over all training samples; labelled so the figure
# can be read on its own.
fig, ax = plt.subplots(figsize=(16, 3))
ax.plot(U[0].ravel())
ax.set_xlabel("sample index (training trajectories concatenated)")
ax.set_ylabel("u")
ax.set_title("Training data: control input u");

In [None]:
# The state columns are taken from the frames collected in Xlist ...
X_tsc = TSCDataFrame.from_frame_list(Xlist)[["x", "xdot", "theta", "thetadot"]]
# ... and the control column "u" from the frames in Ulist (the "u" column of
# the Xlist frames only contains zeros).
X_tsc["u"] = TSCDataFrame.from_frame_list(Ulist)[["u"]]
# Display the combined time series collection.
X_tsc

## DMD only

In [None]:
# DMDControl model: the four pendulum states are declared as state columns and
# "u" as the only control column.
dmdc = DMDControl(
    state_columns=["x", "xdot", "theta", "thetadot"], control_columns=["u"]
)

In [None]:
# Fit the model on the training collection (state columns plus control input).
dmdc.fit(X_tsc)

In [None]:
# Re-create and re-fit the model, this time asking fit() to keep the system
# matrix so that both identified matrices can be inspected.
dmdc = DMDControl(
    state_columns=["x", "xdot", "theta", "thetadot"], control_columns=["u"]
)
dmdc.fit(X_tsc, store_system_matrix=True)

# One titled panel per matrix, each with its own colour bar.
fig, (ax_sys, ax_ctrl) = plt.subplots(1, 2)
im_sys = ax_sys.imshow(dmdc.sys_matrix_)
ax_sys.set_title("System matrix")
fig.colorbar(im_sys, ax=ax_sys)
im_ctrl = ax_ctrl.imshow(dmdc.control_matrix_)
ax_ctrl.set_title("Control matrix")
fig.colorbar(im_ctrl, ax=ax_ctrl);

In [None]:
# Roll the fitted model forward from the state [0, 0, pi, 0], driven by the
# control signal and the time values of the last training trajectory.
initial_state = np.array([0, 0, np.pi, 0])
control_sequence = np.atleast_2d(control_input).T
prediction = dmdc.predict(
    np.atleast_2d(initial_state),
    time_values=t,
    control_input=control_sequence,
)

In [None]:
# Compare the predicted "x" with the last simulated training trajectory; the
# legend tells the two curves apart.
fig, ax = plt.subplots()
ax.plot(prediction["x"].values, label="DMDc prediction")
ax.plot(dfx["x"].values, label="simulation (last training trajectory)")
ax.set_xlabel("sample index")
ax.set_ylabel("x")
ax.legend();

## EDMD Predictor

In [None]:
# Column names shared by the EDMD models below and the initial condition
# [0, 0, pi, 0] from which their predictions start.
state_cols = ["x", "xdot", "theta", "thetadot"]
control_cols = ["u"]
initial_values = np.array([0, 0, np.pi, 0])
ic = InitialCondition.from_array(initial_values, columns=list(state_cols))

### Using Identity dictionary

In [None]:
# EDMD with control where the dictionary consists of the TSCIdentity step only.
edmdid = EDMDControl(dict_steps=[("id", TSCIdentity()),], include_id_state=False).fit(
    X_tsc,
    split_by="name",
    state=state_cols,
    control=control_cols,
)
# Predict from the initial condition with the control signal and time values of
# the last training trajectory.
idprediction = edmdid.predict(
    ic, control_input=np.atleast_2d(control_input).T, time_values=t
)
# Compare the predicted "x" with the simulation; the legend tells the curves apart.
fig, ax = plt.subplots()
ax.plot(idprediction["x"].values, label="EDMD (identity) prediction")
ax.plot(dfx["x"].values, label="simulation (last training trajectory)")
ax.set_xlabel("sample index")
ax.set_ylabel("x")
ax.legend();

### Using dictionary as per thesis
The dictionary uses 100 randomly selected RBF centers with an inverse quadratic kernel.

In [None]:
# Settings of the RBF dictionary
eps = 10  # epsilon handed to the inverse quadratic kernel
seed = 42  # seed for the random selection of the centers
num_rbfs = 100  # number of RBF centers


def find_unique_center_ids(
    num_centers, seed=42, last_choice=None, num_steps=None, num_samples=None
):
    """Randomly pick row ids whose remainders modulo ``num_steps`` all differ.

    Needed to deal with a bug in the TSCDataFrame code.

    Ids are drawn without replacement from ``range(num_samples)``. While two
    ids share the same remainder modulo ``num_steps``, the first id of every
    remainder is kept and the colliding ones are replaced by fresh draws. The
    ids that are kept retain their original value, so the selection stays
    spread over the whole id range.

    Parameters
    ----------
    num_centers : int
        Number of ids to return; must be smaller than ``num_steps``.
    seed : int, default 42
        Seed of the random generator. It is increased by one for every
        additional round of re-drawing.
    last_choice : array-like of int, optional
        Candidate ids to start from instead of a fresh random draw.
    num_steps : int, optional
        Modulus of the uniqueness check. Defaults to the notebook-level
        ``sim_num_steps``.
    num_samples : int, optional
        Size of the id range to draw from. Defaults to
        ``num_steps * training_size`` (``training_size`` is notebook-level).

    Returns
    -------
    numpy.ndarray
        ``num_centers`` ids with pairwise different ``id % num_steps``.
    """
    # NOTE(review): the training frames built in the data-generation cell appear
    # to hold sim_num_steps + 1 samples per trajectory -- confirm that
    # sim_num_steps is the intended default modulus / id range.
    if num_steps is None:
        num_steps = sim_num_steps
    if num_samples is None:
        num_samples = num_steps * training_size
    assert num_centers < num_steps, "num_centers must be smaller than num_steps"

    center_ids = None if last_choice is None else np.asarray(last_choice)
    while True:
        # A fresh generator per round, seeded with seed, seed + 1, ...
        rng = np.random.default_rng(seed)
        if center_ids is None:
            center_ids = rng.choice(num_samples, size=num_centers, replace=False)

        # Position of the first id for every distinct remainder.
        _, first_positions = np.unique(center_ids % num_steps, return_index=True)
        if len(first_positions) == num_centers:
            return center_ids

        # Keep the ids themselves (not their remainders) and top the selection
        # up with new draws; the next round re-checks the combined set.
        kept_ids = center_ids[np.sort(first_positions)]
        refill = rng.choice(
            num_samples, size=num_centers - len(kept_ids), replace=False
        )
        center_ids = np.hstack([kept_ids, refill])
        seed += 1


from datafold.utils.kmpc import InverseQuadraticKernel

# RBF dictionary step; with center_type="fit_params" the centers are supplied
# later through the fit parameters of the EDMD model.
inverse_quadratic_kernel = InverseQuadraticKernel(epsilon=eps)
rbf = TSCRadialBasis(kernel=inverse_quadratic_kernel, center_type="fit_params")

# Rows of the training data (in ascending row order) that serve as RBF centers.
center_ids = sorted(find_unique_center_ids(num_rbfs, seed=seed))
centers = X_tsc.iloc[center_ids].values

In [None]:
# EDMD with control using the RBF dictionary; the centers are the selected
# training rows without their last column (the control input "u").
edmdrbf = EDMDControl(dict_steps=[("rbf", rbf),], include_id_state=True).fit(
    X_tsc,
    split_by="name",
    state=state_cols,
    control=control_cols,
    rbf__centers=centers[:, :-1],
)
# Predict from the initial condition with the control signal and time values of
# the last training trajectory.
rbfprediction = edmdrbf.predict(
    ic, control_input=np.atleast_2d(control_input).T, time_values=t
)
# Compare the predicted "x" with the simulation; the legend tells the curves apart.
fig, ax = plt.subplots()
ax.plot(rbfprediction["x"].values, label="EDMD (RBF) prediction")
ax.plot(dfx["x"].values, label="simulation (last training trajectory)")
ax.set_xlabel("sample index")
ax.set_ylabel("x")
ax.legend();

In [None]:
# First 100 samples of the predicted "theta" against the simulation; the legend
# tells the two curves apart.
fig, ax = plt.subplots()
ax.plot(rbfprediction["theta"].values[:100], label="EDMD (RBF) prediction")
ax.plot(dfx["theta"].values[:100], label="simulation (last training trajectory)")
ax.set_xlabel("sample index")
ax.set_ylabel("theta")
ax.set_title("First 100 samples")
ax.legend();

In [None]:
from datafold.appfold import KoopmanMPC

In [None]:
# Build the Koopman-based MPC controller on top of the fitted RBF EDMD model.
# NOTE(review): the positional arguments are presumably the prediction horizon
# (10), the bounds for the two controlled outputs "x" and "theta", and the
# bounds for the control input -- confirm against the KoopmanMPC signature.
kmpc = KoopmanMPC(
    edmdrbf,
    10,
    np.array([[1, -1], [2 * np.pi, -2 * np.pi]]),
    np.array([[5, -5]]),
    ["x", "theta"],
    cost_running=np.array([1, 10]),
)

In [None]:
# Display the shapes of the arrays H, h, G, L, M and c stored on the controller.
kmpc.H.shape, kmpc.h.shape, kmpc.G.shape, kmpc.L.shape, kmpc.M.shape, kmpc.c.shape

In [None]:
# Display the size attributes of the controller (lifted state, horizon, input, output).
kmpc.lifted_state_size, kmpc.horizon, kmpc.input_size, kmpc.output_size