# System Identification - FOPDT Model

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy
import skopt
from skopt.space import Real, Integer
from problems.optprob.problems import ( 
    solve_problem_with_optimizer,
    solve_problem_with_optimizer_n_repeats
)
from problems.optprob.plot_utils import (
    function_evaluations_plot, best_guesses_plot, best_guesses_plot_n_repeats
)
from problems.sys_id_fopdt import (
    make_simulate_function,
    rms_prediction_error,
    SysIdFOPDT, 
    calculate_reasonable_bounds,
    SysIdFOPDTRealDelay
)

import lpfgopt
lpfgopt.__version__

## Load Input-Output Dataset

In [None]:
data_dir = 'data'
plot_dir = 'plots'
os.makedirs(plot_dir, exist_ok=True)
os.listdir(data_dir)

In [None]:
filename = 'io_data_fopdt.csv'
input_output_data = pd.read_csv(os.path.join(data_dir, filename))
input_output_data

In [None]:
fig, axes = plt.subplots(3, 1, sharex=True, figsize=(7, 5.5))

data = input_output_data.set_index('Time')

for ax, name in zip(axes, data.columns):
    data[name].plot(ax=ax, grid=True, title=name)

plt.tight_layout()
plt.show()

## Construct Simulation Function

In [None]:
input_col = 'Input1'
t = input_output_data['Time'].to_numpy()
u_data = input_output_data[input_col].to_numpy()
y_data = input_output_data['Output'].to_numpy()

# Determine average time interval
time_step_sizes = np.diff(t)
dt = np.mean(time_step_sizes)
assert np.max(np.abs(time_step_sizes - dt)) < dt / 10
dt

In [None]:
# Make simulation function
simulate = make_simulate_function(dt, u_data)

[-11.89612273, 360.83577464, 500.66886236, 534.13787627]

# Test simulate function
K = -10.0
tau = 400.0
n_delay = 2  # must be integer 
y_base = 500.0
u_base = 5.0
y_init = 530.0

y_model = simulate(K, tau, n_delay, y_base, u_base, y_init)
rms_prediction_error(y_model[n_delay+1:], y_data[n_delay+1:])

In [None]:
plt.figure(figsize=(7, 2.5))
plt.plot(t, input_output_data["Output"].to_numpy(), label='data')
plt.plot(t, y_model, label='y_model')
plt.xlabel('Time')
plt.grid()
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
%%timeit

y_model = simulate(K, tau, n_delay, y_base, u_base, y_init)

## Construct Optimization Problem Class

In [None]:
bounds = calculate_reasonable_bounds(t, u_data, y_data)
var_names = list(bounds.keys())
bounds = list(bounds.values())
bounds

In [None]:
# TODO: Include option to specify initial guess ranges
# Low	High
# [-12	-8]
# [150	200]
# [150	250]
# [470	500]
# 5	5  # Note: span is zero!  This is because the problem is overspecified.  I.e. one variable is redundant.  (Could have chosen y_base or y_init)
# [470	490]

In [None]:
problem = SysIdFOPDT(bounds, dt, u_data, y_data)

# Test cost function evaluation
x = [K, tau, n_delay, y_base, y_init]
rms_error = problem.cost_function_to_minimize(x, u_base=u_base)
rms_error

In [None]:
# Global minimum
K, tau, n_delay, y_base, y_init = [-12.38211837, 416.07825797, 6, 500.71792073, 535.87442099]

x_global_minimum = K, tau, n_delay, y_base, y_init
print(problem.cost_function_to_minimize(x_global_minimum))

In [None]:
y_model = simulate(K, tau, n_delay, y_base, u_base, y_init)

plt.figure(figsize=(7, 2.5))
plt.plot(t, input_output_data["Output"].to_numpy(), label='data')
plt.plot(t, y_model, label='y_model')
plt.xlabel('Time')
plt.grid()
plt.legend()
plt.title('Global Best Solution')
plt.tight_layout()
plt.show()

In [None]:
sol = solve_problem_with_optimizer(
    problem, 
    lpfgopt.minimize, 
    problem.bounds,
    discrete=[2],
    points=50,
    tol=0.01,
    maxit=1000,
    seedval=0
)
sol

In [None]:
function_evaluations_plot(problem)
plt.tight_layout()
plt.show()

In [None]:
best_guesses_plot(problem)
plt.tight_layout()
plt.show()

