Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
292e6c7
[docs](cloud) Add module design docs for Cloud MOW two-phase commit
bobhan1 Mar 18, 2026
447f037
[docs](cloud) Update MOW 2PC module docs: tablet lock, TOlapTableSche…
bobhan1 Mar 18, 2026
95a5e1d
[docs](cloud) Complete proto schema doc: TOlapTableSchemaParam, table…
bobhan1 Mar 18, 2026
34f5919
[docs](cloud) Fix remaining TxnLoadInfoPB -> load_schema_param refere…
bobhan1 Mar 18, 2026
fa555a3
[docs](cloud) Rename internal 2PC to async_publish to avoid confusion…
bobhan1 Mar 18, 2026
edda7db
[feature](cloud) Add Proto & KV Schema for Cloud MOW async publish
bobhan1 Mar 18, 2026
658ef1c
[feature](cloud) Implement async publish commit phase
bobhan1 Mar 18, 2026
9dad6a1
[feature](cloud) Implement MS convert_tmp_rowset RPC for per-tablet r…
bobhan1 Mar 19, 2026
dfabfc3
[feature](cloud) Implement MS lightweight publish RPC for async publish
bobhan1 Mar 19, 2026
b4aa5c7
[feature](cloud) Implement FE commit phase for MOW async publish
bobhan1 Mar 19, 2026
dcfd00e
[feature](cloud) Implement CloudPublishDaemon for MOW async publish
bobhan1 Mar 19, 2026
9b70bd3
[feature](cloud) Improve MOW async publish with retry and common util…
bobhan1 Mar 20, 2026
012bba6
tmp
bobhan1 Mar 20, 2026
e42081f
[feature](cloud) Add enable_mow_async_publish to tablet metadata
bobhan1 Mar 23, 2026
7057da3
[feature](cloud) Pass is_mow_async_publish flag in update_delete_bitm…
bobhan1 Mar 23, 2026
ea6d342
[feature](cloud) Skip pending delete bitmap for async publish tables …
bobhan1 Mar 23, 2026
049c4cb
[feature](cloud) Implement tablet-level lock and lock mechanism refac…
bobhan1 Mar 23, 2026
8f6428d
add desgin =.md
bobhan1 Mar 23, 2026
76f908a
[feature](cloud) Fix lock mechanism for async publish tables
bobhan1 Mar 23, 2026
5853340
fix
bobhan1 Mar 23, 2026
a6bc50e
[feature](cloud) Move MS tablet lock acquisition before delete bitmap…
bobhan1 Mar 23, 2026
7d7be42
[fix](fe) Fix cloud async publish FE wiring
bobhan1 Mar 23, 2026
4265fa9
[feature](cloud) Add FE support for enable_mow_async_publish property
bobhan1 Mar 24, 2026
bb994e4
[feature](cloud) Implement async publish local apply with best-effort…
bobhan1 Mar 24, 2026
fc7fa25
[config](cloud) Change default cloud_publish_interval_ms from 100ms t…
bobhan1 Mar 24, 2026
7c89850
[refactor](cloud) Separate async publish delete bitmap calc into new …
bobhan1 Mar 24, 2026
d485aac
[fix](be) Simplify cloud async publish bitmap handling
bobhan1 Mar 25, 2026
6f7568d
[fix](be) Retry cloud publish on discontinuous versions
bobhan1 Mar 25, 2026
918f7aa
[fix](cloud) Restore async publish tablet lock stats checks
bobhan1 Mar 25, 2026
e6e8109
[fix](cloud) Skip partition version check for async publish in update…
bobhan1 Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_
"CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
[&engine](auto&& task) { return calc_delete_bitmap_callback(engine, task); });

_workers[TTaskType::CALC_DELETE_BITMAP_ASYNC_PUBLISH] =
std::make_unique<CloudCalcDeleteBitmapAsyncPublishWorkerPool>(engine);

