Skip to content

Commit

Permalink
Merge pull request #398 from lintanghui/master
Browse files Browse the repository at this point in the history
添加witness副本支持
  • Loading branch information
PFZheng committed Jul 20, 2023
2 parents e32b78a + d260d29 commit 59c40e5
Show file tree
Hide file tree
Showing 15 changed files with 404 additions and 47 deletions.
27 changes: 27 additions & 0 deletions docs/cn/witness.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
witness 副本只作为仲裁者进行投票,不保存实际的业务数据。
## 实现方案
对于witness的实现,需要考虑部署方式。 对于2+1部署,如果不允许witness当选选主,那么当主节点异常宕机的时候,如果wintess拥有比另外一个副本更新的entry,那么会导致选主失败,为了提高可用性,需要考虑允许witness短时间内允许当选为主,wintess成为主以后再主动transfer leader给另一个副本。**通过允许witness临时成为主可以提高系统的可用性**

对于4+1 的部署方式,实现相对简单,只需要让witness不能当选为主即可,因为即便主节点故障,依然至少有一个副本拥有最新的entry从而可以当选为主。由于witness不能当选为主,因此在同步raft log的时候也可以不需要同步log data给witness。当4+1部署的时候,如果不允许witness当选为主,那么最多只能容忍一个节点故障,如果允许witness临时当选为主,那么可以容忍两个节点故障。允许witness当选为主时,实现
则与2+1部署一致。

## 详细实现
### witness不允许当选为主
当witness不允许当选为主时,只需要在初始化Node的时候禁止election_timeout timer进行初始化即可,同时可以无需进行data复制。

### witness允许临时当选为主
允许witness当选为主可以提升服务的可用性。具体实现为:
* 设置raft_enable_witness_to_leader flag为true,允许witness临时选举为主
* election_timeout设置为正常节点的两倍,在主节点异常宕机的时候,允许witness发起选主,同时由于election_timeout比数据副本大,可以保证数据副本优先被选为主,只有数据副本选主失败时,witness才会主动发起选主。
* witness当选为主时,禁止安装快照请求,避免从节点获取到空快照覆盖原有的业务数据
* 新增witness副本时, witness向leader发送install sanpshot请求,如果replicator本身是witness,则无需进行data文件的复制,只需复制最新的entry即可。

## witness 使用注意事项
* 如果不允许witness当选为主时,相比原有raft方式部署,服务可用性会明显降低
* 当允许witness临时当选为主时,极端情况下,可能导致从节点无法获取到最新的log entry从而导致数据丢失。
例如:
```
2+1的时候,日志为 [1, 8],某一时刻 replica1(leader) [1, 8] replica2 [1, 5] witness[4,8]。witness snapshot save,truncate 数据到 [7,8]。replica1(leader) 挂了,replica2 [1, 5] 和 witness 的数据接不上了, 此时会导致日志丢失。
```
用户在使用witness的时候,需要评估witness带来的可用性降低以及可能丢失部分最新数据的风险。
如果业务无法接受数据丢失,可以自定义实现LogStorage, 只有半数以上副本拥有entry时,witness才能truncate 该entry之前的log。
37 changes: 29 additions & 8 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,53 @@ typedef std::string GroupId;
// GroupId with version, format: {group_id}_{index}
typedef std::string VersionedGroupId;

enum Role {
REPLICA = 0,
WITNESS = 1,
};

// Represent a participant in a replicating group.
struct PeerId {
butil::EndPoint addr; // ip+port.
int idx; // idx in same addr, default 0
Role role = REPLICA;

PeerId() : idx(0), role(REPLICA) {}
explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0), role(REPLICA) {}
PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_), role(REPLICA) {}
PeerId(butil::EndPoint addr_, int idx_, bool witness) : addr(addr_), idx(idx_) {
if (witness) {
this->role = WITNESS;
}
}

PeerId() : idx(0) {}
explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0) {}
PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_) {}
/*intended implicit*/PeerId(const std::string& str)
{ CHECK_EQ(0, parse(str)); }
PeerId(const PeerId& id) : addr(id.addr), idx(id.idx) {}
PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role) {}

void reset() {
addr.ip = butil::IP_ANY;
addr.port = 0;
idx = 0;
role = REPLICA;
}

bool is_empty() const {
return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0);
}

