Skip to content

Commit

Permalink
Change: RaftState: make vote private. Accesses vote via 2 new public …
Browse files Browse the repository at this point in the history
…methods: `vote_ref()` and `vote_last_modified()`.

There are several timer-related states that support heartbeat, election,
and other functionalities. These states must be updated correctly in
several places when the raft state changes. For example, when the vote
or the membership changes. However, achieving consistency in all updates becomes difficult.

To address this issue, this patch reduces the number of states that need
to be maintained. Only the last-updated time is recorded every time the
`vote` changes. Other states, such as leader lease and election timeout,
are calculated only when needed.

- Refactor: remove `VoteWiseTime`, remove `Command::InstallElectionTimer`.

- Refactor: remove `next_election_time` from `RaftCore`

- Refactor: update the last-modified time every time `vote` changes.
  Calculate leader lease and election timeout only when used. This way
  it reduces the number of state to maintain.

- Refactor: Engine: move now from RaftState `timer` to track time,

- Refactor: add UTime to track last-updated time for an object.
  • Loading branch information
drmingdrmer committed Mar 12, 2023
1 parent ae28352 commit 9ddb571
Show file tree
Hide file tree
Showing 31 changed files with 588 additions and 361 deletions.
1 change: 0 additions & 1 deletion openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@ pub(crate) use snapshot_state::SnapshotResult;
pub(crate) use snapshot_state::SnapshotState;
pub(crate) use tick::Tick;
pub(crate) use tick::TickHandle;
pub(crate) use tick::VoteWiseTime;
161 changes: 85 additions & 76 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use crate::config::SnapshotPolicy;
use crate::core::ServerState;
use crate::core::SnapshotResult;
use crate::core::SnapshotState;
use crate::core::VoteWiseTime;
use crate::engine::Command;
use crate::engine::Engine;
use crate::engine::SendResult;
Expand Down Expand Up @@ -153,9 +152,6 @@ pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<
/// Received snapshot that are ready to install.
pub(crate) received_snapshot: BTreeMap<SnapshotId, Box<S::SnapshotData>>,

/// The time to elect if a follower does not receive any append-entry message.
pub(crate) next_election_time: VoteWiseTime<C::NodeId>,

pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C, N, S>>,
pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C, N, S>>,

