In [3]:
# wave_stepper_dataset.py
from fenics import *
import numpy as np
from scipy.interpolate import griddata
from tqdm import tqdm

# ----------------------- Parameters -----------------------
# Time axis: num_steps uniform steps of size dt covering [0, T_final].
T_final, num_steps = 1.0, 20        # keep num_steps small initially (20 or 50)
dt = T_final / num_steps

# Output resolution and PDE coefficient.
grid_size = 32                      # output grid H=W=32 (stable starter)
c2 = 1.0                            # wave speed squared

# Families of initial conditions / source terms that are sampled.
modes_u0 = ["gaussian", "multi", "fourier"]
modes_f = ["pulse", "traveling", "multi"]

# Number of simulations per (u0-mode, f-mode) combination.
train_sample_per_mode, test_sample_per_mode = 32, 3

# Metadata strings stored next to the arrays.
origin_meta = "lower"
layout_meta = "THW"                 # we save sequences as (T+1, H, W)

# ----------------------- FEniCS setup ---------------------
# FEM mesh: nx x ny cells on the unit square, i.e. (nx+1) x (ny+1) vertices.
# The mesh resolution is independent of the output grid defined below.
nx = ny = grid_size
mesh = UnitSquareMesh(nx, ny)
V    = FunctionSpace(mesh, "Lagrange", 1)

u = TrialFunction(V)
v = TestFunction(V)

# Mass and stiffness matrices, assembled once without boundary conditions;
# the Dirichlet condition `bc` is imposed later on the time-stepping system.
M = assemble(u*v*dx)
K = assemble(dot(grad(u), grad(v))*dx)  # Laplacian weak form
bc = DirichletBC(V, Constant(0.0), "on_boundary")

# FEM dof coordinates (for interpolation), shape (num_dofs, 2).
# The explicit reshape is a no-op when DOLFIN already returns a 2-D array and
# restores the expected layout on releases that return a flat array.
dof_coords = V.tabulate_dof_coordinates().reshape(-1, mesh.geometry().dim())

# Regular grid for output (origin lower-left).
# indexing="xy" makes axis 0 of xx/yy run over y and axis 1 over x, so after
# reshaping to (grid_size, grid_size) entry [i, j] is the point (xv[j], yv[i]).
xv = np.linspace(0, 1, grid_size)
yv = np.linspace(0, 1, grid_size)
xx, yy = np.meshgrid(xv, yv, indexing="xy")
grid_points = np.stack([xx.ravel(), yy.ravel()], axis=-1)  # (H*W, 2)

# -------------------- Helpers: IC & source ----------------
def expr_gaussian(A, x0, y0, sigma):
    """Return a FEniCS ``Expression`` for an isotropic 2-D Gaussian bump.

    The expression evaluates
    ``A * exp(-((x - x0)^2 + (y - y0)^2) / (2 * sigma^2))``.

    A:      amplitude
    x0, y0: centre of the bump
    sigma:  width parameter
    """
    cpp_code = "A*exp(-(pow(x[0]-x0,2)+pow(x[1]-y0,2))/(2*pow(sigma,2)))"
    params = dict(A=A, x0=x0, y0=y0, sigma=sigma)
    return Expression(cpp_code, degree=3, **params)

