# 2025 Symbolic regression Monod paper
# Post-processing of symbolic regression equations

## Locating data

In [None]:
cwd =  # TODO: set to the project root; the cells below read/write "<cwd>/data/", "<cwd>/data_train/", "<cwd>/data_test/", "<cwd>/sreq/" and "<cwd>/plot/"

In [None]:
import numpy as np
import scipy as sp
import pandas as pd
import matplotlib.pyplot as plt
#import seaborn as sns
#import itertools
from sympy import *
from IPython.display import display # to display symbolic expressions

import os
#import string
#from collections import Counter

# Functions

In [None]:
from sympy.core.numbers import Integer, NegativeOne
from sympy.functions.elementary.complexes import sign
from sympy.physics.units.quantities import Quantity
from sympy.solvers.ode import constantsimp

In [None]:
# Symbols that may appear in the symbolic-regression equations.
# The assumption keyword must be spelled ``nonnegative``: the previous
# spelling ``nonegative`` is not a SymPy assumption, so these four symbols
# carried no sign information during simplification.
t, C, n, S = symbols("t C n S", nonnegative=True)
U, F, C1, C2 = symbols("U F C1 C2", positive=True)
Nc, gmax, S0 = symbols("Nc gmax S0", positive=True)
nu = symbols("nu", real=True)

# Name -> symbol mapping handed to parse_expr so that the variable names
# found in the equation strings resolve to the symbols declared above.
symbols_dict = {"C": C,
                "F": F,
                "n": n,
                "S": S,
                "t": t,
                "U": U,
                "C1": C1,
                "C2": C2,
               }

In [None]:
def is_a_remarkable_number(a):
    """Return True when *a* is a SymPy ``Number`` other than ``NegativeOne``.

    ``NegativeOne`` is excluded, so a bare minus sign is not treated as a
    number to be replaced by a constant.
    """
    if isinstance(a, NegativeOne):
        return False
    return isinstance(a, Number)

def contains_remarkable_number(expr):
    """Return True if at least one atom of *expr* is a remarkable number."""
    return any(is_a_remarkable_number(atom) for atom in expr.atoms())

def replace_numbers_by_Ks(expr):
    """Substitute every remarkable number of *expr* by a signed constant.

    The nodes of the expression passed in are visited in pre-order; each
    remarkable number met is substituted, in the running result, by
    ``sign(number) * Quantity("K<k>")`` where ``k`` starts at 1 and grows by
    one per number met.

    Returns the expression after all substitutions.
    """
    counter = 1
    # The traversal is built once from the input expression, so rebinding
    # ``expr`` inside the loop does not alter the set of visited nodes.
    for node in preorder_traversal(expr):
        if not is_a_remarkable_number(node):
            continue
        replacement = sign(node) * Quantity("K{}".format(counter))
        expr = expr.subs(node, replacement)
        counter += 1

    return expr

def count_Quantitys(expr):
    """Count the ``Quantity`` nodes met in a pre-order walk of *expr*.

    Every occurrence is counted, so a Quantity appearing twice in the
    traversal contributes two to the total.
    """
    return sum(1 for node in preorder_traversal(expr) if isinstance(node, Quantity))

def count_parameters(expr, Kxpr=False):
    """Count the constants (parameters) of a symbolic expression.

    Numbers are repeatedly replaced by ``Quantity`` constants and the
    constants are merged with ``constantsimp`` until no remarkable number is
    left; the result is the number of ``Quantity`` nodes that remain.

    Parameters
    ----------
    expr : sympy expression
        Expression whose parameters are counted.
    Kxpr : bool, default False
        When True, also return the expression rewritten with constants.

    Returns
    -------
    int, or (int, expr) when ``Kxpr`` is True.
    """
    # NOTE(review): the loop ends only once every number has been absorbed
    # into a constant; confirm this holds for an expression containing zero,
    # whose sign-weighted replacement would still be zero.
    while contains_remarkable_number(expr):
        expr = replace_numbers_by_Ks(expr)
        expr = constantsimp(expr, list(expr.atoms(Quantity)))  # simplify constants

    # Counted after the loop so that an expression without any number
    # (e.g. a bare symbol) yields a count instead of an unbound variable.
    out = count_Quantitys(expr)
    if Kxpr:
        out = (out, expr)

    return out

