# Import and Load Libraries

In [1]:
# Python Standard Library and 3rd Party Imports 

import pandas as pd
import numpy as np
import cobra
import networkx as nx
import scipy
import copy
import re
from collections import Counter, defaultdict
from itertools import chain, combinations
from pathlib import Path

In [2]:
# COBRA imports

from cobra import Model, Reaction
from cobra.core import Group
from cobra.flux_analysis import flux_variability_analysis
from cobra.flux_analysis.fastcc import fastcc
from cobra.io import read_sbml_model, save_matlab_model, write_sbml_model
from cobra.util.solver import linear_reaction_coefficients
from cobra.util.array import create_stoichiometric_matrix
from rapidfuzz import fuzz, process

# Functions

In [3]:
def lump_reaction(model, coupled_df, verbose=True, Search_COBRA_groups=False, label_type="Group"):
    """
    Create a lumped model by replacing fully coupled reaction groups with pseudo-reactions.

    Parameters:
    - model: cobra.Model (the original model, will remain unchanged)
    - coupled_df: pd.DataFrame with either:
        - columns ['Coupled_Reactions', 'COBRA_Groups'] (default format), or
        - column ['Reactions'], and Search_COBRA_groups=True
    - verbose: bool, if True print progress messages
    - Search_COBRA_groups: bool, if True, search model groups for each reaction if not supplied
    - label_type: str, either 'Group' or 'Module' — used in pseudo-reaction naming (required)

    Returns:
    - model_lumped: cobra.Model with pseudo-reactions added
    - translation_df: pd.DataFrame mapping pseudo-reactions to original reactions and groups
    """
    import copy
    from cobra import Reaction
    from collections import defaultdict
    import pandas as pd

    model_lumped = copy.deepcopy(model)
    translation_data = []

    # Handle case with only 'Reactions' column — determine COBRA_Groups if needed
    if Search_COBRA_groups and 'Reactions' in coupled_df.columns:
        cobra_groups_list = []
        for i, row in coupled_df.iterrows():
            reaction_list = row['Reactions']
            found_groups = set()
            for rxn_id in reaction_list:
                for group in model.groups:
                    if rxn_id in [rxn.id for rxn in group.members]:
                        found_groups.add(group.name)
            cobra_groups_list.append(list(found_groups))
        coupled_df = coupled_df.rename(columns={'Reactions': 'Coupled_Reactions'})
        coupled_df['COBRA_Groups'] = cobra_groups_list

    for i, row in coupled_df.iterrows():
        group_reactions = row['Coupled_Reactions']
        cobra_groups = row['COBRA_Groups']

        valid_reactions = []
        for rxn_id in group_reactions:
            if rxn_id in model_lumped.reactions:
                valid_reactions.append(rxn_id)
            else:
                if verbose:
                    print(f"⚠️ Reaction '{rxn_id}' not found in model — skipping.")

        if len(valid_reactions) < 2:
            if verbose:
                print(f"⏭️ Skipping group {i} — fewer than 2 valid reactions.")
            continue

        lower_bounds = []
        upper_bounds = []
        for rxn_id in valid_reactions:
            rxn = model_lumped.reactions.get_by_id(rxn_id)
            lower_bounds.append(rxn.lower_bound)
            upper_bounds.append(rxn.upper_bound)
        min_lb = max(lower_bounds)
        max_ub = min(upper_bounds)

        net_stoich = defaultdict(float)
        for rxn_id in valid_reactions:
            rxn = model_lumped.reactions.get_by_id(rxn_id)
            for met, coeff in rxn.metabolites.items():
                net_stoich[met] += coeff

        cleaned_stoich = {met: coeff for met, coeff in net_stoich.items() if abs(coeff) > 1e-10}

        pseudo_id = f"Pseudo_{label_type}_{i}"
        pseudo_rxn = Reaction(id=pseudo_id)
        pseudo_rxn.name = f"Lumped reaction for: {', '.join(valid_reactions)}"
        pseudo_rxn.lower_bound = min_lb
        pseudo_rxn.upper_bound = max_ub
        pseudo_rxn.add_metabolites(cleaned_stoich)

        model_lumped.add_reactions([pseudo_rxn])

        for group in model_lumped.groups:
            if group.name in cobra_groups:
                group.add_members([pseudo_rxn])

        for rxn_id in valid_reactions:
            rxn = model_lumped.reactions.get_by_id(rxn_id)
            for group in model_lumped.groups:
                if rxn in group.members:
                    group.members.remove(rxn)
            model_lumped.reactions.remove(rxn)

        translation_data.append({
            'Pseudo_Reaction_ID': pseudo_id,
            'Original_Reactions': valid_reactions,
            'COBRA_Groups': cobra_groups
        })

        if verbose:
            print(f"✅ {label_type} {i}: Created '{pseudo_id}' from {valid_reactions}")

    translation_df = pd.DataFrame(translation_data)
    return model_lumped, translation_df