bool is_witness() const {
return role == WITNESS;
}
int parse(const std::string& str) {
reset();
char ip_str[64];
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d", ip_str, &addr.port, &idx)) {
int value = REPLICA;
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, &addr.port, &idx, &value)) {
reset();
return -1;
}
role = (Role)value;
if (role > WITNESS) {
reset();
return -1;
}
Expand All @@ -72,7 +93,7 @@ struct PeerId {

std::string to_string() const {
char str[128];
snprintf(str, sizeof(str), "%s:%d", butil::endpoint2str(addr).c_str(), idx);
snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role));
return std::string(str);
}

Expand All @@ -96,7 +117,7 @@ inline bool operator!=(const PeerId& id1, const PeerId& id2) {
}

inline std::ostream& operator << (std::ostream& os, const PeerId& id) {
return os << id.addr << ':' << id.idx;
return os << id.addr << ':' << id.idx << ':' << int(id.role);
}

struct NodeId {
Expand Down
54 changes: 45 additions & 9 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ BRPC_VALIDATE_GFLAG(raft_rpc_channel_connect_timeout_ms, brpc::PositiveInteger);

DECLARE_bool(raft_enable_leader_lease);

DEFINE_bool(raft_enable_witness_to_leader, false,
"enable witness temporarily to become leader when leader down accidently");

#ifndef UNIT_TEST
static bvar::Adder<int64_t> g_num_nodes("raft_node_count");
#else
Expand Down Expand Up @@ -253,6 +256,10 @@ int NodeImpl::init_snapshot_storage() {
opt.init_term = _current_term;
opt.filter_before_copy_remote = _options.filter_before_copy_remote;
opt.usercode_in_pthread = _options.usercode_in_pthread;
// not need to copy data file when it is witness.
if (_options.witness) {
opt.copy_file = false;
}
if (_options.snapshot_file_system_adaptor) {
opt.file_system_adaptor = *_options.snapshot_file_system_adaptor;
}
Expand Down Expand Up @@ -498,9 +505,18 @@ int NodeImpl::init(const NodeOptions& options) {
<< ", did you forget to call braft::add_service()?";
return -1;
}

CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms));
CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));
if (options.witness) {
// When this node is a witness, set the election_timeout to be twice
// of the normal replica to ensure that the normal replica has a higher
// priority and is selected as the master
if (FLAGS_raft_enable_witness_to_leader) {
CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms * 2));
CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms * 2 + options.max_clock_drift_ms));
}
} else {
CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms));
}
CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _snapshot_timer.init(this, options.snapshot_interval_s * 1000));

