This notebook is an expansion of example_optimize_simple.ipynb: it repeats the optimization for several combinations of "true" rate constants.

In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np
import polars as pl
import pandas as pd
import seaborn as sns

from delayed_reactant_labeling.predict import DRL
from delayed_reactant_labeling.optimize import RateConstantOptimizerTemplate
from delayed_reactant_labeling.visualize import VisualizeMultipleSolutions

In [None]:
# Reaction scheme: A + cat <=> B -> C + cat.
# Every step is listed once for the unlabeled species (tag '') and once for the
# labeled species (tag '-d10'); the catalyst is shared and never gets a tag.
reactions = [
    (
        rate,
        [s if s == 'cat' else s + tag for s in reactants],
        [s if s == 'cat' else s + tag for s in products],
    )
    for tag in ('', '-d10')  # unlabeled block first, labeled block second
    for rate, reactants, products in (
        ('k1', ['A', 'cat'], ['B']),
        ('k-1', ['B'], ['A', 'cat']),
        ('k2', ['B'], ['C', 'cat']),
    )
]

# Concentrations present from the start, and the labeled reactant that is added later.
concentration_initial = {'A': 1, 'cat': 0.2}
concentration_labeled = {'A-d10': 0.5}
dilution_factor = 0.5

# Time grids before (0-10) and after (10-90) the addition of the labeled reactant.
time_pre = np.linspace(0, 10, num=50)
time_post = np.linspace(10, 90, num=400)

# Each "true" rate constant is taken from these values.
rate_values = [0.1, 1, 10]

