# Tutorial 03 - Hole

In [None]:
import abc
import itertools
import typing

In [None]:
import dolfinx.fem
import gmsh
import multiphenicsx.io
import multiphenicsx.mesh
import numpy as np
import petsc4py
import plotly.graph_objects as go
import ufl

In [None]:
import rbnicsx.backends
import rbnicsx.online
import rbnicsx.test

In [None]:
%load_ext nbvalx

In [None]:
%register_run_if_allowed_tags \
    pod_galerkin_inefficient, pod_galerkin_efficient

In [None]:
%register_run_if_current_tag pod_galerkin_efficient

## 1. Mesh generation

In [None]:
mesh_size = 2e-1

In [None]:
%%run_if pod_galerkin_efficient
subdomains_vertices = [
    [(-1.0, -1.0), (-2.0, -2.0), (1.0, -1.0)],  # subdomain 1
    [(1.0, -1.0), (-2.0, -2.0), (2.0, -2.0)],  # subdomain 2
    [(-1.0, -1.0), (-1.0, 1.0), (-2.0, -2.0)],  # subdomain 3
    [(-1.0, 1.0), (-2.0, 2.0), (-2.0, -2.0)],  # subdomain 4
    [(1.0, -1.0), (2.0, -2.0), (1.0, 1.0)],  # subdomain 5
    [(2.0, 2.0), (1.0, 1.0), (2.0, -2.0)],  # subdomain 6
    [(-1.0, 1.0), (1.0, 1.0), (-2.0, 2.0)],  # subdomain 7
    [(2.0, 2.0), (-2.0, 2.0), (1.0, 1.0)]  # subdomain 8
]

In [None]:
boundaries_vertices = [
    [(-1.0, -1.0), (1.0, -1.0)],  # boundary 1, bottom inner
    [(-1.0, -1.0), (-1.0, 1.0)],  # boundary 2, left inner
    [(-1.0, 1.0), (1.0, 1.0)],  # boundary 3, top inner
    [(1.0, -1.0), (1.0, 1.0)],  # boundary 4, right inner
    [(-2.0, -2.0), (2.0, -2.0)],  # boundary 5, bottom outer
    [(-2.0, -2.0), (-2.0, 2.0)],  # boundary 6, left outer
    [(-2.0, 2.0), (2.0, 2.0)],  # boundary 7, top outer
    [(2.0, -2.0), (2.0, 2.0)]  # boundary 8, right outer
]

In [None]:
gmsh.initialize()
gmsh.model.add("hole")

In [None]:
%%run_if pod_galerkin_efficient
gmsh_points = dict()
for subdomain_vertices in subdomains_vertices:
    for vertex in subdomain_vertices:
        try:
            gmsh_points[vertex]
        except KeyError:
            gmsh_points[vertex] = gmsh.model.geo.addPoint(vertex[0], vertex[1], 0.0, mesh_size)

In [None]:
%%run_if pod_galerkin_inefficient
gmsh_points = dict()
for boundary_vertices in boundaries_vertices:
    for vertex in boundary_vertices:
        try:
            gmsh_points[vertex]
        except KeyError:
            gmsh_points[vertex] = gmsh.model.geo.addPoint(vertex[0], vertex[1], 0.0, mesh_size)

In [None]:
%%run_if pod_galerkin_efficient
gmsh_lines = dict()
gmsh_subdomains = list()
for subdomain_vertices in subdomains_vertices:
    subdomain_lines = list()
    for v in range(3):
        key = (gmsh_points[subdomain_vertices[v]], gmsh_points[subdomain_vertices[(v + 1) % 3]])
        try:
            gmsh_lines[key]
        except KeyError:
            gmsh_lines[key] = gmsh.model.geo.addLine(*key)
            gmsh_lines[key[1], key[0]] = - gmsh_lines[key]
        subdomain_lines.append(gmsh_lines[key])
    subdomain_lines_loop = gmsh.model.geo.addCurveLoop(subdomain_lines)
    gmsh_subdomains.append(gmsh.model.geo.addPlaneSurface([subdomain_lines_loop]))

In [None]:
%%run_if pod_galerkin_inefficient
gmsh_lines = dict()
gmsh_lines_list = list()
gmsh_subdomains = list()
for boundary_vertices in boundaries_vertices:
    key = (gmsh_points[boundary_vertices[0]], gmsh_points[boundary_vertices[1]])
    assert key not in gmsh_lines
    gmsh_lines[key] = gmsh.model.geo.addLine(*key)
    gmsh_lines[key[1], key[0]] = - gmsh_lines[key]
    gmsh_lines_list.append(gmsh_lines[key])
inner_lines_loop = gmsh.model.geo.addCurveLoop(
    [gmsh_lines_list[0], gmsh_lines_list[3], - gmsh_lines_list[2], - gmsh_lines_list[1]])
outer_lines_loop = gmsh.model.geo.addCurveLoop(
    [gmsh_lines_list[4], gmsh_lines_list[7], - gmsh_lines_list[6], - gmsh_lines_list[5]])
gmsh_subdomains.append(gmsh.model.geo.addPlaneSurface([inner_lines_loop, outer_lines_loop]))

In [None]:
%%run_if pod_galerkin_efficient, pod_galerkin_inefficient
gmsh.model.geo.synchronize()
for (label, gmsh_subdomain) in enumerate(gmsh_subdomains):
    gmsh.model.addPhysicalGroup(2, [gmsh_subdomain], label + 1)
for (label, boundary) in enumerate(boundaries_vertices):
    gmsh.model.addPhysicalGroup(1, [gmsh_lines[gmsh_points[boundary[0]], gmsh_points[boundary[1]]]], label + 1)
gmsh.model.mesh.generate(2)

In [None]:
mesh, subdomains, boundaries = multiphenicsx.mesh.gmsh_to_fenicsx(gmsh.model, gdim=2)
gmsh.finalize()

In [None]:
multiphenicsx.io.plot_mesh(mesh)