Expand Down Expand Up @@ -193,21 +189,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
async fn do_main(&mut self, rx_shutdown: oneshot::Receiver<()>) -> Result<(), Fatal<C::NodeId>> {
tracing::debug!("raft node is initializing");

let now = Instant::now();
self.engine.timer.update_now(now);

self.engine.startup();
self.run_engine_commands::<Entry<C>>(&[]).await?;

// To ensure that restarted nodes don't disrupt a stable cluster.
if self.engine.state.server_state == ServerState::Follower {
// If there is only one voter, elect at once.
let voter_count = self.engine.state.membership_state.effective().voter_ids().count();
if voter_count == 1 {
let now = Instant::now();
self.next_election_time = VoteWiseTime::new(*self.engine.state.get_vote(), now);
} else {
self.set_next_election_time(false);
}
}

// Initialize metrics.
self.report_metrics(Update::Update(None));

Expand Down Expand Up @@ -439,15 +426,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub async fn send_heartbeat(&mut self, emitter: impl Display) -> Result<bool, Fatal<C::NodeId>> {
tracing::debug!(now = debug(&self.engine.state.now), "send_heartbeat");
tracing::debug!(now = debug(self.engine.timer.now()), "send_heartbeat");

let mut lh = if let Some((lh, _)) =
self.engine.get_leader_handler_or_reject::<(), ClientWriteError<C::NodeId, C::Node>>(None)
{
lh
} else {
tracing::debug!(
now = debug(&self.engine.state.now),
now = debug(self.engine.timer.now()),
"{} failed to send heartbeat",
emitter
);
Expand Down Expand Up @@ -551,29 +538,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

/// Set a value for the next election timeout.
#[tracing::instrument(level = "trace", skip(self))]
pub(crate) fn set_next_election_time(&mut self, can_be_leader: bool) {
let now = Instant::now();

let mut t = Duration::from_millis(self.config.new_rand_election_timeout());
if !can_be_leader {
t *= 2;
}
tracing::debug!(
"update election timeout after: {:?}, can_be_leader: {}",
t,
can_be_leader
);

// TODO: election timer should be bound to `(vote, membership_log_id)`:
// i.e., when membership is updated, the previous election timer should be
// invalidated. e.g., in a same `vote`, a learner becomes voter and then
// becomes learner again. election timer should be cleared for learner,
// set for voter and then cleared again.
self.next_election_time = VoteWiseTime::new(*self.engine.state.get_vote(), now + t);
}

pub(crate) async fn handle_building_snapshot_result(
&mut self,
result: SnapshotResult<C::NodeId, C::Node>,
Expand Down Expand Up @@ -1040,7 +1004,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// Vote request needs to check if the lease of the last leader expired.
// Thus it is time sensitive. Update the cached time for it.
let now = Instant::now();
self.engine.state.update_now(now);
self.engine.timer.update_now(now);
tracing::debug!(
vote_request = display(rpc.summary()),
"handle vote request: now: {:?}",
Expand All @@ -1050,6 +1014,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.handle_vote_request(rpc, tx).await?;
}
RaftMsg::VoteResponse { target, resp, vote } => {
let now = Instant::now();
self.engine.timer.update_now(now);

if self.does_vote_match(&vote, "VoteResponse") {
self.handle_vote_resp(resp, target).await?;
}
Expand Down Expand Up @@ -1104,39 +1071,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// check every timer

let now = Instant::now();
self.engine.state.update_now(now);
// TODO: store server start time and use relative time
self.engine.timer.update_now(now);
tracing::debug!("received tick: {}, now: {:?}", i, now);

let current_vote = self.engine.state.get_vote();

tracing::debug!(
"next_election_time: {:?}, current_vote: {:?}, got vote-wise time: {:?}",
self.next_election_time,
current_vote,
self.next_election_time.get_time(current_vote)
);

// Follower/Candidate timer: next election
if let Some(t) = self.next_election_time.get_time(current_vote) {
#[allow(clippy::collapsible_else_if)]
if now < t {
// timeout has not expired.
tracing::debug!("now: {:?} has not pass next election time: {:?}", now, t);
} else {
#[allow(clippy::collapsible_else_if)]
if self.runtime_config.enable_elect.load(Ordering::Relaxed) {
if self.engine.state.membership_state.effective().is_voter(&self.id) {
self.engine.elect();
self.run_engine_commands::<Entry<C>>(&[]).await?;
} else {
// Node is switched to learner after setting up next election time.
tracing::debug!("this node is not a voter");
}
} else {
tracing::debug!("election is disabled");
}
}
}
self.handle_tick_election().await?;

// TODO: test: with heartbeat log, election is automatically rejected.
// TODO: test: fixture: make isolated_nodes a single-way isolating.
Expand Down Expand Up @@ -1197,6 +1136,79 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn handle_tick_election(&mut self) -> Result<(), StorageError<C::NodeId>> {
let now = *self.engine.timer.now();

tracing::debug!("try to trigger election by tick, now: {:?}", now);

// TODO: leader lease should be extended. Or it has to examine if it is leader
// before electing.
if self.engine.state.server_state == ServerState::Leader {
tracing::debug!("already a leader, do not elect again");
return Ok(());
}

if !self.engine.state.membership_state.effective().is_voter(&self.id) {
tracing::debug!("this node is not a voter");
return Ok(());
}

if !self.runtime_config.enable_elect.load(Ordering::Relaxed) {
tracing::debug!("election is disabled");
return Ok(());
}

if self.engine.state.membership_state.effective().voter_ids().count() == 1 {
tracing::debug!("this is the only voter, do election at once");
} else {
tracing::debug!("there are multiple voter, check election timeout");

let current_vote = self.engine.state.get_vote();
let utime = self.engine.state.vote_last_modified();
let timer_config = &self.engine.config.timer_config;

let mut election_timeout = if current_vote.is_committed() {
timer_config.leader_lease + timer_config.election_timeout
} else {
timer_config.election_timeout
};

if let Some(l) = self.engine.internal_server_state.leading() {
if l.is_there_greater_log() {
election_timeout += timer_config.election_timeout;
}
}

tracing::debug!(
"vote utime: {:?}, current_vote: {}, now-utime:{:?}, election_timeout: {:?}",
utime,
current_vote,
utime.map(|x| now - x),
election_timeout,
);

// Follower/Candidate timer: next election
if utime > Some(now - election_timeout) {
tracing::debug!("election timeout has not yet passed",);
return Ok(());
}

tracing::info!("election timeout passed, check if it is a voter for election");
}

// Every time elect, reset this flag.
if let Some(l) = self.engine.internal_server_state.leading_mut() {
l.reset_greater_log();
}

tracing::info!("do trigger election");
self.engine.elect();
self.run_engine_commands::<Entry<C>>(&[]).await?;

Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn handle_replication_progress(
&mut self,
Expand Down Expand Up @@ -1342,9 +1354,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
Command::SaveVote { vote } => {
self.storage.save_vote(&vote).await?;
}
Command::InstallElectionTimer { can_be_leader } => {
self.set_next_election_time(can_be_leader);
}
Command::PurgeLog { upto } => self.storage.purge_logs_upto(upto).await?,
Command::DeleteConflictLog { since } => {
self.storage.delete_conflict_logs_since(since).await?;
Expand Down
29 changes: 0 additions & 29 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,9 @@ use tracing::Level;
use tracing::Span;

use crate::raft::RaftMsg;
use crate::NodeId;
use crate::RaftNetworkFactory;
use crate::RaftStorage;
use crate::RaftTypeConfig;
use crate::Vote;

/// An instant time point bound to a vote.
///
/// If the vote on a node changes, the timeout belonging to a previous vote becomes invalid.
/// See: https://datafuselabs.github.io/openraft/vote.html
#[derive(Debug)]
pub(crate) struct VoteWiseTime<NID: NodeId> {
pub(crate) vote: Vote<NID>,
pub(crate) time: Instant,
}

impl<NID: NodeId> VoteWiseTime<NID> {
pub(crate) fn new(vote: Vote<NID>, time: Instant) -> Self {
Self { vote, time }
}

/// Return the time if vote does not change since it is set.
pub(crate) fn get_time(&self, current_vote: &Vote<NID>) -> Option<Instant> {
debug_assert!(&self.vote <= current_vote);

if &self.vote == current_vote {
Some(self.time)
} else {
None
}
}
}

/// Emit RaftMsg::Tick event at regular `interval`.
pub(crate) struct Tick<C, N, S>
Expand Down
11 changes: 0 additions & 11 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,6 @@ where
/// Send vote to all other members
SendVote { vote_req: VoteRequest<NID> },

/// Install a timer to trigger an election, e.g., calling `Engine::elect()` after some
/// `timeout` which is decided by the runtime. An already installed timer should be
/// cleared.
InstallElectionTimer {
/// When a candidate fails to elect, it falls back to follower.
/// If many enough greater last-log-ids are seen, then this node can never become a leader.
/// Thus give it a longer sleep time before next election.
can_be_leader: bool,
},

/// Purge log from the beginning to `upto`, inclusive.
PurgeLog { upto: LogId<NID> },

Expand Down Expand Up @@ -181,7 +171,6 @@ where
Command::MoveInputCursorBy { .. } => {}
Command::SaveVote { .. } => flags.set_data_changed(),
Command::SendVote { .. } => {}
Command::InstallElectionTimer { .. } => {}
Command::PurgeLog { .. } => flags.set_data_changed(),
Command::DeleteConflictLog { .. } => flags.set_data_changed(),
Command::InstallSnapshot { .. } => flags.set_data_changed(),
Expand Down
14 changes: 6 additions & 8 deletions openraft/src/engine/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use maplit::btreeset;
use pretty_assertions::assert_eq;
use tokio::time::Instant;

use crate::core::ServerState;
use crate::engine::Command;
Expand All @@ -10,6 +11,7 @@ use crate::engine::LogIdList;
use crate::raft::VoteRequest;
use crate::raft_state::VoteStateReader;
use crate::testing::log_id;
use crate::utime::UTime;
use crate::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::LogId;
Expand Down Expand Up @@ -101,7 +103,7 @@ fn test_elect() -> anyhow::Result<()> {
.set_effective(Arc::new(EffectiveMembership::new(Some(log_id(0, 1)), m1())));

// Build in-progress election state
eng.state.vote = Vote::new_committed(1, 2);
eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(1, 2));
eng.vote_handler().become_leading();
eng.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));

Expand Down Expand Up @@ -183,13 +185,9 @@ fn test_elect() -> anyhow::Result<()> {
);

assert_eq!(
vec![
Command::SaveVote { vote: Vote::new(1, 1) },
Command::SendVote {
vote_req: VoteRequest::new(Vote::new(1, 1), Some(log_id(1, 1)))
},
Command::InstallElectionTimer { can_be_leader: true },
],
vec![Command::SaveVote { vote: Vote::new(1, 1) }, Command::SendVote {
vote_req: VoteRequest::new(Vote::new(1, 1), Some(log_id(1, 1)))
},],
eng.output.commands
);
}
Expand Down
Loading

0 comments on commit 9ddb571

Please sign in to comment.