Skip to content

Commit

Permalink
GH-38884: [C++] DatasetWriter release rows_in_flight_throttle when al…
Browse files Browse the repository at this point in the history
…locate writing failed (#38885)

### Rationale for this change

When the file queue is full or a write fails, `DatasetWriterImpl::DoWriteRecordBatch` may fail; however, the resources it acquired are not released.

### What changes are included in this PR?

When the file queue is full or a file cannot be opened, release the `row` resources.

### Are these changes tested?

yes

### Are there any user-facing changes?

no

* Closes: #38884

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
mapleFU committed Dec 7, 2023
1 parent fe83387 commit f7286a9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
15 changes: 13 additions & 2 deletions cpp/src/arrow/dataset/dataset_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class Throttle {

private:
Future<> backpressure_ = Future<>::MakeFinished();
uint64_t max_value_;
const uint64_t max_value_;
uint64_t in_waiting_ = 0;
uint64_t current_value_ = 0;
std::mutex mutex_;
Expand Down Expand Up @@ -621,11 +621,21 @@ class DatasetWriter::DatasetWriterImpl {
backpressure = writer_state_.open_files_throttle.Acquire(1);
if (!backpressure.is_finished()) {
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
RETURN_NOT_OK(TryCloseLargestFile());
break;
}
}
RETURN_NOT_OK(dir_queue->StartWrite(next_chunk));
auto s = dir_queue->StartWrite(next_chunk);
if (!s.ok()) {
// If `StartWrite` had succeeded, `rows_in_flight_throttle` would be
// released when the write task finished. Since it failed, release it here.
//
// `open_files_throttle` will be handled by `DatasetWriterDirectoryQueue`,
// so we don't need to release it here.
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
return s;
}
batch = std::move(remainder);
if (batch) {
RETURN_NOT_OK(dir_queue->FinishCurrentFile());
Expand All @@ -647,6 +657,7 @@ class DatasetWriter::DatasetWriterImpl {
DatasetWriterState writer_state_;
std::function<void()> pause_callback_;
std::function<void()> resume_callback_;
// Map from directory + prefix to the queue for that directory
std::unordered_map<std::string, std::shared_ptr<DatasetWriterDirectoryQueue>>
directory_queues_;
std::mutex mutex_;
Expand Down
31 changes: 29 additions & 2 deletions cpp/src/arrow/dataset/dataset_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,18 @@ class DatasetWriterTestFixture : public testing::Test {
}
}

void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) {
void AssertCreatedData(const std::vector<ExpectedFile>& expected_files,
bool check_num_record_batches = true) {
counter_ = 0;
for (const auto& expected_file : expected_files) {
std::optional<MockFileInfo> written_file = FindFile(expected_file.filename);
AssertFileCreated(written_file, expected_file.filename);
int num_batches = 0;
AssertBatchesEqual(*MakeBatch(expected_file.start, expected_file.num_rows),
*ReadAsBatch(written_file->data, &num_batches));
ASSERT_EQ(expected_file.num_record_batches, num_batches);
if (check_num_record_batches) {
ASSERT_EQ(expected_file.num_record_batches, num_batches);
}
}
}

Expand Down Expand Up @@ -277,6 +280,30 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
{"testdir/chunk-3.arrow", 30, 5}});
}

TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteBackpresure) {
  // Regression test for GH-38884: `WriteRecordBatch` must keep its throttle
  // resources balanced while batches are being split across many files.
  //
  // Every written batch holds 5x the per-file row limit, so it has to be split
  // into five files while only two files may be open at a time.
  constexpr uint64_t kFileSizeLimit = 10;
  constexpr int kNumWrites = 20;
  constexpr int kFilesPerWrite = 5;

  write_options_.max_open_files = 2;
  write_options_.max_rows_per_file = kFileSizeLimit;
  write_options_.max_rows_per_group = kFileSizeLimit;
  write_options_.min_rows_per_group = kFileSizeLimit - 1;

  auto dataset_writer = MakeDatasetWriter(/*max_rows=*/kFileSizeLimit);
  for (int write_idx = 0; write_idx < kNumWrites; ++write_idx) {
    dataset_writer->WriteRecordBatch(MakeBatch(kFileSizeLimit * kFilesPerWrite), "");
  }
  EndWriterChecked(dataset_writer.get());

  // One file per `kFileSizeLimit` rows: 20 writes * 5 files each = 100 files,
  // with file `i` starting at row `kFileSizeLimit * i`.
  std::vector<ExpectedFile> expected_files;
  for (int file_idx = 0; file_idx < kNumWrites * kFilesPerWrite; ++file_idx) {
    expected_files.emplace_back("testdir/chunk-" + std::to_string(file_idx) + ".arrow",
                                kFileSizeLimit * file_idx, kFileSizeLimit);
  }

  // The record batch count is deliberately not verified: a file may also
  // contain a zero-length record batch.
  AssertCreatedData(expected_files, /*check_num_record_batches=*/false);
}

TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithFunctor) {
// Left padding with up to four zeros
write_options_.max_rows_per_group = 10;
Expand Down

0 comments on commit f7286a9

Please sign in to comment.