// cloud, drop tablet just clean clear_cache, so just one thread do it
_workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>(
"DROP_TABLE", 1, [&engine](auto&& task) { return drop_tablet_callback(engine, task); });
Expand Down
76 changes: 76 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "agent/utils.h"
#include "cloud/cloud_delete_task.h"
#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
#include "cloud/cloud_calc_delete_bitmap_async_publish_task.h"
#include "cloud/cloud_schema_change_job.h"
#include "cloud/cloud_snapshot_loader.h"
#include "cloud/cloud_snapshot_mgr.h"
Expand Down Expand Up @@ -456,6 +457,8 @@ bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRA
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");
// Counter for CALC_DELETE_BITMAP_ASYNC_PUBLISH agent tasks, exposed under the "task" prefix.
// It is listed in add_task_count() via ADD_TASK_COUNT and is additionally adjusted (+1 / -1)
// by the async publish callback around each re-submission of a task.
bvar::Adder<uint64_t> CALC_DELETE_BITMAP_ASYNC_PUBLISH_count("task",
"CALC_DELETE_BITMAP_ASYNC_PUBLISH");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
Expand Down Expand Up @@ -485,6 +488,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
ADD_TASK_COUNT(CALC_DELETE_BITMAP_ASYNC_PUBLISH)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand Down Expand Up @@ -2160,6 +2164,78 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
remove_task_info(req.task_type, req.signature);
}

// Constructs the "CALC_DBM_ASYNC_PUB" worker pool.
//
// config::calc_delete_bitmap_worker_count is passed as the pool's second constructor
// argument (presumably the worker count -- confirm against TaskWorkerPool). Every task
// handed to the pool is forwarded to calc_delete_bitmap_async_publish_callback(). The
// handler captures `this`, so it must not be invoked after this object is destroyed.
CloudCalcDeleteBitmapAsyncPublishWorkerPool::CloudCalcDeleteBitmapAsyncPublishWorkerPool(
        CloudStorageEngine& engine)
        : TaskWorkerPool(
                  "CALC_DBM_ASYNC_PUB", config::calc_delete_bitmap_worker_count,
                  [this](const TAgentTaskRequest& request) {
                      this->calc_delete_bitmap_async_publish_callback(request);
                  }),
          _engine(engine) {}

// Defaulted destructor, defined out of line in this translation unit.
CloudCalcDeleteBitmapAsyncPublishWorkerPool::~CloudCalcDeleteBitmapAsyncPublishWorkerPool() =
default;

void CloudCalcDeleteBitmapAsyncPublishWorkerPool::calc_delete_bitmap_async_publish_callback(
const TAgentTaskRequest& req) {
std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
const auto& async_publish_req = req.calc_delete_bitmap_async_publish_req;
if (req.signature != async_publish_req.transaction_id) {
LOG_INFO("begin to execute calc delete bitmap async publish task")
.tag("signature", req.signature)
.tag("transaction_id", async_publish_req.transaction_id);
}

CloudCalcDeleteBitmapAsyncPublishTask engine_task(
_engine, async_publish_req, &error_tablet_ids, &succ_tablet_ids);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
Status status = engine_task.execute();
if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
LOG_EVERY_SECOND(INFO) << "wait for previous cloud publish task to be done, "
<< "transaction_id: " << async_publish_req.transaction_id;

int64_t enqueue_time = req.__isset.recv_time ? req.recv_time : time(nullptr);
int64_t time_elapsed = time(nullptr) - enqueue_time;
if (time_elapsed > config::publish_version_task_timeout_s) {
LOG(INFO) << "calc delete bitmap async publish task elapsed " << time_elapsed
<< " seconds since it is inserted to queue, it is timeout";
} else {
CALC_DELETE_BITMAP_ASYNC_PUBLISH_count << 1;
auto st = _thread_pool->submit_func([this, req] {
this->calc_delete_bitmap_async_publish_callback(req);
CALC_DELETE_BITMAP_ASYNC_PUBLISH_count << -1;
});
if (!st.ok()) [[unlikely]] {
CALC_DELETE_BITMAP_ASYNC_PUBLISH_count << -1;
status = std::move(st);
} else {
return;
}
}
}

