In [None]:
# Import packages
import os
import numpy as np
import matplotlib.pyplot as plt
from firedrake import *
from tqdm.auto import tqdm
from numpy.random import default_rng
import matplotlib.pyplot as plt
from movement import *
from firedrake.meshadapt import *

In [None]:
# Generate random fields for para
# You need to validate that 
def random_field(V, N: int = 1, m: int = 5, σ: float = 0.6, seed: int = 2023):
    rng = default_rng(seed)
    x, y = SpatialCoordinate(V.mesh())
    fields = []
    for _ in tqdm(range(N), disable=False):
        r = 0
        for _ in range(m):
            a, b = rng.standard_normal(2)
            k1, k2 = rng.normal(0, σ, 2)
            θ = 2 * np.pi * (k1 * x + k2 * y)
            r += Constant(a/2) * cos(θ) + Constant(b/2) * sin(θ) + Constant(1)
        fields.append(Function(V).interpolate(sqrt(1 / m) * r))
    return fields

In [None]:
# Define the function space
nx, ny, Lx, Ly = 20, 20, 1., 1.
mesh = RectangleMesh(nx, ny, Lx, Ly)
V = FunctionSpace(mesh, "CG", 1)

In [None]:
#visualize initial mesh
fig, axes = plt.subplots(figsize=(12, 6))
triplot(mesh, axes=axes)
axes.legend()
plt.show()

In [None]:
# Generate random field data for 10 examples
ks = random_field(V, N=10) 

# Visualize the 10 training data in 5x2 subplots
fig, axs = plt.subplots(5, 2, figsize=(15, 15))

# Plot each random field in a separate subplot
for i in range(5):
    for j in range(2):
        index = 2*i + j
        k_plot = tricontourf(ks[index], axes=axs[i, j])
        plt.colorbar(k_plot, ax=axs[i, j])
        axs[i, j].set_title(f'k Example {index + 1}')
plt.tight_layout()
plt.show()

Here we start to generate 1 data to have experiment and test

This part will furher used to generate training data/ test data for the cnn

here we will generate parameters and solve the variational problems with firedrake.
You can also try to modify other physics driven problems.

In [None]:
ks = random_field(V, N=1)
us = []
v = TestFunction(V)
x, y = SpatialCoordinate(mesh)
f = Function(V).interpolate(sin(np.pi * x) * sin(np.pi * y) + 2 * np.pi**2 * cos(np.pi * x) * cos(np.pi * y))  # Source term
g = Function(V).interpolate(cos(np.pi * x) * cos(np.pi * y))  # Boundary condition
bcs = [DirichletBC(V, g, "on_boundary")]
for k in tqdm(ks):
    u = Function(V)
    F = (inner(grad(u), grad(v)) - inner(f, v)) * dx
    solve(F == 0, u, bcs=bcs, solver_parameters={'ksp_type': 'preonly', 'pc_type': 'lu'})
    us.append(u)

In [None]:
# Plot u
u_plot = tricontourf(us[0])
plt.colorbar(u_plot)
plt.show()

Here we will use monge-ampere movement(r adaptive) technique to do mesh refinement

You need to download checkpoints for movement first: https://github.com/pyroteus/movement.git

In [None]:
for u in tqdm(us):
    
    def pde_monitor(mesh):

        # Define the Riemannian metric
        P1_tensor = TensorFunctionSpace(mesh, "CG", 1)
        metric = RiemannianMetric(P1_tensor)

        # Compute the Hessian of the current solution
        hessian = metric.compute_hessian(u)

        P1 = FunctionSpace(mesh, "CG", 1)
        
        beta = Constant(0.1)
        Hnorm = Function(P1).interpolate(sqrt(inner(hessian, hessian)))
        Hnorm_max = Hnorm.dat.data[:].max()                                 
        monitor = Function(P1).interpolate(1 + beta * Hnorm/Hnorm_max)
        
        return monitor
    mover = MongeAmpereMover(mesh, pde_monitor, method="quasi_newton")
    mover.move()

