
[C++] Create a ForEach library function that runs on an iterator of futures #26190

Closed
asfimport opened this issue Oct 5, 2020 · 10 comments


This method should take an iterator of futures and a callback. It should pull an item off the iterator, "await" it, run the callback on it, and then fetch the next item from the iterator.
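As a rough illustration of the shape this could take (hypothetical names and signature, not the final API; `Iterator`, `Future`, and `Status` stand in for the corresponding Arrow types):

```cpp
// Hypothetical signature sketch: applies `visitor` to each item produced
// by an iterator of futures, chaining continuations rather than blocking
// on each future in turn.
template <typename T>
Future<> ForEachAsync(Iterator<Future<T>> iterator,
                      std::function<Status(const T&)> visitor);
```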

Reporter: Weston Pace / @westonpace
Assignee: Weston Pace / @westonpace

Note: This issue was originally created as ARROW-10183. Please see the migration documentation for further details.


Antoine Pitrou / @pitrou:
So it's spawning a dedicated thread pool? Is there a desired use case for this feature?


Weston Pace / @westonpace:
It would not spawn a thread pool; it would take a thread pool as an argument.  The current use case I'm looking at is the CSV reader.

The current implementation is:

  • One thread (outside the thread pool, let's call it the I/O thread) reads from an input stream, up to X blocks in advance, and places the blocks in a blocking queue.

  • Another thread (calling thread, may be a thread pool thread, let's call it the parse thread) takes blocks off of the blocking queue (which it sees as an iterator) and creates thread pool tasks for conversion.  This step will block if I/O is slow.

  • The thread pool threads (conversion tasks) then do the conversion, possibly making new conversion tasks which are added to the thread pool.

  • Once the parsing thread is done reading the iterator it blocks until the conversion tasks have finished.

The goal is to change the parse thread so it is no longer blocking, since it may itself be a thread pool thread, and a blocking task shouldn't tie up a pool thread.  I can keep the dedicated I/O thread since it is outside the thread pool.

This changes the I/O thread's output from an iterator of Block to an iterator of Future.

Converting the parse thread is a little trickier.  It currently looks like:

    iterator = StartIterator();
    for each block in iterator:
        ParseBlockAndCreateConversionTasks();
    WaitForConversionTasks();

The "for each" part is the tricky bit once the iterator yields futures instead of blocks.  This task is aiming to replace that bit.

Now that I think it all through like this, I suppose the "parallel for each" and "N threads" wording is not needed.  This is a natural point to allow for concurrency (e.g. allowing up to N parse threads).  However, the original implementation had only the single parse thread so I don't need to introduce it here.  I'll go ahead and strip that from the task and start with just a basic serial for-each.  Even with a serial for-each there is a need for a common library function that, given an iterator of futures and a function, applies the function to each of the items in the iterator (see the sketch below).
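A minimal, self-contained sketch of that serial for-each, using a toy single-threaded future as a stand-in for `arrow::Future` (illustration only; the real class carries a `Status`, supports multiple callbacks, and is thread safe):

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

// Toy stand-in for a future type such as arrow::Future<T>. Copies share
// state, so the producer and consumer can hold separate handles.
template <typename T>
class ToyFuture {
 public:
  // Producer side: deliver the value, running the continuation if one is
  // already attached.
  void Fulfill(T value) {
    if (state_->callback) {
      state_->callback(std::move(value));
    } else {
      state_->value = std::move(value);
    }
  }
  // Consumer side: attach a continuation, running it immediately if the
  // value has already been delivered.
  void Then(std::function<void(T)> callback) {
    if (state_->value) {
      callback(std::move(*state_->value));
    } else {
      state_->callback = std::move(callback);
    }
  }

 private:
  struct State {
    std::optional<T> value;
    std::function<void(T)> callback;
  };
  std::shared_ptr<State> state_ = std::make_shared<State>();
};

// The "iterator of futures": yields futures until exhausted (nullopt).
template <typename T>
using FutureIterator = std::function<std::optional<ToyFuture<T>>()>;

// Serial async for-each: pull one future, "await" it via a continuation,
// run the visitor, then pull the next. No thread ever blocks.
template <typename T>
void ForEachAsync(FutureIterator<T> next, std::function<void(T)> visit) {
  auto fut = next();
  if (!fut) return;  // iterator exhausted
  fut->Then([next, visit](T value) {
    visit(std::move(value));
    ForEachAsync(next, visit);  // recursion; see the stack caveat below
  });
}

int main() {
  // Three already-fulfilled futures, as if I/O had completed instantly.
  std::vector<ToyFuture<int>> futures(3);
  for (int i = 0; i < 3; ++i) futures[i].Fulfill(i);
  std::size_t pos = 0;
  ForEachAsync<int>(
      [&]() -> std::optional<ToyFuture<int>> {
        if (pos == futures.size()) return std::nullopt;
        return futures[pos++];
      },
      [](int v) { std::cout << "visited " << v << "\n"; });
  return 0;
}
```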

     


Weston Pace / @westonpace:
This is also a bit of a mental investigation on my part, to be sure this can be done without exploding the stack, since it is essentially iterator.next().then(iterator.next().then(iterator.next().then(...))).  My understanding is that it can; there are numerous articles on continuations and avoiding stack busting while doing this kind of thing.  I have yet to synthesize all that knowledge and put it into practice.
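Reusing the toy types from the earlier sketch (and assuming hypothetical `IsFinished()`/`MoveResult()` accessors added to `ToyFuture`), one common way to keep the stack bounded is to loop while futures are already finished and only attach a continuation when one is actually pending:

```cpp
// Sketch of a stack-safe variant. IsFinished() and MoveResult() are
// hypothetical additions to the toy future above.
template <typename T>
void ForEachAsyncLoop(FutureIterator<T> next, std::function<void(T)> visit) {
  while (true) {
    auto fut = next();
    if (!fut) return;  // iterator exhausted
    if (fut->IsFinished()) {
      // Fast path: the value is already here, so stay in the loop rather
      // than recursing; no stack growth per item.
      visit(fut->MoveResult());
      continue;
    }
    // Slow path: the value is not ready. Attach a continuation that
    // re-enters the loop later, from the producer's Fulfill() call.
    // Stack depth stays bounded because at most one such re-entry can be
    // pending at a time.
    fut->Then([next, visit](T value) {
      visit(std::move(value));
      ForEachAsyncLoop(next, visit);
    });
    return;
  }
}
```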


Antoine Pitrou / @pitrou:
Just for the record, we already have an AsCompleted iterator.  We could have a variant that takes an iterator of futures rather than a vector of futures.


Weston Pace / @westonpace:
I had not quite come across that class, so I appreciate the mention.  It will not suffice in this case though; there is a back pressure problem that is difficult to explain but will make more sense in code.

More significantly, the AsCompletedIterator still performs blocking waits, which is what I'm trying to avoid.  I'm attaching a diagram to this sub-task that will hopefully provide a bit more explanation.


Weston Pace / @westonpace:
Latest round of benchmarks (5 iterations on each)
| | 2.0.0 (Mean) | 2.0.0 (StdDev) | Async (Mean) | Async (StdDev) | Async (Tasks) | Threaded (Mean) | Threaded (StdDev) | Threaded (Tasks) |
|-|-|-|-|-|-|-|-|-|
| gzip/cache | 6.291222 | 0.095669 | 6.467804 | 0.035468 | 6229 | 6.262252 | 0.056097 | 4149 |
| gzip/none | 9.292271 | 0.251346 | 9.494446 | 0.273585 | 6229 | 9.22652 | 0.254951 | 4149 |
| none/cache | 1.226155 | 0.086003 | 1.245934 | 0.077262 | 6229 | 1.238495 | 0.073567 | 4149 |
| none/none | 34.326746 | 0.392563 | 35.091284 | 0.833403 | 6222 | 36.270428 | 2.033464 | 4149 |



gzip means the source file was compressed with gzip and cache means the source file was cached in the OS cache.  For the cache=none cases the benchmark numbers are high: we have to make a file copy to ensure we are reading from the disk, and this copy time is included in the benchmark.  However, this overhead is consistent across all variants.

The 2.0.0 numbers come from using the conda pyarrow (and thus the threaded table reader).

Results are fairly noisy but I think there is some consistent minor degradation in the async case.  It could be a result of the higher number of tasks, the high number of futures, using submit instead of spawn, or it could just be noise.


Weston Pace / @westonpace:
Some good news.  I figured out my CSV reading benchmark was flawed (had the wrong separator).  New results...

| | 2.0.0 (Mean) | 2.0.0 (StdDev) | Async (Mean) | Async (StdDev) |
|-|-|-|-|-|
| gzip/cache | 6.62 | 0.12 | 6.70 | 0.16 |
| gzip/none | 9.89 | 0.43 | 9.06 | 0.21 |
| none/cache | 4.05 | 0.09 | 3.95 | 0.11 |
| none/none | 34.57 | 1.15 | 32.25 | 1.22 |



I also realized at least one possible situation where the async reader could fall behind the threaded reader.  If the I/O is running slower (e.g. the gzip case) then sometimes the parse task will find the I/O future unfulfilled (it has finished parsing but the next decompressed block is not ready).  In the threaded case the outer parsing thread will block here, while in the async case a new task gets added to the pool to run when the I/O finishes.  That task gets added to the pool behind all the conversion tasks, so the parsing will be delayed, and it is possible the readahead queue will fill up, delaying the I/O.

The timing has to be just right, so that parsing and I/O are similar in performance.  The I/O has to be slow enough to sometimes not be ready, but not so slow that the task pool completely drains between each block.

I've tested the gzip/cache case quite often and this is the only case where the async version consistently underperformed.  I think the I/O in the */none cases is too slow and the I/O in the none/cache case is too fast.



 



A prioritized thread queue would allow working around this situation: the conversion tasks would be marked lower priority than the parsing tasks.
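A minimal sketch of what that might look like (hypothetical; not something Arrow's thread pool offered at the time of this discussion), with parse tasks pushed at high priority so they bypass queued conversion tasks:

```cpp
#include <functional>
#include <queue>
#include <vector>

// Hypothetical prioritized task queue: tasks carry a priority, and the
// pool pops high-priority tasks first. Parse tasks would be pushed with
// kHigh, conversion tasks with kLow.
class PriorityTaskQueue {
 public:
  enum Priority { kLow = 0, kHigh = 1 };

  void Push(Priority priority, std::function<void()> task) {
    queue_.push(Entry{priority, seq_++, std::move(task)});
  }

  // Pops the highest-priority task (FIFO within a priority), if any.
  bool Pop(std::function<void()>* task) {
    if (queue_.empty()) return false;
    *task = queue_.top().task;  // copy out; a real pool would avoid this
    queue_.pop();
    return true;
  }

 private:
  struct Entry {
    int priority;
    long seq;
    std::function<void()> task;
  };
  struct Cmp {
    bool operator()(const Entry& a, const Entry& b) const {
      if (a.priority != b.priority) return a.priority < b.priority;
      return a.seq > b.seq;  // older tasks (smaller seq) come out first
    }
  };
  std::priority_queue<Entry, std::vector<Entry>, Cmp> queue_;
  long seq_ = 0;
};
```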


Weston Pace / @westonpace:
@pitrou @bkietz I wanted to add a note about the parallelism of the AsyncGenerator.  This is not so much a question as an update as I try to wrap my head around all of this.  I will confess that I have not yet fully internalized everything that can go on.  For the purposes of discussion I will consider a chain (a graph where each node has 1 or 2 edges and there are no cycles) of async generators, where each node in the chain mutates the stream in some way.  For example, in the CSV case there is the original Buffer generator (a background generator), the CSV block reader, the chunker, and then the "parse & convert scheduler" (a Visitor which terminates the chain).  The "fan-out" in the CSV reader is still delegated to the task group, so that parallelism hasn't been fully explored, although it could remain a chain (I think).

An AsyncGenerator's Next() function should never have to be called by more than one thread at once in the way you might do with an iterator.  Instead, the question comes down to whether you can call a generator's Next() function before the promise returned by the previous call has completed.  The two properties are similar but distinct: it's not so much a question of being "thread safe" as of being "reentrant".

AsyncGenerators come in both flavors.  A decompressing node (or the CSV block reader, or the CSV chunker) is quite stateful and must finish processing a block before it can begin consuming the next, so you should not call Next() until the previously returned future has resolved.  The parsing and converting, on the other hand, are free to run in parallel.  In addition, any queuing stage (AddReadahead and BackgroundGenerator) can be called in parallel, thus allowing for pipeline parallelism.  Since everything is pull driven, this "parallel pull" is driven from the AddReadahead nodes and could be driven from a "visit" or "collect" sink as well.
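To make the serial constraint concrete, here is a hedged sketch (reusing the toy `ToyFuture`/`FutureIterator` types from the earlier comment; not an Arrow API) of a debug wrapper that flags a reentrant pull of a non-reentrant generator:

```cpp
#include <cassert>

// Hypothetical debug guard: wraps a generator and asserts that Next() is
// never called again before the future from the previous call completes.
template <typename T>
class NotReentrantGuard {
 public:
  explicit NotReentrantGuard(FutureIterator<T> wrapped)
      : wrapped_(std::move(wrapped)) {}

  std::optional<ToyFuture<T>> operator()() {
    assert(!*in_flight_ && "non-reentrant generator was pulled reentrantly");
    auto inner = wrapped_();
    if (!inner) return std::nullopt;
    *in_flight_ = true;
    ToyFuture<T> outer;
    ToyFuture<T> outer_copy = outer;  // copies share state
    auto flag = in_flight_;
    inner->Then([flag, outer_copy](T value) mutable {
      *flag = false;  // previous pull complete; Next() is legal again
      outer_copy.Fulfill(std::move(value));
    });
    return outer;
  }

 private:
  FutureIterator<T> wrapped_;
  std::shared_ptr<bool> in_flight_ = std::make_shared<bool>(false);
};
```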

So just summarizing what we have today...

Sources:
BackgroundGenerator - Reentrant, Does not pull

Intermediate Nodes:
Transformer - Not reentrant (by necessity of has_next), Does not pull
AddReadahead - Reentrant, Pulls reentrantly

Sinks:
Visit - Pulls serially (could be reentrantly in the future)
Collect - Pulls serially (could be reentrantly in the future)

Today we have...

BackgroundGenerator -> AddReadahead -> Transformer -> Transformer -> Visit(1)

It would be an error for example to do...

BackgroundGenerator -> AddReadahead -> Transformer -> Transformer -> Visit(N)

...or...

BackgroundGenerator -> AddReadahead -> Transformer -> Transformer -> AddReadahead -> Visit(1)

...both of those would cause Transformer (which is not reentrant) to be pulled reentrantly.  I am wondering if there is some merit in encoding these rules somehow into the types themselves so that something like that would fail to compile.
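One hedged sketch of how that might look (purely hypothetical, not an existing Arrow design): tag generators with their pull discipline, so the bad wirings above become type errors.

```cpp
// Hypothetical interface sketch: encode the pull discipline in the
// generator's type.
template <typename T>
struct SerialGenerator {
  // Next() must not be called again until the previous future completes.
};

template <typename T>
struct ReentrantGenerator {
  // Next() may be called while earlier pulls are still outstanding.
};

// AddReadahead pulls its source reentrantly, so its source must itself be
// reentrant; its output may also be pulled reentrantly.
template <typename T>
ReentrantGenerator<T> AddReadahead(ReentrantGenerator<T> source,
                                   int max_readahead);

// A Transformer produces a serial generator (it is not reentrant).
template <typename T, typename U>
SerialGenerator<U> MakeTransformer(ReentrantGenerator<T> source);

// With these types, Transformer -> AddReadahead would not compile:
// AddReadahead cannot accept the SerialGenerator a Transformer produces.
```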


Weston Pace / @westonpace:
As perhaps more food for thought, one could consider a classic map-reduce style summation task: a buffer comes in with 1024 elements, it is split into 8 sum tasks of 128 elements each, and then reduced with 7 reduce tasks.  This "fan-out" could be implemented as a fully reentrant pipeline...

Mapper (Just a flatMap, fits Transformer model, but should be able to be made reentrant, emits 8 times for each input)
Summer (Takes in a buffer and computes the sum, fits transformer model, but should be reentrant)
Reducer (Consumes two numbers and outputs the sum, would NOT fit transformer model (even with skip), curiously though it should also be reentrant)

All of these could be "transformers", but the existing transform model would make them non-reentrant.  However, all of them "should" be able to be reentrant.  I think this points to gaps in the transformer model.  I have an improved model in mind, but it was not compatible with synchronous iterators so I abandoned it.  I may have to revisit it.  A sequential sketch of the decomposition is below.
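A sequential sketch of that decomposition (plain C++, no tasks; purely illustrative of the fan-out/fan-in shape, with each loop body standing in for one task):

```cpp
#include <array>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <vector>

// 1024 elements -> 8 partial-sum ("map") tasks of 128 elements each ->
// 7 pairwise "reduce" tasks. In the async pipeline each step would run as
// its own task; here they run inline to show the structure.
int main() {
  std::vector<int> buffer(1024, 1);

  // Map: 8 partial sums of 128 elements each.
  std::array<long, 8> partial{};
  for (std::size_t i = 0; i < 8; ++i) {
    auto begin = buffer.begin() + i * 128;
    partial[i] = std::accumulate(begin, begin + 128, 0L);
  }

  // Reduce: 7 pairwise combines (4 + 2 + 1) collapse 8 values to 1.
  std::vector<long> level(partial.begin(), partial.end());
  while (level.size() > 1) {
    std::vector<long> next;
    for (std::size_t i = 0; i + 1 < level.size(); i += 2) {
      next.push_back(level[i] + level[i + 1]);  // one reduce task
    }
    if (level.size() % 2 != 0) next.push_back(level.back());
    level = std::move(next);
  }
  std::cout << "sum = " << level[0] << "\n";  // prints 1024
  return 0;
}
```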


Ben Kietzman / @bkietz:
Issue resolved by pull request 9095
#9095

@asfimport asfimport added this to the 4.0.0 milestone Jan 11, 2023