Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions cpp/velox/shuffle/VeloxHashShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1505,31 +1505,37 @@ arrow::Result<int64_t> VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
// shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_
VELOX_CHECK(!partitionBufferInUse_);
int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
int64_t evicted = 0;
std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
const auto partitionBytes = estimatePartitionBufferBytes();
std::vector<std::pair<uint32_t, int64_t>> pidToSize;
for (auto pid = 0; pid < numPartitions_; ++pid) {
if (partitionBufferSize_[pid] == 0) {
continue;
}
pidToSize.emplace_back(pid, partitionBufferSize_[pid]);
pidToSize.emplace_back(pid, partitionBytes[pid]);
}
std::sort(pidToSize.begin(), pidToSize.end(), [&](const auto& a, const auto& b) { return a.second > b.second; });
if (!pidToSize.empty()) {
for (auto& item : pidToSize) {
auto pid = item.first;
ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
auto* types = partitionWriter_->enableTypeAwareCompress() ? &tacBufferTypes_ : nullptr;
auto payload = std::make_unique<InMemoryPayload>(
item.second, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types);
metrics_.totalBytesToEvict += payload->rawSize();
RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, writtenBytes_));
evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
if (evicted >= size) {
break;
}

std::vector<uint32_t> selectedPids;
int64_t selectedBytes = 0;
for (auto& item : pidToSize) {
selectedPids.push_back(item.first);
selectedBytes += item.second;
if (selectedBytes >= size) {
break;
}
}
return evicted;

std::sort(selectedPids.begin(), selectedPids.end());
for (auto pid : selectedPids) {
auto numRows = partitionBufferBase_[pid];
ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
auto* types = partitionWriter_->enableTypeAwareCompress() ? &tacBufferTypes_ : nullptr;
auto payload = std::make_unique<InMemoryPayload>(
numRows, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types);
metrics_.totalBytesToEvict += payload->rawSize();
RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, writtenBytes_));
}
return beforeEvict - partitionBufferPool_->bytes_allocated();
}

bool VeloxHashShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
Expand Down
26 changes: 25 additions & 1 deletion cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,30 @@ TEST_F(VeloxHashShuffleWriterSpillTest, kInit) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

// Regression test: reclaiming every partition buffer must succeed when the partitions hold different amounts of
// data, and the writer must still be able to stop cleanly afterwards.
//
// NOTE(review): this test does not inspect the spill files themselves. The "pid order" requirement is only checked
// indirectly, through reclaimFixedSize() and stop() returning OK -- confirm that the partition writer rejects
// out-of-order evictions, otherwise the ordering is not actually covered here.
TEST_F(VeloxHashShuffleWriterSpillTest, spillPartitionBuffersInPidOrder) {
  auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
  shuffleWriterOptions->partitioning = Partitioning::kRange;
  shuffleWriterOptions->splitBufferSize = 4;
  auto shuffleWriter =
      createHashShuffleWriter(8, shuffleWriterOptions, nullptr, arrow::Compression::type::UNCOMPRESSED);

  // pid 4 gets a larger partition buffer than pid 1. Size-based eviction chooses pid 4 first, but the selected
  // payloads must still be written to local spill files in pid order.
  // The first column presumably carries the target partition id for range partitioning (eight rows for pid 4,
  // one row for pid 1); the second column is the payload.
  auto input = makeRowVector({
      makeFlatVector<int32_t>({4, 4, 4, 4, 4, 4, 4, 4, 1}),
      makeFlatVector<int32_t>({0, 1, 2, 3, 4, 5, 6, 7, 8}),
  });

  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, input));

  // Value-initialize the out-parameter so that the assertion below never reads an indeterminate value should
  // reclaimFixedSize() return OK without writing to it.
  int64_t evicted{0};
  // Ask for the full current partition buffer size so that every non-empty partition has to be evicted.
  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->partitionBufferSize(), &evicted));
  ASSERT_GT(evicted, 0);
  ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);

  ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxHashShuffleWriterSpillTest, kInitSingle) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->partitioning = Partitioning::kSingle;
Expand Down Expand Up @@ -389,4 +413,4 @@ TEST_F(VeloxHashShuffleWriterSpillTest, resizeBinaryBufferTriggerSpill) {
listener_->reset();
}

} // namespace gluten
} // namespace gluten
Loading