### Import all relevant dependencies

In [1]:
import sys
import os
import pandas as pd


sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
from pathlib import Path
import yaml
from src.statistical_models.statistical_model_library.fcs_gaussian_noise_model import FCSGaussianNoiseModel
from src.visualization.plotting_functions import *
from src.math_utils.derivatives.numeric_derivative_calculator import NumericDerivativeCalculator
from src.model.hahn_stack_model import HahnStackModel
from src.math_utils.scaler.hahn_parameter_scaler import HahnParameterScaler
from src.model.parameter_set.hahn_parameter_set import HahnParameterSet

### Define the Experiment bounds for the free parameters as well as operating conditions, experiment repetitions and true parameter set

In [2]:
path = "../data/config/oed_config_demo.yaml"

with open(path, "r") as f:
    config = yaml.safe_load(f)

number_designs = config["number_designs"]  #Amount of LH Designs
n_rep = config["n_rep"]  #Amount of experiment repetitions
n_current_values = config["n_current_values"]  #Amount of individual current values
sigma = config["sigma"]  #experiment variance (10mV variance in repeated experiments)

# lower and upper bounds for operating conditions
upper_bounds_operating_conditions = np.array(config["upper_bounds_operating_conditions"])
lower_bounds_operating_conditions = np.array(config["lower_bounds_operating_conditions"])

I_S_array = np.linspace(1, 480, n_current_values)  # initialize applicable current array request

# initialize applicable parameter set
unscaled_theta_true = np.array(list(HahnParameterSet().free_parameters.values())[:6])
names_theta = list(HahnParameterSet().free_parameters.keys())[:6]

# initialize lower and upper bounds for free parameter values
unscaled_upper_bounds_free_params = np.array(config["unscaled_upper_bounds_free_params"])
unscaled_lower_bounds_free_params = np.array(config["unscaled_lower_bounds_free_params"])

print(names_theta, unscaled_lower_bounds_free_params, unscaled_theta_true, unscaled_upper_bounds_free_params)

['E_A', 'j_0_ref', 'r_el', 'D_CL_ref', 'D_GDL_ref', 'f_CL'] ['1000' '100' '1e-7' '1e-9' '1e-7' '0.01'] [7.1477e+04 2.1308e+03 4.2738e-06 3.3438e-08 8.6266e-06 3.6693e-01] ['100000' '10000' '1e-5' '1e-7' '1e-5' '1']


In [3]:
scaler = HahnParameterScaler()

X_train = np.vstack([
    unscaled_lower_bounds_free_params,
    unscaled_upper_bounds_free_params
]).T

# Stack operating condition bounds (rows = condition, columns = [min, max])
operating_condition_bounds = np.vstack([
    lower_bounds_operating_conditions,
    upper_bounds_operating_conditions
]).T

# Determine current range for scaling
current_bounds = np.array([[I_S_array.min(), I_S_array.max()]])

stacked_params = np.vstack((operating_condition_bounds, current_bounds))
scaled_theta_true = scaler.scale_theta(unscaled_theta_true, X_train)

scaled_upper_bounds = scaler.scale_params(upper_bounds_operating_conditions, operating_condition_bounds)
scaled_lower_bounds = scaler.scale_params(lower_bounds_operating_conditions, operating_condition_bounds)

scaled_lower_bounds_theta, _ = scaler.scale(unscaled_lower_bounds_free_params, X_train)
scaled_upper_bounds_theta, _ = scaler.scale(unscaled_upper_bounds_free_params, X_train)

# Print results
print("Scaled theta:", scaled_theta_true)
print("Rescaled theta:", unscaled_theta_true)

Scaled theta: [0.71188889 0.20513131 0.42159596 0.32765657 0.86127273 0.36053535]
Rescaled theta: [7.1477e+04 2.1308e+03 4.2738e-06 3.3438e-08 8.6266e-06 3.6693e-01]


### Define the parametric function including handover of bounds

