diff --git a/conanfile.py b/conanfile.py index a7c2e276c..9f1c87ee9 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.5.2" + version = "7.5.3" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 63b052b31..d7afe089f 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -219,12 +219,40 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id)); // Step1, validate request + bool is_leader = m_my_repl_id == get_leader_id(); + // Check if leader itself is requested to move out. + if (m_my_repl_id == member_out.id) { + // immediate=false successor=-1, nuraft will choose an alive peer with highest priority as successor, and wait + // until the successor finishes the catch-up of the latest log, and then resign. Return NOT_LEADER and let + // client retry. The "I am not leader" error is logged only when this node really is not the leader. + if (is_leader) { + RD_LOGI(trace_id, "Step1. Replace member, leader is the member_out so yield leadership, task_id={}", task_id); + raft_server()->yield_leadership(false /* immediate */, -1 /* successor */); + } + if (!is_leader) { RD_LOGE(trace_id, "Step1. Replace member, I am not leader, can not handle the request, task_id={}", task_id); } + return make_async_error<>(ReplServiceError::NOT_LEADER); + } + if (commit_quorum >= 1) { + // Reduce the quorum size BEFORE checking leadership. When two members are down, the raft leader will + // eventually yield its leadership after leadership_expiry (default: 20x heartbeat interval) because it + // cannot reach majority. Once leadership is lost, the remaining single node cannot elect itself without a + // reduced election quorum. 
By calling reset_quorum_size here first (which sets both custom_commit_quorum + // and custom_election_quorum to 1), the current leader is able to maintain leadership, and if leadership + // was already lost, the node will self-elect on the next election timeout. The caller should retry on + // NOT_LEADER to allow time for self-election to complete. + reset_quorum_size(commit_quorum, trace_id); + } + + if (!is_leader) { return make_async_error<>(ReplServiceError::NOT_LEADER); } + + // I am leader and not out_member // TODO support rollback, this could happen when the first task failed, and we want to launch a new task to // remediate it. Need to rollback the first task. And for the same task, it's reentrant and idempotent. auto existing_task_id = get_replace_member_task_id(); if (!existing_task_id.empty() && existing_task_id != task_id) { RD_LOGE(trace_id, "Step1. Replace member, task_id={} is not the same as existing task_id={}", task_id, existing_task_id); + reset_quorum_size(0, trace_id); return make_async_error<>(ReplServiceError::REPLACE_MEMBER_TASK_MISMATCH); } @@ -239,18 +267,10 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const return make_async_success<>(); } RD_LOGE(trace_id, "Step1. Replace member invalid parameter, out member is not found, task_id={}", task_id); + reset_quorum_size(0, trace_id); return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND); } - if (m_my_repl_id != get_leader_id()) { return make_async_error<>(ReplServiceError::NOT_LEADER); } - // Check if leader itself is requested to move out. - if (m_my_repl_id == member_out.id) { - // immediate=false successor=-1, nuraft will choose an alive peer with highest priority as successor, and wait - // until the successor finishes the catch-up of the latest log, and then resign. Return NOT_LEADER and let - // client retry. - raft_server()->yield_leadership(false /* immediate */, -1 /* successor */); - RD_LOGI(trace_id, "Step1. 
Replace member, leader is the member_out so yield leadership, task_id={}", task_id); - return make_async_error<>(ReplServiceError::NOT_LEADER); - } + // quorum safety check. TODO currently only consider lsn, need to check last response time. auto active_peers = get_active_peers(); // active_peers doesn't include leader itself. @@ -272,18 +292,15 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const "Step1. Replace member, quorum safety check failed, active_peers={}, " "active_peers_exclude_out/in_member={}, required_quorum={}, commit_quorum={}, task_id={}", active_peers.size(), active_num, quorum, commit_quorum, task_id); + reset_quorum_size(0, trace_id); return make_async_error<>(ReplServiceError::QUORUM_NOT_MET); } - if (commit_quorum >= 1) { - // Two members are down and leader cant form the quorum. Reduce the quorum size. - reset_quorum_size(commit_quorum, trace_id); - } - // Step 2: Handle out member. #ifdef _PRERELEASE if (iomgr_flip::instance()->test_flip("replace_member_set_learner_failure")) { RD_LOGE(trace_id, "Simulating set member to learner failure"); + reset_quorum_size(0, trace_id); return make_async_error(ReplServiceError::FAILED); } #endif diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 3c313c452..8fe2eed43 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -780,24 +780,33 @@ class RaftReplDevTestBase : public testing::Test { void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); } repl_lsn_t get_truncation_upper_limit() { return dbs_[0]->get_truncation_upper_limit(); } + void do_replace_member(std::shared_ptr< TestReplicatedDB > db, std::string& task_id, replica_id_t member_out, + replica_id_t member_in, uint32_t commit_quorum, + ReplServiceError error = ReplServiceError::OK) { + LOGINFO("Start replace member task_id={}, out={}, in={}", task_id, 
boost::uuids::to_string(member_out), + boost::uuids::to_string(member_in)); + replica_member_info out{member_out, ""}; + replica_member_info in{member_in, ""}; + auto result = + hs()->repl_service().replace_member(db->repl_dev()->group_id(), task_id, out, in, commit_quorum).get(); + if (error == ReplServiceError::OK) { + ASSERT_EQ(result.hasError(), false) << "Error in replacing member, err=" << result.error(); + } else { + ASSERT_EQ(result.hasError(), true); + ASSERT_EQ(result.error(), error) << "Error in replacing member, err=" << result.error(); + } + } + void replace_member(std::shared_ptr< TestReplicatedDB > db, std::string& task_id, replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum = 0, ReplServiceError error = ReplServiceError::OK) { - this->run_on_leader(db, [this, error, db, &task_id, member_out, member_in, commit_quorum]() { - LOGINFO("Start replace member task_id={}, out={}, in={}", task_id, boost::uuids::to_string(member_out), - boost::uuids::to_string(member_in)); - - replica_member_info out{member_out, ""}; - replica_member_info in{member_in, ""}; - auto result = - hs()->repl_service().replace_member(db->repl_dev()->group_id(), task_id, out, in, commit_quorum).get(); - if (error == ReplServiceError::OK) { - ASSERT_EQ(result.hasError(), false) << "Error in replacing member, err=" << result.error(); - } else { - ASSERT_EQ(result.hasError(), true); - ASSERT_EQ(result.error(), error) << "Error in replacing member, err=" << result.error(); - } - }); + if (commit_quorum == 0) { + this->run_on_leader(db, [this, error, db, &task_id, member_out, member_in, commit_quorum]() { + do_replace_member(db, task_id, member_out, member_in, commit_quorum, error); + }); + } else { + do_replace_member(db, task_id, member_out, member_in, commit_quorum, error); + } } void remove_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_id) { diff --git a/src/tests/test_raft_repl_dev_dynamic.cpp b/src/tests/test_raft_repl_dev_dynamic.cpp index 
501dd2f55..968eb31a7 100644 --- a/src/tests/test_raft_repl_dev_dynamic.cpp +++ b/src/tests/test_raft_repl_dev_dynamic.cpp @@ -198,13 +198,33 @@ TEST_F(ReplDevDynamicTest, TwoMemberDown) { LOGINFO("Shutdown replica 2"); } + LOGINFO("Sleep 10 seconds to waiting for leadership expiring"); + std::this_thread::sleep_for(std::chrono::seconds(10)); std::string task_id = "task_id"; if (g_helper->replica_num() == 0) { // Replace down replica 2 with spare replica 3 with commit quorum 1 // so that leader can go ahead with replacing member. + // After reset_quorum_size(1) is applied, the node may need one election timeout + // to self-elect as leader, so retry on NOT_LEADER (no wait after the final attempt). LOGINFO("Replace member started, task_id={}", task_id); - replace_member(db, task_id, g_helper->replica_id(member_out), g_helper->replica_id(member_in), - 1 /* commit quorum*/); + constexpr int max_retries = 3; + bool succeeded = false; + for (int i = 0; i < max_retries; ++i) { + auto result = hs()->repl_service() + .replace_member(db->repl_dev()->group_id(), task_id, + replica_member_info{g_helper->replica_id(member_out), ""}, + replica_member_info{g_helper->replica_id(member_in), ""}, 1) + .get(); + if (!result.hasError()) { + succeeded = true; + break; + } + ASSERT_EQ(result.error(), ReplServiceError::NOT_LEADER) + << "Replace member failed with unexpected error: " << result.error(); + LOGINFO("Replace member returned NOT_LEADER, retry {}/{}", i + 1, max_retries); + if (i + 1 < max_retries) { std::this_thread::sleep_for(std::chrono::seconds(2)); } + } + ASSERT_TRUE(succeeded) << "Replace member failed after " << max_retries << " retries"; this->write_on_leader(num_io_entries, true /* wait_for_commit */); LOGINFO("Leader completed num_io={}", num_io_entries); }