In [None]:
n_repeats = 100
maxiter = 1000
method = "LeapFrog"
fun_evals, unique_solutions, best_guesses = solve_problem_with_optimizer_n_repeats(
    problem, 
    lpfgopt.minimize, 
    n_repeats, 
    problem.bounds,
    discrete=[2],
    points=50,
    tol=0.01,
    maxit=maxiter,
)
ax = best_guesses_plot_n_repeats(fun_evals, title=f"{problem.name} - {method} - {n_repeats} Trials")
ax.set_xlim([0, maxiter])
plt.tight_layout()
plt.savefig(os.path.join(plot_dir, f"{problem.name}_{method}_convergence_plot_{n_repeats}.png"), dpi=150)
plt.show()

In [None]:
best_guess = min(best_guesses)
print(f"f(x): {best_guess[0]}")
print(f"x: {[float(x) for x in best_guess[1]]}")

In [None]:
x0 = [-12.382173313786181, 416.0781322544548, 6, 500.7178812643235, 535.8717305574411]

f_final = problem(x0)
assert f_final == 2.336465558781636

# Fix n_delay to current value and reduce problem to 5 remaining variables
f5 = lambda x: problem([x[0], x[1], 6, x[2], x[3]])

x0 = [-12.382086606678401, 416.0675766336414, 500.71805709681706, 535.8744229434178]

x0 = [-12.382173313786181, 416.0781322544548, 500.7178812643235, 535.8717305574411]
assert f5(x0) == f_final
f5(x0)

In [None]:
# Do additional gradient descent at this point.
res = scipy.optimize.minimize(f5, x0=x0, tol=1e-6)
assert res.status == 0
res.fun, res.x

## Bayesian Optimization

In [None]:
bounds

In [None]:
# Define dimensions - these should be the same as bounds above
dimensions = [
    Real(np.float64(-85.37499999999989), np.float64(85.37499999999989), transform='normalize'),
    Real(np.float64(15.02181818181818), np.float64(4406.4), transform='normalize'),
    Integer(0, 88),  # Integers don't need/support transform parameter
    Real(np.float64(390.72511918274705), np.float64(609.2851191827467), transform='normalize'),
    Real(np.float64(474.79), np.float64(584.0699999999999), transform='normalize')
]

dimensions

In [None]:
# Run Bayesian optimization
# 250 Trials takes 15 mins
problem.reset()
res = skopt.gp_minimize(
    problem,
    dimensions,
    n_calls=250,
    noise=1e-10,
    random_state=0,
    n_initial_points=20,
    verbose=True
)
res

In [None]:
res['x'], res['fun']

In [None]:
problem.best_guess

In [None]:
function_evaluations_plot(problem)
plt.tight_layout()
plt.show()

In [None]:
best_guesses_plot(problem)
plt.tight_layout()
plt.show()

In [None]:
method = "BayesOpt"
n_repeats = 10
fun_evals, unique_solutions, best_guesses = solve_problem_with_optimizer_n_repeats(
    problem,
    skopt.gp_minimize,
    n_repeats,
    dimensions,
    noise=1e-10,
    n_calls=250,
    n_initial_points=20,
)
unique_solutions

In [None]:
title = f"{problem.name} - {method} - {n_repeats} Trials"
ax = best_guesses_plot_n_repeats(fun_evals, title=title)
plt.tight_layout()
filename = f"{problem.name}_{method}_convergence_plot_{n_repeats}.png"
plt.savefig(os.path.join(plot_dir, filename), dpi=150)
plt.show()

In [None]:
min(best_guesses)

In [None]:
results_dir = "results"
os.makedirs(results_dir, exist_ok=True)

exp_name = f"{problem.name}_{method}_{n_repeats}"
os.makedirs(os.path.join(results_dir, exp_name), exist_ok=True)

# Save results
np.save(os.path.join(results_dir, exp_name, "fun_evals.npy"), np.stack(fun_evals))
best_guesses_x = np.stack([item[1] for item in best_guesses])
assert best_guesses_x.shape == (n_repeats, 5)
np.save(os.path.join(results_dir, exp_name, "best_guesses_x.npy"), best_guesses_x)
best_guesses_fun = np.stack([item[0] for item in best_guesses])
assert best_guesses_fun.shape == (n_repeats,)
np.save(os.path.join(results_dir, exp_name, "best_guesses_fun.npy"), best_guesses_fun)

