In [1]:
import os
import re
import numpy as np
import time
import random
import pandas as pd
import glob
import shutil
import seaborn as sns
import itertools
import math
from pathlib import Path
import json
from copy import deepcopy
from dataclasses import dataclass
from types import SimpleNamespace

from matplotlib.lines import Line2D
from scipy.integrate import solve_ivp
from sklearn.metrics import confusion_matrix
from autocatalytic_cores_lib import *
import networkx as nx
import matplotlib.pyplot as plt
from scipy.stats import linregress
from sympy import symbols, Matrix, diff, lambdify
from scipy.linalg import eigvals
from numpy.linalg import svd
from scipy.optimize import linprog
from scipy.sparse import csr_matrix
from scipy.sparse import bmat
from scipy.sparse.csgraph import connected_components
from scipy.linalg import eigh
from scipy.optimize import fsolve
from textwrap import dedent
import networkx as nx   
from matplotlib import animation

# Parameters

In [2]:
"""
- N_X_raw: Number of species in the original system
- N_Y_raw: Number of invading species
- N_R: Number of reactions
- ttot: Total simulation time
- dt: Time step for integration
- Runs: Number of simulation runs
- diluted: Whether invaders interact with the original species
- ambiguity: Allows for flexible reaction assignments
- law: Defines reaction kinetics (e.g., Michaelis-Menten)
- autonomy: If invaders can react among themselves
- degradation: Whether degradation terms are included
"""

'\n- N_X_raw: Number of species in the original system\n- N_Y_raw: Number of invading species\n- N_R: Number of reactions\n- ttot: Total simulation time\n- dt: Time step for integration\n- Runs: Number of simulation runs\n- diluted: Whether invaders interact with the original species\n- ambiguity: Allows for flexible reaction assignments\n- law: Defines reaction kinetics (e.g., Michaelis-Menten)\n- autonomy: If invaders can react among themselves\n- degradation: Whether degradation terms are included\n'

# Generate Random Network

In [3]:
def Generate_Random_Network(N_X_raw, N_Y_raw, N_R_raw, diluted, ambiguity, autonomy):

    # Check Condition Consistency
    if diluted and not autonomy:
        raise ValueError("Impossible to ask for TOP conditions AND allowing non-autonomy")

    # Build Random Network
    S_raw = np.zeros((N_X_raw + N_Y_raw, N_R_raw))
    S1_raw = np.zeros((N_X_raw + N_Y_raw, N_R_raw))
    
    # Construct stoichiometric matrix
    for i in range(N_R_raw):
        species1 = random.randint(0, N_Y_raw - 1)
        S_raw[N_X_raw + species1][i] += 1
        
        if autonomy:
            species2 = random.randint(0, N_Y_raw - 1)
            while not ambiguity and species2 == species1:
                species2 = random.randint(0, N_Y_raw - 1)
            S1_raw[N_X_raw + species2][i] += 1
            
        # The order of a chemical reaction
        total_order_for = random.randint(1, 3)
        total_order_bac = random.randint(1, 3)
        # Count the number of forward/backward reaction species already in the reaction
        stoichio_for = 0
        stoichio_bac = 0
    
        while stoichio_for < total_order_for - 1:
            # when not diluted, we count in N_Y_raw
            species = random.randint(0, N_X_raw + (N_Y_raw if not diluted else 0) - 1)
            if ambiguity or S1_raw[species][i] == 0:
                S_raw[species][i] += 1
                stoichio_for += 1
    
        while stoichio_bac < (total_order_bac - (1 if autonomy else 0)):
            species = random.randint(0, N_X_raw + N_Y_raw - 1)
            if ambiguity or S_raw[species][i] == 0:
                S1_raw[species][i] += 1
                stoichio_bac += 1
    
    Stot_raw = S1_raw - S_raw

    '''
    Reduce Matrix to Avoid Redundant and Confusion
    '''
    
    # Remove all-zero rows (empty species)
    # Species
    row_keep = ~((S_raw==0).all(axis=1) & (S1_raw==0).all(axis=1))
    S_raw  = S_raw[row_keep]
    S1_raw = S1_raw[row_keep]

    # Remove all-zero columns (reactions with net zero stoichiometry)
    col_keep = ~((S_raw==0).all(axis=0) & (S1_raw==0).all(axis=0))
    S_raw  = S_raw[:, col_keep]
    S1_raw = S1_raw[:, col_keep]

    # Remove duplicate columns
    m = S_raw.shape[1]
    keep = []
    seen = set()
    for j in range(m):
        key = tuple(S_raw[:,j].tolist()) + tuple(S1_raw[:,j].tolist())
        if key not in seen:
            seen.add(key)
            keep.append(j)
    S_raw  = S_raw[:, keep]
    S1_raw = S1_raw[:, keep]

    # Record new parameters
    S_plus = S1_raw
    S_minus = S_raw
    Stot = S_plus - S_minus
    kept_species_idx = np.where(row_keep)[0]
    N_X = sum(i< N_X_raw for i in kept_species_idx)
    N_Y = sum(N_X_raw <= i < N_X_raw+N_Y_raw for i in kept_species_idx)
    N_R = Stot.shape[1]

    df_Stot = pd.DataFrame(
    Stot.astype(int),                        
    index=range(1, Stot.shape[0]+1),         
    columns=range(1, Stot.shape[1]+1)        
    )
    print("Matrix Stot:")
    print(df_Stot.to_string())
    
    # S_plus 
    df_Sp = pd.DataFrame(
        S_plus.astype(int),
        index=range(1, S_plus.shape[0]+1),
        columns=range(1, S_plus.shape[1]+1)
    )
    print("\nS_plus:")
    print(df_Sp.to_string())
    
    # S_minus 
    df_Sm = pd.DataFrame(
        S_minus.astype(int),
        index=range(1, S_minus.shape[0]+1),
        columns=range(1, S_minus.shape[1]+1)
    )
    print("\nS_minus:")
    print(df_Sm.to_string())
    print("NX =", N_X)
    print("NY =", N_Y)
    print("NR =", N_R)

    return Stot, N_X, N_Y, N_R, S_plus, S_minus

