# Handling Large Data Sets
Until now, we have applied our framework to a very small simulation. However, what if we are working with a very large data set (like TNG50-1, which has $2160^3$ particles, $512$ times more than TNG50-4)?

## Starting simple: computing in chunks

First, we can still run the same calculation as above, and it will "just work" (hopefully).

This is because Dask provides versions of many common algorithms and functions that operate on "blocks" or "chunks" of the data: the large array is split into smaller arrays, the work is carried out on each chunk, and the final answer is then assembled from the partial results.

Importantly, even if the `mass` array from above does not fit into memory, `mass.sum().compute()` splits the operation into chunks so that the result can still be calculated.
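To make the chunk-wise reduction described above concrete, here is a minimal sketch using a synthetic Dask array in place of the real `mass` field. The array length and chunk size are arbitrary assumptions chosen for illustration; they are not the values used for the snapshot.

```python
import dask.array as da

# Stand-in for a large "Masses" field: 10 million ones, split into
# chunks of 1 million elements each (both sizes are arbitrary choices).
mass = da.ones(10_000_000, chunks=1_000_000, dtype="float64")

# Number of chunks along each axis of the array.
print(mass.numblocks)

# This only builds a task graph; no data has been created or summed yet.
total = mass.sum()

# compute() sums each chunk separately and then combines the partial sums.
result = total.compute()
print(result)
```

Only a few chunks need to be held in memory at any one time, which is why the same pattern still works when the full array would not fit.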

In [8]:
from astrodask import load
ds = load("/data/public/testdata-astrodask/TNG50-3_snapshot")

/data/public/testdata-astrodask/TNG50-3_snapshot
None
A /home/cbyrohl/cachedir/b82ad144a0d83c01/data.hdf5


Before we start, let's enable a progress indicator from dask (note that this will only work for local schedulers, see next section):

In [9]:
from dask.diagnostics import ProgressBar
ProgressBar().register()

And then we can request the actual computation:

In [10]:
%time ds.data["PartType0"]["Masses"].sum().compute()

CPU times: user 60.1 ms, sys: 13.2 ms, total: 73.3 ms
Wall time: 405 ms


518363.12

While the result is eventually computed, it is a bit slow, primarily because the actual reading of the data off disk is the limiting factor, and we can only use resources available on our local machine.
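If a globally registered progress bar is not wanted, `ProgressBar` can also be used as a context manager, so that it is shown only for selected computations. The sketch below uses a small synthetic array (an assumption for illustration) rather than the snapshot data.

```python
import dask.array as da
from dask.diagnostics import ProgressBar

# Synthetic array standing in for a field of the snapshot.
x = da.ones(4_000_000, chunks=500_000)

# The bar is displayed only for computations inside the "with" block.
with ProgressBar():
    total = x.sum().compute()

print(total)
```

As with the registered variant, this only applies to the local schedulers.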

## More advanced: computing in parallel

Rather than sequentially calculating large tasks, we can also run the computation in parallel. 

To do so, different advanced Dask schedulers are available. Here, we use the most straightforward one, the [distributed scheduler](https://docs.dask.org/en/latest/how-to/deploy-dask/single-distributed.html).

Usually, we would start a scheduler and then connect new workers to it (e.g. running on multiple compute/backend nodes of an HPC cluster). Afterwards, tasks (either interactive or scripted) can leverage the power of these connected resources.

For this example, we will use the same "distributed" scheduler/API, but keep things simple by using just the one (local) node we are currently running on.

In [11]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=8, threads_per_worker=1, 
                       dashboard_address=":8787")
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 38317 instead


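Here is our client. We can open the scheduler's dashboard on the port reported below to investigate its state. Note that the requested port 8787 was not available in this run, so the dashboard is served on a different port, as the message above indicates.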

In [12]:
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:38317/status

Dashboard: http://127.0.0.1:38317/status
Workers: 8
Total threads: 8
Total memory: 47.05 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:33583
Workers: 8
Dashboard: http://127.0.0.1:38317/status
Total threads: 8
Started: Just now
Total memory: 47.05 GiB

Comm: tcp://127.0.0.1:38129
Total threads: 1
Dashboard: http://127.0.0.1:42159/status
Memory: 5.88 GiB
Nanny: tcp://127.0.0.1:41329
Local directory: /tmp/dask-worker-space/worker-i_4ugp5o

Comm: tcp://127.0.0.1:43271
Total threads: 1
Dashboard: http://127.0.0.1:37037/status
Memory: 5.88 GiB
Nanny: tcp://127.0.0.1:37167
Local directory: /tmp/dask-worker-space/worker-3ak0yhbg

Comm: tcp://127.0.0.1:39709
Total threads: 1
Dashboard: http://127.0.0.1:43403/status
Memory: 5.88 GiB
Nanny: tcp://127.0.0.1:45167
Local directory: /tmp/dask-worker-space/worker-ctw3wr46

Comm: tcp://127.0.0.1:40355
Total threads: 1
Dashboard: http://127.0.0.1:45839/status
Memory: 5.88 GiB
Nanny: tcp://127.0.0.1:39271
Local directory: /tmp/dask-worker-space/worker-84k72rg5

Comm: tcp://127.0.0.1:37605
Total threads: 1
Dashboard: http://127.0.0.1:45013/status
Memory: 5.88 GiB
Nanny: tcp://127.0.0.1:35487
Local directory: /tmp/dask-worker-space/worker-ba38z2sm

Comm: tcp://127.0.0.1:44455
Total threads: 1
Dashboard: http://127.0.0.1:39401/status
Memory: 5.88 GiB
Nanny: tcp://127.0.0.1:38065
Local directory: /tmp/dask-worker-space/worker-26_n68du

Comm: tcp://127.0.0.1:38421
Total threads: 1
Dashboard: http://127.0.0.1:41031/status
Memory: 5.88 GiB
Nanny: tcp://127.0.0.1:43377
Local directory: /tmp/dask-worker-space/worker-mlan56i2

Comm: tcp://127.0.0.1:35193
Total threads: 1
Dashboard: http://127.0.0.1:39063/status
Memory: 5.88 GiB
Nanny: tcp://127.0.0.1:37007
Local directory: /tmp/dask-worker-space/worker-aqn8o39z


We can now perform the same operations as before, but they are executed in a distributed manner, in parallel.

One significant advantage is that (even when using only a single node) individual workers will load just the subsets of data they need to work on, meaning that I/O operations become parallel.

Note: after creating a `Client()`, all calls to `.compute()` will automatically use this scheduler and its set of workers.

In [13]:
%time ds.data["PartType0"]["Masses"].sum().compute()

CPU times: user 108 ms, sys: 56.4 ms, total: 165 ms
Wall time: 630 ms


518363.12
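The behaviour noted above, that `.compute()` picks up the active `Client` automatically, can be tried out on a small synthetic array. This is only a sketch under a few assumptions made for brevity: two in-process workers (`processes=False`) and a disabled dashboard (`dashboard_address=None`), whereas the cluster in this notebook uses 8 worker processes.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

# Assumption for this sketch: a small in-process cluster without dashboard.
cluster = LocalCluster(n_workers=2, threads_per_worker=1,
                       processes=False, dashboard_address=None)
client = Client(cluster)

# Synthetic array standing in for a field of the snapshot.
x = da.ones(4_000_000, chunks=500_000)

# No scheduler is passed explicitly: creating the Client made it the default.
total = x.sum().compute()
print(total)

# Release the workers once they are no longer needed.
client.close()
cluster.close()
```

Closing the client and cluster at the end frees the workers' resources; in an interactive session they are typically kept alive for subsequent computations.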

The progress bar from `dask.diagnostics` that we used with the default scheduler (before initializing the `LocalCluster`) is unavailable for the distributed scheduler. However, we can still follow the progress of this task as it executes using the scheduler's status dashboard (as a webpage in a new browser tab or within [JupyterLab](https://github.com/dask/dask-labextension)). You can find it by clicking on the "Dashboard" link above. If this notebook server is running remotely, e.g. on a login node of an HPC cluster, you may have to replace the '127.0.0.1' part of the address with the name or IP address of that machine.
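The address substitution described above can be sketched with the standard library alone. The helper name `dashboard_url_for_host` and the host name `login01.example.org` are hypothetical and only serve as illustration; the local URL is the one reported in the client output above (it is also available as `client.dashboard_link`).

```python
from urllib.parse import urlsplit, urlunsplit


def dashboard_url_for_host(local_url: str, host: str) -> str:
    """Replace the host part of a dashboard URL, keeping port and path."""
    parts = urlsplit(local_url)
    # Keep the original port, if the URL specifies one.
    netloc = host if parts.port is None else f"{host}:{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))


# "login01.example.org" is a placeholder for the machine running the notebook.
remote_url = dashboard_url_for_host("http://127.0.0.1:38317/status",
                                    "login01.example.org")
print(remote_url)  # http://login01.example.org:38317/status
```

Whether the resulting address is reachable depends on the network setup of the cluster, for instance on firewall rules or the need for an SSH tunnel.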