In [12]:
import numpy as np
import matplotlib.pyplot as plt


def stable_time_step(vx, Dx, Dz, dx, dz):
    dx2, dz2 = dx * dx, dz * dz
    max_diffusion = max(np.max(Dx), np.max(Dz), 1.0e-10)
    max_velocity = max(np.max(vx), 1.0e-10)
    diffusive_dt = dx2 * dz2 / (2 * max_diffusion * (dx2 + dz2))
    cfl_dt = dx / max_velocity
    dt = min(cfl_dt, diffusive_dt) * 0.5
    return min(dt, 1.0)  # Clamp the time step to a maximum of 1.0

def euler_advection_diffusion_timestep(c0, vx, Dx, Dz, src, dt, dx, dz, bc="periodic", upwind=True):
    nz, nx = c0.shape
    c = c0.copy()
    Fx = np.zeros((nz, nx + 1))
    Fz = np.zeros((nz + 1, nx))

    # Diffusive fluxes
    dcdx = (c[:, 1:] - c[:, :-1]) / dx
    dcdy = (c[1:, :] - c[:-1, :]) / dz
    Fx[:, 1:nx] = -Dx[:, 1:nx] * dcdx
    Fz[1:nz, :] = -Dz[1:nz, :] * dcdy

    # Advective fluxes
    if upwind:
        Fx[:, 1:nx] += np.where(vx[:, 1:nx] > 0, vx[:, 1:nx] * c[:, :-1], vx[:, 1:nx] * c[:, 1:]) / dx
    else:
        Fx[:, 1:nx] += vx[:, 1:nx] * (c[:, 1:] + c[:, :-1]) * 0.5 / dx

    # Debugging: Print flux terms
    print("Fx:", Fx)
    print("Fz:", Fz)

    # Update concentration
    c[:, :] += dt * src

    # Debugging: Before applying boundary conditions
    print("c before boundary conditions:", c)

    # Boundary conditions
    if bc == "zero":
        c[0, :] = 0  # Top boundary
        c[-1, :] = 0  # Bottom boundary
        c[:, 0] = 0  # Left boundary
        c[:, -1] = 0  # Right boundary
    elif bc == "reflecting":
        # Reflect boundaries without overwriting central values
        c[0, 1:-1] = c[1, 1:-1]  # Top boundary
        c[-1, 1:-1] = c[-2, 1:-1]  # Bottom boundary
        c[1:-1, 0] = c[1:-1, 1]  # Left boundary
        c[1:-1, -1] = c[1:-1, -2]  # Right boundary
    elif bc == "periodic":
        c[0, :] = c[-2, :]
        c[-1, :] = c[1, :]
        c[:, 0] = c[:, -2]
        c[:, -1] = c[:, 1]

    # Debugging: After applying boundary conditions
    print("c after boundary conditions:", c)
    return c



def create_grid(P0, Z0, domain_size, params):
    if P0.shape != Z0.shape:
        raise ValueError(f"P0 and Z0 must have the same shape. Got P0.shape={P0.shape}, Z0.shape={Z0.shape}")

    nz, nx = P0.shape
    dx = domain_size[0] / nx
    dz = domain_size[1] / nz

    vx = np.full((nz, nx + 1), params['current_velocity'])
    Dx = np.full((nz, nx + 1), params['diffusion'])
    Dz = np.full((nz + 1, nx), params['diffusion'])

    dt = stable_time_step(vx, Dx, Dz, dx, dz)
    print(f"Stable timestep: {dt:.5f}s")

    return {'nz': nz, 'nx': nx, 'dx': dx, 'dz': dz, 'vx': vx, 'Dx': Dx, 'Dz': Dz, 'dt': dt, 'P': P0.copy(), 'Z': Z0.copy()}


def lotka_volterra_sources(P, Z, params):
    alpha, beta, delta, gamma, K = params['alpha'], params['beta'], params['delta'], params['gamma'], params['K']
    K = max(K, 1.0e-6)
    P = np.maximum(P, 1e-6)
    Z = np.maximum(Z, 1e-6)

    prey_growth = alpha * P * (1.0 - P / K)
    prey_death = -beta * P * Z
    predator_gain = delta * beta * P * Z
    predator_death = -gamma * Z

    src_prey = prey_growth + prey_death
    src_pred = predator_gain + predator_death

    # Debugging: Print source terms
    print("src_prey:", src_prey)
    print("src_pred:", src_pred)

    # Add optional noise for debugging symmetry
    noise = np.random.normal(0, 0.01, size=src_prey.shape)
    src_prey += noise
    src_pred += noise

    return src_prey, src_pred


def predator_prey_advection_diffusion_step(P, Z, vx, Dx, Dz, params, dt, dx, dz, bc="periodic"):
    src_prey, src_pred = lotka_volterra_sources(P, Z, params)

    P_updated = euler_advection_diffusion_timestep(P, vx, Dx, Dz, src_prey, dt, dx, dz, bc=bc)
    Z_updated = euler_advection_diffusion_timestep(Z, vx, Dx, Dz, src_pred, dt, dx, dz, bc=bc)

    return P_updated, Z_updated


# Test Framework
def test_conservation_of_mass():
    nx, nz = 3, 3
    P0 = np.random.rand(nz, nx)
    Z0 = np.random.rand(nz, nx)
    domain_size = (10, 10)
    grid = create_grid(P0, Z0, domain_size, params)

    P = grid['P'].astype(np.float64)
    Z = grid['Z'].astype(np.float64)

    timesteps = 3
    for t in range(timesteps):
        P, Z = predator_prey_advection_diffusion_step(
            P, Z, grid['vx'], grid['Dx'], grid['Dz'], params, grid['dt'], grid['dx'], grid['dz'], bc="periodic"
        )
        print(f"Timestep {t}: P min={np.min(P)}, P max={np.max(P)}, Z min={np.min(Z)}, Z max={np.max(Z)}")


# Define the parameters dictionary before running the test
params = {
    'alpha': 5.2,         # Prey reproduction rate
    'beta': 3.0,          # Predation rate
    'delta': 6.2,         # Predator reproduction rate
    'gamma': 6.2,         # Predator death rate
    'K': 1.0,             # Carrying capacity
    'current_velocity': 0.1,  # Advection velocity
    'diffusion': 0.01     # Diffusion rate
}

# Run test
test_conservation_of_mass()



Stable timestep: 1.00000s
src_prey: [[-1.27891762  0.38947293 -1.17360746]
 [ 0.92881186  0.5810731   0.04355964]
 [ 0.47839597  0.34411586  0.75418457]]
src_pred: [[ 7.55007786 -1.45039248  7.2684091 ]
 [ 0.85115389 -1.02549314  3.42898831]
 [ 1.49654441 -2.26498297  1.13668946]]
Fx: [[0.         0.02625072 0.00527541 0.        ]
 [0.         0.0170581  0.00421016 0.        ]
 [0.         0.0150758  0.00433028 0.        ]]
Fz: [[0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [8.47205244e-04 1.26641751e-04 3.27815074e-04]
 [1.83819359e-04 3.97080928e-05 5.56908021e-04]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00]]
c before boundary conditions: [[-0.46133873  0.63186837 -0.3839513 ]
 [ 1.4692824   0.77746339  0.73795851]
 [ 0.94773277  0.53321964  1.22735963]]
c after boundary conditions: [[0.77746339 0.77746339 0.77746339]
 [0.77746339 0.77746339 0.77746339]
 [0.77746339 0.77746339 0.77746339]]
Fx: [[0.         0.02540309 0.02294071 0.        ]
 [0.         0.00636357 0.01114014 0. 