
Should GetIOThreadPool() be accessible from installed headers? #15151

Closed
paleolimbot opened this issue Jan 2, 2023 · 7 comments · Fixed by #15183

Comments

@paleolimbot
Member

Describe the enhancement requested

In #14582 it was found that using the CPU thread pool in arrow::compute::MakeReaderGenerator() caused problems when the number of CPU threads was limited (as it often is on CI machines with few available cores). The solution was to use the IO thread pool for this; however, arrow::io::internal::GetIOThreadPool() is not available in any installed headers. I don't know what the best way to make this available would be (or whether creating a source node from a record batch reader should be baked into the internals somewhere); however, my hack of:

namespace arrow {
namespace io {
namespace internal {
arrow::internal::ThreadPool* GetIOThreadPool();
}
}  // namespace io
}  // namespace arrow

...in the R package should almost certainly not exist.

Component(s)

C++

@westonpace
Member

Yes, I think it's entirely appropriate to put RecordBatchReader->source node support in the C++ code. On the output side we have collector variants for tables (DeclarationToTable), vectors of record batches (DeclarationToBatches), and record batch readers (DeclarationToReader). We already have source node variants for accepting data from a table (table_source) and from a vector of record batches (record_batch_source). So I think record_batch_reader_source would be a good addition.
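For illustration only, here is a rough sketch of how such a record_batch_reader_source factory might be consumed. The RecordBatchReaderSourceNodeOptions struct, its constructor arguments, and the TableFromReader helper are assumptions based on this proposal, not an existing API at the time of this comment:

#include <memory>

#include "arrow/compute/exec/exec_plan.h"  // Declaration, DeclarationToTable
#include "arrow/compute/exec/options.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/thread_pool.h"  // arrow::internal::Executor

namespace cp = arrow::compute;

// Hypothetical: wrap a RecordBatchReader in the proposed source factory and
// collect the plan's output as a Table, running the reader on io_executor.
arrow::Result<std::shared_ptr<arrow::Table>> TableFromReader(
    std::shared_ptr<arrow::RecordBatchReader> reader,
    arrow::internal::Executor* io_executor) {
  // Assumed options struct carrying the reader plus the executor that drives it.
  cp::RecordBatchReaderSourceNodeOptions source_options{std::move(reader), io_executor};
  cp::Declaration source{"record_batch_reader_source", std::move(source_options)};
  return cp::DeclarationToTable(std::move(source));
}

The appeal over the MakeReaderGenerator() route is that the caller never has to name the I/O thread pool type directly; any Executor* (for example the one obtained from default_io_context()) would do.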

However, I'm also not sure why we wouldn't expose the default I/O pool (arrow::io::internal::GetIOThreadPool) in the public headers given that we have public methods for getting and setting the size.
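(For reference, the public methods for getting and setting the size are, as far as I know, the free functions arrow::io::GetIOThreadPoolCapacity() and arrow::io::SetIOThreadPoolCapacity() declared in arrow/io/interfaces.h; a minimal sketch, with the GrowIOPool helper name made up here:)

#include "arrow/io/interfaces.h"
#include "arrow/status.h"

// Read the current capacity of the default I/O thread pool and grow it.
// GrowIOPool is an illustrative helper name, not part of Arrow.
arrow::Status GrowIOPool(int extra_threads) {
  int current = arrow::io::GetIOThreadPoolCapacity();
  return arrow::io::SetIOThreadPoolCapacity(current + extra_threads);
}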

As a short term hack you can do:

#include "arrow/io/type_fwd.h"

// Grab the Executor backing the default I/O context (i.e. the I/O thread pool).
arrow::io::IOContext io_context = arrow::io::default_io_context();
arrow::internal::Executor* io_executor = io_context.executor();

@paleolimbot
Member Author

As a short term hack you can do:

We already do this in the R package where we need the IO thread pool to submit jobs; the problem here is that we need a ThreadPool* to pass to MakeReaderGenerator().

I think it would be a good addition to add record_batch_reader_source.

That would be my preferred solution. I'd rather not maintain that logic in the R package, and the same need has come up on the mailing list in a context unrelated to the R package ( https://lists.apache.org/thread/zo9qq0pntkrt2vnczoxx7hfsl6k233zy ).

@vibhatha
Collaborator

vibhatha commented Jan 3, 2023

take

@vibhatha
Collaborator

vibhatha commented Jan 3, 2023


I think it would be a good addition to add record_batch_reader_source.

That would be my preferred solution. I'd rather not maintain that logic in the R package, and the same need has come up on the mailing list in a context unrelated to the R package ( https://lists.apache.org/thread/zo9qq0pntkrt2vnczoxx7hfsl6k233zy ).

@paleolimbot The reference link here which refers to this code block is outdated AFAIU.

@paleolimbot
Member Author

Yes, sorry! The block was this one:

std::shared_ptr<compute::ExecNode> ExecNode_SourceNode(
    const std::shared_ptr<compute::ExecPlan>& plan,
    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
  arrow::compute::SourceNodeOptions options{
      /*output_schema=*/reader->schema(),
      /*generator=*/ValueOrStop(compute::MakeReaderGenerator(
          reader, arrow::io::internal::GetIOThreadPool()))};
  return MakeExecNodeOrStop("source", plan.get(), {}, options);
}

@vibhatha
Collaborator

vibhatha commented Jan 3, 2023

Thanks @paleolimbot, I will work on this.

@vibhatha
Collaborator

vibhatha commented Jan 4, 2023

@westonpace @paleolimbot I created a draft PR to use a RecordBatchReader directly as the data source. I would appreciate your reviews to see if it addresses the problem as expected.

westonpace pushed a commit that referenced this issue Jan 12, 2023
…R API (#15183)

This PR adds the factory `record_batch_reader_source` to Acero. This is a source node that takes a `RecordBatchReader` as the data source along with an executor, which gives the freedom to choose the thread pool used for execution. An example also shows how this can be used in Acero.

- [x] Self-review
* Closes: #15151

Lead-authored-by: vibhatha <vibhatha@gmail.com>
Co-authored-by: Vibhatha Lakmal Abeykoon <vibhatha@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
westonpace added this to the 11.0.0 milestone Jan 12, 2023