diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index dfb799806a..fde3911db0 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -1505,31 +1505,37 @@ arrow::Result VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6 // shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_ VELOX_CHECK(!partitionBufferInUse_); int64_t beforeEvict = partitionBufferPool_->bytes_allocated(); - int64_t evicted = 0; - std::vector> pidToSize; + const auto partitionBytes = estimatePartitionBufferBytes(); + std::vector> pidToSize; for (auto pid = 0; pid < numPartitions_; ++pid) { if (partitionBufferSize_[pid] == 0) { continue; } - pidToSize.emplace_back(pid, partitionBufferSize_[pid]); + pidToSize.emplace_back(pid, partitionBytes[pid]); } std::sort(pidToSize.begin(), pidToSize.end(), [&](const auto& a, const auto& b) { return a.second > b.second; }); - if (!pidToSize.empty()) { - for (auto& item : pidToSize) { - auto pid = item.first; - ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); - auto* types = partitionWriter_->enableTypeAwareCompress() ? &tacBufferTypes_ : nullptr; - auto payload = std::make_unique( - item.second, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types); - metrics_.totalBytesToEvict += payload->rawSize(); - RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, writtenBytes_)); - evicted = beforeEvict - partitionBufferPool_->bytes_allocated(); - if (evicted >= size) { - break; - } + + std::vector selectedPids; + int64_t selectedBytes = 0; + for (auto& item : pidToSize) { + selectedPids.push_back(item.first); + selectedBytes += item.second; + if (selectedBytes >= size) { + break; } } - return evicted; + + std::sort(selectedPids.begin(), selectedPids.end()); + for (auto pid : selectedPids) { + auto numRows = partitionBufferBase_[pid]; + ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); + auto* types = partitionWriter_->enableTypeAwareCompress() ? &tacBufferTypes_ : nullptr; + auto payload = std::make_unique( + numRows, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types); + metrics_.totalBytesToEvict += payload->rawSize(); + RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, writtenBytes_)); + } + return beforeEvict - partitionBufferPool_->bytes_allocated(); } bool VeloxHashShuffleWriter::shrinkPartitionBuffersAfterSpill() const { diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc index 0da2a2f187..62e5f28a4c 100644 --- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc @@ -201,6 +201,30 @@ TEST_F(VeloxHashShuffleWriterSpillTest, kInit) { ASSERT_NOT_OK(shuffleWriter->stop()); } +TEST_F(VeloxHashShuffleWriterSpillTest, spillPartitionBuffersInPidOrder) { + auto shuffleWriterOptions = std::make_shared(); + shuffleWriterOptions->partitioning = Partitioning::kRange; + shuffleWriterOptions->splitBufferSize = 4; + auto shuffleWriter = + createHashShuffleWriter(8, shuffleWriterOptions, nullptr, arrow::Compression::type::UNCOMPRESSED); + + // pid 4 gets a larger partition buffer than pid 1. Size-based eviction chooses pid 4 first, but the selected + // payloads must still be written to local spill files in pid order. + auto input = makeRowVector({ + makeFlatVector({4, 4, 4, 4, 4, 4, 4, 4, 1}), + makeFlatVector({0, 1, 2, 3, 4, 5, 6, 7, 8}), + }); + + ASSERT_NOT_OK(splitRowVector(*shuffleWriter, input)); + + int64_t evicted; + ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->partitionBufferSize(), &evicted)); + ASSERT_GT(evicted, 0); + ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0); + + ASSERT_NOT_OK(shuffleWriter->stop()); +} + TEST_F(VeloxHashShuffleWriterSpillTest, kInitSingle) { auto shuffleWriterOptions = std::make_shared(); shuffleWriterOptions->partitioning = Partitioning::kSingle; @@ -389,4 +413,4 @@ TEST_F(VeloxHashShuffleWriterSpillTest, resizeBinaryBufferTriggerSpill) { listener_->reset(); } -} // namespace gluten \ No newline at end of file +} // namespace gluten