From 2e4c1a53c117037905b4f31fce48119d9b05818b Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Thu, 21 Mar 2024 15:42:53 -0700 Subject: [PATCH] Do not fail getData for deleted buffer Since requests can arrive out of order --- .../boost/FindBoost.cmake | 19 +++++++++++ velox/exec/OutputBuffer.cpp | 18 +++++----- velox/exec/tests/OutputBufferManagerTest.cpp | 34 +++++++++++++++++-- 3 files changed, 59 insertions(+), 12 deletions(-) create mode 100644 CMake/resolve_dependency_modules/boost/FindBoost.cmake diff --git a/CMake/resolve_dependency_modules/boost/FindBoost.cmake b/CMake/resolve_dependency_modules/boost/FindBoost.cmake new file mode 100644 index 000000000000..f5bddc8fd522 --- /dev/null +++ b/CMake/resolve_dependency_modules/boost/FindBoost.cmake @@ -0,0 +1,19 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +message(STATUS "Using Boost - Bundled") +set(Boost_FOUND TRUE) +set(Boost_LIBRARIES atomic;context;date_time;filesystem;program_options;regex;system;thread) +list(APPEND Boost_LIBRARIES headers) +list(TRANSFORM Boost_LIBRARIES PREPEND Boost::) +message(STATUS "Boost targets: ${Boost_LIBRARIES}") diff --git a/velox/exec/OutputBuffer.cpp b/velox/exec/OutputBuffer.cpp index 09137b2641f6..0c509bd5ea36 100644 --- a/velox/exec/OutputBuffer.cpp +++ b/velox/exec/OutputBuffer.cpp @@ -724,15 +724,15 @@ void OutputBuffer::getData( VELOX_CHECK_LT(destination, buffers_.size()); auto* buffer = buffers_[destination].get(); - VELOX_CHECK_NOT_NULL( - buffer, - "getData received after its buffer is deleted. Destination: {}, sequence: {}", - destination, - sequence); - freed = buffer->acknowledge(sequence, true); - updateAfterAcknowledgeLocked(freed, promises); - data = buffer->getData( - maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get()); + if (buffer) { + freed = buffer->acknowledge(sequence, true); + updateAfterAcknowledgeLocked(freed, promises); + data = buffer->getData( + maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get()); + } else { + data.data.emplace_back(nullptr); + data.immediate = true; + } } releaseAfterAcknowledge(freed, promises); if (data.immediate) { diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index fa2c259e00d6..356b92089c68 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -337,6 +337,36 @@ class OutputBufferManagerTest : public testing::Test { } } bufferManager_->deleteResults(taskId, destination); + // out of order requests are allowed (fetch after delete) + { + struct Response { + std::vector> pages; + int64_t sequence; + std::vector remainingBytes; + }; + folly::Promise promise; + auto future = promise.getSemiFuture(); + bufferManager_->getData( + taskId, + destination, + 32'000'000, + nextSequence, + [&promise]( + std::vector> pages, + int64_t inSequence, + std::vector remainingBytes) { + promise.setValue(Response{ + std::move(pages), inSequence, std::move(remainingBytes)}); + }); + future.wait(); + ASSERT_TRUE(future.isReady()); + auto& response = future.value(); + ASSERT_EQ(response.sequence, nextSequence); + ASSERT_EQ(response.remainingBytes.size(), 0); + ASSERT_EQ(response.pages.size(), 1); + ASSERT_EQ(response.pages.at(0), nullptr); + } + fetchedPages = nextSequence; } @@ -829,9 +859,7 @@ TEST_F(OutputBufferManagerTest, basicBroadcast) { acknowledge(taskId, 5, 3); EXPECT_FALSE(bufferManager_->isFinished(taskId)); deleteResults(taskId, 5); - VELOX_ASSERT_THROW( - fetch(taskId, 5, 0, 1'000'000'000, 2), - "getData received after its buffer is deleted. Destination: 5, sequence: 0"); + fetch(taskId, 5, 0, 1'000'000'000, 1, true); bufferManager_->updateOutputBuffers(taskId, 7, true); EXPECT_FALSE(bufferManager_->isFinished(taskId));