# Compute Null Space

In [4]:
def compute_null_space(S_minus, S_plus, tol=1e-12):
    S = S_plus - S_minus
    m, n = S.shape
    U, sigma, Vt = svd(S, full_matrices=True)
    rank = np.sum(sigma > tol)
    # Left nullspace: y^T S = 0
    dimL = m - rank
    basisL = U[:, rank:] if dimL > 0 else np.zeros((m, 0))
    # Right nullspace: S x = 0
    dimR = n - rank
    basisR = Vt[rank:, :].T if dimR > 0 else np.zeros((n, 0))
    return basisL, dimL, basisR, dimR
    

# Praful's MGF

In [5]:
import algorithm_1 as algo1
import auxiliary_functions as aux
import algorithm_3 as algo3

In [6]:
'''
MGF calculation:
flux, alpha, _, _ = algo1.growthRateGraph(output_matrix, input_matrix, max_steps, time_limit_iteration)

Check autonomy:
_, _, auto = aux.checkAutonomy(input_matrix, output_matrix)

Find strongest subnetworks:
algo3.growthRateInSubgraphDefinitive(input_matrix, output_matrix, nameScenario, time_limit_iteration, ecoli_species)
'''

'\nMGF calculation:\nflux, alpha, _, _ = algo1.growthRateGraph(output_matrix, input_matrix, max_steps, time_limit_iteration)\n\nCheck autonomy:\n_, _, auto = aux.checkAutonomy(input_matrix, output_matrix)\n\nFind strongest subnetworks:\nalgo3.growthRateInSubgraphDefinitive(input_matrix, output_matrix, nameScenario, time_limit_iteration, ecoli_species)\n'

# MGF Calculation and Statistics

In [7]:
'''
We can directly call Praful's code for computing MGF
And also the one from Von Neumann model tutorial 
only for networks fit autonomous conditions
instead of using our self-bulid one
'''

def extract_candidate_cores(df, N_X, N_Y, N_R):
    """
    From DataFrame with columns AC, M1..Mn, R1..Rm,
    returns list of {'AC', 'species', 'reactions'}.
    """
    cores = []
    for _, row in df.iterrows():
        sp = [i for i in range(N_X + N_Y) if row.get(f"M{i+1}", 0) >= 0.5]
        rx = [j for j in range(N_R) if row.get(f"R{j+1}", 0) >= 0.5]
        if sp and rx:
            cores.append({'AC': row.get("AC"), 'species': sp, 'reactions': rx})
    return cores

def identify_and_score_cores(S_minus, S_plus, Stot, N_X, N_Y, N_R, output_dir):
    """
    1) Find forward cores on Stot.
    2) Find reverse cores on -Stot.
    3) For each core, compute its true MGF in the original direction.
    Returns a list of dicts with keys:
      core, species, reactions, alpha, optimal_flow
    """

    # forward cores
    fwd_file = os.path.join(output_dir, "cores_forward.xlsx")
    _ = ComputeAutocatalyticCores(Stot, fwd_file)
    df_fwd = pd.read_excel(fwd_file)
    forwards = extract_candidate_cores(df_fwd, N_X, N_Y, N_R)

    # reverse cores
    rev_file = os.path.join(output_dir, "cores_reverse.xlsx")
    _ = ComputeAutocatalyticCores(-Stot, rev_file)
    df_rev = pd.read_excel(rev_file)
    reverses = extract_candidate_cores(df_rev, N_X, N_Y, N_R)

    results = []
    # score forward
    for core in forwards:
        SM_sub = Stot[np.ix_(core['species'], core['reactions'])]
        S_plus_sub = np.where(SM_sub > 0, SM_sub, 0)
        S_minus_sub = np.where(SM_sub < 0, -SM_sub, 0)
        alpha, flow = dinkelbach_mgf(S_plus_sub, S_minus_sub)
        label = f"Forward AC {int(core['AC'])}"
        results.append({
            'core': label,
            'species': core['species'],
            'reactions': core['reactions'],
            'alpha': alpha,
            'optimal_flow': flow
        })

    # score reverse
    for core in reverses:
        SM_sub = Stot[np.ix_(core['species'], core['reactions'])]
        S_plus_sub = np.where(SM_sub > 0, SM_sub, 0)
        S_minus_sub = np.where(SM_sub < 0, -SM_sub, 0)
        alpha_inv, flow = dinkelbach_mgf(S_plus_sub, S_minus_sub)
        # alpha = 1.0/alpha_inv if alpha_inv != 0 else 0.0
        # print(f"Reverse core {core['AC']} actual alpha (1/alpha_inv): {alpha:.5f}")
        label = f"Reverse AC {int(core['AC'])}"
        results.append({
            'core': label,
            'species': core['species'],
            'reactions': core['reactions'],
            'alpha': alpha_inv,
            'optimal_flow': flow
        })

    return results

# Von Neumann Growth Factor 

From tutorial Von Neumann Growth Model (and a Generalization) 

https://python.quantecon.org/von_neumann_model.html