def generate_u0(mode, seed=None):
    """Sample a random initial displacement u^0 as a FEniCS Function in V.

    mode: one of
        "gaussian" - a single Gaussian bump,
        "multi"    - the sum of 2 or 3 Gaussian bumps,
        "fourier"  - one product-of-sines mode with random phases.
    seed: passed to ``np.random.default_rng``; the same (mode, seed) pair
          draws the same parameters.

    returns: Function in V (nodal interpolant of the sampled expression)
    raises ValueError: if `mode` is not one of the names above
    """
    rng = np.random.default_rng(seed)

    if mode == "gaussian":
        A = rng.uniform(-1.0, 1.0)
        x0, y0 = rng.random(2)
        sigma = rng.uniform(0.06, 0.18)
        return interpolate(expr_gaussian(A, x0, y0, sigma), V)

    if mode == "multi":
        k = rng.integers(2, 4)  # upper bound is exclusive: k is 2 or 3
        u0 = Function(V)        # starts at zero; bumps are accumulated into it
        for _ in range(k):
            # Draw order (A, centre, sigma) is kept fixed so a given seed
            # always yields the same bumps.
            A = rng.uniform(-1.0, 1.0)
            x0, y0 = rng.random(2)
            sigma = rng.uniform(0.05, 0.18)
            # Values go in as Expression parameters via the shared helper
            # instead of being formatted into a new C++ source string for
            # every sample.
            bump = interpolate(expr_gaussian(A, x0, y0, sigma), V)
            u0.vector().axpy(1.0, bump.vector())
        return u0

    if mode == "fourier":
        m = rng.integers(1, 4)
        n = rng.integers(1, 4)
        A = rng.uniform(-1.0, 1.0)
        phi, psi = rng.uniform(0, 2*np.pi, 2)
        expr = Expression("A*sin(m*pi*x[0]+phi)*sin(n*pi*x[1]+psi)",
                          degree=3, A=A, m=m, n=n, phi=phi, psi=psi)
        return interpolate(expr, V)

    # Previously any unrecognised name silently produced a "fourier" sample.
    raise ValueError(
        f"unknown u0 mode {mode!r}; expected 'gaussian', 'multi' or 'fourier'")

def generate_f_seq(mode, t_array, seed=None):
    """Sample a random source term and evaluate it on the dofs of V over time.

    mode: one of
        "pulse"     - a fixed Gaussian bump modulated by sin(omega*t),
        "traveling" - a Gaussian bump whose centre moves with constant velocity,
        "multi"     - the sum of two independent "pulse"-type terms.
    t_array: iterable of time values; one frame is produced per entry.
    seed: passed to ``np.random.default_rng``.

    All random parameters are drawn once per call, before the time loop, so
    every frame of the returned sequence belongs to the same source. (The
    "multi" mode used to re-draw its parameters at every time step, which made
    consecutive frames unrelated.)

    returns: ndarray of shape (len(t_array), num_dofs)
    raises ValueError: if `mode` is not one of the names above
    """
    rng = np.random.default_rng(seed)

    pulse_code = ("A*exp(-(pow(x[0]-x0,2)+pow(x[1]-y0,2))/(2*pow(sigma,2)))"
                  "*sin(omega*t)")
    traveling_code = ("A*exp(-(pow(x[0]-(x0+vx*t),2)+pow(x[1]-(y0+vy*t),2))"
                      "/(2*pow(sigma,2)))")

    def _draw_pulse(sigma_max):
        # Draw order (A, centre, sigma, omega) matches the original "pulse"
        # branch, so existing seeds keep producing the same pulse sources.
        A = rng.uniform(-1.0, 1.0)
        x0, y0 = rng.random(2)
        sigma = rng.uniform(0.05, sigma_max)
        omega = rng.uniform(3, 10)*np.pi
        return Expression(pulse_code, degree=3, A=A, x0=x0, y0=y0,
                          sigma=sigma, omega=omega, t=0.0)

    if mode == "pulse":
        terms = [_draw_pulse(0.2)]
    elif mode == "traveling":
        A = rng.uniform(-1.0, 1.0)
        x0, y0 = rng.random(2)
        vx, vy = rng.uniform(-0.4, 0.4, 2)
        sigma = rng.uniform(0.06, 0.2)
        terms = [Expression(traveling_code, degree=3, A=A, x0=x0, y0=y0,
                            vx=vx, vy=vy, sigma=sigma, t=0.0)]
    elif mode == "multi":
        terms = [_draw_pulse(0.18) for _ in range(2)]
    else:
        # Previously any unrecognised name silently produced a "multi" source.
        raise ValueError(
            f"unknown source mode {mode!r}; "
            "expected 'pulse', 'traveling' or 'multi'")

    def _sample(expr, t):
        # Update the time parameter in place instead of rebuilding the
        # Expression for every frame.
        expr.t = float(t)
        return interpolate(expr, V).vector().get_local()

    f_list = [np.sum([_sample(expr, t) for expr in terms], axis=0)
              for t in t_array]
    return np.stack(f_list, axis=0)  # (T+1, num_dofs)

