Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to force send non-idempotent write when doing duplication #1908

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e60dc0a
support_no_idempotent_dup
Feb 18, 2024
31a4271
format and IWYU
Feb 18, 2024
61b446f
add two matric and type key if dup non-idempotent write retried
Feb 26, 2024
0d1152d
deal with conflict
Feb 26, 2024
0db1a56
pull
Feb 27, 2024
cef197a
small fix
Feb 27, 2024
a4f510a
PASS IWYU
Feb 27, 2024
538090b
format code
Feb 27, 2024
c5e39ca
Update src/common/duplication_common.cpp
ninsmiracle Mar 4, 2024
11c013f
Merge branch 'apache:master' into support_no_idempotent_dup
ninsmiracle Mar 4, 2024
b521752
fix by github comment
Mar 4, 2024
0e19a01
format code
Mar 4, 2024
e827c78
pass IWYU
Mar 6, 2024
bb56598
pass IWYU2
Mar 7, 2024
3d0694f
Update src/server/pegasus_mutation_duplicator.cpp
ninsmiracle Mar 13, 2024
7833e8c
fix by comment and format
Mar 13, 2024
95953ac
Merge branch 'support_no_idempotent_dup' of github.com:ninsmiracle/in…
Mar 13, 2024
d8e26b1
make comment in code more clearly
Mar 14, 2024
3831f7a
make function name better
Mar 14, 2024
01fa7af
fix type_force_send_non_idempotent_if_need
Mar 14, 2024
c400ef7
Update src/common/duplication_common.cpp
ninsmiracle Mar 14, 2024
8dc378f
move DSN_DECLARE in duplication common
Mar 14, 2024
fcaabfe
move dup_unsafe_received_non_idempotent_duplicate_request discription…
Mar 14, 2024
cc853e7
pass IWYU
Mar 14, 2024
5d35724
Update src/server/pegasus_mutation_duplicator.cpp
ninsmiracle Mar 14, 2024
76faaa7
Update src/server/pegasus_mutation_duplicator.cpp
ninsmiracle Mar 14, 2024
2f90f2e
Update src/server/pegasus_mutation_duplicator.cpp
ninsmiracle Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/common/duplication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ DSN_DEFINE_uint32(replication,
"send mutation log batch bytes size per rpc");
DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);

DSN_DEFINE_bool("replication",
ninsmiracle marked this conversation as resolved.
Show resolved Hide resolved
force_send_no_idempotent_when_duplication,
false,
"receive client idempotent write requests and send them to backup cluster when "
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
"doing duplication");
DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE);

