Skip to content

Commit

Permalink
Feature: add Raft::trigger_elect() and Raft::trigger_heartbeat()
Browse files Browse the repository at this point in the history
…to let user manually trigger an election or send a heartbeat log
  • Loading branch information
drmingdrmer committed Aug 6, 2022
1 parent 78fd73d commit b681775
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 9 deletions.
21 changes: 21 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::ClientWriteTx;
use crate::raft::ExternalCommand;
use crate::raft::RaftAddLearnerTx;
use crate::raft::RaftMsg;
use crate::raft::RaftRespTx;
Expand Down Expand Up @@ -1330,6 +1331,25 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
RaftMsg::ExternalRequest { req } => {
req(&self.engine.state, &mut self.storage, &mut self.network);
}
RaftMsg::ExternalCommand { cmd } => {
match cmd {
ExternalCommand::Elect => {
if self.engine.state.membership_state.effective.is_voter(&self.id) {
// TODO: reject if it is already a leader?
self.engine.elect();
self.run_engine_commands::<Entry<C>>(&[]).await?;
tracing::debug!("ExternalCommand: triggered election");
} else {
// Node is switched to learner after setting up next election time.
}
}
ExternalCommand::Heartbeat => {
// TODO: reject if it is not leader?
let log_id = self.write_entry(EntryPayload::Blank, None).await?;
tracing::debug!(log_id = display(&log_id), "ExternalCommand: sent heartbeat log");
}
}
}
RaftMsg::Tick { i } => {
// check every timer

Expand Down Expand Up @@ -1365,6 +1385,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
if now >= t {
if self.runtime_config.enable_heartbeat.load(Ordering::Relaxed) {
// heartbeat by sending a blank log
// TODO: use Engine::append_blank_log
let log_id = self.write_entry(EntryPayload::Blank, None).await?;
tracing::debug!(log_id = display(&log_id), "sent heartbeat log");
}
Expand Down
56 changes: 52 additions & 4 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,36 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
self.inner.runtime_config.enable_elect.store(enabled, Ordering::Relaxed);
}

/// Trigger election at once and return at once.
///
/// This only sends an `ExternalCommand::Elect` message to RaftCore and returns as soon as the
/// message is sent; it does not wait for the election to start or to finish.
/// RaftCore starts the election only if this node is a voter in the effective membership;
/// otherwise the command is silently ignored.
///
/// Returns error when RaftCore has Fatal error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_elect(false)`.
pub async fn trigger_elect(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Elect, "trigger_elect").await
}

/// Trigger a heartbeat at once and return at once.
///
/// This only sends an `ExternalCommand::Heartbeat` message to RaftCore and returns as soon as
/// the message is sent; it does not wait for the heartbeat to be written or replicated.
/// RaftCore handles the command by writing a blank log entry.
///
/// NOTE(review): the RaftCore handler does not yet check that this node is the leader
/// (there is a TODO for it) — confirm the behavior on a non-leader node before relying on it.
///
/// Returns error when RaftCore has Fatal error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_heartbeat(false)`.
pub async fn trigger_heartbeat(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await
}

/// Forward an [`ExternalCommand`] to RaftCore over the API channel.
///
/// Returns `Ok(())` as soon as the message has been handed to the channel; the command itself
/// is processed by RaftCore afterwards. If the message cannot be sent, the error that stopped
/// RaftCore is fetched and returned, with `cmd_desc` used as the message summary in the log.
async fn send_external_command(
    &self,
    cmd: ExternalCommand,
    cmd_desc: impl Display + Default,
) -> Result<(), Fatal<C::NodeId>> {
    let msg = RaftMsg::ExternalCommand { cmd };

    match self.inner.tx_api.send(msg) {
        Ok(_) => Ok(()),
        // A failed send means the receiving side is gone: report why RaftCore stopped.
        Err(_) => Err(self
            .get_core_stopped_error("sending external command to RaftCore", Some(cmd_desc))
            .await),
    }
}

