# **Stirred (not shaken)**

What can we learn about a system if we inject a tracer and measure the outlet concentration as a function of time?

**MOD510: Project 3**

Date: **November 4th 2025**

Names: **Malvin Varpe & Torgrim Odde**

**Learning obectives:** By completing this project, the student will:
* Implement an ODE solver using Euler,  Runge-Kutta (second fourth order) algorithm
* Use mixing tank models to model how the shape of a medical tracer changes after traveling in an aorta arch
* Compare the ODE model to CDF simulation results and extract information about the cardiac output (flooding rate) and volume of the aortic arch
* Investigate if one can use the tracer signal to identify if there is an aneurysm in the aorta

**Abstract:**

**Introduction:**


# Exercise 1 - Write your own ODE solver
**Part 1**
Implement an ODE solver, that solves an arbitrary (initial value) system of ODEs of the form

$$\frac{d\vec{y}}{dt} = \vec{f}(\vec{y}, t) (4)$$

where the solution $\vec{y}$ may be a vector. Since the equation system is completely generic, the solver has to take in as argument the function that computes the right hand side-vector; that is, you are not allowed to hard-code in a specific model.
In addition, the solver needs to know the starting time, $t_0$, the corresponding initial condition(s) $\vec{f}(\vec{y}, t)$, as well as the final simulation time, $t_f$.
The user should be able to choose between the following methods:
1. Eulers method
2. Runge-Kutta second order (RK2)
3. Runge-Kutta fourth order (RK4)

Tip: It can be a good idea to encapsulate your solver inside a custom class, and to implement one class function for each method.

In [17]:
import numpy as np

# Implementing solvers

class ODESolver:
    def __init__(self, f):
        self.f = f  # f(y, t) -> dy/dt as np.array

    def _step_euler(self, y, t, h): # Simple Euler solution
        return y + h * self.f(y, t)

    def _step_rk2(self, y, t, h): # R2K solution
        k1 = self.f(y, t)
        k2 = self.f(y + h*k1, t + h)
        return y + 0.5*h*(k1 + k2)

    def _step_rk4(self, y, t, h): # R4K solution
        k1 = self.f(y, t)
        k2 = self.f(y + 0.5*h*k1, t + 0.5*h)
        k3 = self.f(y + 0.5*h*k2, t + 0.5*h)
        k4 = self.f(y + h*k3, t + h)
        return y + (h/6.0)*(k1 + 2*k2 + 2*k3 + k4)

    def solve(self, y0, tspan, h, method="rk4"):
        steppers = {
            "euler": self._step_euler, 
            "rk2": self._step_rk2, 
            "rk4": self._step_rk4
        }
        
        step = steppers[method.lower()]
        t0, tf = tspan
        N = int(np.ceil((tf - t0)/h)) + 1
        t = np.linspace(t0, tf, N)
        y = np.empty((N, len(np.atleast_1d(y0))), dtype=float)
        y[0] = y0
        for i in range(N-1):
            h_i = t[i+1]-t[i]
            y[i+1] = step(y[i], t[i], h_i)
        return t, y
    
    


In [18]:
# We can build the RHS for any n by integrating C over C(0) = 0 for chosen Euler/R2K/R4K solution. Implementing a function for the injection C, so that it can change over time.

def rhs_tanks(n, tau, c_in = lambda t: 0.0):  # For default, c_in(t) = 0
    k = n / tau
    def f(C, t):
        dC = np.zeros_like(C)
        dC[0] = k * (c_in(t) - C[0])               # dC0/dt = (n/tau)*(c_in(t) - C0)
        for i in range(1, n):
            dC[i] = k * (C[i - 1] - C[i])  # dCi/dt = (n/tau)*(C_{i-1} - Ci)
        return dC
    return f


**Part 2:**

Test your implementation against the analytical solutions in equation (3) for n = 1, 2 and 3.
* for n = 1 use $\tau$ = 2, C0(0) = 1
* for n = 2 use $\tau$ = 2, C0(0) = 1, C1(0) = 0
* for n = 3 use $\tau$ = 2, C0(0) = 1, C1(0) = C2(0) = 0

In [19]:
import matplotlib.pyplot as plt
import pandas as pd

def analytical_last_tank(n, t, tau):
    """
    Special ICs: C0(0)=1, others 0. (matches the problem statement)
      n=1: C0(t) = exp(-t/tau)
      n=2: C1(t) = (2t/tau) * exp(-2t/tau)
      n=3: C2(t) = (9 t^2 / (2 tau^2)) * exp(-3t/tau)
    """
    if n == 1:
        return np.exp(-t / tau)
    elif n == 2:
        return (2.0 * t / tau) * np.exp(-2.0 * t / tau)
    elif n == 3:
        return (9.0 * (t ** 2) / (2.0 * (tau ** 2))) * np.exp(-3.0 * t / tau)
    else:
        raise ValueError("Analytical expression implemented only for n in {1, 2, 3}.")