In [None]:
multiphenicsx.io.plot_mesh_tags(subdomains)

In [None]:
multiphenicsx.io.plot_mesh_tags(boundaries)

## 2. Problem definition

In [None]:
%%run_if pod_galerkin_efficient
class AffineShapeParametrization(rbnicsx.backends.MeshMotion):
    """Deform the domain with an affine shape parametrization."""

    def __init__(self, mu: np.typing.NDArray[np.float64]) -> None:
        # Define function space
        M = dolfinx.fem.VectorFunctionSpace(mesh, ("Lagrange", mesh.geometry.cmap.degree))
        # Interpolate affine shape parametrization expression on a dolfinx Function
        shape_parametrization = dolfinx.fem.Function(M)
        for (label, expression) in enumerate(self.get_shape_parametrization_expression(mu)):
            shape_parametrization.interpolate(expression, subdomains.indices[subdomains.values == label + 1])
        # Initialize mesh motion object
        super().__init__(mesh, shape_parametrization)

    @staticmethod
    def get_shape_parametrization_expression(
        mu: typing.Union[np.typing.NDArray[np.float64], rbnicsx.backends.SymbolicParameters]
    ) -> typing.List[typing.Callable]:
        """Get the expression of the shape parametrization on each domain."""
        # TODO: ufl4rom should compute these based on the vertices mapping
        return [
            lambda x: (
                2 - 2 * mu[0] + mu[0] * x[0] + (2 - 2 * mu[0]) * x[1],
                2 - 2 * mu[1] + (2 - mu[1]) * x[1]
            ),  # subdomain 1
            lambda x: (
                2 * mu[0] - 2 + x[0] + (mu[0] - 1) * x[1],
                2 - 2 * mu[1] + (2 - mu[1]) * x[1]
            ),  # subdomain 2
            lambda x: (
                2 - 2 * mu[0] + (2 - mu[0]) * x[0],
                2 - 2 * mu[1] + (2 - 2 * mu[1]) * x[0] + mu[1] * x[1]
            ),  # subdomain 3
            lambda x: (
                2 - 2 * mu[0] + (2 - mu[0]) * x[0],
                2 * mu[1] - 2 + (mu[1] - 1) * x[0] + x[1]
            ),  # subdomain 4
            lambda x: (
                2 * mu[0] - 2 + (2 - mu[0]) * x[0],
                2 - 2 * mu[1] + (2 * mu[1] - 2) * x[0] + mu[1] * x[1]
            ),  # subdomain 5
            lambda x: (
                2 * mu[0] - 2 + (2 - mu[0]) * x[0],
                2 * mu[1] - 2 + (1 - mu[1]) * x[0] + x[1]
            ),  # subdomain 6
            lambda x: (
                2 - 2 * mu[0] + mu[0] * x[0] + (2 * mu[0] - 2) * x[1],
                2 * mu[1] - 2 + (2 - mu[1]) * x[1]
            ),  # subdomain 7
            lambda x: (
                2 * mu[0] - 2 + x[0] + (1 - mu[0]) * x[1],
                2 * mu[1] - 2 + (2 - mu[1]) * x[1]
            )  # subdomain 8
        ]

In [None]:
%%run_if pod_galerkin_inefficient
class HarmonicExtension(rbnicsx.backends.MeshMotion):
    """Extend the shape parametrization from the boundary to the interior with an harmonic extension."""

    def __init__(self, mu: np.typing.NDArray[np.float64]) -> None:
        # Define function space
        M = dolfinx.fem.VectorFunctionSpace(mesh, ("Lagrange", mesh.geometry.cmap.degree))
        # Define trial and test functions
        m = ufl.TrialFunction(M)
        n = ufl.TestFunction(M)
        # Define bilinear form of the harmonic extension problem
        a_he = dolfinx.fem.form(ufl.inner(ufl.grad(m), ufl.grad(n)) * ufl.dx)
        a_he_cpp = dolfinx.fem.form(a_he)
        # Define linear form of the harmonic extension problem
        zero_vector = dolfinx.fem.Constant(mesh, np.zeros(mesh.topology.dim, petsc4py.PETSc.ScalarType))
        f_he = dolfinx.fem.form(ufl.inner(zero_vector, n) * ufl.dx)
        f_he_cpp = dolfinx.fem.form(f_he)
        # Define boundary conditions for the harmonic extension problem
        facets_inner_boundaries = boundaries.indices[np.isin(boundaries.values, (1, 2, 3, 4))]
        facets_outer_boundaries = boundaries.indices[np.isin(boundaries.values, (5, 6, 7, 8))]
        bdofs_M_inner_boundaries = dolfinx.fem.locate_dofs_topological(
            M, mesh.topology.dim - 1, facets_inner_boundaries)
        bdofs_M_outer_boundaries = dolfinx.fem.locate_dofs_topological(
            M, mesh.topology.dim - 1, facets_outer_boundaries)
        shape_parametrization_inner_boundaries = dolfinx.fem.Function(M)
        shape_parametrization_inner_boundaries.interpolate(lambda x: (mu[0] * x[0], mu[1] * x[1]))
        shape_parametrization_outer_boundaries = dolfinx.fem.Function(M)
        shape_parametrization_outer_boundaries.interpolate(lambda x: (x[0], x[1]))
        bcs_he = [
            dolfinx.fem.dirichletbc(shape_parametrization_inner_boundaries, bdofs_M_inner_boundaries),
            dolfinx.fem.dirichletbc(shape_parametrization_outer_boundaries, bdofs_M_outer_boundaries)]
        # Assemble the left-hand side matrix of the harmonic extension problem
        A = dolfinx.fem.assemble_matrix(a_he_cpp, bcs=bcs_he)
        A.assemble()
        # Assemble the right-hand side vector of the harmonic extension problem
        F = dolfinx.fem.assemble_vector(f_he_cpp)
        dolfinx.fem.apply_lifting(F, [a_he_cpp], [bcs_he])
        F.ghostUpdate(addv=petsc4py.PETSc.InsertMode.ADD, mode=petsc4py.PETSc.ScatterMode.REVERSE)
        dolfinx.fem.set_bc(F, bcs_he)
        # Solve the harmonic extension problem
        ksp = petsc4py.PETSc.KSP()
        ksp.create(mesh.comm)
        ksp.setOperators(A)
        ksp.setType("preonly")
        ksp.getPC().setType("lu")
        ksp.getPC().setFactorSolverType("mumps")
        ksp.setFromOptions()
        shape_parametrization = dolfinx.fem.Function(M)
        ksp.solve(F, shape_parametrization.vector)
        shape_parametrization.vector.ghostUpdate(
            addv=petsc4py.PETSc.InsertMode.INSERT, mode=petsc4py.PETSc.ScatterMode.FORWARD)
        # Initialize mesh motion object
        super().__init__(mesh, shape_parametrization)

