From 0ee6c122126772d6b9fda4002059190abc418777 Mon Sep 17 00:00:00 2001 From: Li Wang Date: Mon, 23 Aug 2021 11:02:11 +0800 Subject: [PATCH] raft arbiter support Signed-off-by: Li Wang --- src/braft/fsm_caller.cpp | 17 +++++++++++++--- src/braft/node.cpp | 13 +++++++++++- src/braft/node.h | 2 ++ src/braft/raft.h | 3 +++ src/braft/snapshot.cpp | 36 +++++++++++++++++---------------- src/braft/snapshot_executor.cpp | 3 +++ src/braft/storage.h | 1 + 7 files changed, 54 insertions(+), 21 deletions(-) diff --git a/src/braft/fsm_caller.cpp b/src/braft/fsm_caller.cpp index 98913eea..1f9c2604 100644 --- a/src/braft/fsm_caller.cpp +++ b/src/braft/fsm_caller.cpp @@ -298,7 +298,12 @@ void FSMCaller::do_committed(int64_t committed_index) { continue; } Iterator iter(&iter_impl); - _fsm->on_apply(iter); + if (!_node->arbiter()) { + _fsm->on_apply(iter); + } else { + for (;iter.valid();iter.next()) { + } + } LOG_IF(ERROR, iter.valid()) << "Node " << _node->node_id() << " Iterator is still valid, did you return before iterator " @@ -353,7 +358,11 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) { return; } - _fsm->on_snapshot_save(writer, done); + if (!_node->arbiter()) { + _fsm->on_snapshot_save(writer, done); + } else { + done->Run(); + } return; } @@ -402,7 +411,9 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) { return done->Run(); } - ret = _fsm->on_snapshot_load(reader); + if (!_node->arbiter()) { + ret = _fsm->on_snapshot_load(reader); + } if (ret != 0) { done->status().set_error(ret, "StateMachine on_snapshot_load failed"); done->Run(); diff --git a/src/braft/node.cpp b/src/braft/node.cpp index 4483f686..f3ad8c97 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -1039,6 +1039,11 @@ void NodeImpl::handle_election_timeout() { return; } + + if (arbiter()) { + return; + } + bool triggered = _vote_triggered; _vote_triggered = false; @@ -1086,6 +1091,12 @@ void NodeImpl::handle_timeout_now_request(brpc::Controller* controller, << state2str(saved_state) << " at term=" << saved_term; return; } + if (arbiter()) { + response->set_term(_current_term); + response->set_success(false); + lck.unlock(); + return; + } const butil::EndPoint remote_side = controller->remote_side(); const int64_t saved_term = _current_term; if (FLAGS_raft_enable_leader_lease) { @@ -2698,7 +2709,7 @@ void NodeImpl::describe(std::ostream& os, bool use_html) { lck.unlock(); const char *newline = use_html ? "
" : "\r\n"; os << "peer_id: " << _server_id << newline; - os << "state: " << state2str(st) << newline; + os << "state: " << state2str(st) << (arbiter() ? ", ARBITER" : "") << newline; os << "readonly: " << readonly << newline; os << "term: " << term << newline; os << "conf_index: " << conf_index << newline; diff --git a/src/braft/node.h b/src/braft/node.h index d8565f39..2565d664 100644 --- a/src/braft/node.h +++ b/src/braft/node.h @@ -241,6 +241,8 @@ friend class VoteBallotCtx; bool disable_cli() const { return _options.disable_cli; } + bool arbiter() const { return _options.arbiter; } + private: friend class butil::RefCountedThreadSafe; diff --git a/src/braft/raft.h b/src/braft/raft.h index d696101f..ee8d4027 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -588,6 +588,8 @@ struct NodeOptions { // Default: false bool disable_cli; + bool arbiter; + // Construct a default instance NodeOptions(); @@ -609,6 +611,7 @@ inline NodeOptions::NodeOptions() , snapshot_file_system_adaptor(NULL) , snapshot_throttle(NULL) , disable_cli(false) + , arbiter(false) {} inline int NodeOptions::get_catchup_timeout_ms() { diff --git a/src/braft/snapshot.cpp b/src/braft/snapshot.cpp index 8bf7d173..8d91649a 100644 --- a/src/braft/snapshot.cpp +++ b/src/braft/snapshot.cpp @@ -976,23 +976,25 @@ void LocalSnapshotCopier::copy_file(const std::string& filename) { set_error(ECANCELED, "%s", berror(ECANCELED)); return; } - scoped_refptr session - = _copier.start_to_copy_to_file(filename, file_path, NULL); - if (session == NULL) { - LOG(WARNING) << "Fail to copy " << filename - << " path: " << _writer->get_path(); - set_error(-1, "Fail to copy %s", filename.c_str()); - return; - } - _cur_session = session.get(); - lck.unlock(); - session->join(); - lck.lock(); - _cur_session = NULL; - lck.unlock(); - if (!session->status().ok()) { - set_error(session->status().error_code(), session->status().error_cstr()); - return; + if (!_storage->dummy) { + scoped_refptr session + = _copier.start_to_copy_to_file(filename, file_path, NULL); + if (session == NULL) { + LOG(WARNING) << "Fail to copy " << filename + << " path: " << _writer->get_path(); + set_error(-1, "Fail to copy %s", filename.c_str()); + return; + } + _cur_session = session.get(); + lck.unlock(); + session->join(); + lck.lock(); + _cur_session = NULL; + lck.unlock(); + if (!session->status().ok()) { + set_error(session->status().error_code(), session->status().error_cstr()); + return; + } } if (_writer->add_file(filename, &meta) != 0) { set_error(EIO, "Fail to add file to writer"); diff --git a/src/braft/snapshot_executor.cpp b/src/braft/snapshot_executor.cpp index 64403013..8567a9df 100644 --- a/src/braft/snapshot_executor.cpp +++ b/src/braft/snapshot_executor.cpp @@ -364,6 +364,9 @@ int SnapshotExecutor::init(const SnapshotExecutorOptions& options) { _snapshot_throttle = options.snapshot_throttle; _snapshot_storage->set_snapshot_throttle(options.snapshot_throttle); } + if (_node->arbiter()) { + _snapshot_storage->dummy = true; + } if (_snapshot_storage->init() != 0) { LOG(ERROR) << "node " << _node->node_id() << " fail to init snapshot storage, uri " << options.uri; diff --git a/src/braft/storage.h b/src/braft/storage.h index c5c53546..8b1593ff 100644 --- a/src/braft/storage.h +++ b/src/braft/storage.h @@ -354,6 +354,7 @@ class SnapshotStorage { } static butil::Status destroy(const std::string& uri); + bool dummy = false; }; inline brpc::Extension* log_storage_extension() {