
Refactor column optimization based on typetracer updates #148

Merged

Conversation

douglasdavis
Copy link
Collaborator

@douglasdavis douglasdavis commented Jan 19, 2023

Prior to this PR we had created two different optimization strategies for column projection:

  • the "simple getitem" strategy: searching for getitem layers that use strings or lists of strings
  • the "brute-force" strategy: drop fields from the starting IO layer(s) one at a time and see if a compute on the metadata fails

With this PR we now use the new typetracer_with_report API from upstream awkward to create a mutable report object, compute the task graph purely on the metadata (typetracer arrays), and return a list of necessary columns for each IO layer in the graph.

There are still some outstanding upstream typetracer array issues that need to be ironed out before this optimization works for the current dask-awkward test suite.
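The mechanism can be illustrated with a toy stand-in (all names below are hypothetical; the real implementation relies on awkward's typetracer arrays and typetracer_with_report): attribute access on a data-free metadata array records touched columns in a shared mutable report, and replaying the graph on that metadata yields the necessary columns.

```python
class Report:
    """Mutable report shared by all metadata arrays derived from one IO layer."""
    def __init__(self):
        self.data_touched = set()


class MetaArray:
    """Stand-in for a typetracer array: carries no data, only records access."""
    def __init__(self, columns, report):
        self._columns = columns
        self._report = report

    def __getattr__(self, name):
        # fires only for attributes not set in __init__
        if name in self._columns:
            self._report.data_touched.add(name)
            return 1.0  # dummy scalar standing in for column metadata
        raise AttributeError(name)


def necessary_columns(compute, columns):
    """Run the computation on metadata only; return the touched columns."""
    report = Report()
    compute(MetaArray(columns, report))
    return sorted(report.data_touched)


cols = necessary_columns(lambda ds: ds.pt - ds.eta, {"pt", "eta", "phi", "mass"})
# cols is ["eta", "pt"]: only the columns the computation touched
```

The key design point, as in the PR, is that the report is mutated as a side effect of the metadata-only pass, so no concrete data is ever read.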

@codecov-commenter
Copy link

codecov-commenter commented Jan 23, 2023

Codecov Report

Merging #148 (99c122d) into main (830a18c) will decrease coverage by 0.49%.
The diff coverage is 94.71%.


@@            Coverage Diff             @@
##             main     #148      +/-   ##
==========================================
- Coverage   94.94%   94.45%   -0.49%     
==========================================
  Files          19       19              
  Lines        1919     2002      +83     
==========================================
+ Hits         1822     1891      +69     
- Misses         97      111      +14     
Impacted Files Coverage Δ
src/dask_awkward/lib/reducers.py 96.11% <ø> (ø)
src/dask_awkward/lib/core.py 90.53% <88.09%> (-1.01%) ⬇️
src/dask_awkward/lib/inspect.py 91.66% <91.30%> (-8.34%) ⬇️
src/dask_awkward/lib/optimize.py 95.13% <94.48%> (-1.39%) ⬇️
src/dask_awkward/layers/layers.py 97.91% <96.55%> (-2.09%) ⬇️
src/dask_awkward/layers/__init__.py 100.00% <100.00%> (ø)
src/dask_awkward/lib/__init__.py 100.00% <100.00%> (ø)
src/dask_awkward/lib/io/io.py 100.00% <100.00%> (ø)
src/dask_awkward/lib/io/json.py 99.31% <100.00%> (+<0.01%) ⬆️
src/dask_awkward/lib/io/parquet.py 90.24% <100.00%> (+0.98%) ⬆️
... and 3 more


@martindurant
Copy link
Collaborator

Good to go, right?
I wonder if there's a convenient way to count how many warnings we are currently generating in the tests - presumably the same number as we had failing tests before.

@douglasdavis
Copy link
Collaborator Author

Good to go, right?

I think it would be reasonable to bring this in as is and start addressing the warnings in follow up PRs

@lgray would you be able to run a quick workflow of yours that you may be experimenting with? I think it would be useful to see how this PR affects things you've been working on.

I wonder if there's a convenient way to count how many warnings we are currently generating in the tests - presumably the same number as we had failing tests before.

There are actually a few more now that @lgray's #146 PR is in (which is good, just more information we can report upstream as holes in the typetracer)

@lgray
Copy link
Collaborator

lgray commented Jan 23, 2023

@douglasdavis Sure, I can give this a try later today.

@lgray
Copy link
Collaborator

lgray commented Jan 23, 2023

@douglasdavis

/Users/lgray/coffea-dev/dask-awkward/src/dask_awkward/lib/optimize.py:137: UserWarning: Column projection optimization failed: <class 'ValueError'>, cannot convert UnknownScalar(dtype('int64')) (type UnknownScalar) to an array element

(https://github.com/scikit-hep/awkward/blob/awkward-cpp-7/awkward-cpp/src/python/content.cpp#L191)
  warnings.warn(f"Column projection optimization failed: {type(err)}, {err}")

I don't have any introspection as to what part of my code is causing it though. Interestingly the results come back just fine?

@lgray
Copy link
Collaborator

lgray commented Jan 23, 2023

The dict I'm passing into dask.compute looks like:

{'mass': Hist(
  StrCategory([], growth=True, name='dataset'),
  Regular(30000, 0.25, 300, name='mass'),
  storage=Double()) # (has staged fills), 'pt': Hist(
  StrCategory([], growth=True, name='dataset'),
  Regular(30000, 0.24, 300, name='pt'),
  storage=Double()) # (has staged fills), 'cutflow': {'ZJets_pt': dask.awkward<sum-agg, type=Scalar, dtype=Unknown>, 'ZJets_mass': dask.awkward<sum-agg, type=Scalar, dtype=Unknown>, 'Data_pt': dask.awkward<sum-agg, type=Scalar, dtype=Unknown>, 'Data_mass': dask.awkward<sum-agg, type=Scalar, dtype=Unknown>}}

@lgray
Copy link
Collaborator

lgray commented Jan 23, 2023

If I try to ask for _necessary_columns on one of the outputs, I get the following (screenshot attached):

out["mass"] is a dask_histogram.

@lgray
Copy link
Collaborator

lgray commented Jan 23, 2023

Also it would be great if _necessary_columns was a public interface! I will need to do a bit of analysis on user code that will require it.

@douglasdavis
Copy link
Collaborator Author

douglasdavis commented Jan 23, 2023

We do know about the UnknownScalar error (this one: cannot convert UnknownScalar(dtype('int64')) (type UnknownScalar) to an array element), reported upstream at scikit-hep/awkward#2135

I don't have any introspection as to what part of my code is causing it though. Interestingly the results come back just fine?

So the route we've gone down for now is to catch the exceptions raised during the optimization and then throw the warning you are seeing (the one that says the optimization failed). We still expect the computation to be correct; we just bailed on the column projection optimization (so I'm glad to hear the results come back just fine). Martin and I were just talking about a way to avoid the exceptions from awkward and still run the column projection optimization. This would be a workaround we can use until the upstream typetracer development is complete and the exceptions no longer get raised by awkward. I'm going to start experimenting with the workaround today.
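The bail-out behaviour can be sketched as follows (a minimal illustration of the pattern, not dask-awkward's actual code; the graph shape and run_metadata_pass are hypothetical):

```python
import warnings


def optimize_columns(graph, run_metadata_pass):
    """Try column projection; on any failure, warn and return the graph
    unchanged so the (unoptimized) compute still gives correct results."""
    try:
        columns = run_metadata_pass(graph)  # metadata-only pass; may raise
    except Exception as err:
        warnings.warn(f"Column projection optimization failed: {type(err)}, {err}")
        return graph  # bail out: read all columns
    return {**graph, "projected-columns": columns}


def failing_pass(graph):
    # stands in for e.g. the UnknownScalar conversion error from awkward
    raise ValueError("cannot convert UnknownScalar to an array element")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    out = optimize_columns({"io-layer": "read-parquet"}, failing_pass)
# out is the original graph, untouched; one warning was emitted
```

The optimization is therefore strictly best-effort: a failure costs only IO efficiency, never correctness.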

Also it would be great if _necessary_columns was a public interface! I will need to do a bit of analysis on user code that will require it.

a necessary_columns function makes it into the top level module in this PR (you should be able to call dak.necessary_columns)

@douglasdavis
Copy link
Collaborator Author

a necessary columns example:

>>> import dask_awkward as dak
>>> ds = dak.from_parquet("s3://ddavistemp/hpq", storage_options={"anon": True})
>>> ds.muons.pt - ds.gen.pt
dask.awkward<subtract, npartitions=50>
>>> dak.necessary_columns(ds.muons.pt - ds.gen.pt)
{'read-parquet-7ec21ca9ab2b70cee1e98308ed59125d': ['muons.pt', 'gen.pt']}

@lgray
Copy link
Collaborator

lgray commented Jan 23, 2023

Ah it's already public. ok! Let me try again in a bit here.

@lgray
Copy link
Collaborator

lgray commented Jan 23, 2023

I guess my question is then: can I reliably use necessary_columns on workflows that have non-dak operations in them?

@douglasdavis
Copy link
Collaborator Author

douglasdavis commented Jan 23, 2023

tl;dr: Since the optimization is based on typetracer arrays, right now the operations in the workflow need to be typetracer array aware.

More explanation taking dask-histogram as an example: If we create a graph from the following code:

import dask_histogram.boost as dhb
import dask_awkward as dak
ds = dak.from_parquet("s3://ddavistemp/hpq", storage_options={"anon": True})
pt = dak.flatten(ds.muons.pt)
h = dhb.Histogram(dhb.axis.Regular(25, 0, 250))
h.fill(pt)
h.compute()

This compute will work, but only the dask_histogram optimizations will run, because we called compute on the dask_histogram object; that is, this function is called:

https://github.com/dask-contrib/dask-histogram/blob/d3ebda03385ab4892dddb65bded9ee6613bcfc5c/src/dask_histogram/core.py#L274-L276

It performs vanilla dask optimizations (nothing specific to dask-awkward); you can see dask-histogram's optimize is just calling functions from upstream dask.

However, if we call dak.necessary_columns:

dak.necessary_columns(h)

We see

/Users/ddavis/software/repos/dask-awkward/src/dask_awkward/lib/optimize.py:150: UserWarning: Column projection optimization failed: <class 'AssertionError'>, bug in Awkward Array: attempt to convert TypeTracerArray into a concrete array

See if this has been reported at https://github.com/scikit-hep/awkward-1.0/issues
  warnings.warn(f"Column projection optimization failed: {type(err)}, {err}")

In this case the graph is getting analyzed by dask-awkward, which knows about AwkwardInputLayer; so the optimization is attempted, but it fails because there's a step in the graph calling boost_histogram code, which is unaware of awkward typetracers (hence the error where a typetracer array is attempting to be converted to a concrete numpy array). This is happening in a boost-histogram fill via this line in dask-histogram:

https://github.com/dask-contrib/dask-histogram/blob/d3ebda03385ab4892dddb65bded9ee6613bcfc5c/src/dask_histogram/core.py#L194-L195

data in that code block is simply a typetracer array during the optimization compute pass, and boost-histogram doesn't know anything about typetracer arrays.

It should be possible to add a feature to dask-histogram that runs dask-awkward optimizations when dask-awkward is installed. But we still need to solve the issue of exceptions getting raised because external libraries don't know about typetracers. This is doable too, but we need to figure out the best way to mock these layers in the typetracer-only graph.

@lgray
Copy link
Collaborator

lgray commented Jan 23, 2023

Ah, right this would be taken care of by the empty_if_typetracer stuff that @agoose77 is working on, right? Since it's so common I think we should make sure dak -> dask-histogram works nearly out of the box.

@agoose77
Copy link
Collaborator

agoose77 commented Jan 24, 2023

Ultimately, any code executed by Dask with a typetracer can lead to operations on unknown values. We're planning for UnknownScalar to become TypeTracerArray(..., shape=()), which will unify the error semantics according to the rule that any operation requiring concrete values will fail.

There are two facets to this problem of probing functions with typetracer:

  • Ensuring that third-party code doesn't operate on typetracer arrays
    • Plan is to add support to hist for this e.g. with a structured-histogram protocol that we're thinking about at the moment. See here for some musings of mine on this (not actually planned yet). Incidentally, this wouldn't be much use right now given that dask-histogram uses boost-histogram instead of hist.
  • Ensuring that user code doesn't operate on typetracer arrays

In both cases, the current idea that we're pursuing is to export an empty_if_typetracer function that returns the equivalent of form.length_zero_array() if the array is a typetracer and calls touch_data(). This is an explicit, manual step, but it will ensure that the user code avoids any conditionals.

Note that empty_if_typetracer was originally dreamt up for Coffea, but it's not clear to me how needed it still is. It would certainly be useful for anything going into map_partitions.
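A toy model of the proposed semantics (a sketch only; empty_if_typetracer is the proposal under discussion here, not a shipped awkward API): if an input is a typetracer, touch its data so the report still records the needed columns, then hand back a length-zero stand-in that downstream concrete code can operate on.

```python
class ToyTypeTracer:
    """Stand-in for a typetracer array: no data, only a touch flag."""
    def __init__(self):
        self.touched = False

    def touch_data(self):
        self.touched = True


def empty_if_typetracer(*arrays):
    """Replace any typetracer inputs with length-zero stand-ins,
    touching their data first so needed columns are still reported."""
    out = []
    for arr in arrays:
        if isinstance(arr, ToyTypeTracer):
            arr.touch_data()  # columns still get reported as needed
            out.append([])    # length-zero stand-in for concrete code paths
        else:
            out.append(arr)
    return out[0] if len(out) == 1 else tuple(out)


tracer = ToyTypeTracer()
p, p_is_small = empty_if_typetracer(tracer, [True, False])
# p is now [] (empty, concrete) and tracer.touched is True
```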

@lgray
Copy link
Collaborator

lgray commented Jan 24, 2023

@agoose77 There are a few places in coffea that need to call a map_partitions, particularly here I just want dask-histogram to yield to typetracer analysis, which (I think) will be solved by that.

@lgray
Copy link
Collaborator

lgray commented Jan 24, 2023

Though I guess that can also be solved by checking if it's a typetracer and then manually calling touch_data() on it?

@agoose77
Copy link
Collaborator

empty_if_typetracer would be useful in such a scenario, e.g.

def fill_user_hist(data):
    p = np.sqrt(data.px**2 + data.py**2 + data.pz**2)
    p_is_small = p < 10

    # Maybe this will only be a single-arg function, who knows.
    # Ensure bogus data are available for histogramming
    p, p_is_small = ak.empty_if_typetracer(p, p_is_small)

    p_hist = Hist.new.Bool().Reg(128, 0, 1000).Int64()
    p_hist.fill(p_is_small, p)
    return p_hist

dak.map_partitions(fill_user_hist, array)

@lgray
Copy link
Collaborator

lgray commented Jan 24, 2023

Ah, I see. This kind of stuff never goes anywhere near the common user in my perception of things: it should be there for them as a pressure relief valve, or for power users, but the libraries should do a good portion of common functionality themselves without much complaining.

@agoose77
Copy link
Collaborator

agoose77 commented Jan 24, 2023

Yes, this is an escape hatch. Really, it's more of a convenience function for library developers (public API for library-developer use) who need to know about typetracers. As a library developer you'd already use ak.backend(layout) to determine whether you had a typetracer array, and could then do array.layout.form.length_... if you wanted a zero-length array, so it's not giving you any new abilities, besides the fact that it touches the data, which is currently not truly a public API.

So in this case, empty_if_typetracer shouldn't come into it if it's non-awkward library code, e.g. Coffea or dask. If such knowledge exists at the task-graph level in a convenient fashion, we should be able to handle dask-histogram explicitly, but I'll defer to the Anaconda folks as to the practical feasibility of this.

@lgray
Copy link
Collaborator

lgray commented Jan 24, 2023

It is, however, considerably more concise and tidy in library code! :D

@douglasdavis
Copy link
Collaborator Author

douglasdavis commented Jan 24, 2023

I agree with @lgray's comment on it being a pressure relief valve. We can benefit (on the dask-awkward side, as library authors) from using empty_if_typetracer to help ensure the column selection optimization operates as desired (the implementation of the optimization is never exposed to users as a public API). We will always go with direct operation on typetracers as a first try, but being able to fall back to a mocked function call, which can wrap the original function but operate on something created by empty_if_typetracer, is a good thing to have in the toolbox.

@douglasdavis
Copy link
Collaborator Author

douglasdavis commented Jan 24, 2023

And second, we can include dask-awkward optimizations in dask-histogram for graphs where dask-awkward's AwkwardInputLayer is detected.

PS: it can already be done explicitly by optimizing the dask-awkward graph before building more of the task graph with a dask-histogram step:

ds = dask_awkward_dataset_factory()
pt_ratio = ds.pt_reco / ds.pt_true
(pt_ratio,) = dask.optimize(pt_ratio)   # <-- modifies the read step in the graph to only grab ['pt_true', 'pt_reco']
h = dhb.Histogram(...)
h.fill(pt_ratio)
h.compute()

@agoose77
Copy link
Collaborator

Just to be clear, the empty_if_typetracer proposal is something users would need to add themselves to the functions that get passed into map_partitions. Otherwise, all you can do is materialise/touch everything in the array, which really should work most of the time (unless the function actually cares about array lengths). Then of course there's the existing 'run one partition, take the meta' strategy as a third option.

@douglasdavis
Copy link
Collaborator Author

Just to be clear, the empty_if_typetracer proposal is something users would need to add themselves to the functions that get passed into map_partitions.

Yea this makes sense, we already mock output layers in our typetracer-only-graph (which is one of the main components of this PR!) and touch the data without writing anything to disk.

import copy
from typing import Any


def _mock_output_func(*args: Any, **kwargs: Any) -> Any:
    import awkward as ak

    for arg in args + tuple(kwargs.values()):
        if isinstance(arg, ak.Array):
            arg.layout._touch_data(recursive=True)


def _mock_output(layer: Any) -> Any:
    assert len(layer.dsk) == 1

    new_layer = copy.deepcopy(layer)
    mp = new_layer.mapping.copy()
    for k in iter(mp.keys()):
        # this layer tuple is originally of the form (ak.to_parquet, ...);
        # we replace the to_parquet or to_json, etc. with our
        # _mock_output_func, which just touches the data
        mp[k] = (_mock_output_func,) + mp[k][1:]
    new_layer.mapping = mp
    return new_layer

What @martindurant and I have in mind is to mock (really wrap) all other layers in the typetracer-only-graph and try to call whatever function func that layer is calling, and if the function call fails on the pure typetracer we can fall back to calling func on an object created by empty_if_typetracer.
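That wrap-and-fall-back idea can be sketched like this (hypothetical helper names; touch and make_empty stand in for awkward's data-touching and length-zero-array machinery):

```python
def wrap_for_meta_pass(func, touch, make_empty):
    """Wrap a layer's function for the metadata-only pass: first try the
    typetracer directly; if the function is not typetracer-aware, touch
    the inputs (so needed columns are still recorded) and retry on
    length-zero stand-ins."""
    def wrapped(*args):
        try:
            return func(*args)  # typetracer-aware path
        except Exception:
            for arg in args:
                touch(arg)  # still record the needed columns
            return func(*(make_empty(arg) for arg in args))  # concrete fallback
    return wrapped


# Toy demonstration: a "fill" that rejects the typetracer stand-in,
# like boost-histogram rejecting a TypeTracerArray.
touched = []


def fill(data):
    if data == "typetracer":
        raise TypeError("cannot convert TypeTracerArray to a concrete array")
    return len(data)


safe_fill = wrap_for_meta_pass(fill, touched.append, lambda _: [])
n = safe_fill("typetracer")
# n is 0 (fill ran on the length-zero fallback) and the input was touched
```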

@lgray
Copy link
Collaborator

lgray commented Jan 24, 2023

Cool! Got it to work (result shown in the attached screenshot).

Code in dask-histogram is like:

def _blocked_dak(data: Any, *, histref: bh.Histogram | None = None) -> bh.Histogram:
    import awkward as ak
    thedata = data
    if isinstance(thedata, ak.Array) and ak.backend(thedata) == "typetracer":
        thedata.layout._touch_data(recursive=True)
        thedata = data.layout.form.length_zero_array()

    return clone(histref).fill(thedata)


def _blocked_dak_ma(*data: Any, histref: bh.Histogram | None = None) -> bh.Histogram:
    import awkward as ak
    thedata = list(data)
    for idata, adatum in enumerate(thedata):
        if isinstance(adatum, ak.Array) and ak.backend(adatum) == "typetracer":
            adatum.layout._touch_data(recursive=True)
            thedata[idata] = adatum.layout.form.length_zero_array()

    return clone(histref).fill(*tuple(thedata))

@lgray
Copy link
Collaborator

lgray commented Feb 18, 2023

Sorry in advance: this needs a few PRs to go in (and I have to wrap up some work in coffea) before it's easy to repeat. However, I did find some interesting outcomes in the column optimization for things that aren't zips, but are more complex things we tend to find in analysis code.

For instance, with coffea nanoevents it's not so uncommon that you want to match objects together, get the nearest one, and maybe ask for the pt or some other single kinematic quantity. We have some convenience functions that wrap that up as follows, which works fine if daskified:

nearest_pt = x.Muon.nearest(x.Jet).pt

However, when looking at the necessary columns, it's interesting that the column opt picks up the needed columns for Muon correctly (calculating delta_r only needs Muon.eta and Muon.phi), but the entire Jet object is requested from disk, which is not necessary, since we likewise only need the Jet's eta, phi, and pt in this case.

>>> dak.necessary_columns(x.Muon.nearest(x.Jet).pt)
{'from-uproot-efbdf44a84a3be6d60f91c4406a73ce9': ['Jet.btagCMVA', 'Jet.electronIdxG', 'Jet.eta', 'Jet.electronIdx2', 'Jet.cleanmask', 'Jet.electronIdx2G', 'Jet.muonSubtrFactor', 'Jet.muonIdxG', 'Jet.muonIdx2G', 'Jet.muEF', 'Jet.btagDeepB', 'Jet.btagCSVV2', 'Jet.muonIdx2', 'Jet.btagDeepFlavB', 'Jet.jercCHPUF', 'Jet.btagDeepFlavC', 'Jet.nElectrons', 'Jet.muonIdx1', 'Jet.pt', 'Jet.chEmEF', 'Jet.electronIdx1', 'Jet.neHEF', 'Jet.jetId', 'Jet.phi', 'Jet.nMuons', 'Muon.eta', 'Jet.rawFactor', 'Jet.muonIdx1G', 'Jet.btagDeepC', 'Jet.bRegRes', 'Jet.bRegCorr', 'Jet.electronIdx1G', 'Jet.chHEF', 'Muon.phi', 'Jet.neEmEF', 'Jet.mass', 'Jet.nConstituents', 'Jet.jercCHF', 'Jet.area', 'Jet.qgl', 'Jet.puId']}

I'll follow up with instructions to repeat once everything is in and the recipe is easily shareable.

@lgray
Copy link
Collaborator

lgray commented Feb 25, 2023

Found a bug that will be run into regularly, here's a repro:

import uproot
import dask_awkward as dak

x = uproot.dask({"https://github.com/CoffeaTeam/coffea/blob/master/tests/samples/nano_dy.root?raw=true": "Events"})
print(dak.necessary_columns(dak.broadcast_arrays(x.fixedGridRhoFastjetAll, x.Jet_pt)))

prints:

{'from-uproot-d77a4895f123e53606d66a8430b53410': ['Jet_pt']}

As opposed to needing the data from both columns!

@agoose77 @jpivarski: this could possibly involve an issue with typetracer reports as well, but I can't find where it's failing.

FYI - I can fix it by manually touching all the data that goes into dak.broadcast_arrays.

@douglasdavis douglasdavis marked this pull request as ready for review February 27, 2023 20:18
@douglasdavis douglasdavis merged commit e526afe into dask-contrib:main Feb 27, 2023
@douglasdavis douglasdavis deleted the refactor-column-optimization branch February 27, 2023 22:17
@douglasdavis douglasdavis mentioned this pull request Feb 27, 2023