In [8]:
class Neumann(object):

    """
    This class describes the Generalized von Neumann growth model as it was
    discussed in Kemeny et al. (1956, ECTA) and Gale (1960, Chapter 9.5):

    Let:
    n ... number of goods
    m ... number of activities
    A ... input matrix is m-by-n
        a_{i,j} - amount of good j consumed by activity i
    B ... output matrix is m-by-n
        b_{i,j} - amount of good j produced by activity i

    x ... intensity vector (m-vector) with non-negative entries
        x'B - the vector of goods produced
        x'A - the vector of goods consumed
    p ... price vector (n-vector) with non-negative entries
        Bp - the revenue vector for every activity
        Ap - the cost of each activity

    Both A and B have non-negative entries. Moreover, we assume that
    (1) Assumption I (every good which is consumed is also produced):
        for all j, b_{.,j} > 0, i.e. at least one entry is strictly positive
    (2) Assumption II (no free lunch):
        for all i, a_{i,.} > 0, i.e. at least one entry is strictly positive

    Parameters
    ----------
    A : array_like or scalar(float)
        Part of the state transition equation.  It should be `n x n`
    B : array_like or scalar(float)
        Part of the state transition equation.  It should be `n x k`
    """

    def __init__(self, A, B):

        self.A, self.B = list(map(self.convert, (A, B)))
        self.m, self.n = self.A.shape

        # Check if (A, B) satisfy the basic assumptions
        assert self.A.shape == self.B.shape, 'The input and output matrices \
              must have the same dimensions!'
        assert (self.A >= 0).all() and (self.B >= 0).all(), 'The input and \
              output matrices must have only non-negative entries!'

        # (1) Check whether Assumption I is satisfied:
        if (np.sum(B, 0) <= 0).any():
            self.AI = False
        else:
            self.AI = True

        # (2) Check whether Assumption II is satisfied:
        if (np.sum(A, 1) <= 0).any():
            self.AII = False
        else:
            self.AII = True

    def __repr__(self):
        return self.__str__()

    def __str__(self):

        me = """
        Generalized von Neumann expanding model:
          - number of goods          : {n}
          - number of activities     : {m}

        Assumptions:
          - AI:  every column of B has a positive entry    : {AI}
          - AII: every row of A has a positive entry       : {AII}

        """
        # Irreducible                                       : {irr}
        return dedent(me.format(n=self.n, m=self.m,
                                AI=self.AI, AII=self.AII))

    def convert(self, x):
        """
        Convert array_like objects (lists of lists, floats, etc.) into
        well-formed 2D NumPy arrays
        """
        return np.atleast_2d(np.asarray(x))


    def bounds(self):
        """
        Calculate the trivial upper and lower bounds for alpha (expansion rate)
        and beta (interest factor). See the proof of Theorem 9.8 in Gale (1960)
        """

        n, m = self.n, self.m
        A, B = self.A, self.B

        f = lambda α: ((B - α * A) @ np.ones((n, 1))).max()
        g = lambda β: (np.ones((1, m)) @ (B - β * A)).min()

        UB = fsolve(f, 1).item()  # Upper bound for α, β
        LB = fsolve(g, 2).item()  # Lower bound for α, β

        return LB, UB


    def zerosum(self, γ, dual=False):
        """
        Given gamma, calculate the value and optimal strategies of a
        two-player zero-sum game given by the matrix

                M(gamma) = B - gamma * A

        Row player maximizing, column player minimizing

        Zero-sum game as an LP (primal --> α)

            max (0', 1) @ (x', v)
            subject to
            [-M', ones(n, 1)] @ (x', v)' <= 0
            (x', v) @ (ones(m, 1), 0) = 1
            (x', v) >= (0', -inf)

        Zero-sum game as an LP (dual --> beta)

            min (0', 1) @ (p', u)
            subject to
            [M, -ones(m, 1)] @ (p', u)' <= 0
            (p', u) @ (ones(n, 1), 0) = 1
            (p', u) >= (0', -inf)

        Outputs:
        --------
        value: scalar
            value of the zero-sum game

        strategy: vector
            if dual = False, it is the intensity vector,
            if dual = True, it is the price vector
        """

        A, B, n, m = self.A, self.B, self.n, self.m
        M = B - γ * A

        if dual == False:
            # Solve the primal LP (for details see the description)
            # (1) Define the problem for v as a maximization (linprog minimizes)
            c = np.hstack([np.zeros(m), -1])

            # (2) Add constraints :
            # ... non-negativity constraints
            bounds = tuple(m * [(0, None)] + [(None, None)])
            # ... inequality constraints
            A_iq = np.hstack([-M.T, np.ones((n, 1))])
            b_iq = np.zeros((n, 1))
            # ... normalization
            A_eq = np.hstack([np.ones(m), 0]).reshape(1, m + 1)
            b_eq = 1

            res = linprog(c, A_ub=A_iq, b_ub=b_iq, A_eq=A_eq, b_eq=b_eq,
                          bounds=bounds)

        else:
            # Solve the dual LP (for details see the description)
            # (1) Define the problem for v as a maximization (linprog minimizes)
            c = np.hstack([np.zeros(n), 1])

            # (2) Add constraints :
            # ... non-negativity constraints
            bounds = tuple(n * [(0, None)] + [(None, None)])
            # ... inequality constraints
            A_iq = np.hstack([M, -np.ones((m, 1))])
            b_iq = np.zeros((m, 1))
            # ... normalization
            A_eq = np.hstack([np.ones(n), 0]).reshape(1, n + 1)
            b_eq = 1

            res = linprog(c, A_ub=A_iq, b_ub=b_iq, A_eq=A_eq, b_eq=b_eq,
                          bounds=bounds)

        if res.status != 0 or res.x is None:
            # LP infeasible or error
            return np.nan, None

        # Pull out the required quantities
        value = res.x[-1]
        strategy = res.x[:-1]

        return value, strategy


    def expansion(self, tol=1e-8, maxit=1000):
        """
        The algorithm used here is described in Hamburger-Thompson-Weil
        (1967, ECTA). It is based on a simple bisection argument and utilizes
        the idea that for a given γ (= α or β), the matrix "M = B - γ * A"
        defines a two-player zero-sum game, where the optimal strategies are
        the (normalized) intensity and price vector.

        Outputs:
        --------
        alpha: scalar
            optimal expansion rate
        """

        LB, UB = self.bounds()

        for iter in range(maxit):

            γ = (LB + UB) / 2
            ZS = self.zerosum(γ=γ, dual=False)
            V = ZS[0]     # value of the game with γ

            if V >= 0:
                LB = γ
            else:
                UB = γ

            if abs(UB - LB) < tol:
                γ = (UB + LB) / 2
                x = self.zerosum(γ=γ)[1]
                p = self.zerosum(γ=γ, dual=True)[1]
                break

        return γ, x, p

    def interest(self, tol=1e-8, maxit=1000):
        """
        The algorithm used here is described in Hamburger-Thompson-Weil
        (1967, ECTA). It is based on a simple bisection argument and utilizes
        the idea that for a given gamma (= alpha or beta),
        the matrix "M = B - γ * A" defines a two-player zero-sum game,
        where the optimal strategies are the (normalized) intensity and price
        vector

        Outputs:
        --------
        beta: scalar
            optimal interest rate
        """

        LB, UB = self.bounds()

        for iter in range(maxit):
            γ = (LB + UB) / 2
            ZS = self.zerosum(γ=γ, dual=True)
            V = ZS[0]

            if V > 0:
                LB = γ
            else:
                UB = γ

            if abs(UB - LB) < tol:
                γ = (UB + LB) / 2
                p = self.zerosum(γ=γ, dual=True)[1]
                x = self.zerosum(γ=γ)[1]
                break

        return γ, x, p
    
