# Parallel `ContactFrequency` with Dask

In principle, the contact map for each frame that makes up a `ContactFrequency` can be calculated in parallel. This notebook shows how to use [`dask.distributed`](https://distributed.readthedocs.io/) to do that.

We use the same example data as the main contact maps example (data from https://figshare.com/s/453b1b215cf2f9270769). See that example, `contact_map.ipynb`, for details.
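The frame-level parallelism described above follows a map-reduce pattern: split the frames into chunks, count contacts in each chunk independently, then sum the counts and divide by the total number of frames. The following is only a minimal sketch of that idea using the standard library and toy data. It is not how `dask_run` is implemented, and the names `frame_slices`, `count_contacts`, and `parallel_frequency` are hypothetical helpers invented for this illustration.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def frame_slices(n_frames, n_chunks):
    """Split range(n_frames) into up to n_chunks contiguous slices."""
    base, extra = divmod(n_frames, n_chunks)
    slices = []
    start = 0
    for i in range(n_chunks):
        size = base + (1 if i < extra else 0)
        if size:
            slices.append(slice(start, start + size))
        start += size
    return slices


def count_contacts(frames):
    """Map step: count how many frames in this chunk contain each contact."""
    counts = Counter()
    for contacts in frames:
        counts.update(set(contacts))
    return counts, len(frames)


def parallel_frequency(frames, n_chunks=2):
    """Reduce step: sum the per-chunk counts and divide by the frame total."""
    chunks = [frames[s] for s in frame_slices(len(frames), n_chunks)]
    with ThreadPoolExecutor(max_workers=n_chunks) as pool:
        partials = list(pool.map(count_contacts, chunks))
    total = Counter()
    n_frames = 0
    for counts, n in partials:
        total.update(counts)
        n_frames += n
    return {pair: c / n_frames for pair, c in total.items()}, n_frames


# Toy data (an assumption for illustration): each "frame" is simply the
# list of residue pairs that are in contact in that frame.
toy_frames = [
    [(0, 5), (1, 4)],
    [(0, 5)],
    [(0, 5), (2, 7)],
    [(1, 4)],
]
frequency, n_frames = parallel_frequency(toy_frames, n_chunks=2)
```

Because the per-chunk counts are simply added together, the result does not depend on how the frames are divided among workers. That is why checking the total number of frames after the parallel run is a useful sanity test.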

In [None]:
%matplotlib inline
import mdtraj as md
# dask and distributed are extra installs
from dask.distributed import Client, LocalCluster
from contact_map.dask_runner import dask_run

In [None]:
trajectory = md.load("5550217/kras.xtc", top="5550217/kras.pdb")

In [None]:
# TODO: we need a more user-friendly approach for this than what we see here
run_info = {
    'trajectory_file': "5550217/kras.xtc",
    'load_kwargs': {'top': "5550217/kras.pdb"},
    'parameters': {}
}

Next we need to connect a client to a Dask cluster.

Note that there are several ways to set up a Dask cluster and then connect a client to it. See https://distributed.readthedocs.io/en/latest/setup.html. The approach used here creates a `LocalCluster`, which runs on a single machine. Large-scale simulations would need other approaches. Personally, I would usually recommend using `dask-mpi`.

In [None]:
c = LocalCluster()
client = Client(c)

In [None]:
# if you started the cluster with dask-mpi and the scheduler file is called sched.json
#client = Client(scheduler_file="./sched.json")

In [None]:
client

In [None]:
%%time
freq = dask_run(trajectory=trajectory, client=client, run_info=run_info)

Note that on a single machine (shared memory) this may not improve performance. That is because the calculation for each individual frame is already parallelized with OpenMP, and will therefore use all the cores on the machine.
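One way to reason about this is in terms of oversubscription: if every worker process starts as many OpenMP threads as there are cores, several workers on one machine compete for the same cores. The sketch below divides the cores among workers and exports the standard OpenMP variable `OMP_NUM_THREADS`. The helper `omp_threads_per_worker` and the worker count are assumptions made for illustration, and this notebook does not establish whether doing so speeds up this particular calculation.

```python
import os


def omp_threads_per_worker(n_cores, n_workers):
    """Hypothetical helper: OpenMP threads per worker without oversubscribing."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    # Integer division, but never fewer than one thread per worker.
    return max(1, n_cores // n_workers)


n_cores = os.cpu_count() or 1  # os.cpu_count() may return None
n_workers = 2  # assumed worker count, for illustration only
omp_threads = omp_threads_per_worker(n_cores, n_workers)

# OMP_NUM_THREADS is the standard OpenMP setting. It must be set before the
# worker processes are started so that they inherit it.
os.environ["OMP_NUM_THREADS"] = str(omp_threads)
```

With this in place, the same worker count could be passed on when creating the cluster, for example `LocalCluster(n_workers=n_workers)`.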

Next we check that we're still getting the same results:

In [None]:
# did it add up to give us the right number of frames?
freq.n_frames

In [None]:
# do we get a familiar-looking residue map?
fig, ax = freq.residue_contacts.plot()

In [None]:
# Something like this is supposed to shut down the workers and the scheduler.
# For me it shuts down the workers but not the scheduler, and it emits lots of warnings.
#client.loop.add_callback(client.scheduler.retire_workers, close_workers=True)
#client.loop.add_callback(client.scheduler.terminate)
#client.run_on_scheduler(lambda dask_scheduler: dask_scheduler.loop.stop())