### exploration of distributed load methods

During initial work at creating a daskified particle reader for yt, I've consistently seen that the daskified readers are a bit slower than yt's original particle reader. This isn't suprising because using a tool like dask introduces an overhead for communicating data. The full daskified reader, however, includes not just communication of data  but also serialization of complex yt dataset and index objects. 

So this notebook explores some simpler loading methods that isolate yt's chunking behavior from the serialization of yt objects. 

The associated functions used below are in `src/manual_loads.py`

First, spin up a dask client to work with:

In [1]:
from dask.distributed import Client
c = Client(n_workers=4, threads_per_worker= 2)

In order to avoid serializing yt objects, we'll use a function that pulls out all of the chunks corresponding to selecting all the data of a yt dataset:

```
ds = yt.load()
ad = ds.all_data()
```

In [2]:
from src.manual_loads import get_yt_chunks

In [3]:
file = "snapshot_033/snap_033.0.hdf5"
field_type = "PartType0"
field = "Density"

In [4]:
datafiles = get_yt_chunks(file)

yt : [INFO     ] 2022-06-06 17:34:32,303 Parameters: current_time              = 4.343952725460923e+17 s
yt : [INFO     ] 2022-06-06 17:34:32,305 Parameters: domain_dimensions         = [1 1 1]
yt : [INFO     ] 2022-06-06 17:34:32,305 Parameters: domain_left_edge          = [0. 0. 0.]
yt : [INFO     ] 2022-06-06 17:34:32,306 Parameters: domain_right_edge         = [25. 25. 25.]
yt : [INFO     ] 2022-06-06 17:34:32,306 Parameters: cosmological_simulation   = 1
yt : [INFO     ] 2022-06-06 17:34:32,307 Parameters: current_redshift          = -4.811891664902035e-05
yt : [INFO     ] 2022-06-06 17:34:32,307 Parameters: omega_lambda              = 0.762
yt : [INFO     ] 2022-06-06 17:34:32,307 Parameters: omega_matter              = 0.238
yt : [INFO     ] 2022-06-06 17:34:32,307 Parameters: omega_radiation           = 0.0
yt : [INFO     ] 2022-06-06 17:34:32,308 Parameters: hubble_constant           = 0.73
yt : [INFO     ] 2022-06-06 17:34:32,379 Allocating for 4.194e+06 particles
Loading par

and now we have a very simple list of the hdf files and start/end ranges that make up the yt index object:

In [5]:
datafiles[:3]

[{'file': '/home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033/snap_033.0.hdf5',
  'start_index': 0,
  'end_index': 262144},
 {'file': '/home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033/snap_033.0.hdf5',
  'start_index': 262144,
  'end_index': 280105},
 {'file': '/home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033/snap_033.1.hdf5',
  'start_index': 0,
  'end_index': 262144}]

In [6]:
len(datafiles)

12

In [7]:
c

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 31.20 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43047,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 31.20 GiB

0,1
Comm: tcp://127.0.0.1:40135,Total threads: 2
Dashboard: http://127.0.0.1:35951/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:36251,
Local directory: /home/chavlin/src/yt_general/daskify/minimal_particle_read_testing/dask-worker-space/worker-za32dish,Local directory: /home/chavlin/src/yt_general/daskify/minimal_particle_read_testing/dask-worker-space/worker-za32dish

0,1
Comm: tcp://127.0.0.1:46175,Total threads: 2
Dashboard: http://127.0.0.1:34707/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:38659,
Local directory: /home/chavlin/src/yt_general/daskify/minimal_particle_read_testing/dask-worker-space/worker-n98k6imw,Local directory: /home/chavlin/src/yt_general/daskify/minimal_particle_read_testing/dask-worker-space/worker-n98k6imw

0,1
Comm: tcp://127.0.0.1:33061,Total threads: 2
Dashboard: http://127.0.0.1:41101/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:46203,
Local directory: /home/chavlin/src/yt_general/daskify/minimal_particle_read_testing/dask-worker-space/worker-87n6eesy,Local directory: /home/chavlin/src/yt_general/daskify/minimal_particle_read_testing/dask-worker-space/worker-87n6eesy

0,1
Comm: tcp://127.0.0.1:39667,Total threads: 2
Dashboard: http://127.0.0.1:36803/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:46741,
Local directory: /home/chavlin/src/yt_general/daskify/minimal_particle_read_testing/dask-worker-space/worker-lu7jxl13,Local directory: /home/chavlin/src/yt_general/daskify/minimal_particle_read_testing/dask-worker-space/worker-lu7jxl13