# Naive SR

## Splitting data

In [None]:
# Collect every file found below "<cwd>/data/" whose name contains "Rs0i".
allfiles = []
for root, _dirs, files in os.walk("{}/data/".format(cwd)):
    allfiles.extend(os.path.join(root, fname) for fname in files if "Rs0i" in fname)

In [None]:
# Split every dataset into a training and a test file by concentration.
for file in allfiles:

    # Dataset name: file name stripped of its directory and extension.
    # os.path handles platform separators and dots in directory names.
    name = os.path.splitext(os.path.basename(file))[0]
    df = pd.read_csv(file)

    # Rank the distinct concentrations: even ranks (0, 2, ...) go to the
    # training set, odd ranks (1, 3, ...) to the test set.
    concs = sorted(df["C"].unique())
    conc_dict = {conc: bool(rank % 2) for rank, conc in enumerate(concs)}
    is_test = df["C"].apply(lambda x: conc_dict[x])

    # The mask is kept outside the DataFrame, so no helper column has to be
    # dropped before writing.
    tst_data = df[is_test]
    tst_data.to_csv("{}/data_test/{}.csv".format(cwd, name), index=False)

    trn_data = df[~is_test]
    trn_data.to_csv("{}/data_train/{}.csv".format(cwd, name), index=False)

## Run naive SR on command line

## Post-processing of SR halls of fame

In [None]:
# Collect the hall-of-fame file of every run directory whose path contains "Rs0i".
allfiles = []
for root, _dirs, files in os.walk("{}/SR_Rs0_naiveSR/".format(cwd)):
    if "Rs0i" in root and "hall_of_fame.csv" in files:
        allfiles.append(os.path.join(root, "hall_of_fame.csv"))

In [None]:
# Post-process every naive-SR hall of fame: parse each equation, count its
# constants and score it on the train and test files of the matching dataset.
frames = []  # one table per run; concatenated once after the loop
for f, file in enumerate(allfiles):

    # The run directory name ends with "-<dataset>", e.g. "Rs0i6".
    name = os.path.dirname(file).rpartition("-")[2]

    data = pd.read_csv(file, index_col=False)
    # Feature-set code deduced from the variable names present in the equations of the run.
    eqns = "_".join(data["Equation"])
    feat = 2 * int("F" in eqns) + int("U" in eqns)  # {"n_C": 0, "n_C_U": 1, "F_C": 2, "F_C_U": 3,}

    trn_data = pd.read_csv("{}/data_train/{}.csv".format(cwd, name))
    tst_data = pd.read_csv("{}/data_test/{}.csv".format(cwd, name))
    trn_rhos, tst_rhos = trn_data["rho"], tst_data["rho"]
    trn_rho_bar, tst_rho_bar = trn_rhos.mean(), tst_rhos.mean()

    # "^" is rewritten to the Python power operator before parsing.
    eqns = data["Equation"].apply(lambda x: x.replace("^", "**"))

    exprs, csts_counts, trn_RSSs, tst_RSSs = [], [], [], []
    for expr in eqns:
        expr = simplify(parse_expr(expr, symbols_dict))
        exprs.append(expr)
        csts_counts.append(count_parameters(expr))  # count constants in each model

        # Evaluate the model on one row holding the columns C, n, F, U (in that order).
        def predict(x, expr=expr):
            return expr.evalf(subs={C: x.iloc[0], n: x.iloc[1], F: x.iloc[2], U: x.iloc[3]})

        trn_rho_hats = trn_data[["C", "n", "F", "U"]].apply(predict, axis=1)
        tst_rho_hats = tst_data[["C", "n", "F", "U"]].apply(predict, axis=1)
        trn_RSSs.append(np.sum((trn_rhos - trn_rho_hats)**2))
        tst_RSSs.append(np.sum((tst_rhos - tst_rho_hats)**2))

    scores = pd.DataFrame({"strn": name, "feat": feat, "csts": csts_counts, "expr": exprs,
                           "RSS_trn": trn_RSSs, "RSS_tst": tst_RSSs})
    data = pd.concat([data, scores], axis=1)
    # R2 = 1 - RSS / total sum of squares around the mean of each split.
    data["R2_trn"] = 1 - data["RSS_trn"] / np.sum((trn_rhos - trn_rho_bar)**2)
    data["R2_tst"] = 1 - data["RSS_tst"] / np.sum((tst_rhos - tst_rho_bar)**2)

    print(f / len(allfiles), end="\r")  # progress as a fraction of processed files
    frames.append(data)

