In [None]:
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import spsolve
import matplotlib.pyplot as plt


# --------------------------------------------------
# Utility functions
# --------------------------------------------------
def boundary_flux(rho, V, Pe, dx):
    """Evaluate the total flux ``V*rho - (1/Pe) * d(rho)/dx`` at both ends.

    The gradient at each end is a centred difference that uses a ghost
    value one grid step outside the domain.  Each ghost value is defined
    so that this centred gradient equals ``Pe * V * rho`` at the boundary
    node, which means the returned fluxes are zero up to round-off.

    Parameters
    ----------
    rho, V : indexable sequences
        Nodal values; only the first two and last two entries of ``rho``
        and the first and last entries of ``V`` are read.
    Pe : float
        Coefficient whose inverse scales the gradient term.
    dx : float
        Grid spacing.

    Returns
    -------
    tuple
        ``(F_left, F_right)``.
    """
    span = 2.0 * dx  # width of the centred-difference stencil

    # Ghost values just outside the left and right ends of the grid.
    ghost_lo = rho[1] - span * Pe * V[0] * rho[0]
    ghost_hi = rho[-2] + span * Pe * V[-1] * rho[-1]

    # Centred gradients at the two boundary nodes.
    grad_lo = (rho[1] - ghost_lo) / span
    grad_hi = (ghost_hi - rho[-2]) / span

    inv_pe = 1.0 / Pe
    return (
        V[0] * rho[0] - inv_pe * grad_lo,
        V[-1] * rho[-1] - inv_pe * grad_hi,
    )


def fvm_update_rho(rho, V, Pe, dx, dt):
    """Advance ``rho`` by one explicit finite-volume step.

    Each interior face between cells ``i-1`` and ``i`` carries the flux
    ``rho_upwind * V_face - (1/Pe) * (rho[i] - rho[i-1]) / dx``, where
    ``V_face`` is the average of the two neighbouring ``V`` values and
    ``rho_upwind`` is the left cell when ``V_face >= 0`` and the right
    cell otherwise.  The two outermost faces carry zero flux, so the sum
    of ``rho`` is unchanged by the update (up to round-off).

    Parameters
    ----------
    rho : array_like, shape (N,)
        Cell values at the current time level.
    V : array_like, shape (N,)
        Velocity at the same points as ``rho``.
    Pe : float
        Coefficient whose inverse scales the diffusive flux.
    dx : float
        Grid spacing.
    dt : float
        Time step.

    Returns
    -------
    numpy.ndarray, shape (N,)
        Cell values after one step; the input is not modified.
    """
    rho = np.asarray(rho)
    V = np.asarray(V)
    n_cells = len(rho)

    # Values on either side of every interior face.
    rho_left, rho_right = rho[:-1], rho[1:]
    v_face = 0.5 * (V[:-1] + V[1:])

    # First-order upwinding: take rho from the side the flow comes from.
    adv_flux = np.where(v_face >= 0, rho_left, rho_right) * v_face
    diff_flux = (rho_right - rho_left) / dx

    # flux[0] and flux[n_cells] stay zero: impermeable boundaries.
    flux = np.zeros(n_cells + 1)
    flux[1:n_cells] = adv_flux - (1.0 / Pe) * diff_flux

    return rho - (dt / dx) * (flux[1:] - flux[:-1])


