# Benchmarks

In [1]:
# the notebook imports
import matplotlib.pyplot as plt
import numpy as np
# this is the convenience function
from autokoopman import auto_koopman
# for a complete example, let's create an example dataset using an included benchmark system
from autokoopman.benchmark import bio2, fhn, lalo20, prde20, robe21, spring, pendulum, trn_constants
from glop import Glop
import random
import copy

from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
import statistics
import os
import csv
import time
import sys

In [2]:
benches = [bio2.Bio2(), fhn.FitzHughNagumo(), lalo20.LaubLoomis(), pendulum.PendulumWithInput(beta=0.05), prde20.ProdDestr(), robe21.RobBench(), spring.Spring(), trn_constants.TRNConstants()]

In [3]:
def get_training_data(bench, param_dict):
    init_states = get_init_states(bench, param_dict["train_size"])
    if bench._input_vars:
        steps = []
        for low, high in zip(bench.input_set_low, bench.input_set_high):
            if bench.input_type == "step":
                params = np.random.uniform(low, high, size=(param_dict["train_size"], 3))
                steps += [make_input_step(*p, bench.teval) for p in params]
            elif bench.input_type == "rand":
                steps += [make_random_input(low, high, bench.teval) for i in range(param_dict["train_size"])]
            else:
                sys.exit("Please set an input type for your benchmark") 
        #print(steps)
        training_data = bench.solve_ivps(initial_states=init_states, inputs=steps, teval=bench.teval)
    else:
        training_data = bench.solve_ivps(initial_states=init_states,tspan=[0.0, 10.0], 
                                         sampling_period=param_dict["samp_period"])
        
    return training_data

In [4]:
def get_init_states(bench, size, init_seed=0):
    if hasattr(bench, 'init_constrs'):
        init_states = []
        for i in range(size):
            init_state_dict = glop_init_states(bench, i+init_seed)
            init_state = []
            for name in bench.names:
                init_state.append(init_state_dict[name])
            init_states.append(init_state)
        init_states = np.array(init_states)  
    else:
        init_states = np.random.uniform(low=bench.init_set_low, 
                    high=bench.init_set_high, size=(size, len(bench.names)))
        
    return init_states

In [5]:
def glop_init_states(bench, seed):    
    constrs = []
    for constr in bench.init_constrs:
        constrs.append(constr)
    for i, (name, init_low, init_high) in enumerate(zip(bench.names, bench.init_set_low, bench.init_set_high)):
        low_constr = f"{name} >= {init_low}"
        high_constr = f"{name} <= {init_high}"
        constrs.extend([low_constr, high_constr])
        
    glop = Glop(bench.names, constrs)
    pop_item = random.randrange(len(bench.names))
    names, init_set_low, init_set_high = copy.deepcopy(bench.names), copy.deepcopy(bench.init_set_low), copy.deepcopy(bench.init_set_high)
    names.pop(pop_item)
    init_set_low.pop(pop_item)
    init_set_high.pop(pop_item)
    for i, (name, init_low, init_high) in enumerate(zip(names, init_set_low, init_set_high)):
        glop.add_tend_value_obj_fn(name, [init_low, init_high], seed+i)
    
    glop.minimize()

    sol_dict = glop.get_all_sols()    
    return sol_dict

In [6]:
def get_trajectories(bench, iv, samp_period):
    # get the model from the experiment results
    model = experiment_results['tuned_model']

    if bench._input_vars:
        test_inp = np.sin(np.linspace(0, 10, 200))

        # simulate using the learned model
        trajectory = model.solve_ivp(
            initial_state=iv,
            inputs=test_inp,
            teval=bench.teval,
        )
        # simulate the ground truth for comparison
        true_trajectory = bench.solve_ivp(
            initial_state=iv,
            inputs=test_inp,
            teval=bench.teval,
        )
        
    else:
        # simulate using the learned model
        trajectory = model.solve_ivp(
            initial_state=iv,
            tspan=(0.0, 10.0),
            sampling_period=samp_period
        )
        # simulate the ground truth for comparison
        true_trajectory = bench.solve_ivp(
            initial_state=iv,
            tspan=(0.0, 10.0),
            sampling_period=samp_period
        )
    
    return trajectory, true_trajectory

In [7]:
def test_trajectories(bench, num_tests, samp_period):
    mses = []
    perc_errors = []
    for j in range(num_tests):
        iv = get_init_states(bench, 1, j+10000)[0]
        trajectory, true_trajectory = get_trajectories(bench, iv, samp_period)
        mse = mean_squared_error(trajectory.states.T, true_trajectory.states.T)
        mses.append(mse)
        perc_error = mean_absolute_percentage_error(trajectory.states.T, true_trajectory.states.T)
        perc_errors.append(perc_error)
            
    return statistics.mean(mses), statistics.mean(perc_errors)

In [8]:
def make_input_step(duty, on_amplitude, off_amplitude, teval):
    """produce a step response input signal for the pendulum"""
    length = len(teval)
    inp = np.zeros((length,))
    phase_idx = int(length * duty)
    inp[:phase_idx] = on_amplitude
    inp[phase_idx:] = off_amplitude
    return inp

