ARROW-10183: [C++] Apply composable futures to CSV #9095
Conversation
Force-pushed from bf1e828 to 1cd42fa.
Took a brief look, will review further later
cpp/src/arrow/util/iterator.h (outdated):

```cpp
  finished_() {}

  util::optional<Result<V>> Pump() {
    while (!finished_ && last_value_.has_value()) {
```
This loop is very confusing. Could you rewrite it with a single condition (`while (!finished_)`, maybe) and then include a `break` statement below?
Could you also include a comment describing the control flow, the contract of transformer functions, etc.?
I added a comment to `MakeTransformedIterator` and `TransformAsynchronousGenerator`. I also cleaned up the logic in these loops a little. In fact, `Pump` no longer contains a `while` loop, as it was guaranteed to execute only once anyway.
```cpp
auto loop_body = [&counter]() -> Future<ControlFlow<int>> {
  while (counter < 1000000) {
    counter++;
    return Future<ControlFlow<int>>::MakeFinished(Continue());
```
Maybe it'd be useful to have an implicit constructor `Future<T>(Result<T>)` for finished futures; then I think we'd be able to just write `return Continue();`.
Hmm... I'm a little torn here. Could this make return type inference even harder? For example, if you change

```cpp
auto some_future = [] {
  if (some_base_case) {
    return Future<T>::MakeFinished(0);
  }
  return SomeAsyncFunction().Then(...);
};
```

to...

```cpp
auto some_future = [] {
  if (some_base_case) {
    return 0;
  }
  return SomeAsyncFunction().Then(...);
};
```
For lambdas without an explicit trailing return type, all return statements must return the same type, so you'd have to choose which boilerplate is preferable. IMHO, the trailing return type plus implicit conversion looks prettiest.
cpp/src/arrow/util/future_test.cc (outdated):

```cpp
  AssertSuccessful(none_fut);
}

TEST(FutureLoopTest, MoveOnlyBreakValue) {
```
It'd be good to have a stress test for `Loop` as well.
I'm not sure what to stress. The `Loop` call is made once and returns a future. It can't be called by multiple threads; each call would construct an entirely separate loop. It iterates serially (non-reentrant pulling) through the generator. I have added a stack overflow test to cover the stack overflow case. Other than that I'm not sure what stressful case to add.
Ok, here are some comments. Some may be due to misunderstandings on my part.
```cpp
namespace arrow {
namespace csv {

class SlowInputStream : public io::InputStream {
```
Hmm... why don't you use `SlowInputStream` from `arrow/io/slow.h`?
This benchmark has been removed from the PR.
```cpp
result.use_threads = use_threads;
result.legacy_blocking_reads = !use_async;
// Simulate larger files by using smaller block files so the impact of multiple
// blocks is seen but we don't have to spend the time waiting on the large I/O
```
I'm not sure this is exactly the same. Using a very small block size (I assume the below is 10kB?) may emphasize fixed costs (managing vectors of shared_ptrs etc.) rather than actual reading/parsing costs.
I've removed this benchmark from the PR. Those fixed costs are what I was hoping to emphasize, since I was looking to see how much overhead the futures implementation was adding and wanted to minimize the reading/parsing costs (this PR didn't change those functions at all).
```cpp
  state.SkipWithError("Would deadlock");
}
auto input_buffer = *MakeSampleCsvBuffer(num_rows);
// Hard coding # of threads so we don't deadlock if there are too few cores
```
Can you make it a `constexpr` something, at least?

Also, why would we deadlock? The implementation should certainly prevent that. The user may be running in a 1- or 2-core VM.
I don't think we want to include the benchmark at the end of the day. I created it originally to be able to demonstrate the nested deadlock case encountered by the dataset API. I wrote it for my own benefit to both confirm that the async table reader was avoiding the deadlock and show that it can outperform the threaded table reader since threads aren't wasted blocking on I/O. So it is mimicking the dataset API in two ways that don't make sense for a benchmark and, somewhat intentionally, cause the threaded table reader to underperform.
First, the table reader is running inside a thread pool thread for the threaded/serial case. This simulates the way the dataset API currently burns a thread waiting for the threaded table reader in much the same way.
Second, we are fixing the thread pool to size 6.
Eliminating that "waiter thread task" would prevent the deadlock entirely. With it in place, however, as soon as you have as many waiters (one per file) as threads in your pool, you will deadlock.
Thus, this benchmark is probably more of a temporary demo than it is a benchmark. In the future, once the dataset logic has moved over to async, we can put in a real benchmark showing the actual gain. I will rearrange the commits so that this demo/benchmark is a separate branch built on top of the 10183 branch that can be optionally checked out.
```cpp
auto latency_ms = state.range(0);
auto num_rows = state.range(1);
auto num_files = state.range(2);
if (num_files > 5 && use_threads && !use_async) {
```
Why `5`? Is it related to the thread pool size below?
Answered above.
```cpp
    ABORT_NOT_OK(table);
  }
}
state.SetItemsProcessed(state.iterations() * num_rows);
```
Do we also want to multiply by `num_files`?
Yes, good point.
The benchmark has been removed.
cpp/src/arrow/csv/reader.cc (outdated):

