Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions be/src/storage/iterator/vertical_merge_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,14 @@ uint16_t RowSource::data() const {
Status RowSourcesBuffer::append(const std::vector<RowSource>& row_sources) {
if (_buffer.allocated_bytes() + row_sources.size() * sizeof(UInt16) >
config::vertical_compaction_max_row_source_memory_mb * 1024 * 1024) {
if (_buffer.allocated_bytes() - _buffer.size() * sizeof(UInt16) <
row_sources.size() * sizeof(UInt16)) {
// Use capacity() - size() to get the truly available element slots.
// Note: PODArrayBase::allocated_bytes() includes pad_left and pad_right,
// which are NOT usable for storing elements. Using allocated_bytes() here
// would over-estimate the available space and lead to a missed spill,
// causing _buffer to grow beyond the configured memory limit when
// push_back triggers reallocation below.
size_t available_slots = _buffer.capacity() - _buffer.size();
if (available_slots < row_sources.size()) {
VLOG_NOTICE << "RowSourceBuffer is too large, serialize and reset buffer: "
<< _buffer.allocated_bytes() << ", total size: " << _total_size;
// serialize current buffer
Expand Down
77 changes: 77 additions & 0 deletions be/test/storage/compaction/vertical_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,83 @@ TEST_F(VerticalCompactionTest, TestRowSourcesBuffer) {
}
}

// Regression test for RowSourcesBuffer::append spill threshold.
//
// Background:
// PaddedPODArray::allocated_bytes() returns the total allocated memory which
// INCLUDES pad_left and pad_right. These padding bytes are NOT usable for
// storing elements. Earlier, append() used `allocated_bytes() - size*sizeof`
// as "available room" to decide whether to skip spilling. This over-estimates
// the truly usable space (by pad_left + pad_right bytes), so when the buffer
// has already crossed the configured memory limit, append() may incorrectly
// decide that the upcoming push_back will fit without reallocation, skip the
// spill, and then push_back triggers a reallocation that doubles the buffer,
// exceeding the configured `vertical_compaction_max_row_source_memory_mb`.
//
// This test simulates the case by setting a very small memory limit (1 MB) and
// repeatedly appending row sources. After the first time the buffer crosses
// the limit, the next append must trigger a spill (file write + reset) instead
// of silently growing the in-memory buffer beyond the limit.
TEST_F(VerticalCompactionTest, TestRowSourcesBufferSpillThreshold) {
// 1 MB limit (set in SetUp as well, but make it explicit here).
config::vertical_compaction_max_row_source_memory_mb = 1;
const size_t mem_limit_bytes =
static_cast<size_t>(config::vertical_compaction_max_row_source_memory_mb) * 1024 * 1024;

RowSourcesBuffer buffer(200, absolute_dir, ReaderType::READER_CUMULATIVE_COMPACTION);

// Build a batch of row sources. Use a moderate batch size so that the
// buffer's allocated_bytes() can become very close to the limit before
// a single append crosses it.
constexpr size_t kBatchSize = 4096;
std::vector<RowSource> batch;
batch.reserve(kBatchSize);
for (size_t i = 0; i < kBatchSize; ++i) {
batch.emplace_back(static_cast<uint16_t>(i % 8), false);
}

// Total elements that fit in the memory limit (a safe upper bound).
// Each element is 2 bytes (UInt16), so ~512K elements per MB.
const size_t total_appends = (mem_limit_bytes / sizeof(uint16_t)) * 4 / kBatchSize + 8;

size_t expected_total = 0;
for (size_t i = 0; i < total_appends; ++i) {
ASSERT_TRUE(buffer.append(batch).ok());
expected_total += kBatchSize;

// Invariant: in-memory buffered_size() must never exceed what the
// memory limit allows (in elements). Otherwise the spill logic is
// broken (the bug described above).
// Allow a small slack equal to one batch because the spill check is
// performed BEFORE the push_back that crosses the threshold.
size_t buffered_elems = buffer.buffered_size();
size_t buffered_bytes = buffered_elems * sizeof(uint16_t);
// After each append, buffered_bytes should be <= mem_limit + one batch size.
// It must NOT grow unboundedly (e.g., 2x of the limit due to PODArray
// reallocation that the buggy version would allow).
EXPECT_LE(buffered_bytes, mem_limit_bytes + kBatchSize * sizeof(uint16_t))
<< "RowSourcesBuffer in-memory size exceeded the configured limit, "
<< "spill threshold logic is broken. iter=" << i
<< ", buffered_elems=" << buffered_elems;
}

EXPECT_EQ(buffer.total_size(), expected_total);

// Make sure data is persisted and can be read back correctly.
ASSERT_TRUE(buffer.flush().ok());
ASSERT_TRUE(buffer.seek_to_begin().ok());

size_t read_back = 0;
while (buffer.has_remaining().ok()) {
// Verify that the source num matches the pattern we wrote.
auto cur = buffer.current().get_source_num();
EXPECT_EQ(cur, (read_back % kBatchSize) % 8);
buffer.advance(1);
++read_back;
}
EXPECT_EQ(read_back, expected_total);
}

TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
auto num_input_rowset = 2;
auto num_segments = 2;
Expand Down
Loading