/// Submit an AppendEntries RPC to this Raft node.
///
/// These RPCs are sent by the cluster leader to replicate log entries (§5.3), and are also
Expand Down Expand Up @@ -618,7 +648,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
}
}

async fn get_core_stopped_error(&self, when: impl Display, message_summary: Option<String>) -> Fatal<C::NodeId> {
async fn get_core_stopped_error(
&self,
when: impl Display,
message_summary: Option<impl Display + Default>,
) -> Fatal<C::NodeId> {
// Wait for the core task to finish.
self.join_core_task().await;

Expand All @@ -634,9 +668,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,

tracing::error!(
core_result = debug(&core_res),
"failure {}; message: {:?}",
"failure {}; message: {}",
when,
message_summary
message_summary.unwrap_or_default()
);

match core_res {
Expand Down Expand Up @@ -841,6 +875,10 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
req: Box<dyn FnOnce(&RaftState<C::NodeId, C::Node>, &mut S, &mut N) + Send + 'static>,
},

ExternalCommand {
cmd: ExternalCommand,
},

/// A tick event to wake up RaftCore to check timeout etc.
Tick {
/// ith tick
Expand Down Expand Up @@ -945,6 +983,9 @@ where
)
}
RaftMsg::ExternalRequest { .. } => "External Request".to_string(),
RaftMsg::ExternalCommand { cmd } => {
format!("ExternalCommand: {:?}", cmd)
}
RaftMsg::Tick { i } => {
format!("Tick {}", i)
}
Expand Down Expand Up @@ -988,7 +1029,14 @@ where
}
}

//////////////////////////////////////////////////////////////////////////////////////////////////
/// Commands sent by the user to RaftCore, delivered as `RaftMsg::ExternalCommand`.
#[derive(Debug, Clone)]
pub(crate) enum ExternalCommand {
/// Trigger an election at once.
Elect,
/// Emit a heartbeat message, only if the node is leader.
///
/// NOTE(review): the handler in RaftCore writes a blank log entry and still carries a
/// TODO about rejecting the command on a non-leader — confirm the leader-only claim.
Heartbeat,
}

/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2).
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
Expand Down
5 changes: 2 additions & 3 deletions openraft/tests/life_cycle/t30_connect_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn network_connection_error() -> anyhow::Result<()> {

tracing::info!("--- since there are no heartbeat is sent, let node-1 to elect");
{
n1.enable_elect(true);
n1.trigger_elect().await?;
n1.wait(timeout()).state(ServerState::Leader, "node-1 become leader").await?;

tracing::info!("--- cluster should work with only a node-2 un-connectable");
Expand All @@ -66,8 +66,7 @@ async fn network_connection_error() -> anyhow::Result<()> {
tracing::info!("--- set node-1 un-connectable too. let node-2 to elect. It should fail");
{
router.set_connectable(1, false);
n1.enable_elect(false);
n2.enable_elect(true);
n2.trigger_elect().await?;

let res = n2.wait(timeout()).state(ServerState::Leader, "node-2 can not be leader").await;
assert!(res.is_err());
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/membership/t25_elect_with_new_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn leader_election_after_changing_0_to_01234() -> Result<()> {

// Let node-1 become leader.
let node_1 = router.get_raft_handle(&1)?;
node_1.enable_elect(true);
node_1.trigger_elect().await?;
log_index += 1; // leader initial blank log

router
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/metrics/t30_leader_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async fn leader_metrics() -> Result<()> {

tracing::info!("--- let node-1 to elect to take leadership from node-0");
{
n1.enable_elect(true);
n1.trigger_elect().await?;
n1.wait(timeout()).state(ServerState::Leader, "node-1 becomes leader").await?;
n1.wait(timeout()).metrics(|x| x.replication.is_some(), "node-1 starts replication").await?;

Expand Down

0 comments on commit b681775

Please sign in to comment.