namespace dsn {
namespace replication {

Expand Down
1 change: 1 addition & 0 deletions src/common/duplication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "utils/fmt_utils.h"

DSN_DECLARE_uint32(duplicate_log_batch_bytes);
DSN_DECLARE_bool(force_send_no_idempotent_when_duplication);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to add it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's necessary . Cause once I remove this line , compilation can not be passed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DSN_DECLARE_xxx macros are recommended to move to cpp files where actually use it, don't make the header files to be too large.

The same to DSN_DECLARE_uint32(duplicate_log_batch_bytes);


namespace dsn {
namespace replication {
Expand Down
4 changes: 3 additions & 1 deletion src/replica/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vector>

#include "absl/strings/string_view.h"
#include "common/duplication_common.h"
#include "common/replication.codes.h"
#include "consensus_types.h"
#include "metadata_types.h"
Expand Down Expand Up @@ -173,7 +174,8 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree
// ERR_OPERATION_DISABLED, but there could still be a mutation written
// before the duplication was added.
// To ignore means this write will be lost, which is acceptable under this rare case.
if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) {
if (!task_spec::get(update.code)->rpc_request_is_write_idempotent &&
!FLAGS_force_send_no_idempotent_when_duplication) {
continue;
}
blob bb;
Expand Down
1 change: 1 addition & 0 deletions src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class binary_reader;
class binary_writer;
class blob;
class gpid;

namespace utils {
class latency_tracer;
} // namespace utils
Expand Down
5 changes: 3 additions & 2 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

#include "bulk_load/replica_bulk_loader.h"
#include "bulk_load_types.h"
#include "common/duplication_common.h"
#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
Expand Down Expand Up @@ -82,7 +83,6 @@ DSN_DEFINE_bool(replication,
true,
"reject client write requests if disk status is space insufficient");
DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE);

DSN_DEFINE_int32(replication,
prepare_timeout_ms_for_secondaries,
3000,
Expand Down Expand Up @@ -155,7 +155,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) {
if (is_duplication_master() && !spec->rpc_request_is_write_idempotent &&
!FLAGS_force_send_no_idempotent_when_duplication) {
// Ignore non-idempotent write, because duplication provides no guarantee of atomicity to
// make this write produce the same result on multiple clusters.
METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests);
Expand Down
2 changes: 2 additions & 0 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,11 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s
INIT_COUNTER(incr_qps);
INIT_COUNTER(check_and_set_qps);
INIT_COUNTER(check_and_mutate_qps);
INIT_COUNTER(force_receive_no_idempotent_duplicate_qps);
INIT_COUNTER(scan_qps);
INIT_COUNTER(duplicate_qps);
INIT_COUNTER(dup_shipped_ops);
INIT_COUNTER(dup_retry_no_idempotent_duplicate_qps);
INIT_COUNTER(dup_failed_shipping_ops);
INIT_COUNTER(dup_recent_mutation_loss_count);
INIT_COUNTER(recent_read_cu);
Expand Down
6 changes: 6 additions & 0 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ class info_collector
incr_qps->set(row_stats.incr_qps);
check_and_set_qps->set(row_stats.check_and_set_qps);
check_and_mutate_qps->set(row_stats.check_and_mutate_qps);
force_receive_no_idempotent_duplicate_qps->set(
row_stats.force_receive_no_idempotent_duplicate_qps);
scan_qps->set(row_stats.scan_qps);
duplicate_qps->set(row_stats.duplicate_qps);
dup_shipped_ops->set(row_stats.dup_shipped_ops);
dup_failed_shipping_ops->set(row_stats.dup_failed_shipping_ops);
dup_retry_no_idempotent_duplicate_qps->set(
row_stats.dup_retry_no_idempotent_duplicate_qps);
dup_recent_mutation_loss_count->set(row_stats.dup_recent_mutation_loss_count);
recent_read_cu->set(row_stats.recent_read_cu);
recent_write_cu->set(row_stats.recent_write_cu);
Expand Down Expand Up @@ -144,10 +148,12 @@ class info_collector
::dsn::perf_counter_wrapper incr_qps;
::dsn::perf_counter_wrapper check_and_set_qps;
::dsn::perf_counter_wrapper check_and_mutate_qps;
::dsn::perf_counter_wrapper force_receive_no_idempotent_duplicate_qps;
::dsn::perf_counter_wrapper scan_qps;
::dsn::perf_counter_wrapper duplicate_qps;
::dsn::perf_counter_wrapper dup_shipped_ops;
::dsn::perf_counter_wrapper dup_failed_shipping_ops;
::dsn::perf_counter_wrapper dup_retry_no_idempotent_duplicate_qps;
::dsn::perf_counter_wrapper dup_recent_mutation_loss_count;
::dsn::perf_counter_wrapper recent_read_cu;
::dsn::perf_counter_wrapper recent_write_cu;
Expand Down
94 changes: 94 additions & 0 deletions src/server/pegasus_mutation_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "utils/errors.h"
#include "utils/fmt_logging.h"
#include "utils/rand.h"
#include "pegasus_rpc_types.h"

METRIC_DEFINE_counter(replica,
dup_shipped_successful_requests,
Expand All @@ -59,6 +60,11 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kRequests,
"The number of failed DUPLICATE requests sent from client");

METRIC_DEFINE_counter(replica,
dup_retry_no_idempotent_duplicate_qps,
dsn::metric_unit::kRequests,
"The qps of Non-idempotent write when doing DUPLICATE which is Retried");
ninsmiracle marked this conversation as resolved.
Show resolved Hide resolved

namespace dsn {
namespace replication {
struct replica_base;
Expand Down Expand Up @@ -101,6 +107,22 @@ using namespace dsn::literals::chrono_literals;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_INCR) {
dsn::apps::incr_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
dsn::apps::check_and_set_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
dsn::apps::check_and_mutate_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}

LOG_FATAL("unexpected task code: {}", tc);
__builtin_unreachable();
}
Expand Down Expand Up @@ -189,6 +211,9 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
// retry this rpc
_inflights[hash].push_front(rpc);
_env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the non-idempotent request be retried twice here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes,it will. So I record specific raw_key into log.


type_force_send_no_idempotent_if_need(rpc);

return;
}
if (_inflights[hash].empty()) {
Expand All @@ -205,6 +230,75 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
}
}

void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc &rpc)
{

// there maybe more than one mutation in one dup rpc
if (FLAGS_force_send_no_idempotent_when_duplication) {
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
for (auto entry : rpc.request().entries) {
if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {

acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
METRIC_VAR_INCREMENT(dup_retry_no_idempotent_duplicate_qps);

dsn::message_ex *write =
dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);

if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
incr_rpc raw_rpc(write);
absl::string_view unmarshall_key(raw_rpc.request().key.data(),
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
raw_rpc.request().key.length());

LOG_DEBUG(
"Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing "
"duplication,"
"key is [{}]",
unmarshall_key);
continue;
}

if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
check_and_set_rpc raw_rpc(write);
absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(),
raw_rpc.request().hash_key.length());
absl::string_view unmarshall_ori_sort_key(
raw_rpc.request().check_sort_key.data(),
raw_rpc.request().check_sort_key.length());
absl::string_view unmarshall_set_sort_key(
raw_rpc.request().set_sort_key.data(),
raw_rpc.request().set_sort_key.length());

LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried "
"when doing duplication,"
"hash key [{}], check sort key [{}],"
"set sort key [{}]",
unmarshall_hash_key,
unmarshall_ori_sort_key,
unmarshall_set_sort_key);
continue;
}

