<a href="https://colab.research.google.com/github/IgorBaratta/FEniCSxCourse/blob/ICMC23/Problem8_HPC/HPC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DOLFINx in Parallel with MPI

To run the programs in this section, you first need to install DOLFINx and mpi4py:

In [None]:
!wget "https://fem-on-colab.github.io/releases/fenicsx-install-real.sh" -O "/tmp/fenicsx-install.sh" && bash "/tmp/fenicsx-install.sh"

## First parallel program

For a quick introduction to distributed parallel computing with MPI, please follow one of these tutorials: [introduction-to-mpi](https://www.codingame.com/playgrounds/349/introduction-to-mpi/introduction-to-distributed-computing) or [mpitutorial](https://mpitutorial.com/).

In this hands-on tutorial we only discuss the minimum needed to run simple codes in DOLFINx and to understand concepts that you may have already seen in previous parts of the tutorial, such as `MPI.COMM_WORLD`, `comm.size`, etc.

In [None]:
%%writefile hello-world.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
print(f"Hello world from rank {comm.rank} of {comm.size}")

A communicator is a collection of MPI processes, and the default communicator is `MPI.COMM_WORLD`, which groups all the processes started with the program. The number of processes in a communicator does not change once it is created; that number is called the `size` of the communicator. The `rank` of a process in a communicator is a unique id from $0$ to $N-1$, where $N$ is the size.

Next, let's see how to use the `mpirun` program to execute the Python code above using 4 processes. The value after `-np` is the number of processes used to run the script in parallel.

In [None]:
! mpirun --allow-run-as-root --oversubscribe -np 4 python hello-world.py

**Note**: MPI processes run independently, so their output is not guaranteed to appear in any specific order.

## Point-to-Point communication

In MPI, processes (and hence their memory) are completely independent. Information is exchanged between processes by explicitly sending and receiving messages.

In [None]:
%%writefile p2p.py

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
assert comm.size == 2
rank = comm.rank

if rank == 0:
    data = numpy.arange(10, dtype=int)
    comm.Send(data, dest=1)
    print(f"Process 0 sent {data} to process 1")
elif rank == 1:
    data = numpy.zeros(10, dtype=int)
    comm.Recv(data, source=0)
    print(f"Process 1 received {data} from process 0")


In [None]:
!mpirun -n 2 --allow-run-as-root --oversubscribe python3 p2p.py

Key points:
- MPI uses the notion of a rank to distinguish processes.
- `Send` and `Recv` are the fundamental primitives.
- Sending a message from one process to another is known as point-to-point communication.

**Exercise**: Implement a simple communication ring.
Each process `i` sends data to its neighbor `i+1` except for the last process in the communicator which sends data to process 0.

Template:
```python3
from mpi4py import MPI
import numpy

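comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

# Ring neighbours: rank i sends to i+1 and receives from i-1 (wrapping around)
send_to = rank + 1 if rank < size - 1 else 0
receive_from = rank - 1 if rank > 0 else size - 1

send_data = numpy.array([rank], dtype=int)
recv_data = numpy.zeros_like(send_data)

# Note: if every rank calls a blocking Send first, the ring may deadlock for
# large messages; comm.Sendrecv combines both steps safely.
comm.Send(TODO)
comm.Recv(TODO)

print(f"Process {rank} received {recv_data} from process {receive_from}")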
```


## Creating a mesh in parallel
DOLFINx hides most of the difficult aspects of distributing the finite element problem across the MPI communicator from the user.
Let's have a look at how to create and write a mesh in parallel: 

In [None]:
%%writefile mesh.py

from mpi4py import MPI
import dolfinx
from dolfinx.io import XDMFFile

# Global communicator 
comm = MPI.COMM_WORLD
Nprocs = comm.size

# Create the mesh and distribute across Nprocs
mesh = dolfinx.mesh.create_unit_square(comm, 20, 20)

# Create a cell function to visualize the mesh distribution
DG = dolfinx.fem.FunctionSpace(mesh, ("DG", 0))
cell_value = dolfinx.fem.Function(DG)
cell_value.x.array[:] = comm.rank

# Save the mesh and the cell-wise rank function
with XDMFFile(comm, f"mesh_{comm.size}.xdmf", "w") as xdmf:
    xdmf.write_mesh(mesh)
    xdmf.write_function(cell_value)

In [None]:
np = 4
! mpirun -np $np --allow-run-as-root --oversubscribe python3 mesh.py

from google.colab import files
files.download(f"mesh_{np}.xdmf")
files.download(f"mesh_{np}.h5")

Try changing the number of processes `np` and visualizing the resulting mesh partition (for example in ParaView).

## Managing data in parallel: the IndexMap

The IndexMap represents the distribution of index arrays across processes.
An index array is a contiguous collection of N indices `[0, 1, . . ., N-1]` that are distributed across M processes. On a given process, the IndexMap stores a portion of the index set using local indices `[0, 1, . . . , n-1]`, and a map from the local indices to a unique global index.
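The ownership part of this idea can be sketched in plain Python without MPI: split N global indices into contiguous blocks over M processes, so that owned local index `i` on a process maps to global index `start + i`. The even-split rule (the first `N % M` ranks get one extra index) and the helper names `local_range` and `local_to_global` are illustrative assumptions, not the DOLFINx API; a real IndexMap also tracks ghost indices owned by other processes.

```python
def local_range(rank, size, N):
    """Hypothetical even split: [start, end) of the N global indices owned by `rank`."""
    n, r = divmod(N, size)
    start = rank * n + min(rank, r)        # the first r ranks get one extra index
    end = start + n + (1 if rank < r else 0)
    return start, end


def local_to_global(rank, size, N, local_index):
    """Map an owned local index in [0, n) to its unique global index."""
    start, end = local_range(rank, size, N)
    assert 0 <= local_index < end - start
    return start + local_index


N, M = 10, 3
for rank in range(M):
    start, end = local_range(rank, M, N)
    print(f"rank {rank}: owns global indices {start}..{end - 1}, local size {end - start}")
```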

Let's have a look at an example. 
How many cells are owned by a given process?

In [None]:
%%writefile index_map.py

from mpi4py import MPI
import dolfinx
from dolfinx.io import XDMFFile

comm = MPI.COMM_WORLD
mesh = dolfinx.mesh.create_unit_square(comm, 20, 20)
tdim = mesh.topology.dim
idx_map = mesh.topology.index_map(tdim)

N = idx_map.size_global
n = idx_map.size_local
print(f"Process {comm.rank} owns {n} cells of {N}")

comm.Barrier()
print("")
comm.Barrier()

vmap = mesh.topology.index_map(0)
print(f"Process {comm.rank} owns {vmap.size_local} vertices of {vmap.size_global}")



In [None]:
! mpirun -n 2 --allow-run-as-root --oversubscribe python3 index_map.py

## Function and FunctionSpaces

When we define a function space on a distributed mesh, each rank constructs the local part of the space on its part of the mesh.
The distribution of a vector or function across MPI processes is also managed by an IndexMap and it follows the distribution of the mesh. 

Consider a continuous first-order Lagrange space over a mesh. The degrees of freedom of this space are associated with the vertices of the mesh.

A function in DOLFINx contains memory (an array) in which the expansion coefficients ($u_i$) with respect to the finite element basis ($\phi_i$) are stored:

$$
u_h = \sum_{i = 0}^{N-1} \phi_i u_i
$$
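To make the expansion concrete, here is a small plain-Python sketch for a 1D mesh with first-order (piecewise-linear) basis functions. The mesh, the coefficients and the helpers `hat` and `u_h` are hypothetical; the point is that $u_h$ is fully determined by the coefficient array, whose local part is what `u.x.array` holds on each process.

```python
nodes = [0.0, 0.25, 0.5, 0.75, 1.0]   # hypothetical 1D mesh vertices
u = [0.0, 1.0, 4.0, 9.0, 16.0]        # hypothetical coefficients u_i


def hat(i, x):
    """Piecewise-linear basis function phi_i: 1 at nodes[i], 0 at the other nodes."""
    xi = nodes[i]
    if i > 0 and nodes[i - 1] <= x <= xi:
        return (x - nodes[i - 1]) / (xi - nodes[i - 1])
    if i < len(nodes) - 1 and xi <= x <= nodes[i + 1]:
        return (nodes[i + 1] - x) / (nodes[i + 1] - xi)
    return 0.0


def u_h(x):
    """Evaluate u_h(x) = sum_i phi_i(x) u_i."""
    return sum(hat(i, x) * u_i for i, u_i in enumerate(u))


print(u_h(0.5), u_h(0.375))  # 4.0 2.5
```

At a vertex $u_h$ equals the corresponding coefficient; between vertices it interpolates linearly.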


For instance, in the figure below we show an arbitrary vector of coefficients of size 15 distributed across 3 different processes.

<figure>
<center>
<img src='https://raw.githubusercontent.com/IgorBaratta/FEniCSxCourse/main/Problem8_HPC/distribute.png' width="600" height="800"/>
<figcaption>Layout of a parallel vector of size 15 distributed to 3 processes. From left to right: Sequential vector, distributed vector, distributed vector with ghosting.
</figcaption></center>
</figure>

In [None]:
%%writefile distributed_function.py

from mpi4py import MPI
import dolfinx
from dolfinx.io import XDMFFile

comm = MPI.COMM_WORLD
mesh = dolfinx.mesh.create_unit_square(comm, 20, 20)
V = dolfinx.fem.FunctionSpace(mesh, ("Lagrange", 1))
u = dolfinx.fem.Function(V)

map = V.dofmap.index_map

print(f"Process {comm.rank} owns {map.size_local} dofs of {map.size_global}")

comm.Barrier()
print("")
comm.Barrier()

print(f"Local size {u.x.array.size} in process {comm.rank}")

# The size of local vector includes owned dofs and ghosts
assert u.x.array.size ==  map.size_local + map.num_ghosts

In [None]:
! mpirun -n 4 --allow-run-as-root --oversubscribe python3 distributed_function.py

**Note**:

Let $N$ be the global size of a function (the total number of degrees of freedom distributed across all $P$ processes), and let $N_i$ be the local size on process $i$ (`u.x.array.size`). Then:

$$
N \leq \sum_{i=0}^{P-1} N_i
$$

where the inequality is strict as soon as any process holds ghosts. Now, let $N_i^o$ be the number of owned dofs on process $i$; then
$$
N = \sum_{i=0}^{P-1} N_i^o
$$

and, with $N_i^g$ the number of ghost dofs on process $i$ (`index_map.num_ghosts`),
$$
N_i = N_i^o + N_i^{g}
$$
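The three relations can be checked on a layout like the one in the figure above: 15 dofs on 3 processes. The ghost indices below are hypothetical (they are not read off the figure), chosen only so that each process mirrors a few dofs owned by a neighbour.

```python
# Hypothetical layout: 15 global dofs on 3 processes with contiguous ownership.
N = 15
owned = {0: range(0, 5), 1: range(5, 10), 2: range(10, 15)}
# Hypothetical ghosts: global dofs a process stores but does not own.
ghosts = {0: [5, 6], 1: [4, 10], 2: [9]}
P = len(owned)

# A process never holds one of its own dofs as a ghost.
assert all(g not in owned[i] for i in range(P) for g in ghosts[i])

N_o = [len(owned[i]) for i in range(P)]      # N_i^o: owned dofs
N_g = [len(ghosts[i]) for i in range(P)]     # N_i^g: ghost dofs
N_loc = [N_o[i] + N_g[i] for i in range(P)]  # N_i: local array size

assert sum(N_o) == N       # every dof has exactly one owner
assert N <= sum(N_loc)     # dofs that are ghosts somewhere are stored more than once
print(N_o, N_g, N_loc, sum(N_loc))  # [5, 5, 5] [2, 2, 1] [7, 7, 6] 20
```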

## Scattering values

Let's say we want to set each expansion coefficient $u_i$ (local numbering) on every process to the MPI rank of the process that owns the corresponding degree of freedom. For consistency, this change must be reflected in two places:

 - In the memory of the process that owns the degree of freedom.
 - In the memory of the process (if any) that has the degree of freedom as a ghost.


How do we do that?

In [None]:
%%writefile scatter.py

from mpi4py import MPI
import dolfinx


def print_data(comm, data):
    # Print rank by rank; the barriers encourage (but do not guarantee) ordered output
    for i in range(comm.size):
        if comm.rank == i:
            print(f"Data on process {comm.rank}: {data}")
        comm.Barrier()

comm = MPI.COMM_WORLD
mesh = dolfinx.mesh.create_unit_square(comm, 5, 5)
V = dolfinx.fem.FunctionSpace(mesh, ("Lagrange", 1))
u = dolfinx.fem.Function(V)

# Set every local entry (owned and ghost) to this process's rank
u.x.array[:] = comm.rank

print_data(comm, u.x.array)

print()

# Scatter owned values forward to the processes that hold them as ghosts
u.x.scatter_forward()

print_data(comm, u.x.array)

In [None]:
! mpirun -n 4 --allow-run-as-root --oversubscribe python3 scatter.py

<figure>
<center>
<img src='https://raw.githubusercontent.com/IgorBaratta/FEniCSxCourse/main/Problem8_HPC/update.png' width="600" height="800"/>
<figcaption>Forward scattering of a vector in parallel. Communication direction is dof owner to ghost.
</figcaption></center>
</figure>
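The owner-to-ghost update can be mimicked in plain Python without MPI. The sketch below assumes a hypothetical layout of 9 global dofs on 3 processes; `owner` and `scatter_forward` are illustrative helpers, not DOLFINx functions, and reading directly from another process's array stands in for the messages MPI would exchange. As in `scatter.py`, every entry starts out equal to the local rank.

```python
# Hypothetical layout: 9 global dofs on 3 processes, contiguous ownership.
ranges = [(0, 3), (3, 6), (6, 9)]   # [start, end) of owned global indices
ghosts = [[3], [2, 6], [5]]         # global indices each process holds as ghosts


def owner(g):
    """Rank that owns global index g."""
    for rank, (start, end) in enumerate(ranges):
        if start <= g < end:
            return rank
    raise ValueError(f"index {g} is not owned by any process")


# Local arrays: owned entries first, then ghost entries.
# Every entry (ghosts included) starts as the local rank.
arrays = [[rank] * ((end - start) + len(ghosts[rank]))
          for rank, (start, end) in enumerate(ranges)]


def scatter_forward(arrays):
    """Overwrite each ghost entry with the owner's value (owner -> ghost)."""
    for rank, (start, end) in enumerate(ranges):
        n_owned = end - start
        for k, g in enumerate(ghosts[rank]):
            o = owner(g)
            # In MPI this value would arrive in a message from process o.
            arrays[rank][n_owned + k] = arrays[o][g - ranges[o][0]]


scatter_forward(arrays)
print(arrays)  # [[0, 0, 0, 1], [1, 1, 1, 0, 2], [2, 2, 2, 1]]
```

After the update, every ghost entry carries the rank of its owner, which is what the second `print_data` call in `scatter.py` is meant to show.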

## Assembling vectors in parallel

Linear and bilinear forms can be split into subdomain contributions, where $\Omega_i$ denotes the part of the domain (the cells) owned by process $i$:

$$
L(v) = \sum_{i=0}^{P-1} L_i(v) = \sum_{i=0}^{P-1} \int_{\Omega_i} f \cdot v \, dx
$$

When we call `dolfinx.fem.assemble_vector` on a (compiled) linear form $L$, the following happens:
 - Each process computes the cell tensors $b$ for cells that it owns.
 - It assembles (adds) the result into its local array.

At this point no parallel communication has taken place! The vector is in an inconsistent state: contributions to a degree of freedom may have been added to a ghost entry on another process instead of to the owner's entry.

To accumulate the ghost contributions into the entries of the owning processes, we use:

```
x.scatter_reverse(dolfinx.la.ScatterMode.add)
```

<figure>
<center>
<img src='https://raw.githubusercontent.com/IgorBaratta/FEniCSxCourse/main/Problem8_HPC/accumulate.png' width="600" height="800"/>
<figcaption>Reverse scattering of a vector in parallel. Communication direction is ghost to dof owner.
</figcaption></center>
</figure>
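Both steps can be sketched in plain Python for a hypothetical 1D analogue of `linear_form.py`: $f = 5$ on $[0,1]$ with four P1 cells, two cells per process, and vertex 2 owned by process 0 but held as a ghost by process 1. The helpers `assemble` and `scatter_reverse_add` are illustrative, not DOLFINx API. Before the reverse scatter, the value at the shared vertex is split between the two processes; afterwards, the owned entries sum to $\int_0^1 5\,dx = 5$.

```python
f, h = 5.0, 0.25                           # constant source f = 5 and cell size
cells = [(0, 1), (1, 2), (2, 3), (3, 4)]   # 1D mesh of [0, 1]: vertex pairs
cell_owner = [0, 0, 1, 1]                  # hypothetical partition of the cells
# Hypothetical local-to-global maps: owned vertices first, then ghosts.
# Vertex 2 is owned by process 0 and is a ghost on process 1.
local_to_global = [[0, 1, 2], [3, 4, 2]]
num_owned = [3, 2]


def assemble(rank):
    """Add the cell vectors of owned cells into the local array (no communication)."""
    g2l = {g: i for i, g in enumerate(local_to_global[rank])}
    b = [0.0] * len(local_to_global[rank])
    for c, (v0, v1) in enumerate(cells):
        if cell_owner[c] == rank:
            # P1 cell vector for a constant f: f*h/2 at each of the two vertices
            b[g2l[v0]] += f * h / 2
            b[g2l[v1]] += f * h / 2
    return b


def scatter_reverse_add(arrays):
    """Add every ghost entry into the owner's entry (ghost -> owner)."""
    owner_of = {g: r for r in range(len(arrays))
                for g in local_to_global[r][:num_owned[r]]}
    for r, b in enumerate(arrays):
        for loc in range(num_owned[r], len(b)):
            g = local_to_global[r][loc]
            o = owner_of[g]
            arrays[o][local_to_global[o].index(g)] += b[loc]


arrays = [assemble(r) for r in range(2)]
print("Before:", arrays)  # [[0.625, 1.25, 0.625], [1.25, 0.625, 0.625]]
scatter_reverse_add(arrays)
print("After: ", arrays)  # [[0.625, 1.25, 1.25], [1.25, 0.625, 0.625]]
```

This sketch leaves ghost entries as they are; a subsequent forward scatter would make them consistent with the owners again.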



Let's look at the state of the vector before and after the reverse scatter:

In [None]:
%%writefile linear_form.py

from mpi4py import MPI
from petsc4py import PETSc
import dolfinx
import ufl

comm = MPI.COMM_WORLD
mesh = dolfinx.mesh.create_unit_square(comm, 2, 2, ghost_mode=dolfinx.mesh.GhostMode.none)
V = dolfinx.fem.FunctionSpace(mesh, ("Lagrange", 1))
u = dolfinx.fem.Function(V)
v = ufl.TestFunction(V)

L = ufl.inner(5.0, v)*ufl.dx

b = dolfinx.fem.assemble_vector(dolfinx.fem.form(L))

if comm.rank == 0:
    print("Before scattering")

for i in range(comm.size):
    if comm.rank == i:
      print(f"Data on process {comm.rank}: {b.array}")
    comm.Barrier()

b.scatter_reverse(dolfinx.la.ScatterMode.add)

if comm.rank == 0:
    print("\nAfter scattering")

for i in range(comm.size):
    if comm.rank == i:
      print(f"Data on process {comm.rank}: {b.array}")
    comm.Barrier()

In [None]:
! mpirun -n 2 --allow-run-as-root --oversubscribe python3 linear_form.py

## Assembling matrices in parallel

As for the linear form, the contributions to the bilinear form can be computed separately for each subdomain. For example, the bilinear form $a(\cdot,\cdot)$ associated with Poisson's equation can be decomposed into the subdomain forms $a_i(\cdot,\cdot)$ defined by:

$$
a(u,v) = \sum_{i=0}^{P-1} a_i(u, v) = \sum_{i=0}^{P-1} \int_{\Omega_i} \nabla u \cdot \nabla v~dx
$$
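To see that summing subdomain contributions reproduces the global matrix, here is a plain-Python sketch for a hypothetical 1D P1 stiffness matrix (four cells of size $h = 0.25$) split into two subdomains of two cells each. The helper `assemble_stiffness` and the partition are illustrative assumptions. Only the row of the shared vertex 2 receives entries from both subdomains, which is why, in parallel, contributions to such rows must be communicated to the owning process.

```python
h = 0.25
cells = [(0, 1), (1, 2), (2, 3), (3, 4)]   # 1D mesh of [0, 1]: vertex pairs
subdomains = [[0, 1], [2, 3]]              # hypothetical partition of the cells
n = 5                                      # number of vertices (dofs)


def assemble_stiffness(cell_ids):
    """Dense P1 stiffness matrix for int u' v' dx restricted to the given cells."""
    A = [[0.0] * n for _ in range(n)]
    Ke = [[1 / h, -1 / h], [-1 / h, 1 / h]]  # P1 element stiffness matrix
    for c in cell_ids:
        dofs = cells[c]
        for a in range(2):
            for b in range(2):
                A[dofs[a]][dofs[b]] += Ke[a][b]
    return A


A_global = assemble_stiffness(range(len(cells)))
A_parts = [assemble_stiffness(ids) for ids in subdomains]
A_sum = [[sum(Ai[r][c] for Ai in A_parts) for c in range(n)] for r in range(n)]

assert A_sum == A_global
print(A_parts[0][2], A_parts[1][2], A_global[2])
```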

Each process assembles its local contribution to the global bilinear form into a sparse matrix. The number of rows in the local matrix coincides with the dimension of the local function space, i.e. owned plus ghost degrees of freedom.

In [None]:
%%writefile bilinear_form.py
from mpi4py import MPI
from petsc4py import PETSc
import dolfinx
import ufl
import numpy as np
import scipy.sparse

comm = MPI.COMM_WORLD
mesh = dolfinx.mesh.create_unit_square(comm, 2, 2, ghost_mode=dolfinx.mesh.GhostMode.none)
V = dolfinx.fem.FunctionSpace(mesh, ("Lagrange", 1))
u = ufl.TrialFunction(V)
v = ufl.TestFunction(V)

a = ufl.inner(ufl.grad(u), ufl.grad(v)) * ufl.dx
a = dolfinx.fem.form(a)

A = dolfinx.fem.assemble.create_matrix(a)
dolfinx.fem.assemble_matrix(A, a)
A.finalize()


# Create a Scipy sparse matrix that shares data with A
As = scipy.sparse.csr_matrix((A.data, A.indices, A.indptr))

print(f"Process {comm.rank} has {As.shape[0]} rows, "
      f"of which {V.dofmap.index_map.size_local} are owned.")

comm.Barrier()
if comm.rank == 0:
  print(f"Global number of degrees of freedom {V.dofmap.index_map.size_global}")



In [None]:
! mpirun -n 2 --allow-run-as-root --oversubscribe python3 bilinear_form.py

## Parallel Scaling

High performance computing has two common notions of scalability:

 - Strong scaling is defined as how the solution time varies with the number of processors for a fixed total problem size.
 - Weak scaling is defined as how the solution time varies with the number of processors for a fixed problem size per processor.
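Both notions are usually quantified with speedup and parallel efficiency. A minimal sketch, assuming wall-clock times measured with 1, 2 and 4 processes (the numbers below are made up for illustration): for strong scaling, $S_p = T_1/T_p$ and $E_p = S_p/p$; for weak scaling, $E_p = T_1/T_p$. Ideal values are $S_p = p$ and $E_p = 1$.

```python
def strong_scaling(times):
    """Fixed total problem size: speedup S_p = T_1 / T_p, efficiency E_p = S_p / p."""
    t1 = times[1]
    return {p: (t1 / t, t1 / (p * t)) for p, t in sorted(times.items())}


def weak_scaling(times):
    """Fixed problem size per process: efficiency E_p = T_1 / T_p (ideal value 1)."""
    t1 = times[1]
    return {p: t1 / t for p, t in sorted(times.items())}


# Made-up wall-clock times in seconds, keyed by number of processes.
strong_times = {1: 80.0, 2: 40.0, 4: 25.0}
weak_times = {1: 10.0, 2: 10.0, 4: 12.5}

for p, (speedup, eff) in strong_scaling(strong_times).items():
    print(f"strong: p={p} speedup={speedup:.2f} efficiency={eff:.2f}")
for p, eff in weak_scaling(weak_times).items():
    print(f"weak:   p={p} efficiency={eff:.2f}")
```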

# In-class work (8)

In this final assignment we will solve the 3D
Poisson problem with a homogeneous diffusivity
coefficient $\mu$ and homogeneous Dirichlet boundary data, i.e.,

\begin{equation}
\left \{
\begin{array}{rcll}
-\nabla \cdot ( \mu \nabla {u} ) & = & f &  \mbox{in}~\Omega \\
&& \\
{u} & = & 0 &  \mbox{on}~\partial\Omega
\end{array}
\right.
\end{equation}

where the domain $\Omega = [0,1]^3$. The
variational formulation is:

Find $u \in V_0 = \{ v \in H^1(\Omega) : v = 0 \mbox{ on } \partial\Omega \}$ such that

\begin{equation}
\int_{\Omega}{\mu\,\nabla{u}\cdot \nabla{v}}\,dx = \int_{\Omega} {f\,v\,dx}~~~\forall v \in V_0(\Omega)
\end{equation}

The figure below illustrates the computational mesh:

<figure>
<center>
<img src='https://raw.githubusercontent.com/IgorBaratta/FEniCSxCourse/ICMC23/Problem8_HPC/meshim3D.png' width="600" height="500"/>
<figcaption>Typical 3D tetrahedral mesh to run the
scalability tests. The example has $51\times 51 \times 51$ grid points.
</figcaption></center>
</figure>

---

Scalability study: [poisson](https://github.com/IgorBaratta/FEniCSxCourse/blob/main/Problem8_HPC/poisson.py)
