From 95a0b54ad665bdd32d337c1c7d6e769ab404df28 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 9 Apr 2026 14:42:39 +0800 Subject: [PATCH 1/8] [improve](compaction) Use segment footer raw_data_bytes for first-time batch size estimation When vertical compaction runs for the first time on a tablet (no historical sampling data), estimate_batch_size() previously returned a hardcoded value of 992, which could cause OOM for wide tables or be too conservative for narrow tables. This change uses ColumnMetaPB.raw_data_bytes from segment footer to compute a per-row size estimate for the first compaction. raw_data_bytes records the original data size before encoding, which closely approximates runtime Block::bytes(). Subsequent compactions continue to use the existing historical sampling mechanism unchanged. Key design decisions: - Footer collection only runs when needed (no manual override, and at least one column group lacks historical sampling data) - Variant columns (raw_data_bytes=0 TODO) trigger fallback to 992 - Structural overhead (+1 null map, +8 offset) only added for scalar columns with actual footer data - Complex types (ARRAY/MAP/STRUCT) use raw_data_bytes directly without structural compensation as it already includes recursive sub-writer data - Historical sampling now uses Block::allocated_bytes() instead of bytes() for more accurate memory estimation --- .../iterator/vertical_merge_iterator.h | 2 +- be/src/storage/merger.cpp | 124 +++++++++++++++++- .../compaction/vertical_compaction_test.cpp | 66 ++++++++++ 3 files changed, 188 insertions(+), 4 deletions(-) diff --git a/be/src/storage/iterator/vertical_merge_iterator.h b/be/src/storage/iterator/vertical_merge_iterator.h index e748ae460c7e0e..865399c47478c9 100644 --- a/be/src/storage/iterator/vertical_merge_iterator.h +++ b/be/src/storage/iterator/vertical_merge_iterator.h @@ -208,7 +208,7 @@ class VerticalMergeIteratorContext { size_t bytes() { if (_block) { - return _block->bytes(); + return _block->allocated_bytes(); } else { return 0; } diff --git a/be/src/storage/merger.cpp b/be/src/storage/merger.cpp index ec246f13c16d5d..99c4e11970a274 100644 --- a/be/src/storage/merger.cpp +++ b/be/src/storage/merger.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -45,9 +46,11 @@ #include "storage/olap_common.h" #include "storage/olap_define.h" #include "storage/rowid_conversion.h" +#include "storage/rowset/beta_rowset.h" #include "storage/rowset/rowset.h" #include "storage/rowset/rowset_meta.h" #include "storage/rowset/rowset_writer.h" +#include "storage/segment/segment.h" #include "storage/segment/segment_writer.h" #include "storage/storage_engine.h" #include "storage/tablet/base_tablet.h" @@ -412,7 +415,8 @@ Status Merger::vertical_compact_one_group( } int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt, - ReaderType reader_type) { + ReaderType reader_type, int64_t group_per_row_from_footer, + bool footer_fallback) { auto& sample_info_lock = tablet->get_sample_info_lock(reader_type); auto& sample_infos = tablet->get_sample_infos(reader_type); std::unique_lock lock(sample_info_lock); @@ -440,9 +444,21 @@ int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_ group_data_size = info.bytes / info.rows; sample_infos[group_index].group_data_size = group_data_size; } else { + // No historical sampling data available. + // Try to use raw_data_bytes from segment footer for a better estimate. + if (!footer_fallback && group_per_row_from_footer > 0) { + int64_t batch_size = block_mem_limit / group_per_row_from_footer; + int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L)); + LOG(INFO) << "estimate batch size from footer for vertical compaction, tablet id: " + << tablet->tablet_id() << " group_per_row_from_footer: " + << group_per_row_from_footer << " way cnt: " << way_cnt + << " batch size: " << res; + return res; + } LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id() << " group data size: " << info.group_data_size - << " row num: " << info.rows << " consume bytes: " << info.bytes; + << " row num: " << info.rows << " consume bytes: " << info.bytes + << " footer_fallback: " << footer_fallback; return 1024 - 32; } @@ -548,13 +564,115 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t std::unique_lock lock(sample_info_lock); sample_infos.resize(column_groups.size()); } + // Collect per-column raw_data_bytes from segment footer for first-time batch size estimation. + // raw_data_bytes is the original data size before encoding, close to runtime Block::bytes(). + // Only collect when needed: skip if manual batch_size override is set, or if ALL groups + // already have historical sampling data. Use per-group granularity so that schema evolution + // (new groups without history) still gets footer-based estimation. + struct ColumnRawSizeInfo { + int64_t total_raw_bytes = 0; + int64_t rows_with_data = 0; + }; + std::unordered_map column_raw_sizes; + bool need_footer_collection = false; + if (config::compaction_batch_size == -1) { + std::unique_lock lock(sample_info_lock); + for (const auto& info : sample_infos) { + if (info.group_data_size <= 0 && info.bytes <= 0 && info.rows <= 0) { + need_footer_collection = true; + break; + } + } + } + if (need_footer_collection) { + for (const auto& rs_reader : src_rowset_readers) { + auto beta_rowset = std::dynamic_pointer_cast(rs_reader->rowset()); + if (!beta_rowset) { + continue; + } + std::vector segments; + auto st = beta_rowset->load_segments(&segments); + if (!st.ok()) { + LOG(WARNING) << "Failed to load segments for footer raw_data_bytes collection" + << ", tablet_id: " << tablet->tablet_id() + << ", rowset_id: " << beta_rowset->rowset_id() + << ", status: " << st; + continue; + } + for (const auto& segment : segments) { + int64_t row_count = segment->num_rows(); + auto collect_st = segment->traverse_column_meta_pbs( + [&](const segment_v2::ColumnMetaPB& meta) { + int32_t uid = meta.unique_id(); + if (uid >= 0 && meta.has_raw_data_bytes()) { + auto& info = column_raw_sizes[uid]; + info.total_raw_bytes += meta.raw_data_bytes(); + info.rows_with_data += row_count; + } + }); + if (!collect_st.ok()) { + LOG(WARNING) << "Failed to traverse column meta for footer collection" + << ", tablet_id: " << tablet->tablet_id() + << ", status: " << collect_st; + } + } + } + } + + // Pre-compute per-row estimate for each column group from footer data. + std::vector group_per_row_from_footer(column_groups.size(), 0); + std::vector group_footer_fallback(column_groups.size(), false); + for (size_t i = 0; i < column_groups.size(); ++i) { + int64_t group_per_row = 0; + bool need_fallback = false; + for (uint32_t col_ordinal : column_groups[i]) { + const auto& col = tablet_schema.column(col_ordinal); + int32_t uid = col.unique_id(); + + // Variant columns (root or subcolumn): raw_data_bytes is 0 (TODO in writer), + // cannot estimate from footer, fallback to default for the entire group. + if (uid < 0 || col.is_variant_type()) { + need_fallback = true; + break; + } + + int64_t col_per_row = 0; + auto it = column_raw_sizes.find(uid); + if (it != column_raw_sizes.end() && it->second.rows_with_data > 0) { + col_per_row = it->second.total_raw_bytes / it->second.rows_with_data; + + // Structural overhead compensation for scalar columns only. + // Only add when footer data actually exists for this column, + // to avoid creating a non-zero estimate purely from structural overhead. + // Complex types (ARRAY/MAP/STRUCT) have raw_data_bytes recursively aggregated + // from sub-writers, so no additional compensation is needed. + if (col.type() != FieldType::OLAP_FIELD_TYPE_ARRAY && + col.type() != FieldType::OLAP_FIELD_TYPE_MAP && + col.type() != FieldType::OLAP_FIELD_TYPE_STRUCT) { + if (col.is_nullable()) { + col_per_row += 1; // null map: 1 byte per row + } + if (col.is_length_variable_type()) { + col_per_row += 8; // offset array: 8 bytes per row at runtime + } + } + } + + group_per_row += col_per_row; + } + group_per_row_from_footer[i] = group_per_row; + group_footer_fallback[i] = need_fallback; + } + // compact group one by one for (auto i = 0; i < column_groups.size(); ++i) { VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); bool is_key = (i == 0); int64_t batch_size = config::compaction_batch_size != -1 ? config::compaction_batch_size - : estimate_batch_size(i, tablet, merge_way_num, reader_type); + : estimate_batch_size(i, tablet, merge_way_num, reader_type, + group_per_row_from_footer[i], + group_footer_fallback[i]); CompactionSampleInfo sample_info; Merger::Statistics group_stats; group_stats.rowid_conversion = total_stats.rowid_conversion; diff --git a/be/test/storage/compaction/vertical_compaction_test.cpp b/be/test/storage/compaction/vertical_compaction_test.cpp index 57b273caa592ec..310259c01cb64e 100644 --- a/be/test/storage/compaction/vertical_compaction_test.cpp +++ b/be/test/storage/compaction/vertical_compaction_test.cpp @@ -1192,4 +1192,70 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMergeWithNullableSparseColum config::sparse_column_compaction_threshold_percent = original_threshold; } +// Test that first-time compaction (no historical sampling) uses footer raw_data_bytes +// to estimate batch_size instead of hardcoded 992. +// This test verifies the footer-based estimation path is triggered and compaction succeeds. +TEST_F(VerticalCompactionTest, TestFirstCompactionUsesFooterEstimation) { + // Use small data to ensure compaction completes quickly + auto num_input_rowset = 2; + auto num_segments = 1; + auto rows_per_segment = 1024; + SegmentsOverlapPB overlap = NONOVERLAPPING; + std::vector>>> input_data; + generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data); + + TabletSchemaSPtr tablet_schema = create_schema(); + + // Create input rowsets + std::vector input_rowsets; + for (auto i = 0; i < num_input_rowset; i++) { + RowsetSharedPtr rowset = create_rowset(tablet_schema, overlap, input_data[i], i); + input_rowsets.push_back(rowset); + } + + // Create input rowset readers + std::vector input_rs_readers; + for (auto& rowset : input_rowsets) { + RowsetReaderSharedPtr rs_reader; + ASSERT_TRUE(rowset->create_reader(&rs_reader).ok()); + input_rs_readers.push_back(std::move(rs_reader)); + } + + // Create output rowset writer + auto writer_context = create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, + {0, input_rowsets.back()->end_version()}); + auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true); + ASSERT_TRUE(res.has_value()) << res.error(); + auto output_rs_writer = std::move(res).value(); + + // Create tablet - fresh tablet has no historical sampling data, + // so estimate_batch_size will hit the else branch and use footer raw_data_bytes. + TabletSharedPtr tablet = create_tablet(*tablet_schema, false); + Merger::Statistics stats; + RowIdConversion rowid_conversion; + stats.rowid_conversion = &rowid_conversion; + + // Verify sample_infos are empty (no historical data) + auto& sample_infos = tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION); + EXPECT_TRUE(sample_infos.empty()); + + // Run vertical merge - this should use footer raw_data_bytes for batch size estimation + // since there is no historical sampling data. + // The log should contain "estimate batch size from footer" instead of the old hardcoded path. + auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, + *tablet_schema, input_rs_readers, + output_rs_writer.get(), 100000, num_segments, &stats); + ASSERT_TRUE(s.ok()) << s; + + RowsetSharedPtr out_rowset; + ASSERT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); + EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), + num_input_rowset * num_segments * rows_per_segment); + + // After first compaction, sample_infos should be populated with historical data + // for subsequent compactions to use. + auto& updated_infos = tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION); + EXPECT_FALSE(updated_infos.empty()); +} + } // namespace doris From 6943519ca0ca7a0c7234ddf045c83c9f471f794d Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 9 Apr 2026 14:51:42 +0800 Subject: [PATCH 2/8] [improve](compaction) clang-format --- be/src/storage/merger.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/be/src/storage/merger.cpp b/be/src/storage/merger.cpp index 99c4e11970a274..dbd03666f7992b 100644 --- a/be/src/storage/merger.cpp +++ b/be/src/storage/merger.cpp @@ -450,9 +450,9 @@ int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_ int64_t batch_size = block_mem_limit / group_per_row_from_footer; int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L)); LOG(INFO) << "estimate batch size from footer for vertical compaction, tablet id: " - << tablet->tablet_id() << " group_per_row_from_footer: " - << group_per_row_from_footer << " way cnt: " << way_cnt - << " batch size: " << res; + << tablet->tablet_id() + << " group_per_row_from_footer: " << group_per_row_from_footer + << " way cnt: " << way_cnt << " batch size: " << res; return res; } LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " @@ -595,8 +595,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t if (!st.ok()) { LOG(WARNING) << "Failed to load segments for footer raw_data_bytes collection" << ", tablet_id: " << tablet->tablet_id() - << ", rowset_id: " << beta_rowset->rowset_id() - << ", status: " << st; + << ", rowset_id: " << beta_rowset->rowset_id() << ", status: " << st; continue; } for (const auto& segment : segments) { From 1e27cbbdf80ffdb0f6e551ef7e3d28b3629375c9 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 9 Apr 2026 15:20:08 +0800 Subject: [PATCH 3/8] [improve](compaction) add raw_data_bytes accuracy test and tighten assertions --- .../compaction/vertical_compaction_test.cpp | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/be/test/storage/compaction/vertical_compaction_test.cpp b/be/test/storage/compaction/vertical_compaction_test.cpp index 310259c01cb64e..c20f14e3685690 100644 --- a/be/test/storage/compaction/vertical_compaction_test.cpp +++ b/be/test/storage/compaction/vertical_compaction_test.cpp @@ -63,6 +63,7 @@ #include "storage/rowset/rowset_writer.h" #include "storage/rowset/rowset_writer_context.h" #include "storage/schema.h" +#include "storage/segment/segment.h" #include "storage/storage_engine.h" #include "storage/tablet/tablet.h" #include "storage/tablet/tablet_meta.h" @@ -1258,4 +1259,100 @@ TEST_F(VerticalCompactionTest, TestFirstCompactionUsesFooterEstimation) { EXPECT_FALSE(updated_infos.empty()); } +// Test that raw_data_bytes in segment footer accurately reflects the original data size +// for different column types, which is the foundation of footer-based batch size estimation. +TEST_F(VerticalCompactionTest, TestFooterRawDataBytesAccuracy) { + // Create a schema with INT key + VARCHAR value to test both fixed and variable-length types + TabletSchemaSPtr tablet_schema = std::make_shared(); + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(DUP_KEYS); + tablet_schema_pb.set_num_short_key_columns(1); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(3); + + ColumnPB* col_int = tablet_schema_pb.add_column(); + col_int->set_unique_id(1); + col_int->set_name("c_int"); + col_int->set_type("INT"); + col_int->set_is_key(true); + col_int->set_length(4); + col_int->set_index_length(4); + col_int->set_is_nullable(false); + col_int->set_is_bf_column(false); + + ColumnPB* col_varchar = tablet_schema_pb.add_column(); + col_varchar->set_unique_id(2); + col_varchar->set_name("c_varchar"); + col_varchar->set_type("VARCHAR"); + col_varchar->set_is_key(false); + col_varchar->set_length(128); + col_varchar->set_index_length(20); + col_varchar->set_is_nullable(false); + col_varchar->set_is_bf_column(false); + + tablet_schema->init_from_pb(tablet_schema_pb); + + // Write 1000 rows: INT values + VARCHAR strings of exactly 20 bytes each + constexpr int kNumRows = 1000; + constexpr int kStringLen = 20; + std::string fixed_string(kStringLen, 'x'); + + auto writer_context = + create_rowset_writer_context(tablet_schema, NONOVERLAPPING, UINT32_MAX, {0, 0}); + auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true); + ASSERT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + + Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int i = 0; i < kNumRows; i++) { + int32_t int_val = i; + columns[0]->insert_data(reinterpret_cast(&int_val), sizeof(int_val)); + columns[1]->insert_data(fixed_string.data(), fixed_string.size()); + } + ASSERT_TRUE(rowset_writer->add_block(&block).ok()); + ASSERT_TRUE(rowset_writer->flush().ok()); + + RowsetSharedPtr rowset; + ASSERT_EQ(Status::OK(), rowset_writer->build(rowset)); + ASSERT_EQ(1, rowset->rowset_meta()->num_segments()); + ASSERT_EQ(kNumRows, rowset->rowset_meta()->num_rows()); + + // Load segments and read footer's raw_data_bytes + auto beta_rowset = std::dynamic_pointer_cast(rowset); + ASSERT_NE(beta_rowset, nullptr); + std::vector segments; + ASSERT_TRUE(beta_rowset->load_segments(&segments).ok()); + ASSERT_EQ(1, segments.size()); + ASSERT_EQ(kNumRows, segments[0]->num_rows()); + + // Collect raw_data_bytes per column from footer + std::unordered_map raw_bytes_by_uid; + auto st = segments[0]->traverse_column_meta_pbs( + [&](const segment_v2::ColumnMetaPB& meta) { + if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) { + raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes(); + } + }); + ASSERT_TRUE(st.ok()) << st; + + // Verify INT column (uid=1): raw_data_bytes should be exactly kNumRows * sizeof(int32_t). + // PageBuilder::get_raw_data_size() accumulates raw data bytes added via add(), + // for fixed-width types this is exactly N * sizeof(T). + ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0) << "INT column raw_data_bytes not found in footer"; + EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t)) + << "INT column: expected " << kNumRows * sizeof(int32_t) + << " total raw_data_bytes, got " << raw_bytes_by_uid[1]; + + // Verify VARCHAR column (uid=2): raw_data_bytes should be exactly kNumRows * kStringLen. + // BinaryPlainPageBuilder/BinaryDictPageBuilder only accumulate src->size (the raw string + // payload), not offsets, varint length prefixes, or dictionary overhead. + ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0) + << "VARCHAR column raw_data_bytes not found in footer"; + EXPECT_EQ(raw_bytes_by_uid[2], kNumRows * kStringLen) + << "VARCHAR column: expected " << kNumRows * kStringLen + << " total raw_data_bytes, got " << raw_bytes_by_uid[2]; +} + } // namespace doris From b9888f2006b77ce0f33038fda4d9b3ab2c4018e0 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 9 Apr 2026 17:05:35 +0800 Subject: [PATCH 4/8] 1 --- .../compaction/vertical_compaction_test.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/be/test/storage/compaction/vertical_compaction_test.cpp b/be/test/storage/compaction/vertical_compaction_test.cpp index c20f14e3685690..b119e0d8ad6cc6 100644 --- a/be/test/storage/compaction/vertical_compaction_test.cpp +++ b/be/test/storage/compaction/vertical_compaction_test.cpp @@ -1329,12 +1329,11 @@ TEST_F(VerticalCompactionTest, TestFooterRawDataBytesAccuracy) { // Collect raw_data_bytes per column from footer std::unordered_map raw_bytes_by_uid; - auto st = segments[0]->traverse_column_meta_pbs( - [&](const segment_v2::ColumnMetaPB& meta) { - if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) { - raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes(); - } - }); + auto st = segments[0]->traverse_column_meta_pbs([&](const segment_v2::ColumnMetaPB& meta) { + if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) { + raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes(); + } + }); ASSERT_TRUE(st.ok()) << st; // Verify INT column (uid=1): raw_data_bytes should be exactly kNumRows * sizeof(int32_t). @@ -1351,8 +1350,8 @@ TEST_F(VerticalCompactionTest, TestFooterRawDataBytesAccuracy) { ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0) << "VARCHAR column raw_data_bytes not found in footer"; EXPECT_EQ(raw_bytes_by_uid[2], kNumRows * kStringLen) - << "VARCHAR column: expected " << kNumRows * kStringLen - << " total raw_data_bytes, got " << raw_bytes_by_uid[2]; + << "VARCHAR column: expected " << kNumRows * kStringLen << " total raw_data_bytes, got " + << raw_bytes_by_uid[2]; } } // namespace doris From 77c4d13711f9e63e3e695a8a9b9f712e1ea32492 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 15 Apr 2026 11:19:10 +0800 Subject: [PATCH 5/8] [improve](compaction) Add block sample logging after vertical compaction init Log per_row, sample_bytes, sample_rows immediately after all merge inputs finish loading their first block, before the actual merge starts. This helps diagnose memory issues by showing the actual per-row memory size at init time. --- be/src/storage/iterator/vertical_block_reader.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/be/src/storage/iterator/vertical_block_reader.cpp b/be/src/storage/iterator/vertical_block_reader.cpp index f3c483fb5b1e1d..2d51d7bc0044f5 100644 --- a/be/src/storage/iterator/vertical_block_reader.cpp +++ b/be/src/storage/iterator/vertical_block_reader.cpp @@ -168,6 +168,15 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, opts.block_row_max = cast_set(read_params.batch_size); } RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info)); + if (sample_info != nullptr && sample_info->rows > 0) { + LOG(INFO) << "vertical compaction block sample after init, tablet_id: " + << read_params.tablet->tablet_id() + << ", is_key_group: " << read_params.is_key_column_group + << ", batch_size: " << read_params.batch_size + << ", sample_bytes: " << sample_info->bytes + << ", sample_rows: " << sample_info->rows + << ", per_row: " << sample_info->bytes / sample_info->rows; + } // In agg keys value columns compact, get first row for _init_agg_state if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) { From d6786ef61d4a0bbaec43df968bd8bb17aa6fe744 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 22 Apr 2026 17:46:02 +0800 Subject: [PATCH 6/8] [improve](compaction) Remove debug log after diagnosis is complete The log was added to help diagnose vertical compaction memory issues. Investigation is complete; the existing 'estimate batch size' log in merger.cpp already provides per-group batch_size and per_row info for daily monitoring. --- be/src/storage/iterator/vertical_block_reader.cpp | 9 --------- 1 file changed, 9 deletions(-) diff --git a/be/src/storage/iterator/vertical_block_reader.cpp b/be/src/storage/iterator/vertical_block_reader.cpp index 2d51d7bc0044f5..f3c483fb5b1e1d 100644 --- a/be/src/storage/iterator/vertical_block_reader.cpp +++ b/be/src/storage/iterator/vertical_block_reader.cpp @@ -168,15 +168,6 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, opts.block_row_max = cast_set(read_params.batch_size); } RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info)); - if (sample_info != nullptr && sample_info->rows > 0) { - LOG(INFO) << "vertical compaction block sample after init, tablet_id: " - << read_params.tablet->tablet_id() - << ", is_key_group: " << read_params.is_key_column_group - << ", batch_size: " << read_params.batch_size - << ", sample_bytes: " << sample_info->bytes - << ", sample_rows: " << sample_info->rows - << ", per_row: " << sample_info->bytes / sample_info->rows; - } // In agg keys value columns compact, get first row for _init_agg_state if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) { From ad1bd4edca98bc415248f4a4a3bcdc5914ed5ada Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 23 Apr 2026 11:41:59 +0800 Subject: [PATCH 7/8] [improve](compaction) Address review feedback on footer-based batch_size estimation - Fall back to default per-row when any column in the group lacks footer raw_data_bytes (e.g. legacy segments after rolling upgrade), instead of silently summing only the columns we measured. - For fixed-width scalar columns, lower-bound the per-row estimate by the fixed type size. raw_data_bytes only counts non-null payload, but the reader still allocates the full nested column slot for null rows via ColumnNullable::insert_many_defaults(), so highly nullable INT/BIGINT/ etc. columns were under-estimated and could still pick an OOM-sized batch on first compaction. --- be/src/storage/merger.cpp | 55 ++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/be/src/storage/merger.cpp b/be/src/storage/merger.cpp index dbd03666f7992b..5cd9ee5c20f7fa 100644 --- a/be/src/storage/merger.cpp +++ b/be/src/storage/merger.cpp @@ -58,6 +58,7 @@ #include "storage/tablet/tablet_fwd.h" #include "storage/tablet/tablet_meta.h" #include "storage/tablet/tablet_reader.h" +#include "storage/types.h" #include "storage/utils.h" #include "util/slice.h" @@ -635,25 +636,43 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t break; } - int64_t col_per_row = 0; + // Any column without footer data (e.g. legacy segments written before + // raw_data_bytes existed) makes the group sample partial and unreliable. + // Fall back to the default for the whole group instead of summing only + // the columns we measured. auto it = column_raw_sizes.find(uid); - if (it != column_raw_sizes.end() && it->second.rows_with_data > 0) { - col_per_row = it->second.total_raw_bytes / it->second.rows_with_data; - - // Structural overhead compensation for scalar columns only. - // Only add when footer data actually exists for this column, - // to avoid creating a non-zero estimate purely from structural overhead. - // Complex types (ARRAY/MAP/STRUCT) have raw_data_bytes recursively aggregated - // from sub-writers, so no additional compensation is needed. - if (col.type() != FieldType::OLAP_FIELD_TYPE_ARRAY && - col.type() != FieldType::OLAP_FIELD_TYPE_MAP && - col.type() != FieldType::OLAP_FIELD_TYPE_STRUCT) { - if (col.is_nullable()) { - col_per_row += 1; // null map: 1 byte per row - } - if (col.is_length_variable_type()) { - col_per_row += 8; // offset array: 8 bytes per row at runtime - } + if (it == column_raw_sizes.end() || it->second.rows_with_data <= 0) { + need_fallback = true; + break; + } + + int64_t raw_per_row = it->second.total_raw_bytes / it->second.rows_with_data; + int64_t col_per_row = 0; + + if (col.type() == FieldType::OLAP_FIELD_TYPE_ARRAY || + col.type() == FieldType::OLAP_FIELD_TYPE_MAP || + col.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) { + // Complex types: raw_data_bytes recursively aggregates sub-writers. + col_per_row = raw_per_row; + } else if (col.is_length_variable_type()) { + // Variable-length scalar (VARCHAR/STRING/HLL/BITMAP/...): raw_per_row + // is the average char payload across all rows; reader still pays an + // 8-byte offset entry per row regardless of null-ness. + col_per_row = raw_per_row + 8; + if (col.is_nullable()) { + col_per_row += 1; // null map + } + } else { + // Fixed-width scalar (INT/BIGINT/DOUBLE/DATE/...). + // raw_data_bytes only counts non-null payload (append_nulls() does + // not advance the page builder), but FileColumnIterator::next_batch + // still calls ColumnNullable::insert_many_defaults() for null runs, + // which grows the nested PODArray by N * type_size. So the runtime + // per-row footprint is at least type_size, no matter how sparse. + int64_t type_size = get_type_info(&col)->size(); + col_per_row = std::max(raw_per_row, type_size); + if (col.is_nullable()) { + col_per_row += 1; // null map } } From 4f82a53615b22a55576ff0f8ee3bc89ef1f5413d Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 23 Apr 2026 11:44:39 +0800 Subject: [PATCH 8/8] [test](compaction) Add nullable sparse INT case for footer raw_data_bytes Asserts raw_data_bytes only counts non-null payload for a nullable fixed-width column (10% non-null INT in this case), which is the premise behind the type_size lower bound added to the footer-based per-row estimation. --- .../compaction/vertical_compaction_test.cpp | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/be/test/storage/compaction/vertical_compaction_test.cpp b/be/test/storage/compaction/vertical_compaction_test.cpp index b119e0d8ad6cc6..3b736857242caf 100644 --- a/be/test/storage/compaction/vertical_compaction_test.cpp +++ b/be/test/storage/compaction/vertical_compaction_test.cpp @@ -1354,4 +1354,101 @@ TEST_F(VerticalCompactionTest, TestFooterRawDataBytesAccuracy) { << raw_bytes_by_uid[2]; } +// Verify that raw_data_bytes only counts non-null payload for nullable +// fixed-width columns. This is the premise that motivates the type_size +// lower bound in merger.cpp's footer-based per-row estimation: without it, +// a sparse nullable column would produce a per-row estimate far below the +// reader's actual memory footprint (which still allocates the full nested +// slot for null rows via ColumnNullable::insert_many_defaults). +TEST_F(VerticalCompactionTest, TestFooterRawDataBytesNullableSparse) { + TabletSchemaSPtr tablet_schema = std::make_shared(); + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(DUP_KEYS); + tablet_schema_pb.set_num_short_key_columns(1); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(3); + + ColumnPB* col_key = tablet_schema_pb.add_column(); + col_key->set_unique_id(1); + col_key->set_name("c_key"); + col_key->set_type("INT"); + col_key->set_is_key(true); + col_key->set_length(4); + col_key->set_index_length(4); + col_key->set_is_nullable(false); + col_key->set_is_bf_column(false); + + ColumnPB* col_val = tablet_schema_pb.add_column(); + col_val->set_unique_id(2); + col_val->set_name("c_val"); + col_val->set_type("INT"); + col_val->set_is_key(false); + col_val->set_length(4); + col_val->set_index_length(4); + col_val->set_is_nullable(true); + col_val->set_is_bf_column(false); + + tablet_schema->init_from_pb(tablet_schema_pb); + + constexpr int kNumRows = 1000; + constexpr int kNonNullCount = 100; // 10% non-null, 90% null + + auto writer_context = + create_rowset_writer_context(tablet_schema, NONOVERLAPPING, UINT32_MAX, {0, 0}); + auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true); + ASSERT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + + Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int i = 0; i < kNumRows; i++) { + int32_t key_val = i; + columns[0]->insert_data(reinterpret_cast(&key_val), sizeof(key_val)); + if (i < kNonNullCount) { + int32_t val = i; + columns[1]->insert_data(reinterpret_cast(&val), sizeof(val)); + } else { + columns[1]->insert_default(); // ColumnNullable default is null + } + } + ASSERT_TRUE(rowset_writer->add_block(&block).ok()); + ASSERT_TRUE(rowset_writer->flush().ok()); + + RowsetSharedPtr rowset; + ASSERT_EQ(Status::OK(), rowset_writer->build(rowset)); + ASSERT_EQ(1, rowset->rowset_meta()->num_segments()); + + auto beta_rowset = std::dynamic_pointer_cast(rowset); + ASSERT_NE(beta_rowset, nullptr); + std::vector segments; + ASSERT_TRUE(beta_rowset->load_segments(&segments).ok()); + ASSERT_EQ(1, segments.size()); + + std::unordered_map raw_bytes_by_uid; + auto st = segments[0]->traverse_column_meta_pbs([&](const segment_v2::ColumnMetaPB& meta) { + if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) { + raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes(); + } + }); + ASSERT_TRUE(st.ok()) << st; + + // Key column (non-null INT): full coverage, raw == kNumRows * 4. + ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0); + EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t)); + + // Value column (nullable INT, 90% null): raw_data_bytes only reflects + // the non-null payload because ScalarColumnWriter::append_nulls() does + // not advance the page builder. So raw == kNonNullCount * 4. + // + // If merger.cpp used `raw / total_rows` directly the per-row estimate + // would be ~0.4 bytes (+1 for null map = 1.4), but the reader actually + // allocates 4 bytes for every nested slot (regardless of null-ness) + // plus 1 byte of null map = 5 bytes/row. The fixed-width type_size + // lower bound is what closes that ~3.5x gap. + ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0); + EXPECT_EQ(raw_bytes_by_uid[2], kNonNullCount * sizeof(int32_t)); + EXPECT_LT(raw_bytes_by_uid[2], kNumRows * sizeof(int32_t)); +} + } // namespace doris