Skip to content

Commit

Permalink
Add leadership expiration in case of leader failed to see response fr…
Browse files Browse the repository at this point in the history
…om a quorum (#20)

* add leadership expiration
  • Loading branch information
andy-yx-chen authored and datatechnology committed Apr 30, 2019
1 parent a90b092 commit a25a730
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
3 changes: 3 additions & 0 deletions include/basic_types.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ typedef uint16_t ushort;
typedef uint32_t uint;
typedef int32_t int32;

using time_point = std::chrono::high_resolution_clock::time_point;
using system_clock = std::chrono::high_resolution_clock;

#endif // _BASIC_TYPES_HXX_
11 changes: 11 additions & 0 deletions include/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace cornerstone {
max_hb_interval_(ctx.params_->max_hb_interval()),
next_log_idx_(0),
matched_idx_(0),
last_resp_(),
busy_flag_(false),
pending_commit_flag_(false),
hb_enabled_(false),
Expand Down Expand Up @@ -98,6 +99,15 @@ namespace cornerstone {
matched_idx_ = idx;
}

const time_point& get_last_resp() const {
return last_resp_;
}

template<typename T>
void set_last_resp(T&& value) {
last_resp_ = std::forward<T>(value);
}

void set_pending_commit() {
pending_commit_flag_.store(true);
}
Expand Down Expand Up @@ -141,6 +151,7 @@ namespace cornerstone {
int32 max_hb_interval_;
ulong next_log_idx_;
ulong matched_idx_;
time_point last_resp_;
std::atomic_bool busy_flag_;
std::atomic_bool pending_commit_flag_;
bool hb_enabled_;
Expand Down
33 changes: 27 additions & 6 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ ptr<resp_msg> raft_server::handle_vote_req(req_msg& req) {
}

ptr<resp_msg> raft_server::handle_cli_req(req_msg& req) {
// optimization: check leader expiration
static volatile ulong time_elasped_since_quorum_resp(std::numeric_limits<ulong>::max());
if (role_ == srv_role::leader && peers_.size() > 0 && time_elasped_since_quorum_resp > ctx_->params_->election_timeout_upper_bound_ * 2) {
std::vector<time_point> peer_resp_times;
for (auto& peer : peers_) {
peer_resp_times.push_back(peer.second->get_last_resp());
}

std::sort(peer_resp_times.begin(), peer_resp_times.end());
time_elasped_since_quorum_resp = std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - peer_resp_times[peers_.size() / 2]).count();
if (time_elasped_since_quorum_resp > ctx_->params_->election_timeout_upper_bound_ * 2) {
return cs_new<resp_msg>(state_->get_term(), msg_type::append_entries_response, id_, -1);
}
}

ptr<resp_msg> resp (cs_new<resp_msg>(state_->get_term(), msg_type::append_entries_response, id_, leader_));
if (role_ != srv_role::leader) {
return resp;
Expand Down Expand Up @@ -324,6 +339,12 @@ void raft_server::handle_peer_resp(ptr<resp_msg>& resp, const ptr<rpc_exception>
return;
}

// update peer last response time
auto peer = peers_.find(resp->get_src());
if (peer != peers_.end()) {
peer->second->set_last_resp(system_clock::now());
}

l_->debug(
lstrfmt("Receive a %s message from peer %d with Result=%d, Term=%llu, NextIndex=%llu")
.fmt(__msg_type_str[resp->get_type()], resp->get_src(), resp->get_accepted() ? 1 : 0, resp->get_term(), resp->get_next_idx()));
Expand Down Expand Up @@ -371,16 +392,16 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
}

// try to commit with this response
// TODO: keep this to save a "new" operation for each response
std::unique_ptr<ulong> matched_indexes(new ulong[peers_.size() + 1]);
matched_indexes.get()[0] = log_store_->next_slot() - 1;
static std::vector<ulong> matched_indexes;
matched_indexes.clear();
matched_indexes.emplace_back(log_store_->next_slot() - 1);
int i = 1;
for (it = peers_.begin(); it != peers_.end(); ++it, ++i) {
matched_indexes.get()[i] = it->second->get_matched_idx();
matched_indexes.emplace_back(it->second->get_matched_idx());
}

std::sort(matched_indexes.get(), matched_indexes.get() + (peers_.size() + 1), std::greater<ulong>());
commit(matched_indexes.get()[(peers_.size() + 1) / 2]);
std::sort(matched_indexes.begin(), matched_indexes.end(), std::greater<ulong>());
commit(matched_indexes[(peers_.size() + 1) / 2]);
need_to_catchup = p->clear_pending_commit() || resp.get_next_idx() < log_store_->next_slot();
}
else {
Expand Down

0 comments on commit a25a730

Please sign in to comment.