From 090721ced881c291f93cec7ea07c69a270f0c7be Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 4 Jun 2026 16:44:30 +0800 Subject: [PATCH 1/5] Add regression test for hash shuffle partition-buffer spill ordering --- .../tests/VeloxShuffleWriterSpillTest.cc | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc index 0da2a2f1878..b8acb5d139b 100644 --- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc @@ -201,6 +201,30 @@ TEST_F(VeloxHashShuffleWriterSpillTest, kInit) { ASSERT_NOT_OK(shuffleWriter->stop()); } +TEST_F(VeloxHashShuffleWriterSpillTest, spillPartitionBuffersInPidOrder) { + auto shuffleWriterOptions = std::make_shared(); + shuffleWriterOptions->partitioning = Partitioning::kRange; + shuffleWriterOptions->splitBufferSize = 4; + auto shuffleWriter = + createHashShuffleWriter(8, shuffleWriterOptions, nullptr, arrow::Compression::type::UNCOMPRESSED); + + // pid 4 gets a larger partition buffer than pid 1. Size-based eviction chooses pid 4 first, so the local + // partition writer must split spills before writing pid 1. + auto input = makeRowVector({ + makeFlatVector({4, 4, 4, 4, 4, 4, 4, 4, 1}), + makeFlatVector({0, 1, 2, 3, 4, 5, 6, 7, 8}), + }); + + ASSERT_NOT_OK(splitRowVector(*shuffleWriter, input)); + + int64_t evicted; + ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->partitionBufferSize(), &evicted)); + ASSERT_GT(evicted, 0); + ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0); + + ASSERT_NOT_OK(shuffleWriter->stop()); +} + TEST_F(VeloxHashShuffleWriterSpillTest, kInitSingle) { auto shuffleWriterOptions = std::make_shared(); shuffleWriterOptions->partitioning = Partitioning::kSingle; @@ -389,4 +413,4 @@ TEST_F(VeloxHashShuffleWriterSpillTest, resizeBinaryBufferTriggerSpill) { listener_->reset(); } -} // namespace gluten \ No newline at end of file +} // namespace gluten From e32270ff99fdbd530e4596c72e976bcac1a562ba Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 4 Jun 2026 17:20:28 +0800 Subject: [PATCH 2/5] Fix hash shuffle spill ordering when evicting partition buffers --- cpp/core/shuffle/LocalPartitionWriter.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 99bfbbd6b2b..93d7978f919 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -716,6 +716,11 @@ arrow::Status LocalPartitionWriter::hashEvict( rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); if (evictType == Evict::kSpill) { + // Spill merge scans payloads by partition id, so split the file when hash eviction wraps around. + if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) { + lastEvictPid_ = -1; + RETURN_NOT_OK(finishSpill()); + } RETURN_NOT_OK(requestSpill(false)); auto shouldCompress = codec_ != nullptr && inMemoryPayload->numRows() >= options_->compressionThreshold; @@ -725,6 +730,7 @@ arrow::Status LocalPartitionWriter::hashEvict( shouldCompress ? Payload::kToBeCompressed : Payload::kUncompressed, payloadPool_.get(), codec_.get())); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); + lastEvictPid_ = partitionId; return arrow::Status::OK(); } From 79e025b4a0780d4ac456dbe0c34ed9dd2b3f5e99 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 4 Jun 2026 18:14:10 +0800 Subject: [PATCH 3/5] Fix hash shuffle spill writes to preserve pid order --- cpp/core/shuffle/LocalPartitionWriter.cc | 6 --- cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 41 ++++++++++++------- .../tests/VeloxShuffleWriterSpillTest.cc | 4 +- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 93d7978f919..99bfbbd6b2b 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -716,11 +716,6 @@ arrow::Status LocalPartitionWriter::hashEvict( rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); if (evictType == Evict::kSpill) { - // Spill merge scans payloads by partition id, so split the file when hash eviction wraps around. - if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) { - lastEvictPid_ = -1; - RETURN_NOT_OK(finishSpill()); - } RETURN_NOT_OK(requestSpill(false)); auto shouldCompress = codec_ != nullptr && inMemoryPayload->numRows() >= options_->compressionThreshold; @@ -730,7 +725,6 @@ arrow::Status LocalPartitionWriter::hashEvict( shouldCompress ? Payload::kToBeCompressed : Payload::kUncompressed, payloadPool_.get(), codec_.get())); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); - lastEvictPid_ = partitionId; return arrow::Status::OK(); } diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index dfb799806af..8441fb30a6e 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -1505,7 +1505,6 @@ arrow::Result VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6 // shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_ VELOX_CHECK(!partitionBufferInUse_); int64_t beforeEvict = partitionBufferPool_->bytes_allocated(); - int64_t evicted = 0; std::vector> pidToSize; for (auto pid = 0; pid < numPartitions_; ++pid) { if (partitionBufferSize_[pid] == 0) { @@ -1514,22 +1513,34 @@ arrow::Result VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6 pidToSize.emplace_back(pid, partitionBufferSize_[pid]); } std::sort(pidToSize.begin(), pidToSize.end(), [&](const auto& a, const auto& b) { return a.second > b.second; }); - if (!pidToSize.empty()) { - for (auto& item : pidToSize) { - auto pid = item.first; - ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); - auto* types = partitionWriter_->enableTypeAwareCompress() ? &tacBufferTypes_ : nullptr; - auto payload = std::make_unique( - item.second, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types); - metrics_.totalBytesToEvict += payload->rawSize(); - RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, writtenBytes_)); - evicted = beforeEvict - partitionBufferPool_->bytes_allocated(); - if (evicted >= size) { - break; - } + + struct PartitionPayload { + uint32_t pid; + std::unique_ptr payload; + }; + std::vector selectedPayloads; + int64_t selectedBytes = 0; + for (auto& item : pidToSize) { + auto pid = item.first; + ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); + auto* types = partitionWriter_->enableTypeAwareCompress() ? &tacBufferTypes_ : nullptr; + auto payload = std::make_unique( + item.second, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types); + selectedBytes += payload->rawCapacity(); + selectedPayloads.push_back({pid, std::move(payload)}); + if (selectedBytes >= size) { + break; } } - return evicted; + + std::sort(selectedPayloads.begin(), selectedPayloads.end(), [](const auto& a, const auto& b) { + return a.pid < b.pid; + }); + for (auto& item : selectedPayloads) { + metrics_.totalBytesToEvict += item.payload->rawSize(); + RETURN_NOT_OK(partitionWriter_->hashEvict(item.pid, std::move(item.payload), Evict::kSpill, false, writtenBytes_)); + } + return beforeEvict - partitionBufferPool_->bytes_allocated(); } bool VeloxHashShuffleWriter::shrinkPartitionBuffersAfterSpill() const { diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc index b8acb5d139b..62e5f28a4ce 100644 --- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc @@ -208,8 +208,8 @@ TEST_F(VeloxHashShuffleWriterSpillTest, spillPartitionBuffersInPidOrder) { auto shuffleWriter = createHashShuffleWriter(8, shuffleWriterOptions, nullptr, arrow::Compression::type::UNCOMPRESSED); - // pid 4 gets a larger partition buffer than pid 1. Size-based eviction chooses pid 4 first, so the local - // partition writer must split spills before writing pid 1. + // pid 4 gets a larger partition buffer than pid 1. Size-based eviction chooses pid 4 first, but the selected + // payloads must still be written to local spill files in pid order. auto input = makeRowVector({ makeFlatVector({4, 4, 4, 4, 4, 4, 4, 4, 1}), makeFlatVector({0, 1, 2, 3, 4, 5, 6, 7, 8}), From 03c84901469a4bf0df05ce0151ab849ec719649a Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 4 Jun 2026 19:36:05 +0800 Subject: [PATCH 4/5] fix style --- cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index 8441fb30a6e..55a882a9604 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -1533,9 +1533,8 @@ arrow::Result VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6 } } - std::sort(selectedPayloads.begin(), selectedPayloads.end(), [](const auto& a, const auto& b) { - return a.pid < b.pid; - }); + std::sort( + selectedPayloads.begin(), selectedPayloads.end(), [](const auto& a, const auto& b) { return a.pid < b.pid; }); for (auto& item : selectedPayloads) { metrics_.totalBytesToEvict += item.payload->rawSize(); RETURN_NOT_OK(partitionWriter_->hashEvict(item.pid, std::move(item.payload), Evict::kSpill, false, writtenBytes_)); From 42ce32d02d3d8155ad7ee5124672d25b300e68b6 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Fri, 5 Jun 2026 00:23:31 +0800 Subject: [PATCH 5/5] Spill selected partition buffers in pid order without pre-materializing payloads --- cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 34 +++++++++------------ 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index 55a882a9604..fde3911db0d 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -1505,39 +1505,35 @@ arrow::Result VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6 // shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_ VELOX_CHECK(!partitionBufferInUse_); int64_t beforeEvict = partitionBufferPool_->bytes_allocated(); - std::vector> pidToSize; + const auto partitionBytes = estimatePartitionBufferBytes(); + std::vector> pidToSize; for (auto pid = 0; pid < numPartitions_; ++pid) { if (partitionBufferSize_[pid] == 0) { continue; } - pidToSize.emplace_back(pid, partitionBufferSize_[pid]); + pidToSize.emplace_back(pid, partitionBytes[pid]); } std::sort(pidToSize.begin(), pidToSize.end(), [&](const auto& a, const auto& b) { return a.second > b.second; }); - struct PartitionPayload { - uint32_t pid; - std::unique_ptr payload; - }; - std::vector selectedPayloads; + std::vector selectedPids; int64_t selectedBytes = 0; for (auto& item : pidToSize) { - auto pid = item.first; - ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); - auto* types = partitionWriter_->enableTypeAwareCompress() ? &tacBufferTypes_ : nullptr; - auto payload = std::make_unique( - item.second, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types); - selectedBytes += payload->rawCapacity(); - selectedPayloads.push_back({pid, std::move(payload)}); + selectedPids.push_back(item.first); + selectedBytes += item.second; if (selectedBytes >= size) { break; } } - std::sort( - selectedPayloads.begin(), selectedPayloads.end(), [](const auto& a, const auto& b) { return a.pid < b.pid; }); - for (auto& item : selectedPayloads) { - metrics_.totalBytesToEvict += item.payload->rawSize(); - RETURN_NOT_OK(partitionWriter_->hashEvict(item.pid, std::move(item.payload), Evict::kSpill, false, writtenBytes_)); + std::sort(selectedPids.begin(), selectedPids.end()); + for (auto pid : selectedPids) { + auto numRows = partitionBufferBase_[pid]; + ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); + auto* types = partitionWriter_->enableTypeAwareCompress() ? &tacBufferTypes_ : nullptr; + auto payload = std::make_unique( + numRows, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types); + metrics_.totalBytesToEvict += payload->rawSize(); + RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, writtenBytes_)); } return beforeEvict - partitionBufferPool_->bytes_allocated(); }