In [None]:
import matplotlib.pyplot as plt

from plotlyflask_server.data_parser import sort_RAFT_table as sRt
import pandas as pd
import numpy as np
import plotly.express as px
import re
from scipy.optimize import curve_fit
import itertools
from tqdm import tqdm
import os

In [None]:
# plotly forms in graphs do not work with jupyterlab 7 or higher out of the box (https://github.com/plotly/plotly.py/issues/4336) so the following lines are sometimes necessary:
import plotly.io as pio
from functools import wraps
# another fix is installing jupyterlab_mathjax2 or viewing it with html in browser"

def iframe_decorator(func):
    @wraps(func)
    def inner(*args, **kwargs):
        pio.renderers.default = "iframe"
        out = func(*args, **kwargs)
        pio.renderers.default = "plotly_mimetype+notebook"
        return out
    return inner

In [None]:
# rectify/balance the table to the same amount of conversion and mass columns for easier data manipulation
sRt.df["t6h-conversion"] = np.nan
sRt.df["t10h-conversion"] = np.nan

# get all conversion headers, sort them by the hours
conversion_list = []
for column in sRt.df.columns:
    if "conversion" in column:
        conversion_list.append(column)

conversion_list.sort(key=lambda x: int(re.findall(r"\d+", x)[0]))

# get all the Mn and Mw headers
Mn_list = []
Mw_list = []
pdi_list = []
for column in sRt.df.columns:
    if "Mn" in column:
        Mn_list.append(column)
    if "Mw" in column:
        Mw_list.append(column)
    if "Ð" in column:
        pdi_list.append(column)

print(conversion_list, Mn_list, Mw_list, pdi_list, sep="\n")

In [None]:
# change time format from hours to minutes for better overview of sampling times in the later kinetic plots
def change_time_format(time_format):
    h_m_s = str(time_format).split(":")
    m_format = int(h_m_s[0]) * 60 + int(h_m_s[1]) + int(h_m_s[2]) / 60
    return m_format

def change_time_format_h(time_format):
    h_m_s = str(time_format).split(":")
    h_format = int(h_m_s[0]) + (int(h_m_s[1]) + int(h_m_s[2]) / 60) /60
    return h_format

# creating a table as a lookup to correct all sample measurement times

# get the hours from the headers with regex
hours_list = []
two_digit_regex = r"\d+"
for column in conversion_list:
    hours_list.append(int(re.findall(two_digit_regex, column)[0]))
hours_list.sort()

exact_times = pd.read_excel(sRt.INPUT_FILE_PATH, sheet_name="exact sampling times")

time_correction_df = pd.DataFrame(data=exact_times.iloc[3:12, 3:])
time_correction_df.columns = exact_times.loc[2][3:]

time_correction_df.reset_index(inplace=True, drop=True)
ext_time_corr_df_data =[]
for row in time_correction_df.iterrows():
    reactor_nr = row[1]["Reactor"]
    if type(reactor_nr) == int or reactor_nr == "closing of reactors": # that applies to reactor 15 and the closing of reactors
        row[1]["Reactor"] = str(reactor_nr)
        ext_time_corr_df_data.append(row[1])
    else: # that applies to all reactors which are described per row in pairs like "3+4"
        reactor_nr_s = (row[1]["Reactor"]).split("+")
        for reactor_nr in reactor_nr_s:
            row[1]["Reactor"] = reactor_nr
            ext_time_corr_df_data.append(row[1].copy())
ext_time_corr_df = pd.DataFrame(data=ext_time_corr_df_data, columns=exact_times.loc[2][3:])
ext_time_corr_df.reset_index(drop=True, inplace=True)
print("The timer was reset to 0 after the first closing of reactors in the \"t = 0\"-sampling-step")
ext_time_corr_df.columns = ["Reactor", 0, 1, 2, 4, 6, 8, 10, 15]

# change time format to hours
time_cols = ext_time_corr_df.columns.difference(["Reactor"])
ext_time_corr_df[time_cols] = ext_time_corr_df[time_cols].apply(lambda x: [change_time_format_h(d) for d in x])
ext_time_corr_df[0] = ext_time_corr_df[0].apply(lambda x: 0 )

# set index to reactor
ext_time_corr_df.set_index("Reactor", inplace=True)

ext_time_corr_df

In [None]:
# replace the "sample determiner" with columns describing for the experiment number (next cell) and the actual reagents that made out the "determiner"
reaction_descriptors_dict = {}
abbreviation_keys = pd.read_excel(sRt.INPUT_FILE_PATH, sheet_name="Legend for Abbreviations")
abbreviation_keys.dropna(how="all", inplace=True)
for row in abbreviation_keys.itertuples():
    reaction_descriptors_dict[str(row.Symbol)] = row.Name

reaction_descriptors_dict

In [None]:
sRt.df

In [None]:
# generate a reformatted table with all entries attributing to a sample analysis taken, described with a column of the right time
kinetic_curves = []
for index, polymerisation_kinetic in sRt.df.iterrows():

    kinetic_curve_entries = pd.DataFrame(index=range(len(polymerisation_kinetic[conversion_list])),
        data={"time" : hours_list, "conversion" : polymerisation_kinetic[conversion_list].values,
              "Mn" : polymerisation_kinetic[Mn_list].values, "Mw" : polymerisation_kinetic[Mw_list].values,
              "dispersity" : polymerisation_kinetic[pdi_list].values
              })

    kinetic_curve_entries["exp_nr"] = str(polymerisation_kinetic["Experiment number"])
    kinetic_curve_entries["monomer"] = reaction_descriptors_dict[polymerisation_kinetic["monomer"]]
    kinetic_curve_entries["RAFT-Agent"] = reaction_descriptors_dict[polymerisation_kinetic["RAFT-Agent"]]
    kinetic_curve_entries["solvent"] = reaction_descriptors_dict[polymerisation_kinetic["solvent"]]

    # the times are dependent on the current reactor, get current
    current_reactor_nr = str(polymerisation_kinetic["reactor"])
    current_time_list = ext_time_corr_df.loc[current_reactor_nr]
    kinetic_curve_entries["time"] = list(current_time_list)

    kinetic_curves.append(kinetic_curve_entries)


In [None]:
'''
deprecated: more theoretical accurate version but practically worse applicable
    # L is responsible for scaling the output range from [0,1] to [0,L]
    # b and x0 are responsible for shifting the the function on the x and y axis
    # k is responsible for scaling the input, which remains in (-inf,inf)
'''

def sigmoid (x, L ,x0, k, b):
    y = L / (1 + np.exp(-k*(x-x0)))+b
    return (y)

def sigmoid_derivative(x, L ,x0, k, b):
    y = (L*k*np.exp(-k*(x-x0)))/((np.exp(-k*(x-x0))+1)**2)
    return (y)

def neg_growth(x, l, k):
    y = l * (1 - np.exp(k * (-x)))
    return y

def neg_growth_derivative(x, l, k):
    y = l * k * np.exp(k*(-x))
    return y

# as the Mn values do not all start at 0
def neg_growth_abscissa(x, l, k, b):
    y = l * (1 - np.exp(k * (-x))) + b
    return y

def linear_growth(x, m):
    y = m * x
    return y

def linear_growth_derivative(m):
    return m

'''
xrange = np.arange(-0.2, 5, 0.1)
example_fig = px.line()
for testparams in ([1,1], [2,1], [1,2]):
    example_fig.add_scatter(x=xrange, y=neg_growth(xrange, *testparams))
    example_fig.add_scatter(x=xrange, y=neg_growth_derivative(xrange, *testparams), opacity=0.5, line=dict(dash="dot"))

print("visualisation of the functions")
example_fig.show()
# '''
print("functions loaded")

In [None]:
# checking out max and min of Mn/Mw and Ð
from plotly.subplots import make_subplots
min_Mn, max_Mn = [], []
min_Mw, max_Mw = [], []
min_pdi, max_pdi = [], []

for kc in kinetic_curves:
    min_Mn.append(kc["Mn"].min()); max_Mn.append(kc["Mn"].max())
    min_Mw.append(kc["Mw"].min()); max_Mw.append(kc["Mw"].max())
    min_pdi.append(kc["dispersity"].min()); max_pdi.append(kc["dispersity"].max())

for _ in (min_Mn, max_Mn, min_Mw, max_Mw):
    _.sort()
    _ = [j/100000 for j in _]

for _ in (min_pdi, max_pdi):
    _.sort()

