### Load libraries and functions

In [1]:

import sys
sys.path.insert(0, '../src/')

from loopless_utils import loops_enumeration_from_fva

from load_modify_sample_utils import load_model, get_objective_functions, get_reaction_bounds, modify_model
from load_modify_sample_utils import sample_optgp


### Load and inspect model (for more info see `load_modify_sample.ipynb`)

In [None]:

# Load the E. coli core model. `load_model` is unpacked into three values here:
# the model object, its reactions and the reaction IDs (names taken from the
# unpacking targets; the dangling comma after the last target is harmless).
ec_cobra_model, ec_cobra_reactions, ec_cobra_reaction_ids,  = load_model("../ext_data/models/e_coli_core.xml")

# Show which reaction(s) are currently set as the objective.
objective_functions = get_objective_functions(ec_cobra_model)
print(objective_functions)

# Show the unmodified flux bounds; the recorded output is a
# {reaction_id: (lower, upper)} dictionary covering every reaction.
default_reaction_bounds = get_reaction_bounds(ec_cobra_model)
print(default_reaction_bounds)


Set parameter Username
Set parameter LicenseID to value 2642044
Academic license - for non-commercial use only - expires 2026-03-25
['BIOMASS_Ecoli_core_w_GAM']
{'PFK': (0.0, 1000.0), 'PFL': (0.0, 1000.0), 'PGI': (-1000.0, 1000.0), 'PGK': (-1000.0, 1000.0), 'PGL': (0.0, 1000.0), 'ACALD': (-1000.0, 1000.0), 'AKGt2r': (-1000.0, 1000.0), 'PGM': (-1000.0, 1000.0), 'PIt2r': (-1000.0, 1000.0), 'ALCD2x': (-1000.0, 1000.0), 'ACALDt': (-1000.0, 1000.0), 'ACKr': (-1000.0, 1000.0), 'PPC': (0.0, 1000.0), 'ACONTa': (-1000.0, 1000.0), 'ACONTb': (-1000.0, 1000.0), 'ATPM': (8.39, 1000.0), 'PPCK': (0.0, 1000.0), 'ACt2r': (-1000.0, 1000.0), 'PPS': (0.0, 1000.0), 'ADK1': (-1000.0, 1000.0), 'AKGDH': (0.0, 1000.0), 'ATPS4r': (-1000.0, 1000.0), 'PTAr': (-1000.0, 1000.0), 'PYK': (0.0, 1000.0), 'BIOMASS_Ecoli_core_w_GAM': (0.0, 1000.0), 'PYRt2': (-1000.0, 1000.0), 'CO2t': (-1000.0, 1000.0), 'RPE': (-1000.0, 1000.0), 'CS': (0.0, 1000.0), 'RPI': (-1000.0, 1000.0), 'SUCCt2_2': (0.0, 1000.0), 'CYTBD': (0.0, 1000.

### Modify the model to create two different conditions (for more info see `load_modify_sample.ipynb`)

In [3]:

# Set optimal percentage to 100
# Both conditions are derived from the same `ec_cobra_model` and differ only in
# `optimal_percentage`. The recorded output shows the biomass bounds becoming
# (0.872922, 1000) for the first condition and (0.0, 1000) for the second.
ec_cobra_model_condition_100 = modify_model(
    cobra_model         = ec_cobra_model,
    objective_function  = "BIOMASS_Ecoli_core_w_GAM",
    optimal_percentage  = 100,
    objective_direction = "max"
)

# Confirm the objective and the new biomass bounds of the 100% condition.
updated_objective_functions = get_objective_functions(ec_cobra_model_condition_100)
print(updated_objective_functions)

updated_reaction_bounds = get_reaction_bounds(ec_cobra_model_condition_100)
print(updated_reaction_bounds.get("BIOMASS_Ecoli_core_w_GAM"))

# -----------

# Set optimal percentage to 0
ec_cobra_model_condition_0 = modify_model(
    cobra_model         = ec_cobra_model,
    objective_function  = "BIOMASS_Ecoli_core_w_GAM",
    optimal_percentage  = 0,
    objective_direction = "max"
)

# NOTE: the two `updated_*` names are reused, so after this cell they hold the
# values of the 0% condition only.
updated_objective_functions = get_objective_functions(ec_cobra_model_condition_0)
print(updated_objective_functions)

updated_reaction_bounds = get_reaction_bounds(ec_cobra_model_condition_0)
print(updated_reaction_bounds.get("BIOMASS_Ecoli_core_w_GAM"))


Read LP format model from file /tmp/tmpfpl9zzsi.lp
Reading time = 0.00 seconds
: 72 rows, 190 columns, 720 nonzeros
['BIOMASS_Ecoli_core_w_GAM']
(0.872922, 1000)
Read LP format model from file /tmp/tmpl8injke1.lp
Reading time = 0.01 seconds
: 72 rows, 190 columns, 720 nonzeros
['BIOMASS_Ecoli_core_w_GAM']
(0.0, 1000)


### Identify loopy reactions in model (for more info see `loopless.ipynb`)

In [4]:

# Enumerate loop-participating reactions for the 100% condition. The recorded
# output is a list of (reaction_id, value) pairs.
# NOTE(review): 0.999 is used instead of 1.0 — presumably to leave numerical
# slack around the optimum; confirm against `loops_enumeration_from_fva`.
loopy_reactions_fva_100 = loops_enumeration_from_fva(ec_cobra_model_condition_100, fraction_of_optimum=0.999)
print(loopy_reactions_fva_100)

# Keep only the reaction IDs (first element of each pair).
loopy_reactions_100 = [item[0] for item in loopy_reactions_fva_100]
print(loopy_reactions_100)


# Same enumeration for the 0% condition, with no optimality requirement.
loopy_reactions_fva_0 = loops_enumeration_from_fva(ec_cobra_model_condition_0, fraction_of_optimum=0)
print(loopy_reactions_fva_0)

loopy_reactions_0 = [item[0] for item in loopy_reactions_fva_0]
print(loopy_reactions_0)


[('SUCDi', 994.7794007141792), ('FRD7', 995.0539767141795)]
['SUCDi', 'FRD7']
[('SUCDi', 980.0), ('FRD7', 1000.0)]
['SUCDi', 'FRD7']


### Block the loopy reaction `FRD7` in the two models created above to reduce thermodynamically infeasible solutions during sampling

In [5]:

# Fix both bounds of FRD7 to zero in each model so the reaction can carry no
# flux. The models are modified in place; SUCDi, the other reaction reported
# above, is left unchanged.
ec_cobra_model_condition_100.reactions.get_by_id("FRD7").bounds = (0, 0)
ec_cobra_model_condition_0.reactions.get_by_id("FRD7").bounds = (0, 0)


### Perform sampling on the modified models with the loopy reaction "FRD7" removed. (for more info see `load_modify_sample.ipynb`)

In [6]:

# Draw 3000 samples per condition with a thinning factor of 100.
# NOTE(review): no random seed is passed here, so results may differ between
# runs unless `sample_optgp` seeds internally — confirm.
# `reaction_in_rows = True` presumably yields one row per reaction; the
# statistics computed later with `axis=1` rely on that layout — confirm.
samples_optgp_condition_100 = sample_optgp(ec_cobra_model_condition_100, 
                                           n_samples = 3000, 
                                           thinning=100, 
                                           reaction_in_rows = True)


# Same sampling settings for the 0% condition.
samples_optgp_condition_0 = sample_optgp(ec_cobra_model_condition_0, 
                                         n_samples = 3000, 
                                         thinning=100, 
                                         reaction_in_rows = True)


Read LP format model from file /tmp/tmp86llkbxn.lp
Reading time = 0.00 seconds
: 72 rows, 190 columns, 720 nonzeros
Read LP format model from file /tmp/tmpid97e8sf.lp
Reading time = 0.01 seconds
: 72 rows, 190 columns, 720 nonzeros


### Here we can see how to work with `escher` and integrate flux sampling values into the given metabolic map. First build the map from the `json` file using the `Builder` class

In [7]:

from escher import Builder

# Build the Escher widget from the bundled core-metabolism map JSON.
builder = Builder(map_json='../ext_data/models/e_coli_core_full_map.json')
# A bare trailing expression makes the notebook display the widget.
builder


Builder()

### Create dictionaries mapping reaction IDs to statistics of interest (std, mean, range)

In [9]:

import numpy as np

# Per-reaction summary statistics of the sampled fluxes (100% condition).
# The samples were requested with `reaction_in_rows = True`, so `axis=1` is
# presumably the sample axis — TODO confirm against `sample_optgp`.
stds = np.std(samples_optgp_condition_100, axis=1)
means = np.mean(samples_optgp_condition_100, axis=1)
# flux range (max - min)
ranges = np.ptp(samples_optgp_condition_100, axis=1)

# The statistics are matched to reaction IDs purely by position, so make the
# "one row per reaction ID, in the same order" assumption fail loudly.
assert len(means) == len(stds) == len(ranges) == len(ec_cobra_reaction_ids), (
    "number of sampled rows does not match the number of reaction IDs"
)

# Plain Python floats keep the dictionaries JSON-serialisable for Escher.
mean_flux_dict = {rxn_id: float(value) for rxn_id, value in zip(ec_cobra_reaction_ids, means)}
std_flux_dict = {rxn_id: float(value) for rxn_id, value in zip(ec_cobra_reaction_ids, stds)}
range_flux_dict = {rxn_id: float(value) for rxn_id, value in zip(ec_cobra_reaction_ids, ranges)}

print(mean_flux_dict)
print(std_flux_dict)
print(range_flux_dict)


{'PFK': 7.486068913297662, 'PFL': 0.0055606466788236655, 'PGI': 4.858993776773165, 'PGK': -16.025381427907572, 'PGL': 4.96204843148236, 'ACALD': -0.0023513999752231224, 'AKGt2r': -0.0007289617391095303, 'PGM': -14.719426030592032, 'PIt2r': 3.2113757487333494, 'ALCD2x': -0.0011017420185548976, 'ACALDt': -0.0012496579566682149, 'ACKr': -0.0018737754630853505, 'PPC': 2.497533980303867, 'ACONTa': 6.011339199746686, 'ACONTb': 6.011339199746686, 'ATPM': 8.398773166518627, 'PPCK': 0.008731639430010804, 'ACt2r': -0.0018737754630853505, 'PPS': 0.008631173114858658, 'ADK1': 0.008631173114858658, 'AKGDH': 5.033465455569609, 'ATPS4r': 45.51648581947021, 'PTAr': 0.0018737754630853505, 'PYK': 1.7860988155425828, 'BIOMASS_Ecoli_core_w_GAM': 0.8729648377778452, 'PYRt2': -0.0012490380571748064, 'CO2t': -22.819479800022066, 'RPE': 2.680545162260196, 'CS': 6.011339199746686, 'RPI': -2.2815032692221395, 'SUCCt2_2': 0.011070560634867924, 'CYTBD': 43.62676497687049, 'D_LACt2': -0.0010669491874412897, 'ENO':

### Assign the average of the fluxes from flux sampling as a colour scale to the reaction arrows

In [10]:

# Attach the per-reaction mean fluxes to the map, then re-display the widget.
builder.reaction_data = mean_flux_dict
builder


Builder(allow_building_duplicate_reactions=False, and_method_in_gene_reaction_rule='mean', cofactors=['atp', '…

### Assign the standard deviation of the fluxes from flux sampling as a colour scale to the reaction arrows

In [11]:

# Replace the map's reaction data with the per-reaction standard deviations
# (same `builder` object as above), then re-display the widget.
builder.reaction_data = std_flux_dict
builder


Builder(allow_building_duplicate_reactions=False, and_method_in_gene_reaction_rule='mean', cofactors=['atp', '…

### Assign the range of the fluxes from flux sampling as a colour scale to the reaction arrows

In [12]:

# Replace the map's reaction data with the per-reaction flux ranges (max - min),
# then re-display the widget.
builder.reaction_data = range_flux_dict
builder


Builder(allow_building_duplicate_reactions=False, and_method_in_gene_reaction_rule='mean', cofactors=['atp', '…

In [7]:

import json
from pathlib import Path
from typing import Literal
from contextlib import suppress
# contextlib.suppress is a context manager in Python that allows you to ignore specific exceptions inside a with block — cleanly and explicitly.
from collections import defaultdict
import cobra
from cobramod.core import pathway as pt
from cobramod.visualization.converter import JsonDictionary


# Metabolite IDs treated as cofactors / currency metabolites. Each compound is
# listed in the ID spellings this notebook expects to meet (`_c`, `_c0`, `_e`,
# `[e]`). Entries are unique; order carries no meaning.
BIGG_COFACTORS = ['atp_c0', 'atp_c', 'adp_c', 'adp_c0',
                  'udp_c0', 'udp_c', 'ump_c0', 'ump_c',
                  'amp_c', 'amp_c0',
                  'gdp_c0', 'gdp_c', 'gtp_c0', 'gtp_c',
                  'accoa_c', 'accoa_c0', 'coa_c', 'coa_c0',  # acetyl-CoA
                  'q8_c0', 'q8_c', 'q8h2_c', 'q8h2_c0', 'mqn8_c', 'mqn8_c0', 'mql8_c', 'mql8_c0',
                  'actp_c0', 'actp_c',
                  'h2o_c', 'h2o_c0', 'h2o_e', 'h2o[e]',
                  'pi_e', 'pi[e]', 'pi_c', 'pi_c0', 'ppi_c0', 'ppi_c',
                  'pep_c', 'pep_c0',
                  'h_c', 'h_c0', 'h_e', 'h[e]',
                  'o2_c', 'o2_c0', 'o2_e', 'o2[e]',
                  'co2_c', 'co2_c0', 'co2_e', 'co2[e]',
                  'nadp_c', 'nadp_c0', 'nadph_c', 'nadph_c0', 'nad_c', 'nad_c0', 'nadh_c', 'nadh_c0',
                  # Extracellular row: 'nadph[e]' replaces a repeated 'nadph_c0'
                  # so that it follows the `X_e`, `X[e]` pairing of its neighbours.
                  'nadp_e', 'nadp[e]', 'nadph_e', 'nadph[e]', 'nad_e', 'nad[e]', 'nadh_e', 'nadh[e]',
                  'fadh2_c', 'fadh2_c0', 'fad_c', 'fad_c0',
                  'nh4_c', 'nh4_c0', 'nh4_e', 'nh4[e]',
                  'pyr_c0', 'pyr_c'
                  ]

# Amino-acid building blocks (not part of EXCLUDED_COMPOUNDS below).
BIGG_BUILDING_BLOCKS = ['ala_L_c0', 'asp_L_c0', 'gln_L_c0', 'glu_L_c0', 'ser_L_c0', 'trp_L_c0', 'met_L_c0', 'lys_L_c0', 'cyst_L_c0',
                        ]
# Original (misspelled) name kept so existing references keep working.
BIGG_BUILDING_BLOCLS = BIGG_BUILDING_BLOCKS

# Based on 10.1093/gigascience/giy021
MODELSEED_COFACTORS = [
    "cpd00001_c0",  # h2o
    "cpd00002_c0",  # atp
    "cpd00003_c0",  # nad
    "cpd00004_c0",
    "cpd00005_c0",
    "cpd00006_c0",  # nadp
    "cpd00007_c0",
    "cpd00008_c0",  # adp
    "cpd00009_c0",  # HZ added
    "cpd00010_c0",  # CoA
    "cpd00011_c0",  # co2
    "cpd00012_c0",  # ppi
    "cpd00013_c0",  # NH3
    "cpd00014_c0",
    "cpd00015_c0",  # fad
    "cpd00018_c0",  # amp-like
    "cpd00020_c0",  # pyruvate
    "cpd00022_c0",
    "cpd00031_c0",  # gdp-like
    "cpd00038_c0",  # gtp
    "cpd00056_c0",  # ttp
    "cpd00061_c0",  # pep
    "cpd00067_c0",  # H+
    "cpd15353_c0",
    "cpd15499_c0",
    "cpd15561_c0",
    "cpd00097_c0",
    "cpd00982_c0",
    "cpd01270_c0",
    "cpd00052_c0",
    "cpd00062_c0",
    "cpd00068_c0",
    "cpd00115_c0",
    "cpd00241_c0",
    "cpd00356_c0",
    "cpd00357_c0",
    "cpd00358_c0",
    "cpd00530_c0",
    "cpd00977_c0",
    "cpd01775_c0"
]

# Metabolites ignored when deciding whether two reactions are connected.
EXCLUDED_COMPOUNDS = BIGG_COFACTORS + MODELSEED_COFACTORS

In [8]:

def remove_cycles(hierarchy):
    """
    Break every cycle in a parent -> children hierarchy with a depth-first search.

    Parameters
    ----------
    hierarchy : dict
        Maps each parent to its children. Children may be given as an iterable
        of IDs (e.g. a tuple), as a single string ID, or as ``None``, so the
        output of this function can be fed back into it unchanged.

    Returns
    -------
    dict
        One entry per node of the input (parents and leaf-only children alike):
        ``None`` for no children, the bare child for exactly one child, and a
        sorted tuple for several children.

    Notes
    -----
    Edges that point back to a node on the current DFS path are dropped.
    Nodes are started in the input's key order and children are visited in
    sorted order, so the same input always loses the same edges.
    The search is recursive, so very deep hierarchies can hit Python's
    recursion limit.
    """
    def as_child_set(children):
        # Normalise the three child encodings (None / single ID / iterable).
        if children is None:
            return set()
        if isinstance(children, str):
            return {children}
        return set(children)

    graph = {parent: as_child_set(children) for parent, children in hierarchy.items()}

    # Ensure all nodes are in the graph (even leaf-only ones). Iterate over a
    # snapshot: inserting keys while iterating the live dict raises
    # "RuntimeError: dictionary changed size during iteration".
    for children in list(graph.values()):
        for child in children:
            graph.setdefault(child, set())

    visited, on_stack = set(), set()

    def dfs(node):
        visited.add(node)
        on_stack.add(node)

        # sorted() both snapshots the set (it is edited below) and fixes the
        # visiting order, which decides which edge of a cycle gets removed.
        for child in sorted(graph[node]):
            if child not in visited:
                dfs(child)
            elif child in on_stack:
                # Back-edge: keeping it would close a cycle
                graph[node].discard(child)

        on_stack.remove(node)

    for node in graph:
        if node not in visited:
            dfs(node)

    # Keep all nodes, even those with no children
    return {
        node: (None if not children
               else (sorted(children)[0] if len(children) == 1 else tuple(sorted(children))))
        for node, children in graph.items()
    }

# %%
from collections import defaultdict

def build_reaction_hierarchy(reaction_dict):
    """
    Link reactions into a parent -> children hierarchy through shared metabolites.

    A reaction ``B`` becomes a child of reaction ``A`` when at least one
    metabolite produced by ``A`` is consumed by ``B``. For a reaction flagged
    as reversible, reactants and products are both treated as produced and as
    consumed.

    Parameters
    ----------
    reaction_dict : dict
        ``{reaction_id: data}`` where ``data`` is a dict that may contain
        ``'reversibility'`` (truthy / falsy), ``'f_products'`` and
        ``'f_reactants'`` (filtered metabolite lists) and ``'products'`` and
        ``'reactants'`` (unfiltered lists). Metabolites may be objects with an
        ``id`` attribute or anything convertible with ``str``.

    Returns
    -------
    dict
        ``{parent_id: tuple_of_sorted_child_ids}``. Only reactions with at
        least one child appear as keys.

    Notes
    -----
    Links are first built from the filtered lists. Reactions that ended up as
    children but have no children of their own are then given a second chance
    using the unfiltered lists.
    """
    def extract_id(m):
        return m.id if hasattr(m, 'id') else str(m)

    def get_ids(metabolite_list):
        return {extract_id(m) for m in metabolite_list}

    def outputs_and_inputs(data, products_key, reactants_key):
        # Returns (ids the reaction can produce, ids the reaction can consume).
        products = get_ids(data.get(products_key, []))
        reactants = get_ids(data.get(reactants_key, []))
        if data.get('reversibility'):
            either_side = products | reactants
            return either_side, either_side
        return products, reactants

    def link(parents, sides):
        # Add parent -> child wherever a parent's outputs meet a child's inputs.
        for parent in parents:
            parent_outputs = sides[parent][0]
            for child, (_, child_inputs) in sides.items():
                if parent != child and parent_outputs & child_inputs:
                    hierarchy[parent].add(child)

    hierarchy = defaultdict(set)

    # Step 1: filtered fields (f_products/f_reactants), respecting reversibility.
    # ID sets are computed once per reaction instead of once per reaction pair.
    filtered_sides = {
        rxn_id: outputs_and_inputs(data, 'f_products', 'f_reactants')
        for rxn_id, data in reaction_dict.items()
    }
    link(filtered_sides, filtered_sides)

    # Step 2: add missing reactions using unfiltered products/reactants
    all_children = {child for children in hierarchy.values() for child in children}
    missing_parents = all_children - set(hierarchy)

    if missing_parents:
        unfiltered_sides = {
            rxn_id: outputs_and_inputs(data, 'products', 'reactants')
            for rxn_id, data in reaction_dict.items()
        }
        # sorted() only fixes the key insertion order of the result.
        link(sorted(missing_parents), unfiltered_sides)

    return {k: tuple(sorted(v)) for k, v in hierarchy.items()}


# %%
def build_escher_map(
    model,
    type: Literal["pathway", "graph"] = "pathway",
    reaction_list=None,
    pathway=None, KEGG_pathway_id=None, # TODO (Haris Zafeiropoulos, 2025-07-15): use pathway name or id as input, instead of list of reactions
    map_name="test_map",
    vertical=False,
    prev_gc=None  # dev
):
    """
    Build Escher map from cobra model.

    Parameters
    ----------
    model :
        Model whose ``model.reactions.get_by_id`` resolves the given IDs.
    type : {"pathway", "graph"}
        ``"pathway"`` wraps the reactions in a cobramod ``Pathway`` and calls
        ``visualize(vis="escher-custom")``; ``"graph"`` uses a cobramod
        ``JsonDictionary`` and calls ``visualize`` with ``filepath``,
        ``vertical`` and ``custom_integration=True``.
    reaction_list : sequence of str
        IDs of the reactions to draw. Required and must not be empty.
    pathway, KEGG_pathway_id :
        Currently unused (see TODO in the signature).
    map_name : str
        The map is written to ``<map_name>.json`` (relative path).
    vertical : bool
        Forwarded to ``visualize`` in ``"graph"`` mode only.
    prev_gc : dict, optional
        Pre-computed child graph. When given and non-empty it is used as is,
        instead of being derived from the reactions.

    Returns
    -------
    tuple
        ``(child_graph, data, members)``: the graph assigned to the cobramod
        object, the parsed map JSON (with a ``"reversibility"`` entry added to
        each reaction whose name matches a member ID) and the set of resolved
        reaction objects.

    Raises
    ------
    ValueError
        If ``type`` is not one of the two supported values or if
        ``reaction_list`` is missing or empty.
    KeyError
        If any ID in ``reaction_list`` is not part of ``model``.

    Notes
    -----
    Side effects: deletes ``test_map.html`` in the current working directory
    if it exists and writes ``<map_name>.json``.

    Example:
        glycolysis = ['ALCD2x', 'ENO', 'FBA', 'FBP', 'GAPD', 'PFK', 'PGK', 'PGM', 'PPCK', 'PPS', 'PYK', 'TPI']
        ppp = ['FBA', 'FBP', 'GND', 'PFK', 'PGL', 'RPE', 'RPI', 'TKT1']
    """
    # Validate up front: previously these cases surfaced further down as an
    # UnboundLocalError on `members` / `cobramod_obj`.
    if type not in ("pathway", "graph"):
        raise ValueError(f"type must be 'pathway' or 'graph', got {type!r}")

    if reaction_list is None or len(reaction_list) == 0:
        raise ValueError(
            "reaction_list must contain at least one reaction ID "
            "(pathway / KEGG_pathway_id are not supported yet)."
        )

    try:
        members = {model.reactions.get_by_id(x) for x in reaction_list}
    except KeyError as e:
        raise KeyError(f"Reaction(s) in list not part of the model: {e}") from e

    test_path = Path.cwd().joinpath("test_map.html")

    # Set lookup instead of scanning the exclusion list for every metabolite.
    excluded = frozenset(EXCLUDED_COMPOUNDS)

    # Per-reaction metabolite lists, unfiltered and with excluded compounds dropped.
    members_parts = {}
    for reaction in members:
        products = list(reaction.products)
        reactants = list(reaction.reactants)
        members_parts[reaction.id] = {
            "reversibility": reaction.reversibility,
            "products": products,
            "reactants": reactants,
            "f_products": [x for x in products if x.id not in excluded],
            "f_reactants": [x for x in reactants if x.id not in excluded],
        }

    if prev_gc is not None and len(prev_gc) > 0:
        child_graph = prev_gc
    else:
        child_graph = remove_cycles(build_reaction_hierarchy(members_parts))

    if type == "pathway":
        cobramod_obj = pt.Pathway(id="test_group", members=members)
    else:
        cobramod_obj = JsonDictionary()
        cobramod_obj.reaction_strings = {x.id: x.build_reaction_string() for x in members}

    # Remove a stale HTML file from a previous run, if any.
    with suppress(FileNotFoundError):
        test_path.unlink()

    cobramod_obj.graph = child_graph

    if type == "pathway":
        builder = cobramod_obj.visualize(vis = "escher-custom")
    else:
        builder = cobramod_obj.visualize(
            filepath           = test_path,
            vertical           = vertical,
            custom_integration = True
        )

    data = json.loads(builder.map_json)

    # The reactions sit under "reactions" in the second element of the map JSON.
    reversibility_by_id = {rxn.id: rxn.reversibility for rxn in members}
    for r in data[1]["reactions"].values():
        if r["name"] in reversibility_by_id:
            r["reversibility"] = reversibility_by_id[r["name"]]

    outfile = map_name + ".json"
    with open(outfile, "w") as f:
        json.dump(data, f)

    return child_graph, data, members



In [15]:

# Two example reaction sets; only `ppp` is used below, `glycolysis` is kept
# as an alternative input.
glycolysis = ['ALCD2x', 'ENO', 'FBA', 'FBP', 'GAPD', 'PFK', 'PGK', 'PGM', 'PPCK', 'PPS', 'PYK', 'TPI']
ppp = ['FBA', 'FBP', 'GND', 'PFK', 'PGL', 'RPE', 'RPI', 'TKT1']

# Build the map in "graph" mode; on success this writes `test.json`.
# NOTE(review): the recorded output of this cell is
# "IndexError: list assignment index out of range" — the traceback is not
# included, so the failing call is unknown; re-run and investigate.
gc, data, members = build_escher_map(ec_cobra_model, reaction_list=ppp, map_name="test", type="graph")


IndexError: list assignment index out of range