In [4]:
hahn_fc_model = HahnStackModel(parameter_set=HahnParameterSet())
calculator = NumericDerivativeCalculator(hahn_fc_model, scaler)

statistical_model = FCSGaussianNoiseModel(model_function=hahn_fc_model,
                                          der_function=calculator,
                                          lower_bounds_x=scaled_lower_bounds,
                                          upper_bounds_x=scaled_upper_bounds,
                                          lower_bounds_theta=scaled_lower_bounds_theta,
                                          upper_bounds_theta=scaled_upper_bounds_theta,
                                          sigma=sigma,
                                          scaler = scaler,)

def blackbox_model(x):
    return statistical_model.random(theta=scaled_theta_true, x=x)

### Calculate LH Designs

Create new designs until max rel std is < 10 %

In [None]:
from multiprocessing import Pool, cpu_count
from src.utils.experiment_serialization import run_lh_experiment

NUM_RUNS = 100  # Anzahl gewünschter Wiederholungsexperimente
with Pool(processes=min(NUM_RUNS, cpu_count())) as pool:
    # Die Argumente müssen ggf. an die LH-Experimentfunktion angepasst werden!
    args = [(i, (lower_bounds_operating_conditions, upper_bounds_operating_conditions, I_S_array, scaled_theta_true, statistical_model, number_designs)) for i in range(NUM_RUNS)]
    results = pool.map(run_lh_experiment, args)



### Print and Save FIM and Std list

In [6]:
std_list = [[] for t in names_theta]

# Für jede Wiederholung
for res in results:
    stds = np.array([x[-1] for x in res])
    # Für jeden LHC
    # std_list.append(stds)
    for i in range(len(names_theta)):
        std_list[i].append(stds[:,i])

stat = []

for std in std_list:
    stat.append(np.array(std))
stat = np.array(stat)

stat_mean = stat.mean(axis=1)
stat_std = stat.std(axis=1)

In [None]:
var_list = [[] for t in names_theta]

# Für jede Wiederholung
for res in results:
    var = np.array([x[2] for x in res])
    # Für jeden LHC
    # std_list.append(stds)
    for i in range(len(names_theta)):
        var_list[i].append(var[:,i])

stat = []

for var in var_list:
    stat.append(np.array(var))
stat = np.array(stat)

var_mean = stat.mean(axis=1)
var_std = stat.std(axis=1)

In [None]:
det_list = []

# Für jede Wiederholung
for res in results:
    det = np.array([x[1] for x in res])
    # Für jeden LHC
    # std_list.append(stds)
    det_list.append(det[:])

stat = []
for det in det_list:
    stat.append(np.array(det))

stat = np.array(stat)

#det_mean = stat.mean(axis=1)
det_mean = stat.mean(axis=0)
det_std = stat.std(axis=0)

In [None]:
# save estimated thetas
root = Path.cwd().parent
data_path = root / "data" / "stats"

In [7]:
stat_mean = pd.DataFrame(stat_mean.T, columns=names_theta)
stat_mean.to_csv(data_path / "LHC_mean_std_progression.csv", index=False, header=True, sep=";")
stat_std = pd.DataFrame(stat_std.T, columns=names_theta)
stat_std.to_csv(data_path / "LHC_std_std_progression.csv", index=False, header=True, sep=";")

In [None]:
var_mean = pd.DataFrame(var_mean.T, columns=names_theta)
var_mean.to_csv(data_path / "LHC_mean_var_progression.csv", index=False, header=True, sep=";")
var_std = pd.DataFrame(var_std.T, columns=names_theta)
var_std.to_csv(data_path / "LHC_std_var_progression.csv", index=False, header=True, sep=";")

In [None]:
det_mean = pd.DataFrame(det_mean.T, columns=['det'])
det_mean.to_csv(data_path / "LHC_mean_det_progression.csv", index=False, header=True, sep=";")
det_std = pd.DataFrame(det_std.T, columns=['det'])
det_std.to_csv(data_path / "LHC_std_det_progression.csv", index=False, header=True, sep=";")