
Adaptive chunksize test implementation #544

Merged: 14 commits merged into CoffeaTeam:master from chunksize_adaptive on Jun 28, 2021

Conversation

@btovar (Contributor) commented Jun 10, 2021

This is an idea we have been playing with to adapt the chunksize as the workflow progresses. Instead of performing a trial-and-error search, the objective is to let the executors choose a correct chunksize given a desired memory usage or desired runtime per task. (Currently only the runtime criterion is implemented, and only for the work_queue_executor.)

To achieve this, we let the WorkItem generator reach the executor as is. Before, run_uproot_job would collect the generator into a list. This also meant that we needed to factor the preprocessing code out of run_uproot_job into its own function, and that the fileset is filtered of bad files (if needed) before it reaches the generator.

The main complication is how to control the loops. Since the number of chunks is not known beforehand, we control the loop by counting the number of events. However, this cannot be done directly with the generator, as it would be consumed before reaching the executor. Therefore, we count the events directly from the filemeta objects, but this may not work well when using align_clusters or maxchunks. In the current implementation, dynamic chunksize cannot be used with align_clusters or maxchunks.

Another complication is that since chunks are accounted for by their number of events, and not simply by how many there are, the executor needs to know how to accumulate the items; otherwise, the executor cannot be used unchanged for preprocessing. This could be solved by defining len(file) = 1 for preprocessing and len(WorkItem) = number of events for processing. In the current implementation we only did this for processing.
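As a rough illustration of that accounting (hypothetical class and attribute names, not the PR's actual WorkItem), an executor that tracks progress in events only needs the items to report their event count through len():

```python
from dataclasses import dataclass


@dataclass
class FileWorkItem:
    # hypothetical preprocessing item: one file counts as one unit of work
    filename: str

    def __len__(self):
        return 1


@dataclass
class ChunkWorkItem:
    # hypothetical processing item: progress is accounted in events
    filename: str
    treename: str
    entrystart: int
    entrystop: int

    def __len__(self):
        return self.entrystop - self.entrystart
```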

The chunksize itself is computed with a linear regression, rounded up to a power of two times a base chunksize. Some variation is added to this value so that the regression keeps receiving fresh sampling data.
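A minimal sketch of that idea, assuming task runtime grows roughly linearly with chunk size; the function name, the sample format, and the perturbation probability below are illustrative, not the PR's actual code:

```python
import math
import random


def update_chunksize(samples, target_runtime, base_chunksize=1024):
    """samples: list of (events_in_chunk, task_runtime) pairs."""
    if len(samples) < 2:
        return base_chunksize
    # least-squares fit: runtime ~ a * events + b
    n = len(samples)
    sx = sum(e for e, _ in samples)
    sy = sum(t for _, t in samples)
    sxx = sum(e * e for e, _ in samples)
    sxy = sum(e * t for e, t in samples)
    denom = n * sxx - sx * sx
    if denom == 0:
        return base_chunksize
    a = (n * sxy - sx * sy) / denom
    b = (sy - a * sx) / n
    if a <= 0:
        return base_chunksize
    # number of events that would hit the target runtime per task
    events = max((target_runtime - b) / a, base_chunksize)
    # round up to base_chunksize * 2**k
    exponent = math.ceil(math.log2(events / base_chunksize))
    chunksize = base_chunksize * 2 ** max(exponent, 0)
    # occasionally perturb the size so the regression keeps getting fresh samples
    if random.random() < 0.25:
        chunksize *= random.choice([0.5, 2])
    return int(chunksize)
```

In use, each finished task would append its (events, runtime) pair to samples, and the next chunk would be requested with the returned size.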

If this sounds interesting to you, please let us know how we can make this change more amenable to the other executors.

Thanks!

The Filemeta.chunks generator is changed so that it can receive an updated
chunksize. E.g., to get the next chunks of size chunksize and
2 * chunksize:

```python
chunks = _chunk_generator(
    fileset,
    metadata_fetcher,
    metadata_cache,
    chunksize,
    align_clusters,
    maxchunks,
    dynamic_chunksize=True,
)
...
next(chunks)  # move to next yield
chunk = chunks.send(chunksize)
...
next(chunks)  # move to next yield
chunk = chunks.send(2*chunksize)
```

For this to work, preprocessing was moved to its own function that is
called inside run_uproot_job.
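
For intuition, a stripped-down sketch of a generator with this send/yield protocol; _chunk_generator_sketch and the numentries mapping are illustrative stand-ins for the real implementation, which also handles metadata fetching, align_clusters, and maxchunks:

```python
def _chunk_generator_sketch(files, numentries, initial_chunksize):
    """Yield (filename, start, stop) tuples; the caller may send() a new chunksize."""
    chunksize = initial_chunksize
    for filename in files:
        start = 0
        while start < numentries[filename]:
            stop = min(start + chunksize, numentries[filename])
            update = yield (filename, start, stop)
            if update is not None:
                chunksize = update
            start = stop


chunks = _chunk_generator_sketch(["a.root"], {"a.root": 5000}, 1000)
first = next(chunks)        # primes the generator; yields ("a.root", 0, 1000)
second = chunks.send(2048)  # updates the chunksize; yields ("a.root", 1000, 3048)
```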
@lgray (Collaborator) commented Jun 24, 2021

This looks really nice. We should think about whether there is a nice way to bring this to the other executors, since it would be convenient to have this available where it makes sense.

I'm just not sure off the top of my head. Happy to continue the discussion.

I'll merge this if you guys are OK with it?

@nsmith- self-requested a review on June 24, 2021
@btovar (Contributor, Author) commented Jun 24, 2021

@lgray: Sounds very good!

Something that would make it easier to adapt to other executors is an easier way to know the total number of events. It is still not clear to me how maxchunks works without doing the preprocessing step. (Also, maybe it does not make sense to use dynamic chunksize with maxchunks? Is maxchunks used only for testing purposes? Along the same lines, align_clusters probably does not make much sense either, as chunks will differ by design.)

One thing that is not in the current PR: it may make more sense to have a chunksize per dataset, rather than per workflow.

@lgray (Collaborator) commented Jun 24, 2021

Yeah, maxchunks/align_clusters + dynamic chunking should produce an error; it's really easy to get undefined behavior out of that.

@nsmith- ?

@nsmith- (Member) left a review comment

Sorry for the delay in reviewing this. Thanks for the nice addition.

One issue with dynamic chunksize is that if we are using a column caching solution, this is more likely to spoil the cache, since it caches by chunk. To see what I mean, take a look at the way persistent_cache behaves with NanoEventsFactory on a local file with various choices of entry_stop. On the other hand, if you keep to a fixed set of sizes as you do (powers of two; by the way, why the base_chunksize? Just sticking to powers of two would give the same flexibility, and you can set the start to 2**10 instead of 1000), then perhaps the column cache can be intelligent enough to know how to subdivide if requested.
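To make the subdivision point concrete, a small illustrative sketch (not an existing coffea feature): a cached range whose length is a power of two splits exactly into smaller power-of-two pieces.

```python
def subdivide(start, stop, target_size):
    """Split a cached [start, stop) range, whose length is a power of two,
    into contiguous pieces of target_size (a smaller power of two)."""
    return [(s, s + target_size) for s in range(start, stop, target_size)]


# a chunk cached with size 2**12 can serve requests for size 2**10 exactly:
subdivide(0, 4096, 1024)  # [(0, 1024), (1024, 2048), (2048, 3072), (3072, 4096)]
```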

> It is still not clear to me how maxchunks works without doing the preprocessing step

As you guessed, maxchunks is mainly used as a testing device. It simply opens files sequentially in the current thread of execution until it reaches enough chunks, and then sends the list to the executor. In a system where the pre-executor and executor both operate as streams, we could improve this logic to allow parallel execution of the preprocessing stage while keeping the maxchunks feature. Also, the motivation for sharing the same executor abstraction for preprocessing and processing is not that high, and it might make sense to specialize the file preprocessing anyway. The results of preprocessing are also better kept in a local database than recomputed each time, but our current setup doesn't really accommodate this in a user-friendly way.
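For intuition only, the maxchunks path amounts to something like the following sketch (not coffea's actual code), which is why it needs no separate preprocessing pass:

```python
from itertools import islice


def gather_chunks(chunk_generator, maxchunks=None):
    # materialize the stream of chunks in the current thread of execution;
    # with maxchunks set, stop as soon as enough chunks have been produced
    if maxchunks is None:
        return list(chunk_generator)
    return list(islice(chunk_generator, maxchunks))
```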

(Inline review comments on coffea/processor/executor.py; all resolved.)
@btovar (Contributor, Author) commented Jun 28, 2021

@nsmith- Thanks for your comments! I've changed it so that the chunksizes are always powers of 2.

@lgray (Collaborator) commented Jun 28, 2021

@nsmith- you happy?

@nsmith- (Member) left a review comment

🚀

@lgray merged commit dea5818 into CoffeaTeam:master on Jun 28, 2021
@btovar deleted the chunksize_adaptive branch on July 19, 2021