Skip to content

Commit

Permalink
Refactor: remove Engine::push_command, use Engine.output.push_command…
Browse files Browse the repository at this point in the history
… instead
  • Loading branch information
drmingdrmer committed Jan 8, 2023
1 parent e1a548c commit 47d2937
Showing 1 changed file with 28 additions and 32 deletions.
60 changes: 28 additions & 32 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ where
self.assign_log_ids(entries.iter_mut());
self.state.extend_log_ids_from_same_leader(entries);

self.push_command(Command::AppendInputEntries { range: 0..l });
self.output.push_command(Command::AppendInputEntries { range: 0..l });

let entry = &mut entries[0];
if let Some(m) = entry.get_membership() {
Expand All @@ -187,7 +187,7 @@ where
}
self.try_update_membership(entry);

self.push_command(Command::MoveInputCursorBy { n: l });
self.output.push_command(Command::MoveInputCursorBy { n: l });

// With the new config, start to elect to become leader
self.elect();
Expand All @@ -214,13 +214,13 @@ where

// Slow-path: send vote request, let a quorum grant it.

self.push_command(Command::SendVote {
self.output.push_command(Command::SendVote {
vote_req: VoteRequest::new(self.state.vote, self.state.last_log_id().copied()),
});

// TODO: For compatibility. remove it. The runtime does not need to know about server state.
self.update_server_state_if_changed();
self.push_command(Command::InstallElectionTimer { can_be_leader: true });
self.output.push_command(Command::InstallElectionTimer { can_be_leader: true });
}

#[tracing::instrument(level = "debug", skip_all)]
Expand Down Expand Up @@ -302,16 +302,16 @@ where
// If peer's vote is greater than current vote, revert to follower state.
if resp.vote > self.state.vote {
self.state.vote = resp.vote;
self.push_command(Command::SaveVote { vote: self.state.vote });
self.output.push_command(Command::SaveVote { vote: self.state.vote });
}

// Seen a higher log.
// TODO: if already installed a timer with can_be_leader==false, it should not install a timer with
// can_be_leader==true.
if resp.last_log_id.as_ref() > self.state.last_log_id() {
self.push_command(Command::InstallElectionTimer { can_be_leader: false });
self.output.push_command(Command::InstallElectionTimer { can_be_leader: false });
} else {
self.push_command(Command::InstallElectionTimer { can_be_leader: true });
self.output.push_command(Command::InstallElectionTimer { can_be_leader: true });
}

debug_assert!(self.is_voter());
Expand Down Expand Up @@ -341,7 +341,7 @@ where
self.assign_log_ids(entries.iter_mut());
self.state.extend_log_ids_from_same_leader(entries);

self.push_command(Command::AppendInputEntries { range: 0..l });
self.output.push_command(Command::AppendInputEntries { range: 0..l });

// Fast commit:
// If the cluster has only one voter, then an entry will be committed as soon as it is appended.
Expand Down Expand Up @@ -387,10 +387,10 @@ where
}