# -------------------- Simulation --------------------
def simulate_sequence(u0_fn, f_dofs_seq, dt, num_steps, c2=1.0):
    """
    March the forced wave equation in time and return the dof history.

    Every step solves
        (M + dt^2 c2 K) u_new = M (2 u_cur - u_old) + dt^2 F
    with the module-level Dirichlet condition `bc` imposed on the system.
    The load vector F used in the solve that produces ``u_seq[n]`` is
    assembled from ``f_dofs_seq[n]``.
    NOTE(review): code that pairs a source frame with a target frame should
    use this index convention - confirm it is the intended scheme.

    u0_fn:      FEniCS Function in V for u^0; stored as ``u_seq[0]`` as given
                (`bc` is not applied to it)
    f_dofs_seq: ndarray, shape (T+1, num_dofs) with f^n on V dofs
    dt:         time step size
    num_steps:  number of time steps T
    c2:         wave speed squared
    returns u_dofs_seq: (T+1, num_dofs)
    raises ValueError: if f_dofs_seq is not 2-D with num_steps + 1 rows
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if f_dofs_seq.ndim != 2 or f_dofs_seq.shape[0] != num_steps + 1:
        raise ValueError(
            f"f_dofs_seq must have shape ({num_steps + 1}, num_dofs), "
            f"got {f_dofs_seq.shape}")
    T_plus_1, num_dofs = f_dofs_seq.shape

    # State functions
    u_nm1 = Function(V)   # previous state (u^{-1} before the first step)
    u_n   = Function(V)   # current state  (u^{0}  before the first step)
    u_np1 = Function(V)   # state being solved for

    # Set u^0
    u_n.assign(u0_fn)

    # One Function and one load form are reused for every source frame; only
    # the coefficient values change between assemblies.
    f_fn = Function(V)
    source_form = f_fn * v * dx

    # Initial acceleration a0 from: M a0 = -c^2 K u^0 + F^0
    f_fn.vector()[:] = f_dofs_seq[0]
    F0 = assemble(source_form)
    rhs0 = -c2 * (K * u_n.vector())           # K*u^0 : a Vector
    rhs0.axpy(1.0, F0)                        # rhs0 += F0
    a0 = Function(V)
    solve(M, a0.vector(), rhs0)

    # Set u^{-1} consistent with central differences and v0 = 0:
    # u^{-1} = u^0 + 0.5 * dt^2 * a0
    u_nm1.assign(u_n)
    u_nm1.vector().axpy(0.5 * dt**2, a0.vector())

    # Implicit operator. It does not change between steps, so the Dirichlet
    # rows are imposed on it once here; only the right-hand side needs the
    # boundary values applied inside the loop.
    A = M + dt**2 * c2 * K
    bc.apply(A)

    # Store solution dofs
    u_seq = np.zeros((T_plus_1, num_dofs))
    u_seq[0, :] = u_n.vector().get_local()

    # March in time
    for n in range(1, num_steps + 1):
        # Source frame for this solve
        f_fn.vector()[:] = f_dofs_seq[n]

        # RHS: M (2 u_cur - u_old) + dt^2 * F, where u_cur = u_seq[n-1]
        b = M * (2*u_n.vector() - u_nm1.vector()) + dt**2 * assemble(source_form)
        bc.apply(b)

        solve(A, u_np1.vector(), b)

        # store and shift
        u_seq[n, :] = u_np1.vector().get_local()
        u_nm1.assign(u_n)
        u_n.assign(u_np1)

    return u_seq  # (T+1, num_dofs)

# -------------------- Interpolation to regular grid --------------------
def to_grid(dof_values):
    """Resample one dof vector onto the regular output grid.

    The values, located at `dof_coords`, are interpolated with
    ``griddata(..., method="cubic")`` at `grid_points` and reshaped to
    (grid_size, grid_size). With the module's grid construction, axis 0 of the
    result runs over y and axis 1 over x (origin lower).

    dof_values: 1-D array with one value per dof of V
    returns:    ndarray of shape (grid_size, grid_size)
    """
    flat = griddata(dof_coords, dof_values, grid_points, method="cubic")
    return flat.reshape(grid_size, grid_size)

def to_grid_seq(dofs_seq):  # (T+1, num_dofs) → (T+1, H, W)
    """Resample a whole sequence of dof vectors onto the regular output grid.

    All frames share the same dof coordinates, so a single Clough-Tocher
    interpolant is built with the frames as value columns. This is the
    interpolant that ``griddata(..., method="cubic")`` constructs for 2-D
    data, but the dof coordinates are triangulated once per sequence instead
    of once per frame. Keep the method in sync with `to_grid`.

    dofs_seq: ndarray of shape (T+1, num_dofs)
    returns:  ndarray of shape (T+1, grid_size, grid_size)
    """
    # Local import keeps this block self-contained.
    from scipy.interpolate import CloughTocher2DInterpolator

    dofs_seq = np.asarray(dofs_seq)
    n_frames = dofs_seq.shape[0]

    # The interpolator expects values of shape (n_points, ...): one column
    # per frame.
    interpolant = CloughTocher2DInterpolator(
        dof_coords, np.ascontiguousarray(dofs_seq.T))
    on_grid = interpolant(grid_points)            # (H*W, T+1)

    return np.ascontiguousarray(on_grid.T).reshape(
        n_frames, grid_size, grid_size)

# -------------------- Stepper pair builder ------------------
def make_stepper_pairs(u_seq, f_seq):
    """
    Build one-step training pairs from a solution and a source sequence.

    For every n = 1..T-1 the input is the channel stack
    (u^{n-1}, u^n, f^n) and the target is u^{n+1}.
    NOTE(review): `simulate_sequence` in this file assembles the load for the
    solve producing frame n+1 from source frame n+1, not n - confirm that
    pairing f^n with target u^{n+1} is intended.

    u_seq, f_seq: (T+1, H, W), same shape
    returns X: (N_t, 3, H, W), Y: (N_t, H, W) with N_t = max(T-1, 0).
            Sequences with fewer than three frames yield empty arrays
            instead of raising.
    raises ValueError: if u_seq is not 3-D or the shapes differ
    """
    u_seq = np.asarray(u_seq)
    f_seq = np.asarray(f_seq)
    if u_seq.ndim != 3:
        raise ValueError(f"u_seq must have shape (T+1, H, W), got {u_seq.shape}")
    if f_seq.shape != u_seq.shape:
        raise ValueError(
            f"f_seq shape {f_seq.shape} does not match u_seq shape {u_seq.shape}")

    # Shifted views line up u^{n-1}, u^n and f^n for n = 1..T-1; stacking on
    # axis 1 creates the channel dimension (and copies the data).
    X = np.stack([u_seq[:-2], u_seq[1:-1], f_seq[1:-1]], axis=1)
    # Copy so the targets do not alias the caller's array.
    Y = u_seq[2:].copy()
    return X, Y

# -------------------- Generate one sample ------------------------------
def generate_one(mode_u0, mode_f, seed_ic=None, seed_src=None):
    """Generate one simulated sample on the regular output grid.

    Samples an initial condition and a source sequence, integrates them with
    `simulate_sequence` using the module-level T_final, num_steps, dt and c2,
    and resamples both the solution and the source to the output grid.

    mode_u0:  initial-condition mode passed to `generate_u0`
    mode_f:   source mode passed to `generate_f_seq`
    seed_ic:  seed for the initial condition
    seed_src: seed for the source
    returns:  (u0, f_seq, u_seq) where f_seq and u_seq are gridded sequences
              in THW layout and u0 is the first frame of u_seq
    """
    times = np.linspace(0.0, T_final, num_steps + 1)

    initial_fn = generate_u0(mode_u0, seed_ic)
    source_dofs = generate_f_seq(mode_f, times, seed_src)          # (T+1, dofs)
    solution_dofs = simulate_sequence(
        initial_fn, source_dofs, dt, num_steps, c2=c2)

    # To grid, layout THW: solution first, then source.
    u_seq = to_grid_seq(solution_dofs)   # (T+1,H,W)
    f_seq = to_grid_seq(source_dofs)     # (T+1,H,W)

    return u_seq[0], f_seq, u_seq

[debian:10809] mca_base_component_repository_open: unable to open mca_btl_openib: librdmacm.so.1: cannot open shared object file: No such file or directory (ignored)


In [4]:
set_log_level(LogLevel.ERROR)
# Small training set: every (u0-mode, f-mode) combination is simulated
# train_sample_per_mode times.
# NOTE: the seeds depend only on k, so for a given k the same initial
# condition (per u0-mode) is reused across all source modes, and the same
# source (per f-mode) is reused across all u0-modes.
train_list = []
for mu in tqdm(modes_u0, desc="Train IC modes"):
    for mf in modes_f:
        for k in range(train_sample_per_mode):  # Total: 3*3*train_sample_per_mode samples
            u0, f_seq, u_seq = generate_one(mu, mf, seed_ic=1000+k, seed_src=2000+k)
            train_list.append((u0, f_seq, u_seq))

# Stack & save (THW layout + metadata)
u0_train  = np.stack([s[0] for s in train_list])            # (B,H,W)
f_train   = np.stack([s[1] for s in train_list])            # (B,T+1,H,W)
u_train   = np.stack([s[2] for s in train_list])            # (B,T+1,H,W)

# H and W are stored as well so the train files carry the same metadata keys
# as the test files.
np.savez_compressed(
    "train_wave_stepper.npz",
    u0=u0_train, f=f_train, u=u_train,
    layout=layout_meta, origin=origin_meta,
    T_plus_1=num_steps+1, dt=dt, c2=c2, H=grid_size, W=grid_size
)

# Stepper pairs: one (X, Y) block per simulation, concatenated over samples.
X_all, Y_all = [], []
for _, f_seq, u_seq in train_list:
    X, Y = make_stepper_pairs(u_seq, f_seq)
    X_all.append(X); Y_all.append(Y)
X_all = np.concatenate(X_all, axis=0)   # (N_total,3,H,W)
Y_all = np.concatenate(Y_all, axis=0)   # (N_total,H,W)
np.savez_compressed("train_wave_step_pairs.npz", X=X_all, Y=Y_all,
                    layout="CHW", origin=origin_meta, dt=dt, c2=c2,
                    H=grid_size, W=grid_size)
print("Saved 'train_wave_stepper.npz' and 'train_wave_step_pairs.npz'")


print(f"Train (full seq) shapes → u0: {u0_train.shape}, f: {f_train.shape}, u: {u_train.shape}")
print(f"Train (step pairs) shapes → X: {X_all.shape}, Y: {Y_all.shape}")

Train IC modes: 100%|██████████| 3/3 [03:19<00:00, 66.34s/it]


Saved 'train_wave_stepper.npz' and 'train_wave_step_pairs.npz'
Train (full seq) shapes → u0: (288, 32, 32), f: (288, 21, 32, 32), u: (288, 21, 32, 32)
Train (step pairs) shapes → X: (5472, 3, 32, 32), Y: (5472, 32, 32)


## Test data 

In [5]:
# === Deterministic TEST set generation ===

# Fixed base seeds
BASE_IC_SEED  = 4242
BASE_SRC_SEED = 9001

def deterministic_seeds(mode_u, mode_f, k, modes_u0, modes_f):
    """Return (seed_ic, seed_src) deterministically for the combo and repeat k.

    The seeds are the base seeds plus fixed strides per u0-mode index,
    per f-mode index and per repeat:
        seed_ic  = BASE_IC_SEED  + 1000*iu + 100*jf + k
        seed_src = BASE_SRC_SEED + 2000*iu + 200*jf + k
    These strides only keep seeds distinct while k < 100 and jf < 10, so
    values outside that range are rejected instead of silently colliding
    with the seeds of another combination.

    mode_u, mode_f:   mode names; must be members of modes_u0 / modes_f
    k:                repeat index, 0 <= k < 100
    modes_u0, modes_f: ordered lists of mode names that define the indices
    raises ValueError: if a mode is not in its list, if k is out of range,
                       or if the f-mode index is 10 or larger
    """
    iu = modes_u0.index(mode_u)  # 0..len-1
    jf = modes_f.index(mode_f)
    if not 0 <= k < 100:
        raise ValueError(f"repeat index k must satisfy 0 <= k < 100, got {k}")
    if jf >= 10:
        raise ValueError(
            f"source-mode index {jf} is too large for the seed formula (max 9)")
    # Construct integer seeds with a simple formula (no Python hash)
    seed_ic  = BASE_IC_SEED  + 1000*iu + 100*jf + k
    seed_src = BASE_SRC_SEED + 2000*iu + 200*jf + k
    return seed_ic, seed_src

# Simulate every (u0-mode, f-mode) combination test_sample_per_mode times,
# seeding each run through deterministic_seeds.
test_list = []
print("Generating deterministic TEST set...")
for mu in tqdm(modes_u0, desc="Test IC modes"):
    for mf in modes_f:
        for k in range(test_sample_per_mode):
            seed_ic, seed_src = deterministic_seeds(mu, mf, k, modes_u0, modes_f)
            u0, f_seq, u_seq = generate_one(mu, mf, seed_ic=seed_ic, seed_src=seed_src)
            test_list.append((u0, f_seq, u_seq))

# Collect the three components of every sample into batch arrays
# (THW layout per sample: (T+1, H, W)).
u0_test = np.stack([first for first, _, _ in test_list])      # (B,H,W)
f_test  = np.stack([src for _, src, _ in test_list])          # (B,T+1,H,W)
u_test  = np.stack([sol for _, _, sol in test_list])          # (B,T+1,H,W)

# Full sequences plus metadata.
np.savez_compressed(
    "test_wave_stepper.npz",
    u0=u0_test,
    f=f_test,
    u=u_test,
    layout=layout_meta,
    origin=origin_meta,
    T_plus_1=num_steps+1,
    dt=dt,
    c2=c2,
    H=grid_size,
    W=grid_size,
)

# One-step pairs for training/eval, concatenated over all test simulations.
X_all, Y_all = [], []
for _, f_seq, u_seq in test_list:
    X, Y = make_stepper_pairs(u_seq, f_seq)           # X: (Nt,3,H,W), Y:(Nt,H,W)
    X_all.append(X)
    Y_all.append(Y)
X_all = np.concatenate(X_all, axis=0)
Y_all = np.concatenate(Y_all, axis=0)

np.savez_compressed(
    "test_wave_step_pairs.npz",
    X=X_all,
    Y=Y_all,
    layout="CHW",
    origin=origin_meta,
    dt=dt,
    c2=c2,
    H=grid_size,
    W=grid_size,
)

print("Saved 'test_wave_stepper.npz' and 'test_wave_step_pairs.npz'")
print(f"Test (full seq) shapes → u0: {u0_test.shape}, f: {f_test.shape}, u: {u_test.shape}")
print(f"Test (step pairs) shapes → X: {X_all.shape}, Y: {Y_all.shape}")

Generating deterministic TEST set...


Test IC modes: 100%|██████████| 3/3 [00:14<00:00,  4.89s/it]


Saved 'test_wave_stepper.npz' and 'test_wave_step_pairs.npz'
Test (full seq) shapes → u0: (27, 32, 32), f: (27, 21, 32, 32), u: (27, 21, 32, 32)
Test (step pairs) shapes → X: (513, 3, 32, 32), Y: (513, 32, 32)