molar_mass_distribution_fig = make_subplots(specs=[[{"secondary_y": True}]])

molar_wg_comparison = px.line()
molar_wg_comparison.add_scatter(y=min_Mw, name="min Mw", line_color="darkgoldenrod", opacity=0.8, fill="tozeroy", fillcolor="orange")
molar_wg_comparison.add_scatter(y=max_Mw, name="max Mw", line_color="darkgoldenrod", opacity=0.8)
molar_wg_comparison.add_scatter(y=min_Mn, name="min Mn", line_color="firebrick", opacity=0.8, fill="tozeroy", fillcolor="red")
molar_wg_comparison.add_scatter(y=max_Mn, name="max Mn", line_color="firebrick", opacity=0.8)
# molar_wg_comparison.update_layout(title="Mn and Mw comparison", yaxis_title=r"$\\text{Mn and Mw }[g/mol] \cdot 10^{-5}$")

pdi_comparison = px.line()
pdi_comparison.add_scatter(y=min_pdi, name="min Ð", line_color="cornflowerblue", opacity=0.8, fill="tozeroy", fillcolor="rgba(0, 0, 255, 0.2)")
pdi_comparison.add_scatter(y=max_pdi, name="max Ð", line_color="cornflowerblue", opacity=0.8)
pdi_comparison.update_traces(yaxis="y2")

molar_mass_distribution_fig.add_traces(molar_wg_comparison.data + pdi_comparison.data)
molar_mass_distribution_fig.update_layout(title="Mn, Mw and Ð comparison", yaxis_title=r"$\\text{Mn and Mw }[g/mol] \cdot 10^{-5}$", yaxis2_title="Ð")

# pio.renderers.default = "jupyterlab"
molar_mass_distribution_fig.show()

In [None]:
kin_curve_fits_fig = px.line(title="Kinetic Curve Fit")
colors = px.colors.qualitative.Plotly # set up a simple color palette
extended_xdata = np.linspace(-1, 16.5, 100) # x data array for plotting the fits


def add_fits_to_plot(figure, fit_func, fit_func_params,  fit_func_derivative=None, ext_xdata=extended_xdata, *args, **kwargs):
    figure.add_scatter(
        x=ext_xdata, y=fit_func(ext_xdata, *fit_func_params),
        opacity=1, line=dict(dash="dot"), name=f"{fit_func.__name__} fit", *args, **kwargs)
    if fit_func_derivative:
        figure.add_scatter(
            x=ext_xdata, y=fit_func_derivative(ext_xdata, *fit_func_params),
            opacity=0.3, line=dict(dash="dash"), name=f"{fit_func_derivative.__name__}", *args, **kwargs)

def fit_and_exclude_outliers(x, y, fit_func, p0, bounds, nan_policy="omit", iteration=1, outliers=None):
    outliers = outliers if outliers is not None else []

    # exclude the nan values from the data
    mask = ~np.isnan(y)
    x, y = x[mask], y[mask]

    cf_data = curve_fit(f=fit_func, xdata=x, ydata=y, p0=p0, nan_policy=nan_policy, maxfev=800 * 10, bounds=bounds)

    # calculate the fit points
    fit_points = np.array([fit_func(x, *cf_data[0]) for x in x])
    # calculate the standard deviation of the residuals between the fit and the data points
    sigma = np.std(fit_points - y)

    # exclude the outliers
    msk = ~(np.abs(fit_points - y) > 2 * sigma)
    if not msk.all():
        return fit_and_exclude_outliers(x=x[msk], y=y[msk], fit_func=fit_func, p0=cf_data[0], bounds=bounds, nan_policy=nan_policy, iteration=iteration+1, outliers=outliers + [x[~msk]])

    # calculate the squared error of fit and data points
    sq_err = np.mean(np.square(y - fit_points))

    result = {"x": x, "y": y, "p_opt" : cf_data[0], "p_cov" : cf_data[1], "sq_err": sq_err, "excluded_points" : (x[~msk],y[~msk]), "iteration" : iteration, "outliers" : outliers}
    return result

In [None]:
# rephrased question from reviewer 3: "how good do all three different fitting functions fit the data in comparison?"
# to answer this question we will fit the data with all three functions and compare the r² values of the fits. Then we'll see if we can categorize some of the monomers/RAFT-agents/solvents to a specific fit type.
r_squared_errors = [[], [], []] # three lists for the three fitting functions
fitting_function = [linear_growth, sigmoid, neg_growth] # the three fitting functions
initial_guesses = [[1], [max, 1, 1, 0], [max, 0.1]] # initial guesses for the fitting functions
bounds = [([-np.inf], [np.inf]), ([0, -np.inf, -np.inf, -np.inf], [1, np.inf, np.inf, np.inf]), ([0, -np.inf], [1, np.inf])] # bounds for the fitting functions
for idx, kinetic_curve in enumerate(kinetic_curves):
    # first make sure the datapoints are in the right format and not sometimes int sometimes float
    xdata = np.array(kinetic_curve["time"].values, dtype=float)
    ydata = np.array(kinetic_curve["conversion"].values, dtype=float)

    for fit_idx, fit_func in enumerate(fitting_function):
        p_initial = [_ if not callable(_) else _(ydata) for _ in initial_guesses[fit_idx]] # if the initial guess has a function, call it
        ng_fit = fit_and_exclude_outliers(x=xdata, y=ydata, fit_func=fit_func, p0=p_initial, bounds=bounds[fit_idx])
        r_squared_errors[fit_idx].append(ng_fit["sq_err"])

In [None]:
# calculate the amount of with which of the fit-types is the best
exp_nums = [_["exp_nr"].iloc[0] for _ in kinetic_curves]
best_fit = [0, 0, 0]
for exp, lin, sig, n_growth in zip(exp_nums, *r_squared_errors):
    if lin < sig and lin < n_growth:
        best_fit[0] += 1
    elif sig < lin and sig < n_growth:
        best_fit[1] += 1
    else:
        best_fit[2] += 1
print(f"The best fit for the experiments is:\n"
      f"Linear growth: {best_fit[0]} times with {np.mean(r_squared_errors[0]):.4f} mean and {sum(r_squared_errors[0]):.4f} in sum\n"
      f"Sigmoid growth: {best_fit[1]} times with {np.mean(r_squared_errors[1]):.4f} mean and {sum(r_squared_errors[1]):.4f} in sum\n"
      f"Negative growth: {best_fit[2]} times with {np.mean(r_squared_errors[2]):.4f} mean and {sum(r_squared_errors[2]):.4f} in sum")


In [None]:
# plot the r² values of the fit-types
r_squared_fig = px.line(title="R² values of the curve fittings", labels={"x":"Kinetic Curve", "y":"R² value"})
r_squared_fig.add_scatter(y=r_squared_errors[0], x=exp_nums, name="Linear growth fit", line_color="blue")
r_squared_fig.add_scatter(y=r_squared_errors[1], x=exp_nums, name="Sigmoid fit", line_color="green")
r_squared_fig.add_scatter(y=r_squared_errors[2], x=exp_nums, name="Negative growth fit", line_color="red")
r_squared_fig.update_layout(
    paper_bgcolor='white',  # Set background to light mode
    plot_bgcolor='white',   # Set plot background to white
    font=dict(color='black', size=15),  # Set font color to black for light mode
    margin=dict(l=0, r=0, t=90, b=10),  # Tight margins to reduce whitespace
    width=1000, #height=300,
    # turn off the grid
    xaxis=dict(showgrid=False, zeroline=False, showline=True, linewidth=2, linecolor='black'),
    # move the legend to the top left corner
    legend=dict(
        orientation="h",
        yanchor="bottom",
        y=1.0,
        xanchor="right",
        x=1.0,
        bgcolor='rgba(255, 255, 255, 0.8)',  # Semi-transparent white background for the legend
        bordercolor='black',
        borderwidth=1
    ),
    # reinsert axis description
    xaxis_title="Experiment number",
    yaxis_title="R² value",
)
r_squared_fig.write_image(os.path.join(os.getcwd(), "data", "data exploration figures", "r_squared of the fits.svg"))
r_squared_fig.show()

In [None]:
# categorize the monomers/RAFT-agents/solvents by the best fit type
import copy
reaction_descriptors = list(reaction_descriptors_dict.values()) # get the reaction descriptors in a list of strings
types_in_fit ={"monomer":{_:0 for _ in reaction_descriptors[1:17]},
               "RAFT-Agent":{_:0 for _ in reaction_descriptors[17:27]},
               "solvent":{_:0 for _ in reaction_descriptors[27:30]}}