# A single concatenation avoids re-copying the growing table at every
# iteration and fails explicitly (instead of reusing a stale ``alldata``)
# when no hall of fame was found.
alldata = pd.concat(frames, axis=0, ignore_index=True)

alldata.to_csv("{}/SR_Rs0_naiveSR/Rs0.csv".format(cwd), index=False)

## Visualisation of feature-set impact on SR (Fig. 3)

In [None]:
# Load the naive-SR summary, expose the R2 columns under plotting-friendly
# names and order the rows by complexity, feature set and test score.
df = pd.read_csv("{}/SR_Rs0_naiveSR/Rs0.csv".format(cwd))
df["train_R2"] = df["R2_trn"]
df["test_R2"] = df["R2_tst"]
df = df.sort_values(["csts", "feat", "test_R2"])

In [None]:
import seaborn as sns

# Horizontal offset per feature-set code, used when positioning the
# train-score markers on top of the grouped boxplot.
step = 0.2

# NOTE(review): the title mentions Random Forest although this notebook deals
# with symbolic regression; it is only referenced by a commented-out
# set_title call.
title = "Random Forest regression on experimental data"

# Axis and legend labels; the plotting cells also use them as DataFrame
# column names after renaming.
abcisses = "model complexity: number of parameters"
ordonnees = r"performance: coefficient of determination $R^2$"
couleurs = "feature set"

In [None]:
# NOTE(review): this reloads `df` from "SR_naive_results", a different folder
# from the "SR_Rs0_naiveSR" summary written by the post-processing cell, and
# replaces the table that carried the "train_R2" column read by the next
# cell - confirm that this cell is still meant to run.
df = pd.read_csv("{}/SR_naive_results/Rs0.csv".format(cwd,))
# The test score is taken as 1 - Loss.
df["test_R2"] = 1 - df["Loss"]
#

In [None]:
# train scores: median train R2 of every (csts, feat) group
group_key = df[["csts", "feat"]].apply(lambda x: "_".join(str(k) for k in x.values), axis=1)
medians = df[["train_R2", "csts", "feat"]].groupby(group_key).median()
# x position: parameter count shifted to a zero-based slot, then offset by feature-set code
X = medians["csts"] - 1 + (medians["feat"] - 3/2) * step
Y = medians["train_R2"]

# test score boxplots: columns renamed to the axis / legend labels
feature_labels = {0: r"{$C$, $N$}",
                  1: r"{$C$, $N$, $t$}",
                  2: r"{$C$, $N_c$}",
                  3: r"{$C$, $N_c$, $t$}",
                  }
df = df.rename(columns={"csts": abcisses, "feat": couleurs, "test_R2": ordonnees})
df[couleurs] = df[couleurs].apply(lambda x: feature_labels[x])

In [None]:
# Grouped boxplot of the test scores with the median train scores overlaid as crosses.
fig = plt.figure(figsize=(8, 5), constrained_layout=True)
gs = fig.add_gridspec(1, 1)

ax = fig.add_subplot(gs[:, :])
ax = sns.boxplot(data=df, x=abcisses, y=ordonnees, hue=couleurs,
                 palette="tab10", showfliers=False, ax=ax)