In [None]:
%%run_if pod_galerkin_efficient
mu_mesh_motion = np.array([0.5, 0.5, np.nan])
with AffineShapeParametrization(mu_mesh_motion) as mesh_motion:
    multiphenicsx.io.plot_mesh(mesh)

In [None]:
%%run_if pod_galerkin_inefficient
mu_mesh_motion = np.array([0.5, 0.5, np.nan])
with HarmonicExtension(mu_mesh_motion) as mesh_motion:
    multiphenicsx.io.plot_mesh(mesh)

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
multiphenicsx.io.plot_vector_field(mesh_motion.deformation, "domain deformation", glyph_factor=1.0)

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
del mesh_motion

In [None]:
class ProblemBase(abc.ABC):
    """Define a linear problem, and solve it with KSP."""

    def __init__(self) -> None:
        # Define function space
        V = dolfinx.fem.FunctionSpace(mesh, ("Lagrange", 1))
        self._V = V
        # Define trial and test functions
        u = ufl.TrialFunction(V)
        v = ufl.TestFunction(V)
        self._trial = u
        self._test = v
        # Define measures for integration of forms
        dx = ufl.Measure("dx")(subdomain_data=subdomains)
        ds = ufl.Measure("ds")(subdomain_data=boundaries)
        self._dx = dx
        self._ds = ds
        # Define symbolic parameters for use in UFL forms
        mu_symb = rbnicsx.backends.SymbolicParameters(mesh, shape=(3, ))
        self._mu_symb = mu_symb
        # Define numeric parameters for use in shape parametrization
        self._mu = np.zeros(mu_symb.value.shape)

    @property
    def function_space(self) -> dolfinx.fem.FunctionSpace:
        """Return the function space of the problem."""
        return self._V

    @property
    def mu_symb(self) -> rbnicsx.backends.SymbolicParameters:
        """Return the symbolic parameters of the problem."""
        return self._mu_symb

    @property
    def trial_and_test(self) -> typing.Tuple[ufl.Argument, ufl.Argument]:
        """Return the UFL arguments used in the construction of the forms."""
        return self._trial, self._test

    @property
    def measures(self) -> typing.Tuple[ufl.Measure, ufl.Measure]:
        """Return the UFL measures used in the construction of the forms."""
        return self._dx, self._ds

    @abc.abstractproperty
    def bilinear_form(self) -> ufl.Form:
        """Return the bilinear form of the problem."""
        pass

    @abc.abstractproperty
    def linear_form(self) -> ufl.Form:
        """Return the linear form of the problem."""
        pass

    @property
    def boundary_conditions(self) -> typing.List[dolfinx.fem.DirichletBCMetaClass]:
        """Return the boundary conditions for the problem."""
        return self._bcs

    @abc.abstractproperty
    def mesh_motion(self) -> rbnicsx.backends.MeshMotion:
        """Return the mesh motion for the parameter of the latest solve."""
        pass

    @abc.abstractmethod
    def _assemble_matrix(self) -> petsc4py.PETSc.Mat:
        """Assemble the left-hand side matrix."""
        pass

    @abc.abstractmethod
    def _assemble_vector(self) -> petsc4py.PETSc.Vec:
        """Assemble the right-hand side vector."""
        pass

    def solve(self, mu: np.typing.NDArray[np.float64]) -> dolfinx.fem.Function:
        """Assign the provided parameters value and solve the problem."""
        self._mu[:] = mu
        self._mu_symb.value[:] = mu
        return self._solve()

    def _solve(self) -> dolfinx.fem.Function:
        """Solve the linear problem with KSP."""
        A = self._assemble_matrix()
        F = self._assemble_vector()
        ksp = petsc4py.PETSc.KSP()
        ksp.create(mesh.comm)
        ksp.setOperators(A)
        ksp.setType("preonly")
        ksp.getPC().setType("lu")
        ksp.getPC().setFactorSolverType("mumps")
        ksp.setFromOptions()
        solution = dolfinx.fem.Function(self._V)
        ksp.solve(F, solution.vector)
        solution.vector.ghostUpdate(
            addv=petsc4py.PETSc.InsertMode.INSERT, mode=petsc4py.PETSc.ScatterMode.FORWARD)
        return solution