In [4]:
def are_reactions_interconnected(model, reaction_ids):
    """
    Check whether a list of reactions are all interconnected via shared metabolites.
    
    Parameters:
    - model: cobra.Model
    - reaction_ids: list of reaction IDs to test

    Returns:
    - bool: True if all reactions are connected via shared metabolites
    """
    # Create a graph where nodes = reactions, edges = shared metabolite
    G = nx.Graph()
    G.add_nodes_from(reaction_ids)

    # Build edges based on shared metabolites
    for i, rxn1_id in enumerate(reaction_ids):
        if rxn1_id not in model.reactions:
            continue
        rxn1 = model.reactions.get_by_id(rxn1_id)
        mets1 = set(rxn1.metabolites)

        for rxn2_id in reaction_ids[i+1:]:
            if rxn2_id not in model.reactions:
                continue
            rxn2 = model.reactions.get_by_id(rxn2_id)
            mets2 = set(rxn2.metabolites)

            # Add edge if they share at least one metabolite
            if mets1 & mets2:
                G.add_edge(rxn1_id, rxn2_id)

    # Check if the graph is fully connected
    return nx.is_connected(G)

In [5]:
def analyze_model(model, model_name, carb_list, Direction="Original"):
    """
    Run FBA simulations for a COBRA model across multiple carbon source conditions.

    Parameters
    ----------
    model : cobra.Model
        The COBRA model to simulate.

    model_name : str
        Identifier for the model (used in the output DataFrame).

    carb_list : list of str
        List of exchange reaction IDs corresponding to carbon sources to be tested.

    Direction : str, optional
        Flux direction for the carbon sources:
        - "Original" (default): lb = -10, ub = 10 (uptake allowed)
        - "Reversed": lb = 0, ub = 10 (uptake blocked, export only)

    Returns
    -------
    results_df : pandas.DataFrame
        DataFrame with columns ['Model', 'Carbon_Source', 'Objective_Value'].

    flux_activity_df : pandas.DataFrame
        Boolean DataFrame with reactions as rows and carbon sources as columns.

    filtered_flux_activity_df : pandas.DataFrame
        Subset of `flux_activity_df` with reactions active in some but not all conditions.
    """
    flux_threshold = 1e-6  # Minimum flux considered active
    results = []

    # Initialize flux activity matrix
    flux_activity_df = pd.DataFrame({'Reaction': [rxn.id for rxn in model.reactions]})
    flux_activity_df.set_index('Reaction', inplace=True)

    for carb in carb_list:
        model_temp = model.copy()

        for rxn_id in carb_list:
            if rxn_id in model_temp.reactions:
                rxn = model_temp.reactions.get_by_id(rxn_id)

                if rxn_id == carb:
                    if Direction == "Reversed":
                        rxn.lower_bound = 0.0
                        rxn.upper_bound = 10.0
                    else:  # "Original"
                        rxn.lower_bound = -10.0
                        rxn.upper_bound = 0
                else:
                    rxn.lower_bound = 0.0
                    rxn.upper_bound = 0.0

        # Run FBA
        solution = model_temp.optimize()

        # Record results
        results.append({
            'Model': model_name,
            'Carbon_Source': carb,
            'Objective_Value': solution.objective_value
        })

        # Track active reactions
        active_flux = solution.fluxes.abs() > flux_threshold
        flux_activity_df[carb] = active_flux

    # Keep reactions active in some but not all conditions
    filtered_flux_activity_df = flux_activity_df[
        ~(flux_activity_df.all(axis=1) | ~flux_activity_df.any(axis=1))
    ]

    results_df = pd.DataFrame(results)
    return results_df, flux_activity_df, filtered_flux_activity_df

# Load Model and Data

In [None]:
# Set the project root path by going up from the notebook location
notebook_dir = Path(__file__).parent if '__file__' in globals() else Path().resolve()
project_root = notebook_dir.parent.parent  

# Construct raw paths 
raw_data_path = project_root / "data" / "raw" 
raw_sbml_path = raw_data_path / "sbml_files" 
raw_mat_path = raw_data_path / "matlab_files" 
raw_csv_path = raw_data_path / "csv_files" 

# Construct processed paths 
processed_data_path = project_root / "data" / "processed" 
processed_sbml_path = processed_data_path / "sbml_files" 
processed_csv_path = processed_data_path / "csv_files" 

