WIP: ServiceX processor #488
Conversation
@BenGalewsky can you rebase this on the latest master at some point? I can't seem to merge in today's additions.
(force-pushed from b02369f to b7415da)
FYI we're fine with justified
(force-pushed from 1c381fa to a9a612c)
FYI it looks like #495 may actually fix the random failures with the spark and parsl unit tests.
(force-pushed from 7c888b0 to a9f5223)
Hmm, it has spent 1h installing in py39. What does SX drag along with it?
It is also OK to not test in all versions on all platforms if it's significantly painful to do so. But if you can make your stuff work everywhere, awesome. :-)
Not much at all, just some asynchronous I/O libraries and a little abstract syntax tree manipulation. Let me try building locally and see what the hangup could be.
(force-pushed from a9f5223 to e3dda8a)
Looks like some of the func_adl libraries don't support 3.9. Rather than hack up the CI job to omit installing ServiceX only in 3.9, I deactivated tests for ServiceX in the CI build. I'll work with Gordon to get us working in 3.9 and update then.
@BenGalewsky I'd still recommend testing things and continuing to kick this down the field. You may go the route of SkyHook/Workqueue and add your own separate (non-required) tests in a Python version that is suitably comfy.
(force-pushed from 8c8d55b to fb71304)
I had a few minor comments, otherwise it looks fine for now. @nsmith- take a look as well?
```python
x = await r
return x

finished_events = aiostream.stream.map(func_results, inline_wait, ordered=False)
```
Do you know the lifecycle of the futures here? Will they be cleaned up as soon as we're done adding them to `output`? If not, there is a good chance of a memory explosion for large outputs. Further, since everything is async, you'll want to put some throttling/backpressure semaphores in this flow so that results don't pile up somewhere.
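A minimal sketch of the kind of backpressure being suggested, using a plain `asyncio.Semaphore` to cap in-flight work (the `process` coroutine and `run_with_backpressure` helper here are hypothetical stand-ins, not the PR's actual code):

```python
import asyncio

async def process(item):
    # Hypothetical stand-in for the real per-file analysis work.
    await asyncio.sleep(0)
    return item * 2

async def run_with_backpressure(items, limit=4):
    # The semaphore caps the number of concurrently running tasks,
    # so results cannot pile up faster than they are consumed.
    sem = asyncio.Semaphore(limit)

    async def bounded(item):
        async with sem:
            return await process(item)

    return await asyncio.gather(*(bounded(i) for i in items))

results = asyncio.run(run_with_backpressure(range(8)))
```

With `aiostream`, the same effect can be had by passing `task_limit=` to `stream.map` rather than managing a semaphore by hand.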
(force-pushed from 1c62ecd to 7810b2d)
(force-pushed from 7810b2d to d09f271)
Just seeing this. If you mean just the
(force-pushed from 669c8ff to ad5f262)
I'll be interested to see how the memory behavior plays out at scale with aiostream. If it is doing a good job of dropping references early that would be great news and maybe we can refactor the regular processor to use it.
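The reference-dropping behavior at issue can be illustrated with a pure-asyncio sketch (names here are hypothetical; the real code streams results through aiostream): if the consumer merges each partial result into a running accumulator and keeps no other reference, only one partial result needs to be alive at a time.

```python
import asyncio

async def produce():
    # Hypothetical stand-in for a stream of partial analysis results.
    for i in range(5):
        await asyncio.sleep(0)
        yield {"count": i}

async def accumulate(stream):
    # Merge each partial result into the running total and release it
    # immediately; nothing else holds a reference, so it is eligible
    # for collection as soon as the next iteration rebinds `partial`.
    total = {"count": 0}
    async for partial in stream:
        total["count"] += partial["count"]
    return total

total = asyncio.run(accumulate(produce()))
```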
Overall there's a lot of bifurcation here:
- `processor.servicex.executor.run_coffea_processor` is akin to `processor.processor._work_function`
- `processor.servicex.executor.Executor` is akin to `run_uproot_job`
In the long run we'll need to harmonize (likely in the direction of objects representing analysis state, input state, output state, and executor state), but for now it can be considered an experimental interface.
This PR integrates the asynchronous ServiceX processor into the coffea repo. It represents a slightly different usage pattern, since we have access to a streaming source of event data.

This streaming interface is provided by the `DataSource` class. Its constructor accepts a func_adl query, a list of ServiceX datasets, and optional metadata. It produces an asyncio stream of parquet files that are the result of the ServiceX transform on the provided datasets and selection statements. Users of this class can request a stream of paths to locally downloaded files, or URLs to files on the ServiceX Minio object store.

We've created a simple `Analysis` base class that analyzers subclass to write the physics code that gets their work done. We've boiled out almost all of the standard code to keep an analysis tightly focused. This meant subclassing `dict_accumulator` to inject the standard setup.

Finally, we created a base class and a series of subclasses for an `Executor`. The contract of the executor is to consume files from the `DataSource` instance and invoke asynchronous calls to the analysis code on a backend of choice. The return value is an asyncio stream of histogram objects. It performs some last-minute packaging to further reduce duplicated code that the analyzer might have to include.

Still to Come
- Write unit tests
- More documentation
- Try this out on coffea-casa

Outstanding Questions
- the `.servicex` file somehow
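The executor contract described above (consume a stream of files, run analysis code on each, yield histogram objects) can be sketched with plain async generators; `datasource_stream` and the histogram dicts below are hypothetical placeholders for the real `DataSource` and accumulator types:

```python
import asyncio

async def datasource_stream(files):
    # Hypothetical stand-in for DataSource: yields paths to
    # transformed parquet files as ServiceX delivers them.
    for path in files:
        await asyncio.sleep(0)
        yield path

async def executor(stream):
    # Sketch of the executor contract: consume files from the
    # stream and yield one result object per file. The "entries"
    # value here is a placeholder, not real analysis output.
    async for path in stream:
        yield {"file": path, "entries": len(path)}

async def main():
    files = ["a.parquet", "bb.parquet"]
    return [h async for h in executor(datasource_stream(files))]

hists = asyncio.run(main())
```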