Skip to content

Commit

Permalink
Fix: Error After change_membership: assertion failed: value > prev: #…
Browse files Browse the repository at this point in the history
…584

Problem:

Error After change_membership: `assertion failed: value > prev`,
when changing membership by converting a learner to a voter.

Because the replication streams are re-spawned, thus progress reverts to
zero. Then a reverted progress causes the panic.

Solution:

When re-spawning replications, remember the previous progress.

- Fix: #584
  • Loading branch information
drmingdrmer committed Oct 28, 2022
1 parent ee134c0 commit 56486a6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
7 changes: 4 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(crate) async fn spawn_replication_stream(
&mut self,
target: C::NodeId,
matched: Option<LogId<C::NodeId>>,
) -> Result<ReplicationStream<C::NodeId>, N::ConnectionError> {
let target_node = self.engine.state.membership_state.effective.get_node(&target);
let membership_log_id = self.engine.state.membership_state.effective.log_id;
Expand All @@ -1007,6 +1008,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.config.clone(),
self.engine.state.last_log_id(),
self.engine.state.committed,
matched,
network,
self.storage.get_log_reader().await,
self.tx_api.clone(),
Expand Down Expand Up @@ -1611,9 +1613,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
Command::UpdateReplicationStreams { targets } => {
self.remove_all_replication().await;

// TODO: use _matched to initialize replication
for (node_id, _matched) in targets.iter() {
match self.spawn_replication_stream(*node_id).await {
for (node_id, matched) in targets.iter() {
match self.spawn_replication_stream(*node_id, *matched).await {
Ok(state) => {
if let Some(l) = &mut self.leader_data {
l.nodes.insert(*node_id, state);
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
config: Arc<Config>,
last_log: Option<LogId<C::NodeId>>,
committed: Option<LogId<C::NodeId>>,
matched: Option<LogId<C::NodeId>>,
network: N::Network,
log_reader: S::LogReader,
raft_core_tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>,
Expand All @@ -158,7 +159,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
config,
target_repl_state: TargetReplState::LineRate,
committed,
matched: None,
matched,
max_possible_matched_index: last_log.index(),
raft_core_tx,
repl_rx,
Expand Down

0 comments on commit 56486a6

Please sign in to comment.