Computes that require shape only fail after form rehydration (count, zeros_like, ones_like, ...) #313

Closed
lgray opened this issue Jul 7, 2023 · 5 comments · Fixed by #379

Comments

@lgray
Collaborator

lgray commented Jul 7, 2023

This is in uproot 5.0.10, dask-awkward 2023.7.0, and awkward 2.3.1.
I'm not sure where to put this one, since the actual error is in awkward, but I'm putting it here since it's in a dak function.

@agoose77

import uproot
import awkward as ak
import dask_awkward as dak

tree = uproot.open({"https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root": "Events"})
dtree = uproot.dask(tree)
    
print(ak.count(tree["event"].array()))

print(dak.count(dtree.event).compute())

fails with:

Traceback (most recent call last):
  File "count_repro.py", line 10, in <module>
    print(dak.count(dtree.event).compute())
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/base.py", line 314, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask_awkward/lib/core.py", line 1492, in __call__
    return self.fn(*args, **kwargs)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask_awkward/lib/core.py", line 1692, in _prepare_axis_none_chunk
    (layout,) = ak._do.remove_structure(
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/awkward/_do.py", line 223, in remove_structure
    arrays = layout._remove_structure(
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/awkward/contents/numpyarray.py", line 1230, in _remove_structure
    backend.nplike.reshape(self._raw(backend.nplike), shape),
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/awkward/_nplikes/array_module.py", line 147, in reshape
    assert not isinstance(x, PlaceholderArray)
AssertionError
@lgray lgray changed the title dak.count fails dak.count(flat_array).compute() fails Jul 7, 2023
@douglasdavis
Collaborator

This exception appears when going from dask-awkward 2023.6.2 to 2023.6.3, which includes the new reductions code from PR #267.

@douglasdavis
Collaborator

The exception changes to

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/ddavis/software/repos/dask/dask/threaded.py", line 89, in get
    results = get_async(
              ^^^^^^^^^^
  File "/Users/ddavis/software/repos/dask/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/Users/ddavis/software/repos/dask/dask/local.py", line 319, in reraise
    raise exc
  File "/Users/ddavis/software/repos/dask/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ddavis/.pyenv/versions/dev/lib/python3.11/site-packages/awkward/_dispatch.py", line 43, in dispatch
    next(gen_or_result)
  File "/Users/ddavis/.pyenv/versions/dev/lib/python3.11/site-packages/awkward/operations/ak_count.py", line 102, in count
    return _impl(array, axis, keepdims, mask_identity, highlevel, behavior)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ddavis/.pyenv/versions/dev/lib/python3.11/site-packages/awkward/operations/ak_count.py", line 111, in _impl
    out = ak._do.reduce(
          ^^^^^^^^^^^^^^
  File "/Users/ddavis/.pyenv/versions/dev/lib/python3.11/site-packages/awkward/_do.py", line 328, in reduce
    raise ValueError(
ValueError: axis=1 exceeds the depth of the nested list structure (which is 1)

This error occurred while calling

    ak.count(
        <Array [3749778, 3749762, ..., 3749882, 3749892] type='40 * uint64'>
        axis = 1
    )

with dask-awkward 2023.6.2, so I don't think the new reducer is actually the problem. I'm wondering why axis=1 is showing up when the default for ak.count is axis=None.

@douglasdavis
Collaborator

Something else weird I'm noticing: there's a compute happening somewhere upon just calling dak.count. That definitely shouldn't happen!

@douglasdavis
Collaborator

With the latest release (2023.7.1), this now fails because of the known FieldNotFoundError issue with parts of the array that are touched for shape only:

FieldNotFoundError: no field 'event' in record with 1 fields

This error occurred while attempting to slice

    <Array [{run: 1}, {run: 1}, ..., {...}, {run: 1}] type='40 * {run: uint32}'>

with

    'event'


@douglasdavis
Collaborator

douglasdavis commented Jul 21, 2023

I've put together a "minimal" (sorta) reproducer that uses only awkward code.

Starting with a summary of the problem:

When we call ak.count (or ak.{zeros,ones}_like), we only need shape information, so the necessary columns optimization (NCO) tells us we can drop all fields. When the NCO reports 0 necessary columns, we make sure to read at least one.
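
As a rough illustration of the "read at least one column" fallback described above, here is a toy sketch in plain Python. The function name `necessary_columns` and the column list are hypothetical and only for illustration; this is not the dask-awkward implementation.

```python
def necessary_columns(data_touched, available):
    """Toy model: keep the columns whose data is touched, but never return none."""
    touched = set(data_touched)
    keep = [name for name in available if name in touched]
    if not keep and available:
        # Shape-only computation: no column data is needed, but something
        # still has to be read so that the array length is known.
        keep = [available[0]]
    return keep


# Hypothetical column names, listed in file order.
columns = ["run", "event", "luminosityBlock"]

shape_only = necessary_columns([], columns)        # e.g. count(events.event)
with_data = necessary_columns(["event"], columns)  # e.g. sum(events.event)

print(shape_only)  # ['run']
print(with_data)   # ['event']
```

In this toy model the fallback picks whichever column comes first, so a shape-only request for `event` ends up reading `run` instead, which mirrors the `{run: uint32}` record seen in the FieldNotFoundError above.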

Keeping with ak.count as our example: if we call ak.count(array.foo) but the NCO picks array.bar as the one field to read, we will of course drop the foo field (we know its data is not necessary for the ak.count call to give a result), so we rely on form rehydration to give us an existing foo field. This part works (shown by the # sanity check comments in the reproducer below).
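
The idea of rehydration can be sketched with plain dictionaries standing in for record layouts. The names `project`, `rehydrate`, and `MISSING` are hypothetical; the real code path works on awkward forms and layouts, as in the reproducer.

```python
MISSING = object()  # hypothetical marker: the field exists, but its data was never read


def project(record, keep):
    """Keep only the requested fields (what column projection does)."""
    return {name: record[name] for name in keep}


def rehydrate(full_fields, projected):
    """Restore every original field name, filling dropped ones with a marker."""
    return {name: projected.get(name, MISSING) for name in full_fields}


original = {"foo": [1, 2, 3], "bar": [4, 5, 6]}
trimmed = project(original, ["bar"])
rehydrated = rehydrate(list(original), trimmed)

print(list(trimmed))                 # ['bar']
print(list(rehydrated))              # ['foo', 'bar']
print(rehydrated["foo"] is MISSING)  # True
```

After rehydration a lookup of `foo` no longer fails by name, but anything that needs the actual values of `foo` still has nothing to work with.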

The problem appears to come from removing structure from an array that has PlaceholderArrays in it as a consequence of form rehydration. In the reproducer below we attempt to use the y field for the ak.count call which has been explicitly dropped and rehydrated.
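
To make the failure mode concrete, here is a toy model of a data-less buffer that knows its shape but cannot be reshaped. `ToyPlaceholder` and both counting helpers are hypothetical stand-ins, not awkward's PlaceholderArray or its reducers; the only detail taken from the traceback above is that the assertion fires inside reshape.

```python
class ToyPlaceholder:
    """Hypothetical stand-in for a data-less buffer: shape is known, data is not."""

    def __init__(self, shape):
        self.shape = shape

    def reshape(self, new_shape):
        # Mirrors the spirit of the assertion in the traceback: there is no
        # buffer behind this object, so it cannot be reshaped.
        raise AssertionError("placeholder has no buffer to reshape")


def count_via_shape(array):
    """Count elements using shape information only."""
    n = 1
    for dim in array.shape:
        n *= dim
    return n


def count_via_flatten(array):
    """Count elements by flattening first, which touches the buffer."""
    flat = array.reshape((-1,))
    return flat.shape[0]


column = ToyPlaceholder((40,))
shape_result = count_via_shape(column)

try:
    count_via_flatten(column)
    flatten_failed = False
except AssertionError:
    flatten_failed = True

print(shape_result)    # 40
print(flatten_failed)  # True
```

Both helpers would give the same answer on a real buffer; only the one that avoids touching the data survives a placeholder.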

Here's a reproducer. It's quite long because it extracts the contents of dask_awkward.lib.unproject_layout, which is a module that is independent of dask-awkward (it only imports awkward and numpy). The last ~15 lines are the main part of the reproducer:

from __future__ import annotations

import math

import awkward as ak
import numpy as np
from awkward.contents import (
    BitMaskedArray,
    ByteMaskedArray,
    Content,
    EmptyArray,
    IndexedArray,
    IndexedOptionArray,
    ListArray,
    ListOffsetArray,
    NumpyArray,
    RecordArray,
    RegularArray,
    UnionArray,
    UnmaskedArray,
)
from awkward.forms import (
    BitMaskedForm,
    ByteMaskedForm,
    EmptyForm,
    Form,
    IndexedForm,
    IndexedOptionForm,
    ListForm,
    ListOffsetForm,
    NumpyForm,
    RecordForm,
    RegularForm,
    UnionForm,
    UnmaskedForm,
)
from awkward.typetracer import PlaceholderArray, unknown_length

index_of = {
    "i8": ak.index.Index8,
    "u8": ak.index.IndexU8,
    "i32": ak.index.Index32,
    "u32": ak.index.IndexU32,
    "i64": ak.index.Index64,
}
dtype_of = {
    "i8": np.dtype(np.int8),
    "u8": np.dtype(np.uint8),
    "i32": np.dtype(np.int32),
    "u32": np.dtype(np.uint32),
    "i64": np.dtype(np.int64),
    "u64": np.dtype(np.uint64),
}


def dummy_index_of(typecode: str, length: int, nplike) -> ak.index.Index:
    index_cls = index_of[typecode]
    dtype = dtype_of[typecode]
    return index_cls(PlaceholderArray(nplike, (length,), dtype), nplike=nplike)


def dummy_buffer(shape, dtype, backend):
    return backend.broadcast_to(backend.asarray([0], dtype=dtype), shape)


def compatible(form: Form, layout: Content) -> bool:
    if layout is None:
        return True

    elif isinstance(layout, Content) and type(form) is layout.form_cls:
        if isinstance(form, (EmptyForm, NumpyForm)):
            # 0 contents
            return True

        elif isinstance(
            form,
            (
                BitMaskedForm,
                ByteMaskedForm,
                IndexedForm,
                IndexedOptionForm,
                ListForm,
                ListOffsetForm,
                RegularForm,
                UnmaskedForm,
            ),
        ):
            # 1 content
            return compatible(form.content, layout.content)

        elif isinstance(form, RecordForm):
            # arbitrarily many contents, possibly with missing fields
            for field in form.fields:
                if layout.has_field(field):
                    if not compatible(form.content(field), layout.content(field)):
                        return False
            return True

        elif isinstance(form, UnionForm):
            # arbitrarily many contents, possibly with missing fields
            for sublayout in layout.contents:
                if not any(compatible(subform, sublayout) for subform in form.contents):
                    return False
            return True

    elif isinstance(layout, UnmaskedArray) and form.is_option:
        return compatible(form.content, layout.content)

    elif isinstance(form, UnionForm):
        for subform in form.contents:
            if compatible(subform, layout):
                return True
        else:
            return False

    # handle other cases that come up here...

    else:
        return False


def _unproject_layout(form, layout, length, backend):
    if layout is None:
        # construct the "minimum necessary" layout
        # maintaining length constraints if there are any, 0 otherwise

        if isinstance(form, EmptyForm):
            return EmptyArray(parameters=form.parameters)

        elif isinstance(form, NumpyForm):
            return NumpyArray(
                PlaceholderArray(
                    backend.nplike,
                    (length,) + form.inner_shape,
                    ak.types.numpytype.primitive_to_dtype(form.primitive),
                ),
                parameters=form.parameters,
            )

        elif isinstance(form, BitMaskedForm):
            return BitMaskedArray(
                dummy_index_of(
                    form.mask,
                    unknown_length
                    if length is unknown_length
                    else math.ceil(length / 8.0),
                    backend.index_nplike,
                ),
                _unproject_layout(form.content, None, length, backend),
                form.valid_when,
                length,
                form.lsb_order,
                parameters=form.parameters,
            )

        elif isinstance(form, ByteMaskedForm):
            return ByteMaskedArray(
                dummy_index_of(form.mask, length, backend.index_nplike),
                _unproject_layout(form.content, None, length, backend),
                form.valid_when,
                parameters=form.parameters,
            )

        elif isinstance(form, IndexedForm):
            return IndexedArray(
                dummy_index_of(form.index, length, backend.index_nplike),
                _unproject_layout(form.content, None, unknown_length, backend),
                parameters=form.parameters,
            )

        elif isinstance(form, IndexedOptionForm):
            return IndexedOptionArray(
                dummy_index_of(form.index, length, backend.index_nplike),
                _unproject_layout(form.content, None, unknown_length, backend),
                parameters=form.parameters,
            )

        elif isinstance(form, ListForm):
            return ListArray(
                dummy_index_of(form.starts, length, backend.index_nplike),
                dummy_index_of(form.stops, length, backend.index_nplike),
                _unproject_layout(form.content, None, unknown_length, backend),
                parameters=form.parameters,
            )

        elif isinstance(form, ListOffsetForm):
            return ListOffsetArray(
                dummy_index_of(form.offsets, length + 1, backend.index_nplike),
                _unproject_layout(form.content, None, unknown_length, backend),
                parameters=form.parameters,
            )

        elif isinstance(form, RegularForm):
            return RegularArray(
                _unproject_layout(form.content, None, length * form.size, backend),
                form.size,
                length,
                parameters=form.parameters,
            )

        elif isinstance(form, UnmaskedForm):
            return UnmaskedArray(
                _unproject_layout(form.content, None, length, backend),
                parameters=form.parameters,
            )

        elif isinstance(form, RecordForm):
            return RecordArray(
                [
                    _unproject_layout(content, None, length, backend)
                    for content in form.contents
                ],
                None if form.is_tuple else form.fields,
                length,
                parameters=form.parameters,
            )

        elif isinstance(form, UnionForm):
            return UnionArray(
                dummy_index_of(form.tags, length, backend.index_nplike),
                dummy_index_of(form.index, length, backend.index_nplike),
                [
                    _unproject_layout(content, None, unknown_length, backend)
                    for content in form.contents
                ],
                parameters=form.parameters,
            )

        else:
            raise AssertionError(f"unrecognized Form type: {type(form)}")

    elif isinstance(layout, Content) and type(form) is layout.form_cls:
        # pass on this layout node, allowing for descendants to be missing

        if isinstance(form, (EmptyForm, NumpyForm)):
            # 0 contents
            return layout

        elif isinstance(
            form,
            (
                BitMaskedForm,
                ByteMaskedForm,
                IndexedForm,
                IndexedOptionForm,
                ListForm,
                ListOffsetForm,
                RegularForm,
                UnmaskedForm,
            ),
        ):
            # 1 content
            return layout.copy(
                content=_unproject_layout(
                    form.content, layout.content, layout.content.length, backend
                )
            )

        elif isinstance(form, RecordForm):
            # arbitrarily many contents, possibly with missing fields
            contents = []
            for field in form.fields:
                if layout.has_field(field):
                    layout_content = layout.content(field)
                    contents.append(
                        _unproject_layout(
                            form.content(field),
                            layout_content,
                            layout_content.length,
                            backend,
                        )
                    )
                else:
                    contents.append(
                        _unproject_layout(form.content(field), None, length, backend)
                    )

            return RecordArray(
                contents,
                None if form.is_tuple else form.fields,
                length,
                parameters=form.parameters,
            )

        elif isinstance(form, UnionForm):
            # arbitrarily many contents, possibly with missing fields
            available = dict(enumerate(layout.contents))

            newtags = backend.empty_like(layout.tags.data)
            contents = []
            for newtag, subform in enumerate(form.contents):
                for oldtag, sublayout in available.items():
                    if compatible(subform, sublayout):
                        contents.append(sublayout)
                        newtags[layout.tags.data == oldtag] = newtag
                        del available[oldtag]
                        break
                else:
                    contents.append(_unproject_layout(subform, None, 0, backend))

            return UnionArray(
                ak.index.Index8(newtags),
                layout.index,
                contents,
                parameters=form.parameters,
            )

        else:
            raise AssertionError(f"unrecognized Form type: {type(form)}")

    # UnmaskedArray, non-UnmaskedArray form
    elif isinstance(layout, UnmaskedArray) and form.is_option:
        if isinstance(form, BitMaskedForm):
            return BitMaskedArray(
                ak.index.IndexU8.zeros(length, backend.index_nplike),
                _unproject_layout(
                    form.content, layout.content, layout.content.length, backend
                ),
                form.valid_when,
                layout.length,
                form.lsb_order,
                parameters=layout._parameters,
            )
        elif isinstance(form, ByteMaskedForm):
            return ByteMaskedArray(
                ak.index.Index8.zeros(length, backend.index_nplike),
                _unproject_layout(
                    form.content, layout.content, layout.content.length, backend
                ),
                form.valid_when,
                parameters=layout._parameters,
            )
        elif isinstance(form, IndexedOptionForm):
            return IndexedOptionArray(
                ak.index.Index64(
                    backend.index_nplike.arange(layout.length, dtype=np.int64),
                    nplike=backend.index_nplike,
                ),
                _unproject_layout(
                    form.content, layout.content, layout.content.length, backend
                ),
                parameters=layout._parameters,
            )
        else:
            raise TypeError(form)

    # Something added an option to our layout. This can happen when reading a subset of
    # columns from a Parquet file (perhaps https://github.com/apache/arrow/issues/30043)
    elif isinstance(layout, UnmaskedArray) and not form.is_option:
        return _unproject_layout(form, layout.content, layout.content.length, backend)

    elif isinstance(form, UnionForm):
        newtags, newindex = None, None
        contents = []
        for newtag, subform in enumerate(form.contents):
            if compatible(subform, layout):
                contents.append(layout)
                newtags = backend.full(layout.length, newtag, dtype=np.int8)
                newindex = backend.arange(layout.length, dtype=dtype_of[form.index])
            else:
                contents.append(_unproject_layout(subform, None, 0, backend))

        assert newtags is not None and newindex is not None
        return UnionArray(
            ak.index.Index8(newtags),
            ak.index.Index(newindex),
            contents,
            parameters=form.parameters,
        )

    # handle other cases that come up here...

    else:
        raise AssertionError(f"unexpected combination: {type(form)} and {type(layout)}")


def unproject_layout(form: Form | None, layout: Content) -> Content:
    """Rehydrate a layout to include all parts of an original form.

    When we perform the necessary columns optimization we drop fields
    that are not necessary for a computed result. Sometimes we have
    task graphs that expect to see fields in name only (but not their
    data). To protect against FieldNotFoundError exceptions we "unproject"
    or "rehydrate" the layout with the original form. This reapplies
    all original fields, but the ones that were originally projected
    away are data-less.

    Parameters
    ----------
    form : awkward.forms.form.Form, optional
        The complete Form to apply to a projected layout. If ``None``,
        the layout will be returned without unprojection (this case
        assumes column projection did not occur).
    layout : awkward.contents.content.Content
        The projected layout.

    Returns
    -------
    awkward.contents.content.Content
        Unprojected layout (all fields from the original form that did
        not appear in the projected layout will be PlaceholderArrays).

    """
    if form is None:
        return layout
    return _unproject_layout(form, layout, layout.length, layout.backend)


a = ak.Array([{"x": 1, "y": 2}, {"x": 3, "y": 4}])
trimmed = a[["x"]]
trimmed_layout = trimmed.layout
original_layout = a.layout

rehydrated_layout = unproject_layout(original_layout.form, trimmed_layout)
rehydrated_array = ak.Array(rehydrated_layout)


assert trimmed.fields == ["x"]  # sanity check
assert rehydrated_array.fields == ["x", "y"]  # sanity check

ak.count(rehydrated_array.y)

@douglasdavis douglasdavis changed the title dak.count(flat_array).compute() fails Computes that require shape only fail after form rehydration (count, zeros_like, ones_like, ...) Jul 21, 2023