# Validation & plotting

tau = 2.0
t0, tf = 0.0, 10.0         # simulate long enough to see decay
h = 0.5                  # step size (can be adjusted to test convergence)
                            #using large h to clearly show solver differences
methods = ["euler", "rk2", "rk4"]

records = []               # for error table

for n in [1, 2, 3]:
    # Initial conditions:
    # n=1: C0(0)=1 -> Exponential decau
    # n=2: C0(0)=1, C1(0)=0 -> peak then decay
    # n=3: C0(0)=1, C1(0)=C2(0)=0 -> Lower peak, broader decay
    C_init = np.zeros(n)
    C_init[0] = 1.0

    f = rhs_tanks(n, tau)
    solver = ODESolver(f)

    for method in methods:
        t, C = solver.solve(C_init, (t0, tf), h, method=method)
        num_last = C[:, -1]
        exact_last = analytical_last_tank(n, t, tau)

        # Plot numerical vs analytical for this case
        plt.figure()
        plt.plot(t, exact_last, label="Analytical")
        plt.plot(t, num_last, linestyle="--", label=f"Numerical ({method.upper()})")
        plt.title(f"n={n}, {method.upper()} vs Analytical (last tank)")
        plt.xlabel("Time t")
        plt.ylabel(f"C_{n-1}(t)")
        plt.legend()
        plt.show()

        # Errors
        max_abs_err = float(np.max(np.abs(num_last - exact_last)))
        l2_err = float(np.sqrt(np.trapezoid((num_last - exact_last) ** 2, t)))
        records.append({
            "n": n,
            "method": method.upper(),
            "step_size_h": h,
            "max_abs_error": max_abs_err,
            "L2_error": l2_err
        })

# Printing error table
df_errors = pd.DataFrame.from_records(records).sort_values(["n", "method"])
print(df_errors.to_string(index=False))

ImportError: cannot import name 'using_string_dtype' from 'pandas._config' (C:\Users\ofsmvarp\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\LocalCache\local-packages\Python313\site-packages\pandas\_config\__init__.py)

From the plots we can see that with a low step size **h**, the three solvers perform differently for all values of **n**.
The Euler, which is first order, has the largest error of the three and we can see a clear difference between the numerical and analytical solution.
The R2K, being second order, performs better, but still with slight errors.
The R4K, being 4th order, outperforms the two other solvers by far and the numerical solution is almost identical to the analytical solution.

This is also represented in the error table, showing how the R4K outperforms the other solvers by a relative large margin, while the Euler has the largest error.

We can reduce the timestep **h** in order to improve performance and reduce the difference between the numerical and analytical solutions for all solvers.

**Part 3:** 

Pick n =1, 2, or 3
1. plot the numerical error at t = 1, for the last tank, and for dt values in
the range 0.1, . . . 0.001. (You may try lower values if you have a very fast
solver)
2. does the numerical error scale as expected for Euler, RK2, and RK4?

In [6]:
n_choice = 3
tau = 2.0
t_final = 1.0
dt_vals = np.logspace(-1, -3, 21)  # 0.1 ... 0.001
methods = ["euler", "rk2", "rk4"]

errors = {m: [] for m in methods}

for h in dt_vals:
    C0 = np.zeros(n_choice); C0[0] = 1.0
    f = rhs_tanks(n_choice, tau)
    solver = ODESolver(f)
    for m in methods:
        t, C = solver.solve(C0, (0.0, t_final), h, method=m)
        num = C[-1, -1]
        exact = analytical_last_tank(n_choice, t[-1], tau)
        errors[m].append(abs(num - exact))

def observed_order(hs, errs, floor=1e-15):
    hs = np.array(hs)
    errs = np.maximum(np.array(errs), floor)
    mask = errs > 1e-12
    if np.sum(mask) >= 2:
        x = np.log(hs[mask])
        y = np.log(errs[mask])
        p = np.polyfit(x, y, 1)[0]
        return p
    return np.nan

orders = {m: observed_order(dt_vals, errors[m]) for m in methods}

plt.figure()
for m in methods:
    plt.loglog(dt_vals, errors[m], marker="o", label=f"{m.upper()} (p≈{orders[m]:.2f})")