# --------------------------------------------------
# Main solver
# --------------------------------------------------
def run_simulation(
    N: int = 100,
    t_final: float = 10.0,
    k: float = 0.0225,
    w: float = 0.5,
    gamma: float = 0.5,
):
    """Integrate the coupled 1-D system for ``rho``, ``a`` and ``V`` on [-1, 1].

    Every time step does three things, all using values from the previous
    time level:

    1. Solve a tridiagonal linear system for ``V``.  Interior rows read
       ``(V[i-1] - 2 V[i] + V[i+1]) / dx**2 - gamma * a[i] * V[i]
       = -(rho[i+1] - rho[i-1]) / (2 dx)``.  The first and last rows use a
       doubled off-diagonal and a one-sided gradient of ``rho`` plus a
       ``2 * rho / dx`` term (presumably a ghost-node boundary closure;
       confirm against the model derivation).
    2. Advance ``a`` at interior nodes with first-order upwind advection by
       the new ``V`` plus the source ``w * (1 - a)``; both end values are
       set to zero.
    3. Advance ``rho`` with ``fvm_update_rho`` using the new ``V``.

    Parameters
    ----------
    N : int
        Number of grid points (at least 3).
    t_final : float
        End time; the number of steps is ``round(t_final / dt)``.
    k : float
        Coefficient entering both ``Pe`` and the advective time-step bound.
    w : float
        Rate of the ``w * (1 - a)`` source term.
    gamma : float
        Coefficient of the ``gamma * a * V`` term in the ``V`` system.

    Returns
    -------
    tuple of numpy.ndarray
        ``(x, rho, a, V)`` at the final time, each of length ``N``.

    Raises
    ------
    ValueError
        If ``N < 3`` or if the computed time step is below ``1e-6``.
    """
    if N < 3:
        # The one-sided boundary stencils below read three nodes.
        raise ValueError("N must be at least 3.")

    # np.trapz was renamed np.trapezoid in NumPy 2.0; use whichever exists.
    trapezoid = getattr(np, "trapezoid", None) or np.trapz

    # grid
    x = np.linspace(-1.0, 1.0, N)
    dx = x[1] - x[0]

    # physical constants
    rho_0, alpha, mu_0, S, D, L = 1_000.0, 0.2, 1.0, 10.0, 0.9, 1.0
    Pe = (k * rho_0 * L**2 * alpha) / (mu_0 * S * D)

    # time step: the smaller of the advective and diffusive bounds, with a
    # safety factor of 5
    dt_conv = dx / ((k * rho_0 * L * alpha) / (mu_0 * S))
    dt_diff = dx**2 / (2 * D)
    dt = min(dt_conv, dt_diff) / 5.0
    if dt < 1e-6:
        raise ValueError("Time step too small.")

    # int() guards against round() returning a NumPy scalar.
    n_steps = int(round(t_final / dt))

    # initial conditions: unit-mass Gaussian for rho, a = 1, V = 0
    sigma = 0.15
    rho = np.exp(-(x**2) / (2 * sigma**2))
    rho /= trapezoid(rho, x)
    a = np.ones(N)
    V = np.zeros(N)

    # The off-diagonals of the V system never change, so assemble them once;
    # only the diagonal (which depends on a) is refreshed every step.
    M = np.zeros((N, N))
    interior = np.arange(1, N - 1)
    M[interior, interior - 1] = 1.0 / dx**2
    M[interior, interior + 1] = 1.0 / dx**2
    M[0, 1] = 2.0 / dx**2
    M[-1, -2] = 2.0 / dx**2
    diag = np.arange(N)

    # time integration
    for _ in range(n_steps):
        # --- linear system for V ---
        M[diag, diag] = -2.0 / dx**2 - gamma * a

        C = np.zeros(N)
        C[1:-1] = -(rho[2:] - rho[:-2]) / (2.0 * dx)

        # second-order one-sided gradients of rho at the two ends
        d_rho_dx_left = (-3.0 * rho[0] + 4.0 * rho[1] - rho[2]) / (2.0 * dx)
        C[0] = -d_rho_dx_left - 2.0 * rho[0] / dx
        d_rho_dx_right = (3.0 * rho[-1] - 4.0 * rho[-2] + rho[-3]) / (2.0 * dx)
        C[-1] = -d_rho_dx_right + 2.0 * rho[-1] / dx

        V_new = spsolve(csr_matrix(M), C)

        # --- update a (upwind convection + reaction) ---
        v_in = V_new[1:-1]
        a_in = a[1:-1]
        adv_a = np.where(
            v_in > 0,
            v_in * (a_in - a[:-2]) / dx,   # backward difference
            v_in * (a[2:] - a_in) / dx,    # forward difference
        )
        a_new = np.zeros(N)                # end values stay at zero
        a_new[1:-1] = a_in - dt * adv_a + dt * w * (1.0 - a_in)

        # --- update rho ---
        rho = fvm_update_rho(rho, V_new, Pe, dx, dt)

        a, V = a_new, V_new

    return x, rho, a, V


