Skip to content

Commit

Permalink
fix: snapshot replication does not need to send a last 0 size chunk
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 22, 2021
1 parent c5f02b9 commit eee8e53
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 6 deletions.
1 change: 1 addition & 0 deletions async-raft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tracing-futures = "0.2.4"
[dev-dependencies]
maplit = "1.0.2"
memstore = { version="0.2.0", path="../memstore" }
pretty_assertions = "0.7.2"
tracing-subscriber = "0.2.10"

[features]
Expand Down
17 changes: 11 additions & 6 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,31 +910,36 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::Snapshot>) -> RaftResult<()> {
let end = snapshot.snapshot.seek(SeekFrom::End(0)).await?;

let mut offset = 0;

self.replication_core.next_index = snapshot.meta.last_log_id.index + 1;
self.replication_core.matched = snapshot.meta.last_log_id;
let mut buf = Vec::with_capacity(self.replication_core.config.snapshot_max_chunk_size as usize);

loop {
// Build the RPC.
snapshot.snapshot.seek(SeekFrom::Start(offset)).await?;
let nread = snapshot.snapshot.read_buf(&mut buf).await?;
let done = nread == 0; // If bytes read == 0, then we're done.
let n_read = snapshot.snapshot.read_buf(&mut buf).await?;

let done = (offset + n_read as u64) == end; // If bytes read == 0, then we're done.
let req = InstallSnapshotRequest {
term: self.replication_core.term,
leader_id: self.replication_core.id,
meta: snapshot.meta.clone(),
offset,
data: Vec::from(&buf[..nread]),
data: Vec::from(&buf[..n_read]),
done,
};
buf.clear();

// Send the RPC over to the target.
tracing::debug!(
snapshot_size = req.data.len(),
nread,
req.done,
req.offset,
end,
req.done,
"sending snapshot chunk"
);

Expand Down Expand Up @@ -978,7 +983,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

// Everything is good, so update offset for sending the next chunk.
offset += nread as u64;
offset += n_read as u64;

// Check raft channel to ensure we are staying up-to-date, then loop.
if let Some(Some((event, span))) = self.replication_core.repl_rx.recv().now_or_never() {
Expand Down
4 changes: 4 additions & 0 deletions async-raft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ use maplit::btreeset;
use memstore::ClientRequest as MemClientRequest;
use memstore::ClientResponse as MemClientResponse;
use memstore::MemStore;
#[allow(unused_imports)]
use pretty_assertions::assert_eq;
#[allow(unused_imports)]
use pretty_assertions::assert_ne;
use tokio::sync::RwLock;
use tracing_subscriber::prelude::*;

Expand Down
4 changes: 4 additions & 0 deletions async-raft/tests/leader_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use fixtures::RaftRouter;
use futures::stream::StreamExt;
use maplit::btreeset;
use maplit::hashmap;
#[allow(unused_imports)]
use pretty_assertions::assert_eq;
#[allow(unused_imports)]
use pretty_assertions::assert_ne;

#[macro_use]
mod fixtures;
Expand Down

0 comments on commit eee8e53

Please sign in to comment.