# **GALERKIN in TIME**

## **Install**

### Firedrake

In [None]:
try:
    !wget "https://fem-on-colab.github.io/releases/firedrake-install-release-real.sh" -O "/tmp/firedrake-install.sh"
    !bash "/tmp/firedrake-install.sh"
    from firedrake import *  # noqa: F401
except:
    from firedrake import *  # noqa: F401

ModuleNotFoundError: No module named 'firedrake'

### Irksome

In [None]:
try:
    !python3 -m pip install --no-dependencies git+https://github.com/firedrakeproject/Irksome.git
    from irksome import *  # noqa: F401
except:
    from irksome import *  # noqa: F401

Python was not found; run without arguments to install from the Microsoft Store, or disable this shortcut from Settings > Apps > Advanced app settings > App execution aliases.


ModuleNotFoundError: No module named 'irksome'

### Other

In [None]:
from pathlib import Path
from typing import Dict, List, Union

## **General functions**

### Time integrals

In [None]:
from ufl.measure import Measure

class DxTime(Measure):
    """Space-time "measure" for Galerkin-in-time residuals.

    Right-multiplying a UFL integrand by an instance (``expr * dx_time``)
    integrates it over space with ``dx`` and tags the resulting form with a
    ``TimeQuadratureLabel`` built from ``time_deg``.

    Parameters
    - time_deg:  degree handed to ``TimeQuadratureLabel``; ``None`` leaves the
                 form unlabelled.
    - space_deg: spatial quadrature degree passed to ``dx(degree=...)``;
                 ``None`` uses plain ``dx``.
    """

    def __init__(self, time_deg=None, space_deg=None):
        # Registered as an ordinary cell measure ("dx"): UFL validates the
        # integral-type name, and the base-class integration is bypassed by
        # __rmul__ below anyway.
        super().__init__("dx", domain=None, metadata={})
        self.time_deg = time_deg
        self.space_deg = space_deg
        # Only build a label when a degree is known, so the module-level
        # default instance can be created without one.
        self.quad_rule = (
            TimeQuadratureLabel(time_deg) if time_deg is not None else None
        )

    def __call__(self, *, time_deg=None, space_deg=None):
        """Return a new DxTime, overriding only the degrees that are given."""
        return DxTime(
            time_deg if time_deg is not None else self.time_deg,
            space_deg if space_deg is not None else self.space_deg,
        )

    def __rmul__(self, ufl_stuff):
        """Integrate ``ufl_stuff`` over space and attach the time-quadrature label."""
        # `degree` must be passed by keyword: the first positional argument of
        # a UFL measure call is the subdomain id, not the quadrature degree.
        # Compare against None so that a degree of 0 is honoured.
        dx_space = dx if self.space_deg is None else dx(degree=self.space_deg)
        form = ufl_stuff * dx_space
        if self.quad_rule is None:
            return form
        return self.quad_rule(form)


dx_time = DxTime()

### Nonlinear Schrödinger: Energy

