let's copy over the `read_gadget.py` contents and run it:

In [1]:
import yt

ds = yt.load_sample("snapshot_033")

reg = ds.all_data()

class MockSelector:
    is_all_data = True

class MockChunkObject:
    def __init__(self, data_file):
        self.data_files = [data_file]

class MockChunk:
    def __init__(self, data_file):
        self.objs = [MockChunkObject(data_file)]

ptf = {'PartType0': ['Coordinates']}

chunks = [MockChunk(data_file) for data_file in ds.index.data_files]
selector= MockSelector()

my_gen = ds.index.io._read_particle_fields(chunks, ptf, selector)
my_result = [_ for _ in my_gen]


yt : [INFO     ] 2020-09-11 14:37:28,270 Files located at /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.
yt : [INFO     ] 2020-09-11 14:37:28,271 Default to loading snap_033.0.hdf5 for snapshot_033 dataset
yt : [INFO     ] 2020-09-11 14:37:28,326 Parameters: current_time              = 4.343952725460923e+17 s
yt : [INFO     ] 2020-09-11 14:37:28,327 Parameters: domain_dimensions         = [1 1 1]
yt : [INFO     ] 2020-09-11 14:37:28,327 Parameters: domain_left_edge          = [0. 0. 0.]
yt : [INFO     ] 2020-09-11 14:37:28,328 Parameters: domain_right_edge         = [25. 25. 25.]
yt : [INFO     ] 2020-09-11 14:37:28,328 Parameters: cosmological_simulation   = 1
yt : [INFO     ] 2020-09-11 14:37:28,328 Parameters: current_redshift          = -4.811891664902035e-05
yt : [INFO     ] 2020-09-11 14:37:28,328 Parameters: omega_lambda              = 0.762
yt : [INFO     ] 2020-09-11 14:37:28,329 Parameters: omega_matter              = 0.238
yt :

So what comes out of this is a list with an entry for each chunk: 

In [2]:
my_result[0]

(('PartType0', 'Coordinates'),
 array([[ 7.6320577 , 11.81454   ,  0.5112596 ],
        [ 7.630863  , 11.814384  ,  0.51114064],
        [ 7.633304  , 11.81966   ,  0.51152855],
        ...,
        [ 9.0948305 , 18.531418  , 13.523693  ],
        [ 9.084332  , 18.547832  , 13.502992  ],
        [ 9.102446  , 18.544275  , 13.524328  ]], dtype=float32))

the `ds.index.data_files` objects is a list of `ParticleFile` objects with info about how that chunk including the on-disc datafile and start/end indices within that datafile:

In [3]:
for fid in [0,1,2]:
    print(ds.index.data_files[fid].filename)
    print([ds.index.data_files[fid].start,ds.index.data_files[fid].end])

/home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.0.hdf5
[0, 262144]
/home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.0.hdf5
[262144, 280105]
/home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.1.hdf5
[0, 262144]


Several ideas for applying dask:

* parrallelize: some dask.delayed() decorating to parallelize without relying on MPI
* daskify the read_particle_fields function to return dask arrays? 

Let's try writing a `read_particle_fields` that we can mess with by copying the `gadget` one: 

In [4]:
type(ds.index.io)

yt.frontends.gadget.io.IOHandlerGadgetHDF5

