From 8cf9b7f07a49c201adb1d52365ba257ceaa281d8 Mon Sep 17 00:00:00 2001 From: w41ter Date: Wed, 27 Aug 2025 11:03:23 +0000 Subject: [PATCH] [improve](cloud) Report read/write conflict range --- cloud/src/meta-store/txn_kv.cpp | 128 +++++++++++++++++++++++++++++++- cloud/src/meta-store/txn_kv.h | 4 + 2 files changed, 131 insertions(+), 1 deletion(-) diff --git a/cloud/src/meta-store/txn_kv.cpp b/cloud/src/meta-store/txn_kv.cpp index 497f5c57a7022d..6623fedb5dc712 100644 --- a/cloud/src/meta-store/txn_kv.cpp +++ b/cloud/src/meta-store/txn_kv.cpp @@ -944,6 +944,94 @@ TxnErrorCode Transaction::get_conflicting_range( return TxnErrorCode::TXN_OK; } +TxnErrorCode Transaction::get_read_conflict_range( + std::vector>* values) { + constexpr std::string_view start = "\xff\xff/transaction/read_conflict_range/"; + constexpr std::string_view end = "\xff\xff/transaction/read_conflict_range/\xff"; + + int limit = 0; + int target_bytes = 0; + FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL; + int iteration = 0; + fdb_bool_t snapshot = 0; + fdb_bool_t reverse = 0; + FDBFuture* future = fdb_transaction_get_range( + txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()), + FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit, + target_bytes, mode, iteration, snapshot, reverse); + + DORIS_CLOUD_DEFER { + fdb_future_destroy(future); + }; + + RETURN_IF_ERROR(await_future(future)); + + FDBKeyValue const* out_kvs; + int out_kvs_count; + fdb_bool_t out_more; + do { + fdb_error_t err = + fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more); + if (err) { + LOG(WARNING) << "get_conflicting_range get keyvalue array error: " + << fdb_get_error(err); + return cast_as_txn_code(err); + } + for (int i = 0; i < out_kvs_count; i++) { + std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length); + std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length); + key.remove_prefix(start.size()); + values->emplace_back(key, value); + } + } while (out_more); + + return TxnErrorCode::TXN_OK; +} + +TxnErrorCode Transaction::get_write_conflict_range( + std::vector>* values) { + constexpr std::string_view start = "\xff\xff/transaction/write_conflict_range/"; + constexpr std::string_view end = "\xff\xff/transaction/write_conflict_range/\xff"; + + int limit = 0; + int target_bytes = 0; + FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL; + int iteration = 0; + fdb_bool_t snapshot = 0; + fdb_bool_t reverse = 0; + FDBFuture* future = fdb_transaction_get_range( + txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()), + FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit, + target_bytes, mode, iteration, snapshot, reverse); + + DORIS_CLOUD_DEFER { + fdb_future_destroy(future); + }; + + RETURN_IF_ERROR(await_future(future)); + + FDBKeyValue const* out_kvs; + int out_kvs_count; + fdb_bool_t out_more; + do { + fdb_error_t err = + fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more); + if (err) { + LOG(WARNING) << "get_conflicting_range get keyvalue array error: " + << fdb_get_error(err); + return cast_as_txn_code(err); + } + for (int i = 0; i < out_kvs_count; i++) { + std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length); + std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length); + key.remove_prefix(start.size()); + values->emplace_back(key, value); + } + } while (out_more); + + return TxnErrorCode::TXN_OK; +} + TxnErrorCode Transaction::report_conflicting_range() { if (!config::enable_logging_conflict_keys) { return TxnErrorCode::TXN_OK; @@ -969,7 +1057,45 @@ TxnErrorCode Transaction::report_conflicting_range() { out += fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count); } - LOG(WARNING) << "conflicting key ranges: " << out; + key_values.clear(); + RETURN_IF_ERROR(get_read_conflict_range(&key_values)); + if (key_values.size() % 2 != 0) { + LOG(WARNING) << "the read conflict range is not well-formed, size=" << key_values.size(); + return TxnErrorCode::TXN_INVALID_DATA; + } + std::string read_conflict_range_out; + for (size_t i = 0; i < key_values.size(); i += 2) { + std::string_view start = key_values[i].first; + std::string_view end = key_values[i + 1].first; + std::string_view conflict_count = key_values[i].second; + if (!read_conflict_range_out.empty()) { + read_conflict_range_out += ", "; + } + read_conflict_range_out += + fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count); + } + + key_values.clear(); + RETURN_IF_ERROR(get_write_conflict_range(&key_values)); + if (key_values.size() % 2 != 0) { + LOG(WARNING) << "the write conflict range is not well-formed, size=" << key_values.size(); + return TxnErrorCode::TXN_INVALID_DATA; + } + std::string write_conflict_range_out; + for (size_t i = 0; i < key_values.size(); i += 2) { + std::string_view start = key_values[i].first; + std::string_view end = key_values[i + 1].first; + std::string_view conflict_count = key_values[i].second; + if (!write_conflict_range_out.empty()) { + write_conflict_range_out += ", "; + } + write_conflict_range_out += + fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count); + } + + LOG(WARNING) << "conflicting key ranges: " << out + << ", read conflict range: " << read_conflict_range_out + << ", write conflict range: " << write_conflict_range_out; return TxnErrorCode::TXN_OK; } diff --git a/cloud/src/meta-store/txn_kv.h b/cloud/src/meta-store/txn_kv.h index 200ea1e776873b..ab97a1b4c97252 100644 --- a/cloud/src/meta-store/txn_kv.h +++ b/cloud/src/meta-store/txn_kv.h @@ -841,6 +841,10 @@ class Transaction : public cloud::Transaction { // It only works when the report_conflicting_ranges option is enabled. TxnErrorCode get_conflicting_range( std::vector>* key_values); + TxnErrorCode get_read_conflict_range( + std::vector>* key_values); + TxnErrorCode get_write_conflict_range( + std::vector>* key_values); TxnErrorCode report_conflicting_range(); std::shared_ptr db_ {nullptr};