In [None]:
def nls(
    N:          int = 200,
    deg:        int = 1,
    time_deg:   int = 1,
    Nt:         int = 400,
    dt:         Union[int, float] = 1.0e-3,
    L:          Union[int, float] = 20.0,
    speed:      Union[int, float] = 0.5,
    eta:        Union[int, float] = 1.0,
    x0:         Union[int, float] = 10.0,
    output_dir: str = "output/nls/",
    write_qois: bool = False,
    write_vtk:  bool = False,
) -> Dict[str, List[float]]:
    """
    1D nonlinear Schrödinger equation with standard continuous Petrov-Galerkin (energy-conserving).

    The complex state is stored as a real pair (a, b) = (Re(psi), Im(psi)) in a
    mixed CG space, with homogeneous Dirichlet conditions on both components.

    Parameters
    - N:          number of mesh cells on the interval [0, L]
    - deg:        polynomial degree of the spatial CG space
    - time_deg:   degree passed to the Galerkin time stepper
    - Nt:         number of time steps
    - dt:         time step size
    - L:          length of the interval
    - speed:      wavenumber of the initial phase, exp(i * speed * x)
    - eta:        amplitude / inverse width of the initial sech profile
    - x0:         centre of the initial profile
    - output_dir: directory for output files (created if missing)
    - write_qois: if True, also write the quantities of interest to "qois.csv"
    - write_vtk:  if True, write the real and imaginary parts to "psi_mag.pvd"

    Returns
    - Dictionary: {"time": [...], "mass": [...], "momentum": [...], "energy": [...]}
      with one entry for the initial state and one per time step.
    """

    # Ensure output directory exists
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    # Convert parameters to UFL objects
    dt_c = Constant(dt)
    speed_c = Constant(speed)
    eta_c = Constant(eta)
    x0_c = Constant(x0)

    # Mesh and coordinates (interval)
    mesh = IntervalMesh(N, 0.0, L)
    x, = SpatialCoordinate(mesh)

    # Function spaces and functions
    V = FunctionSpace(mesh, "CG", deg)
    Z = V * V
    psi = Function(Z, name="state")
    a, b = split(psi)  # symbolic components, for use inside forms only
    a_fn, b_fn = psi.subfunctions  # component Functions, for assignment/output
    v, w = TestFunctions(Z)

    # Initial condition (soliton). The projection targets the component
    # Functions: the results of split() are UFL expressions and cannot be
    # projected into.
    amp = eta_c / cosh(eta_c * (x - x0_c))
    phase = speed_c * x
    a_fn.project(amp * cos(phase))
    b_fn.project(amp * sin(phase))

    # Continuous Petrov-Galerkin residual
    dx_time_2     = dx_time(time_deg=2*time_deg)
    dx_time_2plus = dx_time(time_deg=2*time_deg+1)
    dx_time_4plus = dx_time(time_deg=4*time_deg+3)
    amp_squared = a**2 + b**2
    F = (
        (  # Real
          - inner(Dt(b), v) * dx_time_2
          - 0.5 * inner(grad(a), grad(v)) * dx_time_2plus
          + inner(amp_squared * a, v) * dx_time_4plus
        )
      + (  # Imaginary
            inner(Dt(a), w) * dx_time_2
          - 0.5 * inner(grad(b), grad(w)) * dx_time_2plus
          + inner(amp_squared * b, w) * dx_time_4plus
        )
    )

    # Quantities of interest
    M = amp_squared * dx
    P = (a * b.dx(0) - b * a.dx(0)) * dx
    # Testing the residual above with (Dt(a), Dt(b)) shows the conserved
    # energy carries the quartic term with a minus sign.
    E = 0.5 * (inner(grad(a), grad(a)) + inner(grad(b), grad(b)) - amp_squared**2) * dx

    # Time stepper
    t = Constant(0.0)
    bcs = [DirichletBC(Z.sub(i), 0.0, "on_boundary") for i in (0, 1)]
    stepper = GalerkinTimeStepper(F, time_deg, t, dt_c, psi, bcs=bcs)

    # Histories are always recorded so the return value exists regardless of
    # write_qois; the flag only controls the CSV file.
    t_arr: List[float] = []
    M_arr: List[float] = []
    P_arr: List[float] = []
    E_arr: List[float] = []

    # Set up outputs
    if write_qois:
        qoi_path = out_path / "qois.csv"
        with qoi_path.open("w", encoding="utf-8") as f:
            f.write("time,mass,momentum,energy\n")

    if write_vtk:
        vtk = VTKFile(str(out_path / "psi_mag.pvd"))
        a_fn.rename("Real part (Re(psi))")
        b_fn.rename("Imaginary part (Im(psi))")
        vtk.write(a_fn, b_fn, time=float(t))

    def record_and_log():
        """Evaluate the quantities of interest at the current time, print and store them."""
        t_out = float(t)
        M_out = float(assemble(M))
        P_out = float(assemble(P))
        E_out = float(assemble(E))
        print(BLUE % f"Time (t): {t_out}")
        print(GREEN % f"\tMass:     {M_out}")
        print(GREEN % f"\tMomentum: {P_out}")
        print(GREEN % f"\tEnergy:   {E_out}")
        t_arr.append(t_out)
        M_arr.append(M_out)
        P_arr.append(P_out)
        E_arr.append(E_out)
        if write_qois:
            with qoi_path.open("a", encoding="utf-8") as f:
                f.write(f"{t_out},{M_out},{P_out},{E_out}\n")

    record_and_log()

    # Time loop
    for _ in range(Nt):
        stepper.advance()
        t.assign(float(t) + float(dt_c))
        if write_vtk:
            vtk.write(a_fn, b_fn, time=float(t))
        record_and_log()

    return {"time": t_arr, "mass": M_arr, "momentum": P_arr, "energy": E_arr}