Expand All @@ -524,7 +540,11 @@ int NodeImpl::init(const NodeOptions& options) {
_fsm_caller = new FSMCaller();

_leader_lease.init(options.election_timeout_ms);
_follower_lease.init(options.election_timeout_ms, options.max_clock_drift_ms);
if (options.witness) {
_follower_lease.init(options.election_timeout_ms * 2, options.max_clock_drift_ms);
} else {
_follower_lease.init(options.election_timeout_ms, options.max_clock_drift_ms);
}

// log storage and log manager init
if (init_log_storage() != 0) {
Expand Down Expand Up @@ -812,14 +832,26 @@ void NodeImpl::handle_stepdown_timeout() {
<< " state is " << state2str(_state);
return;
}

check_witness(_conf.conf);
int64_t now = butil::monotonic_time_ms();
check_dead_nodes(_conf.conf, now);
if (!_conf.old_conf.empty()) {
check_dead_nodes(_conf.old_conf, now);
}
}

void NodeImpl::check_witness(const Configuration& conf) {
if (is_witness()) {
LOG(WARNING) << "node " << node_id()
<< " term " << _current_term
<< " steps down as it's a witness but become leader temporarily"
<< " conf: " << conf;
butil::Status status;
status.set_error(ETRANSFERLEADERSHIP, "Witness becomes leader temporarily");
step_down(_current_term, true, status);
}
}

void NodeImpl::unsafe_register_conf_change(const Configuration& old_conf,
const Configuration& new_conf,
Closure* done) {
Expand Down Expand Up @@ -1302,9 +1334,14 @@ void NodeImpl::unsafe_reset_election_timeout_ms(int election_timeout_ms,
_replicator_group.reset_heartbeat_interval(
heartbeat_timeout(_options.election_timeout_ms));
_replicator_group.reset_election_timeout_interval(_options.election_timeout_ms);
_election_timer.reset(election_timeout_ms);
_leader_lease.reset_election_timeout_ms(election_timeout_ms);
_follower_lease.reset_election_timeout_ms(election_timeout_ms, _options.max_clock_drift_ms);
if (_options.witness && FLAGS_raft_enable_witness_to_leader) {
_election_timer.reset(election_timeout_ms * 2);
_follower_lease.reset_election_timeout_ms(election_timeout_ms * 2, _options.max_clock_drift_ms);
} else {
_election_timer.reset(election_timeout_ms);
_leader_lease.reset_election_timeout_ms(election_timeout_ms);
_follower_lease.reset_election_timeout_ms(election_timeout_ms, _options.max_clock_drift_ms);
}
}

void NodeImpl::on_error(const Error& e) {
Expand Down Expand Up @@ -2569,7 +2606,6 @@ void NodeImpl::handle_install_snapshot_request(brpc::Controller* cntl,
InstallSnapshotResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);

if (_snapshot_executor == NULL) {
cntl->SetFailed(EINVAL, "Not support snapshot");
return;
Expand Down
4 changes: 2 additions & 2 deletions src/braft/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ friend class VoteBallotCtx;
int bootstrap(const BootstrapOptions& options);

bool disable_cli() const { return _options.disable_cli; }

bool is_witness() const { return _options.witness; }
private:
friend class butil::RefCountedThreadSafe<NodeImpl>;

Expand Down Expand Up @@ -305,7 +305,7 @@ friend class butil::RefCountedThreadSafe<NodeImpl>;
void* meta, bthread::TaskIterator<LogEntryAndClosure>& iter);
void apply(LogEntryAndClosure tasks[], size_t size);
void check_dead_nodes(const Configuration& conf, int64_t now_ms);

void check_witness(const Configuration& conf);
bool handle_out_of_order_append_entries(brpc::Controller* cntl,
const AppendEntriesRequest* request,
AppendEntriesResponse* response,
Expand Down
14 changes: 14 additions & 0 deletions src/braft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,20 @@ struct NodeOptions {
// Default: false
bool disable_cli;

// If true, this node is a witness.
// 1. FLAGS_raft_enable_witness_to_leader = false
// It will never be elected as leader. So we don't need to init _vote_timer and _election_timer.
// 2. FLAGS_raft_enable_witness_to_leader = true
// It can be electd as leader, but should transfer leader to normal replica as soon as possible.
//
// Warning:
// 1. FLAGS_raft_enable_witness_to_leader = false
// When leader down and witness had newer log entry, it may cause leader election fail.
// 2. FLAGS_raft_enable_witness_to_leader = true
// When leader shutdown and witness was elected as leader, if follower delay over one snapshot,
// it may cause data lost because witness had truncated log entry before snapshot.
// Default: false
bool witness = false;
// Construct a default instance
NodeOptions();

Expand Down
14 changes: 11 additions & 3 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ DEFINE_int32(raft_retry_replicate_interval_ms, 1000,
BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms,
brpc::PositiveInteger);

DECLARE_bool(raft_enable_witness_to_leader);
DECLARE_int64(raft_append_entry_high_lat_us);
DECLARE_bool(raft_trace_append_entry_latency);

Expand Down Expand Up @@ -628,8 +629,10 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf *data) {
} else {
CHECK(entry->type != ENTRY_TYPE_CONFIGURATION) << "log_index=" << log_index;
}
em->set_data_len(entry->data.length());
data->append(entry->data);
if (!is_witness() || FLAGS_raft_enable_witness_to_leader) {
em->set_data_len(entry->data.length());
data->append(entry->data);
}
entry->Release();
return 0;
}
Expand Down Expand Up @@ -765,6 +768,10 @@ void Replicator::_wait_more_entries() {
}

void Replicator::_install_snapshot() {
NodeImpl *node_impl = _options.node;
if (node_impl->is_witness()) {
return _block(butil::gettimeofday_us(), EBUSY);
}
if (_reader) {
// follower's readonly mode change may cause two install_snapshot
// one possible case is:
Expand Down Expand Up @@ -1527,12 +1534,13 @@ int ReplicatorGroup::find_the_next_candidate(
}
const int64_t next_index = Replicator::get_next_index(iter->second.id);
const int consecutive_error_times = Replicator::get_consecutive_error_times(iter->second.id);
if (consecutive_error_times == 0 && next_index > max_index) {
if (consecutive_error_times == 0 && next_index > max_index && !iter->first.is_witness()) {
max_index = next_index;
if (peer_id) {
*peer_id = iter->first;
}
}

}
if (max_index == 0) {
return -1;
Expand Down
3 changes: 3 additions & 0 deletions src/braft/replicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
}
return true;
}
bool is_witness() const {
return _options.peer_id.is_witness();
}
void _close_reader();
int64_t _last_rpc_send_timestamp() {
return _options.replicator_status->last_rpc_send_timestamp.load(butil::memory_order_relaxed);
Expand Down
14 changes: 10 additions & 4 deletions src/braft/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ SnapshotWriter* LocalSnapshotStorage::create(bool from_empty) {
}

SnapshotCopier* LocalSnapshotStorage::start_to_copy_from(const std::string& uri) {
LocalSnapshotCopier* copier = new LocalSnapshotCopier();
LocalSnapshotCopier* copier = new LocalSnapshotCopier(_copy_file);
copier->_storage = this;
copier->_filter_before_copy_remote = _filter_before_copy_remote;
copier->_fs = _fs.get();
Expand Down Expand Up @@ -738,16 +738,19 @@ butil::Status LocalSnapshotStorage::gc_instance(const std::string& uri) const {
// LocalSnapshotCopier

LocalSnapshotCopier::LocalSnapshotCopier()
: _tid(INVALID_BTHREAD)
: LocalSnapshotCopier(true){}

LocalSnapshotCopier::LocalSnapshotCopier(bool copy_file):
_tid(INVALID_BTHREAD)
, _cancelled(false)
, _filter_before_copy_remote(false)
, _copy_file(copy_file)
, _fs(NULL)
, _throttle(NULL)
, _writer(NULL)
, _storage(NULL)
, _reader(NULL)
, _cur_session(NULL)
{}
, _cur_session(NULL){}

LocalSnapshotCopier::~LocalSnapshotCopier() {
CHECK(!_writer);
Expand All @@ -769,6 +772,9 @@ void LocalSnapshotCopier::copy() {
if (!ok()) {
break;
}
if (!_copy_file) {
break;
}
std::vector<std::string> files;
_remote_snapshot.list_files(&files);
for (size_t i = 0; i < files.size() && ok(); ++i) {
Expand Down
4 changes: 4 additions & 0 deletions src/braft/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class LocalSnapshotCopier : public SnapshotCopier {
friend class LocalSnapshotStorage;
public:
LocalSnapshotCopier();
LocalSnapshotCopier(bool copy_file);
~LocalSnapshotCopier();
virtual void cancel();
virtual void join();
Expand All @@ -166,6 +167,7 @@ friend class LocalSnapshotStorage;
bthread_t _tid;
bool _cancelled;
bool _filter_before_copy_remote;
bool _copy_file = true;
FileSystemAdaptor* _fs;
SnapshotThrottle* _throttle;
LocalSnapshotWriter* _writer;
Expand Down Expand Up @@ -204,6 +206,7 @@ friend class LocalSnapshotCopier;

void set_server_addr(butil::EndPoint server_addr) { _addr = server_addr; }
bool has_server_addr() { return _addr != butil::EndPoint(); }
void set_copy_file(bool copy_file) { _copy_file = copy_file; }
private:
SnapshotWriter* create(bool from_empty) WARN_UNUSED_RESULT;
int destroy_snapshot(const std::string& path);
Expand All @@ -217,6 +220,7 @@ friend class LocalSnapshotCopier;
int64_t _last_snapshot_index;
std::map<int64_t, int> _ref_map;
butil::EndPoint _addr;
bool _copy_file = true;
scoped_refptr<FileSystemAdaptor> _fs;
scoped_refptr<SnapshotThrottle> _snapshot_throttle;
};
Expand Down
3 changes: 3 additions & 0 deletions src/braft/snapshot_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ int SnapshotExecutor::init(const SnapshotExecutorOptions& options) {
if (tmp != NULL && !tmp->has_server_addr()) {
tmp->set_server_addr(options.addr);
}
if (!options.copy_file) {
tmp->set_copy_file(false);
}
SnapshotReader* reader = _snapshot_storage->open();
if (reader == NULL) {
return 0;
Expand Down
Loading

0 comments on commit 59c40e5

Please sign in to comment.