// Still need to replicate to learners, even when it is fast-committed.
self.push_command(Command::ReplicateEntries {
self.output.push_command(Command::ReplicateEntries {
upto: Some(*entries.last().unwrap().get_log_id()),
});
self.push_command(Command::MoveInputCursorBy { n: l });
self.output.push_command(Command::MoveInputCursorBy { n: l });
}

/// Append entries to follower/learner.
Expand Down Expand Up @@ -481,7 +481,7 @@ where
tracing::debug!(committed = display(committed.summary()), "update committed");

if let Some(prev_committed) = self.state.update_committed(&committed) {
self.push_command(Command::FollowerCommit {
self.output.push_command(Command::FollowerCommit {
// TODO(xp): when restart, commit is reset to None. Use last_applied instead.
already_committed: prev_committed,
upto: committed.unwrap(),
Expand Down Expand Up @@ -518,11 +518,11 @@ where

self.state.extend_log_ids(entries);

self.push_command(Command::AppendInputEntries { range: since..l });
self.output.push_command(Command::AppendInputEntries { range: since..l });
self.follower_update_membership(entries.iter());

// TODO(xp): should be moved to handle_append_entries_req()
self.push_command(Command::MoveInputCursorBy { n: l });
self.output.push_command(Command::MoveInputCursorBy { n: l });
}

/// Delete log entries since log index `since`, inclusive, when the log at `since` is found conflict with the
Expand All @@ -545,7 +545,7 @@ where

self.state.log_ids.truncate(since);

self.push_command(Command::DeleteConflictLog { since: since_log_id });
self.output.push_command(Command::DeleteConflictLog { since: since_log_id });

// If the effective membership is from a conflicting log,
// the membership state has to revert to the last committed membership config.
Expand Down Expand Up @@ -580,7 +580,7 @@ where
};

self.state.membership_state = mem_state;
self.push_command(Command::UpdateMembership {
self.output.push_command(Command::UpdateMembership {
membership: self.state.membership_state.effective.clone(),
});

Expand Down Expand Up @@ -659,7 +659,7 @@ where

st.log_ids.purge(&upto);

self.push_command(Command::PurgeLog { upto });
self.output.push_command(Command::PurgeLog { upto });
}

/// Update membership state with a committed membership config
Expand Down Expand Up @@ -706,7 +706,7 @@ where

self.state.membership_state.effective = em.clone();

self.push_command(Command::UpdateMembership {
self.output.push_command(Command::UpdateMembership {
membership: self.state.membership_state.effective.clone(),
});

Expand Down Expand Up @@ -787,7 +787,7 @@ where
debug_assert!(log_id.is_some(), "a valid update can never set matching to None");

if node_id != self.config.id {
self.push_command(Command::UpdateReplicationMetrics {
self.output.push_command(Command::UpdateReplicationMetrics {
target: node_id,
matching: log_id.unwrap(),
});
Expand All @@ -801,10 +801,10 @@ where
}

if let Some(prev_committed) = self.state.update_committed(&committed) {
self.push_command(Command::ReplicateCommitted {
self.output.push_command(Command::ReplicateCommitted {
committed: self.state.committed,
});
self.push_command(Command::LeaderCommit {
self.output.push_command(Command::LeaderCommit {
already_committed: prev_committed,
upto: self.state.committed.unwrap(),
});
Expand Down Expand Up @@ -867,7 +867,7 @@ where
snap_last_log_id.summary(),
self.state.committed.summary()
);
self.push_command(Command::CancelSnapshot { snapshot_meta: meta });
self.output.push_command(Command::CancelSnapshot { snapshot_meta: meta });
return;
}

Expand Down Expand Up @@ -995,11 +995,11 @@ where
// There is an active leader.
// Do not elect for a longer while.
// TODO: Installing a timer should not be part of the Engine's job.
self.push_command(Command::InstallElectionTimer { can_be_leader: false });
self.output.push_command(Command::InstallElectionTimer { can_be_leader: false });
} else {
// There is an active candidate.
// Do not elect for a short while.
self.push_command(Command::InstallElectionTimer { can_be_leader: true });
self.output.push_command(Command::InstallElectionTimer { can_be_leader: true });
}

if self.internal_server_state.is_following() {
Expand Down Expand Up @@ -1039,9 +1039,9 @@ where
index: self.state.last_log_id().next_index(),
};
self.state.log_ids.append(log_id);
self.push_command(Command::AppendBlankLog { log_id });
self.output.push_command(Command::AppendBlankLog { log_id });
self.update_progress(self.config.id, Some(log_id));
self.push_command(Command::ReplicateEntries { upto: Some(log_id) });
self.output.push_command(Command::ReplicateEntries { upto: Some(log_id) });
}

/// update replication streams to reflect replication progress change.
Expand All @@ -1053,7 +1053,7 @@ where
targets.push((*node_id, *matched));
}
}
self.push_command(Command::UpdateReplicationStreams { targets });
self.output.push_command(Command::UpdateReplicationStreams { targets });
}
}

Expand Down Expand Up @@ -1085,7 +1085,7 @@ where
);

self.update_membership_state(memberships);
self.push_command(Command::UpdateMembership {
self.output.push_command(Command::UpdateMembership {
membership: self.state.membership_state.effective.clone(),
});

Expand Down Expand Up @@ -1252,7 +1252,7 @@ where

if vote > &self.state.vote {
self.state.vote = *vote;
self.push_command(Command::SaveVote { vote: *vote });
self.output.push_command(Command::SaveVote { vote: *vote });
}

self.switch_internal_server_state();
Expand Down Expand Up @@ -1294,10 +1294,6 @@ where
self.state.vote.node_id == self.config.id && self.state.vote.committed
}

fn push_command(&mut self, cmd: Command<NID, N>) {
self.output.push_command(cmd)
}

// --- handlers ---

pub(crate) fn vote_handler(&mut self) -> VoteHandler<NID, N> {
Expand Down

0 comments on commit 47d2937

Please sign in to comment.