In [None]:
!python drawarray.py --image lake.png --max water.txt --min bottom.txt

In [None]:
class FluxLimiter:
    def __init__(self, uminus, ucenter, uplus, flux_limiter):
        self.uminus = uminus
        self.ucenter = ucenter
        self.uplus = uplus
        self.R = [
            (e - c) / (c - w) if c != w else 0
            for w, c, e in zip(self.uminus, self.ucenter, self.uplus)
        ]
        self.flux_limiter = flux_limiter

    @staticmethod
    def superbee(R):
        return [min(max(1, r), 2, 2 * r) for r in R]

    @staticmethod
    def vanLeer(R):
        return [2 * r / (r + 1) for r in R]

    @staticmethod
    def vanAlbada(R):
        return [r ** 3 / (r ** 2 + 1) for r in R]

    def right_interface_left_limit(self):
        return [
            uc + 0.5 * f * (up - um)
            for uc, f, up, um in zip(
                self.ucenter, self.flux_limiter(self.R), self.uplus, self.uminus
            )
        ]

    def left_interface_right_limit(self):
        return [
            uc - 0.5 * f * (up - um)
            for uc, f, up, um in zip(
                self.ucenter, self.flux_limiter(self.R), self.uplus, self.uminus
            )
        ]


class RiemannSolver:
    @staticmethod
    def inv(value, eps=1e-2):
        if abs(value) > eps:
            return 1.0 / value
        else:
            return 0

    @staticmethod
    def intercell_flux_lax_friedrichs(F, U_L, U_R, S_L, S_R):
        return [
            0.5 * (ful + fur) - 0.5 * s * (ur - ul)
            for s, ul, ur, ful, fur in zip(
                max(abs(S_L), abs(S_R)), U_L, U_R, F(U_L), F(U_R)
            )
        ]

    @staticmethod
    def intercell_flux_hll(F, U_L, U_R, S_L, S_R):
        if 0 <= S_L:
            return F(U_L)
        elif S_L <= 0 <= S_R:
            return [
                (S_R * ful - S_L * fur + S_L * S_R * (ur - ul))
                * RiemannSolver.inv(S_R - S_L)
                for ul, ur, ful, fur in zip(U_L, U_R, F(U_L), F(U_R))
            ]
        elif S_R <= 0:
            return F(U_R)
        else:
            raise ValueError()

    @staticmethod
    def intercell_flux(F, U_L, U_R, S_L, S_R):
        return RiemannSolver.intercell_flux_hll(F, U_L, U_R, S_L, S_R)

   These are not shallow water equations, but let's try to solve them anyway:

   $$
   \begin{align}
       w &= b + h\\
       \partial_t(w) &= \partial_t(b) + \partial_t(h) = \partial_t(h)\\
       \partial_t(h) + \partial_0(h u_0) &= 0\\
       \partial_t(h u_0) + \partial_0(h u_0 u_0) + \partial_0(\frac{1}{2} g h^2 + ghb) &= 0\\
       \partial_t(g) &= 0\\
       \partial_t(b) &= 0
   \end{align}
   $$

   $$
   Q =
   \left[\begin{matrix}
   b\\
   w\\
   (w - b) u_0\\
   g\\
   \end{matrix}\right]\\
   $$

   $$
   \begin{array}{lllllllll}
   +\partial_t&[
       &Q_0
       &Q_1
       &Q_2
       &Q_3
   &]^\top\\
   +\partial_0&[
       &0
       &Q_2
       &Q_2 Q_2 (Q_1 - Q_0)^{-1} + \frac{1}{2}Q_3 (Q_1 - Q_0)^2 + Q_3 (Q_1 - Q_0) Q_0
       &0
   &]^\top\\
   \end{array}\\
   = \partial_t Q + \partial_0 F(Q) = 0\\
   $$

 $$
 \partial_Q F
 =
\left(
                  \begin{array}{cccc}
                   0 & 0 & 0 & 0 \\
                   \frac{Q_1 Q_2}{\left(Q_0-Q_1\right){}^2} & -\frac{Q_0 Q_2}{\left(Q_0-Q_1\right){}^2} &
                     \frac{Q_1}{Q_1-Q_0} & 0 \\
                   \frac{Q_2{}^2}{\left(Q_0-Q_1\right){}^2}-Q_0 Q_3 & Q_1 Q_3-\frac{Q_2{}^2}{\left(Q_0-Q_1\right){}^2} &
                     \frac{2 Q_2}{Q_1-Q_0} & \frac{1}{2} \left(Q_1{}^2-Q_0{}^2\right) \\
                   0 & 0 & 0 & 0 \\
                  \end{array}
                  \right)
 $$

 $$