In [5]:
import h5py
def read_particle_fields(self,chunks, ptf, selector):
        # Now we have all the sizes, and we can allocate
        data_files = set([])
        for chunk in chunks:
            for obj in chunk.objs:
                data_files.update(obj.data_files)
                fls=','.join([x.filename for x in obj.data_files])
                print(f'adding {fls}')
                
        for data_file in sorted(data_files, key=lambda x: (x.filename, x.start)):
            si, ei = data_file.start, data_file.end
            f = h5py.File(data_file.filename, mode="r")
            for ptype, field_list in sorted(ptf.items()):
                if data_file.total_particles[ptype] == 0:
                    continue
                g = f[f"/{ptype}"]
                if getattr(selector, "is_all_data", False):
                    mask = slice(None, None, None)
                    mask_sum = data_file.total_particles[ptype]
                    hsmls = None
                else:
                    coords = g["Coordinates"][si:ei].astype("float64")
                    if ptype == "PartType0":
                        hsmls = self._get_smoothing_length(
                            data_file, g["Coordinates"].dtype, g["Coordinates"].shape
                        ).astype("float64")
                    else:
                        hsmls = 0.0
                    mask = selector.select_points(
                        coords[:, 0], coords[:, 1], coords[:, 2], hsmls
                    )
                    if mask is not None:
                        mask_sum = mask.sum()
                    del coords
                if mask is None:
                    continue
                for field in field_list:
                    if field in ("Mass", "Masses") and ptype not in self.var_mass:
                        data = np.empty(mask_sum, dtype="float64")
                        ind = self._known_ptypes.index(ptype)
                        data[:] = self.ds["Massarr"][ind]
                    elif field in self._element_names:
                        rfield = "ElementAbundance/" + field
                        data = g[rfield][si:ei][mask, ...]
                    elif field.startswith("Metallicity_"):
                        col = int(field.rsplit("_", 1)[-1])
                        data = g["Metallicity"][si:ei, col][mask]
                    elif field.startswith("GFM_Metals_"):
                        col = int(field.rsplit("_", 1)[-1])
                        data = g["GFM_Metals"][si:ei, col][mask]
                    elif field.startswith("Chemistry_"):
                        col = int(field.rsplit("_", 1)[-1])
                        data = g["ChemistryAbundances"][si:ei, col][mask]
                    elif field == "smoothing_length":
                        # This is for frontends which do not store
                        # the smoothing length on-disk, so we do not
                        # attempt to read them, but instead assume
                        # that they are calculated in _get_smoothing_length.
                        if hsmls is None:
                            hsmls = self._get_smoothing_length(
                                data_file,
                                g["Coordinates"].dtype,
                                g["Coordinates"].shape,
                            ).astype("float64")
                        data = hsmls[mask]
                    else:
                        data = g[field][si:ei][mask, ...]

                    yield (ptype, field), data
            f.close()

and now call the new one, passing in the `ds.index.io` object as `self`: 

In [6]:
# my_gen = ds.index.io._read_particle_fields(chunks, ptf, selector)
my_gen = read_particle_fields(ds.index.io, chunks, ptf, selector)
my_result = [_ for _ in my_gen]
my_result[:3]

adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.0.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.0.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.1.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.1.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.2.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.2.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.3.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.3.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.4.hdf5
adding /home/chavlin/hdd/dat