```cpp
// Wrap shared pointer in callable
Transformer<std::shared_ptr<Buffer>, util::optional<CSVBlock>> block_reader_fn =
    [block_reader](std::shared_ptr<Buffer> next) { return (*block_reader)(next); };
return TransformAsyncGenerator(buffer_generator, block_reader_fn);
```
`std::move(buffer_generator)`?
Fixed.
cpp/src/arrow/csv/reader.cc (outdated):

```cpp
  return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
}

Result<TransformFlow<util::optional<CSVBlock>>> operator()(
```
Hmm... why are we still returning `util::optional` if we never yield a value-less result?
`TransformFinish` (line 201) will cause `IterationTraits<util::optional<CSVBlock>>::End()` to be emitted. So it was either keeping the `util::optional` or adding `IterationTraits<CSVBlock>::End()` (and a corresponding equality operator).

The async iterators are still iterators, so they rely on that end token. One thing we could do is rewire all the async iterator functions so that they use `util::optional` under the hood (unless the type being iterated is a pointer) and don't allow users to specify their own end tokens. Then the `util::optional` could be hidden from the consumer of the iterators API. This would require all users to rely on `Visit` instead of manually iterating; however, for async iterators, that is probably a given already.
Ok, thanks for the explanation. Can you add that somewhere as a comment?
I added a comment on `BlockReader`.
cpp/src/arrow/csv/reader.cc (outdated):

```cpp
std::function<Status(util::optional<CSVBlock>)> block_visitor =
    [self](util::optional<CSVBlock> maybe_block) -> Status {
  DCHECK(!maybe_block->consume_bytes);
```
So `maybe_block` always has a value...?
See previous comment. The `Visit` function is never passed the end token; there is no need. However, since the user is allowed to specify their own end token, this use of `util::optional` is unavoidable. For example, a user could create an iterator of `TestInt` where the end token is simply the `TestInt` with value -999. In that case there is no "wrapping" going on (in the way that `util::optional` wraps `CSVBlock`), so there is nothing to unwrap when calling the `Visit` function.

On the other hand, if we assume `util::optional` is always used for iterators of non-pointer types, we could redefine `Visit` to take in `T` instead.
cpp/src/arrow/csv/reader.cc (outdated):

```cpp
Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }

Future<std::shared_ptr<Table>> ReadAsync() override {
  task_group_ = internal::TaskGroup::MakeThreaded(thread_pool_);
```
Do you have plans to remove `TaskGroup` from the picture? Ideally we should be able to call `Future::Then` on an executor, or something similar?
Ben was interested in this as well. However, as it stands, there would be no clear benefit in doing so and it would require parallel async implementations of the column builder code (which currently rely on task group being present). These parse & convert tasks do not block (except for maybe very briefly on shared collection access) and so using task group here isn't detrimental.
If we want to remove TaskGroup for the purpose of simplifying down to a single "scheduler-like" interface then we could do that. However, in that case, we should modify the existing serial & threaded table readers as well and I think it would make sense to do it as a separate story.
Ok, I just noticed that `internal::TaskGroup::MakeThreaded(thread_pool_)` isn't good. It will run CPU-intensive tasks on the same thread pool that's used for I/O. Also, it gets a fixed size of 8 threads, even on my 24-thread CPU.
Right. I had completely misunderstood the purpose of `io::AsyncContext`. The CPU thread pool is the correct thread pool. However, we could still use the I/O pool in a different spot: it could be used instead of a dedicated "readahead" thread.
cpp/src/arrow/util/iterator.cc (outdated):

```cpp
if (max_readahead_ > 0) {
  done_.push_back(std::move(promise));
  work_done_.notify_one();
}
```
This is frankly weird. It seems the logic here is conflating "unbounded readahead" and "is an async generator".

More generally, the fact that two different control flows (sync vs. async) seem to be handled in the same implementation makes things very confusing and difficult to follow. Can you find a way to separate the two cases?
I think we will want a general-purpose producer/consumer queue with both synchronous and asynchronous APIs. The bound for `AddReadahead` is the "done" pool inside `AddReadahead` itself (and not in the readahead queue). I agree it needs some simplification, but I found the old method equally confusing. Both approaches should be able to share a single multi-producer/multi-consumer "queue" API (maybe single-producer/single-consumer is sufficient if there are performance benefits to such a queue).