In [None]:
%%run_if pod_galerkin_inefficient
class ProblemOnDeformedDomain(ProblemBase):
    """Solve the problem on the deformed domain obtained by harmonic extension."""

    def __init__(self) -> None:
        super().__init__()
        # Get trial and test functions
        u, v = self.trial_and_test
        # Get measures
        dx, ds = self.measures
        # Get symbolic parameters for use in UFL forms
        mu_symb = self.mu_symb
        # Define bilinear form of the problem
        a = (
            ufl.inner(ufl.grad(u), ufl.grad(v)) * dx + mu_symb[2] * (
                ufl.inner(u, v) * ds(5) + ufl.inner(u, v) * ds(6)
                + ufl.inner(u, v) * ds(7) + ufl.inner(u, v) * ds(8)
            )
        )
        self._a = a
        self._a_cpp = dolfinx.fem.form(a)
        # Define linear form of the problem
        one = petsc4py.PETSc.ScalarType(1)
        f = (
            ufl.inner(one, v) * ds(1) + ufl.inner(one, v) * ds(2)
            + ufl.inner(one, v) * ds(3) + ufl.inner(one, v) * ds(4)
        )
        self._f = f
        self._f_cpp = dolfinx.fem.form(f)
        # Store mesh motion object used in the latest solve, to avoid having to solve
        # the harmonic extension once for computation and once for (optional) visualization.
        self._mesh_motion = None

    @property
    def bilinear_form(self) -> ufl.Form:
        """Return the bilinear form of the problem."""
        return self._a

    @property
    def linear_form(self) -> ufl.Form:
        """Return the linear form of the problem."""
        return self._f

    @property
    def mesh_motion(self) -> rbnicsx.backends.MeshMotion:
        """Return the mesh motion object that was used in the latest solve."""
        return self._mesh_motion

    def _assemble_matrix(self) -> petsc4py.PETSc.Mat:
        """Assemble the left-hand side matrix."""
        A = dolfinx.fem.assemble_matrix(self._a_cpp)
        A.assemble()
        return A

    def _assemble_vector(self) -> petsc4py.PETSc.Vec:
        """Assemble the right-hand side vector."""
        F = dolfinx.fem.assemble_vector(self._f_cpp)
        F.ghostUpdate(addv=petsc4py.PETSc.InsertMode.ADD, mode=petsc4py.PETSc.ScatterMode.REVERSE)
        return F

    def _solve(self) -> dolfinx.fem.Function:
        """Apply shape parametrization to the mesh and solve the linear problem with KSP."""
        with HarmonicExtension(self._mu) as self._mesh_motion:
            return super()._solve()

In [None]:
%%run_if pod_galerkin_efficient
class ProblemOnReferenceDomain(ProblemBase):
    """Solve the problem on the reference domain after a pull back with the affine shape parametrization."""

    def __init__(self) -> None:
        super().__init__()
        # Get trial and test functions
        u, v = self.trial_and_test
        # Get measures
        dx, ds = self.measures
        # Get symbolic parameters for use in UFL forms
        mu_symb = self.mu_symb
        # Define bilinear form of the problem
        # TODO: ufl4rom should carry out a pull back of the form defined on the deformed domain
        #       avoiding the need to have them hardcoded here and duplicated in the reduced problem.
        #       ufl4rom should NOT return the separable expansion, but directly the sum.
        thetas_a = (
            # subdomains 1 and 7
            lambda mu: - (mu[1] - 2) / mu[0] - (2 * (2 * mu[0] - 2) * (mu[0] - 1)) / (mu[0] * (mu[1] - 2)),
            lambda mu: - mu[0] / (mu[1] - 2),
            lambda mu: - (2 * (mu[0] - 1)) / (mu[1] - 2),
            # subdomains 2 and 8
            lambda mu: 2 - (mu[0] - 1) * (mu[0] - 1) / (mu[1] - 2) - mu[1],
            lambda mu: - 1 / (mu[1] - 2),
            lambda mu: (mu[0] - 1) / (mu[1] - 2),
            # subdomains 3 and 5
            lambda mu: - mu[1] / (mu[0] - 2),
            lambda mu: - (mu[0] - 2) / mu[1] - (2 * (2 * mu[1] - 2) * (mu[1] - 1)) / (mu[1] * (mu[0] - 2)),
            lambda mu: - (2 * (mu[1] - 1)) / (mu[0] - 2),
            # subdomains 4 and 6
            lambda mu: - 1 / (mu[0] - 2),
            lambda mu: 2 - (mu[1] - 1) * (mu[1] - 1) / (mu[0] - 2) - mu[0],
            lambda mu: (mu[1] - 1) / (mu[0] - 2),
            # boundaries 5, 6, 7 and 8
            lambda mu: mu[2]
        )
        operators_a = (
            ufl.inner(u.dx(0), v.dx(0)) * dx(1) + ufl.inner(u.dx(0), v.dx(0)) * dx(7),
            ufl.inner(u.dx(1), v.dx(1)) * dx(1) + ufl.inner(u.dx(1), v.dx(1)) * dx(7),
            (ufl.inner(u.dx(0), v.dx(1)) * dx(1) + ufl.inner(u.dx(1), v.dx(0)) * dx(1)
             - ufl.inner(u.dx(0), v.dx(1)) * dx(7) - ufl.inner(u.dx(1), v.dx(0)) * dx(7)),
            # subdomains 2 and 8
            ufl.inner(u.dx(0), v.dx(0)) * dx(2) + ufl.inner(u.dx(0), v.dx(0)) * dx(8),
            ufl.inner(u.dx(1), v.dx(1)) * dx(2) + ufl.inner(u.dx(1), v.dx(1)) * dx(8),
            (ufl.inner(u.dx(0), v.dx(1)) * dx(2) + ufl.inner(u.dx(1), v.dx(0)) * dx(2)
             - ufl.inner(u.dx(0), v.dx(1)) * dx(8) - ufl.inner(u.dx(1), v.dx(0)) * dx(8)),
            # subdomains 3 and 5
            ufl.inner(u.dx(0), v.dx(0)) * dx(3) + ufl.inner(u.dx(0), v.dx(0)) * dx(5),
            ufl.inner(u.dx(1), v.dx(1)) * dx(3) + ufl.inner(u.dx(1), v.dx(1)) * dx(5),
            (ufl.inner(u.dx(0), v.dx(1)) * dx(3) + ufl.inner(u.dx(1), v.dx(0)) * dx(3)
             - ufl.inner(u.dx(0), v.dx(1)) * dx(5) - ufl.inner(u.dx(1), v.dx(0)) * dx(5)),
            # subdomains 4 and 6
            ufl.inner(u.dx(0), v.dx(0)) * dx(4) + ufl.inner(u.dx(0), v.dx(0)) * dx(6),
            ufl.inner(u.dx(1), v.dx(1)) * dx(4) + ufl.inner(u.dx(1), v.dx(1)) * dx(6),
            (ufl.inner(u.dx(0), v.dx(1)) * dx(4) + ufl.inner(u.dx(1), v.dx(0)) * dx(4)
             - ufl.inner(u.dx(0), v.dx(1)) * dx(6) - ufl.inner(u.dx(1), v.dx(0)) * dx(6)),
            # boundaries 5, 6, 7 and 8
            ufl.inner(u, v) * ds(5) + ufl.inner(u, v) * ds(6) + ufl.inner(u, v) * ds(7) + ufl.inner(u, v) * ds(8)
        )
        a = sum([theta_a(mu_symb) * operator_a for (theta_a, operator_a) in zip(thetas_a, operators_a)])
        self._a = a
        self._a_cpp = dolfinx.fem.form(a)
        # Define linear form of the problem
        # TODO: ufl4rom should carry out a pull back of the form defined on the deformed domain
        #       avoiding the need to have them hardcoded here and duplicated in the reduced problem.
        #       ufl4rom should NOT return the separable expansion, but directly the sum.
        thetas_f = (
            lambda mu: mu[0],  # boundary 1
            lambda mu: mu[1],  # boundary 2
            lambda mu: mu[0],  # boundary 3
            lambda mu: mu[1]  # boundary 4
        )
        one = petsc4py.PETSc.ScalarType(1)
        operators_f = (
            ufl.inner(one, v) * ds(1),  # boundary 1
            ufl.inner(one, v) * ds(2),  # boundary 2
            ufl.inner(one, v) * ds(3),  # boundary 3
            ufl.inner(one, v) * ds(4)  # boundary 4
        )
        f = sum([theta_f(mu_symb) * operator_f for (theta_f, operator_f) in zip(thetas_f, operators_f)])
        self._f = f
        self._f_cpp = dolfinx.fem.form(f)

    @property
    def bilinear_form(self) -> ufl.Form:
        """Return the bilinear form of the problem."""
        return self._a

    @property
    def linear_form(self) -> ufl.Form:
        """Return the linear form of the problem."""
        return self._f

    @property
    def mesh_motion(self) -> rbnicsx.backends.MeshMotion:
        """Return the mesh motion for the parameter of the latest solve."""
        return AffineShapeParametrization(self._mu)

    def _assemble_matrix(self) -> petsc4py.PETSc.Mat:
        """Assemble the left-hand side matrix."""
        A = dolfinx.fem.assemble_matrix(self._a_cpp)
        A.assemble()
        return A

    def _assemble_vector(self) -> petsc4py.PETSc.Vec:
        """Assemble the right-hand side vector."""
        F = dolfinx.fem.assemble_vector(self._f_cpp)
        F.ghostUpdate(addv=petsc4py.PETSc.InsertMode.ADD, mode=petsc4py.PETSc.ScatterMode.REVERSE)
        return F