In [7]:
# read model
model = read_sbml_model(str(raw_sbml_path / "iMS520.xml"))

In [10]:
# import pairs coupled and reactions blocked by F2C2
Coupled_Pairs_df = pd.read_csv(raw_csv_path / 'fctable_iMS520_context.csv', header=None)
F2C2_Blocked_Reactions_df = pd.read_csv(raw_csv_path / 'blocked_reactions_iMS520_context.csv', header=None)

In [18]:
# Load a curated list of exchange reactions from file
exchange_df_full = pd.read_csv(raw_csv_path / 'iMS520_exchanges.csv')

# Context-specific exchanges
final_exchanges_df = pd.read_csv(raw_csv_path / 'iMS520_context_exchanges.csv')

**Extract Information** from original model

In [11]:
objective_id = "biomass_BIF"

Model Basics
- Model ID: COBRAModel
- Number of Reactions: 771
- Number of Metabolites: 680
- Number of Genes: 520
- Number of Exchange Reactions: 88
- Reversible Reactions: 205
- Irreversible Reactions: 566
- Objective reaction(s): biomass_BIF

- FBA Objective value (biomass): 0.3144

In [12]:
# Get the stoichiometric matrix as a NumPy array
stoich_dense = create_stoichiometric_matrix(model)  # Already a NumPy array

# Compute the rank of the stoichiometric matrix
rank = np.linalg.matrix_rank(stoich_dense)

# Degrees of freedom
num_reactions = len(model.reactions)
dof = num_reactions - rank

# Output
print(f"Stoichiometric Matrix Rank: {rank}")
print(f"Number of Reactions: {num_reactions}")
print(f"Degrees of Freedom: {dof}")

Stoichiometric Matrix Rank: 620
Number of Reactions: 771
Degrees of Freedom: 151


# Model Exchanges (Context)

In [13]:
# Create a deep copy of the model to avoid modifying the original
model_copy = copy.deepcopy(model)

**Step 1**: Create Files to invest exchanges

In [19]:
# Run FBA on the model
solution = model_copy.optimize()

# Extract all exchange reactions from the model
exchange_reactions = model_copy.exchanges

# Create a DataFrame summarizing each exchange reaction
exchange_df = pd.DataFrame([{
    "ID": rxn.id,
    "Name": rxn.name,
    "Lower_FB": rxn.lower_bound,
    "Upper_FB": rxn.upper_bound,
    "Flux": solution.fluxes.get(rxn.id, 0.0),
    "Active_in_FBA": abs(solution.fluxes.get(rxn.id, 0.0)) > 1e-10,
    "Active_in_FBA_Copy": abs(solution.fluxes.get(rxn.id, 0.0)) > 1e-10,
} for rxn in exchange_reactions])

# Filter to only active exchange reactions and add 'Import' Column
exchange_df_sub = (
    exchange_df[exchange_df["Active_in_FBA_Copy"] == True]
    .assign(Import=lambda df: df["Flux"].apply(lambda v: v < 0 if v != 0 else pd.NA))
    .copy()
)

# Filter the list to those reactions marked for inclusion
exchange_df2 = exchange_df_full[exchange_df_full["Include"] == True]

# Merge the curated exchange list with Import info from FBA
merged_df = exchange_df2.merge(
    exchange_df_sub[["ID", "Import"]],
    on="ID",
    how="left"
)

**Step 2**: Import modified exchange reactions incl directionalities

In [20]:
# Get IDs from final_exchanges_df
allowed_ids = set(final_exchanges_df["ID"])

# Optional: Block all exchange reactions NOT in the list
# for rxn in model_copy.exchanges:
    # if rxn.id not in allowed_ids:
        # rxn.lower_bound = 0.0
        # rxn.upper_bound = 0.0

# Handle Import = False and Import = True separately
for _, row in final_exchanges_df.iterrows():
    rxn_id = row["ID"]
    is_import = row["Import"]

    # Get the reaction from the model
    rxn = model_copy.reactions.get_by_id(rxn_id)

    if is_import is False:
        rxn.lower_bound = 0.0

    elif is_import is True:
        prev_lower = rxn.lower_bound

        # Create reversed reaction
        reversed_rxn = Reaction(id=f"{rxn.id}_reversed")
        reversed_rxn.name = f"Reversed {rxn.name}"
        reversed_rxn.add_metabolites({met: -coef for met, coef in rxn.metabolites.items()})
        reversed_rxn.lower_bound = 0.0
        reversed_rxn.upper_bound = abs(prev_lower)

        # Add the new reversed reaction to the model
        model_copy.add_reactions([reversed_rxn])

        # Find all groups that contain this reaction and update them
        for group in model_copy.groups:
            if rxn in group.members:
                group.remove_members([rxn])
                group.add_members([reversed_rxn])

        # Remove the original reaction from the model
        model_copy.remove_reactions([rxn])

