In [1]:
import os
import time
from pathlib import Path

import numpy as np
import tomli
from generate_data import sample_init_state

from models import get_model_class
from unicycle import SimulationConfig, InferenceConfig, Unicycle, simulate_system

# Project root: the parent of the directory the notebook kernel was started in.
base_path = Path.cwd().parent

Using backend: pytorch
Other supported backends: tensorflow.compat.v1, tensorflow, jax, paddle.
paddle supports more examples now and is recommended.


Simulation parameters

In [2]:
# Location of the TOML configuration, relative to the project root.
config = 'configs/config.toml'
abs_config_path = (base_path / config).resolve()

# Parse the TOML file (tomli requires a binary file handle).
with abs_config_path.open('rb') as f:
    data = tomli.load(f)

# Only the [simulation] table is needed to build the simulation config.
simulation_cfg = SimulationConfig(**data['simulation'])

Evaluation of computation time for a single prediction

In [3]:
model_names = ['Numerical', 'FNO', 'DeepONet']
dx_steps = [0.01, 0.005, 0.001, 0.0005]

# perf_table[i, j] holds the mean wall-clock time (in ms) of one predictor
# call for model_names[i] with the step size dx_steps[j].
perf_table = np.zeros((len(model_names), len(dx_steps)))

predictor_trials = 1000


def _build_dummy_predictor(model_name, cfg):
    """Build a dummy `model_name` predictor on the CUDA device in eval mode.

    No checkpoint is loaded here: the benchmark only measures how long a
    prediction takes, not how accurate it is.

    Args:
        model_name: Name passed to `get_model_class` (e.g. 'FNO', 'DeepONet').
        cfg: Simulation config providing `NX`, `dt`, `dx` and `delays`.

    Returns:
        The constructed model, moved to 'cuda' and switched to eval mode.
    """
    predictor = get_model_class(model_name)(
        n_states=3,
        m_inputs=2,
        num_points=cfg.NX,
        dt=cfg.dt,
        dx=cfg.dx,
        delays=cfg.delays,
    ).to('cuda')
    predictor.eval()
    return predictor


for i, model_name in enumerate(model_names):

    for j, dx in enumerate(dx_steps):

        # Update dx.
        # NOTE: this mutates the shared config in place. After the loop,
        # simulation_cfg.dx is left at dx_steps[-1], and any later cell that
        # reads simulation_cfg.dx sees that value.
        simulation_cfg.dx = dx

        # Fixed (all-ones) initial state, so every model / step size is timed
        # on the same input.
        init_state = np.ones(3, dtype=np.float32)
        ml_unicycle = Unicycle(init_state, simulation_cfg)
        if model_name != 'Numerical':

            # Create dummy P1 and P2 predictors
            ml_unicycle.P1 = _build_dummy_predictor(model_name, simulation_cfg)
            ml_unicycle.P2 = _build_dummy_predictor(model_name, simulation_cfg)

            ml_unicycle.ml_predictors = [ml_unicycle.P1, ml_unicycle.P2]
            ml_unicycle.use_only_ml_predictor()

        # Evaluate predictor.
        # perf_counter() is monotonic and has the highest available resolution,
        # unlike time.time(), which can jump when the system clock is adjusted.
        # NOTE(review): CUDA kernels launch asynchronously. If ml_predict() does
        # not copy its result back to the host, GPU work may still be in flight
        # when the timer stops -- confirm, or synchronize before reading the clock.
        start_time = time.perf_counter()
        for _ in range(predictor_trials):
            if ml_unicycle.predict_exact:
                ml_unicycle.predict()
            if ml_unicycle.predict_ml and ml_unicycle.ml_predictors is not None:
                ml_unicycle.ml_predict()
        end_time = time.perf_counter()

        # Mean time per trial, converted from seconds to milliseconds.
        perf_table[i, j] = (end_time - start_time) / predictor_trials * 1000


# Render the timing table: one column per step size, one row per model.
print("RAW CALCULATION TIMES (ms)")

