In [142]:
import sympy

# System is a vector of differential equations

system = [
    "f'=w",
    "g'=-p*q*v",
    "w'=-w+u+v",
    "u'=-u+r*v",
    "v'=-v",
    "r'=-r**2",
    "p'=p*v",
    "q'=v-q"
]

# Init is a vector representing the system's initial condition

init = [0, 0, 0, 0, 0, 1, 1, 1]
sympy.init_printing()

In [143]:
# Utilities for handling the initial conditions

def split_eq(equation: str) -> list[str]:
    return equation.split("=")


def extract_dv(eq: str) -> str:
    lhs = split_eq(equation=eq)[0]
    if len(lhs) == 1: 
        return lhs
    if lhs.endswith("_prime"):
        parts = lhs.split("_prime")
        dv = ""
        for part in parts[:-1]:
            dv += part
        return dv
    if lhs.endswith("'"):
        return lhs.split("'")[0]

def extract_ivars(eq: str):
    rhs = split_eq(equation=eq)[1]
    expr = sympy.parse_expr(rhs)
    return expr.free_symbols

def rewrite_eq_with_init(eq: str, init: float) -> str: 
    parts = split_eq(equation=eq)
    lhs = parts[0]
    rhs = parts[1]
    dv = extract_dv(eq=eq)
    replace_with = dv
    if init > 0:
        replace_with += f"+{init}"
    elif init < 0:
        replace_with += f"-{init}"
    replace_with = f"({replace_with})"
    rhs = rhs.replace(dv, replace_with)
    result = lhs + "=" + rhs
    return result 

In [144]:
# Expression parsing utilities

import sympy

def parse_rhs(expr: str):
    return sympy.parse_expr(expr)

In [149]:
%matplotlib inline

import sys
import networkx as nx
import matplotlib.pyplot as plt
from sympy import Symbol

from networkx import DiGraph

class CrnEquation:
    """
    Expects a single equation and its numeric initial condition
    """

    def __init__(self, eq: str, init: float):
        self.original = eq
        if init == 0:
            self.adjusted = eq
        else:
            self.adjusted = rewrite_eq_with_init(eq=eq, init=init)
        self.dv = extract_dv(eq=eq)
        self.parts = self.adjusted.split("=")
        self.expanded = sympy.expand(self.parts[1])
        self.is_crn_computable = self.crn_computable()
        self.ivars = extract_ivars(eq=eq)
        self.has_been_split = False
        self.split_equations = {"first": None, "second": None}
        self.pos = self.rewrite_pos()
        self.neg = self.rewrite_neg()
        self.first_eq = None
        self.second_eq = None
        self.has_been_split = False

    def crn_computable(self) -> bool:
        rhs = str(self.expanded)
        rhs = sympy.parse_expr(rhs)
        pos, neg = self.split_rhs(rhs)
        self.pos_terms = pos
        self.neg_terms = neg

        # CONDITION ONE 
        # Determine whether a negative constant exists. If one does, the expression is not CRN computable
        for el in neg:
            match type(el):
                case sympy.core.numbers.Integer:
                    cast = int(el)
                    if cast < 0:
                        return False
                case sympy.core.numbers.Float:
                    cast = float(el)
                    if cast < 0:
                        return False    
                # CONDITION TWO
                # Determine whether ALL expressions with negative terms on the RHS contain the dependent variable. If not, the expression is
                # not CRN computable 
                case _:
                    expr = sympy.parse_expr(str(el))
                    if sympy.Symbol(self.dv) in expr.free_symbols:
                        continue
                    else:
                        return False
        return True 

    def split_rhs(self, rhs):
        ispos = lambda x: x.as_coeff_Mul()[0].is_positive
        pos, neg = sympy.sift(sympy.Add.make_args(rhs), ispos, binary=True)
        return pos, neg 

    def transform_item(self, item: str):
        return f"({item}_1-{item}_2)"

    def rewrite_pos(self):
        pos_expr = ""
        for term in self.pos_terms:
            pos_expr += str(term) + "+" 
        pos_expr = pos_expr[:-1]
        if pos_expr:
            return sympy.parse_expr(pos_expr)
        return pos_expr

    def rewrite_neg(self):
        neg_expr = ""
        for term in self.neg_terms:
            neg_expr += str(term)
        neg_expr = neg_expr[1:]
        if neg_expr:
            return sympy.parse_expr(neg_expr)
        return neg_expr 

    def split_extended(self):
        self.rewritten_pos = str(self.pos)
        self.rewritten_neg = str(self.neg)
        self.rewritten_pos = self.rewritten_pos.replace(self.dv, self.transform_item(self.dv))
        self.rewritten_neg = self.rewritten_neg.replace(self.dv, self.transform_item(self.dv))
        p_term = self.gen_p_term()
        first_eq = f"{self.dv}_1'=({self.rewritten_pos if self.rewritten_pos else 1})*({p_term})"
        second_eq = f"{self.dv}_2'=({self.rewritten_neg if self.rewritten_neg else 1})*({p_term})"
        self.first_eq = first_eq
        self.second_eq = second_eq
        self.has_been_split = True
        return first_eq, second_eq
          
        
    def gen_p_term(self):        
        sym = self.dv
        sym_1 = Symbol(f"{sym}_1")
        sym_2 = Symbol(f"{sym}_2")
        p_plus = Symbol("p_1")
        p_minus = Symbol("p_2")
        poly = (p_plus+p_minus)*sym_1*sym_2
        return poly

    def __str__(self):
        return f"""
            CrnEquation [ 
                original: {self.original},
                dv: {self.dv}
                ivars: {self.ivars},
                adjusted: {self.adjusted},
                expanded: {self.expanded}
                parts: {self.parts},
                crn_computable: {self.is_crn_computable}
                has_been_split: {self.has_been_split or False}
                first_eq: {self.first_eq or None}
                second_eq: {self.second_eq or None}
            ]"""

    def __repr__(self):
        return str(self) 