In [8]:
from src.manual_loads import get_yt_chunks, serial_read_concat, delay_then_concat, read_delayed_array, manual_sum_by_chunk, read_chunk
from src.manual_loads import in_mem_from_dask_futures, sum_of_futures

In [9]:
file = "snapshot_033/snap_033.0.hdf5"
field_type = "PartType0"
field = "Density"

In [10]:
datafiles = get_yt_chunks(file)
len(datafiles)

yt : [INFO     ] 2022-06-06 17:34:35,135 Parameters: current_time              = 4.343952725460923e+17 s
yt : [INFO     ] 2022-06-06 17:34:35,136 Parameters: domain_dimensions         = [1 1 1]
yt : [INFO     ] 2022-06-06 17:34:35,137 Parameters: domain_left_edge          = [0. 0. 0.]
yt : [INFO     ] 2022-06-06 17:34:35,137 Parameters: domain_right_edge         = [25. 25. 25.]
yt : [INFO     ] 2022-06-06 17:34:35,138 Parameters: cosmological_simulation   = 1
yt : [INFO     ] 2022-06-06 17:34:35,138 Parameters: current_redshift          = -4.811891664902035e-05
yt : [INFO     ] 2022-06-06 17:34:35,138 Parameters: omega_lambda              = 0.762
yt : [INFO     ] 2022-06-06 17:34:35,138 Parameters: omega_matter              = 0.238
yt : [INFO     ] 2022-06-06 17:34:35,139 Parameters: omega_radiation           = 0.0
yt : [INFO     ] 2022-06-06 17:34:35,139 Parameters: hubble_constant           = 0.73
yt : [INFO     ] 2022-06-06 17:34:35,216 Allocating for 4.194e+06 particles
Loading par

12

### loading the full array

First, we'll check the time to load all particles into an array. 

`serial_read_concat` : loops over chunks, appends data to a list and then concatenated (pretty much what yt currently does)

In [11]:
%%timeit
data_serial = serial_read_concat(datafiles, field_type, field)

7.64 ms ± 380 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


Now a couple dask approaches

`delay_then_concat` : loops over chunks, use dask.delayed to read chunks into memory, concatenates after loading into memory

In [12]:
%%timeit
data_from_delayed = delay_then_concat(datafiles, field_type, field)

37.8 ms ± 10.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


`in_mem_from_dask_futures`: loop over chunks, immediates submits a chunk for reading and then concatenates after all future reads return

In [13]:
%%timeit
data_from_future = in_mem_from_dask_futures(c, datafiles, field_type, field)

28.5 ms ± 4.85 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


`read_delayed_array`: loop over chunks, create full dask array and concatenate. This returns a dask array, so need to call `compute` to get the in-memory array:

In [14]:
%%timeit
data_delayed = read_delayed_array(datafiles, field_type, field)
data_from_array = data_delayed.compute()

44 ms ± 2.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


So from this, we can see that the dask-version are a fair bit slower -- from a factor of 4.23 to 6.91 slower, depending on the method. This is consistent with the full daskified particle reader for yt, indicating that it's primarily the communication of the np arrays corresponding to each chunk. 

It also indicates that the daskified particle reader should be a use-dependent feature. When simply reading selections into memory, it is probably going to be faster to use yt's existing reader rather than the daskified one. But we'll explore this further with larger datasets down below.

### reductions on the array

Beyond reading yt chunks, we can use dask to compute reductions. Depending on the calculation, the communication requirement between dask processes should be a good deal lower since the whole data range need not be transferred, only the reduced result.

For this, we'll use a simple `sum` reduction that simply adds values from each chunks. This is easily parallelized -- add the values of each chunk then sum the intermediate sums to get the cross-chunk sum. 

Let's start with a reference where we read the whole array and sum it.

`serial_read_concat`: reads the whole arrays into memory, then sums (**requires whole array in memory**)

In [15]:
%%timeit
data_serial = serial_read_concat(datafiles, field_type, field)
dsum = data_serial.sum()

8.24 ms ± 241 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


And another reference that manually sums each chunk as it is read. 

`manual_sum_by_chunk`: manual loop over chunks, summing each chunk and adding to total (`val += chunk_data.sum()`), requires one chunk in memory at a time