step_cells = "".join(f"{ts:>12.4f}" for ts in dx_steps)
header = f"{'Step size':>15}" + step_cells
print(header)
print("-" * len(header))

# One row per model; the rows of perf_table follow the order of model_names.
for name, timings in zip(model_names, perf_table):
    row_cells = "".join(f"{perf:12.4f}" for perf in timings)
    print(f"{name:>15}" + row_cells)

RAW CALCULATION TIMES (ms)
      Step size      0.0100      0.0050      0.0010      0.0005
---------------------------------------------------------------
      Numerical      8.9615     18.2229    106.1784    229.0006
            FNO     29.4117     30.8366     32.2608     33.0773
       DeepONet      8.0931     11.6781     11.2469     23.6340


Evaluation of L2 trajectory error and model parameter counts

In [4]:
model_names = ['Numerical', 'FNO', 'DeepONet']
l2_norm_table = np.zeros(len(model_names))
model_params = dict()

num_trajectories = 5

# Draw the initial conditions once, so that every model is evaluated on the
# same set of trajectories and the averages below are directly comparable.
init_states = [sample_init_state() for _ in range(num_trajectories)]

for i, model_name in enumerate(model_names):

    # Construct model config ('Numerical' runs without any inference config)
    inference_cfg = None
    if model_name != 'Numerical':
        D1, D2 = simulation_cfg.delays
        # Digits after the decimal point of dx, e.g. 0.0005 -> '0005'.
        # simulation_cfg.dx is whatever value earlier cells left on the shared config.
        dx_str = f'{simulation_cfg.dx}'.split('.')[1]
        checkpoint_stem = f'models/Unicycle_delays_{int(D1 * 100)}_{int(D2 * 100)}_dx_{dx_str}_{model_name}'
        P1 = (base_path / f'{checkpoint_stem}_P1.pth').resolve()
        P2 = (base_path / f'{checkpoint_stem}_P2.pth').resolve()
        inference_cfg = InferenceConfig(model_name, P1, P2)

    # Evaluate model performance
    total_trajectory_error = 0.0
    for init_state in init_states:

        # Simulation. Each run gets its own copy of the initial state so the
        # shared list cannot be altered between models.
        # (assumes sample_init_state() returns an array-like -- TODO confirm)
        ml_unicycle = Unicycle(np.array(init_state, copy=True), simulation_cfg, inference_cfg)
        if inference_cfg is not None:
            ml_unicycle.use_only_ml_predictor()
            # The parameter counts only need to be recorded once per model.
            if model_name not in model_params:
                model_params[model_name] = ml_unicycle.get_model_params()

        # Simulate the trajectory; only the state history is used below.
        states, _, _, _, _ = simulate_system(ml_unicycle)

        # L2 norm of the state trajectory: sqrt of the integral of |x|^2.
        # NOTE(review): the integration step used here is the step
        # simulation_cfg.dx. If `states` is sampled in time every
        # simulation_cfg.dt, this should probably be dt -- confirm.
        squared_magnitudes = np.sum(states**2, axis=1)
        total_trajectory_error += np.sqrt(np.trapezoid(squared_magnitudes, dx=simulation_cfg.dx))

    # Average
    l2_norm_table[i] = total_trajectory_error / num_trajectories


# Model parameters: one row per ML model, one column per value returned by
# get_model_params(). The loop variable is deliberately not called `data`, so
# it does not overwrite the parsed config dict of the same name.
print("PARAMETERS")
for name, param_counts in model_params.items():
    print(f"{name:>15}" + "".join(f"{number:>12d}" for number in param_counts))


# Print L2 trajectory error (averaged over the simulated trajectories)
print('\nAVERAGE L2 TRAJECTORY ERROR')

for name, avg_l2 in zip(model_names, l2_norm_table):
    print(f"{name:>15}" + f"{avg_l2:12.4f}")

PARAMETERS
            FNO       42211       42211
       DeepONet     1124824     1124824

AVERAGE L2 TRAJECTORY ERROR
      Numerical      5.1311
            FNO      5.1436
       DeepONet      4.8737