ax.scatter(X, Y, marker="x", c="k", zorder=4)

ax.set_xlim(-0.5, 16.5)
ax.set_ylim(0, 1)
#ax.set_title(title)

### SAVING
name = "{}/plot/SR_boxplot_Rs0.pdf".format(cwd,)
#plt.savefig(name, facecolor='w', edgecolor='w', transparent=False, bbox_inches="tight")
plt.show()

# Main SR

In [None]:
code = "Rs0"  # used to name the summary file "<cwd>/sreq/<code>.csv"
cwd2 = # TODO: location of the SR outputs (walked below in search of "hall_of_fame.csv" files)

In [None]:
datalocation =  # TODO: directory holding the "<name>.csv" data files read by the main SR loop

In [None]:
# Collect every hall-of-fame file below cwd2, ignoring directories whose path contains "test".
allfiles = []
for root, _dirs, files in os.walk(cwd2):
    if "test" not in root and "hall_of_fame.csv" in files:
        allfiles.append(os.path.join(root, "hall_of_fame.csv"))

In [None]:
# Post-process every hall of fame of the main SR runs: rebuild the full
# equation from its template parts, count constants, compute RSS and AIC.
frames = []  # one table per run; concatenated once after the loop
for file in allfiles:

    # Layout: <...>/<template folder>/<prefix>-<dataset>/hall_of_fame.csv
    run_dir = os.path.dirname(file)
    name = run_dir.rpartition("-")[2]  # e.g. "Rs0i6"
    feat = os.path.basename(os.path.dirname(run_dir))
    template = {"final_model_1": 0, "final_model_2": 1, "final_model_3": 2, "no_template": 0, "template_1": 1, "template_2": 2, }[feat]

    eqns = pd.read_csv(file, index_col=False)
    Loss = eqns["Loss"]
    eqns["Equation"] = eqns["Equation"].apply(lambda x: x.replace("^", "**"))
    # Templated equations are ";"-separated "lhs = rhs" pieces; only the
    # right-hand sides are kept and the "#k" placeholders are substituted.
    if template == 1:  # template 1
        eqns.loc[:,["q", "m", "g",]] = eqns.loc[:,"Equation"].apply(lambda x: [y[y.index("= ")+2:] for y in x.split(";")] ).to_list()
        eqns["gr"] = eqns["g"].apply(lambda x: x.replace("#1", "F").replace("#2", "C"))
    elif template == 2:  # template 2
        eqns.loc[:,["q", "m", "g", "h"]] = eqns.loc[:,"Equation"].apply(lambda x: [y[y.index("= ")+2:] for y in x.split(";")] ).to_list()
        eqns["h"] = eqns["h"].apply(lambda x: x.replace("#1", "F"))
        eqns["gr"] = eqns.loc[:,["g", "h"]].apply(lambda x: x.iloc[0].replace("#1", "(C*({}))".format(x.iloc[1])), axis=1)
    if template in [1, 2]:
        # full model = q / (q + U**m) multiplied by the "gr" part
        eqns["ad"] = eqns.loc[:,["q", "m"]].apply(lambda x: "{}/({}+U**{})".format(x.iloc[0], x.iloc[0], x.iloc[1],), axis=1)
        eqns["Equation"] = eqns.loc[:,["ad", "gr"]].apply(lambda x: "{}*{}".format(x.iloc[0], x.iloc[1],), axis=1)
    eqns = eqns["Equation"]

    data = pd.read_csv("{}/{}.csv".format(datalocation, name))

    exprs, Kxprs, csts_counts, RSSs = [], [], [], []
    for expr in eqns:
        expr = simplify(parse_expr(expr, symbols_dict))
        exprs.append(expr)
        csts_count, Kxpr = count_parameters(expr, Kxpr=True)
        csts_counts.append(csts_count)  # count constants in each model
        Kxprs.append(Kxpr)

        # Evaluate the model on one row holding the columns C, n, F, U (in that order).
        def predict(x, expr=expr):
            return expr.evalf(subs={C: x.iloc[0], n: x.iloc[1], F: x.iloc[2], U: x.iloc[3]})

        rho_hats = data[["C", "n", "F", "U"]].apply(predict, axis=1).to_numpy(dtype=float)
        RSSs.append(np.sum((data["rho"] - rho_hats)**2))

    eqns = pd.DataFrame({"name": name, "template": template, "ct": csts_counts, "eq": exprs, "Kq": Kxprs, "Loss": Loss, "RSS": RSSs,
                         "NNN": data.shape[0],  # number of data points
                         })
    # AIC = N ln(RSS / N) + 2 (k + 1), following 10.1016/j.idm.2019.12.010, p. 124
    eqns["AIC"] = eqns["NNN"] * np.log((eqns["RSS"] / eqns["NNN"]).to_numpy(dtype=float)) + 2 * (eqns["ct"] + 1)
    frames.append(eqns)