# Note: This version of model_copy restores the original FBA value of 0.314!

In [21]:
# Make export reactions also irreversible

# Assume `model` is your cobra.Model and `reaction_ids` is your list of reaction IDs
reaction_ids = ['EX_co2(e)', 'EX_h(e)', 'EX_h2o(e)']  # Replace with your actual reaction IDs

for rxn_id in reaction_ids:
    if rxn_id in model_copy.reactions:
        model_copy.reactions.get_by_id(rxn_id).lower_bound = 0.0
    else:
        print(f"Reaction {rxn_id} not found in model.")

In [22]:
# Create a deep copy of the model to avoid modifying the original
model_copy2 = copy.deepcopy(model_copy)

In [23]:
for rxn in model_copy2.reactions:
    if rxn.id.endswith("_reversed"):
        if rxn.upper_bound == 0.0:
            rxn.upper_bound = 2.0

# Note: This version of model_copy exepectedly restores a higher FBA value of 0.644!

In [24]:
### Export as .mat for matlab applications

save_matlab_model(model_copy2, raw_mat_path / "iMS520_context.mat") # use generic model to get unbiased couplings (filter later!)

# FASTCC model reduction

In [25]:
# Reduce model with COBRApy FASTCC 
consistent_generic_model = fastcc(model_copy2)

# Note: This version of model_copy reproduces the higher FBA value of 0.644!

  warn("need to pass in a list")


In [26]:
# Get the stoichiometric matrix as a NumPy array
stoich_dense = create_stoichiometric_matrix(consistent_generic_model)  # Already a NumPy array

# Compute the rank of the stoichiometric matrix
rank = np.linalg.matrix_rank(stoich_dense)

# Degrees of freedom
num_reactions = len(consistent_generic_model.reactions)
dof = num_reactions - rank

# Output
print(f"Stoichiometric Matrix Rank: {rank}")
print(f"Number of Reactions: {num_reactions}")
print(f"Degrees of Freedom: {dof}")

Stoichiometric Matrix Rank: 436
Number of Reactions: 544
Degrees of Freedom: 108


In [27]:
# Get sets of reaction IDs from both models
original_rxns = set(r.id for r in model_copy.reactions)
consistent_rxns = set(r.id for r in consistent_generic_model.reactions)

# Find reactions that were removed
removed_rxns = sorted(original_rxns - consistent_rxns)

# F2C2 Coupling Integration

**Step 1**: Make F2C2 Data accessible

In [28]:
# Add reaction annotations to match F2C2 blocked reactions
model_rxns = [rxn.id for rxn in model.reactions]
F2C2_Blocked_Reactions_df.loc[len(F2C2_Blocked_Reactions_df)] = model_rxns

# Identify unblocked reactions from the second row (index 1)
unblocked_mask = F2C2_Blocked_Reactions_df.loc[0] == 0
unblocked_reactions = F2C2_Blocked_Reactions_df.loc[1][unblocked_mask].tolist()

# Update Coupled_Pairs_df index and columns with unblocked reactions
Coupled_Pairs_df.index = unblocked_reactions
Coupled_Pairs_df.columns = unblocked_reactions

In [29]:
# Filter dataframe for reactions in model

model_rxns = set([rxn.id for rxn in consistent_generic_model.reactions])
valid_rxns = Coupled_Pairs_df.index.intersection(model_rxns)
Coupled_Pairs_df = Coupled_Pairs_df.loc[valid_rxns, valid_rxns]

In [30]:
# Get number of how many fully coupled pairs
fully_coupled_count = int(((Coupled_Pairs_df == 1).sum().sum()) - len(Coupled_Pairs_df))

*SideQuest* : Investigate difference in 'blocking' between FASTCC and F2C2

In [31]:
# Extract and convert reactions from models to sets for fast comparison
generic_rxns = {rxn.id for rxn in consistent_generic_model.reactions}
f2c2_unblocked_rxns = set(unblocked_reactions)

# Compare overlaps and differences
overlap_generic = list(generic_rxns & f2c2_unblocked_rxns)

# Reactions unblocked by F2C2 but removed by FASTCC
missing_from_generic = list(f2c2_unblocked_rxns - generic_rxns)

# Reactions kept by FASTCC but blocked by F2C2 (ideally empty)
unexpected_in_generic = list(generic_rxns - f2c2_unblocked_rxns)

In [32]:
# Study on F2C2 Blocking - does this give a feasible solution?

