Skip to content

Commit

Permalink
Feature: RaftNetwork::send_append_entries() can return PartialSuccess
Browse files Browse the repository at this point in the history
If there are too many log entries and the `RPCOption.ttl` is not
sufficient, an application can opt to only send a portion of the
entries, with `AppendEntriesResponse::PartialSuccess(Option<LogId>)`, to
inform Openraft with the last replicated log id. Thus replication can
still make progress.

For example, it tries to send log entries `[1-2..3-10]`, the application
is allowed to send just `[1-2..1-3]` and return `PartialSuccess(1-3)`,

### Caution

The returned matching log id must be **greater than or equal to** the
first log id(`AppendEntriesRequest::prev_log_id`) of the entries to
send. If no RPC reply is received, `RaftNetwork::send_append_entries`
**must** return an `RPCError` to inform Openraft that the first log
id(`AppendEntriesRequest::prev_log_id`) may not match on the remote
target node.

- Fix: #822
  • Loading branch information
drmingdrmer committed May 10, 2023
1 parent 5583e53 commit 1ee82cb
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 89 deletions.
4 changes: 2 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,7 @@ where
match response {
replication::Response::Progress {
target,
id,
request_id: id,
result,
session_id,
} => {
Expand Down Expand Up @@ -1576,7 +1576,7 @@ where
let _ = node.tx_repl.send(Replicate::Heartbeat);
}
Inflight::Logs { id, log_id_range } => {
let _ = node.tx_repl.send(Replicate::logs(id, log_id_range));
let _ = node.tx_repl.send(Replicate::logs(Some(id), log_id_range));
}
Inflight::Snapshot { id, last_log_id } => {
let _ = last_log_id;
Expand Down
12 changes: 12 additions & 0 deletions openraft/src/display_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ impl<'a, T: fmt::Display> fmt::Display for DisplayOption<'a, T> {
}
}

pub(crate) trait DisplayOptionExt<'a, T: fmt::Display> {
fn display(&'a self) -> DisplayOption<'a, T>;
}

impl<T> DisplayOptionExt<'_, T> for Option<T>
where T: fmt::Display
{
fn display(&self) -> DisplayOption<T> {
DisplayOption(self)
}
}

/// Implement `Display` for `&[T]` if T is `Display`.
///
/// It outputs at most `MAX` elements, excluding those from the 5th to the second-to-last one:
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ where C: RaftTypeConfig
/// `send_none` specifies whether to force to send a message even when there is no data to send.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn initiate_replication(&mut self, send_none: SendNone) {
tracing::debug!(progress = debug(&self.leader.progress), "send_to_all");
tracing::debug!(progress = debug(&self.leader.progress), "{}", func_name!());

for (id, prog_entry) in self.leader.progress.iter_mut() {
// TODO: update matching should be done here for leader
Expand All @@ -299,6 +299,7 @@ where C: RaftTypeConfig
}

let t = prog_entry.next_send(self.state, self.config.max_payload_entries);
tracing::debug!(target = display(*id), send = debug(&t), "next send");

match t {
Ok(inflight) => {
Expand Down
6 changes: 6 additions & 0 deletions openraft/src/progress/inflight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ impl<NID: NodeId> Inflight<NID> {

/// Update inflight state when log upto `upto` is acknowledged by a follower/learner.
pub(crate) fn ack(&mut self, upto: Option<LogId<NID>>) {
let request_id = self.get_id();

match self {
Inflight::None => {
unreachable!("no inflight data")
Expand All @@ -162,6 +164,10 @@ impl<NID: NodeId> Inflight<NID> {
*self = Inflight::None;
}
}

if let Some(request_id) = request_id {
self.set_id(request_id);
}
}

/// Update inflight state when a conflicting log id is responded by a follower/learner.
Expand Down
9 changes: 9 additions & 0 deletions openraft/src/progress/inflight/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ fn test_inflight_ack() -> anyhow::Result<()> {
Ok(())
}

#[test]
fn test_inflight_ack_inherit_request_id() -> anyhow::Result<()> {
let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10))).with_id(10);

f.ack(Some(log_id(5)));
assert_eq!(Some(10), f.get_id());
Ok(())
}

#[test]
fn test_inflight_conflict() -> anyhow::Result<()> {
{
Expand Down
53 changes: 43 additions & 10 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Public Raft interface and data types.

use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Display;
use std::marker::PhantomData;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand Down Expand Up @@ -30,6 +30,7 @@ use crate::core::sm;
use crate::core::RaftCore;
use crate::core::Tick;
use crate::core::TickHandle;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySlice;
use crate::engine::Engine;
use crate::engine::EngineConfig;
Expand Down Expand Up @@ -357,7 +358,7 @@ where
async fn send_external_command(
&self,
cmd: ExternalCommand,
cmd_desc: impl Display + Default,
cmd_desc: impl fmt::Display + Default,
) -> Result<(), Fatal<C::NodeId>> {
let send_res = self.inner.tx_api.send(RaftMsg::ExternalCommand { cmd });

Expand Down Expand Up @@ -731,8 +732,8 @@ where

async fn get_core_stopped_error(
&self,
when: impl Display,
message_summary: Option<impl Display + Default>,
when: impl fmt::Display,
message_summary: Option<impl fmt::Display + Default>,
) -> Fatal<C::NodeId> {
// Wait for the core task to finish.
self.join_core_task().await;
Expand Down Expand Up @@ -1000,7 +1001,7 @@ pub struct AppendEntriesRequest<C: RaftTypeConfig> {
impl<C: RaftTypeConfig> Debug for AppendEntriesRequest<C>
where C::D: Debug
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AppendEntriesRequest")
.field("vote", &self.vote)
.field("prev_log_id", &self.prev_log_id)
Expand All @@ -1023,12 +1024,41 @@ impl<C: RaftTypeConfig> MessageSummary<AppendEntriesRequest<C>> for AppendEntrie
}

/// The response to an `AppendEntriesRequest`.
///
/// [`RaftNetwork::send_append_entries`] returns this type only when received an RPC reply.
/// Otherwise it should return [`RPCError`].
///
/// [`RPCError`]: crate::error::RPCError
/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries
#[derive(Debug)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum AppendEntriesResponse<NID: NodeId> {
/// Successfully replicated all log entries to the target node.
Success,

/// Successfully sent the first portion of log entries.
///
/// [`RaftNetwork::send_append_entries`] can return a partial success.
/// For example, it tries to send log entries `[1-2..3-10]`, the application is allowed to send
/// just `[1-2..1-3]` and return `PartialSuccess(1-3)`,
///
/// ### Caution
///
/// The returned matching log id must be **greater than or equal to** the first log
/// id([`AppendEntriesRequest::prev_log_id`]) of the entries to send. If no RPC reply is
/// received, [`RaftNetwork::send_append_entries`] must return an [`RPCError`] to inform
/// Openraft that the first log id([`AppendEntriesRequest::prev_log_id`]) may not match on
/// the remote target node.
///
/// [`RPCError`]: crate::error::RPCError
/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries
PartialSuccess(Option<LogId<NID>>),

/// The first log id([`AppendEntriesRequest::prev_log_id`]) of the entries to send does not
/// match on the remote target node.
Conflict,

/// Seen a vote `v` that does not hold `mine_vote >= v`.
/// And a leader's vote(committed vote) must be total order with other vote.
/// Therefore it has to be a higher vote: `mine_vote < v`
Expand All @@ -1045,12 +1075,15 @@ impl<NID: NodeId> AppendEntriesResponse<NID> {
}
}

impl<NID: NodeId> MessageSummary<AppendEntriesResponse<NID>> for AppendEntriesResponse<NID> {
fn summary(&self) -> String {
impl<NID: NodeId> fmt::Display for AppendEntriesResponse<NID> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AppendEntriesResponse::Success => "Success".to_string(),
AppendEntriesResponse::HigherVote(vote) => format!("Higher vote, {}", vote),
AppendEntriesResponse::Conflict => "Conflict".to_string(),
AppendEntriesResponse::Success => write!(f, "Success"),
AppendEntriesResponse::PartialSuccess(m) => {
write!(f, "PartialSuccess({})", m.display())
}
AppendEntriesResponse::HigherVote(vote) => write!(f, "Higher vote, {}", vote),
AppendEntriesResponse::Conflict => write!(f, "Conflict"),
}
}
}
Expand Down
Loading

0 comments on commit 1ee82cb

Please sign in to comment.