In [16]:
%%timeit
msum = manual_sum_by_chunk(datafiles, field_type, field)

6.15 ms ± 222 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


`read_delayed_array` constructs a delayed array from chunks, sums on the dask array:

In [17]:
%%timeit
x = read_delayed_array(datafiles, field_type, field)
sum_from_delayed = x.sum().compute()

59.7 ms ± 25.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


`sum_of_futures` : uses client.submit to get the sum of each chunk as a future then adds those results as the come back

In [18]:
%%timeit
future_sum = sum_of_futures(c, datafiles, field_type, field)

30.9 ms ± 3.8 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


So even in this case, the plain serial sum is faster. It's interesting that using dask's `client.submit()` functionality to construct a list of futures containing the sums by chunk is faster than constructing a full dask array -- this is likely due to the overhead of constructing the full dask graph associated with a dask array. 

## A larger dataset

The case above contains just under ~2 million `("PartType0", "Density")` particles across 12 hdf files:

In [19]:
data_serial = serial_read_concat(datafiles, field_type, field)
(data_serial.shape[0]/1e6, len(datafiles))

(1.941226, 12)

and it's possible that the relative performance for the daskified cases will improve with larger datasets. So let's try out another:

In [20]:
file = "snapshot_028_z000p000/snap_028_z000p000.0.hdf5"
datafiles = get_yt_chunks(file)
data_serial = serial_read_concat(datafiles, field_type, field)
(data_serial.shape[0]/1e6, len(datafiles))

yt : [INFO     ] 2022-06-06 17:35:07,646 Parameters: current_time              = 4.361428036047735e+17 s
yt : [INFO     ] 2022-06-06 17:35:07,647 Parameters: domain_dimensions         = [1 1 1]
yt : [INFO     ] 2022-06-06 17:35:07,647 Parameters: domain_left_edge          = [0. 0. 0.]
yt : [INFO     ] 2022-06-06 17:35:07,648 Parameters: domain_right_edge         = [8.47125 8.47125 8.47125]
yt : [INFO     ] 2022-06-06 17:35:07,648 Parameters: cosmological_simulation   = 1
yt : [INFO     ] 2022-06-06 17:35:07,649 Parameters: current_redshift          = 2.220446049250313e-16
yt : [INFO     ] 2022-06-06 17:35:07,649 Parameters: omega_lambda              = 0.693
yt : [INFO     ] 2022-06-06 17:35:07,649 Parameters: omega_matter              = 0.307
yt : [INFO     ] 2022-06-06 17:35:07,650 Parameters: omega_radiation           = 0.0
yt : [INFO     ] 2022-06-06 17:35:07,650 Parameters: hubble_constant           = 0.6777
yt : [INFO     ] 2022-06-06 17:35:07,909 Allocating for 1.329e+07 particle

(6.381559, 32)

### loading the full array

Repeating the same load methods:

In [21]:
%%timeit
data_serial = serial_read_concat(datafiles, field_type, field)

73.6 ms ± 1.82 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [22]:
%%timeit
data_from_delayed = delay_then_concat(datafiles, field_type, field)

87.2 ms ± 10.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [23]:
%%timeit
data_from_future = in_mem_from_dask_futures(c, datafiles, field_type, field)

87.5 ms ± 3.57 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [24]:
%%timeit
data_delayed = read_delayed_array(datafiles, field_type, field)
data_from_array = data_delayed.compute()

102 ms ± 4.07 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


so we see that with the larger number of particles and chunk-files, the performance is **much** more comparable when loading the full array. Dask is still a little slower, but now within a factor of 1.1-1.3

### reductions

And repeating the reduction methods:

In [25]:
%%timeit
data_serial = serial_read_concat(datafiles, field_type, field)
dsum = data_serial.sum()

79.4 ms ± 1.62 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [26]:
%%timeit
msum = manual_sum_by_chunk(datafiles, field_type, field)

70.9 ms ± 2.2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [27]:
%%timeit
x = read_delayed_array(datafiles, field_type, field)
sum_from_delayed = x.sum().compute()

87.6 ms ± 7.8 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [28]:
%%timeit
future_sum = sum_of_futures(c, datafiles, field_type, field)

79.5 ms ± 2.65 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


and again, we have much more comparable results. 

### simulating an even larger dataset

Since we've detached ourselves from the yt api, it's easy to simulate **much** larger datasets by re-reading `datafiles`.