\left(
                  \begin{array}{cccc}
                   0 & 0 & \frac{-3 Q_0 Q_2+2 Q_1 Q_2-\sqrt{4 Q_3 Q_1{}^5-12 Q_0 Q_3 Q_1{}^4+12 Q_0{}^2 Q_3 Q_1{}^3-4
                     Q_0{}^3 Q_3 Q_1{}^2+Q_0{}^2 Q_2{}^2}}{2 \left(Q_0-Q_1\right){}^2} & \frac{-3 Q_0 Q_2+2 Q_1
                     Q_2+\sqrt{4 Q_3 Q_1{}^5-12 Q_0 Q_3 Q_1{}^4+12 Q_0{}^2 Q_3 Q_1{}^3-4 Q_0{}^3 Q_3 Q_1{}^2+Q_0{}^2
                     Q_2{}^2}}{2 \left(Q_0-Q_1\right){}^2} \\
                   \left\{\frac{Q_0 \left(Q_0{}^3-Q_1 Q_0{}^2-Q_1{}^2 Q_0+Q_1{}^3\right)}{2 \left(-Q_3 Q_0{}^3+Q_1 Q_3
                     Q_0{}^2+Q_1{}^2 Q_3 Q_0+Q_2{}^2-Q_1{}^3 Q_3\right)},-\frac{Q_1{}^4-Q_0 Q_1{}^3-Q_0{}^2
                     Q_1{}^2+Q_0{}^3 Q_1}{2 \left(Q_3 Q_0{}^3-Q_1 Q_3 Q_0{}^2-Q_1{}^2 Q_3 Q_0-Q_2{}^2+Q_1{}^3
                     Q_3\right)},0,1\right\} & \left\{\frac{Q_3 Q_1{}^4-2 Q_0 Q_3 Q_1{}^3+Q_0{}^2 Q_3 Q_1{}^2-Q_2{}^2
                     Q_1+2 Q_0 Q_2{}^2}{Q_2 \left(-Q_3 Q_0{}^3+Q_1 Q_3 Q_0{}^2+Q_1{}^2 Q_3 Q_0+Q_2{}^2-Q_1{}^3
                     Q_3\right)},-\frac{-Q_1 Q_3 Q_0{}^3+2 Q_1{}^2 Q_3 Q_0{}^2-Q_1{}^3 Q_3 Q_0-Q_1 Q_2{}^2}{Q_2
                     \left(-Q_3 Q_0{}^3+Q_1 Q_3 Q_0{}^2+Q_1{}^2 Q_3 Q_0+Q_2{}^2-Q_1{}^3 Q_3\right)},1,0\right\} &
                     \left\{0,-\frac{Q_0 Q_2-2 Q_1 Q_2-\sqrt{4 Q_3 Q_1{}^5-12 Q_0 Q_3 Q_1{}^4+12 Q_0{}^2 Q_3 Q_1{}^3-4
                     Q_0{}^3 Q_3 Q_1{}^2+Q_0{}^2 Q_2{}^2}}{2 \left(-Q_3 Q_1{}^3+2 Q_0 Q_3 Q_1{}^2-Q_0{}^2 Q_3
                     Q_1+Q_2{}^2\right)},1,0\right\} & \left\{0,-\frac{Q_0 Q_2-2 Q_1 Q_2+\sqrt{4 Q_3 Q_1{}^5-12 Q_0 Q_3
                     Q_1{}^4+12 Q_0{}^2 Q_3 Q_1{}^3-4 Q_0{}^3 Q_3 Q_1{}^2+Q_0{}^2 Q_2{}^2}}{2 \left(-Q_3 Q_1{}^3+2 Q_0
                     Q_3 Q_1{}^2-Q_0{}^2 Q_3 Q_1+Q_2{}^2\right)},1,0\right\} \\
                  \end{array}
                  \right)
 $$

In [None]:
import math