class Crn:
    """
    Expects a vector of differential equations and an equal-sized vector of initial conditions
    """
    def __init__(self, system: list[str], init: list[float]):
        self.original_system = system
        self.system: list[CrnEquation] = []
        self.init = init
        self.rewrite_queue: list[CrnEquation] = []
        if len(self.original_system) != len(self.init):
            sys.exit(1)
        self.handle_initial_values()
        for eq in self.system:
            if not eq.is_crn_computable:
                self.rewrite_queue.append(eq)
        self.graph = self.build_graph()
        
    def build_graph(self) -> DiGraph:
        graph = DiGraph()
        for eq in self.system:
            # Apply observations here
            for ivar in eq.ivars:
                graph.add_edge(eq.dv, ivar)
        return graph

    def handle_initial_values(self): 
        for eq, init in zip(self.original_system, self.init):
            crn_equation = CrnEquation(eq=eq, init=init)
            self.system.append(crn_equation)

    def show_graph(self):
        edge_colours = ["black"]
        edges = self.graph.edges
        pos = nx.spring_layout(self.graph, k=1, iterations=20)
        nx.draw_networkx_nodes(self.graph, pos, node_size = 500)
        nx.draw_networkx_labels(self.graph, pos)
        nx.draw_networkx_edges(self.graph, pos, edgelist=edges, edge_color='k', arrows=True, arrowsize=10)
        plt.show()

    def normalize(self):
        while len(self.rewrite_queue) > 0:
            for eq in self.rewrite_queue:
                ivars = eq.ivars
            if eq.has_been_split:
                self.rewrite_queue.remove(eq)
                continue
            eq.split_extended()
            for eq in self.system:
                if eq.dv in ivars:
                    self.rewrite_queue.append(eq)



            

    def __str__(self):
        return f"""
Crn [
    system: {self.system}
    rewrite_queue: {self.rewrite_queue},
]
"""

In [150]:
crn = Crn(system=system, init=init)
eq = crn.system[3]
x, y = eq.split_extended()
print(x)
print(y)

u_1'=(r*v)*(u_1*u_2*(p_1 + p_2))
u_2'=((u_1-u_2))*(u_1*u_2*(p_1 + p_2))


In [151]:
crn.normalize()
print(crn.rewrite_queue)

[]


Look at Surface CRN

Create web version of this package eventually 



In [152]:
print(crn)


Crn [
    system: [
            CrnEquation [ 
                original: f'=w,
                dv: f
                ivars: {w},
                adjusted: f'=w,
                expanded: w
                parts: ["f'", 'w'],
                crn_computable: True
                has_been_split: False
                first_eq: None
                second_eq: None
            ], 
            CrnEquation [ 
                original: g'=-p*q*v,
                dv: g
                ivars: {p, v, q},
                adjusted: g'=-p*q*v,
                expanded: -p*q*v
                parts: ["g'", '-p*q*v'],
                crn_computable: False
                has_been_split: True
                first_eq: g_1'=(1)*(g_1*g_2*(p_1 + p_2))
                second_eq: g_2'=(p*q*v)*(g_1*g_2*(p_1 + p_2))
            ], 
            CrnEquation [ 
                original: w'=-w+u+v,
                dv: w
                ivars: {u, v, w},
                adjusted: w'=-w+u+v,
                expand