diff --git a/src/braft/node.cpp b/src/braft/node.cpp
index b5ba39eb..2d039fa6 100644
--- a/src/braft/node.cpp
+++ b/src/braft/node.cpp
@@ -70,6 +70,9 @@ DECLARE_bool(raft_enable_leader_lease);
 DEFINE_bool(raft_enable_witness_to_leader, false,
             "enable witness temporarily to become leader when leader down accidently");
 
+DEFINE_bool(raft_enable_peer_not_in_conf_can_elec, false,
+            "allow a peer that is not in the current configuration to initiate elections");
+
 #ifndef UNIT_TEST
 static bvar::Adder<int64_t> g_num_nodes("raft_node_count");
 #else
@@ -1622,10 +1625,11 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
                " configuration is possibly out of date";
         return;
     }
-    if (!_conf.contains(_server_id)) {
+    if (!FLAGS_raft_enable_peer_not_in_conf_can_elec &&
+        !_conf.contains(_server_id)) {
         LOG(WARNING) << "node " << _group_id << ':' << _server_id
                      << " can't do pre_vote as it is not in " << _conf.conf;
         return;
     }
 
     int64_t old_term = _current_term;
@@ -1681,10 +1685,11 @@ void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck,
                           bool old_leader_stepped_down) {
     LOG(INFO) << "node " << _group_id << ":" << _server_id
               << " term " << _current_term << " start vote and grant vote self";
-    if (!_conf.contains(_server_id)) {
+    if (!FLAGS_raft_enable_peer_not_in_conf_can_elec &&
+        !_conf.contains(_server_id)) {
         LOG(WARNING) << "node " << _group_id << ':' << _server_id
                      << " can't do elect_self as it is not in " << _conf.conf;
         return;
     }
     // cancel follower election timer
     if (_state == STATE_FOLLOWER) {
@@ -2393,6 +2398,17 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
     brpc::ClosureGuard done_guard(done);
     std::unique_lock<raft_mutex_t> lck(_mutex);
 
+    // Test-only hook: reject any AppendEntries request whose entries would
+    // start at or beyond |reject_log_index|. Note that _last_leader_timestamp
+    // is deliberately NOT refreshed here, so the rejection does not interfere
+    // with check_dead_nodes().
+    const int64_t reject_log_index = get_reject_log_index();
+    if (reject_log_index > 0 &&
+        request->prev_log_index() + 1 >= reject_log_index) {
+        cntl->SetFailed(EBUSY, "handle_append_entries_request reject_log_index");
+        return;
+    }
+
     // pre set term, to avoid get term in lock
     response->set_term(_current_term);
@@ -2894,6 +2910,10 @@ void NodeImpl::get_status(NodeStatus* status) {
     }
 }
 
+void NodeImpl::get_log_mgr_status(LogManagerStatus* log_manager_status) {
+    _log_manager->get_status(log_manager_status);
+}
+
 void NodeImpl::stop_replicator(const std::set<PeerId>& keep,
                                const std::set<PeerId>& drop) {
     for (std::set<PeerId>::const_iterator
diff --git a/src/braft/node.h b/src/braft/node.h
index b9dd3e82..19d53685 100644
--- a/src/braft/node.h
+++ b/src/braft/node.h
@@ -214,6 +214,8 @@ friend class VoteBallotCtx;
     // see from the website, which is generated by |describe| actually.
     void get_status(NodeStatus* status);
 
+    void get_log_mgr_status(LogManagerStatus* log_manager_status);
+
     // Readonly mode func
     void enter_readonly_mode();
     void leave_readonly_mode();
@@ -241,6 +243,12 @@ friend class VoteBallotCtx;
     bool disable_cli() const { return _options.disable_cli; }
 
     bool is_witness() const { return _options.witness; }
+
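+    // Test-only hook: once |log_index| is set to a positive value, this node
+    // rejects any AppendEntries request whose entries would start at or
+    // beyond that index; passing 0 lifts the rejection.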
+    void set_reject_log_index(const int64_t log_index) { _reject_log_index = log_index; }
+    int64_t get_reject_log_index() const { return _reject_log_index; }
 private:
 friend class butil::RefCountedThreadSafe<NodeImpl>;
@@ -533,6 +541,9 @@ friend class butil::RefCountedThreadSafe<NodeImpl>;
     LeaderLease _leader_lease;
     FollowerLease _follower_lease;
+
+    // for test
+    int64_t _reject_log_index{0};
 };
 }
diff --git a/src/braft/raft.cpp b/src/braft/raft.cpp
index 6069f706..d57cea85 100644
--- a/src/braft/raft.cpp
+++ b/src/braft/raft.cpp
@@ -230,6 +230,10 @@ void Node::get_status(NodeStatus* status) {
     return _impl->get_status(status);
 }
 
+void Node::get_log_mgr_status(LogManagerStatus* log_manager_status) {
+    return _impl->get_log_mgr_status(log_manager_status);
+}
+
 void Node::enter_readonly_mode() {
     return _impl->enter_readonly_mode();
 }
@@ -242,6 +246,14 @@ bool Node::readonly() {
     return _impl->readonly();
 }
 
+void Node::set_reject_log_index(const int64_t log_index) {
+    _impl->set_reject_log_index(log_index);
+}
+
+int64_t Node::get_reject_log_index() const {
+    return _impl->get_reject_log_index();
+}
+
 // ------------- Iterator
 void Iterator::next() {
     if (valid()) {
diff --git a/src/braft/raft.h b/src/braft/raft.h
index 8a7f5d8e..7271a21b 100644
--- a/src/braft/raft.h
+++ b/src/braft/raft.h
@@ -45,6 +45,7 @@ class LeaderChangeContext;
 class FileSystemAdaptor;
 class SnapshotThrottle;
 class LogStorage;
+struct LogManagerStatus;
 
 const PeerId ANY_PEER(butil::EndPoint(butil::IP_ANY, 0), 0);
 
@@ -762,6 +763,9 @@ class Node {
     // see from the website.
     void get_status(NodeStatus* status);
 
+    // Get the status of this node's LogManager.
+    void get_log_mgr_status(LogManagerStatus* log_manager_status);
+
     // Make this node enter readonly mode.
     // Readonly mode should only be used to protect the system in some extreme cases.
     // For example, in a storage system, too many write requests flood into the system
@@ -789,6 +793,11 @@ class Node {
     // is less than the majority.
     bool readonly();
 
+    // Test-only: make this node reject AppendEntries requests whose entries
+    // would start at or beyond |log_index|; 0 disables the rejection.
+    void set_reject_log_index(const int64_t log_index);
+    int64_t get_reject_log_index() const;
+
 private:
     NodeImpl* _impl;
 };
diff --git a/test/test_node.cpp b/test/test_node.cpp
index 407e16af..c71cb26d 100644
--- a/test/test_node.cpp
+++ b/test/test_node.cpp
@@ -24,6 +24,7 @@ DECLARE_int32(raft_max_parallel_append_entries_rpc_num);
 DECLARE_bool(raft_enable_append_entries_cache);
 DECLARE_int32(raft_max_append_entries_cache_size);
 DECLARE_bool(raft_enable_witness_to_leader);
+DECLARE_bool(raft_enable_peer_not_in_conf_can_elec);
 
 }
 
@@ -3566,6 +3567,322 @@ TEST_P(NodeTest, readonly) {
     cluster.stop_all();
 }
 
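+// Scenario: a 3-node group where one follower is told, via the test-only
+// reject_log_index hook, to reject the next appended entry. The leader and
+// the healthy follower still form a majority, so the entry should commit;
+// once the rejection is lifted, the lagging follower catches up.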
+TEST_P(NodeTest, reject_specified_log_index) {
+    uint32_t port = 5006;
+    uint32_t port1 = port + 1;
+    uint32_t port2 = port1 + 1;
+
+    std::vector<braft::PeerId> peers;
+    braft::PeerId peer0;
+    peer0.addr.ip = butil::my_ip();
+    peer0.addr.port = port;
+    peer0.idx = 0;
+
+    // start cluster
+    peers.push_back(peer0);
+    Cluster cluster("unittest", peers);
+    ASSERT_EQ(0, cluster.start(peer0.addr));
+    LOG(NOTICE) << "start single cluster " << peer0;
+    cluster.wait_leader();
+    braft::Node* leader = cluster.leader();
+    LOG(NOTICE) << "leader is " << leader->node_id().peer_id;
+
+    bthread::CountdownEvent cond(1);
+    for (int i = 0; i < 1; i++) {
+        butil::IOBuf data;
+        char data_buf[128];
+        snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1);
+        data.append(data_buf);
+        braft::Task task;
+        task.data = &data;
+        task.done = NEW_APPLYCLOSURE(&cond, 0);
+        leader->apply(task);
+    }
+    cond.wait();
+
+    // start peer1
+    braft::PeerId peer1;
+    peer1.addr.ip = butil::my_ip();
+    peer1.addr.port = port1;
+    peer1.idx = 0;
+
+    ASSERT_EQ(0, cluster.start(peer1.addr, true));
+    LOG(NOTICE) << "start peer " << peer1;
+    // wait until started successfully
+    usleep(1000 * 1000);
+
+    // add peer1
+    cond.reset(1);
+    leader->add_peer(peer1, NEW_ADDPEERCLOSURE(&cond, 0));
+    cond.wait();
+    LOG(NOTICE) << "add peer " << peer1;
+
+    // start peer2
+    braft::PeerId peer2;
+    peer2.addr.ip = butil::my_ip();
+    peer2.addr.port = port2;
+    peer2.idx = 0;
+
+    ASSERT_EQ(0, cluster.start(peer2.addr, true));
+    LOG(NOTICE) << "start peer2 " << peer2;
+    // wait until started successfully
+    usleep(1000 * 1000);
+
+    // add peer2
+    cond.reset(1);
+    leader->add_peer(peer2, NEW_ADDPEERCLOSURE(&cond, 0));
+    cond.wait();
+    LOG(NOTICE) << "add peer2 " << peer2;
+    usleep(3000 * 1000);
+
+    std::vector<braft::PeerId> cur_peers;
+    leader->list_peers(&cur_peers);
+    ASSERT_EQ(3u, cur_peers.size());    // 1 leader + 2 followers
+
+    braft::LogManagerStatus log_manager_status_1;
+    leader->get_log_mgr_status(&log_manager_status_1);
+    ASSERT_EQ(log_manager_status_1.last_index, log_manager_status_1.known_applied_index);
+
+    std::vector<braft::Node*> followers;
+    cluster.followers(&followers);
+
+    // ensure that the leader and follower log states are consistent
+    for (const auto& one : followers) {
+        LOG(INFO) << "check log status node:" << one->node_id();
+        braft::LogManagerStatus follower_log_manager_status;
+        one->get_log_mgr_status(&follower_log_manager_status);
+        ASSERT_EQ(follower_log_manager_status.last_index, follower_log_manager_status.known_applied_index);
+        ASSERT_EQ(follower_log_manager_status.known_applied_index, log_manager_status_1.known_applied_index);
+    }
+
+    const int64_t will_reject_log_index = log_manager_status_1.last_index + 1;
+    followers[0]->set_reject_log_index(will_reject_log_index);
+    ASSERT_EQ(will_reject_log_index, followers[0]->get_reject_log_index());
+
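+    // Apply one more entry. followers[0] rejects it, but the leader and the
+    // healthy follower still form a majority, so it should commit and apply
+    // on the leader while followers[0] stays one entry behind.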
+    bthread::CountdownEvent cond_1(1);
+    for (int i = 0; i < 1; i++) {
+        butil::IOBuf data;
+        char data_buf[128];
+        snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1);
+        data.append(data_buf);
+        braft::Task task;
+        task.data = &data;
+        task.done = NEW_APPLYCLOSURE(&cond_1, 0);
+        leader->apply(task);
+    }
+    cond_1.wait();
+
+    usleep(1 * 1000 * 1000);
+
+    braft::LogManagerStatus log_manager_status_2;
+    leader->get_log_mgr_status(&log_manager_status_2);
+    ASSERT_EQ(log_manager_status_2.last_index, log_manager_status_1.last_index + 1);
+    ASSERT_EQ(log_manager_status_2.known_applied_index, log_manager_status_1.known_applied_index + 1);
+    ASSERT_EQ(log_manager_status_2.known_applied_index, log_manager_status_2.last_index);
+    ASSERT_EQ(log_manager_status_2.known_applied_index, will_reject_log_index);
+
+    braft::LogManagerStatus log_manager_status_follower_1;
+    followers[0]->get_log_mgr_status(&log_manager_status_follower_1);
+    ASSERT_EQ(log_manager_status_follower_1.last_index, log_manager_status_follower_1.known_applied_index);
+    ASSERT_EQ(log_manager_status_follower_1.last_index, log_manager_status_2.last_index - 1);
+
+    // lift the rejection so the follower can catch up
+    followers[0]->set_reject_log_index(0);
+    usleep(1 * 1000 * 1000);
+
+    braft::LogManagerStatus log_manager_status_follower_2;
+    followers[0]->get_log_mgr_status(&log_manager_status_follower_2);
+    ASSERT_EQ(log_manager_status_follower_2.last_index, log_manager_status_follower_2.known_applied_index);
+    ASSERT_EQ(log_manager_status_follower_2.last_index, log_manager_status_2.last_index);
+
+    LOG(INFO) << "will stop_all";
+    cluster.stop_all();
+}
+
+TEST_P(NodeTest, test_leader_exits_in_the_second_phase_of_joint_consensus) {
+    uint32_t port = 5006;
+    uint32_t port1 = port + 1;
+    uint32_t port2 = port1 + 1;
+    uint32_t port3 = port2 + 1;
+
+    std::vector<braft::PeerId> peers;
+    braft::PeerId peer0;
+    peer0.addr.ip = butil::my_ip();
+    peer0.addr.port = port;
+    peer0.idx = 0;
+
+    // start cluster
+    peers.push_back(peer0);
+    Cluster cluster("unittest", peers);
+    ASSERT_EQ(0, cluster.start(peer0.addr));
+    LOG(NOTICE) << "start single cluster " << peer0;
+    cluster.wait_leader();
+    braft::Node* leader = cluster.leader();
+    LOG(NOTICE) << "leader is " << leader->node_id().peer_id;
+
+    bthread::CountdownEvent cond(1);
+    for (int i = 0; i < 1; i++) {
+        butil::IOBuf data;
+        char data_buf[128];
+        snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1);
+        data.append(data_buf);
+        braft::Task task;
+        task.data = &data;
+        task.done = NEW_APPLYCLOSURE(&cond, 0);
+        leader->apply(task);
+    }
+    cond.wait();
+
+    // start peer1
+    braft::PeerId peer1;
+    peer1.addr.ip = butil::my_ip();
+    peer1.addr.port = port1;
+    peer1.idx = 0;
+
+    ASSERT_EQ(0, cluster.start(peer1.addr, true));
+    LOG(NOTICE) << "start peer " << peer1;
+    // wait until started successfully
+    usleep(1000 * 1000);
+
+    // add peer1
+    cond.reset(1);
+    leader->add_peer(peer1, NEW_ADDPEERCLOSURE(&cond, 0));
+    cond.wait();
+    LOG(NOTICE) << "add peer " << peer1;
+
+    // start peer2
+    braft::PeerId peer2;
+    peer2.addr.ip = butil::my_ip();
+    peer2.addr.port = port2;
+    peer2.idx = 0;
+
+    ASSERT_EQ(0, cluster.start(peer2.addr, true));
+    LOG(NOTICE) << "start peer2 " << peer2;
+    // wait until started successfully
+    usleep(1000 * 1000);
+
+    // add peer2
+    cond.reset(1);
+    leader->add_peer(peer2, NEW_ADDPEERCLOSURE(&cond, 0));
+    cond.wait();
+    LOG(NOTICE) << "add peer2 " << peer2;
+    usleep(2 * 1000 * 1000);
+
+    std::vector<braft::PeerId> cur_peers;
+    leader->list_peers(&cur_peers);
+    ASSERT_EQ(3u, cur_peers.size());    // 1 leader + 2 followers
+
+    usleep(2 * 1000 * 1000);
+
+    braft::LogManagerStatus log_manager_status_1;
+    leader->get_log_mgr_status(&log_manager_status_1);
+    ASSERT_EQ(log_manager_status_1.last_index, log_manager_status_1.known_applied_index);
+
+    std::vector<braft::Node*> followers;
+    cluster.followers(&followers);
+
+    // ensure that the leader and follower log states are consistent
+    for (const auto& one : followers) {
+        LOG(INFO) << "check log status node:" << one->node_id();
+        braft::LogManagerStatus follower_log_manager_status;
+        one->get_log_mgr_status(&follower_log_manager_status);
+        ASSERT_EQ(follower_log_manager_status.last_index, follower_log_manager_status.known_applied_index);
+        ASSERT_EQ(follower_log_manager_status.known_applied_index, log_manager_status_1.known_applied_index);
+    }
+
+    // check log index
+    braft::LogManagerStatus log_manager_status_ld_1;
+    leader->get_log_mgr_status(&log_manager_status_ld_1);
+    ASSERT_EQ(log_manager_status_ld_1.last_index, log_manager_status_ld_1.known_applied_index);
+
+    std::vector<braft::Node*> followers_1;
+    cluster.followers(&followers_1);
+    ASSERT_EQ(2u, followers_1.size());
+
+    // wait until every follower has applied as much as the leader
+    for (const auto& one : followers_1) {
+        LOG(INFO) << "change_peers check log status node:" << one->node_id();
+        while (true) {
+            braft::LogManagerStatus follower_log_manager_status;
+            one->get_log_mgr_status(&follower_log_manager_status);
+            if (follower_log_manager_status.known_applied_index == log_manager_status_ld_1.known_applied_index) {
+                break;
+            }
+            LOG(INFO) << "change_peers check log status node:" << one->node_id() << " wait log";
+            usleep(100 * 1000);
+        }
+    }
+
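+    // change_peers from {0,1,2} to {0,1,3} goes through joint consensus: the
+    // C{old,new} entry is expected at last_index + 1 and the second-phase
+    // C{new} entry at last_index + 2. Rejecting last_index + 2 on peer1 and
+    // peer3 stalls the second phase; killing the leader then should leave the
+    // group unable to elect, since peer2 holds C{new} (which excludes it) and
+    // the remaining voters cannot win majorities in both configurations.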
+    const int64_t will_reject_log_index = log_manager_status_1.last_index + 2;
+
+    // start peer3
+    braft::PeerId peer3;
+    peer3.addr.ip = butil::my_ip();
+    peer3.addr.port = port3;
+    peer3.idx = 0;
+
+    ASSERT_EQ(0, cluster.start(peer3.addr, true));
+    LOG(NOTICE) << "start peer " << peer3;
+    // wait until started successfully
+    usleep(1000 * 1000);
+
+    braft::Configuration new_conf;    // {0, 1, 3}
+    new_conf.add_peer(peer0);
+    new_conf.add_peer(peer1);
+    new_conf.add_peer(peer3);
+
+    std::vector<braft::Node*> followers_2;
+    cluster.followers(&followers_2);
+    ASSERT_EQ(3u, followers_2.size());
+
+    // followers_2[0] -> peer1 -> set_reject_log_index
+    // followers_2[1] -> peer2 -> no set_reject_log_index; this node is
+    //                            expected to initiate the election later
+    // followers_2[2] -> peer3 -> set_reject_log_index
+    followers_2[0]->set_reject_log_index(will_reject_log_index);
+    followers_2[2]->set_reject_log_index(will_reject_log_index);
+
+    // trigger joint consensus; |done| can never be signaled because the
+    // second-phase entry is rejected, so don't wait on it
+    braft::SynchronizedClosure done;
+    leader->change_peers(new_conf, &done);
+    usleep(1000 * 1000);
+
+    cluster.stop(leader->node_id().peer_id.addr);
+
+    followers_2[0]->set_reject_log_index(0);
+    followers_2[2]->set_reject_log_index(0);
+
+    usleep(10 * 1000 * 1000);
+
+    // unable to elect a leader while the flag is off
+    braft::Node* ld_node = cluster.leader();
+    ASSERT_EQ(nullptr, ld_node);
+
+    braft::FLAGS_raft_enable_peer_not_in_conf_can_elec = true;
+    usleep(5 * 1000 * 1000);
+
+    // with the flag on, a leader emerges again: 3 live nodes, so 2 followers
+    std::vector<braft::Node*> followers_4;
+    cluster.followers(&followers_4);
+    ASSERT_EQ(2u, followers_4.size());
+
+    // restore the flag so later tests are unaffected
+    braft::FLAGS_raft_enable_peer_not_in_conf_can_elec = false;
+
+    LOG(INFO) << "will stop_all";
+    cluster.stop_all();
+}
+
 INSTANTIATE_TEST_CASE_P(NodeTestWithoutPipelineReplication,
                         NodeTest,
                         ::testing::Values("NoReplcation"));