In [None]:
%%run_if pod_galerkin_inefficient
problem = ProblemOnDeformedDomain()

In [None]:
%%run_if pod_galerkin_efficient
problem = ProblemOnReferenceDomain()

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
mu_solve = np.array([0.5, 0.5, 0.01])
solution = problem.solve(mu_solve)

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
multiphenicsx.io.plot_scalar_field(solution, "high fidelity solution on the reference domain")

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
with problem.mesh_motion:
    multiphenicsx.io.plot_scalar_field(solution, "high fidelity solution on the deformed domain")

## 3. Reduced problem definition

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
class ProjectionBasedReducedProblem(abc.ABC):
    """Define a linear projection-based problem, and solve it with KSP."""

    def __init__(self, problem: ProblemBase) -> None:
        # Define basis functions storage
        basis_functions = rbnicsx.backends.FunctionsList(problem.function_space)
        self._basis_functions = basis_functions
        # Get trial and test functions from the high fidelity problem
        u, v = problem.trial_and_test
        # Define H^1 inner product
        inner_product = ufl.inner(u, v) * ufl.dx + ufl.inner(ufl.grad(u), ufl.grad(v)) * ufl.dx
        self._inner_product = inner_product
        self._inner_product_action = rbnicsx.backends.bilinear_form_action(
            inner_product, part="real")
        # Store numeric and symbolic parameters from problem
        self._mu_symb = problem.mu_symb
        self._mu = np.zeros(self._mu_symb.value.shape)

    @property
    def basis_functions(self) -> rbnicsx.backends.FunctionsList:
        """Return the basis functions of the reduced problem."""
        return self._basis_functions

    @property
    def inner_product_form(self) -> ufl.Form:
        """
        Return the bilinear form that defines the inner product associated to this reduced problem.

        This inner product is used to define the notion of orthogonality employed during the offline stage.
        """
        return self._inner_product

    @property
    def inner_product_action(self) -> typing.Callable:
        """
        Return the action of the bilinear form that defines the inner product associated to this reduced problem.

        This inner product is used to define the notion of orthogonality employed during the offline stage.
        """
        return self._inner_product_action

    @abc.abstractproperty
    def mesh_motion(self) -> rbnicsx.backends.MeshMotion:
        """Return the mesh motion for the parameter of the latest solve."""
        pass

    @abc.abstractmethod
    def _assemble_matrix(self, N: int) -> petsc4py.PETSc.Mat:
        """Assemble the left-hand side online matrix of dimension N."""
        pass

    @abc.abstractmethod
    def _assemble_vector(self, N: int) -> petsc4py.PETSc.Vec:
        """Assemble the right-hand side online vector of dimension N."""
        pass

    def solve(self, mu: np.typing.NDArray[np.float64], N: int) -> petsc4py.PETSc.Vec:
        """Assign the provided parameters value and solve the problem."""
        self._mu[:] = mu
        self._mu_symb.value[:] = mu
        return self._solve(N)

    def _solve(self, N: int) -> petsc4py.PETSc.Vec:
        """Solve the linear online problem with KSP."""
        reduced_solution = rbnicsx.online.create_vector(N)
        A = self._assemble_matrix(N)
        F = self._assemble_vector(N)
        ksp = petsc4py.PETSc.KSP()
        ksp.create(reduced_solution.comm)
        ksp.setOperators(A)
        ksp.setType("preonly")
        ksp.getPC().setType("lu")
        ksp.setFromOptions()
        ksp.solve(F, reduced_solution)
        return reduced_solution

    def reconstruct_solution(self, reduced_solution: petsc4py.PETSc.Vec) -> dolfinx.fem.Function:
        """Reconstructed reduced solution on the high fidelity space."""
        return self.basis_functions[:reduced_solution.size] * reduced_solution

    def compute_pointwise_error(
        self, solution: dolfinx.fem.Function, reduced_solution: petsc4py.PETSc.Vec
    ) -> dolfinx.fem.Function:
        """Compute the pointwise error."""
        reconstructed_reduced_solution = self.reconstruct_solution(reduced_solution)
        assert solution.function_space == reconstructed_reduced_solution.function_space
        error = dolfinx.fem.Function(solution.function_space)
        with error.vector.localForm() as error_local, solution.vector.localForm() as solution_local, \
                reconstructed_reduced_solution.vector.localForm() as reduced_solution_local:
            error_local[:] = solution_local.array - reduced_solution_local.array
        return error

    def compute_norm(self, function: dolfinx.fem.Function) -> petsc4py.PETSc.RealType:
        """Compute the norm of a function using the H^1 inner product on the reference domain."""
        return np.sqrt(self._inner_product_action(function)(function))