# A single concatenation fails explicitly (instead of reusing a stale
# ``alleqns``) when no hall of fame was found.
alleqns = pd.concat(frames, axis=0)
alleqns = alleqns.sort_values(["name", "ct", "Loss", "template"])
# NOTE(review): unlike the RsB summary, this file is written with its index
# column; kept as is so the on-disk layout does not change.
alleqns.to_csv("{}/sreq/{}.csv".format(cwd, code))

### RsBi0 (E. coli)

In [None]:
name = "RsBi0"  # dataset label: selects "<cwd>/data/<name>.csv" and is stored in the "strn" column of the summary

In [None]:
cwd2 = # location of the SR outputs
data = pd.read_csv("{}/data/{}.csv".format(cwd, name), )
allfiles = []
for root, dirs, files in os.walk(cwd2):
    if "test" in root:
        continue
    for file in files:
        if file == "hall_of_fame.csv":
            allfiles.append(os.path.join(root, file))

code = "RsB"

In [None]:
# Pick the hall-of-fame file of each template from its path.
# Reset first, so that a path left over from an earlier run of this cell
# cannot be picked up silently when a template is missing.
file0 = file1 = file2 = None
for file in allfiles:
    if "no_template" in file:
        file0 = file
    elif "template_1" in file:
        file1 = file
    elif "template_2" in file:
        file2 = file

# Fail here with an explicit message rather than later on an undefined name.
missing = [label
           for label, path in (("no_template", file0), ("template_1", file1), ("template_2", file2))
           if path is None]
if missing:
    raise FileNotFoundError("no hall_of_fame.csv found for: {}".format(", ".join(missing)))