def compute_von_neumann_alpha_beta(S_plus, S_minus, tol=1e-8):
    """
    Compute von Neumann alpha (expansion) and beta (interest) for the network.
    Returns alpha, beta.
    """
    # Build A, B for Neumann: A = input = S_minus^T, B = output = S_plus^T
    A = S_minus.T
    B = S_plus.T
    model = Neumann(A, B)
    alpha, _, _ = model.expansion(tol=tol)
    beta, _, _  = model.interest(tol=tol)
    return alpha, beta


# Construct Kinetic Constants and Initial Conditions

In [9]:
def compute_mu(mu0, Z):
    
    if Z <= 0:
        raise ValueError
    
    return mu0 + math.log(Z)

def Construct_Kinetics(N_X, N_Y, N_R, S_plus, S_minus, degradation):
    X0 = [2.0 * random.uniform(0.95, 1.05) for _ in range(N_X)]
    Y0 = [0.01 * random.uniform(0.95, 1.05) for _ in range(N_Y)]
    ini_concentration = np.concatenate([X0, Y0])
    kf = [random.uniform(1.95, 2.05) for _ in range(N_R)]
    # confine the chemical potentials of species X
    mu_0_X = [random.uniform(-1, 1) for _ in range(N_X)]
    mu_X_actual = np.array([compute_mu(mu_0_X[i], X0[i]) for i in range(N_X)])
    mu_0_Y = [random.uniform(-1, 1) for _ in range(N_Y)]
    mu_0 = mu_0_X + mu_0_Y
    mu_0_vector = np.array(mu_0).reshape(-1, 1)
    DeltaG0 = np.dot(S_plus.T, mu_0_vector).flatten() - np.dot(S_minus.T, mu_0_vector).flatten()
    print(f"DeltaG0={DeltaG0}")
    print(f"kr/kf:{np.exp(DeltaG0)}")
    kr = kf * np.exp(DeltaG0)

    # degradation coefficient
    kd = None
    if degradation:
        kd = 0.01*np.array([random.uniform(0.95, 1.05) for _ in range(N_Y)])
    # try to change some forms

    return ini_concentration, kf, kr, kd, mu_X_actual, mu_0_vector, DeltaG0

# Dynamical Simulation

In [10]:
'''
Version 2: Self adaptation simulation 
Without fixing a total time length, checking the concentration change dynamically, 
if the concentration maintains the same for a set time length, regard the system entersteady state
'''
def Solve_Concentration_SS(
    S_minus, S_plus, X0, Y0, N_X, N_Y, N_R,
    kf, kr, kd, law,
    dt=1e-2, tol_ss=1e-5, consec_steps=100,
    rtol=1e-2, atol=1e-5):
    """
    ntegrate iteratively with a fixed step size dt until the change in Y over consecutive consec_steps steps is < tol_ss.
    Returns:
      t_hist:    shape (nt,)
      Y_hist:    shape (nt, N_Y)
      X_hist:    shape (nt, N_X)
    """
    # ODE
    def dydt(t, Y):
        curr = np.zeros(N_R)
        ratef = np.ones(N_R) if law=="MM" else kf.copy()
        rater = kr.copy()
        X_Y = np.concatenate((X0, Y))
        for l in range(N_R):
            # forward
            prod = np.prod([s**p for s,p in zip(X_Y, S_minus[:,l])])
            ratef[l] = prod/(kf[l]+prod) if law=="MM" else kf[l]*prod
            # reverse
            term = 1.0
            for s,p in zip(X_Y, S_plus[:,l]):
                term *= min(s**p, 1e10)
            rater[l] *= term
            curr[l] = ratef[l] - rater[l]
        if kd is not None:
            curr -= np.maximum(kd * Y, 0)
        return ((S_plus - S_minus)[N_X:,:] @ curr)

    t = 0.0
    Y = Y0.copy()
    t_hist = [t]
    Y_hist = [Y.copy()]
    X_hist = [X0.copy()]

    small_count = 0
    # prevent infinite loops.
    max_steps = 500000  
    for step in range(max_steps):
        # take short step integration among [t, t+dt]
        sol = solve_ivp(
            dydt, [t, t+dt], Y, method="LSODA",
            max_step=dt, rtol=rtol, atol=atol
        )
        Y_new = sol.y[:,-1]
        t = sol.t[-1]

        # record
        t_hist.append(t)
        Y_hist.append(Y_new.copy())
        X_hist.append(X0.copy())

        # Check new concentration difference
        if np.max(np.abs(Y_new - Y)) < tol_ss:
            small_count += 1
        else:
            small_count = 0

        # Exit if the condition is met for consec_steps consecutive steps
        if small_count >= consec_steps:
            break

        Y = Y_new

    return np.array(t_hist), np.vstack(Y_hist), np.vstack(X_hist)