types_in_lin_fit = copy.deepcopy(types_in_fit)
types_in_sig_fit = copy.deepcopy(types_in_fit)
types_in_neg_fit = copy.deepcopy(types_in_fit)

for idx, kinetic_curve in enumerate(kinetic_curves):
    exp_nr = kinetic_curve["exp_nr"].iloc[0]
    if r_squared_errors[0][idx] < r_squared_errors[1][idx] and r_squared_errors[0][idx] < r_squared_errors[2][idx]:
        types_in_lin_fit["monomer"][kinetic_curve["monomer"].iloc[0]] += 1
        types_in_lin_fit["RAFT-Agent"][kinetic_curve["RAFT-Agent"].iloc[0]] += 1
        types_in_lin_fit["solvent"][kinetic_curve["solvent"].iloc[0]] += 1
    elif r_squared_errors[1][idx] < r_squared_errors[0][idx] and r_squared_errors[1][idx] < r_squared_errors[2][idx]:
        types_in_sig_fit["monomer"][kinetic_curve["monomer"].iloc[0]] += 1
        types_in_sig_fit["RAFT-Agent"][kinetic_curve["RAFT-Agent"].iloc[0]] += 1
        types_in_sig_fit["solvent"][kinetic_curve["solvent"].iloc[0]] += 1
    else:
        types_in_neg_fit["monomer"][kinetic_curve["monomer"].iloc[0]] += 1
        types_in_neg_fit["RAFT-Agent"][kinetic_curve["RAFT-Agent"].iloc[0]] += 1
        types_in_neg_fit["solvent"][kinetic_curve["solvent"].iloc[0]] += 1

# remove the monomers/RAFT-agents/solvents that were not used in a fit at all
types_in_lin_fit = {k: {kk: vv for kk, vv in v.items() if vv > 0} for k, v in types_in_lin_fit.items()}
types_in_sig_fit = {k: {kk: vv for kk, vv in v.items() if vv > 0} for k, v in types_in_sig_fit.items()}
types_in_neg_fit = {k: {kk: vv for kk, vv in v.items() if vv > 0} for k, v in types_in_neg_fit.items()}
print(types_in_lin_fit, types_in_sig_fit, types_in_neg_fit, sep="\n\n")

In [None]:
kinetics_df = pd.DataFrame(columns=['exp_nr', 'max_con', 'theo_max_con', 'theo_react_end', 'max_mn', 'monomer',
       'RAFT-Agent', 'solvent', 'fit_p1', 'fit_p2', 'p1_variance',
       'p1_p2_covariance', 'p2_variance', 'squared_error', 'conv_time_data',
       'Mn_time_data', 'Mw_time_data', 'dispersity_time_data']) # create new dataframe with kinetics per row

for idx, kinetic_curve in enumerate(kinetic_curves):
    # first make sure the datapoints are in the right format and not sometimes int sometimes float
    xdata = np.array(kinetic_curve["time"].values, dtype=float)
    ydata_conv = np.array(kinetic_curve["conversion"].values, dtype=float)
    ydata_dispersity = np.array(kinetic_curve["dispersity"].values, dtype=float)
    ydata_Mn = np.array(kinetic_curve["Mn"].values, dtype=float)/100000 # make it more comparable to conversion
    ydata_Mw = np.array(kinetic_curve["Mw"].values, dtype=float)/100000

    # fitting section for conversion
    p_initial = [max(ydata_conv), 0.1] # this is a mandatory initial guess
    ng_fit = fit_and_exclude_outliers(x=xdata, y=ydata_conv, fit_func=neg_growth, p0=p_initial, bounds=([0, -np.inf], [1, np.inf]))
    # l_fit = fit_and_exclude_outliers(x=xdata, y=ydata, fit_func=linear_growth, p0=[max(ydata)/7], bounds=([0], [np.inf]))

    popt, pcov = ng_fit["p_opt"], ng_fit["p_cov"]
    conv_time_data = np.array([ng_fit["x"], ng_fit["y"]])
    squared_error = ng_fit["sq_err"]

    # fitting section for Mn
    max_Mn = max(ydata_Mn[~np.isnan(ydata_Mn)]) 
    p_initial = [max_Mn, 0.1, 0]
    ng_fit_Mn = fit_and_exclude_outliers(x=xdata, y=ydata_Mn, fit_func=neg_growth_abscissa, p0=p_initial, bounds=([0, -np.inf, -np.inf], [1, np.inf, np.inf]))

    popt_Mn, pcov_Mn = ng_fit_Mn["p_opt"], ng_fit_Mn["p_cov"]
    Mn_time_data = np.array([ng_fit_Mn["x"], ng_fit_Mn["y"]])
    squared_error_Mn = ng_fit_Mn["sq_err"]

    # fitting section for Mw
    ng_fit_Mw = fit_and_exclude_outliers(x=xdata, y=ydata_Mw, fit_func=neg_growth_abscissa, p0=p_initial, bounds=([0, -np.inf, -np.inf], [1, np.inf, np.inf]))

    popt_Mw, pcov_Mw = ng_fit_Mw["p_opt"], ng_fit_Mw["p_cov"]
    Mw_time_data = np.array([ng_fit_Mw["x"],ng_fit_Mw["y"]])
    squared_error_Mw = ng_fit_Mw["sq_err"]
    
    # combining dispersity with times omitting Nan
    nan_val_in_d_mask = ~np.isnan(ydata_dispersity)
    dispersity_time_data = np.array([xdata[nan_val_in_d_mask], ydata_dispersity[nan_val_in_d_mask]])


    kinetics_df.loc[idx] = {"exp_nr":kinetic_curve["exp_nr"].iloc[1], "max_con":max(ydata_conv),
                            "theo_max_con":"yet to calc", "theo_react_end":"yet to calc", "max_mn":max_Mn,
                            "monomer":kinetic_curve["monomer"].iloc[1], "RAFT-Agent":kinetic_curve["RAFT-Agent"].iloc[1],
                            "solvent":kinetic_curve["solvent"].iloc[1],
                            "fit_p1":popt[0],"fit_p2":popt[1],
                            "p1_variance":pcov[0][0],"p1_p2_covariance":pcov[0][1],"p2_variance":pcov[1][1],
                            "squared_error":squared_error, "conv_time_data":conv_time_data,
                            "Mn_time_data":Mn_time_data, "Mw_time_data":Mw_time_data,
                            "dispersity_time_data": dispersity_time_data}

''' comment out here if plot is needed.
    marker_dict = dict(color=colors[idx%len(colors)]) # set same colors per kinetic
    kin_curve_fits_fig.add_scatter(x=xdata, y=ydata, mode="lines+markers", opacity=1, name=kinetic_curve["exp_nr"].iloc[1], marker=marker_dict)
    add_fits_to_plot(neg_growth, neg_growth_derivative, popt, marker_dict)

kin_curve_fits_fig.update_layout(
    yaxis=dict(
        range=[-0.1,1]
    ),

)
kin_curve_fits_fig.show()
# '''

kinetics_df.reset_index(drop=True, inplace=True)
len_kinetics = len(kinetics_df)
kinetics_df.drop(axis="index", index=kinetics_df[kinetics_df["max_con"] <= 0].index, inplace=True)
kinetics_dropped_bec_max_con = len_kinetics-len(kinetics_df)
print(f"{kinetics_dropped_bec_max_con} kinetics data points dropped because max con was less than 0")
kinetics_df.reset_index(drop=True, inplace=True)
# kinetics_df

