
[C++] An error occurred when I set batch_readahead = 0 for parquet ScanOptions #15264

Closed
YoungRX opened this issue Jan 9, 2023 · 2 comments · Fixed by #29185
@YoungRX

YoungRX commented Jan 9, 2023

Describe the bug, including details regarding any error messages, version, and platform.

In Arrow C++ version 8.0.0, the error message is "Could not read from readahead_queue". The details are as follows:

First, I set use_threads to false and batch_readahead to 0, and set batch_size, dataset_schema, projection, and filter in setScanOptions();.

    // project & filter
    setScanOptions();

Second, I use the ParquetFileFormat::ScanBatchesAsync function to scan a ParquetFileFragment.

Then I run the code below:

    // parquetFileFormat is an object of the ParquetFileFormat class
    // parquetFileFragment is an object of the FileFragment class
    auto recordBatchIterator = arrow::MakeGeneratorIterator(
        parquetFileFormat->ScanBatchesAsync(parquetScanOptions, parquetFileFragment).ValueOrDie());
    auto recordBatch_res = recordBatchIterator.Next();
    std::shared_ptr<arrow::RecordBatch> recordBatch;
    if (recordBatch_res.ok())
        recordBatch = recordBatch_res.ValueOrDie();

Finally, during debugging, the error message in recordBatch_res is "Could not read from readahead_queue".
I traced this error message to the SerialReadaheadGenerator class; it is probably caused by its data structure util::SpscQueue<std::shared_ptr<Future>> readahead_queue_.

    template <typename T>
    using SpscQueue = arrow_vendored::folly::ProducerConsumerQueue<T>;

As the code comments describe, the corresponding ProducerConsumerQueue must be constructed with a size of at least 2. Therefore, when I set batch_readahead to 0, the ProducerConsumerQueue is created with size 1, and an error occurs when I read the parquet file.

    src\arrow\util\async_generator.h, line 583
    src\arrow\vendored\ProducerConsumerQueue.h, line 75
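The size >= 2 requirement follows from how this kind of single-producer single-consumer ring buffer works: one slot is always left empty to distinguish "full" from "empty", so a queue constructed with size N holds at most N - 1 elements, and a size-1 queue can hold nothing at all. A minimal standard-library-only sketch of the idea (not the vendored folly implementation itself):

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Toy SPSC ring buffer in the style of folly::ProducerConsumerQueue.
// One slot always stays empty so that read_ == write_ unambiguously
// means "empty"; hence capacity is size - 1 and size must be >= 2.
template <typename T>
class SpscRingBuffer {
 public:
  explicit SpscRingBuffer(std::size_t size) : size_(size), slots_(size) {
    assert(size >= 2 && "size must be >= 2; usable capacity is size - 1");
  }

  // Returns false when the queue is full.
  bool Write(const T& value) {
    std::size_t w = write_.load(std::memory_order_relaxed);
    std::size_t next = (w + 1) % size_;
    if (next == read_.load(std::memory_order_acquire)) return false;  // full
    slots_[w] = value;
    write_.store(next, std::memory_order_release);
    return true;
  }

  // Returns false when the queue is empty.
  bool Read(T* out) {
    std::size_t r = read_.load(std::memory_order_relaxed);
    if (r == write_.load(std::memory_order_acquire)) return false;  // empty
    *out = slots_[r];
    read_.store((r + 1) % size_, std::memory_order_release);
    return true;
  }

 private:
  const std::size_t size_;
  std::vector<T> slots_;
  std::atomic<std::size_t> read_{0};
  std::atomic<std::size_t> write_{0};
};
```

With size 2 exactly one element fits; with size 1 every Write would report "full" immediately, which is consistent with the "Could not read from readahead_queue" failure described above.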

My requirement is not to use multithreading. However, when I set use_threads to false, if batch_readahead * batch_size is greater than the number of rows in the row_group being read, multiple row_groups in the parquetFileFragment are read at the same time. This means that even when I turn use_threads off, multithreaded reads still occur in my code. My existing code doesn't support multithreading, so these reads fail.

I then tried setting batch_readahead to 0, and the above error occurred, which may be a bug. Could you please fix this bug, or help me avoid reading multiple row_groups at the same time?

Component(s)

C++

@westonpace
Member

My requirement is not to use multithreading. However, when I set use_threads to false, if batch_readahead * batch_size is greater than the number of rows in the row_group being read, multiple row_groups in the parquetFileFragment are read at the same time. This means that even when I turn use_threads off, multithreaded reads still occur in my code. My existing code doesn't support multithreading, so these reads fail.

Most of the effort on threading so far has gone into avoiding multiple threads for compute. In 11.0.0 the threading controls for execution plans are improving, and it should be easier to avoid using additional threads for compute.

However, there has not been much effort in avoiding multiple threads for read. Is there a particular reason you want this? I/O routines are blocking and I/O threads are typically going to be stuck in a waiting state for most of their life, not using up many resources. If you are reading from an HDD then you might not see much of a performance hit. However, just about any other type of disk will benefit from at least some parallel I/O.

One option you could try would be to change the I/O thread pool size to 1 using arrow::io::SetIOThreadPoolCapacity.

The fix is probably:

diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 0d95e1817..10a1ac8ce 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -471,6 +471,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
                               reader, row_groups, column_projection,
                               ::arrow::internal::GetCpuThreadPool(), rows_to_readahead));
     RecordBatchGenerator sliced = SlicingGenerator(std::move(generator), batch_size);
+    if (batch_readahead == 0) {
+      return sliced;
+    }
     RecordBatchGenerator sliced_readahead =
         MakeSerialReadaheadGenerator(std::move(sliced), batch_readahead);
     return sliced_readahead;

Are you able to try this patch out?

I will have to get some tests added to get this merged in.

@lidavidm lidavidm changed the title An error occurred when I set batch_readahead = 0 for parquet ScanOptions [C++] An error occurred when I set batch_readahead = 0 for parquet ScanOptions Jan 11, 2023
westonpace added a commit that referenced this issue Jan 11, 2023
…evant bugs (#29185)

* Closes: #15264

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
@westonpace westonpace added this to the 11.0.0 milestone Jan 11, 2023
@YoungRX
Author

YoungRX commented Jan 12, 2023

Ok, thanks for your help and solution!
