Skip to content

Commit

Permalink
Do not fail getData for deleted buffer
Browse files Browse the repository at this point in the history
Since requests can arrive out of order
  • Loading branch information
arhimondr committed Mar 30, 2024
1 parent d3503ce commit eba2d11
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
20 changes: 11 additions & 9 deletions velox/exec/OutputBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,15 +724,17 @@ void OutputBuffer::getData(

VELOX_CHECK_LT(destination, buffers_.size());
auto* buffer = buffers_[destination].get();
VELOX_CHECK_NOT_NULL(
buffer,
"getData received after its buffer is deleted. Destination: {}, sequence: {}",
destination,
sequence);
freed = buffer->acknowledge(sequence, true);
updateAfterAcknowledgeLocked(freed, promises);
data = buffer->getData(
maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get());
if (buffer) {
freed = buffer->acknowledge(sequence, true);
updateAfterAcknowledgeLocked(freed, promises);
data = buffer->getData(
maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get());
} else {
data.data.emplace_back(nullptr);
data.immediate = true;
VLOG(1) << "getData received after deleteResults for destination "
<< destination << " and sequence " << sequence;
}
}
releaseAfterAcknowledge(freed, promises);
if (data.immediate) {
Expand Down
7 changes: 4 additions & 3 deletions velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,10 @@ class OutputBuffer {

void acknowledge(int destination, int64_t sequence);

/// Deletes all data for 'destination'. Returns true if all destinations are
/// deleted, meaning that the buffer is fully consumed and the producer can be
/// marked finished and the buffers freed.
/// Deletes all buffered data and makes all subsequent getData requests
/// for 'destination' return empty results. Returns true if all destinations
/// are deleted, meaning that the buffer is fully consumed and the producer
/// can be marked finished and the buffers freed.
bool deleteResults(int destination);

void getData(
Expand Down
34 changes: 31 additions & 3 deletions velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,36 @@ class OutputBufferManagerTest : public testing::Test {
}
}
bufferManager_->deleteResults(taskId, destination);
// out of order requests are allowed (fetch after delete)
{
struct Response {
std::vector<std::unique_ptr<folly::IOBuf>> pages;
int64_t sequence;
std::vector<int64_t> remainingBytes;
};
folly::Promise<Response> promise;
auto future = promise.getSemiFuture();
bufferManager_->getData(
taskId,
destination,
32'000'000,
nextSequence,
[&promise](
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t inSequence,
std::vector<int64_t> remainingBytes) {
promise.setValue(Response{
std::move(pages), inSequence, std::move(remainingBytes)});
});
future.wait();
ASSERT_TRUE(future.isReady());
auto& response = future.value();
ASSERT_EQ(response.sequence, nextSequence);
ASSERT_EQ(response.remainingBytes.size(), 0);
ASSERT_EQ(response.pages.size(), 1);
ASSERT_EQ(response.pages.at(0), nullptr);
}

fetchedPages = nextSequence;
}

Expand Down Expand Up @@ -829,9 +859,7 @@ TEST_F(OutputBufferManagerTest, basicBroadcast) {
acknowledge(taskId, 5, 3);
EXPECT_FALSE(bufferManager_->isFinished(taskId));
deleteResults(taskId, 5);
VELOX_ASSERT_THROW(
fetch(taskId, 5, 0, 1'000'000'000, 2),
"getData received after its buffer is deleted. Destination: 5, sequence: 0");
fetch(taskId, 5, 0, 1'000'000'000, 1, true);

bufferManager_->updateOutputBuffers(taskId, 7, true);
EXPECT_FALSE(bufferManager_->isFinished(taskId));
Expand Down

0 comments on commit eba2d11

Please sign in to comment.