raft arbiter support.
Signed-off-by: Yichao Li <liyichao.good@gmail.com>
liyichao committed Dec 1, 2022
1 parent 351c74a commit edef4ce
Showing 10 changed files with 80 additions and 22 deletions.
25 changes: 20 additions & 5 deletions src/braft/fsm_caller.cpp
@@ -298,7 +298,11 @@ void FSMCaller::do_committed(int64_t committed_index) {
             continue;
         }
         Iterator iter(&iter_impl);
-        _fsm->on_apply(iter);
+        if (!_node->arbiter()) {
+            _fsm->on_apply(iter);
+        } else {
+            for (;iter.valid();iter.next()) {}
+        }
         LOG_IF(ERROR, iter.valid())
                 << "Node " << _node->node_id()
                 << " Iterator is still valid, did you return before iterator "
@@ -353,7 +357,11 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
         return;
     }
 
-    _fsm->on_snapshot_save(writer, done);
+    if (!_node->arbiter()) {
+        _fsm->on_snapshot_save(writer, done);
+    } else {
+        done->Run();
+    }
     return;
 }
 
@@ -402,7 +410,10 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
         return done->Run();
     }
 
-    ret = _fsm->on_snapshot_load(reader);
+    if (!_node->arbiter()) {
+        ret = _fsm->on_snapshot_load(reader);
+    }
+
     if (ret != 0) {
         done->status().set_error(ret, "StateMachine on_snapshot_load failed");
         done->Run();
@@ -454,12 +465,16 @@ int FSMCaller::on_leader_start(int64_t term, int64_t lease_epoch) {
 }
 
 void FSMCaller::do_leader_stop(const butil::Status& status) {
-    _fsm->on_leader_stop(status);
+    if (!_node->arbiter()) {
+        _fsm->on_leader_stop(status);
+    }
 }
 
 void FSMCaller::do_leader_start(const LeaderStartContext& leader_start_context) {
     _node->leader_lease_start(leader_start_context.lease_epoch);
-    _fsm->on_leader_start(leader_start_context.term);
+    if (!_node->arbiter()) {
+        _fsm->on_leader_start(leader_start_context.term);
+    }
 }
 
 int FSMCaller::on_start_following(const LeaderChangeContext& start_following_context) {
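A note on the do_committed hunk: an arbiter must still consume every committed entry, so its branch drains the iterator with an empty loop rather than skipping the batch. That keeps FSMCaller's applied-index bookkeeping advancing and keeps the LOG_IF(ERROR, iter.valid()) check quiet. The snapshot hooks follow the same pattern: on save the closure is run without writing any user state, and on load ret is left at its success value, so an arbiter's snapshots carry metadata only.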
4 changes: 3 additions & 1 deletion src/braft/log_manager.cpp
@@ -71,6 +71,7 @@ LogManager::LogManager()
     , _next_wait_id(0)
     , _first_log_index(0)
     , _last_log_index(0)
+    , _complete_index(std::numeric_limits<int64_t>::max())
 {
     CHECK_EQ(0, start_disk_thread());
 }
@@ -276,7 +277,8 @@ int LogManager::truncate_prefix(const int64_t first_index_kept,
         _last_log_index = first_index_kept - 1;
     }
     _config_manager->truncate_prefix(first_index_kept);
-    TruncatePrefixClosure* c = new TruncatePrefixClosure(first_index_kept);
+    TruncatePrefixClosure* c = new TruncatePrefixClosure(
+            std::min(first_index_kept, _complete_index.load(butil::memory_order_relaxed)));
     const int rc = bthread::execution_queue_execute(_disk_queue, c);
     lck.unlock();
     for (size_t i = 0; i < saved_logs_in_memory.size(); ++i) {
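Because _complete_index starts at the int64_t maximum and is only ever lowered on an arbiter (via set_complete_index in the node.cpp hunk below), the added std::min is a no-op on ordinary nodes. On an arbiter it caps how far the disk queue may truncate: an arbiter can never serve an InstallSnapshot (see the CHECK added in replicator.cpp below), so it apparently must retain any entry that some peer has not yet persisted. A self-contained sketch with made-up indexes:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>

// Illustration only -- not braft code. Shows how the capped truncate
// point behaves on an ordinary node vs. an arbiter.
int main() {
    int64_t first_index_kept = 100;  // snapshot allows dropping entries < 100
    int64_t complete_index = std::numeric_limits<int64_t>::max();  // never set
    // Ordinary node: min() is a no-op, truncate up to the snapshot point.
    std::cout << std::min(first_index_kept, complete_index) << std::endl;  // 100

    complete_index = 43;  // arbiter: leader reports the slowest peer at 43
    // Arbiter: keep entries [43, 100) on disk for lagging peers to fetch.
    std::cout << std::min(first_index_kept, complete_index) << std::endl;  // 43
    return 0;
}
```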
5 changes: 5 additions & 0 deletions src/braft/log_manager.h
@@ -149,6 +149,10 @@ class BAIDU_CACHELINE_ALIGNMENT LogManager {
     // Get the internal status of LogManager.
     void get_status(LogManagerStatus* status);
 
+    void set_complete_index(int64_t index) {
+        _complete_index.store(index, butil::memory_order_relaxed);
+    }
+
 private:
 friend class AppendBatcher;
     struct WaitMeta {
@@ -218,6 +222,7 @@ friend class AppendBatcher;
     int64_t _last_log_index;
     // the last snapshot's log_id
     LogId _last_snapshot_id;
+    butil::atomic<int64_t> _complete_index;
     // the virtual first log, for finding next_index of replicator, which
     // can avoid install_snapshot too often in extreme case where a follower's
     // install_snapshot is slower than leader's save_snapshot
37 changes: 22 additions & 15 deletions src/braft/node.cpp
@@ -92,6 +92,13 @@ class ConfigurationChangeDone : public Closure {
             if (_leader_start) {
                 _node->leader_lease_start(_lease_epoch);
                 _node->_options.fsm->on_leader_start(_term);
+                if (_node->arbiter()) {
+                    // todo: handle errors
+                    CHECK(!_node->transfer_leadership_to(ANY_PEER)) << "Arbiter " << _node->node_id()
+                            << " fail to transfer leader to others";
+                    CHECK(!_node->is_leader()) << "Arbiter " << _node->node_id()
+                            << " is still leader after transfer_leadership_to ANY_PEER";
+                }
             }
         }
         delete this;
@@ -667,6 +674,10 @@ int NodeImpl::execute_applying_tasks(
 }
 
 void NodeImpl::apply(const Task& task) {
+    if (arbiter()) {
+        task.done->status().set_error(EPERM, "Node is arbiter");
+        return run_closure_in_bthread(task.done);
+    }
     LogEntry* entry = new LogEntry;
     entry->AddRef();
     entry->data.swap(*task.data);
@@ -1745,6 +1756,10 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
     }
 }
 
+int64_t NodeImpl::complete_index() {
+    return _replicator_group.complete_index();
+}
+
 // in lock
 void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate,
                          const butil::Status& status) {
@@ -1872,21 +1887,6 @@ void NodeImpl::check_step_down(const int64_t request_term, const PeerId& server_
     }
 }
 
-class LeaderStartClosure : public Closure {
-public:
-    LeaderStartClosure(StateMachine* fsm, int64_t term) : _fsm(fsm), _term(term) {}
-    ~LeaderStartClosure() {}
-    void Run() {
-        if (status().ok()) {
-            _fsm->on_leader_start(_term);
-        }
-        delete this;
-    }
-private:
-    StateMachine* _fsm;
-    int64_t _term;
-};
-
 // in lock
 void NodeImpl::become_leader() {
     CHECK(_state == STATE_CANDIDATE);
@@ -2477,6 +2477,9 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
         _ballot_box->set_last_committed_index(
                 std::min(request->committed_index(),
                          prev_log_index));
+        if (arbiter()) {
+            _log_manager->set_complete_index(request->complete_index());
+        }
         return;
     }
 
@@ -3388,6 +3391,10 @@ bool NodeImpl::is_leader_lease_valid() {
 
 void NodeImpl::get_leader_lease_status(LeaderLeaseStatus* lease_status) {
     // Fast path for leader to lease check
+    if (arbiter()) {
+        lease_status->state = LEASE_EXPIRED;
+        return;
+    }
     LeaderLease::LeaseInfo internal_info;
     _leader_lease.get_lease_info(&internal_info);
     switch (internal_info.state) {
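Taken together, the node.cpp hunks define the arbiter's lifecycle: it votes and may even win an election, but ConfigurationChangeDone immediately hands leadership away with transfer_leadership_to(ANY_PEER); it rejects locally submitted tasks with EPERM; as a follower it records the leader's complete_index from each heartbeat; and it always reports LEASE_EXPIRED, so lease-based reads are never served by an arbiter.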
3 changes: 3 additions & 0 deletions src/braft/node.h
@@ -241,6 +241,9 @@ friend class VoteBallotCtx;
 
     bool disable_cli() const { return _options.disable_cli; }
 
+    bool arbiter() { return _options.arbiter;}
+    int64_t complete_index();
+
 private:
     friend class butil::RefCountedThreadSafe<NodeImpl>;
 
2 changes: 1 addition & 1 deletion src/braft/raft.cpp
@@ -155,7 +155,7 @@ PeerId Node::leader_id() {
 }
 
 bool Node::is_leader() {
-    return _impl->is_leader();
+    return !_impl->arbiter() && _impl->is_leader();
 }
 
 bool Node::is_leader_lease_valid() {
8 changes: 8 additions & 0 deletions src/braft/raft.h
@@ -588,6 +588,13 @@ struct NodeOptions {
     // Default: false
     bool disable_cli;
 
+    // If true, this node will not have a copy of data and only participates in elections
+    // from user's viewpoint this node will never become leader,
+    // on_apply/on_snapshot_save/on_snapshot_load/on_leader_start/on_leader_stop etc will not be called
+    // todo: avoid installing snapshot for arbiter
+    // Default: false
+    bool arbiter;
+
     // Construct a default instance
     NodeOptions();
 
@@ -609,6 +616,7 @@ inline NodeOptions::NodeOptions()
     , snapshot_file_system_adaptor(NULL)
     , snapshot_throttle(NULL)
     , disable_cli(false)
+    , arbiter(false)
 {}
 
 inline int NodeOptions::get_catchup_timeout_ms() {
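From the user's side, turning a member into an arbiter is a single flag on NodeOptions. A minimal sketch of what that might look like; the group name, addresses, URIs and the no-op state machine are illustrative placeholders, and the usual brpc server setup is omitted:

```cpp
#include <braft/raft.h>
#include <butil/endpoint.h>

// An arbiter never has its callbacks invoked, but Node still needs a
// StateMachine instance; a do-nothing subclass is enough.
class NoopFSM : public braft::StateMachine {
public:
    void on_apply(braft::Iterator& iter) override {
        for (; iter.valid(); iter.next()) {}  // defensive drain; not reached on an arbiter
    }
};

int start_arbiter(const butil::EndPoint& addr) {
    braft::NodeOptions options;
    options.fsm = new NoopFSM;           // placeholder FSM
    options.node_owns_fsm = true;
    options.arbiter = true;              // the flag added by this commit
    options.log_uri = "local://./arbiter/log";
    options.raft_meta_uri = "local://./arbiter/raft_meta";
    options.snapshot_uri = "local://./arbiter/snapshot";
    options.initial_conf.parse_from("10.0.0.1:8100:0,10.0.0.2:8100:0,10.0.0.3:8100:0");

    braft::Node* node = new braft::Node("my_group", braft::PeerId(addr, 0));
    return node->init(options);          // 0 on success
}
```

Per the raft.cpp hunk above, node->is_leader() stays false on such a node even during the brief window before an elected arbiter transfers leadership away.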
1 change: 1 addition & 0 deletions src/braft/raft.proto
@@ -48,6 +48,7 @@ message AppendEntriesRequest {
     required int64 prev_log_index = 6;
     repeated EntryMeta entries = 7;
     required int64 committed_index = 8;
+    optional int64 complete_index = 9;
 };
 
 message AppendEntriesResponse {
14 changes: 14 additions & 0 deletions src/braft/replicator.cpp
@@ -558,6 +558,7 @@ void Replicator::_send_empty_entries(bool is_heartbeat) {
         _heartbeat_counter++;
         // set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized.
         cntl->set_timeout_ms(*_options.election_timeout_ms / 2);
+        request->set_complete_index(_options.node->complete_index());
     } else {
         _st.st = APPENDING_ENTRIES;
         _st.first_log_index = _next_index;
@@ -756,6 +757,10 @@ void Replicator::_wait_more_entries() {
 }
 
 void Replicator::_install_snapshot() {
+    CHECK(!_options.node->arbiter()) << "node " << _options.group_id << ":" << _options.server_id
+            << " refuse to send InstallSnapshotRequest to " << _options.peer_id
+            << " because I am arbiter";
+
     if (_reader) {
         // follower's readonly mode change may cause two install_snapshot
         // one possible case is:
@@ -1567,4 +1572,13 @@ bool ReplicatorGroup::readonly(const PeerId& peer) const {
     return Replicator::readonly(rid);
 }
 
+int64_t ReplicatorGroup::complete_index() const {
+    int64_t rst = std::numeric_limits<int64_t>::max();
+    for (std::map<PeerId, ReplicatorIdAndStatus>::const_iterator
+            iter = _rmap.begin(); iter != _rmap.end(); ++iter) {
+        rst = std::min(rst, Replicator::get_next_index(iter->second.id));
+    }
+    return rst;
+}
+
 } // namespace braft
3 changes: 3 additions & 0 deletions src/braft/replicator.h
@@ -358,6 +358,9 @@ class ReplicatorGroup {
     // Check if a replicator is in readonly
     bool readonly(const PeerId& peer) const;
 
+    // all log index before `complete_index()` have been persisted by all peers
+    int64_t complete_index() const;
+
 private:
 
     int _add_replicator(const PeerId& peer, ReplicatorId *rid);
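Despite the name, complete_index() is the minimum next_index across all replicators, i.e. the first index not yet known to be on every peer; everything strictly below it has been persisted by all peers. For example, with two followers at next_index 54 and 43, complete_index() returns 43, and an arbiter receiving that value in a heartbeat knows entries below index 43 are safe to truncate. The value is only meaningful on the leader, where _rmap is populated; elsewhere it stays at the int64_t maximum.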