def Solve_Concentration(
    S_minus, S_plus, X0, Y0,
    N_X, N_Y, N_R,
    kf, kr, kd,
    law, dt=1e-2, tol_ss=1e-5, consec_steps=100,
    rtol=1e-2, atol=1e-5):

    t_hist, Y_hist, X_hist = Solve_Concentration_SS(
        S_minus, S_plus, X0, Y0,
        N_X, N_Y, N_R,
        kf, kr, kd, law,
        dt=dt,
        tol_ss=tol_ss,
        consec_steps=consec_steps,
        rtol=rtol,
        atol=atol
    )

    # Wrapped to have the same sol interface as the old version.
    sol = SimpleNamespace(
        t = np.array(t_hist),       
        y = np.array(Y_hist).T       
    )

    X_history = np.array(X_hist)

    return sol, X_history

In [11]:
def compute_affinity_and_fluxes(S_minus, S_plus, X0, Y_vec, N_X, N_Y, N_R, kf, kr, law):
    """
    Given X0, Y_vec at specific time point, compute net flux current and affinity
    Return: Aff (N_R,), curr (N_R,)
    """

    curr = np.zeros(N_R)
    Aff  = np.zeros(N_R)
    ratef = np.ones(N_R) if law=="MM" else kf.copy()
    rater = deepcopy(kr)
    X_Y = np.concatenate((X0, Y_vec))

    for l in range(N_R):
        prod = np.prod([s**p for s, p in zip(X_Y, S_minus[:, l])])
        ratef[l] = prod/(kf[l]+prod) if law=="MM" else kf[l]*prod
        term = 1.0
        for s, p in zip(X_Y, S_plus[:, l]):
            term *= min(s**p, 1e10)
        rater[l] *= term

        curr[l] = ratef[l] - rater[l]
        if not np.isfinite(curr[l]):
            curr[l] = 0.0

        # affinity
        if rater[l] != 0 and ratef[l] >= 0:
            Aff[l] = np.log(ratef[l]/rater[l])
        else:
            Aff[l] = np.nan

    return Aff, curr

def compute_energy_flow(curr, Aff):
    """ energy flow for each reaction = curr * Aff """
    return curr * Aff


def compute_entropy_production_rate(curr, Aff):
    """Entropy production rate for whole system = Sigma (curr * Aff) """
    return np.dot(curr, Aff)


def compute_chemical_work(Stot, curr, N_X, mu_X_actual):
    """
    Chemical Work = mu_actual * Curr_ext
    Curr_ext[i] = - Sigma_j (Stot[i,j] * curr[j])
    """
    Curr_ext = - np.dot(Stot[N_X, :], curr)
    return np.dot(mu_X_actual, Curr_ext)


def Dynamic_Simulation(S_minus: np.ndarray,
                       S_plus: np.ndarray,
                       Stot: np.ndarray,
                       X0: np.ndarray,
                       Y0: np.ndarray,
                       N_X: int,
                       N_Y: int,
                       N_R: int,
                       kf: np.ndarray,
                       kr: np.ndarray,
                       kd,
                       mu_X_actual: np.ndarray,
                       law: str):
    """
    adaptive step size concentration simulation and calculate thermodynamic quantities for the entire time series.

    Returns:
      sol_t, sol_y, flux_ts, affinity_ts, energy_flow_ts, entropy_prod_ts, chemical_work_ts
    """
    sol, X_hist = Solve_Concentration(
        S_minus, S_plus, X0, Y0,
        N_X, N_Y, N_R,
        kf, kr, kd,
        law
    )
    sol_t = sol.t
    sol_y = sol.y.T  # (nt, N_Y)

    # Compute Thermodynamic quantities
    flux_list = []
    aff_list = []
    ef_list = []
    ep_list = []
    cw_list = []
    for Y_vec, X_vec in zip(sol_y, X_hist):
        Aff, curr = compute_affinity_and_fluxes(
            S_minus, S_plus,
            X0=X_vec, Y_vec=Y_vec,
            N_X=N_X, N_Y=N_Y, N_R=N_R,
            kf=kf, kr=kr,
            law=law
        )
        flux_list.append(curr)
        aff_list.append(Aff)
        ef_list.append(compute_energy_flow(curr, Aff))
        ep_list.append(compute_entropy_production_rate(curr, Aff))
        cw_list.append(compute_chemical_work(Stot, curr, N_X, mu_X_actual))

    flux_ts = np.array(flux_list)
    affinity_ts = np.array(aff_list)
    energy_flow_ts = np.array(ef_list)
    entropy_prod_ts = np.array(ep_list)
    chemical_work_ts = np.array(cw_list)

    return sol_t, sol_y, flux_ts, affinity_ts, energy_flow_ts, entropy_prod_ts, chemical_work_ts

