raft arbiter support #379

Open · wants to merge 1 commit into master
src/braft/fsm_caller.cpp (20 additions, 5 deletions)

@@ -298,7 +298,11 @@ void FSMCaller::do_committed(int64_t committed_index) {
             continue;
         }
         Iterator iter(&iter_impl);
-        _fsm->on_apply(iter);
+        if (!_node->arbiter()) {
+            _fsm->on_apply(iter);
+        } else {
+            for (; iter.valid(); iter.next()) {}
+        }
         LOG_IF(ERROR, iter.valid())
                 << "Node " << _node->node_id()
                 << " Iterator is still valid, did you return before iterator "

@@ -353,7 +357,11 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
         return;
     }

-    _fsm->on_snapshot_save(writer, done);
+    if (!_node->arbiter()) {
+        _fsm->on_snapshot_save(writer, done);
+    } else {
+        done->Run();
+    }
     return;
 }

@@ -402,7 +410,10 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
         return done->Run();
     }

-    ret = _fsm->on_snapshot_load(reader);
+    if (!_node->arbiter()) {
+        ret = _fsm->on_snapshot_load(reader);
+    }

     if (ret != 0) {
         done->status().set_error(ret, "StateMachine on_snapshot_load failed");
         done->Run();

@@ -454,12 +465,16 @@ int FSMCaller::on_leader_start(int64_t term, int64_t lease_epoch) {
 }

 void FSMCaller::do_leader_stop(const butil::Status& status) {
-    _fsm->on_leader_stop(status);
+    if (!_node->arbiter()) {
+        _fsm->on_leader_stop(status);
+    }
 }

 void FSMCaller::do_leader_start(const LeaderStartContext& leader_start_context) {
     _node->leader_lease_start(leader_start_context.lease_epoch);
-    _fsm->on_leader_start(leader_start_context.term);
+    if (!_node->arbiter()) {
+        _fsm->on_leader_start(leader_start_context.term);
+    }
 }

 int FSMCaller::on_start_following(const LeaderChangeContext& start_following_context) {
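Note on the hunks above: for an arbiter, FSMCaller drains the apply iterator itself and completes snapshot closures without touching the user state machine, so none of the StateMachine callbacks ever fire. A minimal sketch (not part of this patch; ArbiterStateMachine is a hypothetical name) of the placeholder state machine an arbiter could be started with:

#include <braft/raft.h>
#include <butil/logging.h>

// Hypothetical placeholder: braft requires a StateMachine instance, but on
// an arbiter node FSMCaller skips it, so on_apply should be unreachable.
class ArbiterStateMachine : public braft::StateMachine {
public:
    virtual void on_apply(braft::Iterator& iter) {
        CHECK(false) << "on_apply reached on an arbiter node";
    }
};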
src/braft/log_manager.cpp (3 additions, 1 deletion)

@@ -71,6 +71,7 @@ LogManager::LogManager()
     , _next_wait_id(0)
     , _first_log_index(0)
     , _last_log_index(0)
+    , _complete_index(std::numeric_limits<int64_t>::max())
 {
     CHECK_EQ(0, start_disk_thread());
 }

@@ -276,7 +277,8 @@ int LogManager::truncate_prefix(const int64_t first_index_kept,
         _last_log_index = first_index_kept - 1;
     }
     _config_manager->truncate_prefix(first_index_kept);
-    TruncatePrefixClosure* c = new TruncatePrefixClosure(first_index_kept);
+    TruncatePrefixClosure* c = new TruncatePrefixClosure(
+            std::min(first_index_kept, _complete_index.load(butil::memory_order_relaxed)));
     const int rc = bthread::execution_queue_execute(_disk_queue, c);
     lck.unlock();
     for (size_t i = 0; i < saved_logs_in_memory.size(); ++i) {
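The clamp above appears to exist because an arbiter never sends InstallSnapshot (see the CHECK added in replicator.cpp below), so if an arbiter is ever elected it must be able to catch every peer up from the log alone; entries below complete_index are the only ones it may safely drop from disk. _complete_index defaults to INT64_MAX, so nodes that never call set_complete_index() truncate exactly as before. A standalone sketch of the rule, with hypothetical index values:

#include <stdint.h>
#include <algorithm>

// Hypothetical values: snapshot logic wants to drop everything below index
// 100, but the slowest peer still needs entries from index 40 on, so disk
// truncation is clamped to 40; in-memory truncation is unaffected.
int64_t first_index_kept = 100;
int64_t complete_index = 40;    // min(next_index) over all peers
int64_t disk_first_index_kept = std::min(first_index_kept, complete_index);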
src/braft/log_manager.h (5 additions, 0 deletions)

@@ -149,6 +149,10 @@ class BAIDU_CACHELINE_ALIGNMENT LogManager {
     // Get the internal status of LogManager.
     void get_status(LogManagerStatus* status);

+    void set_complete_index(int64_t index) {
+        _complete_index.store(index, butil::memory_order_relaxed);
+    }
+
 private:
 friend class AppendBatcher;
     struct WaitMeta {

@@ -218,6 +222,7 @@ friend class AppendBatcher;
     int64_t _last_log_index;
     // the last snapshot's log_id
     LogId _last_snapshot_id;
+    butil::atomic<int64_t> _complete_index;
     // the virtual first log, for finding next_index of replicator, which
     // can avoid install_snapshot too often in extreme case where a follower's
     // install_snapshot is slower than leader's save_snapshot
src/braft/node.cpp (22 additions, 15 deletions)

@@ -96,6 +96,13 @@ class ConfigurationChangeDone : public Closure {
             if (_leader_start) {
                 _node->leader_lease_start(_lease_epoch);
                 _node->_options.fsm->on_leader_start(_term);
+                if (_node->arbiter()) {
+                    // TODO: handle errors
+                    CHECK(!_node->transfer_leadership_to(ANY_PEER)) << "Arbiter " << _node->node_id()
+                            << " fail to transfer leader to others";
+                    CHECK(!_node->is_leader()) << "Arbiter " << _node->node_id()
+                            << " is still leader after transfer_leadership_to ANY_PEER";
+                }
             }
         }
         delete this;

@@ -671,6 +678,10 @@ int NodeImpl::execute_applying_tasks(
 }

 void NodeImpl::apply(const Task& task) {
+    if (arbiter()) {
+        task.done->status().set_error(EPERM, "Node is arbiter");
+        return run_closure_in_bthread(task.done);
+    }
     LogEntry* entry = new LogEntry;
     entry->AddRef();
     entry->data.swap(*task.data);

@@ -1751,6 +1762,10 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
     }
 }

+int64_t NodeImpl::complete_index() {
+    return _replicator_group.complete_index();
+}
+
 // in lock
 void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate,
                          const butil::Status& status) {

@@ -1878,21 +1893,6 @@ void NodeImpl::check_step_down(const int64_t request_term, const PeerId& server_id) {
     }
 }

-class LeaderStartClosure : public Closure {
-public:
-    LeaderStartClosure(StateMachine* fsm, int64_t term) : _fsm(fsm), _term(term) {}
-    ~LeaderStartClosure() {}
-    void Run() {
-        if (status().ok()) {
-            _fsm->on_leader_start(_term);
-        }
-        delete this;
-    }
-private:
-    StateMachine* _fsm;
-    int64_t _term;
-};
-
 // in lock
 void NodeImpl::become_leader() {
     CHECK(_state == STATE_CANDIDATE);

@@ -2483,6 +2483,9 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
         _ballot_box->set_last_committed_index(
                 std::min(request->committed_index(),
                          prev_log_index));
+        if (arbiter()) {
+            _log_manager->set_complete_index(request->complete_index());
+        }
         return;
     }

@@ -3394,6 +3397,10 @@ bool NodeImpl::is_leader_lease_valid() {

 void NodeImpl::get_leader_lease_status(LeaderLeaseStatus* lease_status) {
     // Fast path for leader to lease check
+    if (arbiter()) {
+        lease_status->state = LEASE_EXPIRED;
+        return;
+    }
     LeaderLease::LeaseInfo internal_info;
     _leader_lease.get_lease_info(&internal_info);
     switch (internal_info.state) {
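Two user-visible consequences of the node.cpp changes above: a Task submitted to an arbiter fails fast with EPERM instead of being appended, and an arbiter that wins an election immediately hands leadership away with transfer_leadership_to(ANY_PEER). A sketch (hypothetical closure, not from the patch) of surfacing the fail-fast on the caller's side:

#include <errno.h>
#include <braft/raft.h>
#include <butil/logging.h>

// Hypothetical done-closure for Task::done: per NodeImpl::apply() above, an
// arbiter sets EPERM/"Node is arbiter" and runs the closure in a bthread.
class ApplyDone : public braft::Closure {
public:
    virtual void Run() {
        if (status().error_code() == EPERM) {
            LOG(WARNING) << "task rejected, node is an arbiter: " << status();
        }
        delete this;
    }
};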
src/braft/node.h (3 additions, 0 deletions)

@@ -241,6 +241,9 @@ friend class VoteBallotCtx;

     bool disable_cli() const { return _options.disable_cli; }

+    bool arbiter() { return _options.arbiter; }
+    int64_t complete_index();
+
 private:
     friend class butil::RefCountedThreadSafe<NodeImpl>;
src/braft/raft.cpp (1 addition, 1 deletion)

@@ -155,7 +155,7 @@ PeerId Node::leader_id() {
 }

 bool Node::is_leader() {
-    return _impl->is_leader();
+    return !_impl->arbiter() && _impl->is_leader();
 }

 bool Node::is_leader_lease_valid() {
src/braft/raft.h (8 additions, 0 deletions)

@@ -588,6 +588,13 @@ struct NodeOptions {
     // Default: false
     bool disable_cli;

+    // If true, this node keeps no copy of the user data and only takes part
+    // in elections; from the user's viewpoint it will never become leader, and
+    // on_apply/on_snapshot_save/on_snapshot_load/on_leader_start/on_leader_stop
+    // will not be called.
+    // TODO: avoid installing snapshot for arbiter
+    // Default: false
+    bool arbiter;

     // Construct a default instance
     NodeOptions();

@@ -609,6 +616,7 @@ inline NodeOptions::NodeOptions()
     , snapshot_file_system_adaptor(NULL)
     , snapshot_throttle(NULL)
     , disable_cli(false)
+    , arbiter(false)
 {}

 inline int NodeOptions::get_catchup_timeout_ms() {
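A minimal sketch of bringing up a vote-only member with the new flag. Everything except options.arbiter = true is ordinary braft setup; the URIs, group name, and address below are placeholders, and initial_conf and other fields are omitted:

#include <braft/raft.h>

int start_arbiter_node(braft::StateMachine* fsm) {
    braft::NodeOptions options;
    options.fsm = fsm;             // required by the API, callbacks never fire
    options.node_owns_fsm = false;
    options.log_uri = "local://./arbiter/log";              // placeholder
    options.raft_meta_uri = "local://./arbiter/raft_meta";  // placeholder
    options.snapshot_uri = "local://./arbiter/snapshot";    // placeholder
    options.arbiter = true;        // vote-only: never leader, keeps no data
    braft::Node* node = new braft::Node("example_group",    // placeholder
                                        braft::PeerId("127.0.0.1:8100"));
    return node->init(options);
}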
src/braft/raft.proto (1 addition, 0 deletions)

@@ -48,6 +48,7 @@ message AppendEntriesRequest {
     required int64 prev_log_index = 6;
     repeated EntryMeta entries = 7;
    required int64 committed_index = 8;
+    optional int64 complete_index = 9;
 };
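Because the new field is optional in proto2, the wire format stays compatible with peers built before this patch: an old leader simply never sets it, and request->complete_index() then reads as the proto2 default of 0, which only makes the truncation clamp more conservative. A defensive receiver-side variant (not what the patch does above, which gates on arbiter() only) would guard explicitly:

// Hypothetical guard using the generated proto2 presence accessor.
if (request->has_complete_index()) {
    _log_manager->set_complete_index(request->complete_index());
}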
src/braft/replicator.cpp (14 additions, 0 deletions)

@@ -560,6 +560,7 @@ void Replicator::_send_empty_entries(bool is_heartbeat) {
         _heartbeat_counter++;
         // set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized.
         cntl->set_timeout_ms(*_options.election_timeout_ms / 2);
+        request->set_complete_index(_options.node->complete_index());
     } else {
         _st.st = APPENDING_ENTRIES;
         _st.first_log_index = _next_index;

@@ -758,6 +759,10 @@ void Replicator::_wait_more_entries() {
 }

 void Replicator::_install_snapshot() {
+    CHECK(!_options.node->arbiter()) << "node " << _options.group_id << ":" << _options.server_id
+            << " refuse to send InstallSnapshotRequest to " << _options.peer_id
+            << " because I am arbiter";
+
     if (_reader) {
         // follower's readonly mode change may cause two install_snapshot
         // one possible case is:

@@ -1569,4 +1574,13 @@ bool ReplicatorGroup::readonly(const PeerId& peer) const {
     return Replicator::readonly(rid);
 }

+int64_t ReplicatorGroup::complete_index() const {
+    int64_t rst = std::numeric_limits<int64_t>::max();
+    for (std::map<PeerId, ReplicatorIdAndStatus>::const_iterator
+            iter = _rmap.begin(); iter != _rmap.end(); ++iter) {
+        rst = std::min(rst, Replicator::get_next_index(iter->second.id));
+    }
+    return rst;
+}
+
 } // namespace braft
src/braft/replicator.h (3 additions, 0 deletions)

@@ -358,6 +358,9 @@ class ReplicatorGroup {
     // Check if a replicator is in readonly
     bool readonly(const PeerId& peer) const;

+    // Every log index before complete_index() has been persisted by all peers.
+    int64_t complete_index() const;
+
 private:

     int _add_replicator(const PeerId& peer, ReplicatorId *rid);