plt.gca().invert_xaxis()
plt.xlabel("Time step h")
plt.ylabel("Absolute error at t=1 (last tank)")
plt.title(f"Convergence at t=1 for n={n_choice}")
plt.legend()
plt.show()

rows = []
for i, h in enumerate(dt_vals):
    row = {"h": h}
    for m in methods:
        row[f"err_{m.upper()}"] = errors[m][i]
    rows.append(row)

df = pd.DataFrame(rows)
display(df)

print("Observed orders (slope on log-log error vs h):")
for m in methods:
    print(f"  {m.upper()}: p ≈ {orders[m]:.3f}")

NameError: name 'analytical_last_tank' is not defined

From the convergence plot, we see the errors of the Euler, R2K and R4K solvers on a log-log scale. The Euler slope ~1, meaning that if we halve **h**, the error is reduced by ~half. The R2K slope is ~2, meaning that if we halve **h**, the error is reduced by ~4x. The R4K slope is ~4, meaning that if we **halve** h, the error is reduced by ~16x. Each curve shows a smooth line since the error scales with $error(h) ∝ h^p$ (where p= 1 for Euler, 2 for R2K and 4 for R4K).

The table shows how as the step size **h** gets smaller, the error in each column decreases. We see that Euler has consistently the largest error, whilst the R4K error becomes tiny even for moderate values of **h**. All numerical errors scale as expected.

#  Exercise 2: Fit CSTR model to CFD model:Healthy aorta  
**Part 1**  
Using the formula we have for q, we can compute an estimate. 
$$
q = \frac{M_{inj}} {\int_0^{\infty} C_{\text{out}}(t) \, dt}
$$

We have been given the $ M_{inj} $, so by using the concentrations in the csv-file, we can find an estimate for the integral. This is also possible because the value of the integral is 0 towards infinity, and we assume that the values of the concentrations are 0 for the times after the csv times.  
The integral can then be estimated by calculating the area of the trapezoid under the graph for each time interval. We can then use np.trapezoid (which calculates the area under the graph using trapezoid method, with inputs y and x as vectors)

In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def read_file(filepath, printing=False):
    
    data = pd.read_csv(r'data\healthy_rect_1s.csv')
    M_inj = 83.333 #grams

    times = np.array(data['Time'])
    concentrations_out = np.array(data['Concentration'])

    area_under_curve = np.trapezoid(concentrations_out, times)

    q_estimate = M_inj / area_under_curve * 60 /1000  # Convert to L/min
    if printing:
        print(f"Estimated flow rate q: {q_estimate} L/min")
    return times, concentrations_out


times, concentrations_out = read_file(r'data\healthy_rect_1s.csv', printing=True)

Estimated flow rate q: 5.005983664163233 L/min


**Part 2**  
Using inspiration from both the np.interp function and the if t< value. 

In [7]:
def c_inj(t, inj_method="rect", dt = 1):

    """
    Returns the concentration of injected tracer at time t for different injection methods.

    Parameters:
    t : Time at which to evaluate the concentration.
    inj_method : Options are "rect", "ramp", or "biphasic". Default is "rect".
    dt : Duration parameter for the injection profile. Not used for "biphasic".

    Returns:
    Concentration of injected tracer at time t.
    """


    if inj_method == "rect":
        return 1.0 if 0 < t < dt else 0.0

    elif inj_method == "ramp":
        t_vals = [0, dt, dt, dt + 1]
        c = [0, 1, 0, 0]
        return np.interp(t, t_vals, c)
    
    elif inj_method == "biphasic":
        if 0 < t < 1: return 0.75
        elif 2 < t < 3: return 1
        else: return 0.0   
    else:
        raise ValueError("Unknown injection method")
    
# Example usage to create a function for c_inj with chosen method and dt
def rect_1s(t):
    return c_inj(t, inj_method="rect", dt=1) # Gives rect 1s function
def rect_2s(t):
    return c_inj(t, inj_method="rect", dt=2) # Gives rect 2s function
def rect_3s(t):
    return c_inj(t, inj_method="rect", dt=3) # Gives rect 3s function
def ramp_1s(t):
    return c_inj(t, inj_method="ramp", dt=1) # Gives ramp 1s function
def biphasic_3s(t):
    return c_inj(t, inj_method="biphasic") # Gives biphasic function


**Part 3**  
Using the least square optimization to find the best fitted value for $\tau = V_{tot}/q $, and grid search for number of compartments.  
We were thinking about creating an expression for $\tau$, but this might be more computationally heavy than computing for values of tau instead of running through the values with optimization.  
The solution is given where the derivative of the SSR is zero. Therefore we compute a numerical derivative for the SSR at each step, given in appendix A. The formulas we use are:
$$
\frac{dC_{N-1}}{d\tau} = \frac{C_{N-1}(t, \tau + \Delta \tau) - C_{N-1}(t, \tau)}{\Delta \tau}
$$


