Fix race on ASIO client during prevote and vote (#106)
* Fix race on ASIO client during prevote and vote

* Both prevote and vote requests should also honor `busy_flag_` in
`peer`. Otherwise, a race on the same client occurs, resulting in
various kinds of memory corruption; see the sketch below this list.

* [Update PR] Support leave_cluster_request

* Removing a server should also honor the busy flag.

* [Update PR] Comments

* [Update PR] Handle missing cases
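
The core of the fix is a compare-and-swap "busy" guard: a sender may touch the ASIO client only after atomically claiming it, and every response or error path releases the claim. A minimal self-contained sketch of that pattern (illustrative names only; the actual implementation is `peer::make_busy()` / `peer::set_free()` and `asio_rpc_client::set_busy_flag()` in the hunks below):

    #include <atomic>
    #include <cassert>

    // Sketch of the busy-flag guard this commit applies;
    // not the actual NuRaft classes.
    class guarded_client {
    public:
        // Claim exclusive use of the client; only one caller can win the CAS.
        bool make_busy() {
            bool expected = false;
            return busy_.compare_exchange_strong(expected, true);
        }

        // Release the claim (done on the response/error path).
        void set_free() { busy_.store(false); }

        void send() {
            // Without the guard, two concurrent rounds (e.g. append_entries
            // and prevote) could interleave async writes on the same socket
            // and corrupt the client's internal state.
            assert(busy_.load());
            // ... async_write(...) ...
        }

    private:
        std::atomic<bool> busy_{false};
    };

    // Caller-side pattern, mirroring request_prevote()/request_vote() below:
    //
    //     if (client.make_busy()) client.send();  // we own the socket
    //     else                    { /* drop, or park the request for later */ }
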
greensky00 committed Mar 4, 2020
1 parent c7db414 commit 661ec77
Showing 13 changed files with 187 additions and 36 deletions.
7 changes: 0 additions & 7 deletions include/libnuraft/asio_service.hxx
@@ -42,13 +42,6 @@ public:
using meta_cb_params = asio_service_meta_cb_params;
using options = asio_service_options;

enum log_level {
debug = 0x0,
info,
warnning,
error
};

asio_service(const options& _opt = options(),
ptr<logger> _l = nullptr);

16 changes: 16 additions & 0 deletions include/libnuraft/peer.hxx
@@ -89,6 +89,8 @@ public:
, reconn_backoff_(0)
, suppress_following_error_(false)
, abandoned_(false)
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, l_(logger)
{
reset_ls_timer();
@@ -292,6 +294,14 @@ public:
return suppress_following_error_.compare_exchange_strong(exp, desired);
}

void set_rsv_msg(const ptr<req_msg>& m, const rpc_handler& h) {
rsv_msg_ = m;
rsv_msg_handler_ = h;
}

ptr<req_msg> get_rsv_msg() const { return rsv_msg_; }
rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; }

private:
void handle_rpc_result(ptr<peer> myself,
ptr<rpc_client> my_rpc_client,
@@ -410,6 +420,12 @@ private:
// All operations on this peer should be rejected.
std::atomic<bool> abandoned_;

// Reserved message that should be sent next time.
ptr<req_msg> rsv_msg_;

// Handler for reserved message.
rpc_handler rsv_msg_handler_;

// Logger instance.
ptr<logger> l_;
};
4 changes: 4 additions & 0 deletions include/libnuraft/rpc_cli.hxx
@@ -26,6 +26,8 @@ limitations under the License.
#include "resp_msg.hxx"
#include "rpc_exception.hxx"

#include <cstdint>

namespace nuraft {

class resp_msg;
@@ -39,6 +41,8 @@ class rpc_client {

public:
virtual void send(ptr<req_msg>& req, rpc_handler& when_done) = 0;

virtual uint64_t get_id() const = 0;
};

}
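
The new `get_id()` accessor, paired with `assign_client_id()` in the next file, gives every client instance a unique, monotonically increasing id. These hunks only assign the id; a plausible use — my assumption, not shown in this commit — is distinguishing a response produced by a stale, since-recreated client from the peer's current one, which a raw pointer comparison cannot do once heap addresses are reused:

    #include <atomic>
    #include <cstdint>
    #include <memory>

    // Illustrative sketch only; these are not the actual NuRaft definitions.
    struct client {
        explicit client(uint64_t id) : id_(id) {}
        uint64_t get_id() const { return id_; }
    private:
        uint64_t id_;
    };

    struct client_factory {
        std::shared_ptr<client> create() {
            return std::make_shared<client>(counter_.fetch_add(1));
        }
        std::atomic<uint64_t> counter_{1};  // the diff also seeds its counter at 1
    };

    // A responder is "current" only if its id matches the peer's live client:
    // ids are never reused, while a recycled heap address could alias two
    // different client generations.
    bool is_current(const client& responder, const client& live) {
        return responder.get_id() == live.get_id();
    }

    int main() {
        client_factory f;
        auto old_cli = f.create();   // id 1
        auto new_cli = f.create();   // id 2: the client was recreated
        return is_current(*old_cli, *new_cli) ? 1 : 0;  // stale -> not current
    }
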
36 changes: 36 additions & 0 deletions src/asio_service.cxx
@@ -162,6 +162,7 @@ class asio_service_impl {

const asio_service::options& get_options() const { return my_opt_; }
asio::io_service& get_io_svc() { return io_svc_; }
uint64_t assign_client_id() { return client_id_counter_.fetch_add(1); }

private:
#ifndef SSL_LIBRARY_NOT_FOUND
@@ -186,6 +187,7 @@ class asio_service_impl {
std::atomic<uint32_t> worker_id_;
std::list< ptr<std::thread> > worker_handles_;
asio_service::options my_opt_;
std::atomic<uint64_t> client_id_counter_;
ptr<logger> l_;
friend asio_service;
};
@@ -741,8 +743,10 @@ class asio_rpc_client
, ssl_ready_(false)
, num_send_fails_(0)
, abandoned_(false)
, socket_busy_(false)
, l_(l)
{
client_id_ = impl_->assign_client_id();
if (ssl_enabled_) {
#ifdef SSL_LIBRARY_NOT_FOUND
assert(0); // Should not reach here.
@@ -769,6 +773,10 @@
}

public:
uint64_t get_id() const {
return client_id_;
}

#ifndef SSL_LIBRARY_NOT_FOUND
bool verify_certificate(bool preverified,
asio::ssl::verify_context& ctx)
@@ -926,6 +934,9 @@ class asio_rpc_client
return;
}

// Socket should be idle now. If not, it should be a bug.
set_busy_flag(true);

// If we reach here, that means connection is valid.
// Reset the counter.
num_send_fails_ = 0;
@@ -1010,6 +1021,24 @@ class asio_rpc_client
std::placeholders::_2 ) );
}
private:
void set_busy_flag(bool to) {
if (to == true) {
bool exp = false;
if (!socket_busy_.compare_exchange_strong(exp, true)) {
p_ft("socket is already in use, race happened on connection to %s:%s",
host_.c_str(), port_.c_str());
assert(0);
}
} else {
bool exp = true;
if (!socket_busy_.compare_exchange_strong(exp, false)) {
p_ft("socket is already idle, race happened on connection to %s:%s",
host_.c_str(), port_.c_str());
assert(0);
}
}
}

void close_socket() {
// Do nothing,
// early closing socket before destroying this instance
@@ -1214,6 +1243,7 @@ class asio_rpc_client
std::placeholders::_1,
std::placeholders::_2 ) );
} else {
set_busy_flag(false);
ptr<rpc_exception> except;
when_done(rsp, except);
}
@@ -1233,6 +1263,8 @@ class asio_rpc_client
// just use the buffer as it is for ctx.
ctx_buf->pos(0);
rsp->set_ctx(ctx_buf);

set_busy_flag(false);
ptr<rpc_exception> except;
when_done(rsp, except);
return;
@@ -1281,6 +1313,7 @@ class asio_rpc_client
rsp->set_ctx(actual_ctx);
}

set_busy_flag(false);
ptr<rpc_exception> except;
when_done(rsp, except);
}
@@ -1324,6 +1357,8 @@ class asio_rpc_client
std::atomic<bool> ssl_ready_;
std::atomic<size_t> num_send_fails_;
std::atomic<bool> abandoned_;
std::atomic<bool> socket_busy_;
uint64_t client_id_;
ptr<logger> l_;
};

@@ -1356,6 +1391,7 @@ asio_service_impl::asio_service_impl(const asio_service::options& _opt,
, num_active_workers_(0)
, worker_id_(0)
, my_opt_(_opt)
, client_id_counter_(1)
, l_(l)
{
if (my_opt_.enable_ssl_) {
28 changes: 25 additions & 3 deletions src/handle_append_entries.cxx
@@ -121,8 +121,23 @@ bool raft_server::request_append_entries(ptr<peer> p) {

if (p->make_busy()) {
p_tr("send request to %d\n", (int)p->get_id());
ptr<req_msg> msg = create_append_entries_req(*p);

// If reserved message exists, process it first.
ptr<req_msg> msg = p->get_rsv_msg();
rpc_handler m_handler = p->get_rsv_msg_handler();
if (msg) {
// Clear the reserved message.
p->set_rsv_msg(nullptr, nullptr);
p_in("found reserved message to peer %d, type %d",
p->get_id(), msg->get_type());

} else {
// Normal message.
msg = create_append_entries_req(*p);
m_handler = resp_handler_;
}
if (!msg) {
// Even normal message doesn't exist.
p->set_free();
return true;
}
@@ -148,12 +163,14 @@ bool raft_server::request_append_entries(ptr<peer> p) {
p->reset_long_pause_warnings();

} else {
// FIXME: `manual_free` is deprecated, need to get rid of it.

// It means that this is not an actual recovery,
// but just temporarily freed busy flag.
p->reset_manual_free();
}

p->send_req(p, msg, resp_handler_);
p->send_req(p, msg, m_handler);
p->reset_ls_timer();

if ( srv_to_leave_ &&
@@ -771,7 +788,12 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
if ( write_paused_ &&
p->get_id() == next_leader_candidate_ &&
p_matched_idx &&
p_matched_idx == log_store_->next_slot() - 1 ) {
p_matched_idx == log_store_->next_slot() - 1 &&
p->make_busy() ) {
// NOTE:
// If `make_busy` fails (very unlikely to happen), next
// response handler (of heartbeat, append_entries ..) will
// retry this.
p_in("ready to resign, server id %d, "
"latest log index %zu, "
"%zu us elapsed, resign now",
9 changes: 7 additions & 2 deletions src/handle_join_leave.cxx
@@ -392,14 +392,19 @@ ptr<resp_msg> raft_server::handle_rm_srv_req(req_msg& req) {
id_, srv_id, 0,
log_store_->next_slot() - 1,
quick_commit_index_.load() ) );
p->send_req(p, leave_req, ex_resp_handler_);
// WARNING:
// DO NOT reset HB counter to 0 as removing server
// may be requested multiple times, and anyway we should
// remove that server.
p->set_leave_flag();

p_in("sent leave request to peer %d", p->get_id());
if (p->make_busy()) {
p->send_req(p, leave_req, ex_resp_handler_);
p_in("sent leave request to peer %d", p->get_id());
} else {
p->set_rsv_msg(leave_req, ex_resp_handler_);
p_in("peer %d is currently busy, keep the message", p->get_id());
}

resp->accept(log_store_->next_slot());
return resp;
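
Taken together, the parking in `handle_rm_srv_req()` just above and the consumption in `request_append_entries()` earlier form a park-and-flush protocol for the leave request: if the peer's client is busy, the message is stashed via `set_rsv_msg()`, and the next append-entries round sends it before composing a normal request. A condensed, standalone sketch of the round-trip (hypothetical types; the real logic is spread across `peer` and `raft_server`):

    #include <atomic>
    #include <cstdio>
    #include <functional>
    #include <memory>

    // Condensed, illustrative stand-ins; not the actual NuRaft types.
    struct req_msg { const char* type; };
    using ptr_req = std::shared_ptr<req_msg>;
    using rpc_handler = std::function<void(const ptr_req&)>;

    struct peer_like {
        bool make_busy() { bool e = false; return busy_.compare_exchange_strong(e, true); }
        void set_free()  { busy_.store(false); }

        void set_rsv_msg(const ptr_req& m, const rpc_handler& h) { rsv_ = m; rsv_h_ = h; }
        ptr_req     get_rsv_msg() const         { return rsv_; }
        rpc_handler get_rsv_msg_handler() const { return rsv_h_; }

        std::atomic<bool> busy_{false};
        ptr_req rsv_;
        rpc_handler rsv_h_;
    };

    // handle_rm_srv_req() side: send now if idle, otherwise park the message.
    void send_or_park(peer_like& p, const ptr_req& leave_req, const rpc_handler& h) {
        if (p.make_busy()) h(leave_req);          // stand-in for send_req()
        else               p.set_rsv_msg(leave_req, h);
    }

    // request_append_entries() side: a parked message is flushed first.
    void next_round(peer_like& p) {
        if (!p.make_busy()) return;
        if (auto m = p.get_rsv_msg()) {
            auto h = p.get_rsv_msg_handler();
            p.set_rsv_msg(nullptr, nullptr);      // consume the reservation
            h(m);                                 // send the parked message
        } else {
            // ... compose and send a normal append_entries request ...
        }
    }

    int main() {
        peer_like p;
        p.make_busy();                            // peer is mid-RPC
        send_or_park(p, std::make_shared<req_msg>(req_msg{"leave_cluster"}),
                     [](const ptr_req& m) { std::printf("sent %s\n", m->type); });
        p.set_free();                             // the in-flight RPC completes
        next_round(p);                            // parked leave request goes out
    }
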
7 changes: 6 additions & 1 deletion src/handle_priority.cxx
@@ -137,7 +137,12 @@ void raft_server::broadcast_priority_change(const int srv_id,
v.push_back(le);

ptr<peer> pp = it->second;
pp->send_req(pp, req, resp_handler_);
if (pp->make_busy()) {
pp->send_req(pp, req, resp_handler_);
} else {
p_er("peer %d is currently busy, cannot send request",
pp->get_id());
}
}
}

39 changes: 27 additions & 12 deletions src/handle_vote.cxx
@@ -45,6 +45,7 @@ bool raft_server::check_cond_for_zp_election() {
}

void raft_server::request_prevote() {
ptr<raft_params> params = ctx_->get_params();
ptr<cluster_config> c_config = get_config();
for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
Expand All @@ -60,11 +61,24 @@ void raft_server::request_prevote() {
} else {
// Since second time: reset only if `rpc_` is null.
recreate = pp->need_to_reconnect();

// Or if it is not active long time, reconnect as well.
int32 last_active_time_ms = pp->get_active_timer_us() / 1000;
if ( last_active_time_ms >
params->heart_beat_interval_ * peer::RECONNECT_LIMIT ) {
p_wn( "connection to peer %d is not active long time: %zu ms, "
"need reconnection for prevote",
pp->get_id(),
last_active_time_ms );
recreate = true;
}
}

if (recreate) {
p_in("reset RPC client for peer %d", s_config->get_id());
pp->recreate_rpc(s_config, *ctx_);
pp->set_free();
pp->set_manual_free();
}
}
}
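
The hunk above also adds an inactivity check: a peer whose last activity is older than `heart_beat_interval_ * peer::RECONNECT_LIMIT` gets its RPC client recreated before prevote. A toy computation of that threshold (assumed example values; `RECONNECT_LIMIT`'s actual value is not shown in this diff):

    #include <cstdint>
    #include <cstdio>

    int main() {
        // Assumed example values; the real ones come from raft_params and peer.
        int32_t heart_beat_interval_ms = 500;   // hypothetical config value
        int32_t RECONNECT_LIMIT        = 50;    // hypothetical; not in this diff
        int64_t last_active_ms         = 30000; // 30 s since the last response

        bool need_reconnect =
            last_active_ms > int64_t(heart_beat_interval_ms) * RECONNECT_LIMIT;
        std::printf("reconnect for prevote: %s\n", need_reconnect ? "yes" : "no");
    }
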
@@ -107,7 +121,12 @@ void raft_server::request_prevote() {
term_for_log(log_store_->next_slot() - 1),
log_store_->next_slot() - 1,
quick_commit_index_.load() ) );
pp->send_req(pp, req, resp_handler_);
if (pp->make_busy()) {
pp->send_req(pp, req, resp_handler_);
} else {
p_wn("failed to send prevote request: peer %d (%s) is busy",
pp->get_id(), pp->get_endpoint().c_str());
}
}
}

@@ -179,7 +198,12 @@ void raft_server::request_vote(bool ignore_priority) {
msg_type_to_string(req->get_type()).c_str(),
it->second->get_id(),
state_->get_term() );
pp->send_req(pp, req, resp_handler_);
if (pp->make_busy()) {
pp->send_req(pp, req, resp_handler_);
} else {
p_wn("failed to send vote request: peer %d (%s) is busy",
pp->get_id(), pp->get_endpoint().c_str());
}
}
}

@@ -291,18 +315,9 @@ void raft_server::handle_vote_resp(resp_msg& resp) {
}

ptr<resp_msg> raft_server::handle_prevote_req(req_msg& req) {
// Once we get a pre-vote request from a peer,
// it means that the peer has not received any messages or heartbeats,
// so that we should clear the busy flag of it.
ulong next_idx_for_resp = 0;
auto entry = peers_.find(req.get_src());
if (entry != peers_.end()) {
peer* pp = entry->second.get();
if (pp->is_busy()) {
p_in("busy_flag of peer %d was set, clear the flag.", req.get_src());
pp->set_free();
}
} else {
if (entry == peers_.end()) {
// This node already has been removed, set a special value.
next_idx_for_resp = std::numeric_limits<ulong>::max();
}