# Create a deep copy of the model F2C2 was run on 
F2C2_model = copy.deepcopy(model_copy)

# Get the Reaction objects from the model
reactions_to_remove = [F2C2_model.reactions.get_by_id(rid) for rid in unexpected_in_generic]

# Remove them from the model
F2C2_model.remove_reactions(reactions_to_remove)

  warn("need to pass in a list")


In [33]:
# Not feasible 
solution = F2C2_model.optimize()
# Print full solution object
print(solution)

<Solution infeasible at 0x1add49690>




**Step 2**: Implement Enzyme Subsets into model(s)

In [34]:
# Get original reaction names 
original_reactions = list(Coupled_Pairs_df.index)

# Identify reactions to remove
ex_removed = [r for r in original_reactions if r.startswith("EX_")]
biomass_removed = "biomass_BIF" in original_reactions
atpm_removed = "ATPM" in original_reactions

# Combine into a single exclusion list
reactions_to_remove = set(ex_removed)
if biomass_removed:
    reactions_to_remove.add("biomass_BIF")
if atpm_removed:
    reactions_to_remove.add("ATPM")

# Filter the DataFrame
filtered_reactions = [r for r in original_reactions if r not in reactions_to_remove]
Coupled_Pairs_df_cleaned = Coupled_Pairs_df.loc[filtered_reactions, filtered_reactions]

# Report
print(f"Total 'EX_' reactions removed: {len(ex_removed)}")
print(f"'biomass_BIF' was removed: {biomass_removed}")
print(f"'ATPM' was removed: {atpm_removed}")

Total 'EX_' reactions removed: 14
'biomass_BIF' was removed: True
'ATPM' was removed: True


In [35]:
# Build a graph of fully coupled reactions
G = nx.Graph()

for row in Coupled_Pairs_df_cleaned.index:
    for col in Coupled_Pairs_df_cleaned.columns:
        if Coupled_Pairs_df_cleaned.loc[row, col] == 1 and row != col:
            G.add_edge(row, col)

# Find connected components (fully coupled groups)
coupled_groups = list(nx.connected_components(G))

In [36]:
# Create a mapping from reaction ID to COBRA group name
reaction_to_group = {}

for group in model.groups:
    group_name = group.name
    for member in group.members:
        if hasattr(member, 'id'):  # make sure it's a Reaction, not a Metabolite or Gene
            reaction_to_group[member.id] = group_name

In [37]:
# Combine coupled groups with their annotated groups in the COBRA model and obtain in- and output of coupled groups

combined_data = []

for group in coupled_groups:
    group_reactions = list(group)
    
    # COBRA Group Names
    group_names = set()
    for rxn in group_reactions:
        if rxn in reaction_to_group:
            group_names.add(reaction_to_group[rxn])
    
    # Net Stoichiometry
    stoich = defaultdict(float)
    for rxn_id in group_reactions:
        rxn = model.reactions.get_by_id(rxn_id)
        for met, coeff in rxn.metabolites.items():
            stoich[met] += coeff

    inputs = [met.id for met, coeff in stoich.items() if coeff < 0]
    outputs = [met.id for met, coeff in stoich.items() if coeff > 0]

    # Append Combined Info
    combined_data.append({
        "Coupled_Reactions": group_reactions,
        "COBRA_Groups": list(group_names) if group_names else ["Unassigned"],
        "Num_Inputs": len(inputs),
        "Num_Outputs": len(outputs),
        "Input_Metabolites": inputs,
        "Output_Metabolites": outputs
    })

# Create the final DataFrame
Fully_Coupled_df = pd.DataFrame(combined_data)

In [38]:
# Check whether the reactions per group form a single connected component (i.e., fully connected cluster)
Fully_Coupled_df['Cluster'] = Fully_Coupled_df['Coupled_Reactions'].apply(
    lambda rxn_list: are_reactions_interconnected(consistent_generic_model, rxn_list)
)

# Remove Entries that are not fully connected

Fully_Coupled_df = Fully_Coupled_df[Fully_Coupled_df['Cluster'] != False]

# Remove objective function (if in df)
Fully_Coupled_df['Reactions'] = Fully_Coupled_df['Coupled_Reactions'].apply(lambda rxns: [r for r in rxns if r != objective_id])

*Option 2*: Selective lumping

In [39]:
columns = ['Coupled_Reactions','COBRA_Groups']

Fully_Coupled_df_v2 =  Fully_Coupled_df[columns].copy()

In [40]:
# perform lumping only excl. exchange reactions 

lumped_model_v2, lumping_log_v2 = lump_reaction(consistent_generic_model, coupled_df=Fully_Coupled_df_v2, verbose=False, label_type="Group", Search_COBRA_groups=False)