class SWE:
    def __init__(
        self, *, grid, bottom, water, velocity, gravity: float, friction: float
    ):
        self.grid = grid
        self.q0 = bottom
        self.q1 = water
        self.q2 = [(w - b) * v for w, b, v in zip(water, bottom, velocity)]
        self.q3 = [gravity] * len(grid)

    def __getitem__(self, p):
        if p == -1:
            q = self[p + 1]
            q[2] *= -1
            return q
        elif p == len(self.grid):
            q = self[p - 1]
            q[2] *= -1
            return q
        else:
            return [
                self.q0[p],
                self.q1[p],
                self.q2[p],
                self.q3[p],
            ]

    def update(self, qs):
        for i, q in enumerate(qs):
            self.q1[i] = q[1]
            self.q2[i] = q[2]

    @staticmethod
    def F(q):
        return [
            0,
            q[2],
            q[2] * q[2] * SWE.inv(q[1] - q[0])
            + 0.5 * q[3] * (q[1] - q[0]) ** 2
            + q[3] * (q[1] - q[0]) * q[0],
            0,
        ]

    @staticmethod
    def inv(value, eps=1e-2):
        if abs(value) > eps:
            return 1.0 / value
        else:
            return 0

    @staticmethod
    def dF_eigenvalues(q):
        yield 0
        yield 0
        yield 0
        yield 0
        try:
            yield (
                -3 * q[0] * q[2]
                + 2 * q[1] * q[2]
                - math.sqrt(
                    4 * q[3] * q[1] ** 5
                    - 12 * q[0] * q[3] * q[1] ** 4
                    + 12 * q[0] ** 2 * q[3] * q[1] ** 3
                    - 4 * q[0] ** 3 * q[3] * q[1] ** 2
                    + q[0] ** 2 * q[2] ** 2
                )
            ) * SWE.inv(2 * (q[0] - q[1]) ** 2)
        except ValueError as e:
            pass
        try:
            yield (
                -3 * q[0] * q[2]
                + 2 * q[1] * q[2]
                + math.sqrt(
                    4 * q[3] * q[1] ** 5
                    - 12 * q[0] * q[3] * q[1] ** 4
                    + 12 * q[0] ** 2 * q[3] * q[1] ** 3
                    - 4 * q[0] ** 3 * q[3] * q[1] ** 2
                    + q[0] ** 2 * q[2] ** 2
                )
            ) * SWE.inv(2 * (q[0] - q[1]) ** 2)
        except ValueError as e:
            pass

    @staticmethod
    def nonnegative_ql(ql, q, qr):
        if qr[1] < qr[0] or ql[1] < ql[0]:
            return [ql[0], q[0] + ql[1] - ql[0], ql[2], ql[3]]
        else:
            return ql

    @staticmethod
    def nonnegative_qr(ql, q, qr):
        if qr[1] < qr[0] or ql[1] < ql[0]:
            return [qr[0], qr[0] + q[1] - q[0], qr[2], qr[3]]
        else:
            return qr

In [None]:
import math


def inv(value, eps=1e-2):
    if abs(value) > eps:
        return 1.0 / value
    else:
        return 0


def get_flux(swe, I_L_in, C, I_R_in):
    I_L = swe.nonnegative_ql(I_L_in, C, I_R_in)
    I_R = swe.nonnegative_qr(I_L_in, C, I_R_in)

    S_L = min(SWE.dF_eigenvalues(I_L))
    S_R = max(SWE.dF_eigenvalues(I_R))

    try:
        hl = I_L[1] - I_L[0]
        hr = I_R[1] - I_R[0]
        hroe = 0.5 * (hl + hr)
        vl = I_L[2] * inv(hl)
        vr = I_R[2] * inv(hr)
        vroe = (vl * math.sqrt(hl) + vr * math.sqrt(hr)) * inv(
            math.sqrt(hl) + math.sqrt(hr)
        )
        ROE_S_L = vroe - math.sqrt(I_L[2] * hroe)
        S_L = min(S_L, ROE_S_L)
    except ValueError as e:
        pass

    try:
        hl = I_L[1] - I_L[0]
        hr = I_R[1] - I_R[0]
        hroe = 0.5 * (hl + hr)
        vl = I_L[2] * inv(hl)
        vr = I_R[2] * inv(hr)
        vroe = (vl * math.sqrt(hl) + vr * math.sqrt(hr)) * inv(
            math.sqrt(hl) + math.sqrt(hr)
        )
        ROE_S_R = vroe + math.sqrt(I_R[2] * hroe)
        S_R = max(S_R, ROE_S_R)
    except ValueError as e:
        pass

    return RiemannSolver.intercell_flux(swe.F, I_L, I_R, S_L, S_R)