## Standard Derivative-Free Optimizers

In [None]:
bounds = calculate_reasonable_bounds(t, u_data, y_data)
bounds_real_delay = {("theta" if name == "n_delay" else name): b for name, b in bounds.items()}
bounds_real_delay['theta'] = tuple(b * dt for b in bounds_real_delay['theta'])
bounds_real_delay

In [None]:
K = -12.38205772
tau = 417.6967877
theta = 94.32477261
y_base = 500.7177154
u_base = 5.0
y_init = 535.8743564
dt = 15.08

In [None]:
problem = SysIdFOPDTRealDelay(bounds_real_delay, dt, u_data, y_data)

# Test cost function evaluation
x = [K, tau, theta, y_base, y_init]
rms_error = problem.cost_function_to_minimize(x, u_base=u_base)
rms_error

In [None]:
# Generate random start point within the bounds
bounds_array = np.array(list(bounds_real_delay.values()))
x0 = np.random.uniform(bounds_array[:, 0], bounds_array[:, 1], size=bounds_array.shape[0])

sol = solve_problem_with_optimizer(
    problem,
    scipy.optimize.minimize,
    x0,
    method='Powell',
    bounds=list(problem.bounds.values()),
    tol=0.01,
    options={'maxiter': 1000},
)
sol


In [None]:
function_evaluations_plot(problem)
plt.tight_layout()
plt.show()

In [None]:
best_guesses_plot(problem)
plt.tight_layout()
plt.show()

In [None]:
def make_scipy_minimizer_with_random_x0(init_range):
    init_range_array = np.array(init_range)

    def minimizer(fun, **kwargs):
        x0 = np.random.uniform(init_range_array[:, 0], init_range_array[:, 1], size=len(init_range))
        return scipy.optimize.minimize(fun, x0, **kwargs)

    return minimizer


scipy_minimizer = make_scipy_minimizer_with_random_x0(list(problem.bounds.values()))

res = scipy_minimizer(
    problem,
    method='Powell',
    bounds=list(problem.bounds.values()),
    tol=0.01,
    options={'maxiter': 1000}
)
assert res.status == 0
res.fun, res.x

In [None]:
method = 'Nelder-Mead'
n_repeats = 100
maxiter = 1000
fun_evals, unique_solutions, best_guesses = solve_problem_with_optimizer_n_repeats(
    problem,
    scipy_minimizer,
    n_repeats,
    method=method,
    bounds=list(problem.bounds.values()),
    tol=0.01,
    options={'maxiter': 1000}
)
title = f"{problem.name} - {method} - {n_repeats} Trials"
ax = best_guesses_plot_n_repeats(fun_evals, title=title)
ax.set_xlim([0, maxiter])
plt.tight_layout()
filename = f"{problem.name}_{method}_convergence_plot_{n_repeats}.png"
plt.savefig(os.path.join(plot_dir, filename), dpi=150)
plt.show()

In [None]:
method = 'Powell'
n_repeats = 100
maxiter = 1000
fun_evals, unique_solutions, best_guesses = solve_problem_with_optimizer_n_repeats(
    problem,
    scipy_minimizer,
    n_repeats,
    method=method,
    bounds=list(problem.bounds.values()),
    tol=0.01,
    options={'maxiter': maxiter}
)
title=f"{problem.name} - {method} - {n_repeats} Trials"
ax = best_guesses_plot_n_repeats(fun_evals, title=title)
ax.set_xlim([0, maxiter])
plt.tight_layout()
filename = f"{problem.name}_{method}_convergence_plot_{n_repeats}.png"
plt.savefig(os.path.join(plot_dir, filename), dpi=150)
plt.show()

In [None]:
method = 'LeapFrog'
n_repeats = 100
maxiter = 1000
fun_evals, unique_solutions, best_guesses = solve_problem_with_optimizer_n_repeats(
    problem,
    lpfgopt.minimize,
    n_repeats,
    list(problem.bounds.values()),
    points=50,
    tol=0.01,
    maxit=maxiter,
)
title = f"{problem.name} - {method} - {n_repeats} Trials"
ax = best_guesses_plot_n_repeats(fun_evals, title=title)
ax.set_xlim([0, maxiter])
plt.tight_layout()
filename = f"{problem.name}_{method}_convergence_plot_{n_repeats}.png"
plt.savefig(os.path.join(plot_dir, filename), dpi=150)
plt.show()