Skip to content
2 changes: 1 addition & 1 deletion be/src/storage/iterator/vertical_merge_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class VerticalMergeIteratorContext {

size_t bytes() {
if (_block) {
return _block->bytes();
return _block->allocated_bytes();
} else {
return 0;
}
Expand Down
142 changes: 139 additions & 3 deletions be/src/storage/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <ostream>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

Expand All @@ -45,16 +46,19 @@
#include "storage/olap_common.h"
#include "storage/olap_define.h"
#include "storage/rowid_conversion.h"
#include "storage/rowset/beta_rowset.h"
#include "storage/rowset/rowset.h"
#include "storage/rowset/rowset_meta.h"
#include "storage/rowset/rowset_writer.h"
#include "storage/segment/segment.h"
#include "storage/segment/segment_writer.h"
#include "storage/storage_engine.h"
#include "storage/tablet/base_tablet.h"
#include "storage/tablet/tablet.h"
#include "storage/tablet/tablet_fwd.h"
#include "storage/tablet/tablet_meta.h"
#include "storage/tablet/tablet_reader.h"
#include "storage/types.h"
#include "storage/utils.h"
#include "util/slice.h"

Expand Down Expand Up @@ -412,7 +416,8 @@ Status Merger::vertical_compact_one_group(
}

int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt,
ReaderType reader_type) {
ReaderType reader_type, int64_t group_per_row_from_footer,
bool footer_fallback) {
auto& sample_info_lock = tablet->get_sample_info_lock(reader_type);
auto& sample_infos = tablet->get_sample_infos(reader_type);
std::unique_lock<std::mutex> lock(sample_info_lock);
Expand Down Expand Up @@ -440,9 +445,21 @@ int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_
group_data_size = info.bytes / info.rows;
sample_infos[group_index].group_data_size = group_data_size;
} else {
// No historical sampling data available.
// Try to use raw_data_bytes from segment footer for a better estimate.
if (!footer_fallback && group_per_row_from_footer > 0) {
int64_t batch_size = block_mem_limit / group_per_row_from_footer;
int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L));
LOG(INFO) << "estimate batch size from footer for vertical compaction, tablet id: "
<< tablet->tablet_id()
<< " group_per_row_from_footer: " << group_per_row_from_footer
<< " way cnt: " << way_cnt << " batch size: " << res;
return res;
}
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " group data size: " << info.group_data_size
<< " row num: " << info.rows << " consume bytes: " << info.bytes;
<< " row num: " << info.rows << " consume bytes: " << info.bytes
<< " footer_fallback: " << footer_fallback;
return 1024 - 32;
}

Expand Down Expand Up @@ -548,13 +565,132 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
std::unique_lock<std::mutex> lock(sample_info_lock);
sample_infos.resize(column_groups.size());
}
// Collect per-column raw_data_bytes from segment footer for first-time batch size estimation.
// raw_data_bytes is the original data size before encoding, close to runtime Block::bytes().
// Only collect when needed: skip if manual batch_size override is set, or if ALL groups
// already have historical sampling data. Use per-group granularity so that schema evolution
// (new groups without history) still gets footer-based estimation.
// Per-column accumulator for the footer-based size estimate. The segment
// traversal adds to both counters for every column meta that reports
// raw_data_bytes; the ratio of the two is later used as a per-row size.
struct ColumnRawSizeInfo {
    // Sum of raw_data_bytes() over every column meta accumulated so far.
    int64_t total_raw_bytes {0};
    // Sum of the row counts of the segments that contributed to total_raw_bytes.
    int64_t rows_with_data {0};
};
std::unordered_map<int32_t, ColumnRawSizeInfo> column_raw_sizes;
bool need_footer_collection = false;
if (config::compaction_batch_size == -1) {
std::unique_lock<std::mutex> lock(sample_info_lock);
for (const auto& info : sample_infos) {
if (info.group_data_size <= 0 && info.bytes <= 0 && info.rows <= 0) {
need_footer_collection = true;
break;
}
}
}
if (need_footer_collection) {
for (const auto& rs_reader : src_rowset_readers) {
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rs_reader->rowset());
if (!beta_rowset) {
continue;
}
std::vector<segment_v2::SegmentSharedPtr> segments;
auto st = beta_rowset->load_segments(&segments);
if (!st.ok()) {
LOG(WARNING) << "Failed to load segments for footer raw_data_bytes collection"
<< ", tablet_id: " << tablet->tablet_id()
<< ", rowset_id: " << beta_rowset->rowset_id() << ", status: " << st;
continue;
}
for (const auto& segment : segments) {
int64_t row_count = segment->num_rows();
Comment thread
Yukang-Lian marked this conversation as resolved.
auto collect_st = segment->traverse_column_meta_pbs(
[&](const segment_v2::ColumnMetaPB& meta) {
int32_t uid = meta.unique_id();
if (uid >= 0 && meta.has_raw_data_bytes()) {
auto& info = column_raw_sizes[uid];
info.total_raw_bytes += meta.raw_data_bytes();
info.rows_with_data += row_count;
}
});
if (!collect_st.ok()) {
LOG(WARNING) << "Failed to traverse column meta for footer collection"
<< ", tablet_id: " << tablet->tablet_id()
<< ", status: " << collect_st;
}
}
}
}