I will take a pass at simplifying this.
Also, it isn't "unbounded readahead". Both implementations currently have two queues. In the synchronous case both queues are in `ReadaheadQueueImpl` (`todo_` and `done_`). In the asynchronous case there is one queue in `ReadaheadQueueImpl` (`todo_`) and the other is in `ReadaheadGenerator` (`readahead_queue_`). The asynchronous case enforces the queue length with `readahead_queue_`, and the background thread will block if `readahead_queue_` isn't pulled fast enough (because nothing will be inserted in `todo_`).

Now, I can also understand how that isn't clear at all in the code. I still plan on taking a pass at simplifying things.
Ok. I've tracked down the oddness here and it actually unwound quite a bit of complexity. It turns out that readahead is an inherently asynchronous concept; the `ReadaheadQueue` was actually an asynchronous generator pipeline in disguise.

`ReadaheadQueueImpl` was part executor and part `AddReadahead`:

Part executor: `ReadaheadQueueImpl` was a thread pool of size 1. It had a worker thread (the background thread) and a job queue (`ReadaheadQueueImpl::todo_`). It accepted jobs on the job queue and ran them; the types were even named "promises".

Part readahead: the readahead queue was `ReadaheadQueueImpl::done_`, plus all of the logic that did the pumping.

So I got rid of it and replaced it with a thread pool of size 1 (which has an unlimited-size job queue), `BackgroundGenerator` (which takes an executor and an iterator and creates an `AsyncGenerator`), `AddReadahead` (which creates the readahead queue; this one is limited in size), and finally `GeneratorIterator`, which brings us back to a synchronous iterator by waiting on each result.

The tests all pass, except the logic changed a little on the tracing test because the readahead now doesn't pump until the first read; we can change that if we want it to behave more like the threaded reader and less like the serial reader.

I've made the change on a separate branch from this one so that it can be reviewed independently (westonpace#3).
Force-pushed from 1daaca7 to 6eadd2e.
@pitrou I believe this is ready for another review. In addition to fixing your comments:

@ursabot please benchmark
Force-pushed from b0c41e4 to e36badf.
If I use the CPU thread pool for
cpp/src/arrow/util/async_iterator.h (outdated):

```cpp
namespace arrow {

template <typename T>
using AsyncGenerator = std::function<Future<T>()>;
```
Should we make the file name (`async_iterator.h`) and the type name (`AsyncGenerator`) consistent? I don't know which convention is the most common.
I have gone back and forth a little (hence this discrepancy). I don't think asynchronous iterators are terribly common in general. When they are used (e.g. in JavaScript) it is in conjunction with syntactic "await" sugar to create something that can actually be iterated (e.g. `for await...of`). The only way to truly "iterate" these chains is to use the Collect/Visit utilities (or the underlying `Loop`), which I feel isn't quite the same thing. So "asynchronous iterator" isn't an ideal name.

"Generator" is used in Python for creating an iterator with yield statements, which is an entirely separate concept; it's used exactly the same way in JavaScript. In fact, JavaScript has an "asynchronous generator" which allows you to create asynchronous functions that yield (which is not what we are doing here, except perhaps in the transform functions). So the "asynchronous generator" name isn't perfect either.

In the FRP world these might be called events and event streams, but I think that fits a push-based model better.

In the interest of "perfect is the enemy of good", I will stick with `AsyncGenerator` everywhere and rename `async_iterator` to `async_generator`.
Note that Python now has asynchronous iterators as well.
(and you can implement a Python asynchronous iterator as an asynchronous generator ;-))
I've made async generator consistent for now until someone proposes a better name.
Force-pushed from 8369b41 to 81df21f.
Sorry, too many tabs. Please disregard
Force-pushed from c07be6b to 90740c3.
@pitrou It now runs on the CPU thread pool. The I/O thread pool is sent down all the way into the table reader but is not yet used (I added ARROW-11590 for this). When I tried to run it on the I/O thread pool it failed because there were multiple threads running the iterator. The current behavior (dedicated single thread pool) matches the old behavior though, so this isn't a regression. I think I've addressed the major concerns and it is ready for review again.
… no longer required per PR comment
Force-pushed from d4608a9 to 356c300.
Thanks for working on this. Just a few nits.
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
simplify Loop slightly
Closes apache#9095 from westonpace/feature/arrow-10183-2 Lead-authored-by: Weston Pace <weston.pace@gmail.com> Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com> Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>