In [None]:
# Generate observations by adding some noise on u
scale_noise: float = 2*1e-4
seed: int = 1234
np.random.seed(seed)
us_obs = []
for u in tqdm(us):
    u_obs = Function(V).assign(u)
    u_obs.dat.data[:] += scale_noise * np.random.randn(V.dim())
    us_obs.append(u_obs)

In [None]:
# Visualize one of the training data
fig, axs = plt.subplots(1, 3, figsize=(15, 5))

# Plot k
k_plot = tricontourf(ks[0], axes=axs[0])
plt.colorbar(k_plot, ax=axs[0])
axs[0].set_title('k')

# Plot u
u_plot = tricontourf(us[0], axes=axs[1])
plt.colorbar(u_plot, ax=axs[1])
axs[1].set_title('u')

# Plot u_obs
u_obs_plot = tricontourf(us_obs[0], axes=axs[2])
plt.colorbar(u_obs_plot, ax=axs[2])
axs[2].set_title('u_obs')

plt.tight_layout()
plt.show()

Please note we are very interested in the curvature and grad of the plot above because we need to make sure that the movement is making the right adjustments(adding high resolution) on the area with high curvature

In [None]:
#visualize adapted mesh
fig, axes = plt.subplots(figsize=(12, 6))
triplot(mesh, axes=axes)
axes.legend()
plt.show()

In [None]:
# you do not need to run this part at the moment
import torch
from torch.utils.data import Dataset
from firedrake.ml.pytorch import *
# Define BatchElement and BatchedElement data classes
from dataclasses import dataclass
from typing import NamedTuple, List, Optional

@dataclass
class BatchElement:
    target: torch.Tensor
    u_obs: torch.Tensor
    target_fd: Function
    u_obs_fd: Function

@dataclass
class BatchedElement:
    target: torch.Tensor
    u_obs: torch.Tensor
    target_fd: List[Function]
    u_obs_fd: List[Function]
    batch_elements: Optional[List[BatchElement]] = None

# Define PDEDataset class
class PDEDataset(Dataset):
    def __init__(self, ks_train, us_obs_train):
        self.data = [(k, u_obs) for k, u_obs in zip(ks_train, us_obs_train)]
    
    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx: int) -> BatchElement:
        target_fd, u_obs_fd = self.data[idx]
        # Convert Firedrake functions to PyTorch tensors
        target, u_obs = [to_torch(e) for e in [target_fd, u_obs_fd]]
        return BatchElement(target=target, u_obs=u_obs, target_fd=target_fd, u_obs_fd=u_obs_fd)
    
    def collate(self, batch_elements: List[BatchElement]) -> BatchedElement:
        batch_size = len(batch_elements)
        n = max(e.u_obs.size(-1) for e in batch_elements)
        m = max(e.target.size(-1) for e in batch_elements)

        u_obs = torch.zeros(batch_size, n, dtype=batch_elements[0].u_obs.dtype)
        target = torch.zeros(batch_size, m, dtype=batch_elements[0].target.dtype)
        target_fd = []
        u_obs_fd = []
        for i, e in enumerate(batch_elements):
            u_obs[i, :] = e.u_obs
            target[i, :] = e.target
            target_fd.append(e.target_fd)
            u_obs_fd.append(e.u_obs_fd)

        return BatchedElement(u_obs=u_obs, target=target,
                              target_fd=target_fd, u_obs_fd=u_obs_fd,
                              batch_elements=batch_elements)

# Instantiate PDEDataset
dataset = PDEDataset(ks_train, us_obs_train)

# Get a single sample from the dataset
sample = dataset[0]
print(f"target: {sample.target}\nu_obs: {sample.u_obs}")

# Get a batch from the dataset
batch_size = 4
batch = dataset.collate([dataset[i] for i in range(batch_size)])
print(f"batch u_obs size: {batch.u_obs.size()}\nbatch target size: {batch.target.size()}")