### Incompressible Navier-Stokes: Energy/Helicity

In [None]:
def ns_helicity(
    Re:         Union[int, float] = 2**16,
    N:          int = 8,
    deg:        int = 1,
    time_deg:   int = 1,
    Nt:         int = 48,
    dt:         Union[int, float] = 2**-10,
    output_dir: str = "output/ns_helicity/",
    write_qois: bool = False,
    write_vtk:  bool = False,
) -> Dict[str, List[float]]:
    """
    Energy- and helicity-preserving continuous Petrov-Galerkin Navier-Stokes scheme.

    Solves on a periodic unit cube for the mixed state
    (u, p, utilde, wtilde, theta): velocity, pressure, auxiliary velocity,
    auxiliary vorticity and a multiplier enforcing div(wtilde) = 0.

    Parameters
    - Re:         Reynolds number (the viscous term is scaled by 1/Re)
    - N:          number of mesh cells per direction
    - deg:        degree of the scalar CG space; vector spaces use deg + 1
    - time_deg:   degree passed to the Galerkin time stepper
    - Nt:         number of time steps
    - dt:         time step size
    - output_dir: directory for output files (created if missing)
    - write_qois: if True, also write the quantities of interest to "qois.csv"
    - write_vtk:  if True, write the velocity to "u.pvd"

    Returns
    - Dictionary: {"time": [...], "energy": [...], "helicity": [...]}
      with one entry for the initial state and one per time step.
    """

    # Ensure output directory exists
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    # Convert parameters to UFL objects
    Re_c = Constant(Re)
    dt_c = Constant(dt)

    # Mesh and coordinates (periodic box)
    mesh = PeriodicUnitCubeMesh(N, N, N)
    x, y, z = SpatialCoordinate(mesh)

    # Function spaces and functions
    V = VectorFunctionSpace(mesh, "CG", deg + 1)
    Q = FunctionSpace(mesh, "CG", deg)
    Z = V * Q * V * V * Q  # Mixed space: (u, p, utilde, wtilde, theta)
    up = Function(Z, name="state")
    # Symbolic components, for use inside forms only
    u, p, utilde, wtilde, theta = split(up)
    # Component Functions, for assignment and output
    u_fn, p_fn, utilde_fn, wtilde_fn, theta_fn = up.subfunctions
    v, q, vtilde1, vtilde2, eta = TestFunctions(Z)

    # (Discretely) div-free projection
    def project_div_free(target):
        """L2-project `target` onto V subject to a weak divergence-free constraint.

        Returns the mixed (velocity, multiplier) Function on V * Q.
        """
        Z_ = V * Q
        up_ = Function(Z_)
        (uh, ph) = split(up_)
        (vh, qh) = TestFunctions(Z_)
        F_ = (
            inner(uh - target, vh) * dx
          - inner(ph, div(vh)) * dx
          - inner(div(uh), qh) * dx
        )
        # NOTE(review): on the periodic box ph is presumably only determined
        # up to a constant - confirm the default solver copes, or pass a
        # nullspace.
        solve(F_ == 0, up_)
        return up_

    # Initial condition (scaled Hill vortex centred in the box), then div-free projection
    # NOTE(review): `hill` is not defined in the visible part of this notebook;
    # confirm it is in scope before this cell runs.
    hh = 2 * hill([x - 0.5, y - 0.5, z - 0.5], 0.25)
    # Assign into the component Functions: the results of split() are UFL
    # expressions and cannot be assigned to.
    up0 = project_div_free(hh)
    u_fn.assign(up0.subfunctions[0])
    p_fn.assign(up0.subfunctions[1])
    utilde_fn.assign(up0.subfunctions[0])
    w0 = project_div_free(curl(hh))
    wtilde_fn.assign(w0.subfunctions[0])
    theta_fn.assign(w0.subfunctions[1])

    # Continuous Petrov-Galerkin residual
    dx_time_2     = dx_time(time_deg=2*time_deg)
    dx_time_2plus = dx_time(time_deg=2*time_deg+1)
    dx_time_3     = dx_time(time_deg=3*time_deg)
    F = (
        (  # Momentum
            inner(Dt(u), v) * dx_time_2
          - inner(cross(utilde, wtilde), v) * dx_time_3
          + 1/Re_c * inner(grad(utilde), grad(v)) * dx_time_2
          - inner(p, div(v)) * dx_time_2
        )
      + (  # Mass
          - inner(div(u), q) * dx_time_2
        )
      + (  # Auxiliary velocity
            inner(utilde, vtilde1) * dx_time_2
          - inner(u, vtilde1) * dx_time_2plus
        )
      + (  # Auxiliary vorticity
            # NOTE(review): the two curl terms sum to the derivative of
            # inner(u, curl(u)), i.e. twice that of Q2 below, whereas wtilde is
            # initialised from curl(hh) - confirm whether a factor 0.5 is
            # intended here.
            inner(wtilde, vtilde2) * dx_time_2
          - (inner(curl(u), vtilde2) + inner(u, curl(vtilde2))) * dx_time_2plus
          - inner(theta, div(vtilde2)) * dx_time_2
        )
      + (  # Auxiliary vorticity incompressibility
          - inner(div(wtilde), eta) * dx_time_2
        )
    )

    # Quantities of interest
    Q1 = 0.5 * inner(u, u) * dx  # Energy
    Q2 = 0.5 * inner(u, curl(u)) * dx  # Helicity

    # Time integrator
    t = Constant(0.0)
    stepper = GalerkinTimeStepper(F, time_deg, t, dt_c, up, aux_indices=[1,2,3,4])

    # Histories are always recorded so the return value exists regardless of
    # write_qois; the flag only controls the CSV file.
    t_arr: List[float] = []
    Q1_arr: List[float] = []
    Q2_arr: List[float] = []

    # Set up outputs
    if write_qois:
        qoi_path = out_path / "qois.csv"
        with qoi_path.open("w", encoding="utf-8") as f:
            f.write("time,kinetic_energy,helicity\n")

    if write_vtk:
        vtk = VTKFile(str(out_path / "u.pvd"))
        u_fn.rename("Velocity (u)")
        vtk.write(u_fn, time=float(t))

    def record_and_log():
        """Evaluate the quantities of interest at the current time, print and store them."""
        t_out = float(t)
        Q1_out = float(assemble(Q1))
        Q2_out = float(assemble(Q2))
        print(BLUE % f"Time (t): {t_out}")
        print(GREEN % f"\tKinetic energy (Q1): {Q1_out}")
        print(GREEN % f"\tHelicity (Q2):       {Q2_out}")
        t_arr.append(t_out)
        Q1_arr.append(Q1_out)
        Q2_arr.append(Q2_out)
        if write_qois:
            with qoi_path.open("a", encoding="utf-8") as f:
                f.write(f"{t_out},{Q1_out},{Q2_out}\n")

    record_and_log()

    # Time loop
    for _ in range(Nt):
        stepper.advance()
        t.assign(float(t) + float(dt_c))
        if write_vtk:
            vtk.write(u_fn, time=float(t))
        record_and_log()

    return {"time": t_arr, "energy": Q1_arr, "helicity": Q2_arr}