# Steady State Concentration distribution

In [12]:
def compute_shannon_entropy(Y: np.ndarray) -> float:
    """
    Given a set of non-negative Y concentrations, 
    first normalize them as p_i = Y_i / sum(Y_i), 
    then calculate the Shannon entropy H = -sum_i p_i * log(p_i). 
    If sum(Y) == 0, return 0.
    """
    Y = np.asarray(Y, float)
    total = Y.sum()
    if total <= 0:
        return 0.0
    p = Y / total

    p_nonzero = p[p > 0]
    return -np.sum(p_nonzero * np.log(p_nonzero))

# Lyapunov

In [13]:
def compute_core_jacobian_max_eig_at_time(core, S_minus, S_plus,
                                          X0, Y0, kf, kr, kd,
                                          N_X, law="mass_action",
                                          t_target=1.0, eps=1e-6):
    """
    For a single core, compute numeric Jacobian of dY/dt at Y0 and return max real eigenvalue.
    """
    kf = np.asarray(kf, float)
    kr = np.asarray(kr, float)
    if kd is not None:
        kd = np.asarray(kd, float)

    # extract sub‐indices
    sp_idx = core['species']
    rx_idx = core['reactions']

    S_minus_sub = S_minus[np.ix_(sp_idx, rx_idx)]
    S_plus_sub  = S_plus[np.ix_(sp_idx, rx_idx)]
    kf_sub = kf[rx_idx]
    kr_sub = kr[rx_idx]

    # build Y‐sub vector
    Y_idx = [i - N_X for i in sp_idx if i >= N_X]
    Y0_sub = np.asarray(Y0, float)[Y_idx]
    kd_sub = None if kd is None else kd[Y_idx]

    # f_sub
    def f_sub(Y_sub):
        curr = compute_net_flux(
            S_minus_sub, S_plus_sub, X0, Y_sub,
            kf_sub, kr_sub, law, kd_sub
        )

        # row_count = len(Y_idx)，col_count = len(rx_idx)
        Sdiff = (S_plus_sub - S_minus_sub)[len(sp_idx)-len(Y_idx):, :]
        return Sdiff @ curr

    # Integral
    sol = solve_ivp(lambda t, y: f_sub(y),
                    [0, t_target], y0=Y0_sub,
                    method="LSODA", rtol=1e-6, atol=1e-8, max_step=1e-2)
    Y_tt = sol.y[:, -1]

    n = Y_tt.size
    J = np.zeros((n, n))
    f0 = f_sub(Y_tt)
    for j in range(n):
        Yp = Y_tt.copy()
        Yp[j] += eps
        J[:, j] = (f_sub(Yp) - f0) / eps

    eigs = np.linalg.eigvals(J)
    return np.max(eigs.real), J, eigs


# Entropy Production Rate of Each Core

In [14]:
def Compute_Core_EPR(energy_flow: np.ndarray,
                                 cores: list) -> pd.DataFrame:
    """
    For each core, sum the energy_flow of all reactions on that core at each point in time.
    
    Parameters
    ----------
    energy_flow : np.ndarray, shape (nt, N_R)
        
    cores : list of dict
        [{'core': str, 'reactions': [r0, r1, ...]}, ...]
    
    Returns
    -------
    pd.DataFrame
        Row index is the time step (0..nt-1), and the columns are the core labels.
    """
    nt, _ = energy_flow.shape

    df = pd.DataFrame(
        np.zeros((nt, len(cores))),
        columns=[c['core'] for c in cores]
    )
    # For each core, sum the corresponding reaction column.
    for c in cores:
        label = c['core']
        rxns = c['reactions']

        df[label] = energy_flow[:, rxns].sum(axis=1)
    return df


# Visulization