# Note: lumped_model_v2 can still carry flux and reproduce the FBA value 0.644!

In [41]:
total_length = Fully_Coupled_df_v2['Coupled_Reactions'].apply(len).sum()

**Step 3**: Validation 

In [42]:
# Get all pseudo-reaction IDs
pseudo_reactions = [rxn.id for rxn in lumped_model_v2.reactions if rxn.id.startswith("Pseudo_")]

# Use Flux Variability Analysis to test max possible flux
# fraction_of_optimum=0.0 tells FVA to ignore the model's objective and simply check whether each reaction can carry any flux under the current constraints.
fva_results = flux_variability_analysis(lumped_model_v2, reaction_list=pseudo_reactions, fraction_of_optimum=0.0) 

# Identify if any pseudoreactions are blocked (min=max=0)
fva_results['is_blocked'] = (fva_results['minimum'].abs() < 1e-6) & (fva_results['maximum'].abs() < 1e-6)

# Check if any reactions are blocked
if not fva_results['is_blocked'].any():
    print("✅ All pseudo-reactions validated — they can carry flux.")
else:
    print("❌ Some pseudo-reactions are blocked and cannot carry flux:")
    blocked = fva_results[fva_results['is_blocked']]
    print(blocked[['minimum', 'maximum']])

✅ All pseudo-reactions validated — they can carry flux.


In [43]:
# Get the stoichiometric matrix as a NumPy array
stoich_dense = create_stoichiometric_matrix(lumped_model_v2)  # Already a NumPy array

# Compute the rank of the stoichiometric matrix
rank = np.linalg.matrix_rank(stoich_dense)

# Degrees of freedom
num_reactions = len(lumped_model_v2.reactions)
dof = num_reactions - rank

# Output
print(f"Stoichiometric Matrix Rank: {rank}")
print(f"Number of Reactions: {num_reactions}")
print(f"Degrees of Freedom: {dof}")

Stoichiometric Matrix Rank: 430
Number of Reactions: 516
Degrees of Freedom: 86


# Module-oriented Reaction Lumping

**Step 1**: Group Sanity Check 

In [44]:
# Create a mapping for all groups and their corresponding reactions

# Extract groups and their reactions
group_mapping_data = [
    {
        'COBRA_Group': group.name,
        'Reactions': [member.id for member in group.members if member.__class__.__name__ == 'Reaction']
    }
    for group in lumped_model_v2.groups
    if any(member.__class__.__name__ == 'Reaction' for member in group.members)
]
Raw_Group_df = pd.DataFrame(group_mapping_data)

# Normalize group names (case + spacing)
Raw_Group_df['Normalized_Group'] = Raw_Group_df['COBRA_Group'].apply(
    lambda x: ' '.join(x.strip().split()).title()
)

# Group by normalized name, merge reactions, remove duplicates
Normalized_Group_df = (
    Raw_Group_df.groupby('Normalized_Group')
    .agg({'Reactions': lambda lists: list(set(r for sub in lists for r in sub))})
    .reset_index()
    .rename(columns={'Normalized_Group': 'COBRA_Group'})
)

In [45]:
# Detect suspiciously similar group names using fuzzy matching

group_names = Normalized_Group_df['COBRA_Group'].tolist()
suspicious_matches = []
SIMILARITY_THRESHOLD = 85 # max.: 100 

for i, name1 in enumerate(group_names):
    for j, name2 in enumerate(group_names):
        if i >= j:
            continue
        score = fuzz.ratio(name1, name2)
        if score >= SIMILARITY_THRESHOLD and name1 != name2:
            suspicious_matches.append({
                'Group_1': name1,
                'Group_2': name2,
                'Similarity_Score': score
            })

# Build suspicious match DataFrame and default merge plan
suspicious_df = pd.DataFrame(suspicious_matches)
suspicious_df['Merge_Into'] = suspicious_df['Group_1']

In [46]:
# Manual Check of suspicious match DataFrame + check of merge names

suspicious_df.loc[suspicious_df['Group_1'] == 'Pentose Phosphate Pathwa', 'Merge_Into'] = 'Pentose Phosphate Pathway'

In [47]:
# Create mapping from each group name to its cleaned (merged) name
merge_map = {}

for _, row in suspicious_df.iterrows():
    merge_map[row['Group_1']] = row['Merge_Into']
    merge_map[row['Group_2']] = row['Merge_Into']

# Apply merge mapping to normalized group names
Normalized_Group_df['Final_Group'] = Normalized_Group_df['COBRA_Group'].apply(
    lambda g: merge_map.get(g, g)
)