// Pre-compute per-row estimate for each column group from footer data.
std::vector<int64_t> group_per_row_from_footer(column_groups.size(), 0);
std::vector<bool> group_footer_fallback(column_groups.size(), false);
for (size_t i = 0; i < column_groups.size(); ++i) {
int64_t group_per_row = 0;
bool need_fallback = false;
for (uint32_t col_ordinal : column_groups[i]) {
const auto& col = tablet_schema.column(col_ordinal);
int32_t uid = col.unique_id();

// Variant columns (root or subcolumn): raw_data_bytes is 0 (TODO in writer),
// so their size cannot be estimated from the footer; fall back to the default
// for the entire group.
if (uid < 0 || col.is_variant_type()) {
need_fallback = true;
break;
}

// Any column without footer data (e.g. legacy segments written before
// raw_data_bytes existed) makes the group sample partial and unreliable.
// Fall back to the default for the whole group instead of summing only
// the columns we measured.
auto it = column_raw_sizes.find(uid);
if (it == column_raw_sizes.end() || it->second.rows_with_data <= 0) {
need_fallback = true;
break;
}

int64_t raw_per_row = it->second.total_raw_bytes / it->second.rows_with_data;
int64_t col_per_row = 0;

if (col.type() == FieldType::OLAP_FIELD_TYPE_ARRAY ||
col.type() == FieldType::OLAP_FIELD_TYPE_MAP ||
col.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) {
// Complex types: raw_data_bytes recursively aggregates sub-writers.
col_per_row = raw_per_row;
} else if (col.is_length_variable_type()) {
// Variable-length scalar (VARCHAR/STRING/HLL/BITMAP/...): raw_per_row
// is the average char payload across all rows; reader still pays an
// 8-byte offset entry per row regardless of null-ness.
col_per_row = raw_per_row + 8;
if (col.is_nullable()) {
col_per_row += 1; // null map
}
} else {
// Fixed-width scalar (INT/BIGINT/DOUBLE/DATE/...).
// raw_data_bytes only counts non-null payload (append_nulls() does
// not advance the page builder), but FileColumnIterator::next_batch
// still calls ColumnNullable::insert_many_defaults() for null runs,
// which grows the nested PODArray by N * type_size. So the runtime
// per-row footprint is at least type_size, no matter how sparse.
int64_t type_size = get_type_info(&col)->size();
col_per_row = std::max(raw_per_row, type_size);
if (col.is_nullable()) {
col_per_row += 1; // null map
}
}

group_per_row += col_per_row;
}
group_per_row_from_footer[i] = group_per_row;
group_footer_fallback[i] = need_fallback;
}

// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
int64_t batch_size = config::compaction_batch_size != -1
? config::compaction_batch_size
: estimate_batch_size(i, tablet, merge_way_num, reader_type);
: estimate_batch_size(i, tablet, merge_way_num, reader_type,
group_per_row_from_footer[i],
group_footer_fallback[i]);
CompactionSampleInfo sample_info;
Merger::Statistics group_stats;
group_stats.rowid_conversion = total_stats.rowid_conversion;
Expand Down
Loading
Loading