# --------------------------------------------------
# Convergence study
# --------------------------------------------------
def convergence_analysis(N_list, t_final, k, w, gamma):
    """Measure grid-refinement errors against the finest grid in ``N_list``.

    The solution on ``max(N_list)`` points serves as the reference.  Each
    other solution is linearly interpolated onto the reference grid and
    compared in the trapezoid-rule L2 norm.  The reference grid compared
    with itself therefore reports an error of exactly zero.

    Parameters
    ----------
    N_list : iterable of int
        Grid sizes to run; must not be empty.
    t_final, k, w, gamma : float
        Forwarded unchanged to ``run_simulation``.

    Returns
    -------
    pandas.DataFrame
        One row per entry of ``N_list``, sorted by ``N``, with columns
        ``N``, ``L2_rho``, ``L2_a``, ``L2_V`` and ``mass`` (the trapezoid
        integral of ``rho`` on its own grid).

    Raises
    ------
    ValueError
        If ``N_list`` is empty.
    """
    N_list = list(N_list)
    if not N_list:
        raise ValueError("N_list must contain at least one grid size.")

    # np.trapz was renamed np.trapezoid in NumPy 2.0; use whichever exists.
    trapezoid = getattr(np, "trapezoid", None) or np.trapz

    # Each simulation is expensive and deterministic, so run every distinct
    # grid size once; this also avoids re-running the reference grid.
    N_ref = max(N_list)
    solutions = {N_ref: run_simulation(N_ref, t_final, k, w, gamma)}
    x_ref, rho_ref, a_ref, V_ref = solutions[N_ref]

    def l2_error(reference, values, grid):
        """L2 distance between ``reference`` and ``values`` interpolated to ``x_ref``."""
        on_ref = np.interp(x_ref, grid, values)
        return np.sqrt(trapezoid((reference - on_ref) ** 2, x_ref))

    rows = []
    for N in N_list:
        if N not in solutions:
            solutions[N] = run_simulation(N, t_final, k, w, gamma)
        x, rho, a, V = solutions[N]

        rows.append(
            dict(
                N=N,
                L2_rho=l2_error(rho_ref, rho, x),
                L2_a=l2_error(a_ref, a, x),
                L2_V=l2_error(V_ref, V, x),
                mass=trapezoid(rho, x),
            )
        )

    return pd.DataFrame(rows).sort_values("N")


# --------------------------------------------------
# Parameter sets and execution
# --------------------------------------------------
# Cases to study: a display label plus the solver inputs k, w and gamma.
parameter_sets = [
    dict(name="Pe=0.5, w=0.5, γ=0.5", k=0.0225, w=0.5, gamma=0.5),
    dict(name="Pe=5.0, w=0.5, γ=0.5", k=0.2250, w=0.5, gamma=0.5),
    dict(name="Pe=0.5, w=0.2, γ=0.5", k=0.0225, w=0.2, gamma=0.5),
]

# Grid sizes for the refinement study and the common end time.
N_list  = list(range(20, 101, 20))
t_final = 10.0

# One error table per case, keyed by the case label.
results_dict = {}
for p in parameter_sets:
    print(f"\n=== Convergence study: {p['name']} ===")
    df = convergence_analysis(
        N_list, t_final, k=p["k"], w=p["w"], gamma=p["gamma"]
    )
    display(df)
    results_dict[p["name"]] = df


In [None]:
import matplotlib.pyplot as plt

# Log-log plot of the L2 errors of rho, a and V versus grid size, one curve
# per (parameter set, variable) pair, saved to a PNG and shown inline.
plt.figure(figsize=(8, 6))

# One line style per variable so the curves of a case can be told apart.
linestyles = {"rho": "-", "a": "--", "V": ":"}

for case_name, df in results_dict.items():
    for var in ["rho", "a", "V"]:
        col   = f"L2_{var}"
        label = f"{case_name} – {var}"
        # A log axis cannot represent an error of exactly zero (for example
        # the reference grid compared with itself), so plot only the rows
        # whose error is strictly positive.
        shown = df[df[col] > 0]
        plt.loglog(shown["N"], shown[col],
                   marker="o",
                   linestyle=linestyles[var],
                   label=label)

plt.xlabel("N (number of grid points)")
plt.ylabel("L2 error")
plt.title("Grid‑refinement convergence for ρ, a and V\n(multiple parameter sets)")
plt.grid(True, which="both", ls="--")
plt.legend(fontsize=8, frameon=False)
plt.tight_layout()
plt.savefig("L2_error_loglog_allVars_allCases.png", dpi=300)
plt.show()