if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
check_and_mutate_rpc raw_rpc(write);
absl::string_view unmarshall_hash_key(raw_rpc.request().hash_key.data(),
raw_rpc.request().hash_key.length());
absl::string_view unmarshall_ori_sort_key(
raw_rpc.request().check_sort_key.data(),
raw_rpc.request().check_sort_key.length());

LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been "
"retried when doing duplication,"
"hash key is [{}] , sort key is [{}] .",
unmarshall_hash_key,
unmarshall_ori_sort_key);
continue;
}
}
}
}
}

void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb)
{
_total_shipped_size = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/server/pegasus_mutation_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator

void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err);

void type_force_send_no_idempotent_if_need(duplicate_rpc &rpc);

private:
friend class pegasus_mutation_duplicator_test;

Expand All @@ -91,6 +93,7 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator

METRIC_VAR_DECLARE_counter(dup_shipped_successful_requests);
METRIC_VAR_DECLARE_counter(dup_shipped_failed_requests);
METRIC_VAR_DECLARE_counter(dup_retry_no_idempotent_duplicate_qps);
};

// Decodes the binary `request_data` into write request in thrift struct, and
Expand Down
40 changes: 40 additions & 0 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kRequests,
"The number of DUPLICATE requests");

METRIC_DEFINE_counter(replica,
force_receive_no_idempotent_duplicate_qps,
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
dsn::metric_unit::kRequests,
"statistic the those no idempotent qps of DUPLICATE requests Force received");

METRIC_DEFINE_percentile_int64(replica,
dup_time_lag_ms,
dsn::metric_unit::kMilliSeconds,
Expand Down Expand Up @@ -168,6 +173,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(check_and_set_latency_ns),
METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
METRIC_VAR_INIT_replica(dup_requests),
METRIC_VAR_INIT_replica(force_receive_no_idempotent_duplicate_qps),
METRIC_VAR_INIT_replica(dup_time_lag_ms),
METRIC_VAR_INIT_replica(dup_lagging_writes),
_put_batch_size(0),
Expand Down Expand Up @@ -415,6 +421,40 @@ int pegasus_write_service::duplicate(int64_t decree,
}
continue;
}