In [15]:
def dynamic_network_visualization(G, pos, core_species_colors, core_nodes, N_RY, sol, Energy_Flow, NetFlux_values, output_dir, realization_index, snapshot_interval=100):
    """
    Dynamic network visualization:
    Update the size of nodes in the reaction network based on simulation data (time series, energy flow, net flux) (e.g., changes in Y species concentration);
    Determine edge width and arrow color based on current frame's energy flow and net flux;
    Generate an animation and save it as an MP4 file.
    
    Parameters:
    G: networkx graph object (typically a MultiDiGraph)
    pos: Dictionary of node positions (node -> (x, y))
    core_species_colors: dict, Color mapping for core species, e.g., {"X1": "red"}
    core_nodes: set, Set of all core nodes (generally the set of keys from core_species_colors)
    N_RY: Total number of reactions (e.g., the number of reactions in the entire system)
    sol: Simulation results object, required to contain attributes sol.t (time array) and sol.y (state matrix, assuming rows correspond to Y species)
    Energy_Flow: Array of shape (len(sol.t), N_RY) representing energy flow data
    NetFlux_values: Array of shape (len(sol.t), N_RY) representing net flux data (positive and negative values may occur)
    output_dir: Output directory for saving the animation
    realization_index: Index number of the current simulation instance (used for file naming)
    snapshot_interval: Interval index for updating animation frames (default: 100)
        
    Returns:
    ani: Dynamic animation
    
    """
    # Calculate the coordinate range
    x_vals = [p[0] for p in pos.values()]
    y_vals = [p[1] for p in pos.values()]
    x_margin = (max(x_vals) - min(x_vals)) * 0.2
    y_margin = (max(y_vals) - min(y_vals)) * 0.2
    x_lim = (min(x_vals) - x_margin, max(x_vals) + x_margin)
    y_lim = (min(y_vals) - y_margin, max(y_vals) + y_margin)

    fig, ax = plt.subplots(figsize=(12, 12))
    frame_indices = list(range(0, len(sol.t), snapshot_interval))

    def update_frame(i):
        ax.clear()
        t_current = sol.t[i]
        ax.set_xlim(x_lim)
        ax.set_ylim(y_lim)
        # Update node size based on the node's category: fixed size or proportional to concentration.
        core_nodes_dyn = []
        non_core_nodes_dyn = []
        core_sizes = []
        non_core_sizes = []
        for node in G.nodes():
            if node.startswith("X"):
                size_val = 500  # Fixed species X size
            else:
                try:
                    # Assuming a node name like "Y1", its index would be 0.
                    idx = int(node[1:]) - 1
                    size_val = sol.y[idx, i] * 3000  # Adjust magnification factor based on concentration.
                except:
                    size_val = 500
            if node in core_nodes:
                core_nodes_dyn.append(node)
                core_sizes.append(size_val)
            else:
                non_core_nodes_dyn.append(node)
                non_core_sizes.append(size_val)
        # Draw core nodes (squares with black borders)
        if core_nodes_dyn:
            nx.draw_networkx_nodes(G, pos,
                                   nodelist=core_nodes_dyn,
                                   node_size=core_sizes,
                                   node_color=[core_species_colors.get(n, "lightcoral") for n in core_nodes_dyn],
                                   node_shape='s',
                                   edgecolors='black',
                                   linewidths=2,
                                   ax=ax)
        # Draw non-core nodes (circles)）
        if non_core_nodes_dyn:
            non_core_colors_dyn = []
            for node in non_core_nodes_dyn:
                if node.startswith("X"):
                    non_core_colors_dyn.append("lightgreen")
                else:
                    non_core_colors_dyn.append("lightcoral")
            nx.draw_networkx_nodes(G, pos,
                                   nodelist=non_core_nodes_dyn,
                                   node_size=non_core_sizes,
                                   node_color=non_core_colors_dyn,
                                   node_shape='o',
                                   ax=ax)
        nx.draw_networkx_labels(G, pos, ax=ax)
        # Dynamically update edges based on the energy flow and net flow of each reaction
        for j in range(N_RY):
            reac_name = f"Reaction {j+1}"
            energy_val = Energy_Flow[i, j]
            netflux_val = NetFlux_values[i, j]
            edge_width = max(0.5, abs(energy_val) * 5)
            if netflux_val > 0.1:
                arrow_color = "red"
            elif netflux_val < -0.1:
                arrow_color = "blue"
            else:
                arrow_color = "gray"
            # For MultiDiGraph, extract (u, v, k) tuples: note keys=True
            edge_list = [(u, v, k) for u, v, k, data in G.edges(keys=True, data=True) if data.get("reaction", "") == reac_name]
            if edge_list:
                for (u, v, k) in edge_list:
                    nx.draw_networkx_edges(G, pos, edgelist=[(u, v)], ax=ax,
                                           width=edge_width, arrowstyle='-|>',
                                           arrowsize=15, edge_color=arrow_color)
        ax.set_title(f"Dynamic Reaction Network at t = {t_current:.2f}")
        ax.axis('off')

    ani = animation.FuncAnimation(fig, update_frame, frames=frame_indices, interval=200, repeat=True)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    dyn_viz_path = os.path.join(output_dir, f"Dynamic_Network_Realization_{realization_index}.mp4")
    ani.save(dyn_viz_path, writer='ffmpeg', dpi=200)
    plt.close()
    print(f"Dynamic network visualization saved to {dyn_viz_path}")
    return ani

# Save data

In [16]:
class DataManager:
    # directory
    def __init__(self, base_dir: str):
        self.base = Path(base_dir)
        self.base.mkdir(parents=True, exist_ok=True)

        self._index = []

    def save_run(self, run_idx: int,
                 S_minus: np.ndarray,
                 S_plus: np.ndarray,
                 flux: np.ndarray,
                 affinity: np.ndarray,
                 energy_flow: np.ndarray,
                 entropy_production_rate: np.ndarray,
                 chemical_work: np.ndarray,
                 sol_t: np.ndarray,
                 sol_y: np.ndarray,
                 mgf: float,
                 vn_alpha: float,
                 vn_beta: float,
                 H: float):
        """Create a new subdirectory for each successful simulation"""
        sub = self.base / f"run_{run_idx:04d}"
        sub.mkdir(exist_ok=True)

        # S matrix
        np.save(sub / "S_minus.npy", S_minus)
        np.save(sub / "S_plus.npy",  S_plus)

        # ODE solution
        np.save(sub / "t.npy", sol_t)
        np.save(sub / "Y.npy", sol_y)
        np.save(sub / "flux.npy", flux)
        np.save(sub / "affinity.npy", affinity)
        np.save(sub / "entropy_production_rate", entropy_production_rate)
        np.save(sub / "energy_flow", energy_flow)
        np.save(sub / "chemical_work", chemical_work)

        # Network parameter
        with open(sub / "stats.json", "w") as f:
            json.dump({
                "mgf": mgf,
                "vn_alpha": vn_alpha,
                "vn_beta": vn_beta,
                "shannon_entropy": H
            }, f, indent=2)

        # renew index
        self._index.append({
            "run": run_idx,
            "path": str(sub),
            "mgf": mgf,
            "vn_alpha": vn_alpha,
            "vn_beta": vn_beta,
            "shannon_entropy": H
        })

    def finalize(self):
        """generate a master index table to make it easier for pandas to read"""
        df = pd.DataFrame(self._index)
        df.to_csv(self.base / "index_summary.csv", index=False)


# Simulation