In [None]:
def explore_boundary(k1, kr1, k2):
    """Generate noisy synthetic data for one set of rate constants and try to recover them.

    Steps performed:

    1. Predict the concentration profiles with ``DRL`` using the given ("real")
       rate constants and the module-level ``reactions``, ``time_pre``,
       ``time_post``, ``concentration_initial``, ``concentration_labeled`` and
       ``dilution_factor``.
    2. Add seeded Gaussian noise to every non-time column to obtain the "fake"
       experimental data.
    3. Run ``optimize_multiple`` (500 runs) on a ``RateConstantOptimizerTemplate``
       subclass that compares the unlabeled fraction of A, B and C.
    4. Save diagnostic figures (real data, fake data, error per run, histogram of
       the best rate constants, box plot of the rate constants).

    All output is written below
    ``optimization/example_model_boundaries/k1_{k1}_kr1_{kr1}_k2_{k2}/``.

    Args:
        k1: Value used for the rate constant named 'k1'.
        kr1: Value used for the rate constant named 'k-1'.
        k2: Value used for the rate constant named 'k2'.
    """
    path = f'optimization/example_model_boundaries/k1_{k1}_kr1_{kr1}_k2_{k2}/'
    # makedirs also creates missing parent folders and, with exist_ok, does not
    # fail when the folder is left over from an earlier run of this cell.
    os.makedirs(path, exist_ok=True)

    # "real" fake data: the noise-free prediction for the given rate constants
    rate_constants_real = {'k1': k1, 'k-1': kr1, 'k2': k2}
    drl_real = DRL(rate_constants=rate_constants_real, reactions=reactions)
    real_data_pre, real_data = drl_real.predict_concentration(
        t_eval_pre=time_pre,
        t_eval_post=time_post,
        dilution_factor=dilution_factor,
        initial_concentrations=concentration_initial,
        labeled_concentration=concentration_labeled)
    fig, axs = plt.subplots(1, 2, sharey='row', figsize=(10, 4), layout='tight', width_ratios=(1, 5))
    real_data_pre.to_pandas().plot('time', ax=axs[0])
    real_data.to_pandas().plot('time', ax=axs[1])
    fig.savefig(f'{path}/real_data.png', dpi=500)
    plt.close(fig)

    # mean of every column over the last 100 time points
    print(real_data[-100:].mean())

    # add noise; the generator is seeded so every call draws the same sequence
    rng = np.random.default_rng(42)
    fig, ax = plt.subplots()
    fake_data = []
    for col in real_data.columns[:-1]:  # last column contains time array
        I_eq = real_data[-100:, col].mean()
        if I_eq > 1e-9:
            # noise is loosely based on intensity: sigma is 10% of the late-time mean
            noise = rng.normal(loc=0, scale=0.10 * I_eq, size=(real_data.shape[0]))
            fake_col = real_data[col] + noise
        else:
            # columns that are (almost) empty at late times are left noise-free
            fake_col = real_data[col]
        fake_col[fake_col < 1e-10] = 1e-10  # no negative intensity
        fake_data.append(fake_col)
        ax.scatter(real_data['time'], fake_col, label=col, marker='.')

    ax.legend(ncol=4)
    fake_data.append(real_data['time'])
    fake_data = pl.DataFrame(fake_data, real_data.columns)
    fig.savefig(f'{path}/fake_data.png', dpi=500)
    plt.close(fig)

    class RateConstantOptimizer(RateConstantOptimizerTemplate):
        """Optimizer that predicts with ``DRL`` and compares unlabeled fractions."""

        @staticmethod
        def create_prediction(x: np.ndarray, x_description: list[str]) -> pl.DataFrame:
            """Return the predicted post-addition data for rate constants ``x`` named by ``x_description``."""
            rate_constants = pd.Series(x, x_description)
            drl = DRL(reactions=reactions, rate_constants=rate_constants)
            _, pred_labeled = drl.predict_concentration(
                t_eval_pre=time_pre,
                t_eval_post=time_post,
                initial_concentrations=concentration_initial,
                labeled_concentration=concentration_labeled,
                dilution_factor=dilution_factor,
                rtol=1e-8,
                atol=1e-8, )
            return pred_labeled

        @staticmethod
        def calculate_curves(data: pl.DataFrame) -> dict[str, pl.Series]:
            """Return, for A, B and C, the unlabeled amount divided by unlabeled + labeled."""
            curves = {}
            for chemical in ['A', 'B', 'C']:
                chemical_sum = data[[chemical, f'{chemical}-d10']].sum(axis=1)
                curves[f'ratio_{chemical}'] = data[chemical] / chemical_sum
            return curves

    def METRIC(y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """Mean absolute difference between prediction and reference."""
        return np.average(np.abs(y_pred - y_true), axis=0)

    RCO = RateConstantOptimizer(raw_weights={}, experimental=fake_data, metric=METRIC)

    # error obtained with the true rate constants, used as a reference in the plot title
    base_pred = RCO.create_prediction(x=list(rate_constants_real.values()), x_description=list(rate_constants_real.keys()))
    base_errors = RCO.calculate_error_functions(base_pred)
    base_error = RCO.calculate_total_error(errors=base_errors)

    dimension_description = ['k1', 'k-1', 'k2']
    bounds = [(1e-9, 100),    # k1
              (0,    100),    # k-1 / kr1 as input to the func.
              (1e-9, 100),]   # k2
    RCO.optimize_multiple(path=f'{path}/multiple_guess/', n_runs=500, x_bounds=bounds, x_description=dimension_description, n_jobs=-2)

    VMS = VisualizeMultipleSolutions(f'{path}/multiple_guess/', max_guess=500)

    # Runs whose error lies within 0.5% of the best error found; computed once
    # and shared by the inset plot and the histograms below.
    eq = VMS.complete_found_error < 1.005 * VMS.complete_found_error.min()
    best_errors = sorted(VMS.complete_found_error[eq])
    best_X = VMS.complete_optimal_X[np.where(eq)]

    # error / run
    fig, ax = VMS.show_error_all_runs()
    ax.set_ylabel("error")
    ax_ins = ax.inset_axes([0.15, 0.5, 0.4, 0.4])
    ax_ins.scatter(np.arange(len(best_errors)), best_errors)
    ax.indicate_inset_zoom(ax_ins, edgecolor='black')
    ax.set_title(f"Error using real rate constants: {base_error:.4f}")
    fig.savefig(f'{path}/error_per_run.png', dpi=500)
    plt.close(fig)

    # k values for best runs
    fig, axs = plt.subplots(3, 1, layout='tight', figsize=(8, 6))
    for i, (ax, k) in enumerate(zip(axs, rate_constants_real.values())):
        sns.histplot(best_X[:, i], ax=ax)
        # draw the true value as a vertical line without changing the y-limits
        yl, yu = ax.get_ylim()
        ax.plot([k, k], [yl, yu], label='true', color="tab:orange")
        ax.set_ylim(yl, yu)
        ax.set_title(VMS.x_description[i])
    axs[0].legend()
    fig.savefig(f'{path}/best_ks.png', dpi=500)
    plt.close(fig)

    # rate constants of all runs within 1% of the best error, with the true values on top
    fig, ax = VMS.show_rate_constants(max_error=VMS.complete_found_error.min()*1.01, index_constant_values=None)
    ax.set_yscale("linear")
    ax.scatter([1, 2, 3], list(rate_constants_real.values()), label="true")
    ax.legend()
    fig.savefig(f'{path}/rate_constants_boxplot.png', dpi=500)
    plt.close(fig)


In [None]:
# Run the exploration for every (k1, k-1, k2) combination of the trial values;
# k2 varies fastest and k1 slowest.
rate_grid = (
    (a, b, c)
    for a in rate_values
    for b in rate_values
    for c in rate_values
)
for k1, kr1, k2 in rate_grid:
    explore_boundary(k1, kr1, k2)