TFinishTaskRequest finish_task_request;
if (!status) {
DorisMetrics::instance()->publish_task_failed_total->increment(1);
LOG_WARNING("failed to calculate delete bitmap for async publish")
.tag("signature", req.signature)
.tag("transaction_id", async_publish_req.transaction_id)
.tag("error_tablets_num", error_tablet_ids.size())
.error(status);
}

status.to_thrift(&finish_task_request.task_status);
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(req.task_type);
finish_task_request.__set_signature(req.signature);
finish_task_request.__set_report_version(s_report_version);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
finish_task_request.__set_resp_partitions(async_publish_req.partitions);

finish_task(finish_task_request);
remove_task_info(req.task_type, req.signature);
}

void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const auto& clear_transaction_task_req = req.clear_transaction_task_req;
LOG(INFO) << "get clear transaction task. signature=" << req.signature
Expand Down
12 changes: 12 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ class PublishVersionWorkerPool final : public TaskWorkerPool {
StorageEngine& _engine;
};

// Worker pool that runs delete-bitmap calculation tasks for cloud async publish.
//
// Each task submitted to the pool is dispatched to
// calc_delete_bitmap_async_publish_callback().
class CloudCalcDeleteBitmapAsyncPublishWorkerPool final : public TaskWorkerPool {
public:
    // `explicit` prevents an accidental implicit conversion from a CloudStorageEngine.
    // The engine is stored by reference, so it must outlive this pool.
    explicit CloudCalcDeleteBitmapAsyncPublishWorkerPool(CloudStorageEngine& engine);

    ~CloudCalcDeleteBitmapAsyncPublishWorkerPool() override;

private:
    // Handles one task: runs the calculation and reports the result, or re-queues the task.
    void calc_delete_bitmap_async_publish_callback(const TAgentTaskRequest& task);

    // Engine handed to every task this pool runs (not owned).
    CloudStorageEngine& _engine;
};

class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
PriorTaskWorkerPool(const std::string& name, int normal_worker_count,
Expand Down
14 changes: 14 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,15 @@ Status CloudBaseCompaction::execute_compact() {
}

Status CloudBaseCompaction::modify_rowsets() {
bool hold_delete_bitmap_and_rowset_layout_lock =
_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _tablet->enable_mow_async_publish();
std::unique_lock<std::mutex> delete_bitmap_and_rowset_layout_lock(
cloud_tablet()->get_delete_bitmap_and_rowset_layout_lock(), std::defer_lock);
if (hold_delete_bitmap_and_rowset_layout_lock) {
delete_bitmap_and_rowset_layout_lock.lock();
}

// commit compaction job
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
Expand Down Expand Up @@ -387,6 +396,7 @@ Status CloudBaseCompaction::modify_rowsets() {
.tag("input_segments", _input_segments)
.tag("num_output_delete_bitmap", output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
compaction_job->set_use_delete_bitmap_tablet_lock(_tablet->enable_mow_async_publish());
}

cloud::FinishTabletJobResponse resp;
Expand Down Expand Up @@ -447,6 +457,9 @@ Status CloudBaseCompaction::modify_rowsets() {
stats.num_rows(), stats.data_size());
}
}
if (delete_bitmap_and_rowset_layout_lock.owns_lock()) {
delete_bitmap_and_rowset_layout_lock.unlock();
}
_tablet->prefill_dbm_agg_cache_after_compaction(_output_rowset);
return Status::OK();
}
Expand All @@ -467,6 +480,7 @@ Status CloudBaseCompaction::garbage_collection() {
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
compaction_job->set_use_delete_bitmap_tablet_lock(_tablet->enable_mow_async_publish());
}
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
Expand Down
Loading
Loading