# Interprocess communication in MiniCombust


## Halo Exchange for Cells

The unstructured mesh is distributed across the memories of several processes (MPI ranks).
Each rank only needs to store the part of the mesh it operates on, plus some surrounding cells
required for computation. Neighbouring processes must exchange the values of border cells to
keep these values up to date.

In this section, we look at this process in detail.

We have an unstructured mesh of cells, in which each MPI rank has several neighbour ranks.

Some of the cells are interior to the MPI rank: no other rank needs their values to
calculate the fluid fields. However, some of the cells are "border cells", and neighbouring ranks
need to know the values in these cells for their calculations. In addition, this rank needs to
know the values of the cells adjacent to its border cells that belong to neighbouring ranks: it stores
these "ghost cells", but never computes new values for them itself.

At the end of each timestep calculation, we perform a _halo exchange_ to send our neighbours the new
values of our border cells, and to receive new values for our ghost cells.

Fortunately, after partitioning is complete, each process knows statically:
* Its neighbour ranks
* Which of its cells are border cells, and which neighbour ranks need their values
* Which of its cells are ghost cells, and which neighbour rank to receive each one's updated value from
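The second and third items are two views of the same information: whatever a rank receives as ghost cells from a neighbour is exactly what that neighbour must send it. A minimal sketch, assuming each rank's ghost lists are known (the function name `derive_send_lists` is hypothetical, and the data mirrors the four-rank example mesh used later in this notebook):

```python
from typing import Dict, List

# Hypothetical input: for each rank, the global cell ids it receives as ghosts,
# grouped by the neighbour rank that owns them
ghosts: Dict[int, Dict[int, List[int]]] = {
    0: {1: [2, 3]},
    1: {0: [0, 1], 2: [4, 5]},
    2: {1: [2, 3], 3: [6, 7]},
    3: {2: [4, 5]},
}

def derive_send_lists(ghosts: Dict[int, Dict[int, List[int]]]) -> Dict[int, Dict[int, List[int]]]:
    """Transpose the ghost lists: what `rank` receives from `owner`
    is exactly what `owner` must send to `rank`."""
    sends: Dict[int, Dict[int, List[int]]] = {rank: {} for rank in ghosts}
    for rank, per_owner in ghosts.items():
        for owner, global_ids in per_owner.items():
            sends[owner][rank] = list(global_ids)
    return sends

sends = derive_send_lists(ghosts)
print(sends[1])  # {0: [2, 3], 2: [2, 3]}
```

In practice a partitioner may produce both lists directly; the transpose is mainly useful as a consistency check.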

Recall that each process stores its cells (whether interior, border or ghost cells) in a
densely packed, contiguous local structure. Each cell has a local id, which identifies the
cell in this structure. However, each cell also has a global id, which is the same across all ranks.
Consequently, we store maps that allow each process to look up the global id for a local cell and vice versa.
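A minimal sketch of the two maps, assuming (hypothetically) that the local-to-global map is stored as an array indexed by local id. The inverse is a dictionary because any one rank holds only a sparse subset of the global ids. The values below match rank 2 of the example mesh.

```python
import numpy as np

# Hypothetical layout for rank 2: the local array holds global cells 2..7,
# and a cell's local id is its position in that array
local_to_global = np.array([2, 3, 4, 5, 6, 7], dtype=np.int64)

# Global -> local needs a dictionary, since global ids on this rank are sparse
global_to_local = {int(g): local for local, g in enumerate(local_to_global)}

print(int(local_to_global[4]), global_to_local[6])  # 6 4
```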

After partitioning, each process knows which global cell ids to send to each neighbour, and which
global cell ids to receive from each neighbour. This lets us precompute some structures:

* A receive buffer, which records how much data we'll receive from each neighbour and which
local cell each received value belongs to. This allows us to perform an optimised _local scatter_ operation that
redistributes the received values from the receive buffer back to their correct positions in the local cells array.
* Since we know which global cell ids we'll send to each neighbour, we can build a densely
packed send buffer and perform an optimised _local gather_ operation, which
collects the values we want to send from their positions in the local cells array and places them
in the densely packed send buffer.
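The precomputation and the two local operations can be sketched with NumPy integer ("fancy") indexing. This is a minimal sketch, not the MiniCombust implementation: the helper names `build_index_arrays`, `local_gather` and `local_scatter` are hypothetical, and the data corresponds to rank 1 of the example mesh.

```python
import numpy as np
from typing import Dict, List

def build_index_arrays(neighbour_to_global_ids: Dict[int, List[int]],
                       global_to_local: Dict[int, int]) -> Dict[int, np.ndarray]:
    """Precompute, once, the local cell ids involved in each neighbour's message."""
    return {
        rank: np.array([global_to_local[g] for g in gids], dtype=np.intp)
        for rank, gids in neighbour_to_global_ids.items()
    }

def local_gather(cells: np.ndarray, send_idx: Dict[int, np.ndarray]) -> Dict[int, np.ndarray]:
    # Fancy indexing returns a new, densely packed array for each neighbour
    return {rank: cells[idx] for rank, idx in send_idx.items()}

def local_scatter(cells: np.ndarray, recv_idx: Dict[int, np.ndarray],
                  received: Dict[int, np.ndarray]) -> None:
    # Write each neighbour's received values back into our ghost cells, in place
    for rank, idx in recv_idx.items():
        cells[idx] = received[rank]

# Rank 1 of the example mesh: owns global cells 2 and 3, stores ghosts 0, 1, 4, 5
global_to_local = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5}
send_idx = build_index_arrays({0: [2, 3], 2: [2, 3]}, global_to_local)
recv_idx = build_index_arrays({0: [0, 1], 2: [4, 5]}, global_to_local)

cells = np.array([0.0, 0.0, 1.0, 1.5, 0.0, 0.0])
outgoing = local_gather(cells, send_idx)  # one packed array per neighbour
local_scatter(cells, recv_idx, {0: np.array([7.0, 8.0]), 2: np.array([9.0, 10.0])})
print(cells.tolist())  # [7.0, 8.0, 1.0, 1.5, 9.0, 10.0]
```

Because the index arrays depend only on the partitioning, they can be built once and reused for every halo exchange.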

![An unstructured mesh distributed over 4 processes](images/halo-exchange.svg "Halo exchange for a distributed mesh")

```python
from mpi4py import MPI
from typing import Iterable, Dict, List
from numpy.typing import NDArray
from minicombust.data_structures import T, LocalId, GlobalId
import numpy as np

comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()
nproc = comm.Get_size()
print('Hello from rank {}'.format(my_rank))
```

```
Hello from rank 0
```


```python
# In lieu of partitioning and a real mesh, we'll use this simple example
# from the image in the docs. It describes exactly 4 MPI ranks.

# Each rank's neighbour ranks
mesh_neighbour_ranks = {
    0:[1],
    1:[0,2],
    2:[1,3],
    3:[2]
}

# How many of a rank's ghost cells are owned by each neighbour
mesh_neighbour_rank_to_num_ghost_cells = {
    0: {1:2},
    1: {0:2, 2:2},
    2: {1:2, 3:2},
    3: {2:2}
}

# How many of a rank's border cells are held as ghost cells by each neighbour
mesh_neighbour_rank_to_num_border_cells = {
    0 : {1:2},
    1 : {0:2, 2:2},
    2 : {1:2, 3:2},
    3 : {2:2}
}

# Which global cell ids each rank must receive from each neighbour
mesh_neighbour_rank_to_ghost_global_cell_ids = {
    0: {1: [2,3]},
    1: {0: [0,1], 2: [4,5]},
    2: {1: [2,3], 3: [6,7]},
    3: {2: [4,5]}
}

# Which global cell ids each rank must send to each neighbour
mesh_neighbour_rank_to_border_global_cell_ids = {
    0: {1: [0,1]},
    1: {0: [2,3], 2: [2,3]},
    2: {1: [4,5], 3: [4,5]},
    3: {2: [6,7]}
}

# Map from global cell id to local cell id for each rank (only border and ghost cells are needed)
mesh_global_cell_to_local_cell_id = {
    0: {0:0, 1:1, 2:2, 3:3},
    1: {0:0, 1:1, 2:2, 3:3, 4:4, 5:5},
    2: {2:0, 3:1, 4:2, 5:3, 6:4, 7:5},
    3: {4:0, 5:1, 6:2, 7:3}
}

# Map from local cell id to global cell id for each rank (only border and ghost cells are needed)
mesh_local_cell_to_global_cell_id = {
    0: {0:0, 1:1, 2:2, 3:3},
    1: {0:0, 1:1, 2:2, 3:3, 4:4, 5:5},
    2: {0:2, 1:3, 2:4, 3:5, 4:6, 5:7},
    3: {0:4, 1:5, 2:6, 3:7}
}
```

```python
# The example mesh describes exactly 4 ranks: run with e.g. `mpirun -n 4`
assert nproc == len(mesh_neighbour_ranks), "this example needs exactly 4 MPI ranks"

# This is all of the mesh that each local process needs to store:
neighbour_ranks: List[int] = mesh_neighbour_ranks[my_rank]
neighbour_rank_to_num_ghost_cells: Dict[int, int] = mesh_neighbour_rank_to_num_ghost_cells[my_rank]
neighbour_rank_to_num_boundary_cells: Dict[int, int] = mesh_neighbour_rank_to_num_border_cells[my_rank]
neighbour_rank_to_boundary_global_cell_ids = mesh_neighbour_rank_to_border_global_cell_ids[my_rank]
neighbour_rank_to_ghost_global_cell_ids = mesh_neighbour_rank_to_ghost_global_cell_ids[my_rank]
global_cell_to_local_cell_id: Dict[GlobalId, LocalId] = mesh_global_cell_to_local_cell_id[my_rank]
local_cell_to_global_cell_id: Dict[LocalId, GlobalId] = mesh_local_cell_to_global_cell_id[my_rank]
cells: NDArray[T] = np.zeros((len(local_cell_to_global_cell_id),), dtype=T)

# Precompute, once, the local cell ids involved in each neighbour's messages
def to_local_ids(global_cell_ids: Iterable[GlobalId]) -> NDArray[np.intp]:
    return np.array([global_cell_to_local_cell_id[i] for i in global_cell_ids], dtype=np.intp)

neighbour_rank_to_boundary_local_cell_ids = {r: to_local_ids(neighbour_rank_to_boundary_global_cell_ids[r]) for r in neighbour_ranks}
neighbour_rank_to_ghost_local_cell_ids = {r: to_local_ids(neighbour_rank_to_ghost_global_cell_ids[r]) for r in neighbour_ranks}

# Densely packed send and receive buffers, one per neighbour. Keeping the send
# buffers alive here also guarantees they outlive the non-blocking sends.
neighbour_rank_to_send_cell_values: Dict[int, NDArray] = {r: np.zeros(neighbour_rank_to_num_boundary_cells[r], dtype=T) for r in neighbour_ranks}
neighbour_rank_to_received_cell_values: Dict[int, NDArray] = {r: np.zeros(neighbour_rank_to_num_ghost_cells[r], dtype=T) for r in neighbour_ranks}

# Just some dummy values for this example
cells[:] = my_rank

def send_to_single_neighbour(dst_rank: int) -> MPI.Request:
    # Local gather: copy this neighbour's border-cell values into its send buffer
    buf = neighbour_rank_to_send_cell_values[dst_rank]
    np.take(cells, neighbour_rank_to_boundary_local_cell_ids[dst_rank], out=buf)
    return comm.Isend(buf, dest=dst_rank)

def receive_from_single_neighbour(src_rank: int) -> MPI.Request:
    return comm.Irecv(neighbour_rank_to_received_cell_values[src_rank], source=src_rank)

def perform_updates_on_local_cells():
    # Insert fluid solver of your choice here
    pass

def scatter_received_halos_to_cells():
    # Local scatter: write each neighbour's received values into our ghost cells
    for rank in neighbour_ranks:
        cells[neighbour_rank_to_ghost_local_cell_ids[rank]] = neighbour_rank_to_received_cell_values[rank]

def halo_exchange():
    # We use non-blocking comms for the exchange to avoid deadlock.
    # Dependencies are mesh-dependent, so a deadlock-free ordering of
    # blocking calls is difficult to construct. This might be worth optimising more.
    reqs = []
    for rank in neighbour_ranks:
        reqs.append(receive_from_single_neighbour(rank))
        reqs.append(send_to_single_neighbour(rank))
    MPI.Request.Waitall(reqs)
    scatter_received_halos_to_cells()

def main_loop(num_timesteps: int = 10):
    # Initial exchange so ghost cells hold valid values before the first update
    halo_exchange()
    for timestep in range(num_timesteps):
        perform_updates_on_local_cells()
        halo_exchange()
        if my_rank == 0:
            print("Timestep {} complete".format(timestep))

def main():
    main_loop()

if __name__ == "__main__":
    main()
```

## Exchange of particles that leave the region of cells owned by a rank

![Distributed particle exchange](images/particle-exchange.svg "Distributed particle exchange ")


```
ExchangeParticles:
    # Each particle has already been marked with its new owning rank
    group outgoing particles by destination rank
    for each neighbour:
        isend the number of particles we will send it (possibly 0)
        irecv the number of particles it will send us
    wait(all)
    for each neighbour:
        if send count > 0: isend particles
        if receive count > 0: irecv particles
    wait(all)
```
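The "group particles by destination rank" step, and the per-neighbour counts sent in the first phase, can be sketched with NumPy. This assumes (hypothetically) that each particle's new owner is held in an integer array `owner_rank` indexed by local particle id; the function name `group_by_destination` is illustrative.

```python
import numpy as np
from typing import Dict, List

def group_by_destination(owner_rank: np.ndarray, my_rank: int) -> Dict[int, np.ndarray]:
    """Return, for each other rank, the local indices of the particles it should now own."""
    leaving = np.flatnonzero(owner_rank != my_rank)
    # Stable sort: particles bound for the same rank keep their original relative order
    order = leaving[np.argsort(owner_rank[leaving], kind="stable")]
    dests, starts = np.unique(owner_rank[order], return_index=True)
    groups = np.split(order, starts[1:])
    return {int(d): g for d, g in zip(dests, groups)}

# Hypothetical example on rank 1, whose neighbours are ranks 0 and 2
owner_rank = np.array([1, 1, 0, 2, 1, 2])  # rank that should own each local particle
neighbour_ranks: List[int] = [0, 2]
groups = group_by_destination(owner_rank, my_rank=1)

# The first phase sends one count to every neighbour, including zeros
send_counts = {n: len(groups.get(n, ())) for n in neighbour_ranks}
print(send_counts)  # {0: 1, 2: 2}
```

Sending a count to every neighbour, even when it is zero, means each receiver knows exactly how many messages to post in the second phase.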

When do we exchange? Every time a particle update pass finishes.
We proceed only when nobody has anything left to send, i.e. no rank will send or receive any more particles.
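The termination rule can be illustrated with a serial simulation. This is a sketch under stated assumptions, not MiniCombust code: four hypothetical ranks each own three cells on a 1D line, particles move only in the +x direction one cell per unit of travel, and summing every rank's outgoing count stands in for a global reduction such as an MPI allreduce.

```python
from typing import Dict, List, Tuple

# Hypothetical setup: 4 ranks on a 1D line, each owning 3 consecutive cells (0..11)
CELLS_PER_RANK, NPROC = 3, 4
LAST_CELL = CELLS_PER_RANK * NPROC - 1
Particle = Tuple[int, int]  # (current cell, remaining travel measured in cells)

def owner(cell: int) -> int:
    return cell // CELLS_PER_RANK

def advance(rank: int, particles: List[Particle]) -> Tuple[List[Particle], Dict[int, List[Particle]]]:
    """Move each particle as far as it can go on this rank.
    Returns (particles finished for this timestep, particles to hand to other ranks)."""
    done: List[Particle] = []
    outgoing: Dict[int, List[Particle]] = {}
    for cell, travel in particles:
        while travel > 0 and cell < LAST_CELL and owner(cell + 1) == rank:
            cell, travel = cell + 1, travel - 1
        if travel > 0 and cell < LAST_CELL:
            # The next cell belongs to a neighbour: ownership passes to that rank
            outgoing.setdefault(owner(cell + 1), []).append((cell + 1, travel - 1))
        else:
            # Out of travel, or stopped at the end of the domain
            done.append((cell, travel))
    return done, outgoing

def exchange_until_quiet(initial: Dict[int, List[Particle]]) -> Tuple[Dict[int, List[Particle]], int]:
    pending = {r: list(initial.get(r, [])) for r in range(NPROC)}
    settled: Dict[int, List[Particle]] = {r: [] for r in range(NPROC)}
    rounds = 0
    while True:
        inbox: Dict[int, List[Particle]] = {r: [] for r in range(NPROC)}
        for rank in range(NPROC):
            done, outgoing = advance(rank, pending[rank])
            settled[rank].extend(done)
            for dst, moved in outgoing.items():
                inbox[dst].extend(moved)
        rounds += 1
        # Stand-in for a global sum of every rank's outgoing count
        if sum(len(moved) for moved in inbox.values()) == 0:
            return settled, rounds
        pending = inbox

settled, rounds = exchange_until_quiet({0: [(0, 7)]})
print(rounds, settled[2])  # 3 [(7, 0)]
```

A single particle crossing two rank boundaries needs three rounds: two that each hand it on, and a final one in which nobody sends anything.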

### Communication for Particles

In MiniCombust, Lagrangian particle tracking is decomposed across distributed nodes.
Each particle tracking rank holds only a subset of the particles in the system.

Particle tracking ranks are responsible for:
* Adjusting the position, velocity, diameter, mass etc. of particles based on their Lagrangian motion
* Adjusting particle properties based on the fluid field (e.g. contribution of surrounding field velocity to particle motion)
* Adjusting the fluid field based on particles (e.g. evaporating particles that become a source term for the gas flow)


In addition to the communication required at the end of each fluid solver timestep to exchange the halos (cell values),
particle tracking ranks may need to move particles that leave their domain to other ranks.

During the Lagrangian particle update, a particle may cross a face into a cell owned by another rank while it
still has "travel time" left over; ownership of the particle must then pass to that rank.

Each rank knows which of its cells are ghost cells, and for each ghost cell it knows which neighbour rank owns it.
When a particle enters a ghost cell, we therefore transfer the particle to that neighbour rank.
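A minimal sketch of this lookup, using the ghost lists of rank 1 from the example mesh. It assumes, hypothetically, that a particle never travels past the ghost layer in one step, so any cell that is not a ghost is treated as owned by this rank; `ghost_cell_owner` and `destination_rank` are illustrative names.

```python
from typing import Dict, List

# Rank 1 of the example mesh: ghost global cell ids, grouped by owning neighbour
neighbour_rank_to_ghost_global_cell_ids: Dict[int, List[int]] = {0: [0, 1], 2: [4, 5]}

# Invert once after partitioning: ghost global cell id -> owning neighbour rank
ghost_cell_owner: Dict[int, int] = {
    gid: rank
    for rank, gids in neighbour_rank_to_ghost_global_cell_ids.items()
    for gid in gids
}

def destination_rank(entered_global_cell: int, my_rank: int) -> int:
    """Rank that should own a particle after it moves into `entered_global_cell`.
    Assumes any cell that is not a ghost cell is owned by this rank."""
    return ghost_cell_owner.get(entered_global_cell, my_rank)

print(destination_rank(4, my_rank=1))  # 2
print(destination_rank(3, my_rank=1))  # 1
```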

If particles don't interact with each other (do they in PRECISE?), then:
* Reach a point where each particle has been updated as much as it can be in this timestep
* Send each neighbour a list of particles (potentially empty)
* Should we tell each neighbour how many particles are coming first, and then send them?

Load imbalance is likely when most particles are clustered in particular cells, since those cells (and hence their particles) belong to only a few ranks.
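One common way to quantify this is the ratio of the largest per-rank particle count to the mean count, where 1.0 means perfectly balanced. A minimal sketch, assuming each particle's owning rank is stored in an integer array (the function name is hypothetical):

```python
import numpy as np

def particle_load_imbalance(owner_rank: np.ndarray, nproc: int) -> float:
    """Largest per-rank particle count divided by the mean count (1.0 = perfectly balanced)."""
    counts = np.bincount(owner_rank, minlength=nproc)
    if counts.sum() == 0:
        return 1.0  # no particles anywhere: treat as balanced
    return float(counts.max() / counts.mean())

# Hypothetical example: 8 particles over 4 ranks, clustered on rank 2
owner_rank = np.array([2, 2, 2, 2, 2, 0, 1, 3])
print(particle_load_imbalance(owner_rank, nproc=4))  # 2.5
```

A ratio of 2.5 means the busiest rank does 2.5 times the average amount of particle work, so the other ranks spend much of each particle step waiting.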

Could we use a dynamic process / executor approach to update each particle, irrespective of whether its cell is on this rank?
i.e. we gather all the data for its cell and send it off for processing.
As a result of processing, the particle can be added back to the processing pool (if it still has time left, but cell adjustments need to be made).

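What does this mean for local particle ids? We may need to mark departed particles as inactive and periodically defragment the particle arrays.
Alternatively, should we remove particles from the structures immediately? Should particle structures be more dynamic, e.g. based on a map? Code built on such structures is difficult to vectorise.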
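The mark-inactive-then-defragment option can be sketched with boolean masks, assuming (hypothetically) a structure-of-arrays layout where each particle property is a NumPy array indexed by local particle id. The sketch also returns an old-to-new id map, which any structure still holding old local ids would need:

```python
import numpy as np
from typing import Dict, Tuple

def compact(arrays: Dict[str, np.ndarray], active: np.ndarray) -> Tuple[Dict[str, np.ndarray], np.ndarray]:
    """Drop inactive particles from every per-particle array.

    Returns the compacted arrays and `old_to_new`, mapping each old local id
    to its new local id (-1 for removed particles)."""
    old_to_new = np.full(active.shape[0], -1, dtype=np.int64)
    old_to_new[active] = np.arange(np.count_nonzero(active))
    return {name: arr[active] for name, arr in arrays.items()}, old_to_new

particles = {
    "position": np.array([[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [2.0, 0.0, 0.0], [3.0, 0.0, 0.0]]),
    "mass": np.array([1.0, 2.0, 3.0, 4.0]),
}
active = np.array([True, False, True, False])  # particles 1 and 3 were sent to other ranks
particles, old_to_new = compact(particles, active)
print(particles["mass"].tolist(), old_to_new.tolist())  # [1.0, 3.0] [0, -1, 1, -1]
```

Boolean-mask compaction keeps every array contiguous, so the update kernels stay vectorisable; the cost is an O(n) copy each time it runs, which is why it would be done periodically rather than after every exchange.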


The number of particles can change several times in the course of a single fluid timestep!

Are fluid timesteps much bigger than particle timesteps, or (possibly) the other way round? How often does the fluid field update relative to the particles?