In [None]:
# Post-process the three RsB halls of fame (template index = position in
# the list): rebuild the equations, count constants, compute RSS and AIC.
frames = []  # one table per template; concatenated once after the loop
for template, file in enumerate([file0, file1, file2]):

    eqns = pd.read_csv(file, index_col=False)
    eqns["Equation"] = eqns["Equation"].apply(lambda x: x.replace("^", "**"))
    # Templated equations are ";"-separated "lhs = rhs" pieces; only the
    # right-hand sides are kept and the "#k" placeholders are substituted.
    if template == 1:
        eqns.loc[:,["q", "m", "g",]] = eqns.loc[:,"Equation"].apply(lambda x: [y[y.index("= ")+2:] for y in x.split(";")] ).to_list()
        eqns["gr"] = eqns["g"].apply(lambda x: x.replace("#1", "F").replace("#2", "C1").replace("#3", "C2"))
    elif template == 2:  # template 2
        eqns.loc[:,["q", "m", "g", "h1", "h2"]] = eqns.loc[:,"Equation"].apply(lambda x: [y[y.index("= ")+2:] for y in x.split(";")] ).to_list()
        eqns["h1"] = eqns["h1"].apply(lambda x: x.replace("#1", "F"))
        eqns["h2"] = eqns["h2"].apply(lambda x: x.replace("#1", "F"))
        eqns["gr"] = eqns.loc[:,["g", "h1", "h2"]].apply(lambda x: x.iloc[0].replace("#1", "(C1*({}))".format(x.iloc[1])).replace("#2", "(C2*({}))".format(x.iloc[2])), axis=1)
    if template in [1, 2]:
        # full model = q / (q + U**m) multiplied by the "gr" part
        eqns["ad"] = eqns.loc[:,["q", "m"]].apply(lambda x: "{}/({}+U**{})".format(x.iloc[0], x.iloc[0], x.iloc[1],), axis=1)
        eqns["Equation"] = eqns.loc[:,["ad", "gr"]].apply(lambda x: "{}*{}".format(x.iloc[0], x.iloc[1],), axis=1)
    Loss = eqns["Loss"]
    eqns = eqns["Equation"]

    exprs, Kxprs, csts_counts, RSSs = [], [], [], []
    for expr in eqns:
        expr = simplify(parse_expr(expr, symbols_dict))
        exprs.append(expr)
        csts_count, Kxpr = count_parameters(expr, Kxpr=True)
        csts_counts.append(csts_count)  # count constants in each model
        Kxprs.append(Kxpr)

        # Evaluate the model on one row (columns C1, C2, n, F, U in that
        # order); a row that cannot be turned into a float yields NaN.
        def evalexpr(x, expr=expr):
            try:
                return float(expr.evalf(subs={C1: x.iloc[0], C2: x.iloc[1], n: x.iloc[2], F: x.iloc[3], U: x.iloc[4]}))
            except Exception:
                # Best effort is kept, but KeyboardInterrupt / SystemExit
                # are no longer swallowed as with a bare ``except``.
                return np.nan

        rho_hats = data[["C1", "C2", "n", "F", "U"]].apply(evalexpr, axis=1).to_numpy(dtype=float)
        # NOTE(review): NaN predictions are skipped by nansum while NNN
        # below still counts every row - confirm this is intended for the AIC.
        RSSs.append(np.nansum((data["rho"] - rho_hats)**2))

    eqns = pd.DataFrame({"strn": name, "template": template, "ct": csts_counts, "eq": exprs, "Kq": Kxprs, "Loss": Loss, "RSS": RSSs,
                         "NNN": data.shape[0],  # number of data points
                         })
    # compute AIC following: 10.1016/j.idm.2019.12.010, p. 124
    eqns["AIC"] = eqns["NNN"] * np.log((eqns["RSS"] / eqns["NNN"]).to_numpy(dtype=float)) + 2 * (eqns["ct"] + 1)
    frames.append(eqns)

megaeqns = pd.concat(frames, axis=0)
megaeqns = megaeqns.sort_values(["strn", "ct", "template"])
megaeqns.to_csv("{}/sreq/{}.csv".format(cwd, code,), index=False)

# Model annotation

## Annotation

In [None]:
# Reload the RsB summary and turn the stored equation strings back into SymPy expressions.
location = "{}/sreq/RsB.csv".format(cwd,)
data = pd.read_csv(location, index_col=False)
for column in ("eq", "Kq"):
    data[column] = data[column].apply(lambda expr: simplify(parse_expr(expr, symbols_dict)))

In [None]:
# Show models five at a time; `i` selects the page (rows 5*i .. 5*i + 4).
i = 17
first_row = 5 * i
for idx in range(first_row, first_row + 5):
    label = data.index[idx]
    print(label)
    for column in ("Kq", "eq"):
        display(data.loc[label, column])

In [None]:
# Manual annotation of one model: `j` is the index label of the row, `annot`
# the annotation code stored for it.
j = 45
# The plotting cells colour the models according to whether the code
# contains "M" and/or "L".
annot = "a*M12"
data.loc[j, "annotation"] = annot

In [None]:
# Write the annotated table back to the file it was loaded from.
data.to_csv(location, index=False)

## Annotation results

### Checking and summarizing

In [None]:
# Load the RsB summary and keep only the rows that carry an annotation.
location = "{}/sreq/RsB.csv".format(cwd,)
alldata = pd.read_csv(location, index_col=False)
has_annotation = alldata["annotation"].notna()
data = alldata.loc[has_annotation]