In [None]:
%%run_if pod_galerkin_inefficient
class PODGInefficientReducedProblem(ProjectionBasedReducedProblem):
    """Specialize the projection-based problem to inefficient assembly of the system and POD basis."""

    def __init__(self, problem: ProblemOnDeformedDomain) -> None:
        super().__init__(problem)
        # Store forms of the problem
        self._a_action = rbnicsx.backends.bilinear_form_action(problem.bilinear_form)
        self._f_action = rbnicsx.backends.linear_form_action(problem.linear_form)
        # Store mesh motion object used in the latest solve, to avoid having to solve
        # the harmonic extension once for computation and once for (optional) visualization.
        self._mesh_motion = None

    @property
    def mesh_motion(self) -> rbnicsx.backends.MeshMotion:
        """Return the mesh motion object that was used in the latest solve."""
        return self._mesh_motion

    def _assemble_matrix(self, N: int) -> petsc4py.PETSc.Mat:
        """Assemble the left-hand side online matrix of dimension N."""
        return rbnicsx.backends.project_matrix(self._a_action, self._basis_functions[:N])

    def _assemble_vector(self, N: int) -> petsc4py.PETSc.Vec:
        """Assemble the right-hand side online vector of dimension N."""
        return rbnicsx.backends.project_vector(self._f_action, self._basis_functions[:N])

    def _solve(self, N: int) -> petsc4py.PETSc.Vec:
        """Apply shape parametrization to the mesh and solve the linear problem with KSP."""
        with HarmonicExtension(self._mu) as self._mesh_motion:
            return super()._solve(N)