# Re-merge by Final_Group, removing duplicates in reaction lists
Final_Group_df = (
    Normalized_Group_df.groupby('Final_Group')
    .agg({'Reactions': lambda lists: list(set(r for sub in lists for r in sub))})
    .reset_index()
    .rename(columns={'Final_Group': 'COBRA_Group'})
)

# Add a column for the number of reactions per group
Final_Group_df['Reaction_Count'] = Final_Group_df['Reactions'].apply(len)

In [48]:
# Check whether the reactions per group form a single connected component (i.e., fully connected cluster)
Final_Group_df['Cluster'] = Final_Group_df['Reactions'].apply(
    lambda rxn_list: are_reactions_interconnected(lumped_model_v2, rxn_list)
)

# Remove Entries that are not fully connected

Final_Group_df = Final_Group_df[Final_Group_df['Cluster'] != False]

# Remove objective function (if in df)
Final_Group_df['Reactions'] = Final_Group_df['Reactions'].apply(lambda rxns: [r for r in rxns if r != objective_id])

**Step 2**: Decide which groups to merge 

In [49]:
Modules = Final_Group_df['COBRA_Group'].to_list()

In [50]:
# Export for manual investigation

Final_Group_df.to_csv(raw_csv_path / 'iMS520_context_lumpmodule.csv')

In [51]:
# Define modules that should be left intact (no lumping)

avoid_list = [
 'Anaplerotic Reactions',
 'Citrate Acid Cycle',
 'Energy Metabolism',
 'Fructose And Mannose Metabolism',
 'Glycogen Metabolism',
 'Glycolysis',
 'Glycolysis / Gluconeogenesis',
 'Other',
 'Oxidative Phosphorylation',
 'Pentose And Glucoronate Interconversions',
 'Pyruvate Metabolism',
 'Starch Sucrose Metabolism',
 'Starch Sucrose Metabolismgalactose Metabolism',
 'Unassigned']

In [52]:
# Filter & deduplicate rows in the avoid_list
avoid_reactions_series = Final_Group_df[Final_Group_df['COBRA_Group'].isin(avoid_list)]['Reactions']

avoid_reactions = set()
for reaction_list in avoid_reactions_series:
    avoid_reactions.update(reaction_list)

# Make a deep copy of the DataFrame to avoid changing the original
Lump_df = Final_Group_df.copy(deep=True)

# Iterate over rows not in avoid_list and clean their Reactions
for idx, row in Lump_df.iterrows():
    if row['COBRA_Group'] not in avoid_list:
        original_reactions = set(row['Reactions'])
        updated_reactions = original_reactions - avoid_reactions

        if original_reactions != updated_reactions:
            removed = original_reactions & avoid_reactions
            # print(f"Cleaning group '{row['COBRA_Group']}': removed {len(removed)} reaction(s): {removed}")

        Lump_df.at[idx, 'Reactions'] = list(updated_reactions)

# Identify rows where 'Reactions' is an empty list
empty_rows = Lump_df[Lump_df['Reactions'].apply(lambda x: len(x) == 0)]

# Remove these rows from the cleaned DataFrame
Final_Lump_df = Lump_df[Lump_df['Reactions'].apply(lambda x: len(x) > 1)].reset_index(drop=True)

In [53]:
# Check whether the reactions per group form a single connected component (i.e., fully connected cluster)
Final_Lump_df['Cluster_New'] = Final_Lump_df['Reactions'].apply(
    lambda rxn_list: are_reactions_interconnected(lumped_model_v2, rxn_list))

# Remove Entries that are not fully connected

Final_Lump_df = Final_Lump_df[Final_Lump_df['Cluster'] != False]

**Step 3**: Lump modules

In [54]:
# Remove rows where 'COBRA_Group' is in the avoid_list
Filtered_Lump_df = Final_Lump_df[~Final_Lump_df['COBRA_Group'].isin(avoid_list)].copy() # these won't get lumped!

#  Deduplicate reactions across all rows (ensures each reaction appears only once)
seen_reactions = set()
for idx, row in Filtered_Lump_df.iterrows():
    unique_reactions = [rxn for rxn in row['Reactions'] if rxn not in seen_reactions]
    seen_reactions.update(unique_reactions)
    Filtered_Lump_df.at[idx, 'Reactions'] = unique_reactions

# Remove reactions that start with "EX_"
for idx, row in Filtered_Lump_df.iterrows():
    non_exchange_reactions = [rxn for rxn in row['Reactions'] if not rxn.startswith("EX_")]
    Filtered_Lump_df.at[idx, 'Reactions'] = non_exchange_reactions

# Remove rows with empty reaction lists
Filtered_Lump_df = Filtered_Lump_df[Filtered_Lump_df['Reactions'].apply(len) > 1].reset_index(drop=True)

