Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Request all arrays from uproot at once inside dask task #1076

Merged
merged 2 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ classifiers = [
]
dependencies = [
"awkward>=2.6.3",
"uproot>=5.3.0",
"uproot!=5.3.3,>=5.3.0",
"dask[array]>=2024.3.0;python_version>'3.8'",
"dask[array]>=2023.4.0;python_version<'3.9'",
"dask-awkward>=2024.3.0",
Expand Down
35 changes: 27 additions & 8 deletions src/coffea/nanoevents/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ def __getitem__(self, index):
return self._mapping[self._func(index)]


class _OnlySliceableAs:
    """Wrap an array so that it can only be indexed with one exact slice.

    A workaround for how PreloadedSourceMapping works: indexing with the
    expected slice hands back the wrapped array unchanged (no slicing is
    applied here), and indexing with anything else raises instead of
    returning data for a different range.

    Parameters
    ----------
    array
        The object returned by ``__getitem__``.
    expected_slice
        The only index that ``__getitem__`` accepts.
    """

    # Only two attributes are ever stored, and one wrapper is built per
    # column, so avoid a per-instance __dict__.
    __slots__ = ("_array", "_expected_slice")

    def __init__(self, array, expected_slice):
        self._array = array
        self._expected_slice = expected_slice

    def __getitem__(self, s):
        """Return the wrapped array if ``s`` equals the expected slice.

        Raises
        ------
        RuntimeError
            If ``s`` differs from the slice given at construction.
        """
        if s != self._expected_slice:
            raise RuntimeError(f"Mismatched slice: {s} vs. {self._expected_slice}")
        return self._array


class _map_schema_uproot(_map_schema_base):
def __init__(
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None
Expand Down Expand Up @@ -141,17 +154,23 @@ def load_buffers(
f"{start}-{stop}",
)
uuidpfn = {partition_key[0]: tree.file.file_path}
mapping = UprootSourceMapping(
TrivialUprootOpener(uuidpfn, interp_options),
start,
stop,
cache={},
access_log=None,
use_ak_forth=True,
arrays = tree.arrays(
keys,
entry_start=start,
entry_stop=stop,
ak_add_doc=interp_options["ak_add_doc"],
decompression_executor=decompression_executor,
interpretation_executor=interpretation_executor,
how=dict,
)
mapping.preload_column_source(partition_key[0], partition_key[1], tree)
source_arrays = {
k: _OnlySliceableAs(v, slice(start, stop)) for k, v in arrays.items()
}
mapping = PreloadedSourceMapping(
PreloadedOpener(uuidpfn), start, stop, access_log=None
lgray marked this conversation as resolved.
Show resolved Hide resolved
)
mapping.preload_column_source(partition_key[0], partition_key[1], source_arrays)

buffer_key = partial(self._key_formatter, tuple_to_key(partition_key))

# The buffer-keys that dask-awkward knows about will not include the
Expand Down
30 changes: 26 additions & 4 deletions src/coffea/nanoevents/mapping/preloaded.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import warnings

import awkward
import numpy

from coffea.nanoevents.mapping.base import BaseSourceMapping, UUIDOpener
from coffea.nanoevents.util import quote, tuple_to_key

Expand Down Expand Up @@ -76,12 +79,31 @@ def preload_column_source(self, uuid, path_in_source, source):
key = self.key_root() + tuple_to_key((uuid, path_in_source))
self._cache[key] = source

def get_column_handle(self, columnsource, name):
def get_column_handle(self, columnsource, name, allow_missing):
    """Look up ``name`` in ``columnsource``.

    When ``allow_missing`` is true and ``name`` is not in ``columnsource``,
    ``None`` is returned instead. Otherwise the lookup is performed directly
    and any error from ``columnsource[name]`` propagates to the caller.
    """
    # The membership test is only performed when missing columns are tolerated.
    if allow_missing and name not in columnsource:
        return None
    return columnsource[name]

def extract_column(self, columnhandle, start, stop, **kwargs):
# make sure uproot is single-core since our calling context might not be
return columnhandle[start:stop]
def extract_column(
    self, columnhandle, start, stop, allow_missing, use_ak_forth=True
):
    """Return ``columnhandle[start:stop]``, option-wrapped if ``allow_missing``.

    Parameters
    ----------
    columnhandle
        Object to slice, or ``None`` when no handle is available.
    start, stop
        Bounds passed to the slice; ``stop - start`` is used as the length
        of the index arrays built when ``allow_missing`` is true.
    allow_missing
        If true, the result is an ``IndexedOptionArray``. A ``None`` handle
        yields an index filled with -1 over an empty boolean content; a
        real handle yields a 0..stop-start index over the sliced data.
    use_ak_forth
        Accepted but not used in this method.

    Raises
    ------
    RuntimeError
        If ``columnhandle`` is ``None`` and ``allow_missing`` is false.
    """
    if columnhandle is None:
        if not allow_missing:
            raise RuntimeError(
                "Received columnhandle of None when missing column in file is not allowed!"
            )
        # No handle: every index entry is -1 and the content is empty.
        all_negative = numpy.full(stop - start, -1, dtype=numpy.int64)
        return awkward.contents.IndexedOptionArray(
            awkward.index.Index64(all_negative),
            awkward.contents.NumpyArray(numpy.array([], dtype=bool)),
        )

    sliced = columnhandle[start:stop]
    if not allow_missing:
        return sliced

    # Handle present: index i points at sliced entry i.
    positions = numpy.arange(stop - start, dtype=numpy.int64)
    return awkward.contents.IndexedOptionArray(
        awkward.index.Index64(positions),
        awkward.contents.NumpyArray(sliced),
    )

def __len__(self):
    """Return the stored stop bound minus the stored start bound."""
    upper, lower = self._stop, self._start
    return upper - lower
Expand Down
3 changes: 2 additions & 1 deletion src/coffea/nanoevents/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ def nestedindex(stack):

def item(stack):
field = stack.pop()
stack.append(stack.pop()[field])
array = stack.pop()
stack.append(array[field])


def eventindex(stack):
Expand Down