def evolution(dt, swe: SWE, p: int, flux_limiter):
    dx = swe.grid[p] - swe.grid[p - 1] if p > 0 else swe.grid[p + 1] - swe.grid[p]

    # left interface
    I_L = swe[p - 1]
    I_R = swe[p]
    F_L_STAR = get_flux(swe, I_L, swe[p], I_R)

    # right interface
    I_R = swe[p + 1]
    I_L = swe[p]
    F_R_STAR = get_flux(swe, I_L, swe[p], I_R)

    # Lie-Trotter operator splitting
    # surface / volume
    u = swe[p]
    u = [
        ui - dt / dx * (frstar - flstar)
        for ui, flstar, frstar in zip(u, F_L_STAR, F_R_STAR)
    ]
    return u

In [None]:
import numpy
import scipy.signal


def frames():
    bottom = numpy.loadtxt("data/bottom.txt")
    water = numpy.loadtxt("data/water.txt")
    bottom = scipy.signal.resample(bottom, 31)
    water = scipy.signal.resample(water, 31)

    factor = 0.1 / max(max(bottom), max(water))
    bottom *= factor
    water *= factor
    time = 0
    flux_limiter = FluxLimiter.superbee

    nofcells = bottom.shape[0]

    width = 0.1
    grid = numpy.linspace(0, width, nofcells)
    dx = grid[1] - grid[0]

    velocity = numpy.full_like(bottom, 1e-2)

    swe = SWE(
        grid=grid,
        bottom=bottom,
        water=water,
        velocity=velocity,
        gravity=9.82,
        friction=10,
    )

    for i in range(100):
        if i == 0:
            yield {
                "time": time,
                "grid": swe.grid,
                "bottom": [swe[p][0] for p in range(nofcells)],
                "water": [swe[p][1] for p in range(nofcells)],
                "depth": [swe[p][1] - swe[p][0] for p in range(nofcells)],
                "velocity": [
                    swe[p][2] / (swe[p][1] - swe[p][0]) for p in range(nofcells)
                ],
            }
        updated_qs = list()

        s = max(
            [
                abs(v)
                for cellno in range(nofcells)
                for v in SWE.dF_eigenvalues(swe[cellno])
            ]
        )
        dt = 0.1 * dx / s if s else 1e-3

        for p in range(nofcells):
            updated_qs.append(evolution(dt, swe, p, flux_limiter))
        swe.update(updated_qs)
        time += dt

        yield {
            "time": time,
            "grid": swe.grid,
            "bottom": [swe[p][0] for p in range(nofcells)],
            "water": [swe[p][1] for p in range(nofcells)],
            "depth": [swe[p][1] - swe[p][0] for p in range(nofcells)],
            "velocity": [swe[p][2] / (swe[p][1] - swe[p][0]) for p in range(nofcells)],
        }


import IPython.display

In [None]:
import matplotlib
import matplotlib.animation
import matplotlib.pyplot

fig, axs = matplotlib.pyplot.subplots(1, 4, figsize=(6 * 4, 4))


def update(frame):
    time = frame.pop("time")
    fig.suptitle(f"Shallow Water Equations {time}")
    for ax in axs:
        ax.cla()
    for ax in axs:
        ax.grid()
    axs[0].set(xlabel="position [m]", ylabel="height [m]", title=f"Depth Profile")
    axs[1].set(
        xlabel="position [m]",
        ylabel="velocity [m/s]",
        title=f"Velocity Profile",
    )
    axs[2].set(
        xlabel="position [m]",
        ylabel="position [m]",
        title=f"Water Column Height",
    )

    grid = frame.pop("grid")
    for key, value in frame.items():
        if key == "water":
            axs[0].step(grid, value, color="blue", label=key)
        elif key == "bottom":
            axs[0].step(grid, value, color="brown", label=key)
        elif key == "velocity":
            axs[1].step(grid, value, label=key)
        elif key == "depth":
            axs[2].step(grid, value, color="blue", label=key)
        else:
            axs[3].step(grid, value, label=key)
    axs[0].legend()
    axs[1].legend()
    axs[2].legend()
    axs[3].legend()
    # fig.tight_layout()


animation = matplotlib.animation.FuncAnimation(
    fig, update, frames=frames, interval=100, blit=False, repeat=False
)

IPython.display.HTML(animation.to_jshtml())