In [17]:
def main(num_networks=5, output_dir=r"D:\Simulation_Data"):
    os.makedirs(output_dir, exist_ok=True)
    dm = DataManager(output_dir)
    mgf_list, vn_alpha_list, vn_beta_list, entropy_list = [], [], [], []
    run_idx = 0

    while run_idx < num_networks:
        Stot, N_X, N_Y, N_R, S_plus, S_minus = Generate_Random_Network(
            N_X_raw=random.randint(3, 6),
            N_Y_raw=random.randint(3, 6),
            N_R_raw=random.randint(5, 10),
            diluted=False, ambiguity=False, autonomy=True
        )
        S_plus_trim = S_plus[N_X:, :]
        S_minus_trim = S_minus[N_X:, :]
        _, _, auto = aux.checkAutonomy(S_minus_trim, S_plus_trim)
        if not auto:
            continue

        ini_conc, kf, kr, kd, mu_X_actual, mu_0_vector, DeltaG0 = Construct_Kinetics(
            N_X, N_Y, N_R, S_plus, S_minus, degradation=False
        )
        X0 = ini_conc[:N_X]
        Y0 = ini_conc[N_X:]

        try:
            _, db_alpha, *_ = algo1.growthRateGraph(
                S_plus_trim, S_minus_trim,
                max_steps=1000, time_limit_iteration=100
            )
        except Exception:
            continue
        if np.isnan(db_alpha):
            continue
        mgf = db_alpha

        vn_alpha, vn_beta = compute_von_neumann_alpha_beta(
            S_plus_trim, S_minus_trim
        )
        if np.isnan(vn_alpha) or np.isnan(vn_beta):
            continue

        # Dynamic_Simulation
        sol_t, sol_y, flux_ts, affinity_ts, energy_flow_ts, entropy_prod_ts, chemical_work_ts = \
            Dynamic_Simulation(
                S_minus, S_plus, Stot,
                X0, Y0, N_X, N_Y, N_R,
                kf, kr, kd,
                mu_X_actual,
                law="MA"
            )

        # Compute Shannon entropy
        H = compute_shannon_entropy(sol_y[-1])

        dm.save_run(
            run_idx,
            S_minus, S_plus,
            sol_t, sol_y,
            flux_ts, affinity_ts,
            energy_flow_ts, entropy_prod_ts,
            chemical_work_ts,
            mgf, vn_alpha, vn_beta, H
        )
        mgf_list.append(mgf)
        vn_alpha_list.append(vn_alpha)
        vn_beta_list.append(vn_beta)
        entropy_list.append(H)
        run_idx += 1

    dm.finalize()

    # Plot
    def scatter_and_save(x, y, xlabel, ylabel, filename):
        plt.figure()
        plt.scatter(x, y)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, filename))
        plt.close()

    arrs = {
        'vn_alpha': np.array(vn_alpha_list),
        'vn_beta':  np.array(vn_beta_list),
        'mgf':      np.array(mgf_list),
        'entropy':  np.array(entropy_list)
    }
    scatter_and_save(arrs['vn_alpha'], arrs['mgf'], "von Neumann α", "MGF", "vn_alpha_vs_mgf.png")
    scatter_and_save(arrs['vn_alpha'], arrs['vn_beta'], "von Neumann α", "von Neumann β", "vn_alpha_vs_vn_beta.png")
    scatter_and_save(arrs['vn_beta'], arrs['entropy'], "von Neumann β", "Shannon Entropy", "vn_beta_vs_entropy.png")
    scatter_and_save(arrs['vn_alpha'], arrs['entropy'], "von Neumann α", "Shannon Entropy", "vn_alpha_vs_entropy.png")
    scatter_and_save(arrs['mgf'], arrs['entropy'], "MGF", "Shannon Entropy", "mgf_vs_entropy.png")

    print(f"Finished simulation of {num_networks} networks. Results in {output_dir}.")

if __name__ == "__main__":
    main()

Matrix Stot:
    1  2  3  4  5
1   0  1  0  0  0
2   0  1 -1  1  1
3   0  0  1  0  0
4  -1  0  0  0 -1
5   1  0  0 -1  0
6  -2 -2  1  0 -1
7   0 -1  0  0  1
8   1  1 -1  0  0
9   1  0  0  0  0
10  0  0 -1 -1  0
11  0  0  0  2  1

S_plus:
    1  2  3  4  5
1   0  1  0  0  0
2   0  1  0  1  1
3   0  0  1  0  0
4   0  0  0  0  0
5   1  0  0  0  0
6   0  0  1  0  0
7   0  0  0  0  1
8   1  1  0  0  0
9   1  0  0  0  0
10  0  0  0  0  0
11  0  0  0  2  1

S_minus:
    1  2  3  4  5
1   0  0  0  0  0
2   0  0  1  0  0
3   0  0  0  0  0
4   1  0  0  0  1
5   0  0  0  1  0
6   2  2  0  0  1
7   0  1  0  0  0
8   0  0  1  0  0
9   0  0  0  0  0
10  0  0  1  1  0
11  0  0  0  0  0
NX = 5
NY = 6
NR = 5
Matrix Stot:
   1  2  3  4  5  6  7  8
1  0  0  0  0  0 -1  0  0
2  0  0  0  0  1 -1  1  0
3  0  0 -1 -1  0  0  0  0
4 -1  0  0  0  1  0  0  0
5  0  0  0 -1 -1  0  0  0
6  1  0  1  0 -1  1  0  1
7  1  1 -1  1  0  0  1 -1
8 -1 -1  1 -1  1 -1 -1 -1

S_plus:
   1  2  3  4  5  6  7  8
1  0  0  0  0  0 