# Maud to shu

Example to generate [`shu`](https://github.com/biosustain/shu) inputs from [`Maud`](https://github.com/biosustain/Maud) output.

It requires [Maud](https://github.com/biosustain/Maud) and [maudtools](https://github.com/biosustain/maudtools).

In [1]:
import json
import os
from glob import glob
from math import isnan
from pathlib import Path
from typing import Dict, List

import arviz as az
import numpy as np
import pandas as pd
from maud.getting_idatas import get_idata
from maud.loading_maud_inputs import MaudInput, load_maud_input
from maud.utils import get_lognormal_parameters_from_quantiles
from maudtools.plotting import concat_experiments

Load the input that was used to run `maud sample` and the output generated by it.

In [2]:
# Name of the Maud run directory under ../data; it is also used as the prefix
# of the JSON file written in the last cell.
OUTPUT_RESULTS = "maud_output"

In [3]:
# Load the Maud input from the run's user_input folder.
mi = load_maud_input(f"../data/{OUTPUT_RESULTS}/user_input")
# Build the inference data from every file in samples/ whose name ends in
# "csv", passing the loaded input and "train" as the third argument.
idata = get_idata(glob(f"../data/{OUTPUT_RESULTS}/samples/*csv"), mi, "train")

In [4]:
def concat_experiments(infd, x_var: str = "reactions", experiments: list[str] = None):
    """Stack the samples of every experiment into one long data frame.

    This local definition shadows the ``concat_experiments`` imported from
    ``maudtools.plotting`` at the top of the file.

    Args:
        infd: Sample container indexed as ``infd[:, :, i, :]`` and exposing
            ``infd.experiments`` and ``infd[x_var]``; presumably an xarray
            DataArray with (chain, draw, experiment, x_var) dims - TODO confirm.
        x_var: Name of the coordinate whose entries become the column labels.
        experiments: Experiment labels to iterate over; each needs an
            ``.item()`` method. Defaults to ``infd.experiments``.
            NOTE(review): the i-th label is paired with index ``i`` of the
            third axis, so a custom sequence must follow that axis order.

    Returns:
        One data frame with an "experiment" column first, followed by one
        column per entry of ``infd[x_var]``; the per-experiment frames are
        concatenated row-wise.
    """
    exp_labels = infd.experiments if experiments is None else experiments

    frames = {}
    for position, label in enumerate(exp_labels):
        name = label.item()
        # Concatenating over the first axis flattens it into the second one.
        samples = np.concatenate(infd[:, :, position, :])
        frames[name] = pd.DataFrame(samples, columns=infd[x_var])

    # Tag every row with the experiment it came from.
    for name, frame in frames.items():
        frame.insert(0, "experiment", name)

    return pd.concat(frames.values())

In [5]:
def q_exploc(kcat) -> tuple[float, float, str]:
    """Return (mu, sigma, enzyme) of the normal distribution underlying a Maud kcat prior.

    When ``kcat.exploc`` is set, mu is its natural log and sigma is
    ``kcat.scale``. Otherwise both are obtained from
    ``get_lognormal_parameters_from_quantiles`` using ``kcat.pct1`` and
    ``kcat.pct99`` as the 1% and 99% quantiles.
    """
    if kcat.exploc is None:
        mu, sigma = get_lognormal_parameters_from_quantiles(
            kcat.pct1, 0.01, kcat.pct99, 0.99
        )
    else:
        mu, sigma = np.log(kcat.exploc), kcat.scale
    return mu, sigma, kcat.enzyme

Function to transform the data from the Maud output into shu input.

A single function, `data_to_shu`, produces both the reaction data and the metabolite data.

In [6]:
def data_to_shu(idata: az.InferenceData, mi: MaudInput) -> Dict[str, List[float]]:
    """Collect reaction and metabolite data from a Maud run in the layout dumped for shu.

    On the reaction side, kcats go on the left, enzyme concentrations on the
    right (box), flux on hover, and mean flux drives the colour and size of
    the arrows. On the metabolite side, concentrations are used for hover,
    colour and size.

    Args:
        idata: Inference data whose posterior provides ``edges``,
            ``flux_train``, ``conc_enzyme_train`` and ``conc_train``.
        mi: Maud input; only ``mi.prior_input.kcat`` is read.

    Returns:
        A dict of parallel lists. Reaction entries are grouped by
        (reaction, experiment):

        - "reactions", "conditions": group labels.
        - "box_y": mean enzyme concentration.
        - "left_y": natural log of each kcat draw.
        - "hover_y": flux samples.
        - "colors", "sizes": mean flux.

        Metabolite entries are grouped by (metabolite, experiment):

        - "metabolites", "met_conditions": group labels.
        - "met_y": concentration samples.
        - "met_sizes", "met_colors": mean concentration.

        Flux and enzyme rows are all relabelled "Posterior" and kcat rows are
        labelled "Prior", so the outer merge leaves kcat missing (NaN) in
        "Posterior" groups and flux/enzyme missing (NaN) in "Prior" groups.
    """
    # Edge names containing "_" are split on it: the first piece is looked up
    # as an enzyme id below and the second piece is the reaction it maps to.
    edge_list = [e.split("_") for e in idata.posterior.edges.to_numpy() if "_" in e]
    edges_to_val = {ed[0]: ed[1] for ed in edge_list}

    fluxes = concat_experiments(idata.posterior.flux_train, "reactions").melt(
        id_vars="experiment", var_name="reaction", value_name="flux"
    )

    conc_enzymes = concat_experiments(
        idata.posterior.conc_enzyme_train,
        "enzymes",
        idata.posterior.conc_enzyme_train.experiments,
    )
    # NOTE(review): every non-"experiment" column is renamed to the FIRST entry
    # of the "enzymes" coordinate (index 0), whatever the column's own name is.
    # Confirm this is intended for models with more than one enzyme.
    conc_enzymes.columns = [
        idata.posterior.conc_enzyme_train["enzymes"][0].values.tolist()
        if col != "experiment"
        else "experiment"
        for col in conc_enzymes.columns
    ]
    # Replace enzyme ids by the reaction they are attached to.
    conc_enzymes.columns = [
        edges_to_val[col] if col != "experiment" else "experiment"
        for col in conc_enzymes.columns
    ]
    conc_enzymes = conc_enzymes.melt(
        id_vars="experiment", var_name="reaction", value_name="enzyme"
    )

    # Only prior kcat draws are used: 300 lognormal samples per kcat prior,
    # drawn from NumPy's global RNG (unseeded here, so draws differ per run).
    # NOTE(review): the "reaction" column holds the enzyme id returned by
    # q_exploc, without the edges_to_val mapping applied above - confirm that
    # these ids line up with the reaction ids of the other frames.
    kcats_priors = pd.DataFrame(
        [
            (kcat[2], point, "Prior")
            for kcat in [q_exploc(kcat) for kcat in mi.prior_input.kcat]
            for point in np.random.lognormal(kcat[0], kcat[1], size=300)
        ],
        columns=["reaction", "kcat", "experiment"]
    )
    kcats = kcats_priors

    # NOTE(review): no DRAIN filtering is applied to these frames; drain
    # reactions are kept in the output.
    # NOTE(review): pd.concat(axis=1) aligns rows on index labels, not on
    # position, so sort_values alone does not pair rows by
    # (experiment, reaction) - confirm the pairing is as intended.
    merged = pd.concat(
        [
            df.sort_values(["experiment", "reaction"])
            if i == 0
            else df.sort_values(["experiment", "reaction"]).loc[
                :, ~df.columns.isin(["experiment", "reaction"])
            ]
            for i, df in enumerate([fluxes, conc_enzymes])
        ],
        axis=1,
    )
    # All experiments are pooled under a single "Posterior" condition.
    merged.experiment = "Posterior"
    merged = pd.merge(merged, kcats, how="outer", on=["experiment", "reaction"])
    df = merged.groupby(["reaction", "experiment"]).agg(list).reset_index()

    # metabolite part
    concentrations = concat_experiments(idata.posterior.conc_train, "mics").melt(
        id_vars="experiment", var_name="metabolite", value_name="concentration"
    )
    cf = concentrations.groupby(["metabolite", "experiment"]).agg(list).reset_index()
    # Relabelled after grouping, so each original experiment keeps its own row.
    cf.experiment = "Posterior"

    return {
        "reactions": df.reaction.to_list(),
        "box_y": df.enzyme.apply(np.mean).to_list(),
        "left_y": df.kcat.apply(lambda x: [np.log(i) for i in x]).to_list(),
        "hover_y": df.flux.to_list(),
        "colors": df.flux.apply(np.mean).to_list(),
        "sizes": df.flux.apply(np.mean).to_list(),
        "conditions": df.experiment.to_list(),
        "met_sizes": cf.concentration.apply(np.mean).to_list(),
        "met_colors": cf.concentration.apply(np.mean).to_list(),
        "met_y": cf.concentration.to_list(),
        "metabolites": cf.metabolite.to_list(),
        "met_conditions": cf.experiment.to_list(),
    }

In [7]:
# Build the dictionary that is cleaned up and dumped to JSON in the cells below.
shu_data = data_to_shu(idata, mi)

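Extra processing because Python's `json` module writes NaN values as the bare token `NaN`, which is not valid strict JSON; they are replaced here by the string `"NaN"`.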

In [8]:
# Replace every NaN by the string "NaN" so the dumped file is valid JSON.
# Entries that are already strings are skipped, which keeps this cell safe to
# re-run (math.isnan raises TypeError on a str).
for key, values in shu_data.items():
    # These keys hold labels, not numbers.
    if key in ("reactions", "conditions", "metabolites", "met_conditions"):
        continue
    for i, value in enumerate(values):
        if isinstance(value, list):
            values[i] = [
                v if isinstance(v, str) or not isnan(v) else "NaN" for v in value
            ]
        elif not isinstance(value, str) and isnan(value):
            values[i] = "NaN"

The drains are mapped to the reactions in the map that consume the corresponding metabolites.

In [9]:
# Rename the drain pseudo-reactions to the map reactions they are shown on;
# entries that are not strings are written as "NaN".
shu_data["reactions"] = [
    "NaN"
    if not isinstance(name, str)
    else name.replace("pepdrain", "PPC").replace("pyrdrain", "PDH")
    for name in shu_data["reactions"]
]

The data dumped below can now be imported by shu.

In [10]:
# Write the shu input next to the Maud output directory.
with Path(f"../data/{OUTPUT_RESULTS}.metabolism.json").open("w") as f:
    json.dump(shu_data, f)