In [None]:
data_type_list = ["conv_time_data", "Mn_time_data", "Mw_time_data", "dispersity_time_data"]
data_list_name = ["conv over time", "Mn over time", "Mw over time", "dispersity over time"]
def plot_Mn_Conversion_Dispersity_difference():
    mn_conversion_diff_fig = px.line(title="Mn Conversion difference")
    for idx, data_type in (enumerate(data_type_list)):
        for kin in kinetics_df[data_type].head(40):
            mn_conversion_diff_fig.add_scatter(x=kin[0], y=kin[1], mode="lines+markers", opacity=0.5, marker=dict(color=colors[idx%len(colors)]), showlegend=False, legendgroup=str(idx))
        # custom legend
        mn_conversion_diff_fig.add_scatter(x=[None], y=[None], marker=dict(color=colors[idx%len(colors)]), name= data_list_name[idx], legendgroup=str(idx))
    # adding a threshold line because the SEC peaks smear into syspeak at that point
    mn_conversion_diff_fig.add_scatter(x=[0, 16], y=[1000*10**(-5), 1000*10**(-5)], mode="lines", line=dict(dash="dash"), name="SEC limit", legendgroup="threshold")
    mn_conversion_diff_fig.update_layout(legend=dict(orientation="h", yanchor="bottom", y=1.05, xanchor="left", x=-0.1 ), yaxis_title=r"$\text{Mn, Mw }[g/mol] \cdot 10^{-5}/ \ [ \%]$", xaxis_title="Time [h]")
    print("saving resources to plot for conversion Mn and Mw of the first 40 kinetics")
    mn_conversion_diff_fig.show()
plot_Mn_Conversion_Dispersity_difference()

In [None]:
# normalize the errors by dividing them by their respective standard deviation
def normalize_errors(err):
    return err / np.std(err)

for error in ["squared_error", "p1_variance", "p2_variance", "p1_p2_covariance"]:
    kinetics_df[error] = normalize_errors(kinetics_df[error])

In [None]:
# kinetics_df[kinetics_df["squared_error"]>0.01] 29/317 entries
# calculating covariance between errors to use only the reasonable ones

# permutate all combinations of the errors
errorcombs = [[],[],[]]
for err1, err2 in itertools.combinations(["squared_error", "p1_variance", "p2_variance", "p1_p2_covariance"], 2):
    # print(f"The covariance between {err1} and {err2} is {np.cov(kinetics_df[err1], kinetics_df[err2])}")
    # print(f"The correlation between {err1} and {err2} is {np.correlate(kinetics_df[err1], kinetics_df[err2])}")
    # print("\n")
    errorcombs[0].append(f"{err1}/{err2}")
    errorcombs[1].append(np.cov(kinetics_df[err1], kinetics_df[err2]))
    errorcombs[2].append(*np.correlate(kinetics_df[err1], kinetics_df[err2]))
errorcombs_dc = {"name": errorcombs[0], "covariance":errorcombs[1], "correlation":errorcombs[2]}
errorcombs_df = pd.DataFrame(data=errorcombs_dc)
errorcombs_df["i_correlation"] = errorcombs_df["correlation"] * (-1)

# plot bar plot per a pair of errors with superimposed correlation
err_fig = px.bar(title="Correlation between Errors", labels={"correlation":"correlation"}, log_y=True)
err_fig.add_bar(x=errorcombs_df["name"], y=errorcombs_df["correlation"], name="positive correlation", marker_color="green")
err_fig.add_bar(x=errorcombs_df["name"], y=errorcombs_df["i_correlation"], name="negative correlation", marker_color="crimson")
err_fig.show()

In [None]:
# violin plots of the errors
for errors in ["p1_variance", "p2_variance", "p1_p2_covariance"]:
    kinetics_df[errors] = kinetics_df[errors].apply(lambda x: np.absolute(x))
violin_err_fig = px.violin(kinetics_df, y=["squared_error", "p1_variance", "p2_variance", "p1_p2_covariance"], box=True, points="all", log_y=True)
violin_err_fig.update_layout(title="Errors normalize by \u03C3", xaxis_title="Error type", yaxis_title="value", yaxis=dict(title="log(value)", range=(-22, 3)))
violin_err_fig.show()

In [None]:
# get information about within which error margin the fits and therefore kinetics are good
# split error in quartiles, index per error

# make sure the data exploration figures folder exists
if not os.path.exists("data/data exploration figures"):
    os.makedirs("data/data exploration figures")

title_dic = {"squared_error":"r² - error",
             "p1_variance":"variance of parameter 1",
             "p2_variance":"variance of parameter 2",
             "p1_p2_covariance":"covariance (Params 1 & 2)"}

def get_quartile_indexes(error_type):
    quartile_len = len(kinetics_df) / 4
    quartile_ranges = np.array([(a * quartile_len, (a + 1) * quartile_len) for a in range(4)], dtype=int)
    quartiles_list = list()
    for q in range(4):
        quartiles_list.append(kinetics_df.sort_values(by=[error_type]).iloc[quartile_ranges[q][0]:quartile_ranges[q][1]])
    return quartiles_list

fig = px.line()
def export_quartile_figures():
    for err in tqdm(["squared_error", "p1_variance", "p2_variance", "p1_p2_covariance"]):
        err_quartiles = get_quartile_indexes(err)

        # plot the single quartiles
        for nr, n in enumerate(err_quartiles):
            iks = n.index
            fg = px.line(title=f"kinetics (quartile {nr+1} of the {title_dic[err]}) ", labels={"x":"time", "y":"conversion"})
            for ik in iks:
                marker_dict = dict(color=colors[ik%len(colors)])
                fit_data = kinetics_df.iloc[ik]
                xdt, ydt = fit_data["xdata"], fit_data["ydata"]
                fg.add_scatter(x=xdt, y=ydt, mode="lines+markers", name=kinetic_curves[ik]["exp_nr"].iloc[0], marker=marker_dict)
                add_fits_to_plot(fig, neg_growth, neg_growth_derivative, [fit_data["fit_p1"], fit_data["fit_p2"]], marker=marker_dict)
                fg.update_layout(yaxis=dict(range=[-0.1,1]), xaxis_title="Time [h]", yaxis_title="Conversion [%]")
            fg.write_image(f"data/data exploration figures/{err} ({nr+1} quartile).svg")

# export_quartile_figures() ; print("exported")

In [None]:
# descry when a fitting function aligns to the datapoints in a reasonable way.
# a point for every quartile further from the first (with the greatest error) for each single error is given.
# lastly that score is normalized by dividing by the maximum score (that is 3*4=12)
error_list = ["squared_error", "p1_variance", "p2_variance", "p1_p2_covariance"]
error_dic = {}
score = np.zeros(len(kinetics_df), int)
for error in error_list:
    quartiles = get_quartile_indexes(error)
    # for every error we want to give an error per index
    for sc, quartile in enumerate(quartiles):
        if sc == 0:
            continue
        # each quartile is a dataframe, where the latter one raise a higher error ( 0, 1, 2, 3)
        for num in quartile.index:
            score[num]+=sc
kinetics_df["error_score"]=score

In [None]:
p1_values = kinetics_df["fit_p1"].values
average_conversion = p1_values.mean()
min_con, max_con = p1_values.min(), p1_values.max()

print(f"According to the fits the mean maximum conversion is {average_conversion:.3n}, the minimum maximum is {min_con:.3n} and the maximum overall is {max_con:.3n}")

In [None]:
# determining apposite reaction end values.
# The moment where 90% of the maximum conversion have been reached can be seen as a practical maximum conversion point
kinetics_df["theo_max_con"] = kinetics_df["fit_p1"].apply(lambda x: x*0.9)

def y_converted_negative_growth(y, l, k):
    return -np.log(1 - y / l) / k

kinetics_df["theo_react_end"] = \
    [y_converted_negative_growth(y, fit_p1, fit_p2) for y, fit_p1, fit_p2 in zip(kinetics_df["theo_max_con"], kinetics_df["fit_p1"], kinetics_df["fit_p2"])]

# the theoretical maximal conversion must be capped at reasonable time (we take two days here) that is applying 139/313 entries
kinetics_df["theo_react_end"] = [30 if x > 30 else x for x in kinetics_df["theo_react_end"].values]
# recalculate the apposite maximal conversion
kinetics_df["theo_max_con"] = [neg_growth(x, p1, p2) for x, p1, p2 in zip(kinetics_df["theo_react_end"], kinetics_df["fit_p1"], kinetics_df["fit_p2"])]
# find the mean dispersity of the last 3 values (there are always at least 4), as the first low molar weights are usually too uncertain due to SEC sensitivity
kinetics_df["mean_dispersity"] = kinetics_df["dispersity_time_data"].apply(lambda x: np.mean(x[1][-3:]))

In [None]:
# searching manually for interesting experiments to be reproduced by MR.
search_queue_monomer = kinetics_df["monomer"].apply(lambda x: x in ["Styrene",  "4-Methylstyrene", "Benzyl acrylate"])
search_queue_solvent = kinetics_df["solvent"].apply(lambda x: x in ["Dimethylformamide"])
kinetics_df[search_queue_monomer & search_queue_solvent]