In [None]:
%%run_if pod_galerkin_efficient
class PODGEfficientReducedProblem(ProjectionBasedReducedProblem):
    """Specialize the projection-based problem to efficient assembly of the system and POD basis."""

    def __init__(self, problem: ProblemOnReferenceDomain) -> None:
        super().__init__(problem)
        # Get trial and test functions from the high fidelity problem
        u, v = problem.trial_and_test
        # Get measures from the high fidelity problem
        dx, ds = problem.measures
        # Define separable expansion of the bilinear form of the high fidelity problem,
        # and storage for the corresponding pre-assembled reduced operators.
        # TODO: ufl4rom should deduce thetas_a and operators_a from problem.bilinear_form
        #       avoiding the need to have them hardcoded here.
        thetas_a = (
            # subdomains 1 and 7
            lambda mu: - (mu[1] - 2) / mu[0] - (2 * (2 * mu[0] - 2) * (mu[0] - 1)) / (mu[0] * (mu[1] - 2)),
            lambda mu: - mu[0] / (mu[1] - 2),
            lambda mu: - (2 * (mu[0] - 1)) / (mu[1] - 2),
            # subdomains 2 and 8
            lambda mu: 2 - (mu[0] - 1) * (mu[0] - 1) / (mu[1] - 2) - mu[1],
            lambda mu: - 1 / (mu[1] - 2),
            lambda mu: (mu[0] - 1) / (mu[1] - 2),
            # subdomains 3 and 5
            lambda mu: - mu[1] / (mu[0] - 2),
            lambda mu: - (mu[0] - 2) / mu[1] - (2 * (2 * mu[1] - 2) * (mu[1] - 1)) / (mu[1] * (mu[0] - 2)),
            lambda mu: - (2 * (mu[1] - 1)) / (mu[0] - 2),
            # subdomains 4 and 6
            lambda mu: - 1 / (mu[0] - 2),
            lambda mu: 2 - (mu[1] - 1) * (mu[1] - 1) / (mu[0] - 2) - mu[0],
            lambda mu: (mu[1] - 1) / (mu[0] - 2),
            # boundaries 5, 6, 7 and 8
            lambda mu: mu[2]
        )
        self._thetas_a = thetas_a
        operators_a = (
            ufl.inner(u.dx(0), v.dx(0)) * dx(1) + ufl.inner(u.dx(0), v.dx(0)) * dx(7),
            ufl.inner(u.dx(1), v.dx(1)) * dx(1) + ufl.inner(u.dx(1), v.dx(1)) * dx(7),
            (ufl.inner(u.dx(0), v.dx(1)) * dx(1) + ufl.inner(u.dx(1), v.dx(0)) * dx(1)
             - ufl.inner(u.dx(0), v.dx(1)) * dx(7) - ufl.inner(u.dx(1), v.dx(0)) * dx(7)),
            # subdomains 2 and 8
            ufl.inner(u.dx(0), v.dx(0)) * dx(2) + ufl.inner(u.dx(0), v.dx(0)) * dx(8),
            ufl.inner(u.dx(1), v.dx(1)) * dx(2) + ufl.inner(u.dx(1), v.dx(1)) * dx(8),
            (ufl.inner(u.dx(0), v.dx(1)) * dx(2) + ufl.inner(u.dx(1), v.dx(0)) * dx(2)
             - ufl.inner(u.dx(0), v.dx(1)) * dx(8) - ufl.inner(u.dx(1), v.dx(0)) * dx(8)),
            # subdomains 3 and 5
            ufl.inner(u.dx(0), v.dx(0)) * dx(3) + ufl.inner(u.dx(0), v.dx(0)) * dx(5),
            ufl.inner(u.dx(1), v.dx(1)) * dx(3) + ufl.inner(u.dx(1), v.dx(1)) * dx(5),
            (ufl.inner(u.dx(0), v.dx(1)) * dx(3) + ufl.inner(u.dx(1), v.dx(0)) * dx(3)
             - ufl.inner(u.dx(0), v.dx(1)) * dx(5) - ufl.inner(u.dx(1), v.dx(0)) * dx(5)),
            # subdomains 4 and 6
            ufl.inner(u.dx(0), v.dx(0)) * dx(4) + ufl.inner(u.dx(0), v.dx(0)) * dx(6),
            ufl.inner(u.dx(1), v.dx(1)) * dx(4) + ufl.inner(u.dx(1), v.dx(1)) * dx(6),
            (ufl.inner(u.dx(0), v.dx(1)) * dx(4) + ufl.inner(u.dx(1), v.dx(0)) * dx(4)
             - ufl.inner(u.dx(0), v.dx(1)) * dx(6) - ufl.inner(u.dx(1), v.dx(0)) * dx(6)),
            # boundaries 5, 6, 7 and 8
            ufl.inner(u, v) * ds(5) + ufl.inner(u, v) * ds(6) + ufl.inner(u, v) * ds(7) + ufl.inner(u, v) * ds(8)
        )
        self._operators_a = operators_a
        self._operators_a_action = tuple(
            rbnicsx.backends.bilinear_form_action(operator_a) for operator_a in operators_a)
        assert len(thetas_a) == len(operators_a)
        self._Q_a = len(thetas_a)
        reduced_operators_a: typing.Dict[int, rbnicsx.online.TensorsList] = dict()
        self._reduced_operators_a = reduced_operators_a
        # Define separable expansion of the linear form of the high fidelity problem,
        # and storage for the corresponding pre-assembled reduced operators.
        # TODO: ufl4rom should deduce thetas_f and operators_f from problem.linear_form
        #       avoiding the need to have them hardcoded here.
        thetas_f = (
            lambda mu: mu[0],  # boundary 1
            lambda mu: mu[1],  # boundary 2
            lambda mu: mu[0],  # boundary 3
            lambda mu: mu[1]  # boundary 4
        )
        self._thetas_f = thetas_f
        one = petsc4py.PETSc.ScalarType(1)
        operators_f = (
            ufl.inner(one, v) * ds(1),  # boundary 1
            ufl.inner(one, v) * ds(2),  # boundary 2
            ufl.inner(one, v) * ds(3),  # boundary 3
            ufl.inner(one, v) * ds(4)  # boundary 4
        )
        self._operators_f = operators_f
        self._operators_f_action = tuple(
            rbnicsx.backends.linear_form_action(operator_f) for operator_f in operators_f)
        assert len(thetas_f) == len(operators_f)
        self._Q_f = len(thetas_f)
        reduced_operators_f: typing.Dict[int, rbnicsx.online.TensorsList] = dict()
        self._reduced_operators_f = reduced_operators_f

    @property
    def mesh_motion(self) -> rbnicsx.backends.MeshMotion:
        """Return the mesh motion for the parameter of the latest solve."""
        return AffineShapeParametrization(self._mu)

    def preassemble_reduced_operators(self, N: int) -> None:
        """Preassemble reduced operators in the separable expansion."""
        # Preassemble left-hand side reduced operators
        reduced_operators_a_N = rbnicsx.online.TensorsList((N, N))
        reduced_operators_a_N.extend([
            rbnicsx.backends.project_matrix(operator_a_action, self._basis_functions[:N])
            for operator_a_action in self._operators_a_action])
        assert N not in self._reduced_operators_a
        self._reduced_operators_a[N] = reduced_operators_a_N
        # Preassemble right-hand side reduced operators
        reduced_operators_f_N = rbnicsx.online.TensorsList(N)
        reduced_operators_f_N.extend([
            rbnicsx.backends.project_vector(operator_f_action, self._basis_functions[:N])
            for operator_f_action in self._operators_f_action])
        assert N not in self._reduced_operators_f
        self._reduced_operators_f[N] = reduced_operators_f_N

    def _assemble_matrix(self, N: int) -> petsc4py.PETSc.Mat:
        """Assemble the left-hand side online matrix of dimension N."""
        mu = self._mu
        thetas_a_mu = rbnicsx.online.create_vector(self._Q_a)
        thetas_a_mu[:] = [theta_a(mu) for theta_a in self._thetas_a]
        return self._reduced_operators_a[N] * thetas_a_mu

    def _assemble_vector(self, N: int) -> petsc4py.PETSc.Vec:
        """Assemble the right-hand side online vector of dimension N."""
        mu = self._mu
        thetas_f_mu = rbnicsx.online.create_vector(self._Q_f)
        thetas_f_mu[:] = [theta_f(mu) for theta_f in self._thetas_f]
        return self._reduced_operators_f[N] * thetas_f_mu

## 4. Training

In [None]:
def generate_training_set() -> np.typing.NDArray[np.float64]:
    """Generate an equispaced training set using numpy."""
    training_set_01 = np.linspace(0.5, 1.5, 4)
    training_set_2 = np.linspace(0.01, 1.0, 4)
    training_set = np.array(list(itertools.product(training_set_01, training_set_01, training_set_2)))
    return training_set


