Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6107aca
add table level event driven design doc
bobhan1 Apr 13, 2026
3c8e8c9
feat(warmup): add table-level event-driven warmup with ON TABLES clause
bobhan1 Apr 10, 2026
df72bc6
[doc](cloud) Remove warmup design alternatives section
bobhan1 Apr 15, 2026
6b299b5
add observation design doc
bobhan1 Apr 15, 2026
55509d0
[regression-test] Add Docker regression tests for table-level event-d…
bobhan1 Apr 16, 2026
2f902ae
refactor: extract WarmupMetricsUtils and move on_tables tests to dedi…
bobhan1 Apr 16, 2026
0dad051
docs: add test documentation for table-level event-driven warmup feature
bobhan1 Apr 16, 2026
3195b24
test: add multi-destination warmup regression test and update doc
bobhan1 Apr 16, 2026
3e6b5c2
docs: update warmup progress observation design doc
bobhan1 Apr 16, 2026
8a57970
docs: FE collection uses parallel HTTP requests
bobhan1 Apr 16, 2026
d4fa661
Design doc: add (job_id, table_id) dimension, fix gap formula, persis…
bobhan1 Apr 16, 2026
2b5a8c2
Design doc: JobReplicaInfo struct, inline per-(job,table) metrics nex…
bobhan1 Apr 16, 2026
e66914e
[Feature] Add warmup progress observation for event-driven warmup
bobhan1 Apr 16, 2026
7f99dcb
[Feature] Simplify warmup metric dimensions: remove table_id, keep jo…
bobhan1 Apr 17, 2026
77627b1
[Docs] Update stale comments and design doc after table_id dimension …
bobhan1 Apr 17, 2026
b19dd0a
[Feature] Change warmup stats collection from periodic daemon to on-d…
bobhan1 Apr 17, 2026
7af26d5
[Test] Add unit tests for warmup progress observation and fix WINDOW_…
bobhan1 Apr 17, 2026
f3d9e8b
[Refactor] Use ByteSizeValue for humanReadableSize, rename 2h window …
bobhan1 Apr 17, 2026
560f8d3
[Docs] Add warmup progress observation unit tests to test document
bobhan1 Apr 17, 2026
5bd5c9a
[Test] Add Docker regression test for SHOW WARM UP JOB SyncStats column
bobhan1 Apr 17, 2026
f545000
[Docs] Add SyncStats documentation to user guide and test doc
bobhan1 Apr 17, 2026
26e5d91
format
bobhan1 Apr 17, 2026
7c958d2
feat(warmup): add JobType.TABLES for ON TABLES event-driven jobs
bobhan1 Apr 17, 2026
354ab9d
docs(warmup): add SyncStats column to all SHOW WARM UP JOB example ou…
bobhan1 Apr 17, 2026
2bd59c0
fix(warmup): add Auth-Token header to BE warmup stats HTTP requests
bobhan1 Apr 20, 2026
199c357
feat(warmup): add async materialized view (MTMV) support tests and docs
bobhan1 Apr 20, 2026
a469652
docs(warmup): add warmup progress observation user guide
bobhan1 Apr 20, 2026
a39b762
perf(warmup): optimize resolveTableIds to iterate names not tables; a…
bobhan1 Apr 20, 2026
4e48ce6
fix wrongly modified conf
bobhan1 Apr 21, 2026
227abd1
[fix](cloud) Refine warm up on tables filtering
bobhan1 Apr 21, 2026
d10a47e
[fix](cloud) Aggregate warm up rowset failures
bobhan1 Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event);
const std::vector<int64_t>* table_ids_ptr = nullptr;
if (request.__isset.table_ids) {
table_ids_ptr = &request.table_ids;
}
st = manager.set_event(request.job_id, request.event, false, table_ids_ptr);
if (st.ok()) {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
return st;
}

st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "", nullptr, tablet->table_id());
if (!st.ok()) {
LOG(WARNING) << "failed to commit rowset, status=" << st.to_string();
return st;
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ Status CloudDeltaWriter::commit_rowset() {
}

// Handle normal rowset with data
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "", nullptr,
rowset_builder()->tablet()->table_id());
}