if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
// receive no idempotent request from master cluster via duplication
METRIC_VAR_INCREMENT(force_receive_no_idempotent_duplicate_qps);

if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
incr_rpc rpc(write);
resp.__set_error(_impl->incr(ctx.decree, rpc.request(), rpc.response()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments to describe what's you aim here.

if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
check_and_set_rpc rpc(write);
resp.__set_error(_impl->check_and_set(ctx.decree, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
check_and_mutate_rpc rpc(write);
resp.__set_error(
_impl->check_and_mutate(ctx.decree, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
}

resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code));
return empty_put(ctx.decree);
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ class pegasus_write_service : dsn::replication::replica_base
METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns);

METRIC_VAR_DECLARE_counter(dup_requests);
METRIC_VAR_DECLARE_counter(force_receive_no_idempotent_duplicate_qps);
METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
METRIC_VAR_DECLARE_counter(dup_lagging_writes);

Expand Down
8 changes: 8 additions & 0 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1134,9 +1134,11 @@ struct row_data
incr_qps += row.incr_qps;
check_and_set_qps += row.check_and_set_qps;
check_and_mutate_qps += row.check_and_mutate_qps;
force_receive_no_idempotent_duplicate_qps += row.force_receive_no_idempotent_duplicate_qps;
scan_qps += row.scan_qps;
duplicate_qps += row.duplicate_qps;
dup_shipped_ops += row.dup_shipped_ops;
dup_retry_no_idempotent_duplicate_qps += row.dup_retry_no_idempotent_duplicate_qps;
dup_failed_shipping_ops += row.dup_failed_shipping_ops;
dup_recent_mutation_loss_count += row.dup_recent_mutation_loss_count;
recent_read_cu += row.recent_read_cu;
Expand Down Expand Up @@ -1201,9 +1203,11 @@ struct row_data
double incr_qps = 0;
double check_and_set_qps = 0;
double check_and_mutate_qps = 0;
double force_receive_no_idempotent_duplicate_qps = 0;
double scan_qps = 0;
double duplicate_qps = 0;
double dup_shipped_ops = 0;
double dup_retry_no_idempotent_duplicate_qps = 0;
double dup_failed_shipping_ops = 0;
double dup_recent_mutation_loss_count = 0;
double recent_read_cu = 0;
Expand Down Expand Up @@ -1439,12 +1443,16 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
row.check_and_set_qps += value;
else if (counter_name == "check_and_mutate_qps")
row.check_and_mutate_qps += value;
else if (counter_name == "force_receive_no_idempotent_duplicate_qps")
row.force_receive_no_idempotent_duplicate_qps += value;
else if (counter_name == "scan_qps")
row.scan_qps += value;
else if (counter_name == "duplicate_qps")
row.duplicate_qps += value;
else if (counter_name == "dup_shipped_ops")
row.dup_shipped_ops += value;
else if (counter_name == "dup_retry_no_idempotent_duplicate_qps")
row.dup_retry_no_idempotent_duplicate_qps += value;
else if (counter_name == "dup_failed_shipping_ops")
row.dup_failed_shipping_ops += value;
else if (counter_name == "dup_recent_mutation_loss_count")
Expand Down
4 changes: 4 additions & 0 deletions src/shell/commands/table_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,9 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
sum.incr_qps += row.incr_qps;
sum.check_and_set_qps += row.check_and_set_qps;
sum.check_and_mutate_qps += row.check_and_mutate_qps;
sum.force_receive_no_idempotent_duplicate_qps +=
row.force_receive_no_idempotent_duplicate_qps;

sum.scan_qps += row.scan_qps;
sum.recent_read_cu += row.recent_read_cu;
sum.recent_write_cu += row.recent_write_cu;
Expand Down Expand Up @@ -644,6 +647,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
tp.append_data(row.incr_qps);
tp.append_data(row.check_and_set_qps);
tp.append_data(row.check_and_mutate_qps);
tp.append_data(row.force_receive_no_idempotent_duplicate_qps);
tp.append_data(row.scan_qps);
tp.append_data(row.recent_read_cu);
tp.append_data(row.recent_write_cu);
Expand Down