training_set = rbnicsx.io.on_rank_zero(mesh.comm, generate_training_set)
assert training_set.shape == (64, 3)

In [None]:
Nmax = 10

In [None]:
%%run_if pod_galerkin_inefficient
print(rbnicsx.io.TextBox("POD-Galerkin offline phase begins", fill="="))
print("")

print("set up snapshots matrix")
snapshots_matrix = rbnicsx.backends.FunctionsList(problem.function_space)

print("set up reduced problem")
reduced_problem = PODGInefficientReducedProblem(problem)

print("")

for (mu_index, mu) in enumerate(training_set):
    print(rbnicsx.io.TextLine(str(mu_index), fill="#"))

    print("high fidelity solve for mu =", mu)
    snapshot = problem.solve(mu)

    print("update snapshots matrix")
    snapshots_matrix.append(snapshot)

    print("")

print(rbnicsx.io.TextLine("perform POD", fill="#"))
eigenvalues, modes, _ = rbnicsx.backends.proper_orthogonal_decomposition(
    snapshots_matrix, reduced_problem.inner_product_action, N=Nmax, tol=0.0)
reduced_problem.basis_functions.extend(modes)
print("")

print(rbnicsx.io.TextBox("POD-Galerkin offline phase ends", fill="="))

In [None]:
%%run_if pod_galerkin_efficient
print(rbnicsx.io.TextBox("POD-Galerkin offline phase begins", fill="="))
print("")

print("set up snapshots matrix")
snapshots_matrix = rbnicsx.backends.FunctionsList(problem.function_space)

print("set up reduced problem")
reduced_problem = PODGEfficientReducedProblem(problem)

print("")

for (mu_index, mu) in enumerate(training_set):
    print(rbnicsx.io.TextLine(str(mu_index), fill="#"))

    print("high fidelity solve for mu =", mu)
    snapshot = problem.solve(mu)

    print("update snapshots matrix")
    snapshots_matrix.append(snapshot)

    print("")

print(rbnicsx.io.TextLine("perform POD", fill="#"))
eigenvalues, modes, _ = rbnicsx.backends.proper_orthogonal_decomposition(
    snapshots_matrix, reduced_problem.inner_product_action, N=Nmax, tol=0.0)
reduced_problem.basis_functions.extend(modes)
print("")

print("preassemble reduced operators")
for N in range(1, Nmax + 1):
    reduced_problem.preassemble_reduced_operators(N)
print("")

print(rbnicsx.io.TextBox("POD-Galerkin offline phase ends", fill="="))

In [None]:
%%run_if pod_galerkin_efficient, pod_galerkin_inefficient
positive_eigenvalues = np.where(eigenvalues > 0., eigenvalues, np.nan)
singular_values = np.sqrt(positive_eigenvalues)

In [None]:
%%run_if pod_galerkin_efficient, pod_galerkin_inefficient
fig = go.Figure()
fig.add_scatter(
    x=np.arange(singular_values.shape[0]), y=singular_values / singular_values[0], mode="markers+lines")
fig.update_layout(title="Normalized singular values")
fig.update_xaxes(title_text="N"),
fig.update_yaxes(title_text=r"$\frac{\sigma_N}{\sigma_0}$", type="log", exponentformat="power")
fig.show()

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
assert np.all(rbnicsx.test.order_of_magnitude(eigenvalues[:Nmax]) == [5, 2, 1, 1, -1, -1, -1, -1, -1, -2])

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
multiphenicsx.io.plot_scalar_field(reduced_problem.basis_functions[0], "first basis solution")

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
multiphenicsx.io.plot_scalar_field(reduced_problem.basis_functions[1], "second basis solution")

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
multiphenicsx.io.plot_scalar_field(reduced_problem.basis_functions[2], "third basis solution")

## 5. Testing

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
reduced_solution = reduced_problem.solve(mu_solve, Nmax)

In [None]:
%%run_if pod_galerkin_inefficient, pod_galerkin_efficient
reconstructed_reduced_solution = reduced_problem.reconstruct_solution(reduced_solution)
with reduced_problem.mesh_motion:
    multiphenicsx.io.plot_scalar_field(reconstructed_reduced_solution, "reconstructed reduced solution")

In [None]:
%%run_if reduced_basis_inefficient, reduced_basis_efficient, pod_galerkin_inefficient, pod_galerkin_efficient
error = reduced_problem.compute_pointwise_error(solution, reduced_solution)
with reduced_problem.mesh_motion:
    multiphenicsx.io.plot_scalar_field(error, "pointwise error")

In [None]:
%%run_if pod_galerkin_efficient, pod_galerkin_inefficient
error_norm_reference_domain = reduced_problem.compute_norm(error)
solution_norm_reference_domain = reduced_problem.compute_norm(solution)
print("Norm of the error, measured on the reference domain", error_norm_reference_domain)
print(
    "Norm of the relative error, measured on the reference domain",
    error_norm_reference_domain / solution_norm_reference_domain)

In [None]:
%%run_if pod_galerkin_efficient, pod_galerkin_inefficient
assert rbnicsx.test.order_of_magnitude(error_norm_reference_domain) == -2

In [None]:
%%run_if pod_galerkin_efficient, pod_galerkin_inefficient
with reduced_problem.mesh_motion:
    error_norm_deformed_domain = reduced_problem.compute_norm(error)
    solution_norm_deformed_domain = reduced_problem.compute_norm(solution)
    print("Norm of the error, measured on the deformed domain", error_norm_deformed_domain)
    print(
        "Norm of the relative error, measured on the deformed domain",
        error_norm_deformed_domain / solution_norm_deformed_domain)

In [None]:
%%run_if pod_galerkin_efficient, pod_galerkin_inefficient
assert rbnicsx.test.order_of_magnitude(error_norm_deformed_domain) == -2