In [55]:
# perform lumping only excl. exchange reactions 

lumped_model_final, lump_log_final = lump_reaction(lumped_model_v2, coupled_df=Filtered_Lump_df, verbose=False, Search_COBRA_groups=True, label_type="Module")

**Step 4**: Validation

In [56]:
# Get all pseudo-reaction IDs
pseudo_reactions = [rxn.id for rxn in lumped_model_final.reactions if rxn.id.startswith("Pseudo_")]

# Use Flux Variability Analysis to test max possible flux
# fraction_of_optimum=0.0 tells FVA to ignore the model's objective and simply check whether each reaction can carry any flux under the current constraints.
fva_results = flux_variability_analysis(lumped_model_final, reaction_list=pseudo_reactions, fraction_of_optimum=0.0) 

# Identify if any pseudoreactions are blocked (min=max=0)
fva_results['is_blocked'] = (fva_results['minimum'].abs() < 1e-6) & (fva_results['maximum'].abs() < 1e-6)

# Check if any reactions are blocked
if not fva_results['is_blocked'].any():
    print("✅ All pseudo-reactions validated — they can carry flux.")
else:
    print("❌ Some pseudo-reactions are blocked and cannot carry flux:")
    blocked = fva_results[fva_results['is_blocked']]
    print(blocked[['minimum', 'maximum']])

✅ All pseudo-reactions validated — they can carry flux.


In [57]:
# Get the stoichiometric matrix as a NumPy array
stoich_dense = create_stoichiometric_matrix(lumped_model_final)  # Already a NumPy array

# Compute the rank of the stoichiometric matrix
rank = np.linalg.matrix_rank(stoich_dense)

# Degrees of freedom
num_reactions = len(lumped_model_final.reactions)
dof = num_reactions - rank

# Output
print(f"Stoichiometric Matrix Rank: {rank}")
print(f"Number of Reactions: {num_reactions}")
print(f"Degrees of Freedom: {dof}")

Stoichiometric Matrix Rank: 207
Number of Reactions: 225
Degrees of Freedom: 18


In [None]:
write_sbml_model(lumped_model_final, processed_sbml_path / 'iMS520_red1_context.xml')

# Validation

In [61]:
# Define Carb Lists

carb_list = ['EX_glc-D(e)_reversed','EX_fru(e)_reversed','EX_sucr(e)_reversed','EX_arab-L(e)_reversed']
carb_list_original = ['EX_glc-D(e)','EX_fru(e)','EX_sucr(e)','EX_arab-L(e)']

In [62]:
# Reassign original flux bounds

daughter_model = copy.deepcopy(lumped_model_final)

# Store original bounds as a dictionary: {reaction_id: (lower_bound, upper_bound)}
original_bounds = {
    rxn.id: (rxn.lower_bound, rxn.upper_bound)
    for rxn in model_copy.reactions
}

# Apply the original bounds to the target model
for rxn_id, (lb, ub) in original_bounds.items():
    if rxn_id in daughter_model.reactions:
        rxn = daughter_model.reactions.get_by_id(rxn_id)
        rxn.lower_bound = lb
        rxn.upper_bound = ub

In [63]:
# Run analysis for both models
results_original, flux_activity_original, filtered_flux_original = analyze_model(model, 'Original',carb_list_original)
results_mod_original, flux_activity_mod_original, filtered_flux_mod_original = analyze_model(model_copy, 'Original_Rev',carb_list,Direction="Reversed")
results_mod_original_FB, flux_activity_mod_original_FB, filtered_flux_mod_original_FB = analyze_model(model_copy2, 'Original_Rev_FB',carb_list,Direction="Reversed")
results_fastcc, flux_activity_fastcc, filtered_flux_fastcc = analyze_model(consistent_generic_model, 'FASTCC',carb_list,Direction="Reversed")
results_final, flux_activity_final, filtered_flux_final = analyze_model(lumped_model_final, 'Final_Red',carb_list,Direction="Reversed")
results_daugther, flux_activity_daughter, filtered_flux_daughter = analyze_model(daughter_model, 'daugther',carb_list,Direction="Reversed")

In [64]:
Objective_comparison_df = pd.concat([results_original,results_daugther], axis=0)

In [65]:
Objective_comparison_df.to_csv(raw_csv_path / 'iMS520_context_FBA_results.csv')

In [None]:
write_sbml_model(daughter_model, processed_sbml_path / 'iMS520_red2_context.xml') # this one works

In [None]:
newmodel = read_sbml_model(processed_sbml_path / 'iMS520_red2_context.xml') # this one does not work