In [None]:
# one longrun test for the theo react end cap (146) for up to 72h
# two to see reproducible exp from within the second quantile in average (score of 4/5)
#   one "fast" fit point (202)
#   one for a weak conversion (90)
kinetics_df[kinetics_df['exp_nr'].isin(["205", "90", "146"])]

In [None]:
# to find the optimal threshold parameters for search one has to keep in mind that with high conversion (assuming
# around 80%) increasing side reactions can take place. After conversion the reactions should be sorted after time,
# then error score and lastly polymer dispersity. A multiple-decreasing-threshold-sorting-algorithm would be good:
# So first priority would be sorting after nearest to 80% conversion.

# create a score ingesting the importance of the different kinetic descriptors
#     Conversion*1 + time²*(-0.8) + error_score*(0.5) + dispersity*(0.3)
#     while spanning between the optimum and the least bearable values like in the following:
#          Conversion: |con-0.8| - 0 (0.8 is the optimum)
#             using a linear decreasing function -x*m+b
#          Time: 0 - np.inf (0 is the optimum) (more than 30 is not bearable and set as maximum)
#             using a negative potential function -x**2+b
#          Error: 0 - 12 (0 is the optimum) (the error is more negligible)
#             using a linear decreasing function -x*m+b
#          Dispersity: 1 is the optimum, at 1.5 half the score should be lost. The further from there the less impact.
#             using a reciprocal function 1/(2x-1)
score = []
for row in kinetics_df.itertuples():
    row_score = ((0.8-np.abs(row.theo_max_con - 0.8))/0.8*1 + (-(row.theo_react_end/30)**2+1)*0.8 + ((12-row.error_score)/12)*0.5) + (1/(2*row.mean_dispersity-1)*0.3)
    normalized_row_score = row_score / ( 1 + 0.8 + 0.5 + 0.3)
    score.append(normalized_row_score)
kinetics_df["score"] = score
kinetics_df

In [None]:
def color_variant(hex_color, brightness_offset=1):
    """ takes a color like #87c95f and produces a lighter or darker variant """
    if len(hex_color) != 7:
        raise Exception("Passed %s into color_variant(), needs to be in #87c95f format." % hex_color)
    rgb_hex = [hex_color[x:x+2] for x in [1, 3, 5]]
    new_rgb_int = [int(hex_value, 16) + brightness_offset for hex_value in rgb_hex]
    new_rgb_int = [min([255, max([0, i])]) for i in new_rgb_int] # make sure new values are between 0 and 255
    # hex() produces "0x88", we want just "88"; also we zfill to pad with leading zeros if necessary, e.g. 9 -> 09
    return "#" + "".join([hex(i)[2:].zfill(2) for i in new_rgb_int])

In [None]:
# Some search functions:
def search_for_exp(exp_nr: str | list) -> pd.DataFrame:
    if type(exp_nr) == str:
        print(f"Searching for experiment {exp_nr}")

        return kinetics_df[kinetics_df["exp_nr"] == exp_nr]
    else:
        return kinetics_df[kinetics_df["exp_nr"].isin(exp_nr)]

def plot_exp(exp_nr: str | list, plot_mn: bool=False, plot_mw: bool=False, fit_curves: tuple=(True, True)):
    plot_data = search_for_exp(exp_nr)
    exp_fig = px.line(title=f"Kinetic Curve Fit for {exp_nr}")
    for kinetic_to_plot in plot_data.itertuples():
        x_data, ydata = kinetic_to_plot.conv_time_data
        marker_dict = dict(color=colors[int(kinetic_to_plot.Index)%len(colors)])
        exp_fig.add_scatter(x=x_data, y=ydata, mode="lines+markers", name=kinetic_to_plot.exp_nr, marker=marker_dict, legendgroup=str(kinetic_to_plot.exp_nr))
        if fit_curves[0]:
            if fit_curves[1]:
                add_fits_to_plot(exp_fig, neg_growth, [kinetic_to_plot.fit_p1, kinetic_to_plot.fit_p2], fit_func_derivative=neg_growth_derivative, marker=marker_dict, legendgroup=str(kinetic_to_plot.exp_nr), showlegend=False)
            else:
                add_fits_to_plot(exp_fig, neg_growth, [kinetic_to_plot.fit_p1, kinetic_to_plot.fit_p2], marker=marker_dict, legendgroup=str(kinetic_to_plot.exp_nr), showlegend=False)

        if plot_mn:
            marker_dict["color"] = color_variant(marker_dict["color"], 30)
            x2_data, y2_data = kinetic_to_plot.Mn_time_data
            exp_fig.add_scatter(x=x2_data, y=y2_data, mode="lines+markers", name="Mn of " + kinetic_to_plot.exp_nr, marker=marker_dict, legendgroup=str(kinetic_to_plot.exp_nr))
        if plot_mw:
            marker_dict["color"] = color_variant(marker_dict["color"], -60)
            x2_data, y2_data = kinetic_to_plot.Mw_time_data
            exp_fig.add_scatter(x=x2_data, y=y2_data, mode="lines+markers", name="Mw of " + kinetic_to_plot.exp_nr, marker=marker_dict, legendgroup=str(kinetic_to_plot.exp_nr), opacity=0.5)

    exp_fig.update_layout(yaxis=dict(range=[-0.1,1]), xaxis_title="Time [h]", yaxis_title="Conversion [%]")
    exp_fig.show()

def find_optimal_synthesis(monomer: str | list):
    search_q_monomer = [x in monomer for x in kinetics_df["monomer"]]
    result_df = kinetics_df[search_q_monomer].sort_values(by=["score"], ascending=False)
    return result_df

def refine_search(dataframe: pd.DataFrame, monomer: list = None, solvent: list = None, raft_agent: list = None):
    len_df = len(dataframe)
    search_q_monomer, search_q_solvent, search_q_raft_agent = [np.array([True] * len_df) for _ in range(3)]
    if monomer:
        search_q_monomer = dataframe["monomer"].apply(lambda x: x in [*monomer])
    if solvent:
        search_q_solvent = dataframe["solvent"].apply(lambda x: x in [*solvent])
    if raft_agent:
        search_q_raft_agent = dataframe["RAFT-Agent"].apply(lambda x: x in [*raft_agent])
    return dataframe[search_q_monomer & search_q_solvent & search_q_raft_agent]

# search_for_exp(["145", "90", "253"])
# plot_exp(["145", "90", "253"], True, True)
plot_exp(["241", "146", "392"], True, True)
# find_optimal_synthesis(["Styrene", "4-Methylstyrene", "Benzyl acrylate"])
# refine_search(kinetics_df, solvent=["Dimethylformamide"], monomer=["Styrene", "4-Methylstyrene", "Benzyl acrylate"], raft_agent=["2-Cyan-2-propylbenzodithioat"])
# refine_search(find_optimal_synthesis(["Styrene", "4-Methylstyrene", "Benzyl acrylate"]), solvent=["Dimethylformamide"], raft_agent=["2-Cyan-2-propylbenzodithioat"])

In [None]:
from IPython.display import Markdown, display
def display_color(hex_color, width=6):
    if type(hex_color) == str:
        display(Markdown(f'<span style="font-family: monospace">{hex_color} <span style="color: {hex_color}">{chr(9608)*width}</span></span>'))
    else:
        display(Markdown('<br>'.join(
            f'<span style="font-family: monospace">{color} <span style="color: {color}">{chr(9608)*width}</span></span>'
            for color in hex_color)))
display_color(colors)

In [None]:
# manually typing in the proof experimental data from michael
# "MRG-046-G-ZL-1-E-DMF" -> exp
# "MRG-046-G-ZL-4-B-DMF" -> exp
# "MRG-046-G-ZL-15-F-DMF" -> exp
# "exp", "monomer", "RAFT-Agent", "solvent", "time", "conversion", "Mn", "Mw"


""" !The experiment numbers changed to from 146, 90 and 205 to 241, 145 and 343!
M. Ringleb:
146 evaluation of the observations was done after ca. 90 hno addition of NMR solvent and SEC eluent to vials for 15 h sample -->
potential explanation for problems with evaluation or deviations (sample stood for 5 h without being quenched) --> for NMR addition of 400 yL of CDCl3 prior to filling to NMR tube

90 evaluation of this points was done after 20 h time in the reactor
double addition of NMR solvent and SEC eluent to the vials for t =10h
also for NMR sampling for t=10 h sample the septum was pushed through the lid, so the vial stood open for ca. 9hours before it was filled to the NMR tube "

205 evaluation of this points was done after 20 h time in the reactor"""

