diff --git a/openraft/src/core/admin.rs b/openraft/src/core/admin.rs index 2b7ce20f3..f960e6170 100644 --- a/openraft/src/core/admin.rs +++ b/openraft/src/core/admin.rs @@ -23,6 +23,7 @@ use crate::raft::AddLearnerResponse; use crate::raft::ClientWriteResponse; use crate::raft::RaftRespTx; use crate::raft_types::LogIdOptionExt; +use crate::raft_types::RaftLogId; use crate::runtime::RaftRuntime; use crate::versioned::Updatable; use crate::ChangeMembers; @@ -40,15 +41,15 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS &mut self, target: C::NodeId, node: Option, - ) -> Result<(), AddLearnerError> { + ) -> Result, AddLearnerError> { let curr = &self.core.engine.state.membership_state.effective.membership; let new_membership = curr.add_learner(target, node)?; tracing::debug!(?new_membership, "new_config"); - self.write_entry(EntryPayload::Membership(new_membership), None).await?; + let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?; - Ok(()) + Ok(log_id) } /// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding @@ -60,24 +61,26 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS /// /// If `blocking` is `true`, the result is sent to `tx` as the target node log has caught up. Otherwise, result is /// sent at once, no matter whether the target node log is lagging or not. - #[tracing::instrument(level = "debug", skip(self, tx))] + #[tracing::instrument(level = "debug", skip(self))] pub(super) async fn add_learner( &mut self, target: C::NodeId, node: Option, tx: RaftRespTx, AddLearnerError>, - blocking: bool, - ) { + ) -> Result<(), Fatal> { tracing::debug!("add target node {} as learner {:?}", target, self.nodes.keys()); // Ensure the node doesn't already exist in the current // config, in the set of new nodes already being synced, or in the nodes being removed. + // TODO: remove this if target == self.core.id { tracing::debug!("target node is this node"); + let _ = tx.send(Ok(AddLearnerResponse { + membership_log_id: self.core.engine.state.membership_state.effective.log_id, matched: self.core.engine.state.last_log_id(), })); - return; + return Ok(()); } let curr = &self.core.engine.state.membership_state.effective; @@ -86,8 +89,11 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS if let Some(t) = self.nodes.get(&target) { tracing::debug!("target node is already a cluster member or is being synced"); - let _ = tx.send(Ok(AddLearnerResponse { matched: t.matched })); - return; + let _ = tx.send(Ok(AddLearnerResponse { + membership_log_id: self.core.engine.state.membership_state.effective.log_id, + matched: t.matched, + })); + return Ok(()); } else { unreachable!( "node {} in membership but there is no replication stream for it", @@ -99,29 +105,31 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS // TODO(xp): when new membership log is appended, write_entry() should be responsible to setup new replication // stream. let res = self.write_add_learner_entry(target, node).await; - if let Err(e) = res { - let _ = tx.send(Err(e)); - return; - } - - if blocking { - let state = self.spawn_replication_stream(target, Some(tx)).await; - // TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for - // sending vote requests etc? - self.nodes.insert(target, state); - } else { - let state = self.spawn_replication_stream(target, None).await; - self.nodes.insert(target, state); + let log_id = match res { + Ok(x) => x, + Err(e) => { + let _ = tx.send(Err(e)); + return Ok(()); + } + }; - // non-blocking mode, do not know about the replication stat. - let _ = tx.send(Ok(AddLearnerResponse { matched: None })); - } + // TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for + // sending vote requests etc? + let state = self.spawn_replication_stream(target).await; + self.nodes.insert(target, state); tracing::debug!( "after add target node {} as learner {:?}", target, self.core.engine.state.last_log_id() ); + + let _ = tx.send(Ok(AddLearnerResponse { + membership_log_id: Some(log_id), + matched: None, + })); + + Ok(()) } /// return true if there is pending uncommitted config change @@ -250,7 +258,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS &mut self, payload: EntryPayload, resp_tx: Option, ClientWriteError>>, - ) -> Result<(), Fatal> { + ) -> Result, Fatal> { let mut entry_refs = [EntryRef::new(&payload)]; // TODO: it should returns membership config error etc. currently this is done by the caller. self.core.engine.leader_append_entries(&mut entry_refs); @@ -262,7 +270,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS self.run_engine_commands(&entry_refs).await?; - Ok(()) + Ok(*entry_refs[0].get_log_id()) } #[tracing::instrument(level = "debug", skip_all)] diff --git a/openraft/src/core/leader_state.rs b/openraft/src/core/leader_state.rs index d2baca221..42834fedb 100644 --- a/openraft/src/core/leader_state.rs +++ b/openraft/src/core/leader_state.rs @@ -81,7 +81,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS }; for target in targets { - let state = self.spawn_replication_stream(target, None).await; + let state = self.spawn_replication_stream(target).await; self.nodes.insert(target, state); } @@ -148,8 +148,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS RaftMsg::ClientWriteRequest { rpc, tx } => { self.write_entry(rpc.payload, Some(tx)).await?; } - RaftMsg::AddLearner { id, node, tx, blocking } => { - self.add_learner(id, node, tx, blocking).await; + RaftMsg::AddLearner { id, node, tx } => { + self.add_learner(id, node, tx).await?; } RaftMsg::ChangeMembership { changes, diff --git a/openraft/src/core/replication.rs b/openraft/src/core/replication.rs index 2c185ab14..4b01180b4 100644 --- a/openraft/src/core/replication.rs +++ b/openraft/src/core/replication.rs @@ -8,10 +8,7 @@ use crate::core::LeaderState; use crate::core::ReplicationState; use crate::core::ServerState; use crate::core::SnapshotState; -use crate::error::AddLearnerError; use crate::metrics::UpdateMatchedLogId; -use crate::raft::AddLearnerResponse; -use crate::raft::RaftRespTx; use crate::replication::ReplicaEvent; use crate::replication::ReplicationStream; use crate::replication::UpdateReplication; @@ -28,13 +25,9 @@ use crate::StorageError; impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderState<'a, C, N, S> { /// Spawn a new replication stream returning its replication state handle. - #[tracing::instrument(level = "debug", skip(self, caller_tx))] + #[tracing::instrument(level = "debug", skip(self))] #[allow(clippy::type_complexity)] - pub(super) async fn spawn_replication_stream( - &mut self, - target: C::NodeId, - caller_tx: Option, AddLearnerError>>, - ) -> ReplicationState { + pub(super) async fn spawn_replication_stream(&mut self, target: C::NodeId) -> ReplicationState { let target_node = self.core.engine.state.membership_state.effective.get_node(&target); let repl_stream = ReplicationStream::new::( @@ -53,7 +46,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS matched: None, repl_stream, remove_since: None, - tx: caller_tx, failures: 0, } } @@ -135,18 +127,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS state.matched = Some(matched); - // Issue a response on the learners response channel if needed. - if state.is_line_rate(&self.core.engine.state.last_log_id(), &self.core.config) { - // This replication became line rate. - - // When adding a learner, it blocks until the replication becomes line-rate. - if let Some(tx) = state.tx.take() { - // TODO(xp): define a specific response type for learner matched event. - let x = AddLearnerResponse { matched: state.matched }; - let _ = tx.send(Ok(x)); - } - } - // Drop replication stream if needed. if self.try_remove_replication(target).await { // nothing to do diff --git a/openraft/src/core/replication_state.rs b/openraft/src/core/replication_state.rs index 64545b571..c30601c2e 100644 --- a/openraft/src/core/replication_state.rs +++ b/openraft/src/core/replication_state.rs @@ -2,9 +2,6 @@ use std::fmt::Debug; use std::fmt::Formatter; use crate::config::Config; -use crate::error::AddLearnerError; -use crate::raft::AddLearnerResponse; -use crate::raft::RaftRespTx; use crate::raft_types::LogIdOptionExt; use crate::replication::ReplicationStream; use crate::LogId; @@ -23,10 +20,6 @@ pub(crate) struct ReplicationState { /// /// It will be reset once a successful replication is done. pub failures: u64, - - /// The response channel to use for when this node has successfully synced with the cluster. - #[allow(clippy::type_complexity)] - pub tx: Option, AddLearnerError>>, } impl MessageSummary> for ReplicationState { diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 96d52a461..c62a93fa5 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -15,6 +15,7 @@ use tokio::task::JoinHandle; use tracing::Span; use crate::config::Config; +use crate::core::is_matched_upto_date; use crate::core::Expectation; use crate::core::RaftCore; use crate::error::AddLearnerError; @@ -118,6 +119,8 @@ enum CoreState { } struct RaftInner, S: RaftStorage> { + id: C::NodeId, + config: Arc, tx_api: mpsc::UnboundedSender<(RaftMsg, Span)>, rx_metrics: watch::Receiver>, // TODO(xp): it does not need to be a async mutex. @@ -180,7 +183,7 @@ impl, S: RaftStorage> Raft, S: RaftStorage> Raft, S: RaftStorage> Raft Result, AddLearnerError> { let (tx, rx) = oneshot::channel(); - self.call_core(RaftMsg::AddLearner { id, node, blocking, tx }, rx).await + let resp = self.call_core(RaftMsg::AddLearner { id, node, tx }, rx).await?; + + if !blocking { + return Ok(resp); + } + + if self.inner.id == id { + return Ok(resp); + } + + // Otherwise, blocks until the replication to the new learner becomes up to date. + + // The log id of the membership that contains the added learner. + let membership_log_id = resp.membership_log_id; + + let res0 = Arc::new(std::sync::Mutex::new(resp)); + let res = res0.clone(); + + let wait_res = self + .wait(None) + .metrics( + |metrics| match self.check_replication_upto_date(metrics, id, membership_log_id) { + Ok(resp) => { + res.lock().unwrap().membership_log_id = resp; + true + } + // keep waiting + Err(_) => false, + }, + "wait new learner to become line-rate", + ) + .await; + + tracing::info!(wait_res = debug(&wait_res), "waiting for replication to new learner"); + + let r = { + let x = res0.lock().unwrap(); + x.clone() + }; + Ok(r) + } + + /// Returns Ok() with the latest known matched log id if it should quit waiting: leader change, node removed, or + /// replication becomes upto date. + /// + /// Returns Err() if it should keep waiting. + fn check_replication_upto_date( + &self, + metrics: &RaftMetrics, + node_id: C::NodeId, + membership_log_id: Option>, + ) -> Result>, ()> { + if metrics.membership_config.log_id < membership_log_id { + // Waiting for the latest metrics to report. + return Err(()); + } + + if !metrics.membership_config.membership.contains(&node_id) { + // This learner has been removed. + return Ok(None); + } + + let repl = match &metrics.replication { + None => { + // This node is no longer a leader. + return Ok(None); + } + Some(x) => x, + }; + + let replication_metrics = &repl.data().replication; + let target_metrics = match replication_metrics.get(&node_id) { + None => { + // Maybe replication is not reported yet. Keep waiting. + return Err(()); + } + Some(x) => x, + }; + + let matched = target_metrics.matched(); + + let last_log_id = LogId::new(matched.leader_id, metrics.last_log_index.unwrap_or_default()); + + if is_matched_upto_date(&Some(matched), &Some(last_log_id), &self.inner.config) { + // replication became up to date. + return Ok(Some(matched)); + } + + // Not up to date, keep waiting. + Err(()) } /// Propose a cluster configuration change. @@ -604,6 +698,10 @@ pub(crate) type RaftRespRx = oneshot::Receiver>; #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct AddLearnerResponse { + /// The log id of the membership that contains the added learner. + pub membership_log_id: Option>, + + /// The last log id that matches leader log. pub matched: Option>, } @@ -640,9 +738,6 @@ pub(crate) enum RaftMsg, S: RaftStor node: Option, - /// If block until the newly added learner becomes line-rate. - blocking: bool, - /// Send the log id when the replication becomes line-rate. tx: RaftRespTx, AddLearnerError>, }, @@ -696,8 +791,8 @@ where RaftMsg::Initialize { members, .. } => { format!("Initialize: {:?}", members) } - RaftMsg::AddLearner { id, blocking, .. } => { - format!("AddLearner: id: {}, blocking: {}", id, blocking) + RaftMsg::AddLearner { id, node, .. } => { + format!("AddLearner: id: {}, node: {:?}", id, node) } RaftMsg::ChangeMembership { changes: members, diff --git a/openraft/tests/membership/t10_add_learner.rs b/openraft/tests/membership/t10_add_learner.rs index a1be603de..7356573db 100644 --- a/openraft/tests/membership/t10_add_learner.rs +++ b/openraft/tests/membership/t10_add_learner.rs @@ -4,7 +4,6 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::raft::AddLearnerResponse; use openraft::Config; use openraft::LeaderId; use openraft::LogId; @@ -37,12 +36,7 @@ async fn add_learner_basic() -> Result<()> { tracing::info!("--- re-adding leader does nothing"); { let res = router.add_learner(0, 0).await?; - assert_eq!( - AddLearnerResponse { - matched: Some(LogId::new(LeaderId::new(1, 0), log_index)) - }, - res - ); + assert_eq!(Some(LogId::new(LeaderId::new(1, 0), log_index)), res.matched); } tracing::info!("--- add new node node-1"); @@ -78,12 +72,7 @@ async fn add_learner_basic() -> Result<()> { tracing::info!("--- re-add node-1, nothing changes"); { let res = router.add_learner(0, 1).await?; - assert_eq!( - AddLearnerResponse { - matched: Some(LogId::new(LeaderId::new(1, 0), log_index)) - }, - res - ); + assert_eq!(Some(LogId::new(LeaderId::new(1, 0), log_index)), res.matched); } Ok(()) @@ -120,7 +109,7 @@ async fn add_learner_non_blocking() -> Result<()> { let raft = router.get_raft_handle(&0)?; let res = raft.add_learner(1, None, false).await?; - assert_eq!(AddLearnerResponse { matched: None }, res); + assert_eq!(None, res.matched); } Ok(())