Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41771: [C++] Iterator releases its resource immediately when it reads all values #41824

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions cpp/src/arrow/util/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,18 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
Iterator() : ptr_(NULLPTR, [](void*) {}) {}

/// \brief Return the next element of the sequence, IterationTraits<T>::End() when the
/// iteration is completed. Calling this on a default constructed Iterator
/// will result in undefined behavior.
Result<T> Next() { return next_(ptr_.get()); }
/// iteration is completed.
Result<T> Next() {
if (ptr_) {
auto next_result = next_(ptr_.get());
if (next_result.ok() && IsIterationEnd(next_result.ValueUnsafe())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kou @bkietz This extra check and release should be the responsibility of the underlying iterator instead of forcing every abstract iterator to behave this way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the reason? Performance?

For the #41771 case:

This is the underlying iterator:

Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
return ::arrow::internal::IterateSynchronously<TaggedRecordBatch>(
[this](::arrow::internal::Executor* executor) {
return ScanBatchesAsync(executor);
},
scan_options_->use_threads);
}

We want to release the IPC reader created at:

auto open_reader = OpenReaderAsync(source);

It's referred by the underlying iterator indirectly via Future/std::function.

It seems that we can't remove a reference from Future/std::function without deleting Future/std::function and we can't delete Future/std::function without deleting the underlying iterator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the reason? Performance?

Performance, binary code size, and overall elegance of the iterator tree.

But I see that it's not possible because these iterators are like C++ in that the iterator itself is the value as well instead of being something that produces the value. So just ignore my comment.

ptr_.reset(NULLPTR);
}
return next_result;
} else {
return IterationTraits<T>::End();
}
}

/// Pass each element of the sequence to a visitor. Will return any error status
/// returned by the visitor, terminating iteration.
Expand Down
43 changes: 43 additions & 0 deletions cpp/src/arrow/util/iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,49 @@ void AssertIteratorNext(T expected, Iterator<T>& it) {
ASSERT_EQ(expected, actual);
}

template <typename T>
class DeleteDetectableIterator {
public:
explicit DeleteDetectableIterator(std::vector<T> values, bool* deleted)
: values_(std::move(values)), i_(0), deleted_(deleted) {}

DeleteDetectableIterator(DeleteDetectableIterator&& source)
: values_(std::move(source.values_)), i_(source.i_), deleted_(source.deleted_) {
source.deleted_ = nullptr;
}

~DeleteDetectableIterator() {
if (deleted_) {
*deleted_ = true;
}
}

Result<T> Next() {
if (i_ == values_.size()) {
return IterationTraits<T>::End();
}
return std::move(values_[i_++]);
}

private:
std::vector<T> values_;
size_t i_;
bool* deleted_;
};

// Generic iterator tests

TEST(TestIterator, DeleteOnEnd) {
bool deleted = false;
Iterator<TestInt> it(DeleteDetectableIterator<TestInt>({1}, &deleted));
ASSERT_FALSE(deleted);
AssertIteratorNext({1}, it);
ASSERT_FALSE(deleted);
ASSERT_OK_AND_ASSIGN(auto value, it.Next());
ASSERT_TRUE(IsIterationEnd(value));
ASSERT_TRUE(deleted);
}

// --------------------------------------------------------------------
// Synchronous iterator tests

Expand Down
Loading