proof_data = [
    ["241b", "1", "E", "DMF", 0, 0.0, 0.0, 0.0],
    ["241b", "1", "E", "DMF", 15, 0.05, 3000, 3400],
    ["241b", "1", "E", "DMF", 60, 0.24, 3500, 3900],
    ["241b", "1", "E", "DMF", 72, 0.24, 3500, 4100],
    ["241b", "1", "E", "DMF", 84, 0.24, 3700, 4200],
    ["145b", "4", "B", "DMF", 0, 0.0, 0.0, 0.0],
    ["145b", "4", "B", "DMF", 2.95, 0.02, 870, 970],
    ["145b", "4", "B", "DMF", 10, 0.10, 2200, 2500],
    ["343b", "15", "F", "DMF", 0, 0.0, 0.0, 0.0],
    ["343b", "15", "F", "DMF", 5.62, 0.85, 7700, 14800],
    ["343b", "15", "F", "DMF", 8, 0.90, 7100, 15100],
    ]

# calculating dispersity
for i in range(len(proof_data)):
    if proof_data[i][7] == 0:
        proof_data[i].append(0)
    else:
        proof_data[i].append(proof_data[i][6]/proof_data[i][7])

proof_df_points = pd.DataFrame(data=proof_data, columns=["exp_nr", "monomer", "RAFT-Agent", "solvent", "time", "conversion", "Mn", "Mw", "dispersity"])
# reformatting to kinetics_df format
proof_df = pd.DataFrame()
for kinetic_nr in proof_df_points["exp_nr"].unique():
    kinetic = proof_df_points[proof_df_points["exp_nr"] == kinetic_nr]
    proof_kinetic = pd.DataFrame({"exp_nr":kinetic["exp_nr"].iloc[0],
                                  "monomer":reaction_descriptors_dict[kinetic["monomer"].iloc[0]],
                                  "RAFT-Agent":reaction_descriptors_dict[kinetic["RAFT-Agent"].iloc[0]],
                                  "solvent":reaction_descriptors_dict[kinetic["solvent"].iloc[0]],

                                  "conv_time_data":[[kinetic["time"].values, kinetic["conversion"].values]],
                                  "Mn_time_data":[[kinetic["time"].values, kinetic["Mn"].values/100000]],
                                  "Mw_time_data":[[kinetic["time"].values, kinetic["Mw"].values/100000]],
                                  "dispersity_time_data":[[kinetic["time"].values, kinetic["dispersity"].values]],})

    proof_df = pd.concat([proof_df, proof_kinetic])
proof_df

In [None]:
# comparing proof and original experiments data
proof_fig = px.line()

for idx, data in (enumerate(data_type_list)):
    comb_d = pd.concat([proof_df, search_for_exp(["241", "145", "343"])])
    for kin, exp_nr in zip(comb_d[data], comb_d["exp_nr"]):
        proof_fig.add_scatter(x=kin[0], y=kin[1], mode="lines+markers", opacity=0.5, name=exp_nr, marker=dict(color=colors[idx%len(colors)]), showlegend=False, legendgroup=str(idx))
    # custom legend
    proof_fig.add_scatter(x=[None], y=[None], marker=dict(color=colors[idx%len(colors)]), name=data_list_name[idx], legendgroup=str(idx))
# adding a threshold line because the SEC peaks smear into syspeak at that point
proof_fig.add_scatter(x=[0, 84], y=[1000*10**(-5), 1000*10**(-5)], mode="lines", line=dict(dash="dash"), name="SEC limit", legendgroup="threshold")
proof_fig.update_layout(legend=dict(orientation="h", yanchor="bottom", y=1.05, xanchor="left", x=-0.1 ), yaxis_title=r"$Mn, Mw \ [g/mol] \cdot 10^{-5}\text{ and conversion} \ [ \%]$", xaxis_title="Time [h]",
                        paper_bgcolor='white',  # Set background to light mode
                        plot_bgcolor='white',   # Set plot background to white
                        font=dict(color='black'),  # Set font color to black for light mode
                        margin=dict(l=0, r=0, t=10, b=0),  # Tight margins to reduce whitespace
                        width=500,
                        ) #height=300,
                        
proof_fig.show()

"""
Y. Köster:
The Comparison of proof and prior data shows that the experimental data is not easily reproducible with respective accuracy in small orders of magnitude (Conversion after 15 h around 10 %)
Especially for the RAFT synthesis with this concern of low reaction rate an extrapolation further than double the time (to 30 h) seems not feasible.
 Therefore the former time cap of 70 h gets changed to 30 h.
"""

In [None]:
# I want to compare "long-term experiment" and "low and high conversion reproducibility" separately
# 241b and 241 are necessary for the prior
# 145, 343 and their b's are for the later
comparisons = [["241"], ["145", "343"]]
# datatypes to compare from data_type_list = ["conv_time_data", "Mn_time_data", "Mw_time_data"]
# @iframe_decorator
def compare_redone_exp_mw(repl_exp_nr: list):
    comp_fig = px.line()
    comb_df = pd.concat([search_for_exp(repl_exp_nr), proof_df[[nr in [c + "b" for c in repl_exp_nr] for nr in proof_df["exp_nr"]]]])
    
    max_time = 0 # to get the lenght of the sec limit line
    for idx, data in (enumerate([data_type_list[2]])):
        for kin, exp_nr in zip(comb_df[data], comb_df["exp_nr"]):
            comp_fig.add_scatter(x=kin[0], y=kin[1]*10**5, mode="markers+lines", opacity=1, name=exp_nr,)# marker=dict(color=colors[idx%len(colors)]), showlegend=False, legendgroup=str(idx))
            max_time = max(max_time, kin[0][-1])
            
            # adding fitting only to 241
            if exp_nr == "241":
                xdata = np.array(kin[0], dtype=float)
                ydata_Mw = np.array(kin[1]*10**5, dtype=float)
                p_initial = [max(ydata_Mw), 0.1]
                ng_fit_Mw = fit_and_exclude_outliers(x=xdata, y=ydata_Mw, fit_func=neg_growth, p0=p_initial, bounds=([0, -np.inf], [np.inf, np.inf]))
                comp_fig.add_scatter(x=np.linspace(-1, 80, 100), y=neg_growth(np.linspace(-1, 80, 100), *[ng_fit_Mw["p_opt"][0], ng_fit_Mw["p_opt"][1]]),
                                     opacity=1, line=dict(dash="dot"), name="Prediction")
                
        # custom legend
        # comp_fig.add_scatter(x=[None], y=[None], marker=dict(color=colors[idx%len(colors)]), name=data_list_name[idx], legendgroup=str(idx))
    # adding a threshold line because the SEC peaks smear into syspeak at that point
    comp_fig.add_scatter(x=[0, max_time], y=[1000, 1000], mode="lines", line=dict(dash="dash"), name="SEC limit", legendgroup="threshold")
    comp_fig.update_layout(legend=dict(orientation="h", yanchor="bottom", y=1.05, xanchor="left", x=-0.1 ), yaxis_title="<i>M</i><sub>w</sub> [g mol<sup>-1</sup>]", xaxis_title="Time [h]",
                           paper_bgcolor='white',  # Set background to light mode
                           plot_bgcolor='white',   # Set plot background to white
                           font=dict(color='black'),  # Set font color to black for light mode
                           margin=dict(l=0, r=0, t=10, b=0),  # Tight margins to reduce whitespace
                           width=500, height=250,
                           xaxis=dict(showgrid=False),
                           yaxis=dict(showgrid=True, gridcolor="rgba(0,0,0,0.2)", gridwidth=1),
                           )
    
    return comp_fig

# compare_redone_exp_mw(comparisons[0]).write_image(os.path.join(os.getcwd(), "data", "data exploration figures", "long-therm-experiment-mw.svg"))
compare_redone_exp_mw(comparisons[0]).show()

