Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.5.2"
version = "7.5.3"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
47 changes: 32 additions & 15 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,40 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));

// Step1, validate request
bool is_leader = m_my_repl_id == get_leader_id();
// Check if leader itself is requested to move out.
if (m_my_repl_id == member_out.id) {
// immediate=false successor=-1, nuraft will choose an alive peer with highest priority as successor, and wait
// until the successor finishes the catch-up of the latest log, and then resign. Return NOT_LEADER and let
// client retry.
if (is_leader) {
RD_LOGI(trace_id, "Step1. Replace member, leader is the member_out so yield leadership, task_id={}", task_id);
raft_server()->yield_leadership(false /* immediate */, -1 /* successor */);
}
RD_LOGE(trace_id, "Step1. Replace member, I am not leader, can not handle the request, task_id={}", task_id);
return make_async_error<>(ReplServiceError::NOT_LEADER);
}
if (commit_quorum >= 1) {
// Reduce the quorum size BEFORE checking leadership. When two members are down, the raft leader will
// eventually yield its leadership after leadership_expiry (default: 20x heartbeat interval) because it
// cannot reach majority. Once leadership is lost, the remaining single node cannot elect itself without a
// reduced election quorum. By calling reset_quorum_size here first (which sets both custom_commit_quorum
// and custom_election_quorum to 1), the current leader is able to maintain leadership, and if leadership
// was already lost, the node will self-elect on the next election timeout. The caller should retry on
// NOT_LEADER to allow time for self-election to complete.
reset_quorum_size(commit_quorum, trace_id);
}

if (!is_leader) { return make_async_error<>(ReplServiceError::NOT_LEADER); }

// I am leader and not out_member
// TODO support rollback, this could happen when the first task failed, and we want to launch a new task to
// remediate it. Need to rollback the first task. And for the same task, it's reentrant and idempotent.
auto existing_task_id = get_replace_member_task_id();
if (!existing_task_id.empty() && existing_task_id != task_id) {
RD_LOGE(trace_id, "Step1. Replace member, task_id={} is not the same as existing task_id={}", task_id,
existing_task_id);
reset_quorum_size(0, trace_id);
return make_async_error<>(ReplServiceError::REPLACE_MEMBER_TASK_MISMATCH);
}

Expand All @@ -239,18 +267,10 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
return make_async_success<>();
}
RD_LOGE(trace_id, "Step1. Replace member invalid parameter, out member is not found, task_id={}", task_id);
reset_quorum_size(0, trace_id);
return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND);
}
if (m_my_repl_id != get_leader_id()) { return make_async_error<>(ReplServiceError::NOT_LEADER); }
// Check if leader itself is requested to move out.
if (m_my_repl_id == member_out.id) {
// immediate=false successor=-1, nuraft will choose an alive peer with highest priority as successor, and wait
// until the successor finishes the catch-up of the latest log, and then resign. Return NOT_LEADER and let
// client retry.
raft_server()->yield_leadership(false /* immediate */, -1 /* successor */);
RD_LOGI(trace_id, "Step1. Replace member, leader is the member_out so yield leadership, task_id={}", task_id);
return make_async_error<>(ReplServiceError::NOT_LEADER);
}

// quorum safety check. TODO currently only consider lsn, need to check last response time.
auto active_peers = get_active_peers();
// active_peers doesn't include leader itself.
Expand All @@ -272,18 +292,15 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
"Step1. Replace member, quorum safety check failed, active_peers={}, "
"active_peers_exclude_out/in_member={}, required_quorum={}, commit_quorum={}, task_id={}",
active_peers.size(), active_num, quorum, commit_quorum, task_id);
reset_quorum_size(0, trace_id);
return make_async_error<>(ReplServiceError::QUORUM_NOT_MET);
}

if (commit_quorum >= 1) {
// Two members are down and leader cant form the quorum. Reduce the quorum size.
reset_quorum_size(commit_quorum, trace_id);
}

// Step 2: Handle out member.
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("replace_member_set_learner_failure")) {
RD_LOGE(trace_id, "Simulating set member to learner failure");
reset_quorum_size(0, trace_id);
return make_async_error(ReplServiceError::FAILED);
}
#endif
Expand Down
39 changes: 24 additions & 15 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -780,24 +780,33 @@ class RaftReplDevTestBase : public testing::Test {
void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); }
repl_lsn_t get_truncation_upper_limit() { return dbs_[0]->get_truncation_upper_limit(); }