In [None]:
# Lowest-loss annotated model of every dataset.
# The Rs0 summary labels datasets in a "name" column whereas the RsB summary
# uses "strn"; pick whichever is present so the cell works on both files.
group_col = "name" if "name" in data.columns else "strn"
data.loc[data.groupby(group_col)["Loss"].idxmin(), [group_col, "Loss", "annotation"]]

## Visualisation of model performance and interpretability for the Rs0 datasets (Fig. 4)

In [None]:
from matplotlib.ticker import ScalarFormatter

# Tick formatter with the offset notation disabled; the Pareto plots below
# install it as the major formatter of their y axes.
y_formatter = ScalarFormatter(useOffset=False)

In [None]:
code = "Rs0"

location = "{}/sreq/{}.csv".format(cwd, code)
alldata = pd.read_csv(location, index_col=False)


def _face_colour(annotation):
    """Marker face colour: blue without "M", white with both "M" and "L", red with "M" only."""
    if "M" not in annotation:
        return "b"
    if "L" in annotation:
        return "w"
    return "r"


def _edge_colour(annotation):
    """Marker edge colour: white for the four bare codes, black otherwise."""
    if annotation in ("L", "M", "a*L", "a*M"):
        return "w"
    return "k"


# Annotated models only, with the colours used by the scatter plots.
X = alldata[alldata["annotation"].notna()].copy()
X["colour"] = X["annotation"].apply(_face_colour)
X["edge"] = X["annotation"].apply(_edge_colour)

In [None]:
YYY = 4  # number of rows
n_cols = 4  # number of columns of the panel grid
param_max = 11  # odd number for max number of parameters on x axis
figsize = (8,5)
strns = sorted([ 105, 1287, 1292, 1299, 1842, 1896, 1972, 1977, 2160, 2164, 2443, 2659, 3031, 3237,  403,    6] ,)
markers = ["o", "v", "^"]  # one marker per template code

fig = plt.figure(figsize=figsize, constrained_layout=True)
gs = fig.add_gridspec(YYY, n_cols)
fig.supxlabel("number of model parameters")
fig.supylabel("performance: coefficient of determination R2")

