Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,46 @@ BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_rowset_plan_count(
BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_failures(
"recycler", "batch_delete_failures");

// Operation Log Recycler BVars
// Every metric in this group is declared with a single "instance_id" dimension.

// Whole-round counters: total logs, logs left in place, and recycle failures.
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_total_num(
"recycler_oplog_last_round_total_num", {"instance_id"});
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_not_recycled_num(
"recycler_oplog_last_round_not_recycled_num", {"instance_id"});
mBvarIntAdder g_bvar_recycler_oplog_recycle_failed_num(
"recycler_oplog_recycle_failed_num", {"instance_id"});

// Per-oplog-type "last round" recycled counts (mBvarStatus), one per operation log type.
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_commit_partition_num(
"recycler_oplog_last_round_recycled_commit_partition_num", {"instance_id"});
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_drop_partition_num(
"recycler_oplog_last_round_recycled_drop_partition_num", {"instance_id"});
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_commit_index_num(
"recycler_oplog_last_round_recycled_commit_index_num", {"instance_id"});
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_drop_index_num(
"recycler_oplog_last_round_recycled_drop_index_num", {"instance_id"});
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_update_tablet_num(
"recycler_oplog_last_round_recycled_update_tablet_num", {"instance_id"});
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_compaction_num(
"recycler_oplog_last_round_recycled_compaction_num", {"instance_id"});
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_schema_change_num(
"recycler_oplog_last_round_recycled_schema_change_num", {"instance_id"});
mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_commit_txn_num(
"recycler_oplog_last_round_recycled_commit_txn_num", {"instance_id"});

// Per-oplog-type recycled counts kept in adders (mBvarIntAdder); same type list as above,
// without the "last_round" prefix in the exported name.
mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_partition_num(
"recycler_oplog_recycled_commit_partition_num", {"instance_id"});
mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_partition_num(
"recycler_oplog_recycled_drop_partition_num", {"instance_id"});
mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_index_num(
"recycler_oplog_recycled_commit_index_num", {"instance_id"});
mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_index_num(
"recycler_oplog_recycled_drop_index_num", {"instance_id"});
mBvarIntAdder g_bvar_recycler_oplog_recycled_update_tablet_num(
"recycler_oplog_recycled_update_tablet_num", {"instance_id"});
mBvarIntAdder g_bvar_recycler_oplog_recycled_compaction_num(
"recycler_oplog_recycled_compaction_num", {"instance_id"});
mBvarIntAdder g_bvar_recycler_oplog_recycled_schema_change_num(
"recycler_oplog_recycled_schema_change_num", {"instance_id"});
mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_txn_num(
"recycler_oplog_recycled_commit_txn_num", {"instance_id"});

// txn_kv's bvars
bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
Expand Down
24 changes: 22 additions & 2 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,28 @@ extern BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_failures;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_packed_file_bytes_object_deleted;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_packed_file_rowset_scanned_num;

extern BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_rowset_plan_count;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_failures;
// Operation Log Recycler BVars
// Note: generic metrics (last_round_to_recycle_num/bytes, last_round_recycled_num/bytes, etc.)
// are reported by RecyclerMetricsContext with operation_type = "recycle_operation_logs".

// Whole-round counters: total logs, logs left in place, and recycle failures.
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_total_num;
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_not_recycled_num;
extern mBvarIntAdder g_bvar_recycler_oplog_recycle_failed_num;

// Per-oplog-type "last round" recycled counts (mBvarStatus).
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_commit_partition_num;
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_drop_partition_num;
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_commit_index_num;
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_drop_index_num;
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_update_tablet_num;
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_compaction_num;
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_schema_change_num;
extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_commit_txn_num;

// Per-oplog-type recycled counts kept in adders (mBvarIntAdder), same type list as above.
extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_partition_num;
extern mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_partition_num;
extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_index_num;
extern mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_index_num;
extern mBvarIntAdder g_bvar_recycler_oplog_recycled_update_tablet_num;
extern mBvarIntAdder g_bvar_recycler_oplog_recycled_compaction_num;
extern mBvarIntAdder g_bvar_recycler_oplog_recycled_schema_change_num;
extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_txn_num;

// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
Expand Down
40 changes: 40 additions & 0 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7298,6 +7298,46 @@ int InstanceRecycler::scan_and_statistics_restore_jobs() {
return ret;
}

void InstanceRecycler::scan_and_statistics_operation_logs() {
if (!should_recycle_versioned_keys()) {
return;
}

RecyclerMetricsContext metrics_context(instance_id_, "recycle_operation_logs");

OperationLogRecycleChecker recycle_checker(instance_id_, txn_kv_.get(), instance_info_);
if (recycle_checker.init() != 0) {
return;
}

std::string log_key_prefix = versioned::log_key(instance_id_);
std::string begin_key = encode_versioned_key(log_key_prefix, Versionstamp::min());
std::string end_key = encode_versioned_key(log_key_prefix, Versionstamp::max());

std::unique_ptr<BlobIterator> iter = blob_get_range(txn_kv_, begin_key, end_key);
for (; iter->valid(); iter->next()) {
OperationLogPB operation_log;
if (!iter->parse_value(&operation_log)) {
continue;
}

std::string_view key = iter->key();
Versionstamp log_versionstamp;
if (!decode_versioned_key(&key, &log_versionstamp)) {
continue;
}

OperationLogReferenceInfo ref_info;
if (recycle_checker.can_recycle(log_versionstamp, operation_log.min_timestamp(),
&ref_info)) {
metrics_context.total_need_recycle_num++;
metrics_context.total_need_recycle_data_size += operation_log.ByteSizeLong();
}
}

metrics_context.report(true);
}

int InstanceRecycler::classify_rowset_task_by_ref_count(
RowsetDeleteTask& task, std::vector<RowsetDeleteTask>& batch_delete_tasks) {
constexpr int MAX_RETRY = 10;
Expand Down
25 changes: 24 additions & 1 deletion cloud/src/recycler/recycler.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class SegmentRecyclerMetricsContext : public RecyclerMetricsContext {
: RecyclerMetricsContext("global_recycler", "recycle_segment") {}
};

struct OplogRecycleStats;

class InstanceRecycler {
public:
struct PackedFileRecycleStats {
Expand Down Expand Up @@ -401,6 +403,8 @@ class InstanceRecycler {

int scan_and_statistics_restore_jobs();

void scan_and_statistics_operation_logs();

/**
* Decode the key of a packed-file metadata record into the persisted object path.
*
Expand Down Expand Up @@ -488,7 +492,8 @@ class InstanceRecycler {
//
// Both `operation_log` and `raw_keys` will be removed in the same transaction, to ensure atomicity.
int recycle_operation_log(Versionstamp log_version, const std::vector<std::string>& raw_keys,
OperationLogPB operation_log);
OperationLogPB operation_log,
OplogRecycleStats* oplog_stats = nullptr);

// Recycle rowset meta and data, return 0 for success otherwise error
//
Expand Down Expand Up @@ -612,6 +617,24 @@ struct OperationLogReferenceInfo {
Versionstamp referenced_snapshot_timestamp;
};

// Counters gathered during a single round of operation-log recycling.
// All fields start at zero and are atomics, so they are updated in place
// (e.g. via fetch_add) and the struct itself is neither copyable nor movable.
struct OplogRecycleStats {
    // Number of operation logs visited in the round.
    std::atomic<int64_t> total_num = 0;
    // Number of visited logs that were left in place (published as a per-round status).
    std::atomic<int64_t> not_recycled_num = 0;
    // Number of failed recycle attempts in the round (added to a cumulative adder
    // when the round ends).
    std::atomic<int64_t> failed_num = 0;

    // Successfully recycled logs, broken down by operation log type.
    // A counter is bumped only after the recycle transaction has committed.
    std::atomic<int64_t> recycled_commit_partition = 0;
    std::atomic<int64_t> recycled_drop_partition = 0;
    std::atomic<int64_t> recycled_commit_index = 0;
    std::atomic<int64_t> recycled_drop_index = 0;
    std::atomic<int64_t> recycled_update_tablet = 0;
    std::atomic<int64_t> recycled_compaction = 0;
    std::atomic<int64_t> recycled_schema_change = 0;
    std::atomic<int64_t> recycled_commit_txn = 0;
};

// Helper class to check if operation logs can be recycled based on snapshots and versionstamps
class OperationLogRecycleChecker {
public:
Expand Down
90 changes: 87 additions & 3 deletions cloud/src/recycler/recycler_operation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include <utility>
#include <vector>

#include "common/bvars.h"
#include "common/config.h"
#include "common/defer.h"
#include "common/encryption_util.h"
#include "common/logging.h"
Expand Down Expand Up @@ -651,6 +653,50 @@ static TxnErrorCode get_txn_info(TxnKv* txn_kv, std::string_view instance_id, in
return TxnErrorCode::TXN_OK;
}

static void report_oplog_recycle_stats(const std::string& instance_id,
const OplogRecycleStats& stats) {
g_bvar_recycler_oplog_last_round_total_num.put({instance_id}, stats.total_num.load());
g_bvar_recycler_oplog_last_round_not_recycled_num.put({instance_id},
stats.not_recycled_num.load());
if (stats.failed_num.load() > 0) {
g_bvar_recycler_oplog_recycle_failed_num.put({instance_id}, stats.failed_num.load());
}
// Per-type last round counts (mBvarStatus, overwritten each round)
g_bvar_recycler_oplog_last_round_recycled_commit_partition_num.put(
{instance_id}, stats.recycled_commit_partition.load());
g_bvar_recycler_oplog_last_round_recycled_drop_partition_num.put(
{instance_id}, stats.recycled_drop_partition.load());
g_bvar_recycler_oplog_last_round_recycled_commit_index_num.put(
{instance_id}, stats.recycled_commit_index.load());
g_bvar_recycler_oplog_last_round_recycled_drop_index_num.put({instance_id},
stats.recycled_drop_index.load());
g_bvar_recycler_oplog_last_round_recycled_update_tablet_num.put(
{instance_id}, stats.recycled_update_tablet.load());
g_bvar_recycler_oplog_last_round_recycled_compaction_num.put({instance_id},
stats.recycled_compaction.load());
g_bvar_recycler_oplog_last_round_recycled_schema_change_num.put(
{instance_id}, stats.recycled_schema_change.load());
g_bvar_recycler_oplog_last_round_recycled_commit_txn_num.put({instance_id},
stats.recycled_commit_txn.load());
// Per-type cumulative counts (mBvarIntAdder, accumulated across rounds)
g_bvar_recycler_oplog_recycled_commit_partition_num.put({instance_id},
stats.recycled_commit_partition.load());
g_bvar_recycler_oplog_recycled_drop_partition_num.put({instance_id},
stats.recycled_drop_partition.load());
g_bvar_recycler_oplog_recycled_commit_index_num.put({instance_id},
stats.recycled_commit_index.load());
g_bvar_recycler_oplog_recycled_drop_index_num.put({instance_id},
stats.recycled_drop_index.load());
g_bvar_recycler_oplog_recycled_update_tablet_num.put({instance_id},
stats.recycled_update_tablet.load());
g_bvar_recycler_oplog_recycled_compaction_num.put({instance_id},
stats.recycled_compaction.load());
g_bvar_recycler_oplog_recycled_schema_change_num.put({instance_id},
stats.recycled_schema_change.load());
g_bvar_recycler_oplog_recycled_commit_txn_num.put({instance_id},
stats.recycled_commit_txn.load());
}

int InstanceRecycler::recycle_operation_logs() {
if (!should_recycle_versioned_keys()) {
VLOG_DEBUG << "instance " << instance_id_
Expand All @@ -665,6 +711,18 @@ int InstanceRecycler::recycle_operation_logs() {
AnnotateTag tag("instance_id", instance_id_);
LOG_WARNING("begin to recycle operation logs");

const std::string task_name = "recycle_operation_logs";
RecyclerMetricsContext metrics_context(instance_id_, task_name);
OplogRecycleStats oplog_stats;

// scan_and_statistics_operation_logs() is expensive (scans lots of KVs),
// so it's controlled by enable_recycler_stats_metrics.
// The other stats (counting what was actually recycled) are lightweight
// and always collected.
if (config::enable_recycler_stats_metrics) {
scan_and_statistics_operation_logs();
}

StopWatch stop_watch;
size_t total_operation_logs = 0;
size_t recycled_operation_logs = 0;
Expand All @@ -673,6 +731,9 @@ int InstanceRecycler::recycle_operation_logs() {
size_t recycled_operation_log_data_size = 0;

DORIS_CLOUD_DEFER {
metrics_context.finish_report();
report_oplog_recycle_stats(instance_id_, oplog_stats);

int64_t cost = stop_watch.elapsed_us() / 1000'000;
LOG_WARNING("recycle operation logs, cost={}s", cost)
.tag("total_operation_logs", total_operation_logs)
Expand Down Expand Up @@ -706,16 +767,24 @@ int InstanceRecycler::recycle_operation_logs() {
OperationLogReferenceInfo reference_info;
if (recycle_checker.can_recycle(log_versionstamp, operation_log.min_timestamp(),
&reference_info)) {
metrics_context.total_need_recycle_num++;
metrics_context.total_need_recycle_data_size += value_size;

AnnotateTag tag("log_key", hex(key));
int res = recycle_operation_log(log_versionstamp, raw_keys, std::move(operation_log));
int res = recycle_operation_log(log_versionstamp, raw_keys, std::move(operation_log),
&oplog_stats);
if (res != 0) {
LOG_WARNING("failed to recycle operation log").tag("error_code", res);
oplog_stats.failed_num.fetch_add(1, std::memory_order_relaxed);
return res;
}

recycled_operation_logs++;
recycled_operation_log_data_size += value_size;
metrics_context.total_recycled_num++;
metrics_context.total_recycled_data_size += value_size;
} else {
oplog_stats.not_recycled_num.fetch_add(1, std::memory_order_relaxed);
int res = calculator.calculate_operation_log_data_size(key, operation_log,
reference_info);
if (res != 0) {
Expand All @@ -727,6 +796,8 @@ int InstanceRecycler::recycle_operation_logs() {
total_operation_logs++;
operation_log_data_size += value_size;
max_operation_log_data_size = std::max(max_operation_log_data_size, value_size);
oplog_stats.total_num.fetch_add(1, std::memory_order_relaxed);
metrics_context.report();
return 0;
};

Expand Down Expand Up @@ -801,8 +872,11 @@ int InstanceRecycler::recycle_operation_logs() {

int InstanceRecycler::recycle_operation_log(Versionstamp log_version,
const std::vector<std::string>& raw_keys,
OperationLogPB operation_log) {
OperationLogPB operation_log,
OplogRecycleStats* oplog_stats) {
int recycle_log_count = 0;
// Track which oplog type was recycled (only one per log entry)
std::atomic<int64_t>* recycled_counter = nullptr;
OperationLogRecycler log_recycler(instance_id_, txn_kv_.get(), log_version,
operation_log.min_timestamp(), raw_keys);
RETURN_ON_FAILURE(log_recycler.begin());
Expand All @@ -818,6 +892,9 @@ int InstanceRecycler::recycle_operation_log(Versionstamp log_version,
return res; \
} \
recycle_log_count++; \
if (oplog_stats) { \
recycled_counter = &oplog_stats->recycled_##log_type; \
} \
} \
} while (0)

Expand Down Expand Up @@ -859,6 +936,9 @@ int InstanceRecycler::recycle_operation_log(Versionstamp log_version,
}

recycle_log_count++;
if (oplog_stats) {
recycled_counter = &oplog_stats->recycled_commit_txn;
}
}

if (recycle_log_count > 1) {
Expand All @@ -869,7 +949,11 @@ int InstanceRecycler::recycle_operation_log(Versionstamp log_version,
return -1; // This is an unexpected condition, should not happen
}

return log_recycler.commit();
int ret = log_recycler.commit();
if (ret == 0 && recycled_counter) {
recycled_counter->fetch_add(1, std::memory_order_relaxed);
}
return ret;
}

} // namespace doris::cloud
Loading
Loading