From e60dc0a942bcd13cf23d8abe386894e95e1d156f Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Sun, 18 Feb 2024 19:27:00 +0800 Subject: [PATCH 01/23] support_no_idempotent_dup --- src/replica/duplication/mutation_batch.cpp | 2 +- src/replica/mutation.h | 3 ++ src/replica/replica_2pc.cpp | 11 +++++- src/server/pegasus_mutation_duplicator.cpp | 16 +++++++++ src/server/pegasus_write_service.cpp | 41 ++++++++++++++++++++++ src/server/pegasus_write_service.h | 1 + 6 files changed, 72 insertions(+), 2 deletions(-) diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index 8b7a815fca..e306a936de 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -173,7 +173,7 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree // ERR_OPERATION_DISABLED, but there could still be a mutation written // before the duplication was added. // To ignore means this write will be lost, which is acceptable under this rare case. - if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) { + if (!task_spec::get(update.code)->rpc_request_is_write_idempotent && !FLAGS_force_send_no_idempotent_when_duplication) { continue; } blob bb; diff --git a/src/replica/mutation.h b/src/replica/mutation.h index d5b7f238ad..3e20d04531 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -42,6 +42,7 @@ #include "utils/autoref_ptr.h" #include "utils/fmt_logging.h" #include "utils/link.h" +#include "utils/flags.h" namespace dsn { class binary_reader; @@ -54,6 +55,8 @@ class latency_tracer; namespace replication { +DSN_DECLARE_bool(force_send_no_idempotent_when_duplication); + class mutation; typedef dsn::ref_ptr mutation_ptr; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 0e335d49b1..a43c556707 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -86,6 +86,14 @@ DSN_DEFINE_bool(replication, "reject client write requests if disk status is space insufficient"); DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE); +DSN_DEFINE_bool("replication", + force_send_no_idempotent_when_duplication, + false, + "receive client idempotent write requests and send them to backup cluster when " + "doing duplication"); +DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE); + + DSN_DEFINE_int32(replication, prepare_timeout_ms_for_secondaries, 1000, @@ -154,7 +162,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } - if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) { + if (is_duplication_master() && !spec->rpc_request_is_write_idempotent && + !FLAGS_force_send_no_idempotent_when_duplication) { // Ignore non-idempotent write, because duplication provides no guarantee of atomicity to // make this write produce the same result on multiple clusters. METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests); diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 3553bcfc79..43be6be612 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -101,6 +101,22 @@ using namespace dsn::literals::chrono_literals; dsn::from_blob_to_thrift(data, thrift_request); return pegasus_hash_key_hash(thrift_request.hash_key); } + if (tc == dsn::apps::RPC_RRDB_RRDB_INCR) { + dsn::apps::incr_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { + dsn::apps::check_and_set_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + dsn::apps::check_and_mutate_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + LOG_FATAL("unexpected task code: {}", tc); __builtin_unreachable(); } diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 09f05ae577..98bd0320d1 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -126,6 +126,11 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kRequests, "The number of DUPLICATE requests"); +METRIC_DEFINE_counter(replica, + no_idempotent_duplicate, + dsn::metric_unit::kRequests, + "The number of forced idempotent requests when doing duplication"); + METRIC_DEFINE_percentile_int64(replica, dup_time_lag_ms, dsn::metric_unit::kMilliSeconds, @@ -169,6 +174,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) METRIC_VAR_INIT_replica(check_and_set_latency_ns), METRIC_VAR_INIT_replica(check_and_mutate_latency_ns), METRIC_VAR_INIT_replica(dup_requests), + METRIC_VAR_INIT_replica(no_idempotent_duplicate), METRIC_VAR_INIT_replica(dup_time_lag_ms), METRIC_VAR_INIT_replica(dup_lagging_writes), _put_batch_size(0), @@ -416,6 +422,41 @@ int pegasus_write_service::duplicate(int64_t decree, } continue; } + + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || + request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || + request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + // receive no idempotent request from master cluster via duplication + METRIC_VAR_INCREMENT(no_idempotent_duplicate); + + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { + incr_rpc rpc(write); + resp.__set_error(_impl->incr(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { + check_and_set_rpc rpc(write); + resp.__set_error(_impl->check_and_set(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + check_and_mutate_rpc rpc(write); + resp.__set_error( + _impl->check_and_mutate(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + } + + resp.__set_error(rocksdb::Status::kInvalidArgument); resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code)); return empty_put(ctx.decree); diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 18db638176..967a548cce 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -232,6 +232,7 @@ class pegasus_write_service : dsn::replication::replica_base METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns); METRIC_VAR_DECLARE_counter(dup_requests); + METRIC_VAR_DECLARE_counter(no_idempotent_duplicate); METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms); METRIC_VAR_DECLARE_counter(dup_lagging_writes); From 31a4271f72f76310cc859d478de9cf3f580679fe Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Sun, 18 Feb 2024 20:00:31 +0800 Subject: [PATCH 02/23] format and IWYU --- src/replica/duplication/mutation_batch.cpp | 3 ++- src/replica/replica_2pc.cpp | 1 - src/server/pegasus_write_service.cpp | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index e306a936de..8e84e19d58 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -173,7 +173,8 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree // ERR_OPERATION_DISABLED, but there could still be a mutation written // before the duplication was added. // To ignore means this write will be lost, which is acceptable under this rare case. - if (!task_spec::get(update.code)->rpc_request_is_write_idempotent && !FLAGS_force_send_no_idempotent_when_duplication) { + if (!task_spec::get(update.code)->rpc_request_is_write_idempotent && + !FLAGS_force_send_no_idempotent_when_duplication) { continue; } blob bb; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index a43c556707..c3235b9438 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -93,7 +93,6 @@ DSN_DEFINE_bool("replication", "doing duplication"); DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE); - DSN_DEFINE_int32(replication, prepare_timeout_ms_for_secondaries, 1000, diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 98bd0320d1..50f7add25c 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -456,7 +456,6 @@ int pegasus_write_service::duplicate(int64_t decree, } } - resp.__set_error(rocksdb::Status::kInvalidArgument); resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code)); return empty_put(ctx.decree); From 61b446f8ee33e0d38919fa7de20c86a82fba398d Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Mon, 26 Feb 2024 20:27:45 +0800 Subject: [PATCH 03/23] add two matric and type key if dup non-idempotent write retried --- src/common/duplication_common.cpp | 7 ++ src/common/duplication_common.h | 1 + src/replica/mutation.h | 4 +- src/replica/replica_2pc.cpp | 9 --- src/server/info_collector.cpp | 2 + src/server/info_collector.h | 6 ++ src/server/pegasus_mutation_duplicator.cpp | 77 ++++++++++++++++++++++ src/server/pegasus_mutation_duplicator.h | 3 + src/server/pegasus_write_service.cpp | 8 +-- src/server/pegasus_write_service.h | 2 +- src/shell/command_helper.h | 8 +++ src/shell/commands/table_management.cpp | 4 ++ 12 files changed, 114 insertions(+), 17 deletions(-) diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index cb2fb02984..0c6304fc40 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -40,6 +40,13 @@ DSN_DEFINE_uint32(replication, "send mutation log batch bytes size per rpc"); DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE); +DSN_DEFINE_bool("replication", + force_send_no_idempotent_when_duplication, + false, + "receive client idempotent write requests and send them to backup cluster when " + "doing duplication"); +DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE); + const std::string duplication_constants::kDuplicationCheckpointRootDir /*NOLINT*/ = "duplication"; const std::string duplication_constants::kClustersSectionName /*NOLINT*/ = "pegasus.clusters"; const std::string duplication_constants::kDuplicationEnvMasterClusterKey /*NOLINT*/ = diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h index 6b953f6e96..5e6ad1a278 100644 --- a/src/common/duplication_common.h +++ b/src/common/duplication_common.h @@ -34,6 +34,7 @@ namespace dsn { namespace replication { DSN_DECLARE_uint32(duplicate_log_batch_bytes); +DSN_DECLARE_bool(force_send_no_idempotent_when_duplication); typedef rpc_holder duplication_modify_rpc; typedef rpc_holder duplication_add_rpc; diff --git a/src/replica/mutation.h b/src/replica/mutation.h index 3e20d04531..55a9d5cdff 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -34,6 +34,7 @@ #include "common/replication_common.h" #include "common/replication_other_types.h" +#include "common/duplication_common.h" #include "consensus_types.h" #include "runtime/api_layer1.h" #include "runtime/rpc/rpc_message.h" @@ -42,7 +43,6 @@ #include "utils/autoref_ptr.h" #include "utils/fmt_logging.h" #include "utils/link.h" -#include "utils/flags.h" namespace dsn { class binary_reader; @@ -55,8 +55,6 @@ class latency_tracer; namespace replication { -DSN_DECLARE_bool(force_send_no_idempotent_when_duplication); - class mutation; typedef dsn::ref_ptr mutation_ptr; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index a43c556707..ce629a5f20 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -85,15 +85,6 @@ DSN_DEFINE_bool(replication, true, "reject client write requests if disk status is space insufficient"); DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE); - -DSN_DEFINE_bool("replication", - force_send_no_idempotent_when_duplication, - false, - "receive client idempotent write requests and send them to backup cluster when " - "doing duplication"); -DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE); - - DSN_DEFINE_int32(replication, prepare_timeout_ms_for_secondaries, 1000, diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index 1c7f362288..f11d4a4488 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -207,9 +207,11 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s INIT_COUNTER(incr_qps); INIT_COUNTER(check_and_set_qps); INIT_COUNTER(check_and_mutate_qps); + INIT_COUNTER(force_receive_no_idempotent_duplicate_qps); INIT_COUNTER(scan_qps); INIT_COUNTER(duplicate_qps); INIT_COUNTER(dup_shipped_ops); + INIT_COUNTER(dup_retry_no_idempotent_duplicate_qps); INIT_COUNTER(dup_failed_shipping_ops); INIT_COUNTER(dup_recent_mutation_loss_count); INIT_COUNTER(recent_read_cu); diff --git a/src/server/info_collector.h b/src/server/info_collector.h index 649d0dee09..ae3bcc2d72 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -66,10 +66,14 @@ class info_collector incr_qps->set(row_stats.incr_qps); check_and_set_qps->set(row_stats.check_and_set_qps); check_and_mutate_qps->set(row_stats.check_and_mutate_qps); + force_receive_no_idempotent_duplicate_qps->set( + row_stats.force_receive_no_idempotent_duplicate_qps); scan_qps->set(row_stats.scan_qps); duplicate_qps->set(row_stats.duplicate_qps); dup_shipped_ops->set(row_stats.dup_shipped_ops); dup_failed_shipping_ops->set(row_stats.dup_failed_shipping_ops); + dup_retry_no_idempotent_duplicate_qps->set( + row_stats.dup_retry_no_idempotent_duplicate_qps); dup_recent_mutation_loss_count->set(row_stats.dup_recent_mutation_loss_count); recent_read_cu->set(row_stats.recent_read_cu); recent_write_cu->set(row_stats.recent_write_cu); @@ -144,10 +148,12 @@ class info_collector ::dsn::perf_counter_wrapper incr_qps; ::dsn::perf_counter_wrapper check_and_set_qps; ::dsn::perf_counter_wrapper check_and_mutate_qps; + ::dsn::perf_counter_wrapper force_receive_no_idempotent_duplicate_qps; ::dsn::perf_counter_wrapper scan_qps; ::dsn::perf_counter_wrapper duplicate_qps; ::dsn::perf_counter_wrapper dup_shipped_ops; ::dsn::perf_counter_wrapper dup_failed_shipping_ops; + ::dsn::perf_counter_wrapper dup_retry_no_idempotent_duplicate_qps; ::dsn::perf_counter_wrapper dup_recent_mutation_loss_count; ::dsn::perf_counter_wrapper recent_read_cu; ::dsn::perf_counter_wrapper recent_write_cu; diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 43be6be612..ea3eded2e6 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -48,6 +48,7 @@ #include "utils/errors.h" #include "utils/fmt_logging.h" #include "utils/rand.h" +#include "pegasus_rpc_types.h" METRIC_DEFINE_counter(replica, dup_shipped_successful_requests, @@ -59,6 +60,11 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kRequests, "The number of failed DUPLICATE requests sent from client"); +METRIC_DEFINE_counter(replica, + dup_retry_no_idempotent_duplicate_qps, + dsn::metric_unit::kRequests, + "The qps of Non-idempotent write when doing DUPLICATE which is Retried"); + namespace dsn { namespace replication { struct replica_base; @@ -205,6 +211,9 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, // retry this rpc _inflights[hash].push_front(rpc); _env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s); + + type_force_send_no_idempotent_if_need(rpc); + return; } if (_inflights[hash].empty()) { @@ -221,6 +230,74 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, } } +void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc &rpc) +{ + + // there maybe more than one mutation in one dup rpc + if (dsn::replication::FLAGS_force_send_no_idempotent_when_duplication) { + for (auto entry : rpc.request().entries) { + if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || + entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || + entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + + METRIC_VAR_INCREMENT(dup_retry_no_idempotent_duplicate_qps); + + dsn::message_ex *write = + dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message); + + if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { + incr_rpc raw_rpc(write); + absl::string_view unmarshall_key(raw_rpc.request().key.data(), + raw_rpc.request().key.length()); + + LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing " + "duplication," + "key is [{}]", + unmarshall_key); + continue; + } + + if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { + check_and_set_rpc raw_rpc(write); + absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), + raw_rpc.request().hash_key.length()); + absl::string_view unmarshall_ori_sort_key( + raw_rpc.request().check_sort_key.data(), + raw_rpc.request().check_sort_key.length()); + absl::string_view unmarshall_set_sort_key( + raw_rpc.request().set_sort_key.data(), + raw_rpc.request().set_sort_key.length()); + + LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried " + "when doing duplication," + "hash key [{}], check sort key [{}]," + "set sort key [{}]", + unmarshall_hash_key, + unmarshall_ori_sort_key, + unmarshall_set_sort_key); + continue; + } + + if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + check_and_mutate_rpc raw_rpc(write); + absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), + raw_rpc.request().hash_key.length()); + absl::string_view unmarshall_ori_sort_key( + raw_rpc.request().check_sort_key.data(), + raw_rpc.request().check_sort_key.length()); + + LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been " + "retried when doing duplication," + "hash key is [{}] , sort key is [{}] .", + unmarshall_hash_key, + unmarshall_ori_sort_key); + continue; + } + } + } + } +} + void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb) { _total_shipped_size = 0; diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index 2d81a6a37c..b4d2911e73 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -73,6 +73,8 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err); + void type_force_send_no_idempotent_if_need(duplicate_rpc &rpc); + private: friend class pegasus_mutation_duplicator_test; @@ -91,6 +93,7 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator METRIC_VAR_DECLARE_counter(dup_shipped_successful_requests); METRIC_VAR_DECLARE_counter(dup_shipped_failed_requests); + METRIC_VAR_DECLARE_counter(dup_retry_no_idempotent_duplicate_qps); }; // Decodes the binary `request_data` into write request in thrift struct, and diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 98bd0320d1..c735eb2895 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -127,9 +127,9 @@ METRIC_DEFINE_counter(replica, "The number of DUPLICATE requests"); METRIC_DEFINE_counter(replica, - no_idempotent_duplicate, + force_receive_no_idempotent_duplicate_qps, dsn::metric_unit::kRequests, - "The number of forced idempotent requests when doing duplication"); + "statistic the those no idempotent qps of DUPLICATE requests Force received"); METRIC_DEFINE_percentile_int64(replica, dup_time_lag_ms, @@ -174,7 +174,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) METRIC_VAR_INIT_replica(check_and_set_latency_ns), METRIC_VAR_INIT_replica(check_and_mutate_latency_ns), METRIC_VAR_INIT_replica(dup_requests), - METRIC_VAR_INIT_replica(no_idempotent_duplicate), + METRIC_VAR_INIT_replica(force_receive_no_idempotent_duplicate_qps), METRIC_VAR_INIT_replica(dup_time_lag_ms), METRIC_VAR_INIT_replica(dup_lagging_writes), _put_batch_size(0), @@ -427,7 +427,7 @@ int pegasus_write_service::duplicate(int64_t decree, request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { // receive no idempotent request from master cluster via duplication - METRIC_VAR_INCREMENT(no_idempotent_duplicate); + METRIC_VAR_INCREMENT(force_receive_no_idempotent_duplicate_qps); if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { incr_rpc rpc(write); diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 967a548cce..c7ceed77a6 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -232,7 +232,7 @@ class pegasus_write_service : dsn::replication::replica_base METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns); METRIC_VAR_DECLARE_counter(dup_requests); - METRIC_VAR_DECLARE_counter(no_idempotent_duplicate); + METRIC_VAR_DECLARE_counter(force_receive_no_idempotent_duplicate_qps); METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms); METRIC_VAR_DECLARE_counter(dup_lagging_writes); diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index e5ac4395bd..d558b55c14 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -813,9 +813,11 @@ struct row_data incr_qps += row.incr_qps; check_and_set_qps += row.check_and_set_qps; check_and_mutate_qps += row.check_and_mutate_qps; + force_receive_no_idempotent_duplicate_qps += row.force_receive_no_idempotent_duplicate_qps; scan_qps += row.scan_qps; duplicate_qps += row.duplicate_qps; dup_shipped_ops += row.dup_shipped_ops; + dup_retry_no_idempotent_duplicate_qps += row.dup_retry_no_idempotent_duplicate_qps; dup_failed_shipping_ops += row.dup_failed_shipping_ops; dup_recent_mutation_loss_count += row.dup_recent_mutation_loss_count; recent_read_cu += row.recent_read_cu; @@ -880,9 +882,11 @@ struct row_data double incr_qps = 0; double check_and_set_qps = 0; double check_and_mutate_qps = 0; + double force_receive_no_idempotent_duplicate_qps = 0; double scan_qps = 0; double duplicate_qps = 0; double dup_shipped_ops = 0; + double dup_retry_no_idempotent_duplicate_qps = 0; double dup_failed_shipping_ops = 0; double dup_recent_mutation_loss_count = 0; double recent_read_cu = 0; @@ -954,12 +958,16 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, row.check_and_set_qps += value; else if (counter_name == "check_and_mutate_qps") row.check_and_mutate_qps += value; + else if (counter_name == "force_receive_no_idempotent_duplicate_qps") + row.force_receive_no_idempotent_duplicate_qps += value; else if (counter_name == "scan_qps") row.scan_qps += value; else if (counter_name == "duplicate_qps") row.duplicate_qps += value; else if (counter_name == "dup_shipped_ops") row.dup_shipped_ops += value; + else if (counter_name == "dup_retry_no_idempotent_duplicate_qps") + row.dup_retry_no_idempotent_duplicate_qps += value; else if (counter_name == "dup_failed_shipping_ops") row.dup_failed_shipping_ops += value; else if (counter_name == "dup_recent_mutation_loss_count") diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index e6167da3f1..b3d4f3f45f 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -555,6 +555,9 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args) sum.incr_qps += row.incr_qps; sum.check_and_set_qps += row.check_and_set_qps; sum.check_and_mutate_qps += row.check_and_mutate_qps; + sum.force_receive_no_idempotent_duplicate_qps += + row.force_receive_no_idempotent_duplicate_qps; + sum.scan_qps += row.scan_qps; sum.recent_read_cu += row.recent_read_cu; sum.recent_write_cu += row.recent_write_cu; @@ -651,6 +654,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args) tp.append_data(row.incr_qps); tp.append_data(row.check_and_set_qps); tp.append_data(row.check_and_mutate_qps); + tp.append_data(row.force_receive_no_idempotent_duplicate_qps); tp.append_data(row.scan_qps); tp.append_data(row.recent_read_cu); tp.append_data(row.recent_write_cu); From cef197a95c334bd85c747934def3f1d788d1861b Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Tue, 27 Feb 2024 10:22:01 +0800 Subject: [PATCH 04/23] small fix --- src/server/pegasus_mutation_duplicator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 1ca117017a..4881f910c0 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -234,7 +234,7 @@ void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicat { // there maybe more than one mutation in one dup rpc - if (dsn::replication::FLAGS_force_send_no_idempotent_when_duplication) { + if (FLAGS_force_send_no_idempotent_when_duplication) { for (auto entry : rpc.request().entries) { if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || From a4f510a3cfd0575bd500a30852febbe9d0134270 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Tue, 27 Feb 2024 10:44:58 +0800 Subject: [PATCH 05/23] PASS IWYU --- src/replica/duplication/mutation_batch.cpp | 1 + src/replica/mutation.h | 2 +- src/replica/replica_2pc.cpp | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index 8e84e19d58..d4af553fa9 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -22,6 +22,7 @@ #include #include "absl/strings/string_view.h" +#include "common/duplication_common.h" #include "common/replication.codes.h" #include "consensus_types.h" #include "metadata_types.h" diff --git a/src/replica/mutation.h b/src/replica/mutation.h index 55a9d5cdff..49971e4d8b 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -34,7 +34,6 @@ #include "common/replication_common.h" #include "common/replication_other_types.h" -#include "common/duplication_common.h" #include "consensus_types.h" #include "runtime/api_layer1.h" #include "runtime/rpc/rpc_message.h" @@ -49,6 +48,7 @@ class binary_reader; class binary_writer; class blob; class gpid; + namespace utils { class latency_tracer; } // namespace utils diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 940e49a5cf..71966c32b4 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -38,6 +38,7 @@ #include "bulk_load/replica_bulk_loader.h" #include "bulk_load_types.h" +#include "common/duplication_common.h" #include "common/fs_manager.h" #include "common/gpid.h" #include "common/replication.codes.h" From 538090b08785c6ddaaf17e21e0a52868266a591c Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Tue, 27 Feb 2024 10:48:24 +0800 Subject: [PATCH 06/23] format code --- src/server/pegasus_mutation_duplicator.cpp | 35 +++++++++++----------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 4881f910c0..374ecc1111 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -248,19 +248,20 @@ void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicat if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { incr_rpc raw_rpc(write); absl::string_view unmarshall_key(raw_rpc.request().key.data(), - raw_rpc.request().key.length()); + raw_rpc.request().key.length()); - LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing " - "duplication," - "key is [{}]", - unmarshall_key); + LOG_DEBUG( + "Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing " + "duplication," + "key is [{}]", + unmarshall_key); continue; } if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { check_and_set_rpc raw_rpc(write); absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), - raw_rpc.request().hash_key.length()); + raw_rpc.request().hash_key.length()); absl::string_view unmarshall_ori_sort_key( raw_rpc.request().check_sort_key.data(), raw_rpc.request().check_sort_key.length()); @@ -269,28 +270,28 @@ void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicat raw_rpc.request().set_sort_key.length()); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried " - "when doing duplication," - "hash key [{}], check sort key [{}]," - "set sort key [{}]", - unmarshall_hash_key, - unmarshall_ori_sort_key, - unmarshall_set_sort_key); + "when doing duplication," + "hash key [{}], check sort key [{}]," + "set sort key [{}]", + unmarshall_hash_key, + unmarshall_ori_sort_key, + unmarshall_set_sort_key); continue; } if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { check_and_mutate_rpc raw_rpc(write); absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), - raw_rpc.request().hash_key.length()); + raw_rpc.request().hash_key.length()); absl::string_view unmarshall_ori_sort_key( raw_rpc.request().check_sort_key.data(), raw_rpc.request().check_sort_key.length()); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been " - "retried when doing duplication," - "hash key is [{}] , sort key is [{}] .", - unmarshall_hash_key, - unmarshall_ori_sort_key); + "retried when doing duplication," + "hash key is [{}] , sort key is [{}] .", + unmarshall_hash_key, + unmarshall_ori_sort_key); continue; } } From c5e39caf74e67db841a02e1cf1dedb295ef0a7bf Mon Sep 17 00:00:00 2001 From: ninsmiracle <110282526+ninsmiracle@users.noreply.github.com> Date: Mon, 4 Mar 2024 14:07:38 +0800 Subject: [PATCH 07/23] Update src/common/duplication_common.cpp Co-authored-by: Yingchun Lai --- src/common/duplication_common.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index df9a8dba60..796ab44a7a 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -37,7 +37,7 @@ DSN_DEFINE_uint32(replication, "send mutation log batch bytes size per rpc"); DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE); -DSN_DEFINE_bool("replication", +DSN_DEFINE_bool(replication, force_send_no_idempotent_when_duplication, false, "receive client idempotent write requests and send them to backup cluster when " From b5217525aea6f6b736fb89ce0628196ac761917a Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Mon, 4 Mar 2024 15:50:59 +0800 Subject: [PATCH 08/23] fix by github comment --- src/common/duplication_common.cpp | 4 +- src/common/duplication_common.h | 2 +- src/replica/duplication/mutation_batch.cpp | 2 +- src/replica/replica_2pc.cpp | 2 +- src/server/info_collector.cpp | 4 +- src/server/info_collector.h | 12 +++--- src/server/pegasus_mutation_duplicator.cpp | 43 +++++++++++----------- src/server/pegasus_mutation_duplicator.h | 6 ++- src/server/pegasus_write_service.cpp | 7 ++-- src/server/pegasus_write_service.h | 2 +- src/shell/command_helper.h | 16 ++++---- src/shell/commands/table_management.cpp | 6 +-- 12 files changed, 55 insertions(+), 51 deletions(-) diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index 796ab44a7a..5a9b502b65 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -38,11 +38,11 @@ DSN_DEFINE_uint32(replication, DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE); DSN_DEFINE_bool(replication, - force_send_no_idempotent_when_duplication, + force_send_non_idempotent_when_duplication, false, "receive client idempotent write requests and send them to backup cluster when " "doing duplication"); -DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE); +DSN_TAG_VARIABLE(force_send_non_idempotent_when_duplication, FT_MUTABLE); namespace dsn { namespace replication { diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h index 5d12de8ed1..d0578ad869 100644 --- a/src/common/duplication_common.h +++ b/src/common/duplication_common.h @@ -31,7 +31,7 @@ #include "utils/fmt_utils.h" DSN_DECLARE_uint32(duplicate_log_batch_bytes); -DSN_DECLARE_bool(force_send_no_idempotent_when_duplication); +DSN_DECLARE_bool(force_send_non_idempotent_when_duplication); namespace dsn { namespace replication { diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index 4367bcc8cb..b92f3e853b 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -175,7 +175,7 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree // before the duplication was added. // To ignore means this write will be lost, which is acceptable under this rare case. if (!task_spec::get(update.code)->rpc_request_is_write_idempotent && - !FLAGS_force_send_no_idempotent_when_duplication) { + !FLAGS_force_send_non_idempotent_when_duplication) { continue; } blob bb; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 71966c32b4..43c89c249c 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -156,7 +156,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) } if (is_duplication_master() && !spec->rpc_request_is_write_idempotent && - !FLAGS_force_send_no_idempotent_when_duplication) { + !FLAGS_force_send_non_idempotent_when_duplication) { // Ignore non-idempotent write, because duplication provides no guarantee of atomicity to // make this write produce the same result on multiple clusters. METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests); diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index 02d591b8b8..23329beb75 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -207,11 +207,11 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s INIT_COUNTER(incr_qps); INIT_COUNTER(check_and_set_qps); INIT_COUNTER(check_and_mutate_qps); - INIT_COUNTER(force_receive_no_idempotent_duplicate_qps); + INIT_COUNTER(force_receive_non_idempotent_duplicate_request); INIT_COUNTER(scan_qps); INIT_COUNTER(duplicate_qps); INIT_COUNTER(dup_shipped_ops); - INIT_COUNTER(dup_retry_no_idempotent_duplicate_qps); + INIT_COUNTER(dup_retry_non_idempotent_duplicate_request); INIT_COUNTER(dup_failed_shipping_ops); INIT_COUNTER(dup_recent_mutation_loss_count); INIT_COUNTER(recent_read_cu); diff --git a/src/server/info_collector.h b/src/server/info_collector.h index ae3bcc2d72..484d309356 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -66,14 +66,14 @@ class info_collector incr_qps->set(row_stats.incr_qps); check_and_set_qps->set(row_stats.check_and_set_qps); check_and_mutate_qps->set(row_stats.check_and_mutate_qps); - force_receive_no_idempotent_duplicate_qps->set( - row_stats.force_receive_no_idempotent_duplicate_qps); + force_receive_non_idempotent_duplicate_request->set( + row_stats.force_receive_non_idempotent_duplicate_request); scan_qps->set(row_stats.scan_qps); duplicate_qps->set(row_stats.duplicate_qps); dup_shipped_ops->set(row_stats.dup_shipped_ops); dup_failed_shipping_ops->set(row_stats.dup_failed_shipping_ops); - dup_retry_no_idempotent_duplicate_qps->set( - row_stats.dup_retry_no_idempotent_duplicate_qps); + dup_retry_non_idempotent_duplicate_request->set( + row_stats.dup_retry_non_idempotent_duplicate_request); dup_recent_mutation_loss_count->set(row_stats.dup_recent_mutation_loss_count); recent_read_cu->set(row_stats.recent_read_cu); recent_write_cu->set(row_stats.recent_write_cu); @@ -148,12 +148,12 @@ class info_collector ::dsn::perf_counter_wrapper incr_qps; ::dsn::perf_counter_wrapper check_and_set_qps; ::dsn::perf_counter_wrapper check_and_mutate_qps; - ::dsn::perf_counter_wrapper force_receive_no_idempotent_duplicate_qps; + ::dsn::perf_counter_wrapper force_receive_non_idempotent_duplicate_request; ::dsn::perf_counter_wrapper scan_qps; ::dsn::perf_counter_wrapper duplicate_qps; ::dsn::perf_counter_wrapper dup_shipped_ops; ::dsn::perf_counter_wrapper dup_failed_shipping_ops; - ::dsn::perf_counter_wrapper dup_retry_no_idempotent_duplicate_qps; + ::dsn::perf_counter_wrapper dup_retry_non_idempotent_duplicate_request; ::dsn::perf_counter_wrapper dup_recent_mutation_loss_count; ::dsn::perf_counter_wrapper recent_read_cu; ::dsn::perf_counter_wrapper recent_write_cu; diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 374ecc1111..03a23cdb61 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -61,7 +61,7 @@ METRIC_DEFINE_counter(replica, "The number of failed DUPLICATE requests sent from client"); METRIC_DEFINE_counter(replica, - dup_retry_no_idempotent_duplicate_qps, + dup_retry_non_idempotent_duplicate_request, dsn::metric_unit::kRequests, "The qps of Non-idempotent write when doing DUPLICATE which is Retried"); @@ -232,33 +232,33 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc &rpc) { + if (!FLAGS_force_send_non_idempotent_when_duplication) { + return; + } // there maybe more than one mutation in one dup rpc - if (FLAGS_force_send_no_idempotent_when_duplication) { - for (auto entry : rpc.request().entries) { - if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || - entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || - entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { - - METRIC_VAR_INCREMENT(dup_retry_no_idempotent_duplicate_qps); + for (auto entry : rpc.request().entries) { + // not a non idempotent request + if(!_non_idempotent_code.count(entry.task_code)){ + continue ; + } - dsn::message_ex *write = - dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message); + METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request); + dsn::message_ex *write =dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message); - if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { - incr_rpc raw_rpc(write); - absl::string_view unmarshall_key(raw_rpc.request().key.data(), - raw_rpc.request().key.length()); + if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { + incr_rpc raw_rpc(write); + absl::string_view unmarshall_key(raw_rpc.request().key.data(), + raw_rpc.request().key.length()); - LOG_DEBUG( - "Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing " + LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing " "duplication," "key is [{}]", unmarshall_key); continue; - } + } - if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { + if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { check_and_set_rpc raw_rpc(write); absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), raw_rpc.request().hash_key.length()); @@ -277,9 +277,9 @@ void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicat unmarshall_ori_sort_key, unmarshall_set_sort_key); continue; - } + } - if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { check_and_mutate_rpc raw_rpc(write); absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), raw_rpc.request().hash_key.length()); @@ -293,10 +293,9 @@ void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicat unmarshall_hash_key, unmarshall_ori_sort_key); continue; - } - } } } + } void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb) diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index b4d2911e73..005eb12c91 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -91,9 +91,13 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator size_t _total_shipped_size{0}; + const std::set _non_idempotent_code = {dsn::apps::RPC_RRDB_RRDB_INCR, + dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, + dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE}; + METRIC_VAR_DECLARE_counter(dup_shipped_successful_requests); METRIC_VAR_DECLARE_counter(dup_shipped_failed_requests); - METRIC_VAR_DECLARE_counter(dup_retry_no_idempotent_duplicate_qps); + METRIC_VAR_DECLARE_counter(dup_retry_non_idempotent_duplicate_request); }; // Decodes the binary `request_data` into write request in thrift struct, and diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index b447ccabb1..14f2474d63 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -122,7 +122,7 @@ METRIC_DEFINE_counter(replica, "The number of DUPLICATE requests"); METRIC_DEFINE_counter(replica, - force_receive_no_idempotent_duplicate_qps, + force_receive_non_idempotent_duplicate_request, dsn::metric_unit::kRequests, "statistic the those no idempotent qps of DUPLICATE requests Force received"); @@ -173,7 +173,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) METRIC_VAR_INIT_replica(check_and_set_latency_ns), METRIC_VAR_INIT_replica(check_and_mutate_latency_ns), METRIC_VAR_INIT_replica(dup_requests), - METRIC_VAR_INIT_replica(force_receive_no_idempotent_duplicate_qps), + METRIC_VAR_INIT_replica(force_receive_non_idempotent_duplicate_request), METRIC_VAR_INIT_replica(dup_time_lag_ms), METRIC_VAR_INIT_replica(dup_lagging_writes), _put_batch_size(0), @@ -422,11 +422,12 @@ int pegasus_write_service::duplicate(int64_t decree, continue; } + // Parse non-idempotent writes via duplication if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { // receive no idempotent request from master cluster via duplication - METRIC_VAR_INCREMENT(force_receive_no_idempotent_duplicate_qps); + METRIC_VAR_INCREMENT(force_receive_non_idempotent_duplicate_request); if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { incr_rpc rpc(write); diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 7414b288e8..1a9da5871c 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -232,7 +232,7 @@ class pegasus_write_service : dsn::replication::replica_base METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns); METRIC_VAR_DECLARE_counter(dup_requests); - METRIC_VAR_DECLARE_counter(force_receive_no_idempotent_duplicate_qps); + METRIC_VAR_DECLARE_counter(force_receive_non_idempotent_duplicate_request); METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms); METRIC_VAR_DECLARE_counter(dup_lagging_writes); diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index a938316e2c..8cea82a139 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -1159,11 +1159,11 @@ struct row_data incr_qps += row.incr_qps; check_and_set_qps += row.check_and_set_qps; check_and_mutate_qps += row.check_and_mutate_qps; - force_receive_no_idempotent_duplicate_qps += row.force_receive_no_idempotent_duplicate_qps; + force_receive_non_idempotent_duplicate_request += row.force_receive_non_idempotent_duplicate_request; scan_qps += row.scan_qps; duplicate_qps += row.duplicate_qps; dup_shipped_ops += row.dup_shipped_ops; - dup_retry_no_idempotent_duplicate_qps += row.dup_retry_no_idempotent_duplicate_qps; + dup_retry_non_idempotent_duplicate_request += row.dup_retry_non_idempotent_duplicate_request; dup_failed_shipping_ops += row.dup_failed_shipping_ops; dup_recent_mutation_loss_count += row.dup_recent_mutation_loss_count; recent_read_cu += row.recent_read_cu; @@ -1228,11 +1228,11 @@ struct row_data double incr_qps = 0; double check_and_set_qps = 0; double check_and_mutate_qps = 0; - double force_receive_no_idempotent_duplicate_qps = 0; + double force_receive_non_idempotent_duplicate_request = 0; double scan_qps = 0; double duplicate_qps = 0; double dup_shipped_ops = 0; - double dup_retry_no_idempotent_duplicate_qps = 0; + double dup_retry_non_idempotent_duplicate_request = 0; double dup_failed_shipping_ops = 0; double dup_recent_mutation_loss_count = 0; double recent_read_cu = 0; @@ -1551,16 +1551,16 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, row.check_and_set_qps += value; else if (counter_name == "check_and_mutate_qps") row.check_and_mutate_qps += value; - else if (counter_name == "force_receive_no_idempotent_duplicate_qps") - row.force_receive_no_idempotent_duplicate_qps += value; + else if (counter_name == "force_receive_non_idempotent_duplicate_request") + row.force_receive_non_idempotent_duplicate_request += value; else if (counter_name == "scan_qps") row.scan_qps += value; else if (counter_name == "duplicate_qps") row.duplicate_qps += value; else if (counter_name == "dup_shipped_ops") row.dup_shipped_ops += value; - else if (counter_name == "dup_retry_no_idempotent_duplicate_qps") - row.dup_retry_no_idempotent_duplicate_qps += value; + else if (counter_name == "dup_retry_non_idempotent_duplicate_request") + row.dup_retry_non_idempotent_duplicate_request += value; else if (counter_name == "dup_failed_shipping_ops") row.dup_failed_shipping_ops += value; else if (counter_name == "dup_recent_mutation_loss_count") diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index 117df5a772..736be04f01 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -554,8 +554,8 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args) sum.incr_qps += row.incr_qps; sum.check_and_set_qps += row.check_and_set_qps; sum.check_and_mutate_qps += row.check_and_mutate_qps; - sum.force_receive_no_idempotent_duplicate_qps += - row.force_receive_no_idempotent_duplicate_qps; + sum.force_receive_non_idempotent_duplicate_request += + row.force_receive_non_idempotent_duplicate_request; sum.scan_qps += row.scan_qps; sum.recent_read_cu += row.recent_read_cu; @@ -653,7 +653,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args) tp.append_data(row.incr_qps); tp.append_data(row.check_and_set_qps); tp.append_data(row.check_and_mutate_qps); - tp.append_data(row.force_receive_no_idempotent_duplicate_qps); + tp.append_data(row.force_receive_non_idempotent_duplicate_request); tp.append_data(row.scan_qps); tp.append_data(row.recent_read_cu); tp.append_data(row.recent_write_cu); From 0e19a0105b5ed7bbb3b12472510747363173da75 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Mon, 4 Mar 2024 15:57:29 +0800 Subject: [PATCH 09/23] format code --- src/server/pegasus_mutation_duplicator.cpp | 74 ++++++++++------------ src/server/pegasus_mutation_duplicator.h | 7 +- src/shell/command_helper.h | 6 +- 3 files changed, 43 insertions(+), 44 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 03a23cdb61..57fbaea47e 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -239,12 +239,12 @@ void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicat // there maybe more than one mutation in one dup rpc for (auto entry : rpc.request().entries) { // not a non idempotent request - if(!_non_idempotent_code.count(entry.task_code)){ - continue ; + if (!_non_idempotent_code.count(entry.task_code)) { + continue; } METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request); - dsn::message_ex *write =dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message); + dsn::message_ex *write = dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message); if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { incr_rpc raw_rpc(write); @@ -252,50 +252,46 @@ void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicat raw_rpc.request().key.length()); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing " - "duplication," - "key is [{}]", - unmarshall_key); - continue; + "duplication," + "key is [{}]", + unmarshall_key); + continue; } if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { - check_and_set_rpc raw_rpc(write); - absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), - raw_rpc.request().hash_key.length()); - absl::string_view unmarshall_ori_sort_key( - raw_rpc.request().check_sort_key.data(), - raw_rpc.request().check_sort_key.length()); - absl::string_view unmarshall_set_sort_key( - raw_rpc.request().set_sort_key.data(), - raw_rpc.request().set_sort_key.length()); - - LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried " - "when doing duplication," - "hash key [{}], check sort key [{}]," - "set sort key [{}]", - unmarshall_hash_key, - unmarshall_ori_sort_key, - unmarshall_set_sort_key); - continue; + check_and_set_rpc raw_rpc(write); + absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), + raw_rpc.request().hash_key.length()); + absl::string_view unmarshall_ori_sort_key(raw_rpc.request().check_sort_key.data(), + raw_rpc.request().check_sort_key.length()); + absl::string_view unmarshall_set_sort_key(raw_rpc.request().set_sort_key.data(), + raw_rpc.request().set_sort_key.length()); + + LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried " + "when doing duplication," + "hash key [{}], check sort key [{}]," + "set sort key [{}]", + unmarshall_hash_key, + unmarshall_ori_sort_key, + unmarshall_set_sort_key); + continue; } if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { - check_and_mutate_rpc raw_rpc(write); - absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), - raw_rpc.request().hash_key.length()); - absl::string_view unmarshall_ori_sort_key( - raw_rpc.request().check_sort_key.data(), - raw_rpc.request().check_sort_key.length()); - - LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been " - "retried when doing duplication," - "hash key is [{}] , sort key is [{}] .", - unmarshall_hash_key, - unmarshall_ori_sort_key); - continue; + check_and_mutate_rpc raw_rpc(write); + absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), + raw_rpc.request().hash_key.length()); + absl::string_view unmarshall_ori_sort_key(raw_rpc.request().check_sort_key.data(), + raw_rpc.request().check_sort_key.length()); + + LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been " + "retried when doing duplication," + "hash key is [{}] , sort key is [{}] .", + unmarshall_hash_key, + unmarshall_ori_sort_key); + continue; } } - } void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb) diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index 005eb12c91..1b6532049f 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -91,9 +91,10 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator size_t _total_shipped_size{0}; - const std::set _non_idempotent_code = {dsn::apps::RPC_RRDB_RRDB_INCR, - dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, - dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE}; + const std::set _non_idempotent_code = { + dsn::apps::RPC_RRDB_RRDB_INCR, + dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, + dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE}; METRIC_VAR_DECLARE_counter(dup_shipped_successful_requests); METRIC_VAR_DECLARE_counter(dup_shipped_failed_requests); diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 8cea82a139..5cf1559283 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -1159,11 +1159,13 @@ struct row_data incr_qps += row.incr_qps; check_and_set_qps += row.check_and_set_qps; check_and_mutate_qps += row.check_and_mutate_qps; - force_receive_non_idempotent_duplicate_request += row.force_receive_non_idempotent_duplicate_request; + force_receive_non_idempotent_duplicate_request += + row.force_receive_non_idempotent_duplicate_request; scan_qps += row.scan_qps; duplicate_qps += row.duplicate_qps; dup_shipped_ops += row.dup_shipped_ops; - dup_retry_non_idempotent_duplicate_request += row.dup_retry_non_idempotent_duplicate_request; + dup_retry_non_idempotent_duplicate_request += + row.dup_retry_non_idempotent_duplicate_request; dup_failed_shipping_ops += row.dup_failed_shipping_ops; dup_recent_mutation_loss_count += row.dup_recent_mutation_loss_count; recent_read_cu += row.recent_read_cu; From e827c789b2a097d647cb9a3a0b2d85009a439e03 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Wed, 6 Mar 2024 19:38:38 +0800 Subject: [PATCH 10/23] pass IWYU --- src/server/pegasus_mutation_duplicator.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index 1b6532049f..f9706b9182 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -23,21 +23,24 @@ #include #include #include +#include #include +#include "absl/strings/string_view.h" #include "replica/duplication/mutation_duplicator.h" #include "rrdb/rrdb.client.h" +#include "rrdb/rrdb.code.definition.h" #include "runtime/pipeline.h" #include "runtime/task/task_code.h" #include "runtime/task/task_tracker.h" #include "utils/chrono_literals.h" -#include "absl/strings/string_view.h" #include "utils/metrics.h" #include "utils/zlocks.h" namespace dsn { class blob; class error_code; + namespace replication { struct replica_base; } // namespace replication From bb56598017e0e010e149b4a71826d93b3f742fb1 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Thu, 7 Mar 2024 16:03:33 +0800 Subject: [PATCH 11/23] pass IWYU2 --- src/server/test/pegasus_mutation_duplicator_test.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index fac7c449b5..8855bb5f76 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include From 3d0694fb51c0cb293e3a2b0ff11bc6705181fa38 Mon Sep 17 00:00:00 2001 From: ninsmiracle <110282526+ninsmiracle@users.noreply.github.com> Date: Wed, 13 Mar 2024 19:47:02 +0800 Subject: [PATCH 12/23] Update src/server/pegasus_mutation_duplicator.cpp Co-authored-by: Yingchun Lai --- src/server/pegasus_mutation_duplicator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 57fbaea47e..d73d7abf88 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -63,7 +63,7 @@ METRIC_DEFINE_counter(replica, METRIC_DEFINE_counter(replica, dup_retry_non_idempotent_duplicate_request, dsn::metric_unit::kRequests, - "The qps of Non-idempotent write when doing DUPLICATE which is Retried"); + "The number of retried non-idempotent DUPLICATE requests sent from client"); namespace dsn { namespace replication { From 7833e8cb641820b4a8d68839cd2c7b51051d9784 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Wed, 13 Mar 2024 22:16:36 +0800 Subject: [PATCH 13/23] fix by comment and format --- src/common/duplication_common.cpp | 14 ++++++++------ src/common/duplication_common.h | 2 +- src/replica/duplication/mutation_batch.cpp | 2 +- src/replica/replica_2pc.cpp | 2 +- src/server/info_collector.cpp | 2 +- src/server/info_collector.h | 6 +++--- src/server/pegasus_mutation_duplicator.cpp | 2 +- src/server/pegasus_write_service.cpp | 6 +++--- src/server/pegasus_write_service.h | 2 +- src/shell/command_helper.h | 10 +++++----- src/shell/commands/table_management.cpp | 6 +++--- 11 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index 5a9b502b65..b5066ed346 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -37,12 +37,14 @@ DSN_DEFINE_uint32(replication, "send mutation log batch bytes size per rpc"); DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE); -DSN_DEFINE_bool(replication, - force_send_non_idempotent_when_duplication, - false, - "receive client idempotent write requests and send them to backup cluster when " - "doing duplication"); -DSN_TAG_VARIABLE(force_send_non_idempotent_when_duplication, FT_MUTABLE); +DSN_DEFINE_bool( + replication, + duplication_unsafe_allow_non_idempotent, + false, + "Turn on the switch so that the cluster can accept non-idempotent writes and forward these " + "writes via duplication " + "Note that this switch may cause data inconsistency between clusters. So we say it is unsafe "); +DSN_TAG_VARIABLE(duplication_unsafe_allow_non_idempotent, FT_MUTABLE); namespace dsn { namespace replication { diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h index d0578ad869..702c2667ea 100644 --- a/src/common/duplication_common.h +++ b/src/common/duplication_common.h @@ -31,7 +31,7 @@ #include "utils/fmt_utils.h" DSN_DECLARE_uint32(duplicate_log_batch_bytes); -DSN_DECLARE_bool(force_send_non_idempotent_when_duplication); +DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); namespace dsn { namespace replication { diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index b92f3e853b..7ad22fdcd3 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -175,7 +175,7 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree // before the duplication was added. // To ignore means this write will be lost, which is acceptable under this rare case. if (!task_spec::get(update.code)->rpc_request_is_write_idempotent && - !FLAGS_force_send_non_idempotent_when_duplication) { + !FLAGS_duplication_unsafe_allow_non_idempotent) { continue; } blob bb; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 43c89c249c..997524c6ed 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -156,7 +156,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) } if (is_duplication_master() && !spec->rpc_request_is_write_idempotent && - !FLAGS_force_send_non_idempotent_when_duplication) { + !FLAGS_duplication_unsafe_allow_non_idempotent) { // Ignore non-idempotent write, because duplication provides no guarantee of atomicity to // make this write produce the same result on multiple clusters. METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests); diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index 23329beb75..023789ca3b 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -207,7 +207,7 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s INIT_COUNTER(incr_qps); INIT_COUNTER(check_and_set_qps); INIT_COUNTER(check_and_mutate_qps); - INIT_COUNTER(force_receive_non_idempotent_duplicate_request); + INIT_COUNTER(dup_unsafe_received_non_idempotent_duplicate_request); INIT_COUNTER(scan_qps); INIT_COUNTER(duplicate_qps); INIT_COUNTER(dup_shipped_ops); diff --git a/src/server/info_collector.h b/src/server/info_collector.h index 484d309356..d9d2397793 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -66,8 +66,8 @@ class info_collector incr_qps->set(row_stats.incr_qps); check_and_set_qps->set(row_stats.check_and_set_qps); check_and_mutate_qps->set(row_stats.check_and_mutate_qps); - force_receive_non_idempotent_duplicate_request->set( - row_stats.force_receive_non_idempotent_duplicate_request); + dup_unsafe_received_non_idempotent_duplicate_request->set( + row_stats.dup_unsafe_received_non_idempotent_duplicate_request); scan_qps->set(row_stats.scan_qps); duplicate_qps->set(row_stats.duplicate_qps); dup_shipped_ops->set(row_stats.dup_shipped_ops); @@ -148,7 +148,7 @@ class info_collector ::dsn::perf_counter_wrapper incr_qps; ::dsn::perf_counter_wrapper check_and_set_qps; ::dsn::perf_counter_wrapper check_and_mutate_qps; - ::dsn::perf_counter_wrapper force_receive_non_idempotent_duplicate_request; + ::dsn::perf_counter_wrapper dup_unsafe_received_non_idempotent_duplicate_request; ::dsn::perf_counter_wrapper scan_qps; ::dsn::perf_counter_wrapper duplicate_qps; ::dsn::perf_counter_wrapper dup_shipped_ops; diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 57fbaea47e..a86593cfd9 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -232,7 +232,7 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc &rpc) { - if (!FLAGS_force_send_non_idempotent_when_duplication) { + if (!FLAGS_duplication_unsafe_allow_non_idempotent) { return; } diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 14f2474d63..60c4f67493 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -122,7 +122,7 @@ METRIC_DEFINE_counter(replica, "The number of DUPLICATE requests"); METRIC_DEFINE_counter(replica, - force_receive_non_idempotent_duplicate_request, + dup_unsafe_received_non_idempotent_duplicate_request, dsn::metric_unit::kRequests, "statistic the those no idempotent qps of DUPLICATE requests Force received"); @@ -173,7 +173,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) METRIC_VAR_INIT_replica(check_and_set_latency_ns), METRIC_VAR_INIT_replica(check_and_mutate_latency_ns), METRIC_VAR_INIT_replica(dup_requests), - METRIC_VAR_INIT_replica(force_receive_non_idempotent_duplicate_request), + METRIC_VAR_INIT_replica(dup_unsafe_received_non_idempotent_duplicate_request), METRIC_VAR_INIT_replica(dup_time_lag_ms), METRIC_VAR_INIT_replica(dup_lagging_writes), _put_batch_size(0), @@ -427,7 +427,7 @@ int pegasus_write_service::duplicate(int64_t decree, request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { // receive no idempotent request from master cluster via duplication - METRIC_VAR_INCREMENT(force_receive_non_idempotent_duplicate_request); + METRIC_VAR_INCREMENT(dup_unsafe_received_non_idempotent_duplicate_request); if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { incr_rpc rpc(write); diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 1a9da5871c..7e3f5745a1 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -232,7 +232,7 @@ class pegasus_write_service : dsn::replication::replica_base METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns); METRIC_VAR_DECLARE_counter(dup_requests); - METRIC_VAR_DECLARE_counter(force_receive_non_idempotent_duplicate_request); + METRIC_VAR_DECLARE_counter(dup_unsafe_received_non_idempotent_duplicate_request); METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms); METRIC_VAR_DECLARE_counter(dup_lagging_writes); diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 5cf1559283..c20909db28 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -1159,8 +1159,8 @@ struct row_data incr_qps += row.incr_qps; check_and_set_qps += row.check_and_set_qps; check_and_mutate_qps += row.check_and_mutate_qps; - force_receive_non_idempotent_duplicate_request += - row.force_receive_non_idempotent_duplicate_request; + dup_unsafe_received_non_idempotent_duplicate_request += + row.dup_unsafe_received_non_idempotent_duplicate_request; scan_qps += row.scan_qps; duplicate_qps += row.duplicate_qps; dup_shipped_ops += row.dup_shipped_ops; @@ -1230,7 +1230,7 @@ struct row_data double incr_qps = 0; double check_and_set_qps = 0; double check_and_mutate_qps = 0; - double force_receive_non_idempotent_duplicate_request = 0; + double dup_unsafe_received_non_idempotent_duplicate_request = 0; double scan_qps = 0; double duplicate_qps = 0; double dup_shipped_ops = 0; @@ -1553,8 +1553,8 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, row.check_and_set_qps += value; else if (counter_name == "check_and_mutate_qps") row.check_and_mutate_qps += value; - else if (counter_name == "force_receive_non_idempotent_duplicate_request") - row.force_receive_non_idempotent_duplicate_request += value; + else if (counter_name == "dup_unsafe_received_non_idempotent_duplicate_request") + row.dup_unsafe_received_non_idempotent_duplicate_request += value; else if (counter_name == "scan_qps") row.scan_qps += value; else if (counter_name == "duplicate_qps") diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index 736be04f01..17910ebe96 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -554,8 +554,8 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args) sum.incr_qps += row.incr_qps; sum.check_and_set_qps += row.check_and_set_qps; sum.check_and_mutate_qps += row.check_and_mutate_qps; - sum.force_receive_non_idempotent_duplicate_request += - row.force_receive_non_idempotent_duplicate_request; + sum.dup_unsafe_received_non_idempotent_duplicate_request += + row.dup_unsafe_received_non_idempotent_duplicate_request; sum.scan_qps += row.scan_qps; sum.recent_read_cu += row.recent_read_cu; @@ -653,7 +653,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args) tp.append_data(row.incr_qps); tp.append_data(row.check_and_set_qps); tp.append_data(row.check_and_mutate_qps); - tp.append_data(row.force_receive_non_idempotent_duplicate_request); + tp.append_data(row.dup_unsafe_received_non_idempotent_duplicate_request); tp.append_data(row.scan_qps); tp.append_data(row.recent_read_cu); tp.append_data(row.recent_write_cu); From d8e26b1525297d501938eebe96b3e8a1fab8eccc Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Thu, 14 Mar 2024 10:00:20 +0800 Subject: [PATCH 14/23] make comment in code more clearly --- src/server/pegasus_write_service.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 60c4f67493..f02a169b8e 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -426,7 +426,10 @@ int pegasus_write_service::duplicate(int64_t decree, if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { - // receive no idempotent request from master cluster via duplication + // receive non-idempotent request from master cluster via duplication when + // FLAG_duplication_unsafe_allow_non_idempotent set as true. + // This metric greater than zero means that there is already the possibility of + // inconsistency between clusters METRIC_VAR_INCREMENT(dup_unsafe_received_non_idempotent_duplicate_request); if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { From 3831f7af4afb29529c17b4432f53877e5a32d2a3 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Thu, 14 Mar 2024 10:04:07 +0800 Subject: [PATCH 15/23] make function name better --- src/server/pegasus_mutation_duplicator.cpp | 4 ++-- src/server/pegasus_mutation_duplicator.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 129417932c..4270d76102 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -212,7 +212,7 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, _inflights[hash].push_front(rpc); _env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s); - type_force_send_no_idempotent_if_need(rpc); + type_force_send_non_idempotent_if_need(rpc); return; } @@ -230,7 +230,7 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, } } -void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc &rpc) +void pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc &rpc) { if (!FLAGS_duplication_unsafe_allow_non_idempotent) { return; diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index f9706b9182..dedc301d37 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -76,7 +76,7 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err); - void type_force_send_no_idempotent_if_need(duplicate_rpc &rpc); + void type_force_send_non_idempotent_if_need(duplicate_rpc &rpc); private: friend class pegasus_mutation_duplicator_test; From 01fa7af3452b690e2ca37f6967fd5317a4b109db Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Thu, 14 Mar 2024 10:32:11 +0800 Subject: [PATCH 16/23] fix type_force_send_non_idempotent_if_need --- src/server/pegasus_mutation_duplicator.cpp | 24 ++++++---------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 4270d76102..ecf9112e68 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -248,47 +248,35 @@ void pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplica if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { incr_rpc raw_rpc(write); - absl::string_view unmarshall_key(raw_rpc.request().key.data(), - raw_rpc.request().key.length()); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing " "duplication," "key is [{}]", - unmarshall_key); + raw_rpc.request().key); continue; } if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { check_and_set_rpc raw_rpc(write); - absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), - raw_rpc.request().hash_key.length()); - absl::string_view unmarshall_ori_sort_key(raw_rpc.request().check_sort_key.data(), - raw_rpc.request().check_sort_key.length()); - absl::string_view unmarshall_set_sort_key(raw_rpc.request().set_sort_key.data(), - raw_rpc.request().set_sort_key.length()); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried " "when doing duplication," "hash key [{}], check sort key [{}]," "set sort key [{}]", - unmarshall_hash_key, - unmarshall_ori_sort_key, - unmarshall_set_sort_key); + raw_rpc.request().hash_key, + raw_rpc.request().check_sort_key, + raw_rpc.request().set_sort_key); continue; } if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { check_and_mutate_rpc raw_rpc(write); - absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(), - raw_rpc.request().hash_key.length()); - absl::string_view unmarshall_ori_sort_key(raw_rpc.request().check_sort_key.data(), - raw_rpc.request().check_sort_key.length()); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been " "retried when doing duplication," "hash key is [{}] , sort key is [{}] .", - unmarshall_hash_key, - unmarshall_ori_sort_key); + raw_rpc.request().hash_key, + raw_rpc.request().check_sort_key); continue; } } From c400ef7af5f2ae9406ff510d2d2d2701b64e9aab Mon Sep 17 00:00:00 2001 From: ninsmiracle <110282526+ninsmiracle@users.noreply.github.com> Date: Thu, 14 Mar 2024 11:09:36 +0800 Subject: [PATCH 17/23] Update src/common/duplication_common.cpp Co-authored-by: Yingchun Lai --- src/common/duplication_common.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index b5066ed346..1dc232fe5a 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -42,8 +42,8 @@ DSN_DEFINE_bool( duplication_unsafe_allow_non_idempotent, false, "Turn on the switch so that the cluster can accept non-idempotent writes and forward these " - "writes via duplication " - "Note that this switch may cause data inconsistency between clusters. So we say it is unsafe "); + "writes via duplication. Note that this switch may cause data inconsistency between " + "clusters. So we say it is unsafe."); DSN_TAG_VARIABLE(duplication_unsafe_allow_non_idempotent, FT_MUTABLE); namespace dsn { From 8dc378f71a83019acd96f368ef4d9d1f818fb139 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Thu, 14 Mar 2024 11:24:08 +0800 Subject: [PATCH 18/23] move DSN_DECLARE in duplication common --- src/common/duplication_common.h | 3 --- src/replica/duplication/load_from_private_log.cpp | 2 ++ src/replica/duplication/mutation_batch.cpp | 2 ++ src/replica/replica_2pc.cpp | 1 + src/server/pegasus_mutation_duplicator.cpp | 5 ++++- src/server/pegasus_mutation_duplicator.h | 2 +- src/server/test/pegasus_mutation_duplicator_test.cpp | 2 ++ 7 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h index 702c2667ea..580fe792e1 100644 --- a/src/common/duplication_common.h +++ b/src/common/duplication_common.h @@ -30,9 +30,6 @@ #include "utils/flags.h" #include "utils/fmt_utils.h" -DSN_DECLARE_uint32(duplicate_log_batch_bytes); -DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); - namespace dsn { namespace replication { diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp index 16cdcb9239..851260ac82 100644 --- a/src/replica/duplication/load_from_private_log.cpp +++ b/src/replica/duplication/load_from_private_log.cpp @@ -57,6 +57,8 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kMutations, "The number of mutations read from private log for dup"); +DSN_DECLARE_uint32(duplicate_log_batch_bytes); + namespace dsn { namespace replication { diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index 7ad22fdcd3..ac630fab1e 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -35,6 +35,8 @@ #include "utils/error_code.h" #include "utils/fmt_logging.h" +DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); + METRIC_DEFINE_gauge_int64(replica, dup_recent_lost_mutations, dsn::metric_unit::kMutations, diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 997524c6ed..4056fff70e 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -105,6 +105,7 @@ DSN_DEFINE_uint64( DSN_DECLARE_int32(max_mutation_count_in_prepare_list); DSN_DECLARE_int32(staleness_for_commit); +DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); namespace dsn { namespace replication { diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index ecf9112e68..56bc274994 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -65,6 +65,9 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kRequests, "The number of retried non-idempotent DUPLICATE requests sent from client"); +DSN_DECLARE_uint32(duplicate_log_batch_bytes); +DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); + namespace dsn { namespace replication { struct replica_base; @@ -239,7 +242,7 @@ void pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplica // there maybe more than one mutation in one dup rpc for (auto entry : rpc.request().entries) { // not a non idempotent request - if (!_non_idempotent_code.count(entry.task_code)) { + if (!_non_idempotent_codes.count(entry.task_code)) { continue; } diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index dedc301d37..34d75cb368 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -94,7 +94,7 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator size_t _total_shipped_size{0}; - const std::set _non_idempotent_code = { + const std::set _non_idempotent_codes = { dsn::apps::RPC_RRDB_RRDB_INCR, dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE}; diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index 8855bb5f76..57e237051b 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -47,6 +47,8 @@ #include "utils/blob.h" #include "utils/error_code.h" +DSN_DECLARE_uint32(duplicate_log_batch_bytes); + namespace pegasus { namespace server { From fcaabfebd1833f43251babf54e4d77925f90a5c2 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Thu, 14 Mar 2024 11:31:47 +0800 Subject: [PATCH 19/23] move dup_unsafe_received_non_idempotent_duplicate_request discription to define --- src/server/pegasus_write_service.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index f02a169b8e..d46a48dbe3 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -124,7 +124,10 @@ METRIC_DEFINE_counter(replica, METRIC_DEFINE_counter(replica, dup_unsafe_received_non_idempotent_duplicate_request, dsn::metric_unit::kRequests, - "statistic the those no idempotent qps of DUPLICATE requests Force received"); + "receive non-idempotent request from master cluster via duplication when " + "FLAG_duplication_unsafe_allow_non_idempotent set as true." + "This metric greater than zero means that there is already the possibility " + "of inconsistency between clusters."); METRIC_DEFINE_percentile_int64(replica, dup_time_lag_ms, @@ -426,10 +429,7 @@ int pegasus_write_service::duplicate(int64_t decree, if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { - // receive non-idempotent request from master cluster via duplication when - // FLAG_duplication_unsafe_allow_non_idempotent set as true. - // This metric greater than zero means that there is already the possibility of - // inconsistency between clusters + METRIC_VAR_INCREMENT(dup_unsafe_received_non_idempotent_duplicate_request); if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { From cc853e77348c41652c2bcfae4215ba9f670b9374 Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Thu, 14 Mar 2024 14:18:20 +0800 Subject: [PATCH 20/23] pass IWYU --- src/common/duplication_common.cpp | 1 + src/common/duplication_common.h | 1 - src/replica/duplication/load_from_private_log.cpp | 2 +- src/replica/duplication/mutation_batch.cpp | 2 +- src/replica/replica_2pc.cpp | 1 - src/server/pegasus_mutation_duplicator.cpp | 3 ++- src/server/test/pegasus_mutation_duplicator_test.cpp | 2 +- 7 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index 1dc232fe5a..8378096c63 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -27,6 +27,7 @@ #include "nlohmann/json_fwd.hpp" #include "utils/config_api.h" #include "utils/error_code.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/singleton.h" #include "utils/time_utils.h" diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h index 580fe792e1..a83000bcdd 100644 --- a/src/common/duplication_common.h +++ b/src/common/duplication_common.h @@ -27,7 +27,6 @@ #include "duplication_types.h" #include "runtime/rpc/rpc_holder.h" #include "utils/errors.h" -#include "utils/flags.h" #include "utils/fmt_utils.h" namespace dsn { diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp index 851260ac82..0d120d458f 100644 --- a/src/replica/duplication/load_from_private_log.cpp +++ b/src/replica/duplication/load_from_private_log.cpp @@ -21,7 +21,6 @@ #include #include "absl/strings/string_view.h" -#include "common/duplication_common.h" #include "duplication_types.h" #include "load_from_private_log.h" #include "replica/duplication/mutation_batch.h" @@ -33,6 +32,7 @@ #include "utils/error_code.h" #include "utils/errors.h" #include "utils/fail_point.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/ports.h" diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index ac630fab1e..bba31283c1 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -22,7 +22,6 @@ #include #include "absl/strings/string_view.h" -#include "common/duplication_common.h" #include "common/replication.codes.h" #include "consensus_types.h" #include "metadata_types.h" @@ -33,6 +32,7 @@ #include "utils/autoref_ptr.h" #include "utils/blob.h" #include "utils/error_code.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 4056fff70e..965cdfb3f7 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -38,7 +38,6 @@ #include "bulk_load/replica_bulk_loader.h" #include "bulk_load_types.h" -#include "common/duplication_common.h" #include "common/fs_manager.h" #include "common/gpid.h" #include "common/replication.codes.h" diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 56bc274994..93a3940a29 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -36,6 +36,7 @@ #include "duplication_internal_types.h" #include "pegasus/client.h" #include "pegasus_key_schema.h" +#include "pegasus_rpc_types.h" #include "rrdb/rrdb.code.definition.h" #include "rrdb/rrdb_types.h" #include "runtime/message_utils.h" @@ -46,9 +47,9 @@ #include "utils/chrono_literals.h" #include "utils/error_code.h" #include "utils/errors.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/rand.h" -#include "pegasus_rpc_types.h" METRIC_DEFINE_counter(replica, dup_shipped_successful_requests, diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index 57e237051b..bde2ccd4bd 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -30,7 +30,6 @@ #include "backup_types.h" #include "base/pegasus_rpc_types.h" -#include "common/duplication_common.h" #include "common/gpid.h" #include "common/replication.codes.h" #include "duplication_internal_types.h" @@ -46,6 +45,7 @@ #include "server/pegasus_write_service.h" #include "utils/blob.h" #include "utils/error_code.h" +#include "utils/flags.h" DSN_DECLARE_uint32(duplicate_log_batch_bytes); From 5d357241926dbd9cf7fef15107dd917b9e8faef1 Mon Sep 17 00:00:00 2001 From: ninsmiracle <110282526+ninsmiracle@users.noreply.github.com> Date: Thu, 14 Mar 2024 14:23:16 +0800 Subject: [PATCH 21/23] Update src/server/pegasus_mutation_duplicator.cpp Co-authored-by: Yingchun Lai --- src/server/pegasus_mutation_duplicator.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 93a3940a29..bfea4a6d48 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -254,8 +254,7 @@ void pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplica incr_rpc raw_rpc(write); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing " - "duplication," - "key is [{}]", + "duplication, key is '{}'", raw_rpc.request().key); continue; } From 76faaa73d37a3c8c5569196e09848a1b2e857a52 Mon Sep 17 00:00:00 2001 From: ninsmiracle <110282526+ninsmiracle@users.noreply.github.com> Date: Thu, 14 Mar 2024 14:23:26 +0800 Subject: [PATCH 22/23] Update src/server/pegasus_mutation_duplicator.cpp Co-authored-by: Yingchun Lai --- src/server/pegasus_mutation_duplicator.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index bfea4a6d48..07ae941736 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -276,8 +276,7 @@ void pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplica check_and_mutate_rpc raw_rpc(write); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been " - "retried when doing duplication," - "hash key is [{}] , sort key is [{}] .", + "retried when doing duplication, hash key is '{}', sort key is '{}'.", raw_rpc.request().hash_key, raw_rpc.request().check_sort_key); continue; From 2f90f2e13a659b6570fc2ca4b23779930cdb3564 Mon Sep 17 00:00:00 2001 From: ninsmiracle <110282526+ninsmiracle@users.noreply.github.com> Date: Thu, 14 Mar 2024 14:23:55 +0800 Subject: [PATCH 23/23] Update src/server/pegasus_mutation_duplicator.cpp Co-authored-by: Yingchun Lai --- src/server/pegasus_mutation_duplicator.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 07ae941736..62b6ee2a41 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -263,9 +263,8 @@ void pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplica check_and_set_rpc raw_rpc(write); LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried " - "when doing duplication," - "hash key [{}], check sort key [{}]," - "set sort key [{}]", + "when doing duplication, hash key '{}', check sort key '{}', set sort " + "key '{}'", raw_rpc.request().hash_key, raw_rpc.request().check_sort_key, raw_rpc.request().set_sort_key);