diff --git a/velox/exec/ExchangeClient.cpp b/velox/exec/ExchangeClient.cpp index 0ca18e1fcf70..65eab7842e67 100644 --- a/velox/exec/ExchangeClient.cpp +++ b/velox/exec/ExchangeClient.cpp @@ -117,6 +117,11 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) { std::vector> pages; { std::lock_guard l(queue_->mutex()); + if (closed_) { + *atEnd = true; + return pages; + } + *atEnd = false; pages = queue_->dequeueLocked(maxBytes, atEnd, future); if (*atEnd) { diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 68021897b8ae..4deae1b71fec 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -423,5 +423,52 @@ TEST_F(ExchangeClientTest, sourceTimeout) { test::testingShutdownLocalExchangeSource(); } +TEST_F(ExchangeClientTest, callNextAfterClose) { + constexpr int32_t kNumSources = 3; + common::testutil::TestValue::enable(); + auto client = + std::make_shared("test", 17, 1 << 20, pool(), executor()); + + bool atEnd; + ContinueFuture future; + auto pages = client->next(1, &atEnd, &future); + ASSERT_EQ(0, pages.size()); + ASSERT_FALSE(atEnd); + + for (auto i = 0; i < kNumSources; ++i) { + client->addRemoteTaskId(fmt::format("local://{}", i)); + } + client->noMoreRemoteTasks(); + + // Fetch a page. No page is found. All sources are fetching. + pages = client->next(1, &atEnd, &future); + EXPECT_TRUE(pages.empty()); + + const auto& queue = client->queue(); + for (auto i = 0; i < 10; ++i) { + enqueue(*queue, makePage(1'000 + i)); + } + + // Fetch multiple pages. Each page is slightly larger than 1K bytes, hence, + // only 4 pages fit. + pages = client->next(5'000, &atEnd, &future); + EXPECT_EQ(4, pages.size()); + EXPECT_FALSE(atEnd); + + // Close the client and try calling next again. + client->close(); + + // Here we should have no pages returned, be at end (we are closed) and the + // future should be invalid (not based on a valid promise). + ContinueFuture futureFinal{ContinueFuture::makeEmpty()}; + pages = client->next(10'000, &atEnd, &futureFinal); + EXPECT_EQ(0, pages.size()); + EXPECT_TRUE(atEnd); + EXPECT_FALSE(futureFinal.valid()); + + client->close(); + test::testingShutdownLocalExchangeSource(); +} + } // namespace } // namespace facebook::velox::exec