
[C++] Migrate scanner logic to ExecPlan, remove merged generator #31486

Open · 2 of 10 tasks
asfimport opened this issue Mar 30, 2022 · 3 comments

asfimport (Collaborator) commented Mar 30, 2022

We've hit a bit of a wall with the merged generator. The current behavior is: if one subscription encounters an error, we simply stop pulling from the other subscriptions. Once everything has settled down, we return the error and end the stream.

In reality, we should be sending some kind of cancel signal down to the other generators. Otherwise, we are not respecting the rule we recently defined for AsyncGenerator: "An AsyncGenerator should always be fully consumed."
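For illustration, "fully consumed" means something like the following minimal (deliberately blocking) sketch, which assumes only the `AsyncGenerator<T>` alias (a `std::function<Future<T>()>`) and the `IsIterationEnd` helper:

```cpp
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/async_generator.h>

// Fully consume a generator, discarding the values; blocking is used
// here only to keep the sketch short.
template <typename T>
arrow::Status DrainSynchronously(const arrow::AsyncGenerator<T>& gen) {
  while (true) {
    // Each call yields a Future<T>; result() blocks until it resolves.
    ARROW_ASSIGN_OR_RAISE(T item, gen().result());
    // End-of-stream is signaled by a sentinel value, not by the future.
    if (arrow::IsIterationEnd(item)) return arrow::Status::OK();
    // Otherwise keep pulling so upstream state (open files, queued
    // tasks) is released before the stream is abandoned.
  }
}
```

The merged generator breaks this rule by abandoning its sibling streams mid-flight when one of them errors.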

There is no cancel mechanism for AsyncGenerator. We could add one (it would even be fun), but it would be a further, substantial investment in AsyncGenerator. At the same time, we have been putting more and more focus on our push-based ExecPlans.

So, rather than fix the merged generator, I propose we migrate the scanner (just the scanner, not the file formats / readers) to ExecPlan instead of AsyncGenerator.

This probably sounds easier than it will be, but I think it's doable. It will be easy to create a node that lists a dataset and pushes a batch for each file. We need to limit fragment readahead, but there is no reason we can't simply buffer all the filenames in memory and process them slowly, so this step should adapt to ExecPlan fairly well.

It's tempting to think that the merged generator is just a "union node", but that isn't quite true. That would imply creating a source node for each file. We don't know all the files ahead of time, and a node per file would cause backpressure issues anyway. We could modify the exec plan on the fly, adding new nodes as we start processing new files, but I think that would be overly complex.

Instead, I think we should create one node that holds all the scanner complexity. This node would keep a list of FragmentScanner objects. Each fragment scanner would hold a reference to a shared async toggle, so that we could turn backpressure on and off as needed and all the fragment scanners would stop pulling together. The fragment scanners would iterate, in a pull-based fashion, over their sources, and for each future they consume they would push the result to the output node. If an error occurs, we simply cancel each fragment scanner and stop creating new ones.
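A purely hypothetical sketch of that shape (none of these names are claimed to match what would actually be implemented; `AsyncToggle` here is a stand-in for whatever shared backpressure switch the node ends up using):

```cpp
#include <atomic>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for a shared backpressure switch; every fragment scanner
// consults the same toggle before pulling its next batch.
struct AsyncToggle {
  std::atomic<bool> open{true};
  void Open() { open.store(true); }
  void Close() { open.store(false); }
};

// Pulls batches from a single fragment and pushes results downstream.
class FragmentScanner {
 public:
  explicit FragmentScanner(std::shared_ptr<AsyncToggle> toggle)
      : toggle_(std::move(toggle)) {}

  void PullNext() {
    if (cancelled_ || !toggle_->open.load()) return;  // backpressure / cancel gate
    // ... consume the next future from this fragment's source and push
    // the resulting batch to the node's output ...
  }

  void Cancel() { cancelled_ = true; }

 private:
  std::shared_ptr<AsyncToggle> toggle_;
  bool cancelled_ = false;
};

// The single node that holds all the scanner complexity.
class ScanNode {
 public:
  void OnError() {
    // Cancel in-flight fragment scanners and stop creating new ones.
    for (auto& scanner : scanners_) scanner->Cancel();
    accepting_new_fragments_ = false;
  }

 private:
  std::shared_ptr<AsyncToggle> toggle_ = std::make_shared<AsyncToggle>();
  std::vector<std::unique_ptr<FragmentScanner>> scanners_;
  bool accepting_new_fragments_ = true;
};
```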

This node would not extend SourceNode. In fact, we can probably get rid of SourceNode at this point, but we could keep it around for future use if needed.

We can then get rid of the merged generator. We can't get rid of the AsyncGenerator code entirely, because we still need it for CSV scanning and a few other places. We could migrate these spots over to exec plans (e.g., the CSV scanner could be an exec plan with a chunk node, a parse node, and a convert node), but I don't think we need to tackle that right now.
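If we did tackle it, the CSV decomposition might be declared along these lines; this is a sketch only, and the "csv_chunk", "csv_parse", and "csv_convert" factory names are hypothetical, standing in for nodes that would have to be written and registered as part of such a migration:

```cpp
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/compute/exec/options.h>

namespace cp = arrow::compute;

// Placeholder options; real nodes would carry read/parse/convert options.
struct NoOptions : cp::ExecNodeOptions {};

cp::Declaration MakeCsvScanPlan() {
  return cp::Declaration::Sequence({
      {"csv_chunk", NoOptions{}},    // split raw bytes on row boundaries
      {"csv_parse", NoOptions{}},    // tokenize each block into columns
      {"csv_convert", NoOptions{}},  // convert tokens to typed arrays
  });
}
```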

Reporter: Weston Pace / @westonpace


Note: This issue was originally created as ARROW-16072. Please see the migration documentation for further details.


Weston Pace / @westonpace:
@lidavidm @pitrou I'm assuming that decreasing our usage of AsyncGenerator and investing more in ExecPlan is a good idea, since it lets us focus our efforts. Also, we seem to have more developers interested in and aware of ExecPlan than we have who know the AsyncGenerator code. Please let me know if either of you thinks otherwise.


David Li / @lidavidm:
This sounds good to me.

How is our cancellation story with ExecPlan? From what I recall, even if the APIs are there, they aren't used or tested much either?


Weston Pace / @westonpace:

How is our cancellation story with ExecPlan? From what I recall, even if the APIs are there, they aren't used or tested much either?

Correct. I encountered some of these issues trying to do the serial executor work. Currently, when an error occurs, it propagates towards the sink. Cancellation needs to propagate towards the source. However, most of the nodes interpret StopProducing as an "abort" or "cancel", so the logic is already there. So perhaps a reasonable solution is:

  • An error occurs midstream
  • The error propagates to the sink
  • The consumer receives the error and abandons the exec plan
  • The ExecPlan destructor calls StopProducing and then waits for finished

Though this doesn't feel quite correct. One thing that's not clear to me is whether we really intend to be able to "recover" from an error or not. If we don't, then I don't see the value in propagating the error at all. A node could just call Abort on the ExecPlan with the error; the ExecPlan could then call StopProducing and change finished_ to return the captured error after everything has finished. (A sketch of this idea follows below.)

I agree it isn't tested much, and I can do some of that testing as part of this work. I think cancellation support in ExecPlan is inevitable anyway, so at least this way we only have to add it to one execution framework and not two.
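To make the Abort idea concrete, a hypothetical sketch only; ExecPlan has no Abort today, and the names and members below are illustrative, not a proposed API:

```cpp
#include <mutex>
#include <utility>

#include <arrow/status.h>
#include <arrow/util/future.h>

// Illustrative only: a plan that captures the first error, stops the
// sources, and surfaces the error through finished() once settled.
class AbortablePlan {
 public:
  // Called by any node that hits an unrecoverable error.
  void Abort(arrow::Status error) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (!error_.ok()) return;  // keep only the first error
      error_ = std::move(error);
    }
    StopProducing();  // propagate "stop" towards the sources
  }

  // Resolves once all outstanding tasks settle, with the captured error.
  arrow::Future<> finished() { return finished_; }

 private:
  void StopProducing() { /* tell every source node to stop */ }

  // Called when the last outstanding task completes.
  void MarkFinished() {
    std::lock_guard<std::mutex> lock(mutex_);
    finished_.MarkFinished(error_);
  }

  std::mutex mutex_;
  arrow::Status error_ = arrow::Status::OK();
  arrow::Future<> finished_ = arrow::Future<>::Make();
};
```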