[(('PartType0', 'Coordinates'),
  array([[ 7.6320577 , 11.81454   ,  0.5112596 ],
         [ 7.630863  , 11.814384  ,  0.51114064],
         [ 7.633304  , 11.81966   ,  0.51152855],
         ...,
         [ 9.0948305 , 18.531418  , 13.523693  ],
         [ 9.084332  , 18.547832  , 13.502992  ],
         [ 9.102446  , 18.544275  , 13.524328  ]], dtype=float32)),
 (('PartType0', 'Coordinates'),
  array([[ 9.0948925, 18.540127 , 13.505761 ],
         [ 9.099401 , 18.551332 , 13.511906 ],
         [ 9.082766 , 18.54066  , 13.496419 ],
         ...,
         [ 9.948605 ,  8.47677  , 14.566635 ],
         [ 9.948661 ,  8.478258 , 14.567051 ],
         [ 9.94791  ,  8.478077 , 14.566901 ]], dtype=float32)),
 (('PartType0', 'Coordinates'),
  array([[ 1.5066416 ,  0.0244492 ,  3.2759523 ],
         [ 1.5392694 ,  0.05701507,  3.292754  ],
         [ 1.5390531 ,  0.05318429,  3.30687   ],
         ...,
         [11.684051  , 10.220002  , 19.18336   ],
         [11.671257  , 10.222958  , 19.17415

huh, that actually worked. cool. let's explore some ways to use dask here.... start with loading the data into dask as soon as it's read from disk. let's try to return the data for each chunk as sub-chunked dask arrays. To do that we'll need to read the data from disk into a dask array...

In [7]:
ds.index.data_files[0].filename

'/home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.0.hdf5'

Does dask's read_hdf work here?
```
dask.dataframe.read_hdf(pattern, key, start=0, stop=None, columns=None, chunksize=1000000, sorted_index=False, lock=True, mode='a')
```

In [8]:
from dask import dataframe as df, array as da

turns out it won't work: likely a pandas hdf issue -- seems there's a common issue (e.g. [this](https://github.com/dask/dask/issues/747)) in which pandas can only read very specific hdf files.... so this errors:


```
gkey = "/PartType0"
df.read_hdf(ds.index.data_files[0].filename,gkey,mode='r')
```

`TypeError: cannot create a storer if the object is not existing nor a value are passed`


but we can try reading with h5py and loading as a dask array following https://docs.dask.org/en/latest/array-creation.html#numpy-slicing

In [9]:
f = h5py.File(ds.index.data_files[0].filename, mode="r")

In [10]:
gkey = "/PartType0"
coords = f[gkey]['Coordinates'][ds.index.data_files[0].start:ds.index.data_files[0].end].astype("float64")
coords.shape

(262144, 3)

In [11]:
coords_da = da.from_array(coords, chunks=(30000, 1))
coords_da

Unnamed: 0,Array,Chunk
Bytes,6.29 MB,240.00 kB
Shape,"(262144, 3)","(30000, 1)"
Count,28 Tasks,27 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 6.29 MB 240.00 kB Shape (262144, 3) (30000, 1) Count 28 Tasks 27 Chunks Type float64 numpy.ndarray",3  262144,

Unnamed: 0,Array,Chunk
Bytes,6.29 MB,240.00 kB
Shape,"(262144, 3)","(30000, 1)"
Count,28 Tasks,27 Chunks
Type,float64,numpy.ndarray


So that would be a sub-chunk of a single base chunk (the base chunk == the file with the start/end indeces, here we're splitting that up). The above dask docs link claims "This process is entirely lazy. Neither creating the h5py object nor wrapping it with da.from_array have loaded any data." hmm... but `coords` here should be in memory? confused... in any case:

Let's assemble a delayed dask list of sub-chunked chunks? 

First let's write a new read function to return dask arrays?

In [12]:
def read_particle_fields_dask(self,chunks, ptf, selector):
    
        # let's still loop over the chunks 
        data_files = set([])
        for chunk in chunks:
            for obj in chunk.objs:
                data_files.update(obj.data_files)
                fls=','.join([x.filename for x in obj.data_files])
                print(f'adding {fls}')
                
        # and we still loop over each base chunk  
        for data_file in sorted(data_files, key=lambda x: (x.filename, x.start)):
            si, ei = data_file.start, data_file.end
            f = h5py.File(data_file.filename, mode="r")
            
            for ptype, field_list in sorted(ptf.items()):
                if data_file.total_particles[ptype] == 0:
                    continue
                g = f[f"/{ptype}"]
                if getattr(selector, "is_all_data", False):
                    mask = slice(None, None, None)
                    mask_sum = data_file.total_particles[ptype]
                    hsmls = None
                else:
                    coords = g["Coordinates"][si:ei].astype("float64")
                    if ptype == "PartType0":
                        hsmls = self._get_smoothing_length(
                            data_file, g["Coordinates"].dtype, g["Coordinates"].shape
                        ).astype("float64")
                    else:
                        hsmls = 0.0
                    mask = selector.select_points(
                        coords[:, 0], coords[:, 1], coords[:, 2], hsmls
                    )
                    if mask is not None:
                        mask_sum = mask.sum()
                    del coords
                if mask is None:
                    continue
                for field in field_list:
                    if field in ("Mass", "Masses") and ptype not in self.var_mass:
                        data = np.empty(mask_sum, dtype="float64")
                        ind = self._known_ptypes.index(ptype)
                        data[:] = self.ds["Massarr"][ind]
                    elif field in self._element_names:
                        rfield = "ElementAbundance/" + field
                        data = g[rfield][si:ei][mask, ...]
                    elif field.startswith("Metallicity_"):
                        col = int(field.rsplit("_", 1)[-1])
                        data = g["Metallicity"][si:ei, col][mask]
                    elif field.startswith("GFM_Metals_"):
                        col = int(field.rsplit("_", 1)[-1])
                        data = g["GFM_Metals"][si:ei, col][mask]
                    elif field.startswith("Chemistry_"):
                        col = int(field.rsplit("_", 1)[-1])
                        data = g["ChemistryAbundances"][si:ei, col][mask]
                    elif field == "smoothing_length":
                        # This is for frontends which do not store
                        # the smoothing length on-disk, so we do not
                        # attempt to read them, but instead assume
                        # that they are calculated in _get_smoothing_length.
                        if hsmls is None:
                            hsmls = self._get_smoothing_length(
                                data_file,
                                g["Coordinates"].dtype,
                                g["Coordinates"].shape,
                            ).astype("float64")
                        data = hsmls[mask]
                    else:
                        data = g[field][si:ei][mask, ...]

                  
                    if data.ndim > 1:
                        subchunk_shape = (10000,1)  # dont chunk up multidim arrays like Coordinates
                    else:
                        subchunk_shape = (10000)  
                        
                    yield (ptype, field), da.from_array(data,chunks=subchunk_shape)
            f.close()

In [13]:
my_gen = read_particle_fields_dask(ds.index.io, chunks, ptf, selector)
my_result = [_ for _ in my_gen]


adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.0.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.0.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.1.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.1.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.2.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.2.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.3.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.3.hdf5
adding /home/chavlin/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.4.hdf5
adding /home/chavlin/hdd/dat

now each of those data chunks is a dask array:

In [14]:
type(my_result[0][1])

dask.array.core.Array

In [15]:
my_result[0][1]

Unnamed: 0,Array,Chunk
Bytes,3.15 MB,40.00 kB
Shape,"(262144, 3)","(10000, 1)"
Count,82 Tasks,81 Chunks
Type,>f4,numpy.ndarray
"Array Chunk Bytes 3.15 MB 40.00 kB Shape (262144, 3) (10000, 1) Count 82 Tasks 81 Chunks Type >f4 numpy.ndarray",3  262144,

Unnamed: 0,Array,Chunk
Bytes,3.15 MB,40.00 kB
Shape,"(262144, 3)","(10000, 1)"
Count,82 Tasks,81 Chunks
Type,>f4,numpy.ndarray


In [16]:
my_result[:10]

[(('PartType0', 'Coordinates'),
  dask.array<array, shape=(262144, 3), dtype=>f4, chunksize=(10000, 1), chunktype=numpy.ndarray>),
 (('PartType0', 'Coordinates'),
  dask.array<array, shape=(419, 3), dtype=>f4, chunksize=(419, 1), chunktype=numpy.ndarray>),
 (('PartType0', 'Coordinates'),
  dask.array<array, shape=(255819, 3), dtype=>f4, chunksize=(10000, 1), chunktype=numpy.ndarray>),
 (('PartType0', 'Coordinates'),
  dask.array<array, shape=(251598, 3), dtype=>f4, chunksize=(10000, 1), chunktype=numpy.ndarray>),
 (('PartType0', 'Coordinates'),
  dask.array<array, shape=(244445, 3), dtype=>f4, chunksize=(10000, 1), chunktype=numpy.ndarray>),
 (('PartType0', 'Coordinates'),
  dask.array<array, shape=(239908, 3), dtype=>f4, chunksize=(10000, 1), chunktype=numpy.ndarray>),
 (('PartType0', 'Coordinates'),
  dask.array<array, shape=(233206, 3), dtype=>f4, chunksize=(10000, 1), chunktype=numpy.ndarray>),
 (('PartType0', 'Coordinates'),
  dask.array<array, shape=(227868, 3), dtype=>f4, chunks

In [17]:
my_result[0][1][:10,:]

Unnamed: 0,Array,Chunk
Bytes,120 B,40 B
Shape,"(10, 3)","(10, 1)"
Count,85 Tasks,3 Chunks
Type,>f4,numpy.ndarray
"Array Chunk Bytes 120 B 40 B Shape (10, 3) (10, 1) Count 85 Tasks 3 Chunks Type >f4 numpy.ndarray",3  10,

Unnamed: 0,Array,Chunk
Bytes,120 B,40 B
Shape,"(10, 3)","(10, 1)"
Count,85 Tasks,3 Chunks
Type,>f4,numpy.ndarray


In [18]:
my_result[0][1][:10,:].compute()

array([[ 7.6320577 , 11.81454   ,  0.5112596 ],
       [ 7.630863  , 11.814384  ,  0.51114064],
       [ 7.633304  , 11.81966   ,  0.51152855],
       [ 7.6330523 , 11.819399  ,  0.51174545],
       [ 7.6328063 , 11.818984  ,  0.512373  ],
       [ 7.634123  , 11.820499  ,  0.5114557 ],
       [ 7.6352324 , 11.820305  ,  0.5110059 ],
       [ 7.634844  , 11.817485  ,  0.5118982 ],
       [ 7.6348486 , 11.817831  ,  0.51274544],
       [ 7.635442  , 11.818255  ,  0.5099228 ]], dtype=float32)

Ok, so then let's try using dask to compute a derived quantity in parallel? Let's find a mean? One chunk would be:

In [19]:
my_result[0][1].mean().compute()

12.328807

In [20]:
my_result[0][1].size

786432

so not in parallel, we could do:

In [21]:
meanval=0.
count=0. 
for result in my_result:
    count+=result[1].size
    meanval+=result[1].sum().compute()
meanval=meanval / count 
print(meanval)

12.217880717413463


to parallelize, we can assemble some dask delayed tasks and compute them after initializing a dask.distributed Client:

In [22]:
from dask import delayed,compute


sums = [] 
count=0. 
for result in my_result:
    count+=result[1].size
    sums.append(delayed(result[1].sum()))
    
sums

[Delayed('Array-430f3401-c06f-46f9-9596-ab27fd2b8f7b'),
 Delayed('Array-45bb94a2-7b38-426a-850f-c6db0829e661'),
 Delayed('Array-38a586ec-c6ea-4308-a435-b63a52405f89'),
 Delayed('Array-1dd5bf3f-4893-42ef-aa01-438e0dfcb426'),
 Delayed('Array-52dc0457-72df-4755-812f-fcba24cd5b9a'),
 Delayed('Array-be983546-b61f-4639-870c-10cf4ab9a870'),
 Delayed('Array-a825a648-a095-4141-adc7-5ed279422219'),
 Delayed('Array-1d6c0abc-2a89-4bc0-afad-85fbad18bb0e'),
 Delayed('Array-7e4c26f4-f4ae-41e0-8b5d-8ca87b802d03')]

In [23]:
from dask.distributed import Client 
c = Client(threads_per_worker=1,n_workers=4)

In [24]:
import numpy as np
summed = np.sum(compute(*sums))/count

In [25]:
summed

12.21788017812798

In [26]:
def findmin(chunks):
    
    # assemble the delayed operation 
    minvals = []
    for result in my_result:
        minvals.append(delayed(result[1].min()))

    # return the min of each 
    return np.min(compute(*minvals))
    

In [27]:
findmin(my_result)

0.0

a trival example, but it seems to work. the minval of each chunk is calculated on separate processors and we compute the min over all the chunks after using `compute` to collect. 

## An aside about units 

Let's check what happens to a yt array when stored as a dask array

In [28]:
import yt

In [29]:
dasked_yt = da.from_array(yt.YTArray(np.random.random(10000),'kpc'),chunks=(1000,))
dasked_yt

Unnamed: 0,Array,Chunk
Bytes,80.00 kB,8.00 kB
Shape,"(10000,)","(1000,)"
Count,11 Tasks,10 Chunks
Type,float64,unyt.unyt_array
"Array Chunk Bytes 80.00 kB 8.00 kB Shape (10000,) (1000,) Count 11 Tasks 10 Chunks Type float64 unyt.unyt_array",10000  1,

Unnamed: 0,Array,Chunk
Bytes,80.00 kB,8.00 kB
Shape,"(10000,)","(1000,)"
Count,11 Tasks,10 Chunks
Type,float64,unyt.unyt_array


In [30]:
dasked_yt.mean().compute()

0.5005705955532631

so things work, but we lose the `unyt` attachment

In [31]:
yt.YTArray(np.random.random(10000),'kpc').mean()

unyt_quantity(0.49740046, 'kpc')