In [None]:
def compare_redone_exp_conv(repl_exp_nr: list):
    comp_fig = px.line()
    # comb_df = pd.concat([search_for_exp(repl_exp_nr), proof_df[[nr in [c + "b" for c in repl_exp_nr] for nr in proof_df["exp_nr"]]]])  just to have the legend side by side: 145 145b 343 343b
    comb_df = pd.concat([search_for_exp("145"), proof_df[proof_df["exp_nr"]=="145b"], search_for_exp("343"), proof_df[proof_df["exp_nr"]=="343b"]])

    max_time = 0 # to get the lenght of the sec limit line
    color_wheel = 0
    for idx, data in (enumerate([data_type_list[0]])):
        for kin, exp_nr in zip(comb_df[data], comb_df["exp_nr"]):
            comp_fig.add_scatter(x=kin[0], y=kin[1]*100, mode="markers", opacity=1, name=exp_nr, marker=dict(color=colors[color_wheel%len(colors)]))# marker=dict(color=colors[idx%len(colors)]), showlegend=False, legendgroup=str(idx))
            color_wheel += 1
            max_time = max(max_time, kin[0][-1])
            # add fitting to the first of the experimentation pars
            if exp_nr == "145" or exp_nr == "343":
                comp_xdata = np.array(kin[0], dtype=float)
                comp_ydata_conv = np.array(kin[1]*100, dtype=float)
                comp_p_initial = [max(ydata_conv), 0.1]
                comp_ng_fit_conv = fit_and_exclude_outliers(x=comp_xdata, y=comp_ydata_conv, fit_func=neg_growth,
                                                            p0=comp_p_initial, bounds=([0, -np.inf], [np.inf, np.inf]))
                # add_fits_to_plot(comp_fig, neg_growth, [comp_ng_fit_conv["p_opt"][0], comp_ng_fit_conv["p_opt"][1]])
                # print(comp_ng_fit_conv["p_opt"], comp_ng_fit_conv["outliers"])
                comp_fig.add_scatter(
                    x=np.linspace(0, int(kin[0][-1]+1), 100),
                    y=neg_growth(
                        np.linspace(0, int(kin[0][-1]+1), 100), *[comp_ng_fit_conv["p_opt"][0],  comp_ng_fit_conv["p_opt"][1]]),
                    opacity=0.5, line=dict(dash="dot"), showlegend=False, marker=dict(color="gray"))
        # custom legend
        # comp_fig.add_scatter(x=[None], y=[None], marker=dict(color=colors[idx%len(colors)]), name=data_list_name[idx], legendgroup=str(idx))
    # adding a threshold line because the SEC peaks smear into syspeak at that point
    # comp_fig.add_scatter(x=[0, max_time], y=[1000*10**(-5), 1000*10**(-5)], mode="lines", line=dict(dash="dash"), name="SEC limit", legendgroup="threshold") # r"$Mn\ [g/mol] \cdot 10^{-5}$"
    comp_fig.update_layout(legend=dict(orientation="h", yanchor="bottom", y=1.05, xanchor="left", x=-0.1 ), yaxis_title="Conversion [%]", xaxis_title="Time [h]",
                            paper_bgcolor='white',  # Set background to light mode
                            plot_bgcolor='white',   # Set plot background to white
                            font=dict(color='black'),  # Set font color to black for light mode
                            margin=dict(l=0, r=0, t=10, b=0),  # Tight margins to reduce whitespace
                            width=500, height=250,
                           xaxis=dict(showgrid=False),
                           yaxis=dict(showgrid=True, gridcolor="rgba(0,0,0,0.2)", gridwidth=1),
                           )    
    return comp_fig
# compare_redone_exp_conv(comparisons[1]).write_image(os.path.join(os.getcwd(), "data", "data exploration figures", "reproducibility_at_low_conversion.svg"))
compare_redone_exp_conv(comparisons[1]).show()

In [None]:
# search for all kinetics with falling Mn, Mw Values, they start appearing a lot when the Mn is over 0.25 * 10**5. They got now all curated out in the initial curation step.

In [None]:
# how many number unique did it actually make through curation
interests = ["monomer", "RAFT-Agent", "solvent"]
for interest in interests:
    print(str(kinetics_df[interest].nunique()) + " for " + interest)
    print(kinetics_df[interest].unique())

In [None]:
refine_search(dataframe=kinetics_df, raft_agent=["Benzyl 1H-pyrrole-1-carbodithioate"])


In [None]:
# plot_exp(["241", "145", "343"], True, True)
search_for_exp(["241", "145", "343"])

In [None]:
# create three 2D permutation tables (for each solvent)
# I want to create the three tables with same headers (on first row and first column) and then fill them by iterating through the table and the discarded table to show which where tried, which "worked", which did not and where thus discarded and which are not tried. 

# Since there are fewer agents than monomers prior will be the filling the first row
same_row_headers = [str(x) for x in range(1,17)] # just the index
same_column_headers = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"] 
DMF_p_table = pd.DataFrame(data=np.zeros((16, 10), dtype=int), columns=same_column_headers, index=same_row_headers)
Tol_p_table = pd.DataFrame(data=np.zeros((16, 10), dtype=int), columns=same_column_headers, index=same_row_headers)
DMSO_p_table = pd.DataFrame(data=np.zeros((16, 10), dtype=int), columns=same_column_headers, index=same_row_headers)

# fill the tables
def fill_dataset_into_tables(dataset: pd.DataFrame, fill: int):
    for row in dataset.itertuples():
        monomer = row.monomer
        agent = row._4 # since the - of RAFT-Agent is not allowed as an attribute name
        solvent = row.solvent
        match solvent:
            case "DMF":
                DMF_p_table.loc[monomer, agent] += fill
            case "Tol":
                Tol_p_table.loc[monomer, agent] += fill
            case "DMSO":
                DMSO_p_table.loc[monomer, agent] += fill

fill_dataset_into_tables(sRt.df, 10)
fill_dataset_into_tables(sRt.discarded_df, -1)
# fill_dataset_into_tables(sRt.permutations_df, -3)
# looking at e.g "4" "A" as monomer['4-Methylstyrene' raft_agent '2-Cyan-2-propylbenzodithioat' 4 experiments were introduced but 3 were discarded
# a reaction that worked once will now just be set as "working whilst 
def value_format(val):
    if val > 0:
        return True
    elif val == 0:
        return np.nan
    else:
        return False

DMSO_p_table = DMSO_p_table.map(value_format)
Tol_p_table = Tol_p_table.map(value_format)
DMF_p_table = DMF_p_table.map(value_format)

In [None]:
# write permutation tables to file
# with pd.ExcelWriter(os.path.join(os.getcwd(), "data", "data exploration figures", "permutation tables.xlsx")) as writer:
#     DMSO_p_table.to_excel(writer, sheet_name="DMSO", index=True)
#     Tol_p_table.to_excel(writer, sheet_name="Toluene", index=True)
#     DMF_p_table.to_excel(writer, sheet_name="DMF", index=True)

In [None]:
# re-involve the abortive experiments with a score of 0
# 1st get the experiments from the discarded_df
# 2nd count those (number-)unique and bring them in the same shape as the kinetics_df:
#   translate monomer and agent numbers and letters
#   leave conversion, time, score and so forth 0
# 3rd append them to the kinetics_df
# 4th drop all duplicate from the discarded only so no failed experiments will be shown twice when re-inserted: 
#   combine both, drop all duplicates for monomer, solvent, RAFT-Agent
#   keep the remaining exp_nr and mask with them what should be left in the reformatted_discarded and drop the rest
#   finally concat the filtered_reformatted_discarded and the kinetics_df
reformatted_discarded = sRt.discarded_df[["Experiment number", "monomer", "RAFT-Agent", "solvent"]].copy()
# rename and reorder columns
reformatted_discarded.columns = ["exp_nr", "monomer", "RAFT-Agent", "solvent"]
reformatted_discarded["exp_nr"] = reformatted_discarded["exp_nr"].astype(str)
mappables = ["monomer", "RAFT-Agent", "solvent"]
reformatted_discarded[mappables] = reformatted_discarded[mappables].map(lambda x: reaction_descriptors_dict[x])

reformatted_discarded["score"] = 0

In [None]:
discarded_kinetics = len(reformatted_discarded)
discarded_kinetics_w_o_duplicates = len(reformatted_discarded.drop_duplicates(subset=["monomer", "RAFT-Agent", "solvent"], keep="first"))
print(f"{discarded_kinetics} discarded kinetics")
print(f"{discarded_kinetics - discarded_kinetics_w_o_duplicates} (additional) duplicates in those discarded kinetics")