Status CloudDeltaWriter::_commit_empty_rowset() {
Expand All @@ -138,7 +139,8 @@ Status CloudDeltaWriter::_commit_empty_rowset() {
return Status::OK();
}
// write an empty rowset kv to keep the version continuous
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "", nullptr,
rowset_builder()->tablet()->table_id());
}

Status CloudDeltaWriter::set_txn_related_info() {
Expand Down
67 changes: 60 additions & 7 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/async_io.h"
#include "util/bvar_windowed_adder.h"
#include "util/debug_points.h"

namespace doris {
Expand Down Expand Up @@ -390,10 +391,36 @@ bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_num(
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num(
"file_cache_warm_up_rowset_wait_for_compaction_timeout_num");

// Per-job windowed metrics for target BE
// bvar::Window enforces MAX_SECONDS_LIMIT = 3600, so the longest window is 1h.
// Window lengths, in seconds: 5 minutes, 30 minutes and 1 hour.
static constexpr int WINDOW_5M = 300;
static constexpr int WINDOW_30M = 1800;
static constexpr int WINDOW_1H = 3600;

// Every adder below carries a single "job_id" label and is declared with all
// three window lengths.
// NOTE(review): the constructor arguments look like (metric name, label names,
// window lengths) -- confirm against util/bvar_windowed_adder.h.

// Segment downloads that finished with an OK status. handle_segment_download_done
// adds 1 to the count and segment_size to the size under the job's label.
MBvarWindowedAdder g_warmup_ed_finish_segment_num("warmup_ed_finish_segment_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H});
MBvarWindowedAdder g_warmup_ed_finish_segment_size("warmup_ed_finish_segment_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H});
// Inverted index downloads that finished with an OK status.
// handle_inverted_index_download_done adds 1 to the count and idx_size
// (cast to int64_t) to the size.
MBvarWindowedAdder g_warmup_ed_finish_index_num("warmup_ed_finish_index_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H});
MBvarWindowedAdder g_warmup_ed_finish_index_size("warmup_ed_finish_index_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H});
// Segment downloads that finished with a non-OK status; updated in the failure
// branch of handle_segment_download_done with the same count/size pairing.
MBvarWindowedAdder g_warmup_ed_fail_segment_num("warmup_ed_fail_segment_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H});
MBvarWindowedAdder g_warmup_ed_fail_segment_size("warmup_ed_fail_segment_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H});
// Inverted index downloads that finished with a non-OK status; updated in the
// failure branch of handle_inverted_index_download_done.
MBvarWindowedAdder g_warmup_ed_fail_index_num("warmup_ed_fail_index_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H});
MBvarWindowedAdder g_warmup_ed_fail_index_size("warmup_ed_fail_index_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H});
// Per-job timestamp of the most recent successful download. Both download-done
// handlers set it to the system_clock time in MILLISECONDS since the epoch, and
// only when get_stats() returns a non-null entry for the job label. The
// process-wide g_file_cache_warm_up_rowset_last_finish_unix_ts is set by the
// same handlers in MICROSECONDS, so the two values are not directly comparable.
bvar::MultiDimension<bvar::Status<int64_t>> g_warmup_ed_last_finish_ts("warmup_ed_last_finish_ts",
{"job_id"});

void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId& rowset_id,
int64_t segment_id, std::shared_ptr<CloudTablet> tablet,
std::shared_ptr<bthread::CountdownEvent> wait, Version version,
int64_t segment_size, int64_t request_ts, int64_t handle_ts) {
int64_t segment_size, int64_t request_ts, int64_t handle_ts,
std::string job_id_str) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
Expand All @@ -410,11 +437,19 @@ void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId&
});
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_warmup_ed_finish_segment_num.put({job_id_str}, 1);
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
g_warmup_ed_finish_segment_size.put({job_id_str}, segment_size);
int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
auto* finish_ts = g_warmup_ed_last_finish_ts.get_stats(std::list<std::string> {job_id_str});
if (finish_ts) {
finish_ts->set_value(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
}
g_file_cache_warm_up_rowset_latency << (now_ts - request_ts);
g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts);
if (request_ts > 0 && now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
Expand All @@ -431,7 +466,9 @@ void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId&
}
} else {
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
g_warmup_ed_fail_segment_num.put({job_id_str}, 1);
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
g_warmup_ed_fail_segment_size.put({job_id_str}, segment_size);
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
Expand All @@ -451,7 +488,7 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
std::shared_ptr<CloudTablet> tablet,
std::shared_ptr<bthread::CountdownEvent> wait,
Version version, uint64_t idx_size, int64_t request_ts,
int64_t handle_ts) {
int64_t handle_ts, std::string job_id_str) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO(
Expand All @@ -462,11 +499,19 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
});
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_index_num << 1;
g_warmup_ed_finish_index_num.put({job_id_str}, 1);
g_file_cache_event_driven_warm_up_finished_index_size << idx_size;
g_warmup_ed_finish_index_size.put({job_id_str}, static_cast<int64_t>(idx_size));
int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
auto* finish_ts = g_warmup_ed_last_finish_ts.get_stats(std::list<std::string> {job_id_str});
if (finish_ts) {
finish_ts->set_value(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
}
g_file_cache_warm_up_rowset_latency << (now_ts - request_ts);
g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts);
if (request_ts > 0 && now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
Expand All @@ -483,7 +528,9 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
}
} else {
g_file_cache_event_driven_warm_up_failed_index_num << 1;
g_warmup_ed_fail_index_num.put({job_id_str}, 1);
g_file_cache_event_driven_warm_up_failed_index_size << idx_size;
g_warmup_ed_fail_index_size.put({job_id_str}, static_cast<int64_t>(idx_size));
LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
}
Expand Down Expand Up @@ -513,6 +560,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
}

// Extract job_id from request (0 if not set, for backward compatibility)
std::string job_id_str = std::to_string(request->has_job_id() ? request->job_id() : 0);

for (auto& rs_meta_pb : request->rowset_metas()) {
RowsetMeta rs_meta;
rs_meta.init_from_pb(rs_meta_pb);
Expand Down Expand Up @@ -578,9 +628,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.is_warmup = true},
.download_done =
[=, version = rs_meta.version()](Status st) {
handle_segment_download_done(
st, tablet_id, rowset_id, segment_id, tablet, wait,
version, segment_size, request_ts, handle_ts);
handle_segment_download_done(st, tablet_id, rowset_id,
segment_id, tablet, wait, version,
segment_size, request_ts,
handle_ts, job_id_str);
},
.tablet_id = tablet_id};

Expand All @@ -594,7 +645,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}

// Use rs_meta.fs() to support packed files for inverted index download.
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
auto download_inverted_index = [&, tablet, job_id_str](std::string index_path,
uint64_t idx_size) {
io::DownloadFileMeta download_meta {
.path = io::Path(index_path),
.file_size = static_cast<int64_t>(idx_size),
Expand All @@ -607,7 +659,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
[=, version = rs_meta.version()](Status st) {
handle_inverted_index_download_done(
st, tablet_id, rowset_id, segment_id, index_path,
tablet, wait, version, idx_size, request_ts, handle_ts);
tablet, wait, version, idx_size, request_ts, handle_ts,
job_id_str);
},
.tablet_id = tablet_id};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string
}

Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
RowsetMetaSharedPtr* existed_rs_meta) {
RowsetMetaSharedPtr* existed_rs_meta, int64_t table_id) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
{
Expand Down Expand Up @@ -1379,7 +1379,7 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
<< ", with timeout: " << timeout_ms << " ms";
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.warm_up_rowset(rs_meta, timeout_ms);
manager.warm_up_rowset(rs_meta, table_id, timeout_ms);
return st;
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class CloudMetaMgr {
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr,
int64_t table_id = 0);
void cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t expiration_time);

Status update_tmp_rowset(const RowsetMeta& rs_meta);
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
st.to_string());
}

st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id,
&existed_rs_meta);
st = _cloud_storage_engine.meta_mgr().commit_rowset(
*rowset_writer->rowset_meta(), _job_id, &existed_rs_meta, _new_tablet->table_id());
if (!st.ok()) {
if (st.is<ALREADY_EXIST>()) {
LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
Expand Down
Loading
Loading