
Add dak.unzip #136

Merged (12 commits) on Jan 2, 2023
Conversation

@lgray (Collaborator) commented Dec 23, 2022

Fixes #121

@lgray (Collaborator, Author) commented Dec 23, 2022

Could a maintainer please run this :-)

@lgray (Collaborator, Author) commented Dec 23, 2022

@douglasdavis all the reducers appear to be failing, but my test passes.

@agoose77 (Collaborator)

Ah, that's from the latest PR on reducers. Let me fix that.

@codecov-commenter commented Dec 23, 2022

Codecov Report

Merging #136 (011d36d) into main (78e5712) will increase coverage by 0.01%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main     #136      +/-   ##
==========================================
+ Coverage   95.72%   95.73%   +0.01%     
==========================================
  Files          18       18              
  Lines        1754     1759       +5     
==========================================
+ Hits         1679     1684       +5     
  Misses         75       75              
Impacted Files Coverage Δ
src/dask_awkward/lib/structure.py 96.60% <100.00%> (+0.08%) ⬆️


@lgray (Collaborator, Author) commented Dec 27, 2022

So I'm really surprised this new implementation works, but it does, so long as I provide an empty typetracer to the actual ak.unzip step and then fill in what the typetracer should be when exploding the getitems into that tuple. This seems delicate, but it's better than generating a bazillion dask-delayed tasks for large input datasets.

@lgray (Collaborator, Author) commented Jan 2, 2023

Could someone kick this PR for tests? Thanks!

@lgray (Collaborator, Author) commented Jan 2, 2023

@jpivarski @martindurant your thoughts on the present implementation would be useful. As noted above I really get the feeling it's delicate, despite being quite effective.

@martindurant (Collaborator)

Just trying to wrap my head around the implications of this. Some thoughts:

  • we create an intermediate graph layer that has output type awkward, but actually produces a tuple of awkwards
  • that layer has an empty typetracer which is not descriptive of what the layer is
  • downstream layers will not work with optimization in cases where not all of the branches are actually needed for the compute

It seems to me that a cleaner description would be to_delayed(), which makes it explicit that the intermediate thing is not a dask-awkward collection, and that we don't expect to be able to optimize through it.

OR, perhaps better, there should only be a single layer, which fuses unzip/getitem into a single function for each branch. This would return a tuple of dak objects as now. Yes, you would effectively repeat the ak.unzip call for every branch, but that's a cheap metadata-only operation, so it's fine.
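The single-fused-layer idea can be sketched in pure Python (a dict stands in for a record partition; none of the names below are dask-awkward internals):

```python
# Pure-Python sketch of the "single fused layer" idea: each output branch
# gets one task that redoes the cheap unzip on its partition and keeps only
# its own element. In the real proposal this would call ak.unzip and select
# one element; here "unzip" is just reading one dict key.
from functools import partial

def fused_unzip_getitem(partition, field):
    # fuses the per-partition unzip with the per-branch selection
    return partition[field]

partition = {"x": [1, 2, 3], "y": [1.1, 2.2, 3.3]}
# one fused task per branch, as would appear in the single graph layer
tasks = {field: partial(fused_unzip_getitem, partition, field)
         for field in partition}
print(tasks["x"]())  # -> [1, 2, 3]
```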

@jpivarski (Collaborator)

As I understand it, the issue is that unzip returns a tuple, rather than a Dask object, such as a dak.Array (or dask-histogram, or dask.array, etc.). You use an empty type-tracer to make this function look enough like a function that returns a Dask object that map_partitions doesn't complain. Is that the issue?

Thinking about it now, dak.unzip should not be sending any work to the workers (no calls to map_partitions): it's purely a metadata rearrangement. This

ak.unzip(some_ak_array)

is entirely equivalent to this

tuple(some_ak_array[field_name] for field_name in ak.fields(some_ak_array))

It must be the case that dak.fields doesn't even produce a Dask node, right? That function can/should return its result during the graph-creation pass; it should be something like

    def fields(dak_array):  # i.e., dak.fields
        return ak.fields(dak_array._meta)

The getitem-field step, some_dak_array[field_name], is just metadata-rearrangement, but it needs to be a Dask node because it is an operation that needs to run on the workers (the return value is a Dask type). But it has already been implemented, and a dak.unzip implementation can use that.

If it's done the above way, then a dak.unzip(some_dak_array) in the source code would translate into a bunch of some_dak_array[field_name] nodes in the task graph, one for each field, but then the Dask graph consists entirely of nodes that input Dask types and output Dask types, no non-Dask tuples.

Wouldn't this be a better way to implement it? Looking at the Dask graph, it wouldn't be possible to determine that the source code contained dak.unzip instead of manually calling getitem-field, but I don't think that reversibility is important.
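The design described above can be modeled with a toy stand-in (the `ToyArray` class is illustrative only, not the dask-awkward API): unzip creates no node of its own, it just expands into one getitem-field access per field, with the field list read from metadata at graph-construction time.

```python
# Toy model of the proposed design: unzip is purely a metadata
# rearrangement that emits one getitem-field "node" per field.
class ToyArray:
    def __init__(self, columns):
        self._columns = dict(columns)  # field name -> values

    @property
    def fields(self):               # metadata-only, like dak.fields
        return list(self._columns)

    def __getitem__(self, field):   # the getitem-field operation
        return self._columns[field]

def unzip(array):
    # one getitem per field; no extra node for unzip itself
    return tuple(array[f] for f in array.fields)

arr = ToyArray({"x": [1, 2], "y": [1.1, 2.2]})
print(unzip(arr))  # -> ([1, 2], [1.1, 2.2])
```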

@martindurant (Collaborator)

@jpivarski , I think your description and mine are identical in effect, but I didn't explain it as well :) Indeed, simply not having ak.unzip in the call at all is simpler.

@lgray (Collaborator, Author) commented Jan 2, 2023

Got it - thanks! Addressed this, please run tests :-)

@martindurant (Collaborator)

Does this fail if the input is not a record type? ak.unzip also supports a fixed-length topmost level (which becomes N elements of the tuple) and other types (which become a one-element tuple).

I think in this special case, we should add to the docstring, specifying that this function produces a tuple of dak objects.

@jpivarski (Collaborator)

I actually didn't know what it was going to do. What it does in Awkward looks like reasonable behavior:

>>> ak.fields(ak.Array([1, 2, 3]))   # this much I knew; it's intentional
[]
>>> ak.unzip(ak.Array([1, 2, 3]))    # this is an interesting surprise, not unwelcome
(<Array [1, 2, 3] type='3 * int64'>,)

In the case of no fields, there's no attempt to get array[field_name], but the resulting tuple is not empty (as the code in my previous comment would have it). It's a length-1 tuple of the original array.

I think your description and mine are identical in effect

Sorry that I didn't catch that. (We were writing our comments at the same time, without seeing each other's.)

@lgray (Collaborator, Author) commented Jan 2, 2023

    fields = ak.fields(array._meta)
    if len(fields) == 0:
        return tuple([array])
    else:
        return tuple(array[field] for field in ak.fields(array._meta))

Something like this? Could also do an unzip on the meta of the input array.

@jpivarski (Collaborator)

What you have looks right to me (although you can reuse the already-computed fields variable).

I don't think you want to run ak.unzip on the array._meta, since you need to end up with Dask arrays, not plain Awkward arrays (which would be type-tracers, anyway). It has to be Dask-in, Dask-out.

Although I guess you could run ak.unzip on the meta and then wrap all outputs as Dask, but then you'd have to preserve the partitioning—it just seems easier to do it manually.
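Folding the reuse suggestion into the thread's snippet gives a sketch like the following. Here `fields_of` is a hypothetical stand-in parameter for `ak.fields(array._meta)`, introduced only so the sketch runs without awkward installed; this is not necessarily the merged implementation.

```python
# Sketch of the refined unzip: compute the field list once and reuse it,
# with the no-fields case returning a length-1 tuple to match ak.unzip.
def unzip(array, *, fields_of):
    fields = fields_of(array)       # computed once...
    if len(fields) == 0:
        return (array,)             # no fields: tuple of the input itself
    return tuple(array[f] for f in fields)  # ...and reused here

# usage with a plain dict standing in for a record array
record = {"x": [1, 2], "y": [3.0, 4.0]}
print(unzip(record, fields_of=list))          # -> ([1, 2], [3.0, 4.0])
print(unzip([5, 6], fields_of=lambda a: []))  # -> ([5, 6],)
```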

@martindurant (Collaborator)

Yes, I think that's the way. There should also be a test for the fixed-top-level ("tuple-like") case - @jpivarski, what is the right condition for that? It should give

        return tuple(array[i] for i in range(N))

where N is the fixed length.

@jpivarski (Collaborator)

The ak.fields approach (which returns strings like "0", "1", "2" for tuple-type records) works for both tuple-type and named-field records. There's no need to write specialized code for this case.

>>> array = ak.Array([(1, 1.1, "one"), (2, 2.2, "two")])
>>> array
<Array [(1, 1.1, 'one'), (2, 2.2, ...)] type='2 * (int64, float64, string)'>

>>> ak.unzip(array)
(<Array [1, 2] type='2 * int64'>,
 <Array [1.1, 2.2] type='2 * float64'>,
 <Array ['one', 'two'] type='2 * string'>)

>>> ak.fields(array)
['0', '1', '2']

>>> array["0"]
<Array [1, 2] type='2 * int64'>
>>> array["1"]
<Array [1.1, 2.2] type='2 * float64'>
>>> array["2"]
<Array ['one', 'two'] type='2 * string'>

(If you were to use range(N), you'd have to ensure that the values are strings, so that they're not interpreted as row indexes: str(i). But ak.fields already gives you that.)

@martindurant (Collaborator)

Oh ok, thanks for clarifying. Might be worth a test just like your example code.

@lgray (Collaborator, Author) commented Jan 2, 2023

please rerun tests

(maybe talking to the bot works if you're a new contributor?)

@martindurant martindurant merged commit bf1bfb5 into dask-contrib:main Jan 2, 2023