In [None]:
# append the reformatted discarded table above to the kinetics data
# this series withholds all the exp numbers that should still be present in the discarded set
exp_nr_to_keep = pd.concat([kinetics_df, reformatted_discarded.drop_duplicates(subset=["monomer", "RAFT-Agent", "solvent"], keep="first")]).drop_duplicates(subset=["monomer", "RAFT-Agent", "solvent"], keep=False)["exp_nr"]
# create a mask
reform_to_keep = reformatted_discarded["exp_nr"].isin(exp_nr_to_keep)
unique_discarded_non_over_all_duplicate = len(reformatted_discarded[reform_to_keep])
print(f"{unique_discarded_non_over_all_duplicate} discarded experiments which where unique over all")

In [None]:
combined_df = pd.concat([kinetics_df, reformatted_discarded[reform_to_keep]])


In [None]:
success_f_kinetics = len(kinetics_df)
success_f_kinetics_w_o_duplicates = len(kinetics_df.drop_duplicates(subset=["monomer", "RAFT-Agent", "solvent"], keep="first"))
print(f"{success_f_kinetics} successful and reasonable kinetics")
print(f'{success_f_kinetics-success_f_kinetics_w_o_duplicates} (additional) duplicates')
print(f"{success_f_kinetics_w_o_duplicates} unique successful and reasonable kinetics")

In [None]:
import plotly.graph_objects as go


# make a pie chart for the different dataparts
# difference to evaluation table xlsx kinetics_dropped_bec_max_con = len_kinetics-len(kinetics_df) = 3 
# we got 552 (311+241) kinetics in total,
# 241 accurately described successful reaction kinetics  (~313 in skript before~)
# 314 discarded (~311 in skript before, of which 198 (311-113) are duplicates~) 
              # "dk unique over sk & dk": discarded_kinetics_w_o_duplicates
# sun_entries = ['Successful', 'Unique ', 'Duplicates ', 'Discarded', 'Unique', 'Duplicates']
sun_entries = ['Successful (44%)', 'Unique (35%)', 'Duplicates (9%)', 'Discarded (56%)', 'Unique (33%)', 'Duplicates (23%)']
sun_values = [success_f_kinetics,
              success_f_kinetics_w_o_duplicates,
              success_f_kinetics - success_f_kinetics_w_o_duplicates,
              discarded_kinetics,
              discarded_kinetics_w_o_duplicates,
              discarded_kinetics - discarded_kinetics_w_o_duplicates,
              ]
# data_point_parents=["", "Successful", "Successful", "", "Discarded", "Discarded"]
data_point_parents=["", "Successful (44%)", "Successful (44%)", "", "Discarded (56%)", "Discarded (56%)"]

# sun_entries.append("unique over all discarded"); sun_values.append(unique_discarded_non_over_all_duplicate); data_point_parents.append(["Successful", "Discarded"])

kin_in_database_fig = go.Figure(go.Sunburst(
    labels=sun_entries,
    parents=data_point_parents,
    values=sun_values,
    branchvalues="total",
    insidetextorientation="horizontal",
    rotation=180
))
kin_in_database_fig.update_layout(title={
        'text': "Kinetic experiments: <br> successful, discarded, redone or unique",
        'y': 0.9,
        'x': 0.5,
        'xanchor': 'center',
        'yanchor': 'top'
    },
    paper_bgcolor='white',  # Set background to light mode
    plot_bgcolor='white',   # Set plot background to white
    font=dict(color='black', size=15),  # Set font color to black for light mode
    margin=dict(l=0, r=0, t=90, b=10),  # Tight margins to reduce whitespace
    width=500, #height=300,
)

# Customize tick marks (if relevant on radial axes)
kin_in_database_fig.update_traces(
    marker=dict(line=dict(color='black', width=0.5)),  # Add outlines for the slices
    textinfo="label+value",  # Display labels and values
    hoverinfo="label+value+percent entry",  # Show relevant hover information
)
# kin_in_database_fig.write_image(os.path.join(os.getcwd(), "data", "data exploration figures", "kin_in_database_fig.svg"))
kin_in_database_fig.show()

In [None]:
# which "solvent", "RAFT-Agent", "monomer" work good in general:
# px.sunburst(kinetics_df, path= ["solvent", "RAFT-Agent", "monomer"], values="max_con", maxdepth=2)

In [None]:
import functools
# theoretical, semi-practical and practical reaction fit
def sigmoid (x, L ,x0, k, b):
    y = L / (1 + np.exp(-k*(x-x0)))+b
    return (y)

x = np.linspace(0, 1, 100)

# prefill functions
fitting_functions = [("Theoretical - linear", functools.partial(linear_growth, m=1)),
                     ("Semi-practical - sigmoidal", functools.partial(sigmoid, L=0.9, x0=0.5, k=10, b=0)),
                     ("Practical - negative growth", functools.partial(neg_growth, l=1, k=2))]

fitting_function_comparison_fig = px.line(labels={"x":"Time", "y":"Conversion"}) #title="Comparison of theoretical, semi-practical and practical <br>reaction fits for living polymerizations kinetics",
for curve_type, function in fitting_functions:
    fitting_function_comparison_fig.add_scatter(x=x,y=function(x), mode="lines", name=curve_type)

fitting_function_comparison_fig.update_layout(
    xaxis=dict(
        # showticklabels=False,# Hide tick labels on x-axis - sets the title setoff to 0, so the following workaround is done:
        tickfont=dict(color="rgba(0,0,0,0)", size=1),
        showgrid=False,      # Hide grid lines
        title="Time"
    ),
    yaxis=dict(
        range=[0,1],
        showticklabels=False,
        showgrid=False,
        title="Conversion"
    ),
    legend=dict(
        yanchor="top",
        y=0.99,
        xanchor="left",
        x=0.01
    ),
    paper_bgcolor='white',  # Set background to light mode
    plot_bgcolor='white',   # Set plot background to white
    font=dict(color='black'),  # Set font color to black for light mode
    margin=dict(l=0, r=0, t=0, b=0),  # Tight margins to reduce whitespace #t=80 with title
    width=500, height=400,
)
# fitting_function_comparison_fig.write_image(os.path.join(os.getcwd(), "data", "data exploration figures", "fitting_function_comparison_fig.svg"))
fitting_function_comparison_fig.show()

In [None]:
reformatted_discarded["max_con"] = 0
combined_df_with_duplicates = pd.concat([reformatted_discarded, kinetics_df])
combined_df_with_duplicates

In [None]:
# search for duplicates of discarded and successful kinetics to look out for reproducibility and how different the experiments are
groupby_params = kinetics_df.groupby(["monomer", "RAFT-Agent", "solvent"])
groupby_params_labels = [f"{solvent} with {monomer} in {_6}" for solvent, monomer, _6 in groupby_params.groups.keys()]

x = range(len(groupby_params_labels))
y_max_conv = [group["max_con"].values for _, group in groupby_params]
fig, ax = plt.subplots(figsize=(10, 6))
for i, values in enumerate(y_max_conv):
    ax.scatter([x[i]]*len(values), values,label=groupby_params_labels[i] if len(values) == 1 else "")

ax.set_xticks(x)
ax.set_xticklabels(groupby_params_labels, rotation=45, ha="right")
ax.set_ylabel("Max Conversion")
ax.set_xlabel("Solvent-Monomer-Raft")
ax.set_title("Max Conversion by Solvent, Monomer and RAFT-agent")
plt.tight_layout()


In [None]:
rafts = kinetics_df["RAFT-Agent"].unique()
for raft in rafts:
    # raft_subset = kinetics_df[kinetics_df["RAFT-Agent"] == raft]
    raft_subset = combined_df_with_duplicates[combined_df_with_duplicates["RAFT-Agent"] == raft]
    groupby_params_rafts = raft_subset.groupby(["monomer", "solvent"])
    groupby_params_labels_rafts = [f"{monomer} in {solvent}" for monomer, solvent in groupby_params_rafts.groups.keys()]
    
    x = range(len(groupby_params_labels_rafts))
    y_max_conv = [group["max_con"].values for _, group in groupby_params_rafts]
    fig, ax = plt.subplots(figsize=(10, 6))
    for i, values in enumerate(y_max_conv):
        ax.scatter([x[i]]*len(values), values,label=groupby_params_labels_rafts[i] if len(values) == 1 else "")

    ax.set_xticks(x)
    ax.set_xticklabels(groupby_params_labels_rafts, rotation=45, ha="right")
    ax.set_ylabel("Max Conversion")
    ax.set_xlabel("Solvent-Monomer")
    ax.set_title(f"Max Conversion for all exp with {raft}")
    plt.tight_layout()
    plt.show()
    plt.close()