In [9]:
def make_random_input(low, high, teval):
    length = len(teval)
    inp = np.zeros((length,))
    for i in range(len(inp)):
        inp[i] = np.random.uniform(low, high)
    return inp

In [10]:
def store_data(row, filename='benches'):
    with open('data/benches', 'a') as f:
        writer = csv.writer(f)
        writer.writerow(row)

In [11]:
def store_data_heads(row, filename='benches'):
    if not os.path.exists('data'):
        os.makedirs('data')
    
    with open(f'data/{filename}', 'w') as f:
        writer = csv.writer(f)
        writer.writerow(row)

In [12]:
def plot(trajectory, true_trajectory, var_1, var_2):
    plt.figure(figsize=(10, 6))
    # plot the results
    if var_2==-1: #plot against time
        plt.plot(trajectory.states[:, var_1], label='Trajectory Prediction')
        plt.plot(true_trajectory.states[:, var_1], label='Ground truth')
    else:
        plt.plot(trajectory.states.T[var_1], trajectory.states.T[var_2],label='Trajectory Prediction')
        plt.plot(true_trajectory.states.T[var_1], true_trajectory.states.T[var_2],label='Ground Truth')

    plt.xlabel("$x_1$")
    plt.ylabel("$x_2$")
    plt.grid()
    plt.legend()
    plt.title("Bio2 Test Trajectory Plot")
    plt.show()

In [13]:
def plot_trajectory(bench, var_1=0, var_2=-1, seed=100):
    iv = get_init_states(bench, 1, seed)[0]
    trajectory, true_trajectory = get_trajectories(bench, iv, param_dict["samp_period"])
    plot(trajectory, true_trajectory, var_1, var_2)

In [18]:
obs_types = ['id', 'poly', 'rff', 'deep']
store_data_heads(["perc_error", "time(s)",""]*4)
for i in range(1):
    store_data([f"Iteration {+1}"])
    for bench in benches:
        result = [bench.name, ""]
        for obs in obs_types:
            np.random.seed(0)
            param_dict = {"train_size":50,"samp_period":0.1,"obs_type":obs,"opt":"grid","n_obs":200,
                          "grid_param_slices":5,"n_splits":5,"rank":(1, 200, 40)}
            # generate training data
            training_data = get_training_data(bench, param_dict)
            start = time.time()
            # learn model from data
            experiment_results = auto_koopman(
                training_data,          # list of trajectories
                sampling_period=param_dict["samp_period"],    # sampling period of trajectory snapshots
                obs_type=param_dict["obs_type"],         # use Random Fourier Features Observables
                opt=param_dict["opt"],             # grid search to find best hyperparameters
                n_obs=param_dict["n_obs"],              # maximum number of observables to try
                max_opt_iter=200,       # maximum number of optimization iterations
                grid_param_slices=param_dict["grid_param_slices"],# for grid search, number of slices for each parameter
                n_splits=param_dict["n_splits"],             # k-folds validation for tuning, helps stabilize the scoring
                rank=param_dict["rank"]       # rank range (start, stop, step) DMD hyperparameter
            )
            end = time.time()

            mse = [-1]
            perc_error = [-1]
            try:
                mse, perc_error = test_trajectories(bench, 10, param_dict["samp_period"])
            except ValueError:
                print("can't compute for this setting")

            comp_time = round(end - start, 3)
            print("time taken: ", comp_time)
            print(f"The average percentage error is {perc_error}%" )

            result.append(perc_error)
            result.append(comp_time)
            result.append("")

        store_data(result)

Tuning GridSearchTuner: 100%|█████████████████████| 5/5 [00:00<00:00,  6.10it/s]


time taken:  0.82
The average percentage error is 0.002%


  np.multiply(
  return np.real(self._A @ obs.T).flatten()[: len(x)]
Tuning GridSearchTuner:  30%|██████              | 6/20 [00:10<00:24,  1.73s/it]


Error: Input X contains NaN.
PolynomialFeatures does not accept missing values encoded as NaN natively. For supervised learning, you might want to consider sklearn.ensemble.HistGradientBoostingClassifier and Regressor which accept missing values encoded as NaNs natively. Alternatively, it is possible to preprocess the data, for instance by using an imputer transformer in a pipeline or drop samples with missing values. See https://scikit-learn.org/stable/modules/impute.html You can find a list of all estimators that handle NaN values at the following page: https://scikit-learn.org/stable/modules/impute.html#estimators-that-handle-nan-values
time taken:  10.379
The average percentage error is 0.002%


  s = (x.conj() * x).real
  return sqrt(add.reduce(s, axis=axis, keepdims=keepdims))
Tuning GridSearchTuner: 100%|███████████████████| 25/25 [00:53<00:00,  2.12s/it]


time taken:  53.106
The average percentage error is 0.0%


Tuning GridSearchTuner:   0%|                            | 0/16 [00:00<?, ?it/s]

DeepKoopman is using torch device 'cpu'
DeepKoopman is using torch device 'cpu'


Tuning GridSearchTuner:   0%|                            | 0/16 [00:04<?, ?it/s]


KeyboardInterrupt: 