In [20]:
%%writefile scheme.py
r"""
Numerical scheme (Section 5.1): CN diffusion + explicit reaction/taxis, Neumann (ghost reflection).

Model variables:
  - Diffusing: S (Sensitive), R (Resistant), I (Drug), C (Chemoattractant/Signal)
  - ODE/Immobile: P (Quiescent Stroma), A (Activated Stroma)

- Neumann BC is enforced for diffusion via a ghost-reflection Neumann Laplacian.
- For taxis terms, Neumann-style ghost reflection is enforced via reflect-padding
  when computing gradients/divergences.

This file is intended as a reproducible reference implementation of the scheme
described in \Cref{subsec:numerical_scheme}.
"""

from __future__ import annotations

from typing import Callable

import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as spla


# -----------------------------
# 1) Baseline parameter set (Table 1)
# -----------------------------
# Baseline parameter dictionary. The driver code in this notebook copies it with
# dict(P_BASE) and overrides individual entries per regime.
P_BASE = {
    # Diffusion coefficients of S, R and I (passed to cn_factorized by the drivers)
    "d_S": 1e-3,
    "d_R": 1e-3,
    "d_I": 5.0,        # fast-diffusing exogenous drug (single-dose)

    # Chemoattractant diffusion
    "d_c": 5e-3,

    # Kinetics: growth rates, carrying capacity K, S -> R rate alpha, R -> S rate xi
    "lambda_S": 0.5,
    "lambda_R": 0.5,
    "K": 2.0,
    "alpha": 0.1,
    "xi": 0.1,

    # Drug-dependent terms: drug decay gamma_I; delta(I) = delta_0 * I / (I + K_I)
    "gamma_I": 1.0,
    "delta_0": 0.5,
    "K_I": 0.5,

    # Feedback strength (drug-dependent amplification of R)
    "eta": 0.2,

    # Stromal conversion (P -> A at rate theta*phi(I); A -> P at rate beta*(1-phi(I)))
    "theta": 1.0,
    "beta": 0.5,

    # Chemoattractant dynamics: kappa_c enters only when c_mode == "closed_loop";
    # rho_c is applied in both modes.
    "kappa_c": 0.8,   # production rate (closed-loop: S -> C)
    "rho_c": 1.6,     # decay rate (always active)

    # Taxis sensitivities (set to 0 for base; vary in experiments).
    # Note: step_system reads chi from its `taxis` config, not from these entries;
    # the drivers copy p["chi_S"] / p["chi_R"] into that config.
    "chi_S": 0.0,
    "chi_R": 0.0,

    # Regime switch for the signal equation:
    #   - "open_loop":   ∂t C = d_c ΔC - rho_c C   (no dependence on S,R)
    #   - "closed_loop": ∂t C = d_c ΔC + kappa_c S - rho_c C
    # If the key is missing, compute_reactions / initialize_fields fall back to
    # "closed_loop", so keep it explicit here.
    "c_mode": "open_loop",
}


# -----------------------------
# 2) Neumann operators (ghost reflection)
# -----------------------------
def neumann_laplacian_1d(N: int, h: float) -> sp.csr_matrix:
    """
    Build the 1D three-point Laplacian with homogeneous Neumann boundaries.

    Boundary rows use ghost reflection (u_{-1} = u_{1}, u_{N} = u_{N-2}), which
    doubles the single off-diagonal entry of the first and the last row.

    Parameters
    ----------
    N : number of grid nodes.
    h : grid spacing.

    Returns
    -------
    The (N, N) operator in CSR format, already divided by h^2.
    """
    n_off = max(N - 1, 0)
    centre = np.full(N, -2.0)
    below = np.ones(n_off)
    above = np.ones(n_off)

    if N >= 2:
        # The mirrored ghost node lands on the only interior neighbour.
        above[0] = 2.0     # entry (0, 1)
        below[-1] = 2.0    # entry (N-1, N-2)

    stencil = sp.diags(
        [below, centre, above],
        offsets=[-1, 0, 1],
        shape=(N, N),
        format="csr",
    )
    return stencil / (h * h)


def neumann_laplacian_2d(Nx: int, Ny: int, h: float) -> sp.csr_matrix:
    """
    Assemble the 2D Neumann Laplacian as a Kronecker sum of 1D operators.

    The x-operator sits in the inner (fastest-varying) Kronecker slot and the
    y-operator in the outer one. The same spacing h is used in both directions.
    Returns an (Nx*Ny, Nx*Ny) CSR matrix.
    """
    lap_x = neumann_laplacian_1d(Nx, h)
    lap_y = neumann_laplacian_1d(Ny, h)

    # d^2/dx^2 acts within each block of Nx unknowns; d^2/dy^2 couples the blocks.
    x_part = sp.kron(sp.eye(Ny, format="csr"), lap_x, format="csr")
    y_part = sp.kron(lap_y, sp.eye(Nx, format="csr"), format="csr")
    return x_part + y_part


def grad_neumann(F: np.ndarray, h: float) -> tuple[np.ndarray, np.ndarray]:
    """
    Centered-difference gradient of a 2D field with reflect padding.

    The field is mirrored across each boundary without repeating the edge value,
    then differenced with spacing 2h. Returns (Fx, Fy), where Fx differences
    along axis 1 and Fy along axis 0; both have the shape of F.
    """
    span = 2.0 * h

    # Pad only along the axis being differenced; the other axis is left as is.
    along_x = np.pad(F, ((0, 0), (1, 1)), mode="reflect")
    along_y = np.pad(F, ((1, 1), (0, 0)), mode="reflect")

    dF_dx = (along_x[:, 2:] - along_x[:, :-2]) / span
    dF_dy = (along_y[2:, :] - along_y[:-2, :]) / span
    return dF_dx, dF_dy


def div_neumann(Jx: np.ndarray, Jy: np.ndarray, h: float) -> np.ndarray:
    """
    Centered-difference divergence of the 2D flux (Jx, Jy) with reflect padding.

    Each component is mirrored across the boundary (edge value not repeated) and
    differenced along its own axis: Jx along axis 1, Jy along axis 0.

    NOTE(review): with this even mirroring the two neighbours used on a boundary
    node coincide, so dJx/dx is exactly zero in the first/last column and dJy/dy
    in the first/last row. Confirm this is the intended boundary closure for the
    taxis flux before relying on mass conservation near the walls.
    """
    span = 2.0 * h

    Jx_ext = np.pad(Jx, ((0, 0), (1, 1)), mode="reflect")
    Jy_ext = np.pad(Jy, ((1, 1), (0, 0)), mode="reflect")

    dJx_dx = (Jx_ext[:, 2:] - Jx_ext[:, :-2]) / span
    dJy_dy = (Jy_ext[2:, :] - Jy_ext[:-2, :]) / span
    return dJx_dx + dJy_dy


# -----------------------------
# 3) Nonlinearities and reactions
# -----------------------------
def phi(I: np.ndarray) -> np.ndarray:
    """Return the drug switch phi(I) = tanh(5 I), evaluated elementwise."""
    steepness = 5.0
    return np.tanh(steepness * I)


def delta(I: np.ndarray, p: dict) -> np.ndarray:
    """
    Return the saturating inhibition rate delta(I) = delta_0 * I / (I + K_I).

    Reads p["delta_0"] and p["K_I"]; a 1e-12 offset in the denominator guards
    against division by zero.
    """
    numerator = p["delta_0"] * I
    denominator = I + p["K_I"] + 1e-12
    return numerator / denominator


