# Parallelisation with MPI


Parcels can be run in parallel with MPI. To do this, you will also need to install the `mpich` and `mpi4py` packages (e.g., `conda install -c conda-forge mpich mpi4py`).

Note that MPI support is only for Linux and macOS. There is no Windows support.


## Splitting the ParticleSet with MPI


Once you have installed a parallel version of Parcels, you can simply run your script with

```
mpirun -np <np> python <yourscript.py>
```

where `<np>` is the number of processors you want to use.

Parcels will then split the `ParticleSet` into `<np>` smaller ParticleSets, based on a `sklearn.cluster.KMeans` clustering. Each of those smaller `ParticleSets` will be executed by one of the `<np>` MPI processors.

Note that in principle this means that all MPI processors need access to the full `FieldSet`, which can be Gigabytes in size for large global datasets. Therefore, efficient parallelisation only works if at the same time we also chunk the `FieldSet` into smaller domains.


## Optimising the partitioning of the particles with a user-defined partition_function

Parcels uses so-called *partition functions* to assign particles to processors. By default, Parcels uses a `scikit-learn` `KMeans` clustering algorithm to partition the particles. However, you can also define your own partition function, which can be useful if you know more about the distribution of particles in your domain than the `KMeans` algorithm does.

To define your own partition function, you need to define a function that takes a list of coordinates (lat, lon) as input, and returns a list of integers, defining the MPI processor that each particle should be assigned to. See the example below:

In [None]:
import numpy as np


def simple_partition_function(coords, mpi_size=1):
    """A very simple partition function
    that assigns particles to processors
    """
    # Spread the particles evenly over ranks 0 .. mpi_size - 1, in input order
    return np.linspace(0, mpi_size, coords.shape[0], endpoint=False, dtype=np.int32)

To add the partition function to your script, you need to add it to the ParticleSet constructor:

In [None]:
pset = ParticleSet(..., partition_function=simple_partition_function)
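
As a further illustration of a user-defined partition function, the sketch below assigns particles to processors in equal-sized bands along one coordinate. The name `band_partition_function` and the extra `column` argument are hypothetical, and which column of `coords` holds latitude or longitude is an assumption you should check against your own setup.

```python
import numpy as np


def band_partition_function(coords, mpi_size=1, column=1):
    """Assign particles to processors in equal-count bands along one coordinate.

    `column` (hypothetical argument) selects which column of `coords` to sort on;
    whether that column is latitude or longitude is an assumption.
    """
    coords = np.asarray(coords)
    n = coords.shape[0]
    # Indices of the particles, ordered by the chosen coordinate
    order = np.argsort(coords[:, column], kind="stable")
    ranks = np.empty(n, dtype=np.int32)
    # The k-th particle in sorted order goes to processor floor(k * mpi_size / n)
    ranks[order] = (np.arange(n) * mpi_size) // n
    return ranks
```

Like the simple function above, it takes `coords` and `mpi_size` and returns one integer per particle, so it could be passed to the constructor through the same `partition_function` argument.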

## Reading in the ParticleFile data in zarr format

For efficiency, each processor will write its own data to a `zarr`-store. If the name of your `ParticleFile` is `fname`, then these stores will be located at `fname/proc00.zarr`, `fname/proc01.zarr`, etc.
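
Before merging, it can be useful to check that every processor has written its store. The sketch below is not part of Parcels: `expected_stores` and `missing_stores` are hypothetical helpers, and they assume the two-digit, zero-padded naming shown above (`proc00.zarr`, `proc01.zarr`, ...) and that each store is a directory on disk.

```python
from os import path


def expected_stores(fname, n_procs):
    """Paths of the per-processor stores, assuming names like proc00.zarr."""
    return [path.join(fname, f"proc{rank:02d}.zarr") for rank in range(n_procs)]


def missing_stores(fname, n_procs):
    """Expected stores that do not exist as directories under `fname`."""
    return [p for p in expected_stores(fname, n_procs) if not path.isdir(p)]
```

An empty list from `missing_stores(fname, n_procs)` suggests that all `n_procs` processors produced output.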

Reading in these stores and merging them into one `xarray.Dataset` can be done with


In [None]:
from glob import glob
from os import path

import xarray as xr

files = glob(path.join(fname, "proc*"))
ds = xr.concat(
    [xr.open_zarr(f) for f in files],
    dim="trajectory",
    compat="no_conflicts",
    coords="minimal",
)

Note that if you have added particles during `execute()` (for example because you used `repeatdt`), the trajectories will not be ordered monotonically. While this may not be a problem, it will result in a different Dataset than a single-core simulation. If you want the output of the MPI run to be the same as that of the single-core run, add `.sortby(['trajectory'])` at the end of the `xr.concat()` command:


In [None]:
ds = xr.concat(
    [xr.open_zarr(f) for f in files],
    dim="trajectory",
    compat="no_conflicts",
    coords="minimal",
).sortby(["trajectory"])

If you want, you can save this new Dataset with the `.to_zarr()` or `.to_netcdf()` methods.

When using `.to_zarr()`, further analysis may be sped up by first rechunking the Dataset with `ds.chunk()`. Note that in some cases, you will first need to manually remove the chunk encoding information stored on each variable.
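
A minimal sketch of removing the chunk encoding is given below. It assumes `ds` is an `xarray.Dataset` whose variables keep their stored chunk sizes under the `"chunks"` key of their `encoding` dictionary; the helper name `drop_chunk_encoding` is hypothetical.

```python
def drop_chunk_encoding(ds):
    """Remove stored chunk sizes from every variable of a Dataset, in place."""
    for var in ds.variables.values():
        # `encoding` is a plain dict; a leftover "chunks" entry can
        # conflict with the new chunking when writing to zarr
        var.encoding.pop("chunks", None)
    return ds
```

After calling `drop_chunk_encoding(ds)`, the Dataset can be rechunked with `ds.chunk()` and written with `.to_zarr()`; suitable chunk sizes depend on your analysis.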


For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimise the chunking for your analysis. What is "large"? If you find yourself running out of memory while doing your analysis, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is "large." Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).


## Future developments: load-balancing


The current implementation of MPI parallelisation in Parcels is still fairly rudimentary. In particular, we will continue to develop the load-balancing of the `ParticleSet`.

By load-balancing we mean that `Particles` that are close together are ideally on the same MPI processor. In practice, this means that we need to take care in how `Particles` are spread over chunks and processors. See for example the two figures below:

![](images/ParcelsParallel.png)
_Example of load-balancing for Particles. The domain is chunked along the thick lines, and the orange and blue particles are on separate MPI processors. Before load-balancing (left panel), two chunks in the centre of the domain have both orange and blue particles. After load-balancing (right panel), the Particles are redistributed over the processors so that the number of chunks and particles per processor is optimised._
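
The two quantities named in the caption, particles per processor and chunks per processor, can be counted with a small diagnostic like the sketch below. This is an illustration only and not part of Parcels: `load_summary` and its two inputs (a processor index and a chunk index per particle) are hypothetical.

```python
from collections import defaultdict


def load_summary(processors, chunks):
    """Return {rank: (number of particles, number of distinct chunks)}.

    processors[i] is the (hypothetical) MPI rank of particle i and
    chunks[i] is the (hypothetical) index of the chunk it sits in.
    """
    n_particles = defaultdict(int)
    touched = defaultdict(set)
    for rank, chunk in zip(processors, chunks):
        n_particles[rank] += 1
        touched[rank].add(chunk)
    return {rank: (n_particles[rank], len(touched[rank])) for rank in sorted(n_particles)}
```

In a situation like the left panel, each processor touches more chunks than it does after redistribution, while the particle counts can stay the same.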

The difficulty is that, since we don't know how the `ParticleSet` will disperse over time, we need to do this load-balancing 'on the fly'. If you want to contribute to the optimisation of the load-balancing, please leave a message on [GitHub](https://github.com/OceanParcels/parcels/issues)!