$$
\frac{dSSR}{d\tau} = 2 \sum_{i=0}^{N-1} r_i \frac{dr_i}{d\tau} = -2 \sum_{i=0}^{N-1} (d_i - C_{N-1}(t_i; \tau)) \frac{dC_{N-1}(t_i; \tau)}{d\tau}
$$  
Because we use numpy arrays and vectors, these formulas are easy to implement, as they run over each value as a standard.
To find the optimal solution at each number of n we compute for, we wanted to use Newton's method. However, it was difficult to find a double derivative(derivative of the derivative) that was not hard to compute for each value, so we used the secant method instead (Newton's method with an approximation for double derivative). This method involves trying to find a new tau by using the formula: 

$$
\tau_{k+1} = \frac{\tau_{k-1} f(\tau _k) - \tau _k f(\tau _{k-1})}{f(\tau _k)-f(\tau _{k-1})}
$$
Where we use the derivative of the SSR as f. This formula comes from page 113 in the book. [[Hiorth, 2024]](#hiorth3)  
If the difference between the old and the new guess is lower than a low tolerance value, we can assume that this is the minimum for the tau for the specific number of compartments. By some testing we find that the optimal $\tau$-value should be around 2, which is also the value that is given in the first exercise. That is why we start with this value, and use 2.1 for the initial second guess. The first guess is important, as the method only find local minimum values, and not global.  
Because we have got values for different injection methods, we also want to find the minimums for these as well, to see if the $\tau$ is similar. We then look at the last time values and the time steps to decide on h and t_init and finish.


In [8]:
def solve_tanks(n=3,
         tau=2.0, 
         method="rk4", 
         h=0.02, 
         t_init=0.0,
         t_final=5.0, 
         c_in_func= lambda t: 0.0,
         C_init=None):
    
    """
    Solve the n-tank mixing ODE system.

    Parameters:
        n (int): Number of tanks.
        tau: Residence time (V_tot/q).
        method: "euler", "rk2", or "rk4".
        h: Time step size.
        t_init: Initial time.
        t_final: Final time.
        c_in_func (function by t): Inlet concentration profile, c_in_func(t).
        C_init (array or None): Initial concentrations (default: zeros, first tank 1).

    Returns:
        t: Time points.
        C: Concentrations in each tank. C[i, j] is concentration in tank j at time t[i].
    """
    
    if C_init is None:
        C_init = np.zeros(n)
    elif C_init.shape[0] != n:
        raise ValueError("C_init length must match number of tanks n.")
    
    f = rhs_tanks(n, tau, c_in=c_in_func)
    solver = ODESolver(f)
    t, C = solver.solve(C_init, (t_init, t_final), h, method=method)
    return t, C


In [9]:
def ssr(modelled, observed):
    """
    Calculate the sum of squared residuals (SSR) between modelled and observed data.

    Parameters:
        modelled: Modelled output values.
        observed: Observed/measured values.

    Returns:
        Sum of squared residuals.
    """

    if len(modelled) != len(observed):

        raise ValueError(f"Modelled and observed arrays must have the same length. Currently the lengths are {len(modelled)} for modelled and {len(observed)} for observed.")
    return np.sum((modelled - observed) ** 2)


def ssr_derivative(n=3, 
                tau=2.0, 
                delta_tau=1e-5,
                method="rk4",
                h=0.02,
                t_init=0.0,
                t_final=5.0,
                c_in_func=rect_1s,
                filepath=r'data\healthy_rect_1s.csv'):
    """
    Compute the sum of squared residuals (SSR) and its numerical derivative with respect to tau.

    Parameters:
        n (int): Number of tanks.
        tau: Residence time parameter.
        delta_tau: Small increment for numerical derivative.
        method: ODE solver method ("euler", "rk2", "rk4").
        h): Time step size for ODE solver.
        t_init: Initial time.
        t_final: Final time.
        c_in_func (function): Concentration profile function of injection.
        filepath: Path to CSV file with observed data.

    Returns:
        ssr_: Sum of squared residuals between model and observed data.
        der_ssr: Numerical derivative of SSR with respect to tau.
    """
    times, concentrations_out = read_file(filepath)
    t, C_model = solve_tanks(n=n,
                             tau=tau,
                             method=method,
                             h=h,
                             t_init=t_init,
                             t_final=t_final,
                             c_in_func=c_in_func)
    t2, C_model_delta = solve_tanks(n=n,
                             tau=tau+delta_tau,
                             method=method,
                             h=h,
                             t_init=t_init,
                             t_final=t_final,
                             c_in_func=c_in_func)
    der_c_out = (C_model_delta[:, -1] - C_model[:, -1]) / delta_tau
    der_ssr = -2*np.sum((concentrations_out-C_model[:,-1])*der_c_out)
    
    ssr_ = ssr(C_model[:, -1], concentrations_out)
    return ssr_, der_ssr

def new_tau(last_tau, current_tau, last_der, current_der):
    """
    Compute the next tau value using the secant method.

    Parameters:
        last_tau: Previous tau value.
        current_tau: Current tau value.
        last_der: Derivative of SSR at last_tau.
        current_der: Derivative of SSR at current_tau.

    Returns:
        Updated guess for tau value. 
    """
    return((last_tau*current_der - current_tau*last_der) / (current_der - last_der))



In [None]:
def secant_optimization(n=3, 
                tau_initial_1=2.0, 
                tau_initial_2=3.0, 
                tol=1e-5, 
                max_iter=100,  
                h=0.02,
                t_init=0.0,
                t_final=5.0,
                c_in_func=rect_1s,
                filepath=r'data\healthy_rect_1s.csv'):
    """
    Optimize tau for the n-tank mixing model using the secant method to minimize the sum of squared residuals (SSR).

    Parameters:
        n (int): Number of tanks.
        tau_initial_1: First initial guess for tau.
        tau_initial_2: Second initial guess for tau.
        tol: Tolerance for convergence.
        max_iter (int): Maximum number of iterations.
        h: Time step size for ODE solver.
        t_init: Initial time for simulation.
        t_final: Final time for simulation.
        c_in_func (function): Concentration profile function of injection.
        filepath: Path to CSV file with observed data.

    Returns:
        best_tau: Optimized tau value for n.
        best_ssr: SSR at optimized tau.
    """
    
    tau_last = tau_initial_1
    tau_current = tau_initial_2

    for iteration in range(max_iter):
        ssr_last, der_last = ssr_derivative(n=n, tau=tau_last, h=h, t_init=t_init, t_final=t_final, c_in_func=c_in_func, filepath=filepath)
        ssr_current, der_current = ssr_derivative(n=n, tau=tau_current, h=h, t_init=t_init, t_final=t_final, c_in_func=c_in_func, filepath=filepath)

        tau_new = new_tau(tau_last, tau_current, der_last, der_current)

        if abs(tau_new - tau_current) < tol:
            # print(f"Converged after {iteration+1} iterations.")
            return tau_new, ssr_current

        tau_last = tau_current
        tau_current = tau_new

        if iteration == max_iter - 1:
            raise ValueError(f"No convergence for n={n} within maximum iterations.")


In [11]:
def find_best_n(h=0.02,
                t_init=0.0,
                t_final=5.0,
                c_in_func=rect_1s,
                filepath=r'data\healthy_rect_1s.csv',
                n_values=range(1, 150, 10)):
    
    results = []
    for n in n_values:
        best_tau, best_ssr = secant_optimization(n=n,
                                                 h=h,
                                                 t_init=t_init,
                                                 t_final=t_final,
                                                 c_in_func=c_in_func,
                                                 filepath=filepath)
        results.append({
            "n": n,
            "best_tau": best_tau,
            "best_ssr": best_ssr
        })
    return pd.DataFrame(results)


In [16]:
def best_n_tot(h=0.02,
                t_init=0.0,
                t_final=5.0,
                c_in_func=rect_1s,
                filepath=r'data\healthy_rect_1s.csv',
                n_values=range(1, 150, 10)):
    df_results = find_best_n(h=h,
                             t_init=t_init,
                             t_final=t_final,
                             c_in_func=c_in_func,
                             filepath=filepath,
                             n_values=n_values)
    best_row = df_results.loc[df_results['best_ssr'].idxmin()]
    df_closer = find_best_n(h=h,
                            t_init=t_init,
                            t_final=t_final,
                            c_in_func=c_in_func,
                            filepath=filepath,
                            n_values=range(int(best_row['n'] - 9), int(best_row['n'] + 9)))
    best_row = df_closer.loc[df_closer['best_ssr'].idxmin()]
    return best_row['n'], best_row['best_tau'], best_row['best_ssr']
display(best_n_tot())

(np.float64(49.0),
 np.float64(2.3653645670575854),
 np.float64(0.03156337890095677))

# Bibliography  

<div id="hiorth3"></div> **A. Hiorth**.  *Computational Engineering and Modeling*, https://github.com/ahiorth/CompEngineering, 2021.