// Issue a replace-member request on the given DB's repl_dev and assert the outcome.
// When 'error' is ReplServiceError::OK the call must succeed; otherwise it must fail
// with exactly that error code.
void do_replace_member(std::shared_ptr< TestReplicatedDB > db, std::string& task_id, replica_id_t member_out,
                       replica_id_t member_in, uint32_t commit_quorum,
                       ReplServiceError error = ReplServiceError::OK) {
    LOGINFO("Start replace member task_id={}, out={}, in={}", task_id, boost::uuids::to_string(member_out),
            boost::uuids::to_string(member_in));
    const replica_member_info out_info{member_out, ""};
    const replica_member_info in_info{member_in, ""};
    auto rc = hs()->repl_service()
                  .replace_member(db->repl_dev()->group_id(), task_id, out_info, in_info, commit_quorum)
                  .get();
    const bool expect_success = (error == ReplServiceError::OK);
    if (expect_success) {
        ASSERT_EQ(rc.hasError(), false) << "Error in replacing member, err=" << rc.error();
    } else {
        ASSERT_EQ(rc.hasError(), true);
        ASSERT_EQ(rc.error(), error) << "Error in replacing member, err=" << rc.error();
    }
}

void replace_member(std::shared_ptr< TestReplicatedDB > db, std::string& task_id, replica_id_t member_out,
replica_id_t member_in, uint32_t commit_quorum = 0,
ReplServiceError error = ReplServiceError::OK) {
this->run_on_leader(db, [this, error, db, &task_id, member_out, member_in, commit_quorum]() {
LOGINFO("Start replace member task_id={}, out={}, in={}", task_id, boost::uuids::to_string(member_out),
boost::uuids::to_string(member_in));

replica_member_info out{member_out, ""};
replica_member_info in{member_in, ""};
auto result =
hs()->repl_service().replace_member(db->repl_dev()->group_id(), task_id, out, in, commit_quorum).get();
if (error == ReplServiceError::OK) {
ASSERT_EQ(result.hasError(), false) << "Error in replacing member, err=" << result.error();
} else {
ASSERT_EQ(result.hasError(), true);
ASSERT_EQ(result.error(), error) << "Error in replacing member, err=" << result.error();
}
});
if (commit_quorum == 0) {
this->run_on_leader(db, [this, error, db, &task_id, member_out, member_in, commit_quorum]() {
do_replace_member(db, task_id, member_out, member_in, commit_quorum, error);
});
} else {
do_replace_member(db, task_id, member_out, member_in, commit_quorum, error);
}
}

void remove_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_id) {
Expand Down
24 changes: 22 additions & 2 deletions src/tests/test_raft_repl_dev_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,33 @@ TEST_F(ReplDevDynamicTest, TwoMemberDown) {
LOGINFO("Shutdown replica 2");
}

LOGINFO("Sleep 10 seconds to waiting for leadership expiring");
sleep(10);
std::string task_id = "task_id";
if (g_helper->replica_num() == 0) {
// Replace down replica 2 with spare replica 3 with commit quorum 1
// so that leader can go ahead with replacing member.
// After reset_quorum_size(1) is applied, the node may need one election timeout
// to self-elect as leader, so retry on NOT_LEADER.
LOGINFO("Replace member started, task_id={}", task_id);
replace_member(db, task_id, g_helper->replica_id(member_out), g_helper->replica_id(member_in),
1 /* commit quorum*/);
constexpr int max_retries = 3;
bool succeeded = false;
for (int i = 0; i < max_retries; ++i) {
auto result = hs()->repl_service()
.replace_member(db->repl_dev()->group_id(), task_id,
replica_member_info{g_helper->replica_id(member_out), ""},
replica_member_info{g_helper->replica_id(member_in), ""}, 1)
.get();
if (!result.hasError()) {
succeeded = true;
break;
}
ASSERT_EQ(result.error(), ReplServiceError::NOT_LEADER)
<< "Replace member failed with unexpected error: " << result.error();
LOGINFO("Replace member returned NOT_LEADER, retry {}/{}", i + 1, max_retries);
std::this_thread::sleep_for(std::chrono::seconds(2));
}
ASSERT_TRUE(succeeded) << "Replace member failed after " << max_retries << " retries";
this->write_on_leader(num_io_entries, true /* wait_for_commit */);
LOGINFO("Leader completed num_io={}", num_io_entries);
}
Expand Down
Loading