fix: install snapshot req with offset GE 0 should not start a new session.

An install-snapshot session always ends with a request whose data length
is 0 and whose offset is GE 0.
If such a request is re-sent, e.g., on timeout, the receiver will try to
install a snapshot with empty data if it has just finished the previous
install-snapshot request (`snapshot_state` is None) and does not reject an
install-snapshot request with offset GE 0.
This results in a `fatal storage error`, since the storage tries to
decode empty snapshot data.

- feature: add config `install_snapshot_timeout`.
drmingdrmer committed Aug 21, 2021
1 parent af69ccb commit 2eccb9e
Showing 5 changed files with 86 additions and 19 deletions.
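For context, here is a minimal sketch of the receiver-side session logic this commit fixes. All names (`Receiver`, `Chunk`, `handle_chunk`) are hypothetical stand-ins, not this crate's API; the point is the guard in the `None` arm, which mirrors the check added in `begin_installing_snapshot` below.

```rust
// Hypothetical, simplified receiver: a re-sent final chunk (empty data,
// offset > 0) arriving after `snapshot_state` was cleared must not be
// treated as the start of a new session.
struct Chunk {
    offset: u64,
    data: Vec<u8>,
    done: bool,
}

struct Receiver {
    // Buffered bytes of an in-progress session; None once a session finishes.
    snapshot_state: Option<Vec<u8>>,
}

impl Receiver {
    fn handle_chunk(&mut self, req: Chunk) -> Result<(), String> {
        match self.snapshot_state.take() {
            None => {
                // The fix: a new session may only begin at offset 0, so a
                // retried final chunk is rejected instead of having its
                // empty payload decoded as a whole snapshot.
                if req.offset > 0 {
                    return Err(format!("expect offset 0, got {}", req.offset));
                }
                self.snapshot_state = Some(req.data);
                Ok(())
            }
            Some(mut buf) => {
                buf.extend_from_slice(&req.data);
                if req.done {
                    // Finalizing would decode `buf` and install the snapshot.
                    self.snapshot_state = None;
                } else {
                    self.snapshot_state = Some(buf);
                }
                Ok(())
            }
        }
    }
}
```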
19 changes: 19 additions & 0 deletions async-raft/src/config.rs
@@ -84,6 +84,11 @@ pub struct Config {
/// is performed for heartbeats, so the main item of concern here is network latency. This
/// value is also used as the default timeout for sending heartbeats.
pub heartbeat_interval: u64,

/// The timeout for sending a snapshot segment, in milliseconds.
/// By default it is `heartbeat_interval * 4`.
pub install_snapshot_timeout: u64,

/// The maximum number of entries per payload allowed to be transmitted during replication.
///
/// When configuring this value, it is important to note that setting this value too low could
@@ -118,6 +123,7 @@ impl Config {
election_timeout_min: None,
election_timeout_max: None,
heartbeat_interval: None,
install_snapshot_timeout: None,
max_payload_entries: None,
replication_lag_threshold: None,
snapshot_policy: None,
@@ -145,6 +151,10 @@ pub struct ConfigBuilder {
pub election_timeout_max: Option<u64>,
/// The interval at which leaders will send heartbeats to followers to avoid election timeout.
pub heartbeat_interval: Option<u64>,

/// The timeout for sending a snapshot segment, in milliseconds.
pub install_snapshot_timeout: Option<u64>,

/// The maximum number of entries per payload allowed to be transmitted during replication.
pub max_payload_entries: Option<u64>,
/// The distance behind in log replication a follower must fall before it is considered "lagging".
@@ -174,6 +184,11 @@ impl ConfigBuilder {
self
}

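/// Set the desired value for `install_snapshot_timeout`.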
pub fn install_snapshot_timeout(mut self, val: u64) -> Self {
self.install_snapshot_timeout = Some(val);
self
}

/// Set the desired value for `max_payload_entries`.
pub fn max_payload_entries(mut self, val: u64) -> Self {
self.max_payload_entries = Some(val);
@@ -211,6 +226,9 @@ impl ConfigBuilder {
if election_timeout_min <= heartbeat_interval {
return Err(ConfigError::InvalidElectionTimeoutMinMax);
}

let install_snapshot_timeout = self.install_snapshot_timeout.unwrap_or(heartbeat_interval * 4);

let max_payload_entries = self.max_payload_entries.unwrap_or(DEFAULT_MAX_PAYLOAD_ENTRIES);
if max_payload_entries == 0 {
return Err(ConfigError::MaxPayloadEntriesTooSmall);
@@ -223,6 +241,7 @@ impl ConfigBuilder {
election_timeout_min,
election_timeout_max,
heartbeat_interval,
install_snapshot_timeout,
max_payload_entries,
replication_lag_threshold,
snapshot_policy,
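A usage sketch of the new option, assuming the builder methods shown above (the `heartbeat_interval` setter follows the same pattern as the other setters in this file):

```rust
// Sketch: set an explicit install-snapshot timeout; when the setter is
// omitted, validate() falls back to heartbeat_interval * 4.
let config = Config::build("my-cluster".into())
    .heartbeat_interval(50)          // milliseconds
    .install_snapshot_timeout(500)   // milliseconds
    .validate()?;
assert_eq!(config.install_snapshot_timeout, 500);
```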
26 changes: 20 additions & 6 deletions async-raft/src/core/install_snapshot.rs
@@ -12,6 +12,7 @@ use crate::raft::InstallSnapshotRequest
use crate::raft::InstallSnapshotResponse;
use crate::AppData;
use crate::AppDataResponse;
use crate::MessageSummary;
use crate::RaftError;
use crate::RaftNetwork;
use crate::RaftStorage;
@@ -24,7 +25,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Leaders always send chunks in order. It is important to note that, according to the Raft spec,
/// a log may only have one snapshot at any time. As snapshot contents are application specific,
/// the Raft log will only store a pointer to the snapshot file along with the index & term.
#[tracing::instrument(level = "trace", skip(self, req))]
#[tracing::instrument(level = "debug", skip(self, req), fields(req=%req.summary()))]
pub(super) async fn handle_install_snapshot_request(
&mut self,
req: InstallSnapshotRequest,
@@ -68,7 +69,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// to receive the new snapshot,
// - Mismatched id with offset greater than 0 is an out of order message that should be rejected.
match self.snapshot_state.take() {
None => return self.begin_installing_snapshot(req).await,
None => {
return self.begin_installing_snapshot(req).await;
}
Some(SnapshotState::Snapshotting { handle, .. }) => {
handle.abort(); // Abort the current compaction in favor of installation from leader.
return self.begin_installing_snapshot(req).await;
@@ -93,10 +96,21 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
}

#[tracing::instrument(level = "trace", skip(self, req))]
#[tracing::instrument(level = "debug", skip(self, req), fields(req=%req.summary()))]
async fn begin_installing_snapshot(&mut self, req: InstallSnapshotRequest) -> RaftResult<InstallSnapshotResponse> {
// Create a new snapshot and begin writing its contents.
let id = req.meta.snapshot_id.clone();

if req.offset > 0 {
return Err(RaftError::SnapshotMismatch {
expect: SnapshotSegmentId {
id: id.clone(),
offset: 0,
},
got: SnapshotSegmentId { id, offset: req.offset },
});
}

// Create a new snapshot and begin writing its contents.
let mut snapshot = self.storage.create_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?;
snapshot.as_mut().write_all(&req.data).await?;

@@ -119,7 +133,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
})
}

#[tracing::instrument(level = "trace", skip(self, req, snapshot))]
#[tracing::instrument(level = "debug", skip(self, req, snapshot), fields(req=%req.summary()))]
async fn continue_installing_snapshot(
&mut self,
req: InstallSnapshotRequest,
@@ -158,7 +172,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Finalize the installation of a new snapshot.
///
/// Any errors which come up from this routine will cause the Raft node to go into shutdown.
#[tracing::instrument(level = "trace", skip(self, req, snapshot))]
#[tracing::instrument(level = "debug", skip(self, req, snapshot), fields(req=%req.summary()))]
async fn finalize_snapshot_installation(
&mut self,
req: InstallSnapshotRequest,
9 changes: 7 additions & 2 deletions async-raft/src/raft.rs
@@ -710,8 +710,13 @@ pub struct InstallSnapshotRequest {
impl MessageSummary for InstallSnapshotRequest {
fn summary(&self) -> String {
format!(
"term={}, leader_id={}, meta={:?}, offset={}, done={}",
self.term, self.leader_id, self.meta, self.offset, self.done
"term={}, leader_id={}, meta={:?}, offset={}, len={}, done={}",
self.term,
self.leader_id,
self.meta,
self.offset,
self.data.len(),
self.done
)
}
}
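The summary change above adds the payload length, so a retried final chunk shows up in traces as `len=0`. For readers unfamiliar with the pattern, a self-contained sketch of a `MessageSummary`-style impl (hypothetical `Ping` type; only the trait shape mirrors the one used above):

```rust
// Summarize a message for logging without dumping its payload: report
// data.len() instead of the bytes, as the impl above does.
trait MessageSummary {
    fn summary(&self) -> String;
}

struct Ping {
    term: u64,
    data: Vec<u8>,
}

impl MessageSummary for Ping {
    fn summary(&self) -> String {
        format!("term={}, len={}", self.term, self.data.len())
    }
}

fn main() {
    let p = Ping { term: 1, data: vec![1, 2, 3] };
    assert_eq!(p.summary(), "term=1, len=3");
}
```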
19 changes: 13 additions & 6 deletions async-raft/src/replication/mod.rs
@@ -154,8 +154,13 @@ struct ReplicationCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: Raf
outbound_buffer: Vec<OutboundEntry<D>>,
/// The heartbeat interval for ensuring that heartbeats are always delivered in a timely fashion.
heartbeat: Interval,

// TODO(xp): collect configs in one struct.
/// The timeout duration for heartbeats.
heartbeat_timeout: Duration,

/// The timeout for sending a snapshot segment.
install_snapshot_timeout: Duration,
}

impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> ReplicationCore<D, R, N, S> {
@@ -174,6 +179,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
) -> ReplicationStream<D> {
let (raft_tx, raft_rx) = mpsc::unbounded_channel();
let heartbeat_timeout = Duration::from_millis(config.heartbeat_interval);
let install_snapshot_timeout = Duration::from_millis(config.install_snapshot_timeout);

let max_payload_entries = config.max_payload_entries as usize;
let this = Self {
id,
@@ -193,6 +200,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
raft_rx,
heartbeat: interval(heartbeat_timeout),
heartbeat_timeout,
install_snapshot_timeout,
replication_buffer: Vec::new(),
outbound_buffer: Vec::new(),
};
@@ -918,14 +926,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
"sending snapshot chunk"
);

// TODO(xp): refine install_snapshot timeout

let res = match timeout(
self.core.heartbeat_timeout * 5,
let res = timeout(
self.core.install_snapshot_timeout,
self.core.network.install_snapshot(self.core.target, req),
)
.await
{
.await;

let res = match res {
Ok(outer_res) => match outer_res {
Ok(res) => res,
Err(err) => {
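The `timeout(...)` wrapper above yields a nested `Result`: the outer `Err` means the timer elapsed, while the inner one is the RPC's own error. A self-contained sketch of that shape, with a hypothetical `fake_rpc` standing in for `network.install_snapshot`:

```rust
use std::time::Duration;
use tokio::time::timeout;

// Stand-in for a fallible RPC such as install_snapshot.
async fn fake_rpc() -> Result<&'static str, &'static str> {
    Ok("response")
}

#[tokio::main]
async fn main() {
    let res = timeout(Duration::from_millis(100), fake_rpc()).await;
    match res {
        Ok(Ok(resp)) => println!("rpc ok: {resp}"), // replied within the deadline
        Ok(Err(err)) => println!("rpc failed: {err}"), // replied with an error
        Err(_elapsed) => println!("install_snapshot timed out"), // deadline hit
    }
}
```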
32 changes: 27 additions & 5 deletions async-raft/tests/api_install_snapshot.rs
@@ -26,7 +26,11 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let config = Arc::new(
Config::build("test".into())
.validate()
.expect("failed to build Raft config"),
);
let router = Arc::new(RaftRouter::new(config.clone()));

let mut want = 0;
@@ -35,17 +39,26 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
{
router.new_raft_node(0).await;

router.wait_for_log(&btreeset![0], want, None, "empty").await?;
router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?;
router
.wait_for_log(&btreeset![0], want, None, "empty")
.await?;
router
.wait_for_state(&btreeset![0], State::NonVoter, None, "empty")
.await?;

router.initialize_from_single_node(0).await?;
want += 1;

router.wait_for_log(&btreeset![0], want, None, "init leader").await?;
router
.wait_for_log(&btreeset![0], want, None, "init leader")
.await?;
router.assert_stable_cluster(Some(1), Some(want)).await;
}

let n = router.remove_node(0).await.ok_or_else(|| anyhow::anyhow!("node not found"))?;
let n = router
.remove_node(0)
.await
.ok_or_else(|| anyhow::anyhow!("node not found"))?;
let req0 = InstallSnapshotRequest {
term: 1,
leader_id: 0,
@@ -58,6 +71,15 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
data: vec![1, 2, 3],
done: false,
};

tracing::info!("--- only allow to begin a new session when offset is 0");
{
let mut req = req0.clone();
req.offset = 2;
let res = n.0.install_snapshot(req).await;
assert_eq!("expect: ss1+0, got: ss1+2", res.unwrap_err().to_string());
}

tracing::info!("--- install and write ss1:[0,3)");
{
let req = req0.clone();
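The assertion `"expect: ss1+0, got: ss1+2"` implies an `id+offset` rendering of `SnapshotSegmentId` in the `SnapshotMismatch` error. A hedged reconstruction of that `Display` format (the crate's actual impl may differ):

```rust
use std::fmt;

struct SnapshotSegmentId {
    id: String,
    offset: u64,
}

// Assumed format: "<snapshot id>+<offset>", matching the test's assertion.
impl fmt::Display for SnapshotSegmentId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}+{}", self.id, self.offset)
    }
}

fn main() {
    let expect = SnapshotSegmentId { id: "ss1".into(), offset: 0 };
    let got = SnapshotSegmentId { id: "ss1".into(), offset: 2 };
    assert_eq!(
        format!("expect: {}, got: {}", expect, got),
        "expect: ss1+0, got: ss1+2"
    );
}
```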