# One panel per strain, filled column by column.
for i, strn in enumerate(strns):
    name = code + "i" + str(strn)
    row, col = i % YYY, i // YYY
    ax = fig.add_subplot(gs[row, col])
    ax.yaxis.tick_right()
    ax.set(xlim=(0.5, param_max + 0.5),
           # x ticks only on the bottom row, y ticks only on the last column
           xticks=2 * np.arange(param_max // 2 + 1) + 1 if row == YYY - 1 else [],
           ylim=(0, 1),
           yticks=[0, 0.5, 1,] if col == n_cols - 1 else [],
           ylabel="{}".format(strn), )
    plt.setp(ax.get_yminorticklabels(), visible=False)  # <--- Hide the minor ticks
    ax.yaxis.set_major_formatter(y_formatter)

    # scatter plot
    # Iterate over the template codes actually present and filter on the
    # code itself: relying on the enumeration index dropped templates when
    # the codes were not exactly 0..k-1, and overwrote the global symbol `t`.
    for template in sorted(alldata["template"].unique()):
        marker = markers[int(template)]
        # all models
        df = alldata[(alldata["name"] == name) & (alldata["template"] == template)]
        ax.scatter(df["ct"], 1 - df["Loss"], c="k", marker=marker, alpha=0.05, s=30, zorder=1)
        # biologically interpretable models
        if name in X["name"].unique():
            df = X[(X["name"] == name) & (X["template"] == template)]
            ax.scatter(df["ct"], 1 - df["Loss"], c=df["colour"], edgecolor=df["edge"], marker=marker, alpha=0.6, s=40, zorder=2)

# saving
path = "{}/plot/Pareto_{}.pdf".format(cwd, code)
plt.savefig(path, facecolor='w', edgecolor='w', transparent=False, bbox_inches="tight")
plt.show()

## Visualisation of model performance and interpretability for the RsB dataset (Fig. S5)

In [None]:
code = "RsB"

location = "{}/sreq/{}.csv".format(cwd, code)
alldata = pd.read_csv(location, index_col=False)


def _face_colour(annotation):
    """Marker face colour: blue without "M", white with both "M" and "L", red with "M" only."""
    if "M" not in annotation:
        return "b"
    if "L" in annotation:
        return "w"
    return "r"


def _edge_colour(annotation):
    """Marker edge colour: white for the four bare codes, black otherwise."""
    if annotation in ("L", "M", "a*L", "a*M"):
        return "w"
    return "k"


# Annotated models only, with the colours used by the scatter plots.
X = alldata[alldata["annotation"].notna()].copy()
X["colour"] = X["annotation"].apply(_face_colour)
X["edge"] = X["annotation"].apply(_edge_colour)

In [None]:
# Label placement per annotation code: [x of the text, y of the text, LaTeX
# label, flag]. The plotting cell reads the first three entries; the last
# entry is not read by the code visible in this notebook.
annotation_dict = {"a*L1":      [4.5, 0.07, r"$g_{\rm max}S_1$", 1],
                   "a*L1*L2":   [0.7, 0.20, r"$g_{\rm max}S_1S_2$", 1],
                   "a*(L1+L2)": [0.7, 0.55, r"$g_{\rm max}(S_1+S_2)$", 1],
                   "a*M1":      [7.0, 0.30, r"$g_{\rm max} \dfrac{S_1}{K+S_1}$", 1],
                   "a*M1*M2":   [8.5, 0.60, r"$g_{\rm max}\dfrac{S_1}{K_1+S_1}\times\dfrac{S_2}{K_2+S_2}$", 1],
                  }

In [None]:
param_max = 15  # odd number for max number of parameters on x axis
figsize = (8,5)
markers = ["o", "v", "^"]  # one marker per template code
fig = plt.figure(figsize=figsize, constrained_layout=True)
ax = fig.add_subplot()

ax.set(xlim=(0.5, param_max + 0.5), xticks = 2 * np.arange(param_max // 2 + 1) + 1, ylim=(0, 1), yticks = [0, 0.2, 0.4, 0.6, 0.8, 1,],
       xlabel = "number of model parameters", ylabel = "performance: coefficient of determination R2", )#title = r"$E.$ $coli$ data" )
plt.setp(ax.get_yminorticklabels(), visible=False)  # <--- Hide the minor ticks
ax.yaxis.set_major_formatter(y_formatter)

# scatter plot
# Iterate over the template codes actually present and filter on the code
# itself: relying on the enumeration index dropped templates when the codes
# were not exactly 0..k-1, and overwrote the global symbol `t`.
for template in sorted(alldata["template"].unique()):
    marker = markers[int(template)]
    # all models
    df = alldata[alldata["template"] == template]
    ax.scatter(df["ct"], 1 - df["Loss"], c="k", marker=marker, alpha=0.15, s=30, zorder=2)
    # biologically interpretable models
    df = X[X["template"] == template]
    ax.scatter(df["ct"], 1 - df["Loss"], c=df["colour"], edgecolor=df["edge"], marker=marker, alpha=0.6, s=40, zorder=3)

# Label every annotated model; each annotation code must have an entry in
# annotation_dict (an unknown code raises KeyError).
for idx in X.index:
    x, y, txt = X.loc[idx,["ct", "Loss", "annotation",]]
    txt = annotation_dict[txt]
    ax.annotate(txt[2], xy=(x, 1-y), xytext=(txt[0], txt[1]), zorder=1,
                arrowprops=dict(facecolor='black', width = 0.01, headwidth = 0, headlength = 1, alpha=0.15),
                bbox=dict(alpha=0),)

# saving
path = "{}/plot/Pareto_{}.pdf".format(cwd, code)
plt.savefig(path, facecolor='w', edgecolor='w', transparent=False, bbox_inches="tight")
plt.show()