diff --git a/be/src/storage/iterator/vertical_merge_iterator.h b/be/src/storage/iterator/vertical_merge_iterator.h index e748ae460c7e0e..865399c47478c9 100644 --- a/be/src/storage/iterator/vertical_merge_iterator.h +++ b/be/src/storage/iterator/vertical_merge_iterator.h @@ -208,7 +208,7 @@ class VerticalMergeIteratorContext { size_t bytes() { if (_block) { - return _block->bytes(); + return _block->allocated_bytes(); } else { return 0; } diff --git a/be/src/storage/merger.cpp b/be/src/storage/merger.cpp index ec246f13c16d5d..5cd9ee5c20f7fa 100644 --- a/be/src/storage/merger.cpp +++ b/be/src/storage/merger.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -45,9 +46,11 @@ #include "storage/olap_common.h" #include "storage/olap_define.h" #include "storage/rowid_conversion.h" +#include "storage/rowset/beta_rowset.h" #include "storage/rowset/rowset.h" #include "storage/rowset/rowset_meta.h" #include "storage/rowset/rowset_writer.h" +#include "storage/segment/segment.h" #include "storage/segment/segment_writer.h" #include "storage/storage_engine.h" #include "storage/tablet/base_tablet.h" @@ -55,6 +58,7 @@ #include "storage/tablet/tablet_fwd.h" #include "storage/tablet/tablet_meta.h" #include "storage/tablet/tablet_reader.h" +#include "storage/types.h" #include "storage/utils.h" #include "util/slice.h" @@ -412,7 +416,8 @@ Status Merger::vertical_compact_one_group( } int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt, - ReaderType reader_type) { + ReaderType reader_type, int64_t group_per_row_from_footer, + bool footer_fallback) { auto& sample_info_lock = tablet->get_sample_info_lock(reader_type); auto& sample_infos = tablet->get_sample_infos(reader_type); std::unique_lock lock(sample_info_lock); @@ -440,9 +445,21 @@ int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_ group_data_size = info.bytes / info.rows; sample_infos[group_index].group_data_size = group_data_size; } else { + // No historical sampling data available. + // Try to use raw_data_bytes from segment footer for a better estimate. + if (!footer_fallback && group_per_row_from_footer > 0) { + int64_t batch_size = block_mem_limit / group_per_row_from_footer; + int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L)); + LOG(INFO) << "estimate batch size from footer for vertical compaction, tablet id: " + << tablet->tablet_id() + << " group_per_row_from_footer: " << group_per_row_from_footer + << " way cnt: " << way_cnt << " batch size: " << res; + return res; + } LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id() << " group data size: " << info.group_data_size - << " row num: " << info.rows << " consume bytes: " << info.bytes; + << " row num: " << info.rows << " consume bytes: " << info.bytes + << " footer_fallback: " << footer_fallback; return 1024 - 32; } @@ -548,13 +565,132 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t std::unique_lock lock(sample_info_lock); sample_infos.resize(column_groups.size()); } + // Collect per-column raw_data_bytes from segment footer for first-time batch size estimation. + // raw_data_bytes is the original data size before encoding, close to runtime Block::bytes(). + // Only collect when needed: skip if manual batch_size override is set, or if ALL groups + // already have historical sampling data. Use per-group granularity so that schema evolution + // (new groups without history) still gets footer-based estimation. + struct ColumnRawSizeInfo { + int64_t total_raw_bytes = 0; + int64_t rows_with_data = 0; + }; + std::unordered_map column_raw_sizes; + bool need_footer_collection = false; + if (config::compaction_batch_size == -1) { + std::unique_lock lock(sample_info_lock); + for (const auto& info : sample_infos) { + if (info.group_data_size <= 0 && info.bytes <= 0 && info.rows <= 0) { + need_footer_collection = true; + break; + } + } + } + if (need_footer_collection) { + for (const auto& rs_reader : src_rowset_readers) { + auto beta_rowset = std::dynamic_pointer_cast(rs_reader->rowset()); + if (!beta_rowset) { + continue; + } + std::vector segments; + auto st = beta_rowset->load_segments(&segments); + if (!st.ok()) { + LOG(WARNING) << "Failed to load segments for footer raw_data_bytes collection" + << ", tablet_id: " << tablet->tablet_id() + << ", rowset_id: " << beta_rowset->rowset_id() << ", status: " << st; + continue; + } + for (const auto& segment : segments) { + int64_t row_count = segment->num_rows(); + auto collect_st = segment->traverse_column_meta_pbs( + [&](const segment_v2::ColumnMetaPB& meta) { + int32_t uid = meta.unique_id(); + if (uid >= 0 && meta.has_raw_data_bytes()) { + auto& info = column_raw_sizes[uid]; + info.total_raw_bytes += meta.raw_data_bytes(); + info.rows_with_data += row_count; + } + }); + if (!collect_st.ok()) { + LOG(WARNING) << "Failed to traverse column meta for footer collection" + << ", tablet_id: " << tablet->tablet_id() + << ", status: " << collect_st; + } + } + } + } + + // Pre-compute per-row estimate for each column group from footer data. + std::vector group_per_row_from_footer(column_groups.size(), 0); + std::vector group_footer_fallback(column_groups.size(), false); + for (size_t i = 0; i < column_groups.size(); ++i) { + int64_t group_per_row = 0; + bool need_fallback = false; + for (uint32_t col_ordinal : column_groups[i]) { + const auto& col = tablet_schema.column(col_ordinal); + int32_t uid = col.unique_id(); + + // Variant columns (root or subcolumn): raw_data_bytes is 0 (TODO in writer), + // cannot estimate from footer, fallback to default for the entire group. + if (uid < 0 || col.is_variant_type()) { + need_fallback = true; + break; + } + + // Any column without footer data (e.g. legacy segments written before + // raw_data_bytes existed) makes the group sample partial and unreliable. + // Fall back to the default for the whole group instead of summing only + // the columns we measured. + auto it = column_raw_sizes.find(uid); + if (it == column_raw_sizes.end() || it->second.rows_with_data <= 0) { + need_fallback = true; + break; + } + + int64_t raw_per_row = it->second.total_raw_bytes / it->second.rows_with_data; + int64_t col_per_row = 0; + + if (col.type() == FieldType::OLAP_FIELD_TYPE_ARRAY || + col.type() == FieldType::OLAP_FIELD_TYPE_MAP || + col.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) { + // Complex types: raw_data_bytes recursively aggregates sub-writers. + col_per_row = raw_per_row; + } else if (col.is_length_variable_type()) { + // Variable-length scalar (VARCHAR/STRING/HLL/BITMAP/...): raw_per_row + // is the average char payload across all rows; reader still pays an + // 8-byte offset entry per row regardless of null-ness. + col_per_row = raw_per_row + 8; + if (col.is_nullable()) { + col_per_row += 1; // null map + } + } else { + // Fixed-width scalar (INT/BIGINT/DOUBLE/DATE/...). + // raw_data_bytes only counts non-null payload (append_nulls() does + // not advance the page builder), but FileColumnIterator::next_batch + // still calls ColumnNullable::insert_many_defaults() for null runs, + // which grows the nested PODArray by N * type_size. So the runtime + // per-row footprint is at least type_size, no matter how sparse. + int64_t type_size = get_type_info(&col)->size(); + col_per_row = std::max(raw_per_row, type_size); + if (col.is_nullable()) { + col_per_row += 1; // null map + } + } + + group_per_row += col_per_row; + } + group_per_row_from_footer[i] = group_per_row; + group_footer_fallback[i] = need_fallback; + } + // compact group one by one for (auto i = 0; i < column_groups.size(); ++i) { VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); bool is_key = (i == 0); int64_t batch_size = config::compaction_batch_size != -1 ? config::compaction_batch_size - : estimate_batch_size(i, tablet, merge_way_num, reader_type); + : estimate_batch_size(i, tablet, merge_way_num, reader_type, + group_per_row_from_footer[i], + group_footer_fallback[i]); CompactionSampleInfo sample_info; Merger::Statistics group_stats; group_stats.rowid_conversion = total_stats.rowid_conversion; diff --git a/be/test/storage/compaction/vertical_compaction_test.cpp b/be/test/storage/compaction/vertical_compaction_test.cpp index 57b273caa592ec..3b736857242caf 100644 --- a/be/test/storage/compaction/vertical_compaction_test.cpp +++ b/be/test/storage/compaction/vertical_compaction_test.cpp @@ -63,6 +63,7 @@ #include "storage/rowset/rowset_writer.h" #include "storage/rowset/rowset_writer_context.h" #include "storage/schema.h" +#include "storage/segment/segment.h" #include "storage/storage_engine.h" #include "storage/tablet/tablet.h" #include "storage/tablet/tablet_meta.h" @@ -1192,4 +1193,262 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMergeWithNullableSparseColum config::sparse_column_compaction_threshold_percent = original_threshold; } +// Test that first-time compaction (no historical sampling) uses footer raw_data_bytes +// to estimate batch_size instead of hardcoded 992. +// This test verifies the footer-based estimation path is triggered and compaction succeeds. +TEST_F(VerticalCompactionTest, TestFirstCompactionUsesFooterEstimation) { + // Use small data to ensure compaction completes quickly + auto num_input_rowset = 2; + auto num_segments = 1; + auto rows_per_segment = 1024; + SegmentsOverlapPB overlap = NONOVERLAPPING; + std::vector>>> input_data; + generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data); + + TabletSchemaSPtr tablet_schema = create_schema(); + + // Create input rowsets + std::vector input_rowsets; + for (auto i = 0; i < num_input_rowset; i++) { + RowsetSharedPtr rowset = create_rowset(tablet_schema, overlap, input_data[i], i); + input_rowsets.push_back(rowset); + } + + // Create input rowset readers + std::vector input_rs_readers; + for (auto& rowset : input_rowsets) { + RowsetReaderSharedPtr rs_reader; + ASSERT_TRUE(rowset->create_reader(&rs_reader).ok()); + input_rs_readers.push_back(std::move(rs_reader)); + } + + // Create output rowset writer + auto writer_context = create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, + {0, input_rowsets.back()->end_version()}); + auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true); + ASSERT_TRUE(res.has_value()) << res.error(); + auto output_rs_writer = std::move(res).value(); + + // Create tablet - fresh tablet has no historical sampling data, + // so estimate_batch_size will hit the else branch and use footer raw_data_bytes. + TabletSharedPtr tablet = create_tablet(*tablet_schema, false); + Merger::Statistics stats; + RowIdConversion rowid_conversion; + stats.rowid_conversion = &rowid_conversion; + + // Verify sample_infos are empty (no historical data) + auto& sample_infos = tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION); + EXPECT_TRUE(sample_infos.empty()); + + // Run vertical merge - this should use footer raw_data_bytes for batch size estimation + // since there is no historical sampling data. + // The log should contain "estimate batch size from footer" instead of the old hardcoded path. + auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, + *tablet_schema, input_rs_readers, + output_rs_writer.get(), 100000, num_segments, &stats); + ASSERT_TRUE(s.ok()) << s; + + RowsetSharedPtr out_rowset; + ASSERT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); + EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), + num_input_rowset * num_segments * rows_per_segment); + + // After first compaction, sample_infos should be populated with historical data + // for subsequent compactions to use. + auto& updated_infos = tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION); + EXPECT_FALSE(updated_infos.empty()); +} + +// Test that raw_data_bytes in segment footer accurately reflects the original data size +// for different column types, which is the foundation of footer-based batch size estimation. +TEST_F(VerticalCompactionTest, TestFooterRawDataBytesAccuracy) { + // Create a schema with INT key + VARCHAR value to test both fixed and variable-length types + TabletSchemaSPtr tablet_schema = std::make_shared(); + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(DUP_KEYS); + tablet_schema_pb.set_num_short_key_columns(1); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(3); + + ColumnPB* col_int = tablet_schema_pb.add_column(); + col_int->set_unique_id(1); + col_int->set_name("c_int"); + col_int->set_type("INT"); + col_int->set_is_key(true); + col_int->set_length(4); + col_int->set_index_length(4); + col_int->set_is_nullable(false); + col_int->set_is_bf_column(false); + + ColumnPB* col_varchar = tablet_schema_pb.add_column(); + col_varchar->set_unique_id(2); + col_varchar->set_name("c_varchar"); + col_varchar->set_type("VARCHAR"); + col_varchar->set_is_key(false); + col_varchar->set_length(128); + col_varchar->set_index_length(20); + col_varchar->set_is_nullable(false); + col_varchar->set_is_bf_column(false); + + tablet_schema->init_from_pb(tablet_schema_pb); + + // Write 1000 rows: INT values + VARCHAR strings of exactly 20 bytes each + constexpr int kNumRows = 1000; + constexpr int kStringLen = 20; + std::string fixed_string(kStringLen, 'x'); + + auto writer_context = + create_rowset_writer_context(tablet_schema, NONOVERLAPPING, UINT32_MAX, {0, 0}); + auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true); + ASSERT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + + Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int i = 0; i < kNumRows; i++) { + int32_t int_val = i; + columns[0]->insert_data(reinterpret_cast(&int_val), sizeof(int_val)); + columns[1]->insert_data(fixed_string.data(), fixed_string.size()); + } + ASSERT_TRUE(rowset_writer->add_block(&block).ok()); + ASSERT_TRUE(rowset_writer->flush().ok()); + + RowsetSharedPtr rowset; + ASSERT_EQ(Status::OK(), rowset_writer->build(rowset)); + ASSERT_EQ(1, rowset->rowset_meta()->num_segments()); + ASSERT_EQ(kNumRows, rowset->rowset_meta()->num_rows()); + + // Load segments and read footer's raw_data_bytes + auto beta_rowset = std::dynamic_pointer_cast(rowset); + ASSERT_NE(beta_rowset, nullptr); + std::vector segments; + ASSERT_TRUE(beta_rowset->load_segments(&segments).ok()); + ASSERT_EQ(1, segments.size()); + ASSERT_EQ(kNumRows, segments[0]->num_rows()); + + // Collect raw_data_bytes per column from footer + std::unordered_map raw_bytes_by_uid; + auto st = segments[0]->traverse_column_meta_pbs([&](const segment_v2::ColumnMetaPB& meta) { + if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) { + raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes(); + } + }); + ASSERT_TRUE(st.ok()) << st; + + // Verify INT column (uid=1): raw_data_bytes should be exactly kNumRows * sizeof(int32_t). + // PageBuilder::get_raw_data_size() accumulates raw data bytes added via add(), + // for fixed-width types this is exactly N * sizeof(T). + ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0) << "INT column raw_data_bytes not found in footer"; + EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t)) + << "INT column: expected " << kNumRows * sizeof(int32_t) + << " total raw_data_bytes, got " << raw_bytes_by_uid[1]; + + // Verify VARCHAR column (uid=2): raw_data_bytes should be exactly kNumRows * kStringLen. + // BinaryPlainPageBuilder/BinaryDictPageBuilder only accumulate src->size (the raw string + // payload), not offsets, varint length prefixes, or dictionary overhead. + ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0) + << "VARCHAR column raw_data_bytes not found in footer"; + EXPECT_EQ(raw_bytes_by_uid[2], kNumRows * kStringLen) + << "VARCHAR column: expected " << kNumRows * kStringLen << " total raw_data_bytes, got " + << raw_bytes_by_uid[2]; +} + +// Verify that raw_data_bytes only counts non-null payload for nullable +// fixed-width columns. This is the premise that motivates the type_size +// lower bound in merger.cpp's footer-based per-row estimation: without it, +// a sparse nullable column would produce a per-row estimate far below the +// reader's actual memory footprint (which still allocates the full nested +// slot for null rows via ColumnNullable::insert_many_defaults). +TEST_F(VerticalCompactionTest, TestFooterRawDataBytesNullableSparse) { + TabletSchemaSPtr tablet_schema = std::make_shared(); + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(DUP_KEYS); + tablet_schema_pb.set_num_short_key_columns(1); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(3); + + ColumnPB* col_key = tablet_schema_pb.add_column(); + col_key->set_unique_id(1); + col_key->set_name("c_key"); + col_key->set_type("INT"); + col_key->set_is_key(true); + col_key->set_length(4); + col_key->set_index_length(4); + col_key->set_is_nullable(false); + col_key->set_is_bf_column(false); + + ColumnPB* col_val = tablet_schema_pb.add_column(); + col_val->set_unique_id(2); + col_val->set_name("c_val"); + col_val->set_type("INT"); + col_val->set_is_key(false); + col_val->set_length(4); + col_val->set_index_length(4); + col_val->set_is_nullable(true); + col_val->set_is_bf_column(false); + + tablet_schema->init_from_pb(tablet_schema_pb); + + constexpr int kNumRows = 1000; + constexpr int kNonNullCount = 100; // 10% non-null, 90% null + + auto writer_context = + create_rowset_writer_context(tablet_schema, NONOVERLAPPING, UINT32_MAX, {0, 0}); + auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true); + ASSERT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + + Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int i = 0; i < kNumRows; i++) { + int32_t key_val = i; + columns[0]->insert_data(reinterpret_cast(&key_val), sizeof(key_val)); + if (i < kNonNullCount) { + int32_t val = i; + columns[1]->insert_data(reinterpret_cast(&val), sizeof(val)); + } else { + columns[1]->insert_default(); // ColumnNullable default is null + } + } + ASSERT_TRUE(rowset_writer->add_block(&block).ok()); + ASSERT_TRUE(rowset_writer->flush().ok()); + + RowsetSharedPtr rowset; + ASSERT_EQ(Status::OK(), rowset_writer->build(rowset)); + ASSERT_EQ(1, rowset->rowset_meta()->num_segments()); + + auto beta_rowset = std::dynamic_pointer_cast(rowset); + ASSERT_NE(beta_rowset, nullptr); + std::vector segments; + ASSERT_TRUE(beta_rowset->load_segments(&segments).ok()); + ASSERT_EQ(1, segments.size()); + + std::unordered_map raw_bytes_by_uid; + auto st = segments[0]->traverse_column_meta_pbs([&](const segment_v2::ColumnMetaPB& meta) { + if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) { + raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes(); + } + }); + ASSERT_TRUE(st.ok()) << st; + + // Key column (non-null INT): full coverage, raw == kNumRows * 4. + ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0); + EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t)); + + // Value column (nullable INT, 90% null): raw_data_bytes only reflects + // the non-null payload because ScalarColumnWriter::append_nulls() does + // not advance the page builder. So raw == kNonNullCount * 4. + // + // If merger.cpp used `raw / total_rows` directly the per-row estimate + // would be ~0.4 bytes (+1 for null map = 1.4), but the reader actually + // allocates 4 bytes for every nested slot (regardless of null-ness) + // plus 1 byte of null map = 5 bytes/row. The fixed-width type_size + // lower bound is what closes that ~3.5x gap. + ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0); + EXPECT_EQ(raw_bytes_by_uid[2], kNonNullCount * sizeof(int32_t)); + EXPECT_LT(raw_bytes_by_uid[2], kNumRows * sizeof(int32_t)); +} + } // namespace doris