def compute_reactions(
    S: np.ndarray,
    R: np.ndarray,
    I: np.ndarray,
    C: np.ndarray,
    P: np.ndarray,
    A: np.ndarray,
    p: dict,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Evaluate the pointwise (explicit) reaction rates of all six variables.

    Returns (G_S, G_R, G_I, G_C, G_P, G_A). The signal rate G_C depends on
    p["c_mode"] ("closed_loop" if the key is absent).

    Raises
    ------
    ValueError
        If c_mode is neither "open_loop" nor "closed_loop".
    """
    phi_I = phi(I)
    delta_I = delta(I, p)
    crowding = 1.0 - (S + R) / p["K"]

    # Exchange terms that appear with opposite signs in the S and R equations.
    acquired = p["alpha"] * S                    # S -> R
    reverted = p["xi"] * (1.0 - phi_I) * R       # R -> S, switched off by the drug

    # Tumor kinetics
    G_S = p["lambda_S"] * S * crowding - acquired - delta_I * S + reverted
    G_R = (
        p["lambda_R"] * R * crowding
        + acquired
        + p["eta"] * phi_I * A * R
        - reverted
    )

    # Drug decay (single-dose, open-loop)
    G_I = -p["gamma_I"] * I

    # Signal dynamics: decay always, production by S only in closed loop.
    c_mode = p.get("c_mode", "closed_loop")
    if c_mode not in ("open_loop", "closed_loop"):
        raise ValueError(f"Unknown c_mode: {c_mode}. Use 'open_loop' or 'closed_loop'.")
    if c_mode == "closed_loop":
        G_C = p["kappa_c"] * S - p["rho_c"] * C
    else:
        G_C = -p["rho_c"] * C

    # Stromal switching (ODE): whatever leaves P enters A and vice versa.
    activation = p["theta"] * phi_I * P
    deactivation = p["beta"] * (1.0 - phi_I) * A
    G_P = deactivation - activation
    G_A = activation - deactivation

    return G_S, G_R, G_I, G_C, G_P, G_A


# -----------------------------
# 4) Taxis term (explicit)
# -----------------------------
def taxis_term(
    W: np.ndarray,
    signal: np.ndarray,
    chi: float,
    h: float,
    mode: str = "linear",
    alpha_sat: float = 0.0,
) -> np.ndarray:
    """
    Return the explicit taxis forcing for density W moving along grad(signal):
        -div( chi * W * grad(signal) )                                [linear]
        -div( chi * W * grad(signal) / (1 + alpha_sat*|grad(signal)|) ) [flux_sat]

    A sensitivity of exactly zero short-circuits to a zero array (the mode is
    not inspected in that case).

    Raises
    ------
    ValueError
        For an unknown mode, or for mode "flux_sat" with alpha_sat <= 0.
    """
    if chi == 0.0:
        return np.zeros_like(W)

    grad_x, grad_y = grad_neumann(signal, h)

    # Validate the configuration before building any flux.
    if mode not in ("linear", "flux_sat"):
        raise ValueError(f"Unknown taxis mode: {mode}")
    saturating = mode == "flux_sat"
    if saturating and alpha_sat <= 0.0:
        raise ValueError("alpha_sat must be positive for flux_sat.")

    flux_x = chi * W * grad_x
    flux_y = chi * W * grad_y

    if saturating:
        # Limit the flux where the signal gradient is steep.
        grad_norm = np.sqrt(grad_x * grad_x + grad_y * grad_y)
        limiter = 1.0 / (1.0 + alpha_sat * grad_norm)
        flux_x = flux_x * limiter
        flux_y = flux_y * limiter

    return -div_neumann(flux_x, flux_y, h)


# -----------------------------
# 5) Crank–Nicolson diffusion step
# -----------------------------
def cn_factorized(L: sp.csr_matrix, d: float, dt: float) -> tuple[Callable, sp.csr_matrix]:
    """
    Prepare the two Crank-Nicolson operators for diffusion coefficient d.

    Returns (solve_lhs, A_rhs) where
      solve_lhs(b) solves (I - dt/2 d L) x = b using a sparse factorization
      computed once here, and
      A_rhs = (I + dt/2 d L) in CSR format.
    """
    identity = sp.eye(L.shape[0], format="csr")
    half_step = 0.5 * dt * d * L

    implicit_side = (identity - half_step).tocsc()   # factorized() expects CSC
    explicit_side = (identity + half_step).tocsr()

    return spla.factorized(implicit_side), explicit_side


# -----------------------------
# 6) Initialization
# -----------------------------
def homogeneous_equilibrium(p: dict) -> tuple[float, float]:
    """
    Return the drug-free homogeneous equilibrium (S*, R*).

    The carrying capacity K is split between S and R in the ratio xi : alpha,
    i.e. S* = xi K / (alpha + xi) and R* = alpha K / (alpha + xi).
    """
    exchange_total = p["alpha"] + p["xi"]
    sensitive_level = (p["xi"] * p["K"]) / exchange_total
    resistant_level = (p["alpha"] * p["K"]) / exchange_total
    return sensitive_level, resistant_level


def initialize_fields(
    Nx: int,
    Ny: int,
    p: dict,
    eps: float = 1e-3,
    seed: int = 42,
    P_T: float = 1.0,
) -> dict[str, np.ndarray]:
    """
    Build the initial state as a noisy copy of the homogeneous equilibrium.

    S, R, I and C receive independent uniform perturbations of amplitude eps
    (drawn in that order from a generator seeded with `seed`) and are clipped
    at zero. Reference levels:
      - S, R: homogeneous_equilibrium(p)
      - I:    0
      - C:    0 for "open_loop", kappa_c S* / rho_c for "closed_loop"
    The stroma starts fully quiescent: P = P_T, A = 0.

    Returns a dict with keys "S", "R", "I", "C", "P", "A", each of shape (Ny, Nx).

    Raises
    ------
    ValueError
        If p["c_mode"] is neither "open_loop" nor "closed_loop".
    """
    rng = np.random.default_rng(seed)
    S_star, R_star = homogeneous_equilibrium(p)
    grid = (Ny, Nx)

    def perturbed(level: float) -> np.ndarray:
        # One fresh draw per call; the call order below fixes the random stream.
        return level + eps * rng.uniform(-1.0, 1.0, size=grid)

    S = perturbed(S_star)
    R = perturbed(R_star)
    I = perturbed(0.0)

    # The signal reference level depends on the regime switch.
    c_mode = p.get("c_mode", "closed_loop")
    if c_mode == "closed_loop":
        C_star = p["kappa_c"] * S_star / (p["rho_c"] + 1e-12)
    elif c_mode == "open_loop":
        C_star = 0.0
    else:
        raise ValueError(f"Unknown c_mode: {c_mode}. Use 'open_loop' or 'closed_loop'.")
    C = perturbed(C_star)

    # Clip the perturbed fields so no density starts negative.
    fields = {
        name: np.maximum(values, 0.0)
        for name, values in (("S", S), ("R", R), ("I", I), ("C", C))
    }

    # Immobile stroma pool
    fields["P"] = P_T * np.ones(grid)
    fields["A"] = np.zeros(grid)
    return fields


# -----------------------------
# 7) One-step update
# -----------------------------
def step_system(
    state: dict[str, np.ndarray],
    p: dict,
    h: float,
    dt: float,
    Nx: int,
    Ny: int,
    L2: sp.csr_matrix,
    solvers: dict[str, tuple[Callable, sp.csr_matrix]],
    taxis: dict | None = None,
) -> dict[str, np.ndarray]:
    """
    Advance the full state by one time step dt and return the new state dict.

    S, R, I, C: Crank-Nicolson diffusion with explicit reaction (and, for S/R,
                explicit taxis) forcing, using the prebuilt packs in `solvers`.
    P, A:       explicit Euler.
    All six fields are clipped at zero afterwards. `L2` is accepted for
    interface compatibility; this function does not read it.

    taxis config example:
      taxis = {
        "S": {"signal": "C", "chi": float, "mode": "linear"/"flux_sat", "alpha_sat": float},
        "R": {"signal": "C", "chi": float, ...},
      }
    "signal" is either the name of a state variable or an array used directly.
    """
    S, R = state["S"], state["R"]

    # (1) Reaction rates at the current time level.
    G_S, G_R, G_I, G_C, G_P, G_A = compute_reactions(
        S, R, state["I"], state["C"], state["P"], state["A"], p
    )

    # (2) Explicit taxis forcing; stays zero for species without a config entry.
    drift = {"S": np.zeros_like(S), "R": np.zeros_like(R)}
    configured = {} if taxis is None else taxis
    for name in ("S", "R"):
        if name in configured:
            cfg = configured[name]
            source = cfg["signal"]
            cue = state[source] if isinstance(source, str) else source
            drift[name] = taxis_term(
                state[name],
                cue,
                cfg["chi"],
                h,
                cfg.get("mode", "linear"),
                cfg.get("alpha_sat", 0.0),
            )

    # (3) CN diffusion: solve (I - dt/2 d L) u_new = (I + dt/2 d L) u + dt * F.
    forcing = {
        "S": G_S + drift["S"],
        "R": G_R + drift["R"],
        "I": G_I,
        "C": G_C,
    }
    advanced = {}
    for name, F in forcing.items():
        pack = solvers[name]
        rhs = pack[1].dot(state[name].reshape(-1)) + dt * F.reshape(-1)
        advanced[name] = pack[0](rhs).reshape((Ny, Nx))

    # (4) Immobile ODE variables (explicit Euler).
    advanced["P"] = state["P"] + dt * G_P
    advanced["A"] = state["A"] + dt * G_A

    # (5) Nonnegativity safeguard on every field.
    return {name: np.maximum(values, 0.0) for name, values in advanced.items()}


# -----------------------------
# 8) Main / Test
# -----------------------------
if __name__ == "__main__":
    # Smoke test: build the operators, take a single step with linear taxis of S
    # along an open-loop signal, and print two summary values.

    # Domain
    L_domain = 1.0
    Nx = Ny = 51
    h = L_domain / (Nx - 1)
    dt = 1e-2

    # Operator
    L2 = neumann_laplacian_2d(Nx, Ny, h)

    # Solvers (Including C). Built from P_BASE; the copy `p` below does not
    # override any diffusion coefficient, so the two stay in agreement.
    solvers = {
        "S": cn_factorized(L2, P_BASE["d_S"], dt),
        "R": cn_factorized(L2, P_BASE["d_R"], dt),
        "I": cn_factorized(L2, P_BASE["d_I"], dt),
        "C": cn_factorized(L2, P_BASE["d_c"], dt),
    }

    # Example: open-loop unidirectional cue
    p = dict(P_BASE)
    p["c_mode"] = "open_loop"
    p["chi_S"] = 1.0
    p["chi_R"] = 0.0

    state = initialize_fields(Nx, Ny, p, eps=1e-3)

    # Only S is configured, so R receives no taxis forcing in step_system.
    taxis_config = {
        "S": {"signal": "C", "chi": p["chi_S"], "mode": "linear"},
    }

    state = step_system(state, p, h, dt, Nx, Ny, L2, solvers, taxis=taxis_config)

    print("Step completed.")
    print(f"Max S: {state['S'].max():.5f}")
    print(f"Max C: {state['C'].max():.5f}")

Overwriting scheme.py


In [22]:
%%writefile experiments.py
r"""
experiments.py
Reproducible numerical experiments for Section 5 (Regimes I, I', II, III).

Regime definitions (aligned with the manuscript):
  - Regime I   (Base): no chemotaxis (chi_S = chi_R = 0). Relaxation to homogeneity.
  - Regime I'  (Unidirectional): open-loop signal C (c_mode="open_loop") + chemotaxis of S/R up ∇C.
                                Signal relaxes independently of (S,R).
  - Regime II  (Bidirectional, well-posed): closed-loop signal C (c_mode="closed_loop": S -> C)
                                + flux-saturated taxis (mode="flux_sat").
                                Produces bounded finite-band patterns.
  - Regime III (Bidirectional, aggregation/ill-posed tendency): closed-loop signal C (S -> C)
                                + linear taxis (mode="linear").
                                Leads to rapid amplification / numerical breakdown.

Assumes: scheme.py is present in the same directory.
"""

from __future__ import annotations

import os
from typing import Callable

import numpy as np
import matplotlib.pyplot as plt
import scipy.sparse as sp

from scheme import (
    P_BASE,
    neumann_laplacian_2d,
    cn_factorized,
    initialize_fields,
    step_system,
    homogeneous_equilibrium,
)

# -----------------------------
# 0) I/O & metrics
# -----------------------------
# Output directory for all figures; created at import time if it does not exist.
FIGDIR = "figures"
os.makedirs(FIGDIR, exist_ok=True)


def savefig(filename: str, dpi: int = 300) -> None:
    """Save the current figure as FIGDIR/<filename>, report the path, then close it."""
    target = os.path.join(FIGDIR, filename)
    figure = plt.gcf()
    figure.tight_layout()
    figure.savefig(target, dpi=dpi)
    print(f"Saved: {target}")
    plt.close(figure)


def l2_deviation(field: np.ndarray, target: float | np.ndarray, h: float) -> float:
    """Return sqrt(sum((field - target)^2) * h^2), a discrete L2 norm with cell area h*h."""
    residual = field - target
    squared_sum = np.sum(np.square(residual))
    return float(np.sqrt(squared_sum * h * h))


# -----------------------------
# 1) Shared discretization
# -----------------------------
# Module-level grid shared by run_system and the plotting helpers below.
L_DOMAIN = 1.0
Nx = Ny = 51
h = L_DOMAIN / (Nx - 1)  # node spacing with both endpoints on the grid
dt = 1e-2  # as in the manuscript


def build_solvers(L2: sp.csr_matrix, p: dict, dt: float) -> dict[str, tuple[Callable, sp.csr_matrix]]:
    """
    Build one Crank-Nicolson solver pack per diffusing variable.

    Returns a dict keyed by "S", "R", "I", "C"; each value is the
    (solve_lhs, A_rhs) pair from cn_factorized for that variable's diffusion
    coefficient in p.
    """
    coefficient_key = {"S": "d_S", "R": "d_R", "I": "d_I", "C": "d_c"}
    return {
        variable: cn_factorized(L2, p[key], dt)
        for variable, key in coefficient_key.items()
    }


def run_system(
    p: dict,
    T: float,
    seed: int,
    eps: float,
    taxis_cfg: dict | None,
) -> dict:
    """
    Integrate the model from t = 0 to T on the module-level grid (Nx, Ny, h, dt).

    Diagnostics are recorded before every step and once more at the final time.
    If S or C becomes non-finite after a step, integration stops and the series
    are truncated to the samples recorded so far.

    Returns a dict with keys:
      - "times", "E_S", "maxS", "maxC": 1D float arrays of equal length
      - "state": the last state dict produced (non-finite if stopped early)
    """
    laplacian = neumann_laplacian_2d(Nx, Ny, h)
    solvers = build_solvers(laplacian, p, dt)

    state = initialize_fields(Nx, Ny, p, eps=eps, seed=seed)
    S_star = homogeneous_equilibrium(p)[0]

    n_steps = int(round(T / dt))
    series = {
        key: np.empty(n_steps + 1, dtype=float)
        for key in ("times", "E_S", "maxS", "maxC")
    }

    recorded = 0
    for n in range(n_steps + 1):
        series["times"][n] = n * dt
        series["E_S"][n] = l2_deviation(state["S"], S_star, h)
        series["maxS"][n] = float(np.max(state["S"]))
        series["maxC"][n] = float(np.max(state["C"]))
        recorded = n + 1

        if n == n_steps:
            break

        state = step_system(state, p, h, dt, Nx, Ny, laplacian, solvers, taxis=taxis_cfg)

        # Early stop if numerics break down (Regime III)
        if not (np.isfinite(state["S"]).all() and np.isfinite(state["C"]).all()):
            break

    if recorded < n_steps + 1:
        # Drop the never-filled tail left over from the early stop.
        series = {key: values[:recorded] for key, values in series.items()}

    result = dict(series)
    result["state"] = state
    return result


# -----------------------------
# 2) Regime I: Base (no chemotaxis)
# -----------------------------
def run_regime_I_base(T: float = 5.0) -> None:
    """Run Regime I (no chemotaxis) up to time T and save the E_S decay plot."""
    print(f"\n--- Regime I (Base, no chemotaxis), T={T} ---")

    # Signal present but decoupled (open loop); taxis is off anyway.
    p = {**P_BASE, "c_mode": "open_loop", "chi_S": 0.0, "chi_R": 0.0}

    out = run_system(p, T=T, seed=42, eps=1e-3, taxis_cfg=None)

    # Semi-log plot of the L2 deviation of S from its equilibrium value.
    _, ax = plt.subplots(figsize=(6, 4))
    ax.semilogy(out["times"], out["E_S"], lw=2)
    ax.set_xlabel("Time")
    ax.set_ylabel(r"$E_S(t)=\|S(\cdot,t)-S^*\|_{L^2}$")
    ax.set_title("Regime I: relaxation to homogeneity")
    ax.grid(True, which="both", alpha=0.3)
    savefig("Regime_I_L2_Decay.png")


# -----------------------------
# 3) Regime I': Unidirectional (open-loop) taxis
# -----------------------------
def run_regime_I_prime_oneway(T: float = 5.0) -> None:
    """Run Regime I' (linear taxis of S and R along an open-loop signal) and save the E_S plot."""
    print(f"\n--- Regime I' (Unidirectional open-loop taxis), T={T} ---")

    # Key setting: C evolves independently of (S, R).
    p = {**P_BASE, "c_mode": "open_loop", "chi_S": 0.5, "chi_R": 0.5}

    # Both species follow the gradient of C with their own sensitivity.
    taxis_cfg = {
        species: {"signal": "C", "chi": p[f"chi_{species}"], "mode": "linear"}
        for species in ("S", "R")
    }

    out = run_system(p, T=T, seed=101, eps=1e-3, taxis_cfg=taxis_cfg)

    _, ax = plt.subplots(figsize=(6, 4))
    ax.semilogy(out["times"], out["E_S"], color="red", lw=2)
    ax.set_xlabel("Time")
    ax.set_ylabel(r"$E_S(t)=\|S(\cdot,t)-S^*\|_{L^2}$")
    ax.set_title("Regime I': transient focusing then decay (open-loop)")
    ax.grid(True, which="both", alpha=0.3)
    savefig("Regime_I_Prime_Damping.png")


def run_overlay_base_vs_oneway(T: float = 5.0) -> None:
    """Plot E_S(t) of Regime I (blue solid) against Regime I' (red dashed) in one figure."""
    print(f"\n--- Overlay plot (Regime I vs Regime I'), T={T} ---")

    # Regime I: taxis off.
    p_base = {**P_BASE, "c_mode": "open_loop", "chi_S": 0.0, "chi_R": 0.0}
    out_base = run_system(p_base, T=T, seed=42, eps=1e-3, taxis_cfg=None)

    # Regime I': linear taxis of S and R along the open-loop signal.
    p_oneway = {**P_BASE, "c_mode": "open_loop", "chi_S": 0.5, "chi_R": 0.5}
    taxis_cfg = {
        "S": {"signal": "C", "chi": p_oneway["chi_S"], "mode": "linear"},
        "R": {"signal": "C", "chi": p_oneway["chi_R"], "mode": "linear"},
    }
    out_oneway = run_system(p_oneway, T=T, seed=101, eps=1e-3, taxis_cfg=taxis_cfg)

    _, ax = plt.subplots(figsize=(6.4, 4.4))
    ax.semilogy(
        out_base["times"], out_base["E_S"], "b-",
        linewidth=2, label="Base model (Regime I)",
    )
    ax.semilogy(
        out_oneway["times"], out_oneway["E_S"], "r--",
        linewidth=2,
        label="Unidirectional (Regime I$'$)\n(open-loop signal relaxation)",
    )
    ax.set_xlabel("Time $t$")
    ax.set_ylabel(r"$E_S(t)=\|S(\cdot,t)-S^*\|_{L^2}$")
    ax.set_title("Transient focusing vs. monotone relaxation")
    ax.grid(True, which="both", alpha=0.3)
    ax.legend()
    savefig("fig_norm_evolution_oneway_overlay.png")


# -----------------------------
# 4) Regime II: Bidirectional feedback + flux saturation (well-posed patterns)
# -----------------------------
def run_regime_II_patterns(T: float = 50.0, alpha_sat: float = 100.0) -> None:
    """
    Run Regime II (closed-loop signal, flux-saturated taxis of S) and save two figures:
    the max(S) trace, and a three-panel view (S, C, one horizontal cross-section).
    """
    print(f"\n--- Regime II (Bidirectional feedback, flux-saturated), T={T} ---")

    # Closed loop (S -> C). chi_S holds the manuscript's chi'_S; R does not taxi
    # in this minimal feedback demo.
    p = {**P_BASE, "c_mode": "closed_loop", "chi_S": 0.5, "chi_R": 0.0}
    taxis_cfg = {
        "S": {"signal": "C", "chi": p["chi_S"], "mode": "flux_sat", "alpha_sat": alpha_sat},
    }

    out = run_system(p, T=T, seed=2024, eps=1e-3, taxis_cfg=taxis_cfg)

    # Figure 1: trace of max(S)
    _, trace_ax = plt.subplots(figsize=(6, 4))
    trace_ax.plot(out["times"], out["maxS"], lw=2)
    trace_ax.set_xlabel("Time")
    trace_ax.set_ylabel("max(S)")
    trace_ax.set_title("Regime II: bounded dynamics under flux saturation")
    trace_ax.grid(True, alpha=0.3)
    savefig("Regime_II_MaxS.png")

    # Figure 2: final fields plus the row through the global maximum of S
    final = out["state"]
    S, C = final["S"], final["C"]
    y_idx = int(np.unravel_index(np.argmax(S), S.shape)[0])
    xs = np.linspace(0.0, 1.0, S.shape[1])

    _, (ax_S, ax_C, ax_cut) = plt.subplots(1, 3, figsize=(12, 4))
    panels = (
        (ax_S, S, "S (finite-band pattern)"),
        (ax_C, C, "c (feedback signal)"),
    )
    for ax, field, title in panels:
        ax.imshow(field, origin="lower", extent=[0, 1, 0, 1])
        ax.set_title(title)
        # Dotted line marks the row used for the cross-section panel.
        ax.axhline(y=y_idx * h, color="w", linestyle=":", alpha=0.7)

    S_slice = S[y_idx, :]
    C_slice = C[y_idx, :]
    # Rescale c so both curves share one axis (visualization only).
    if np.max(C_slice) > 0:
        scale = np.max(S_slice) / (np.max(C_slice) + 1e-12)
    else:
        scale = 1.0
    ax_cut.plot(xs, S_slice, lw=2, label="S")
    ax_cut.plot(xs, scale * C_slice, "--", lw=2, label="c (scaled)")
    ax_cut.set_title(f"Cross-section (row {y_idx})")
    ax_cut.legend()

    savefig("fig_regimeII_patterns_S_c.png")


# -----------------------------
# 5) Regime III: Bidirectional feedback + linear taxis (aggregation/breakdown)
# -----------------------------
def run_regime_III_collapse(T: float = 10.0) -> None:
    """Run Regime III (closed-loop signal, linear taxis of S) and save the log-scale max(S)/max(c) plot."""
    print(f"\n--- Regime III (Bidirectional feedback, linear taxis), T={T} ---")

    # Closed loop (S -> C) with unsaturated taxis on S only.
    p = {**P_BASE, "c_mode": "closed_loop", "chi_S": 0.5, "chi_R": 0.0}
    taxis_cfg = {
        "S": {"signal": "C", "chi": p["chi_S"], "mode": "linear"},
    }

    out = run_system(p, T=T, seed=999, eps=1e-3, taxis_cfg=taxis_cfg)

    _, ax = plt.subplots(figsize=(6, 4))
    for key, label in (("maxS", "max(S)"), ("maxC", "max(c)")):
        ax.plot(out["times"], out[key], lw=2, label=label)
    ax.set_yscale("log")
    ax.set_xlabel("Time")
    ax.set_ylabel("Amplitude (log scale)")
    ax.set_title("Regime III: rapid amplification / numerical breakdown")
    ax.grid(True, alpha=0.3)
    ax.legend()
    savefig("regimeIII_blowup_maxS.png")


# -----------------------------
# 6) Main
# -----------------------------
if __name__ == "__main__":
    # Each call runs its own simulation(s) and writes its figure(s) under FIGDIR.
    run_regime_I_base(T=5.0)
    run_regime_I_prime_oneway(T=5.0)
    # The overlay re-runs both regimes rather than reusing the results above.
    run_overlay_base_vs_oneway(T=5.0)

    # Bidirectional experiments: longer horizon typically needed for patterns
    run_regime_II_patterns(T=50.0, alpha_sat=100.0)
    run_regime_III_collapse(T=10.0)

Overwriting experiments.py


In [23]:
%%writefile code_regimeI_base.py
r"""
code_regimeI_base.py

Reproducible code for:
  \subsection{Base model: relaxation to homogeneity (Regime I)}
  \label{subsec:numerical_base_regimeI}

Generates (and saves) the two figure assets:
  - Figure \label{fig:homogeneous}: S snapshots at t = 0, 2.5, 5.0
  - Figure \label{fig:norm_evolution}: L^2 deviation curves (base model = blue solid)

Implementation details aligned with the manuscript (final symbol system):
  - Regime I is the base hybrid model *without chemotaxis*.
  - Variables: S, R, I (diffuse); P, A (ODE/immobile). The auxiliary signal C is kept in the
    code infrastructure but is set to open-loop decay and does not affect S,R because chi_S=chi_R=0.
  - Scheme: Crank–Nicolson for diffusion, explicit reaction/taxis (here taxis is off).
  - BCs: Homogeneous Neumann via ghost-point reflection (as implemented in scheme.py).
  - Parameters: consistent with Table 1 via scheme.P_BASE.

Assumes: scheme.py is present in the same directory.
"""

from __future__ import annotations

import os
import numpy as np
import matplotlib.pyplot as plt

from scheme import (
    P_BASE,
    neumann_laplacian_2d,
    cn_factorized,
    initialize_fields,
    step_system,
    homogeneous_equilibrium,
)

# ----------------------------
# 0) I/O
# ----------------------------
# Output directory (created at import time if missing) and the two figure paths
# this script writes.
FIGDIR = "figures"
os.makedirs(FIGDIR, exist_ok=True)

FIG_HOMOGENEOUS = os.path.join(FIGDIR, "fig_homogeneous.png")
FIG_NORM_EVOLUTION = os.path.join(FIGDIR, "fig_norm_evolution.png")


def savefig(path: str, dpi: int = 450) -> None:
    """Tighten the layout of the current figure, write it to `path`, close it, and report the path."""
    current = plt.gcf()
    current.tight_layout()
    current.savefig(path, dpi=dpi)
    plt.close(current)
    print(f"Saved: {path}")


# ----------------------------
# 1) Parameters and regime setup (Regime I)
# ----------------------------
# Work on a copy so the shared P_BASE table is not modified.
p = dict(P_BASE)

# Regime I = no chemotaxis
p["chi_S"] = 0.0
p["chi_R"] = 0.0

# Keep signal infrastructure but make it open-loop so it is decoupled from (S,R)
# (this is consistent with "base model without chemotaxis"; C plays no dynamical role here)
p["c_mode"] = "open_loop"

# ----------------------------
# 2) Domain / grid
# ----------------------------
L = 1.0
Nx = Ny = 51
h = L / (Nx - 1)  # node spacing with both endpoints on the grid

dt = 1e-2
T_final = 5.0
# Times at which S is stored for the snapshot figure; each is mapped to a step
# index via round(t / dt) further below.
snapshot_times = (0.0, 2.5, 5.0)

seed = 42   # RNG seed passed to initialize_fields
eps = 1e-3  # perturbation amplitude passed to initialize_fields

# ----------------------------
# 3) Operators and CN solvers
# ----------------------------
L_h = neumann_laplacian_2d(Nx, Ny, h)

# One (solve_lhs, A_rhs) pack per diffusing variable, keyed as step_system expects.
solvers = {
    "S": cn_factorized(L_h, p["d_S"], dt),
    "R": cn_factorized(L_h, p["d_R"], dt),
    "I": cn_factorized(L_h, p["d_I"], dt),
    "C": cn_factorized(L_h, p["d_c"], dt),  # present for completeness
}

# ----------------------------
# 4) Homogeneous equilibrium and initialization
# ----------------------------
# Reference values for the L2 deviation diagnostics.
S_star, R_star = homogeneous_equilibrium(p)
I_star = 0.0

state = initialize_fields(Nx, Ny, p, eps=eps, seed=seed)

# ----------------------------
# 5) Diagnostics
# ----------------------------
def l2_deviation(field: np.ndarray, target: float, h: float) -> float:
    """Return the discrete L2 norm of (field - target), weighting each node by the cell area h*h."""
    offset = field - target
    total = np.sum(offset * offset)
    return float(np.sqrt(total * h * h))


nsteps = int(round(T_final / dt))
# Map step index -> requested snapshot time, so the loop can test `n in snap_indices`.
snap_indices = {int(round(t / dt)): t for t in snapshot_times}
snapshots_S: dict[float, np.ndarray] = {}

# One diagnostic sample per time level, including t = 0 and t = T_final.
times = np.empty(nsteps + 1, dtype=float)
E_S = np.empty(nsteps + 1, dtype=float)
E_R = np.empty(nsteps + 1, dtype=float)
E_I = np.empty(nsteps + 1, dtype=float)

# ----------------------------
# 6) Time stepping (Regime I: taxis off)
# ----------------------------
for n in range(nsteps + 1):
    t = n * dt
    times[n] = t

    S = state["S"]
    R = state["R"]
    I = state["I"]

    # Record diagnostics for the state at time level n (before stepping).
    E_S[n] = l2_deviation(S, S_star, h)
    E_R[n] = l2_deviation(R, R_star, h)
    E_I[n] = l2_deviation(I, I_star, h)

    if n in snap_indices:
        # Copy so the stored snapshot is independent of later state arrays.
        snapshots_S[snap_indices[n]] = S.copy()

    # The final level is only recorded, not advanced.
    if n == nsteps:
        break

    state = step_system(
        state,
        p,
        h,
        dt,
        Nx,
        Ny,
        L_h,
        solvers,
        taxis=None,  # Regime I: no chemotaxis
    )

# ----------------------------
# 7) Plotting
# ----------------------------

# Figure 1: S snapshots (homogenization), one panel per entry of snapshot_times
fig, axes = plt.subplots(1, 3, figsize=(12, 3.8))
for ax, t_snap in zip(axes, snapshot_times):
    im = ax.imshow(
        snapshots_S[t_snap],
        origin="lower",
        extent=[0, 1, 0, 1],
    )
    ax.set_title(f"$S$ at $t={t_snap:.1f}$")
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    # Each panel gets its own colorbar (color scales are not shared).
    plt.colorbar(im, ax=ax, fraction=0.046, pad=0.03)

plt.suptitle("Base model (Regime I): progressive homogenization (Neumann BC)")
savefig(FIG_HOMOGENEOUS)

# Figure 2: L2 deviation curves of S, R and I on a semi-log axis
plt.figure(figsize=(6.4, 4.4))
plt.semilogy(times, E_S, label="Base: $E_S(t)$", linewidth=2)
plt.semilogy(times, E_R, label="Base: $E_R(t)$", linewidth=2, linestyle="--")
plt.semilogy(times, E_I, label="Base: $E_I(t)$", linewidth=2, linestyle="-.")

plt.xlabel("t")
plt.ylabel(r"$\|W(\cdot,t)-W^*\|_{L^2(U)}$")
plt.title("Base model (Regime I): exponential relaxation (semi-log)")
plt.grid(True, which="both", alpha=0.3)
plt.legend()
savefig(FIG_NORM_EVOLUTION)

Overwriting code_regimeI_base.py


In [24]:
%%writefile code_regimeIprime_oneway.py
r"""
code_regimeIprime_oneway.py

Reproducible code for:
  \subsection{Unidirectional chemotaxis: transient focusing without generation (Regime I')}
  \label{subsec:numerical_base_oneway}

Goal:
  Produce the *red dashed* curve in Figure \label{fig:norm_evolution} (Regime I')
  vs the *blue solid* curve (Regime I Base).

Alignment with final manuscript (symbol-consistent):
  - Variables: S, R, I, c (diffuse); P, A (ODE/immobile).
  - Taxis: S and R follow gradients of c.
  - Regime I' (unidirectional/open-loop): c satisfies diffusion–decay and is independent of (S,R),
    i.e.,  c_t = d_c Δc - ρ_c c  (no production term).
    This is implemented by setting p["c_mode"]="open_loop" (see scheme.py).
  - Geometry: Unit square, Neumann BCs via ghost reflection (implemented in scheme.py).

Assumes: scheme.py and code_regimeI_base.py are consistent with the same Table-1 parameters.
"""

from __future__ import annotations

import os
import numpy as np
import matplotlib.pyplot as plt

from scheme import (
    P_BASE,
    neumann_laplacian_2d,
    cn_factorized,
    initialize_fields,
    step_system,
    homogeneous_equilibrium,
)

# ----------------------------
# 0) I/O
# ----------------------------
# Output directory (created at import time if missing) and the overlay figure path.
FIGDIR = "figures"
os.makedirs(FIGDIR, exist_ok=True)
FIG_NORM_OVERLAY = os.path.join(FIGDIR, "fig_norm_evolution_oneway_overlay.png")


def savefig(path: str, dpi: int = 450) -> None:
    """Save the current figure to `path` with a tight layout, close it, then print the path."""
    active_figure = plt.gcf()
    active_figure.tight_layout()
    active_figure.savefig(path, dpi=dpi)
    plt.close(active_figure)
    print(f"Saved: {path}")


def l2_deviation(field: np.ndarray, target: float, h: float) -> float:
    """Return sqrt(sum((field - target)^2) * h * h): the grid L2 distance from `target`."""
    gap = field - target
    return float(np.sqrt(np.sum(np.square(gap)) * h * h))


# ----------------------------
# 1) Domain / Grid
# ----------------------------
L_DOMAIN = 1.0
Nx = Ny = 51
h = L_DOMAIN / (Nx - 1)  # node spacing with both endpoints on the grid
dt = 1e-2
T_final = 5.0

# Use distinct seeds so the two runs are not identical
seed_base = 42
seed_oneway = 101

eps = 1e-3  # perturbation amplitude passed to initialize_fields


# ----------------------------
# 2) Shared discretization
# ----------------------------
# Built once at module level and reused by both run functions below.
L2 = neumann_laplacian_2d(Nx, Ny, h)


def run_regime_I_base(T: float = 5.0) -> tuple[np.ndarray, np.ndarray]:
    """
    Regime I: base model without chemotaxis (chi_S=chi_R=0).
    Signal c is present but open-loop and dynamically irrelevant to S,R here.

    Integrates from t = 0 to T on the module-level grid (Nx, Ny, h, dt, L2)
    starting from initialize_fields(..., seed=seed_base).

    Returns
    -------
    (times, dev_S): two 1D arrays of length n_steps + 1 holding the sample times
    and the discrete L2 deviation of S from its homogeneous equilibrium,
    sampled before each step and once more at the final time.
    """
    p = dict(P_BASE)
    p["chi_S"] = 0.0
    p["chi_R"] = 0.0
    p["c_mode"] = "open_loop"  # keep consistency, though taxis is off

    solvers = {
        "S": cn_factorized(L2, p["d_S"], dt),
        "R": cn_factorized(L2, p["d_R"], dt),
        "I": cn_factorized(L2, p["d_I"], dt),
        "C": cn_factorized(L2, p["d_c"], dt),
    }

    state = initialize_fields(Nx, Ny, p, eps=eps, seed=seed_base)
    S_star, _ = homogeneous_equilibrium(p)

    # Round instead of truncating: T / dt can land just below an integer in
    # floating point (e.g. 0.29 / 0.01), and a bare int() would then drop the
    # final step.
    n_steps = int(round(T / dt))
    times = np.empty(n_steps + 1, dtype=float)
    dev_S = np.empty(n_steps + 1, dtype=float)

    for n in range(n_steps + 1):
        times[n] = n * dt
        dev_S[n] = l2_deviation(state["S"], S_star, h)
        # The last level is only recorded, not advanced.
        if n < n_steps:
            state = step_system(state, p, h, dt, Nx, Ny, L2, solvers, taxis=None)

    return times, dev_S


def run_regime_I_prime_oneway(T: float = 5.0) -> tuple[np.ndarray, np.ndarray]:
    """
    Regime I': unidirectional chemotaxis with open-loop signal relaxation.
      S_t = d_S ΔS - chi_S ∇·(S ∇c) + (reactions)
      R_t = d_R ΔR - chi_R ∇·(R ∇c) + (reactions)
      c_t = d_c Δc - ρ_c c   (open-loop; no production by S,R)

    Uses the module-level grid/time settings (Nx, Ny, h, dt, eps, seed_oneway)
    and the shared Laplacian L2.

    Parameters
    ----------
    T : float
        Final simulation time. The run takes ``int(T / dt)`` steps, computed
        with a small tolerance so that round-off in ``T / dt`` cannot drop the
        last step.

    Returns
    -------
    times : np.ndarray
        Sample times ``n * dt`` for ``n = 0, ..., n_steps``.
    dev_S : np.ndarray
        ``l2_deviation`` of S from the homogeneous equilibrium value S* at
        each sample time; sample 0 is the initial state.
    """
    # Work on a copy so the overrides never leak into P_BASE.
    p = dict(P_BASE)

    # Enable chemotaxis
    p["chi_S"] = 0.5
    p["chi_R"] = 0.5

    # Open-loop signal: c evolves independently of (S,R)
    p["c_mode"] = "open_loop"

    # One pre-factorized CN solver per diffusing field.
    solvers = {
        "S": cn_factorized(L2, p["d_S"], dt),
        "R": cn_factorized(L2, p["d_R"], dt),
        "I": cn_factorized(L2, p["d_I"], dt),
        "C": cn_factorized(L2, p["d_c"], dt),
    }

    state = initialize_fields(Nx, Ny, p, eps=eps, seed=seed_oneway)
    S_star, _ = homogeneous_equilibrium(p)

    # Both populations follow the same signal field C with linear taxis.
    taxis_cfg = {
        "S": {"signal": "C", "chi": p["chi_S"], "mode": "linear"},
        "R": {"signal": "C", "chi": p["chi_R"], "mode": "linear"},
    }

    # int() truncates, and T / dt can land just below an integer because of
    # round-off (e.g. 0.3 / 0.1 == 2.9999999999999996). The small offset keeps
    # the final step in that case and leaves ratios that are not within 1e-9
    # of an integer unchanged.
    n_steps = int(T / dt + 1e-9)
    times = np.empty(n_steps + 1, dtype=float)
    dev_S = np.empty(n_steps + 1, dtype=float)

    for n in range(n_steps + 1):
        # Record first, then advance: sample n describes the state at t = n * dt.
        times[n] = n * dt
        dev_S[n] = l2_deviation(state["S"], S_star, h)
        if n < n_steps:
            state = step_system(state, p, h, dt, Nx, Ny, L2, solvers, taxis=taxis_cfg)

    return times, dev_S


# ----------------------------
# 3) Execution & Plotting
# ----------------------------
# Both runs execute at module level, so the simulations start as soon as the
# script is run (or imported).
print("Running Base Model (Regime I)...")
t_base, E_base = run_regime_I_base(T_final)

print("Running Unidirectional Model (Regime I')...")
t_oneway, E_oneway = run_regime_I_prime_oneway(T_final)

# Overlay both deviation curves E_S(t) on a logarithmic y-axis.
plt.figure(figsize=(6.4, 4.4))
plt.semilogy(t_base, E_base, "b-", linewidth=2, label="Base model (Regime I)")
plt.semilogy(
    t_oneway,
    E_oneway,
    "r--",
    linewidth=2,
    label="Unidirectional chemotaxis (Regime I$'$)\n(open-loop $c$ relaxation)",
)
plt.xlabel("Time $t$")
plt.ylabel(r"$E_S(t) = \|S(\cdot,t)-S^*\|_{L^2(U)}$")
plt.title("Transient focusing vs. relaxation to homogeneity")
plt.grid(True, which="both", alpha=0.3)  # grid on major and minor (log) ticks
plt.legend()

# savefig applies tight_layout, writes the file, closes the figure and prints
# "Saved: <path>"; the print below repeats the path in a second message.
savefig(FIG_NORM_OVERLAY)
print(f"Comparison plot saved to {FIG_NORM_OVERLAY}")

Writing code_regimeIprime_oneway.py


In [26]:
%%writefile code_regimeII_III_bifeedback.py
r"""
code_regimeII_III_bifeedback.py

Reproducible code for:
  \subsection{Bidirectional feedback: finite-band patterns vs aggregation (Regimes II--III)}
  \label{subsec:numerical_bifeedback}

Aligned with final manuscript (symbol-consistent):
  - Regime II: flux-saturated taxis (velocity-limited) yields bounded finite-band patterns.
  - Regime III: linear taxis (unregularized) yields aggregation/ill-posed numerical breakdown.
  - Feedback loop: S produces c, and S migrates up gradients of c (S -> c -> ∇c -> drift of S).
  - Isolation: stromal switching variables (P,A) are frozen (no conversion), with A≡0, to remove hybrid effects.

Assumes: scheme.py is present (CN diffusion + explicit reaction/taxis; Neumann ghost reflection).
"""

from __future__ import annotations

import os
import numpy as np
import matplotlib.pyplot as plt

from scheme import (
    P_BASE,
    neumann_laplacian_2d,
    cn_factorized,
    initialize_fields,
    step_system,
    homogeneous_equilibrium,
)

# ----------------------------
# 0) I/O
# ----------------------------
# All figures are written below FIGDIR (relative to the current working directory).
FIGDIR = "figures"
# Create the directory up front; exist_ok=True makes re-running the script harmless.
os.makedirs(FIGDIR, exist_ok=True)

# Output paths: Regime II pattern panel, Regime III amplitude trace, and the
# Regime III snapshot (the latter is only written when the run does not break down).
FIG_REGIMEII = os.path.join(FIGDIR, "fig_regimeII_patterns_S_c.png")
FIG_REGIMEIII_TRACE = os.path.join(FIGDIR, "fig_regimeIII_breakdown_trace.png")
FIG_REGIMEIII_SNAP = os.path.join(FIGDIR, "fig_regimeIII_snapshot_S.png")


def savefig(path: str, dpi: int = 450) -> None:
    """Finalize the current pyplot figure and write it to disk.

    Applies a tight layout to the current figure, saves it to *path* at the
    requested *dpi*, closes the figure, and prints a confirmation line.
    """
    fig = plt.gcf()  # the figure the preceding pyplot calls have been drawing on
    fig.tight_layout()
    fig.savefig(path, dpi=dpi)
    plt.close(fig)
    print(f"Saved: {path}")


# ----------------------------
# 1) Parameters (override from scheme.py defaults)
# ----------------------------
# IMPORTANT: scheme.P_BASE has c_mode="open_loop" by default; here we need bidirectional feedback.
# dict(P_BASE) is a copy, so the overrides below do not modify scheme.P_BASE.
p = dict(P_BASE)

# Domain / time
L_DOMAIN = 1.0  # side length of the square domain
Nx = Ny = 51  # number of grid nodes per direction
h = L_DOMAIN / (Nx - 1)  # node spacing: Nx nodes span Nx - 1 intervals
dt = 1e-2  # time step
T_final = 50.0  # final time used for both regimes
eps = 1e-3  # forwarded as `eps` to initialize_fields (presumably the perturbation amplitude -- TODO confirm)
seed = 2024  # forwarded as `seed` to initialize_fields for both regimes

# Bidirectional chemoattractant feedback: c_t = d_c Δc + kappa_c S - rho_c c
p["c_mode"] = "closed_loop"
p["d_c"] = p.get("d_c", 5e-3)  # diffusion for c; keeps the existing value, 5e-3 is only a fallback if the key is missing

# Feedback parameters (kappa_c, rho_c already in P_BASE; keep unless overridden)
# p["kappa_c"] = 0.8
# p["rho_c"] = 1.6

# Chemotaxis sensitivity of S to c (this is chi_S' in the manuscript)
# Kept as a separate name: it is passed through taxis_cfg, not through p["chi_S"].
chi_S_prime = 0.5

# Flux saturation parameter (this is alpha_c in the manuscript)
# Only used when the taxis mode is "flux_sat" (Regime II).
alpha_c = 100.0

# Freeze stromal dynamics (isolation experiment): enforce A ≡ 0 by setting conversion rates to 0
p["theta"] = 0.0
p["beta"] = 0.0

# No chemotaxis for R in this experiment (matches the Regime II/III numerical description).
# To enable R taxis, add an "R" entry to taxis_cfg inside run_bifeedback.
# p["chi_R"] is not read by this script; taxis is configured solely through taxis_cfg.


# ----------------------------
# 2) Discretization operator & solvers
# ----------------------------
# Laplacian operator built once; the name refers to the 2-D Laplacian, not an L2 norm.
L2 = neumann_laplacian_2d(Nx, Ny, h)

# One CN solver per diffusing field, keyed by the field name used in `state`.
# They are built once here (after the parameter overrides above) and shared by
# both the Regime II and Regime III runs.
solvers = {
    "S": cn_factorized(L2, p["d_S"], dt),
    "R": cn_factorized(L2, p["d_R"], dt),
    "I": cn_factorized(L2, p["d_I"], dt),
    "C": cn_factorized(L2, p["d_c"], dt),
}


# ----------------------------
# 3) Simulation core (Regime II vs III differs only by taxis mode)
# ----------------------------
def run_bifeedback(
    *,
    saturated: bool,
    T: float,
    seed: int,
    blowup_max: float = 1e24,
) -> dict:
    """
    Bidirectional feedback run:
      S_t = d_S ΔS - ∇·(chi_S' S ∇c) + reactions
      c_t = d_c Δc + kappa_c S - rho_c c
    Regime II: flux-saturated taxis (mode="flux_sat")
    Regime III: linear taxis (mode="linear")

    Uses the module-level parameters ``p``, grid/time settings
    (Nx, Ny, h, dt, eps), the Laplacian ``L2`` and the shared ``solvers``.

    Parameters
    ----------
    saturated : bool
        True selects taxis mode "flux_sat" with ``alpha_sat = alpha_c``;
        False selects mode "linear" with ``alpha_sat = 0.0``.
    T : float
        Final simulation time. The run takes ``int(T / dt)`` steps, computed
        with a small tolerance so that round-off in ``T / dt`` cannot drop the
        last step.
    seed : int
        Seed forwarded to ``initialize_fields``.
    blowup_max : float
        The run is declared broken down once ``max(S)`` exceeds this value
        or S contains a non-finite entry.

    Returns
    -------
    dict
        ``"state"``: the last state reached; ``"status"``: ``"ok"`` or
        ``"breakdown"``; ``"blow_step"``: step index at which breakdown was
        detected, else ``None``; ``"t"``, ``"maxS"``, ``"maxc"``: arrays
        sampled every 10 steps with time, ``max(S)`` and ``max(C)``.
    """
    # Initialize near homogeneous equilibrium of (S,R) at I=0
    state = initialize_fields(Nx, Ny, p, eps=eps, seed=seed)

    # Enforce isolation: A ≡ 0, P ≡ 1 (and remains so because theta=beta=0)
    state["P"][:] = 1.0
    state["A"][:] = 0.0

    # Also keep I ≡ 0 for this experiment (drug washed out)
    state["I"][:] = 0.0

    # Only S is given a taxis term; R gets none in this experiment.
    taxis_cfg = {
        "S": {
            "signal": "C",
            "chi": float(chi_S_prime),
            "mode": "flux_sat" if saturated else "linear",
            "alpha_sat": float(alpha_c) if saturated else 0.0,
        }
    }

    # int() truncates, and T / dt can land just below an integer because of
    # round-off (e.g. 0.3 / 0.1 == 2.9999999999999996). The small offset keeps
    # the final step in that case and leaves ratios that are not within 1e-9
    # of an integer unchanged.
    n_steps = int(T / dt + 1e-9)
    t_hist: list[float] = []
    maxS_hist: list[float] = []
    maxc_hist: list[float] = []

    status = "ok"
    blow_step = None

    for n in range(n_steps + 1):
        t = n * dt

        # record every 10 steps (keeps files small, smooth curves)
        # Recording precedes the breakdown check, so a sample taken on the
        # breakdown step itself may be non-finite or exceed blowup_max.
        if n % 10 == 0:
            t_hist.append(t)
            maxS_hist.append(float(np.max(state["S"])))
            maxc_hist.append(float(np.max(state["C"])))

        # breakdown check (current state)
        # The amplitude test looks at the largest signed value of S only;
        # large negative excursions are caught once they become non-finite.
        if (not np.isfinite(state["S"]).all()) or (np.max(state["S"]) > blowup_max):
            status = "breakdown"
            blow_step = n
            break

        # The final state has been recorded/checked; no step beyond T.
        if n == n_steps:
            break

        # one IMEX step
        state = step_system(state, p, h, dt, Nx, Ny, L2, solvers, taxis=taxis_cfg)

        # re-enforce isolation constraints (defensive; with theta=beta=0 this is no-op up to roundoff)
        state["P"][:] = 1.0
        state["A"][:] = 0.0
        state["I"][:] = 0.0

    return {
        "state": state,
        "status": status,
        "blow_step": blow_step,
        "t": np.array(t_hist),
        "maxS": np.array(maxS_hist),
        "maxc": np.array(maxc_hist),
    }


# ----------------------------
# 4) Regime II: flux-saturated patterns
# ----------------------------
print("Running Regime II (flux-saturated taxis)...")
res2 = run_bifeedback(saturated=True, T=T_final, seed=seed)
# NOTE(review): res2["status"] is not inspected here; the figure (and its
# "bounded patterns" title) is produced regardless of how the run ended.

# Final fields of the run.
S2 = res2["state"]["S"]
c2 = res2["state"]["C"]

# Choose a representative cross-section: through the global maximum of S
max_idx = np.unravel_index(np.argmax(S2), S2.shape)
peak_y = int(max_idx[0])  # first-axis index of the maximum; assumes axis 0 is y -- TODO confirm

plt.figure(figsize=(12, 4))

# (1) S heatmap
plt.subplot(1, 3, 1)
plt.imshow(S2, origin="lower", extent=[0, 1, 0, 1])
plt.title("S (Regime II: bounded patterns)")
# Dotted line marks the row used for the cross-section in panel (3).
plt.axhline(peak_y * h, color="r", linestyle=":", alpha=0.6)

# (2) c heatmap
plt.subplot(1, 3, 2)
plt.imshow(c2, origin="lower", extent=[0, 1, 0, 1])
plt.title("c (feedback signal)")
plt.axhline(peak_y * h, color="r", linestyle=":", alpha=0.6)

# (3) cross-section
plt.subplot(1, 3, 3)
xs = np.linspace(0.0, 1.0, S2.shape[1])  # one abscissa per column of S2
S_slice = S2[peak_y, :]
c_slice = c2[peak_y, :]

plt.plot(xs, S_slice, label="S")
# Rescale c so that its global maximum matches that of S; the 1e-12 guards
# against division by zero when max(c2) is 0.
scale = (np.max(S2) / (np.max(c2) + 1e-12))
plt.plot(xs, c_slice * scale, "--", label="c (scaled)")
plt.title(f"Cross-section at y ≈ {peak_y*h:.2f}")
plt.legend()

savefig(FIG_REGIMEII)


# ----------------------------
# 5) Regime III: linear taxis (no saturation) -> breakdown trace
# ----------------------------
print("Running Regime III (linear taxis; no saturation)...")
# Same T_final and seed as the Regime II run; only the taxis mode differs.
res3 = run_bifeedback(saturated=False, T=T_final, seed=seed)

# Amplitude trace: max S and max c over time on a logarithmic y-axis.
plt.figure(figsize=(6.2, 4.2))
plt.plot(res3["t"], res3["maxS"], label="max S")
plt.plot(res3["t"], res3["maxc"], label="max c")
plt.yscale("log")
title = "Regime III: linear taxis"
# Report the step index at which run_bifeedback flagged the breakdown.
if res3["status"] == "breakdown":
    title += f" (breakdown at step {res3['blow_step']})"
plt.title(title)
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.grid(True, which="both", alpha=0.3)
plt.legend()

savefig(FIG_REGIMEIII_TRACE)

# Optional: snapshot if it did not break down
# (after a breakdown no snapshot file is written at all).
if res3["status"] != "breakdown":
    S3 = res3["state"]["S"]
    plt.figure(figsize=(5.2, 4.2))
    plt.imshow(S3, origin="lower", extent=[0, 1, 0, 1])
    plt.title("S snapshot (Regime III)")
    savefig(FIG_REGIMEIII_SNAP)

Overwriting code_regimeII_III_bifeedback.py
