Merge pull request #1076 from CoffeaTeam/uproot_read_all
feat: Request all arrays from uproot at once inside dask task
lgray committed Apr 15, 2024
2 parents 750b96d + 98a1548 commit b8d296c
Showing 4 changed files with 56 additions and 14 deletions.
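For context on the change itself: instead of letting the mapping pull branches from uproot one at a time, the dask task now issues a single uproot read covering every requested branch over the partition's entry range. A minimal sketch of that pattern, assuming a hypothetical file nano.root with a tree Events and placeholder branch names:

import uproot

# Hypothetical file, tree, and branch names used only for this sketch.
with uproot.open("nano.root") as f:
    tree = f["Events"]
    keys = ["Muon_pt", "Muon_eta"]
    # One request for all branches over the entry range; how=dict returns
    # a plain dict of awkward arrays keyed by branch name.
    arrays = tree.arrays(
        keys,
        entry_start=0,
        entry_stop=1_000,
        how=dict,
    )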
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -38,7 +38,7 @@ classifiers = [
 ]
 dependencies = [
     "awkward>=2.6.3",
-    "uproot>=5.3.0",
+    "uproot!=5.3.3,>=5.3.0",
     "dask[array]>=2024.3.0;python_version>'3.8'",
     "dask[array]>=2023.4.0;python_version<'3.9'",
     "dask-awkward>=2024.3.0",
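The tightened requirement keeps the existing floor but skips uproot 5.3.3 specifically. A quick way to sanity-check how such a specifier behaves, shown with the packaging library (not something this commit touches):

from packaging.specifiers import SpecifierSet

spec = SpecifierSet("!=5.3.3,>=5.3.0")
print("5.3.2" in spec)  # True: at or above the floor, not excluded
print("5.3.3" in spec)  # False: the excluded release
print("5.3.4" in spec)  # True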
35 changes: 27 additions & 8 deletions src/coffea/nanoevents/factory.py
@@ -86,6 +86,19 @@ def __getitem__(self, index):
         return self._mapping[self._func(index)]
 
 
+class _OnlySliceableAs:
+    """A workaround for how PreloadedSourceMapping works"""
+
+    def __init__(self, array, expected_slice):
+        self._array = array
+        self._expected_slice = expected_slice
+
+    def __getitem__(self, s):
+        if s != self._expected_slice:
+            raise RuntimeError(f"Mismatched slice: {s} vs. {self._expected_slice}")
+        return self._array
+
+
 class _map_schema_uproot(_map_schema_base):
     def __init__(
         self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None
@@ -141,17 +154,23 @@ def load_buffers(
             f"{start}-{stop}",
         )
         uuidpfn = {partition_key[0]: tree.file.file_path}
-        mapping = UprootSourceMapping(
-            TrivialUprootOpener(uuidpfn, interp_options),
-            start,
-            stop,
-            cache={},
-            access_log=None,
-            use_ak_forth=True,
+        arrays = tree.arrays(
+            keys,
+            entry_start=start,
+            entry_stop=stop,
+            ak_add_doc=interp_options["ak_add_doc"],
+            decompression_executor=decompression_executor,
+            interpretation_executor=interpretation_executor,
+            how=dict,
         )
-        mapping.preload_column_source(partition_key[0], partition_key[1], tree)
+        source_arrays = {
+            k: _OnlySliceableAs(v, slice(start, stop)) for k, v in arrays.items()
+        }
+        mapping = PreloadedSourceMapping(
+            PreloadedOpener(uuidpfn), start, stop, access_log=None
+        )
+        mapping.preload_column_source(partition_key[0], partition_key[1], source_arrays)
 
         buffer_key = partial(self._key_formatter, tuple_to_key(partition_key))
 
         # The buffer-keys that dask-awkward knows about will not include the
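The _OnlySliceableAs wrapper added above exists because PreloadedSourceMapping still slices each column source with [start:stop], while the arrays handed to it have already been read for exactly that entry range. A toy sketch of the guard, repeating the class here so it runs standalone with made-up values:

# Mirrors the class added in factory.py above; data and slice are toy values.
class _OnlySliceableAs:
    def __init__(self, array, expected_slice):
        self._array = array
        self._expected_slice = expected_slice

    def __getitem__(self, s):
        if s != self._expected_slice:
            raise RuntimeError(f"Mismatched slice: {s} vs. {self._expected_slice}")
        return self._array


wrapped = _OnlySliceableAs([1, 2, 3], slice(100, 103))
print(wrapped[slice(100, 103)])  # [1, 2, 3] -- the expected slice passes through
# wrapped[slice(0, 3)] would raise RuntimeError: the data were read for 100:103 only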
30 changes: 26 additions & 4 deletions src/coffea/nanoevents/mapping/preloaded.py
@@ -1,6 +1,9 @@
 import json
 import warnings
 
+import awkward
+import numpy
+
 from coffea.nanoevents.mapping.base import BaseSourceMapping, UUIDOpener
 from coffea.nanoevents.util import quote, tuple_to_key
 
@@ -76,12 +79,31 @@ def preload_column_source(self, uuid, path_in_source, source):
         key = self.key_root() + tuple_to_key((uuid, path_in_source))
         self._cache[key] = source
 
-    def get_column_handle(self, columnsource, name):
+    def get_column_handle(self, columnsource, name, allow_missing):
+        if allow_missing:
+            return columnsource[name] if name in columnsource else None
         return columnsource[name]
 
-    def extract_column(self, columnhandle, start, stop, **kwargs):
-        # make sure uproot is single-core since our calling context might not be
-        return columnhandle[start:stop]
+    def extract_column(
+        self, columnhandle, start, stop, allow_missing, use_ak_forth=True
+    ):
+        if allow_missing and columnhandle is None:
+            return awkward.contents.IndexedOptionArray(
+                awkward.index.Index64(numpy.full(stop - start, -1, dtype=numpy.int64)),
+                awkward.contents.NumpyArray(numpy.array([], dtype=bool)),
+            )
+        elif not allow_missing and columnhandle is None:
+            raise RuntimeError(
+                "Received columnhandle of None when missing column in file is not allowed!"
+            )
+        the_array = columnhandle[start:stop]
+        if allow_missing:
+            the_array = awkward.contents.IndexedOptionArray(
+                awkward.index.Index64(numpy.arange(stop - start, dtype=numpy.int64)),
+                awkward.contents.NumpyArray(the_array),
+            )
+
+        return the_array
 
     def __len__(self):
         return self._stop - self._start
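The two IndexedOptionArray constructions above give a column an option type: an index of all -1 over empty content yields stop - start missing entries when the branch is absent, while an identity index keeps every value but still marks the column as optional. A standalone sketch of both layouts using awkward directly, with toy values:

import awkward
import numpy

start, stop = 0, 5

# Branch missing entirely: every index is -1, so every entry reads back as None.
all_missing = awkward.contents.IndexedOptionArray(
    awkward.index.Index64(numpy.full(stop - start, -1, dtype=numpy.int64)),
    awkward.contents.NumpyArray(numpy.array([], dtype=bool)),
)
print(awkward.Array(all_missing).to_list())  # [None, None, None, None, None]

# Branch present: an identity index keeps the values but makes the type optional.
values = numpy.arange(stop - start, dtype=numpy.float64)
all_present = awkward.contents.IndexedOptionArray(
    awkward.index.Index64(numpy.arange(stop - start, dtype=numpy.int64)),
    awkward.contents.NumpyArray(values),
)
print(awkward.Array(all_present).to_list())  # [0.0, 1.0, 2.0, 3.0, 4.0]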
3 changes: 2 additions & 1 deletion src/coffea/nanoevents/transforms.py
@@ -477,7 +477,8 @@ def nestedindex(stack):
 
 def item(stack):
     field = stack.pop()
-    stack.append(stack.pop()[field])
+    array = stack.pop()
+    stack.append(array[field])
 
 
 def eventindex(stack):
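The transforms operate on a small stack of intermediate results: item pops the field name first (it was pushed last) and the array to index second. A toy run with a plain list standing in for the stack and a dict standing in for the array:

# Same function as the new version in transforms.py above; inputs are toy values.
def item(stack):
    field = stack.pop()
    array = stack.pop()
    stack.append(array[field])


stack = [{"pt": [1.0, 2.0, 3.0]}, "pt"]  # array pushed first, field name last
item(stack)
print(stack)  # [[1.0, 2.0, 3.0]]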