In [29]:
file_multiplier = 10
more_files = datafiles * file_multiplier
len(more_files)

320

the effective size in millions of particles if we read all the data in would be:

In [30]:
data_serial = serial_read_concat(datafiles, field_type, field)
print(data_serial.size * file_multiplier / 1e6)

63.81559


### loading the full array 

~64 million particles should be fine in memory, so let's read it all in again:

In [31]:
%%timeit
_ = serial_read_concat(more_files, field_type, field)

801 ms ± 12.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [32]:
%%timeit
_ = delay_then_concat(more_files, field_type, field)

689 ms ± 35.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [33]:
%%timeit
_ = in_mem_from_dask_futures(c, more_files, field_type, field)

117 ms ± 8.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [34]:
%%timeit
data_delayed = read_delayed_array(more_files, field_type, field)
_ = data_delayed.compute()

826 ms ± 55.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Now we're starting to see some improvements from the dask reads! At least in the cases where we don't build a full dask array. The dask futures are surprisingly significantly faster. 

### reductions

and now the reductions again

In [35]:
%%timeit
data_serial = serial_read_concat(more_files, field_type, field)
dsum = data_serial.sum()

830 ms ± 23.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [36]:
%%timeit
msum = manual_sum_by_chunk(more_files, field_type, field)

713 ms ± 15.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [37]:
%%timeit
x = read_delayed_array(more_files, field_type, field)
sum_from_delayed = x.sum().compute()

554 ms ± 37.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [38]:
%%timeit
future_sum = sum_of_futures(c, more_files, field_type, field)

496 ms ± 46.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


similar to loading the full array, we see dask is a bit faster, with the best performance from dask's futures

## a too-large for memory example

What happens when we go **even larger**??

In [39]:
file_multiplier = 2000
more_files = datafiles * file_multiplier
len(more_files)

64000

In [40]:
data_serial = serial_read_concat(datafiles, field_type, field)
print(data_serial.size * file_multiplier / 1e9)

12.763118


12.7 billion elements can't load on my machine, so we'll just test the reductions that do not require the full array. It also takes a fair bit of time, so we'll just run a single iteration..

In [41]:
%%time
msum = manual_sum_by_chunk(more_files, field_type, field)

CPU times: user 2min 22s, sys: 6.71 s, total: 2min 29s
Wall time: 2min 24s


In [42]:
%%time
x = read_delayed_array(more_files, field_type, field)
sum_from_delayed = x.sum().compute()

CPU times: user 2min 47s, sys: 2.7 s, total: 2min 49s
Wall time: 3min


In [43]:
%%time
future_sum = sum_of_futures(c, more_files, field_type, field)

CPU times: user 1min 7s, sys: 6.19 s, total: 1min 13s
Wall time: 1min 28s


And we see even more speedup from dask in the futures case. The delayed array is actually slower, but based on having run that cell a few times, it's got a lot variance in time -- I saw as fast as 2 mins. So definitely worth it for huge amounts of data...


### caveats

Some notes on things that could use further investigation:

I've only tested one client setup here (4 workers, 2 threads per worker), it's possible other configurations could improve dask performance.

More complex reduction operations could also enhance the relative performance of dask. 

## Summary

Some takeways:

Serializing the yt objections (datasets, indexes) does not add much overhead -- here we've extracted chunk info from the yt objects and still see similar read times to the full daskified readers. So the overhead comes from chunked-data communication and dask internals.

For larger amounts of data, the dask reads and reductions are either on par or faster than the single process serial chunking. Using dask's `client.submit()` functionality tends to be faster than building full dask arrays. So in internal-only yt methods, it may be worth simply using dask's futures implementation -- this, however, necessitates some explicit client handling that would be a bit more complex to code.

Using full dask arrays will mostly be useful as a user-feature and for interoperability with other packages -- the extra overhead is probably not worth it in yt's internal read methods. But the ease with which a user could write their own parallel calculations without knowing yt's internal chunking mechanics is potentially very useful. 

**So the main takeaway** is that the load or reduction method should be situation dependent. For many users, if simply loading a yt selection, it's likely that using yt's existing chunk-iteration methods is totally sufficient. But for larger datasets with more chunks, using a daskified reader is comparable or better than yt's existing reader. This suggests that having some combination of keyword arguments and configuration options is the way to go -- allowing a user to turn on the dask reads and return dask arrays only when they want/need it. 


