Skip to content

Commit

Permalink
fix(state_sync_manager): reject state adverts that differ from curren…
Browse files Browse the repository at this point in the history
…t state sync
  • Loading branch information
tthebst committed Jan 16, 2024
1 parent a6f568b commit 29fc6ad
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/p2p/state_sync_manager/BUILD.bazel
Expand Up @@ -35,6 +35,7 @@ DEV_DEPENDENCIES = [
"//rs/p2p/test_utils",
"//rs/test_utilities/logger",
"//rs/types/types_test_utils",
"@crate_index//:mockall",
"@crate_index//:turmoil",
]

Expand Down
2 changes: 1 addition & 1 deletion rs/p2p/state_sync_manager/Cargo.toml
Expand Up @@ -34,5 +34,5 @@ ic-p2p-test-utils = { path = "../test_utils" }
ic-state-manager = { path = "../../state_manager" }
ic-test-utilities-logger = { path = "../../test_utilities/logger" }
ic-types-test-utils = { path = "../../types/types_test_utils" }
mockall = { workspace = true }
turmoil = { workspace = true }

126 changes: 123 additions & 3 deletions rs/p2p/state_sync_manager/src/lib.rs
Expand Up @@ -150,8 +150,12 @@ impl<T: 'static + Send> StateSyncManager<T> {
self.metrics.adverts_received_total.inc();
// Remove ongoing state sync if finished or try to add peer if ongoing.
if let Some(ongoing) = &mut self.ongoing_state_sync {
// Try to add peer to state sync peer set.
let _ = ongoing.sender.try_send(peer_id);
if ongoing.artifact_id == artifact_id {
// `try_send` is used beacuse the ongoing state sync can be blocked. This can, for example happen because of
// file system operations. In that case we don't want to block the main event loop here. It is also fine
// to drop adverts since peers will readvertise anyway.
let _ = ongoing.sender.try_send(peer_id);
}
if ongoing.jh.is_finished() {
info!(self.log, "Cleaning up state sync {}", artifact_id.height);
self.ongoing_state_sync = None;
Expand All @@ -178,7 +182,7 @@ impl<T: 'static + Send> StateSyncManager<T> {
&self.rt,
self.metrics.ongoing_state_sync_metrics.clone(),
Arc::new(Mutex::new(chunkable)),
artifact_id,
artifact_id.clone(),
self.state_sync.clone(),
self.transport.clone(),
self.cancellation.child_token(),
Expand Down Expand Up @@ -234,3 +238,119 @@ impl<T: 'static + Send> StateSyncManager<T> {
}
}
}

#[cfg(test)]
mod tests {
use std::backtrace::Backtrace;

use axum::{http::StatusCode, response::Response};
use bytes::{Bytes, BytesMut};
use ic_interfaces::p2p::state_sync::ChunkId;
use ic_metrics::MetricsRegistry;
use ic_p2p_test_utils::mocks::{MockChunkable, MockStateSync, MockTransport};
use ic_test_utilities_logger::with_test_replica_logger;
use ic_types::{crypto::CryptoHash, CryptoHashOfState, Height};
use ic_types_test_utils::ids::{NODE_1, NODE_2};
use mockall::Sequence;
use prost::Message;
use tokio::{runtime::Runtime, sync::Notify};

use super::*;

#[derive(Clone)]
struct TestMessage;

fn compress_empty_bytes() -> Bytes {
let mut raw = BytesMut::new();
Bytes::new()
.encode(&mut raw)
.expect("Allocated enough memory");
Bytes::from(zstd::bulk::compress(&raw, zstd::DEFAULT_COMPRESSION_LEVEL).unwrap())
}

/// Don't add peers that advertise a state that differs from the current sync.
#[test]
fn test_reject_peer_with_different_state() {
// Abort process if a thread panics. This catches detached tokio tasks that panic.
// https://github.com/tokio-rs/tokio/issues/4516
std::panic::set_hook(Box::new(|info| {
let stacktrace = Backtrace::force_capture();
println!("Got panic. @info:{}\n@stackTrace:{}", info, stacktrace);
std::process::abort();
}));
with_test_replica_logger(|log| {
let finished = Arc::new(Notify::new());
let finished_c = finished.clone();
let mut s = MockStateSync::<TestMessage>::default();
let mut seq = Sequence::new();
let mut seq2 = Sequence::new();
s.expect_should_cancel().returning(move |_| false);
s.expect_deliver_state_sync().return_once(move |_| {
finished_c.notify_waiters();
});
s.expect_available_states().return_const(vec![]);
let mut t = MockTransport::default();
t.expect_rpc().returning(|p, _| {
if p == &NODE_2 {
panic!("NODE 2 should not be added to the state sync")
}
Ok(Response::builder()
.status(StatusCode::OK)
.extension(NODE_1)
.body(compress_empty_bytes())
.unwrap())
});
let mut c = MockChunkable::<TestMessage>::default();
// Endless iterator
c.expect_chunks_to_download()
.once()
.returning(|| Box::new((0..50).map(ChunkId::from)));
c.expect_chunks_to_download()
.returning(|| Box::new(std::iter::empty()));
c.expect_add_chunk()
.times(49)
.return_const(Ok(()))
.in_sequence(&mut seq);
c.expect_add_chunk()
.once()
.return_once(|_, _| Ok(()))
.in_sequence(&mut seq);
c.expect_completed()
.times(49)
.return_const(None)
.in_sequence(&mut seq2);
c.expect_completed()
.once()
.return_once(|| Some(TestMessage))
.in_sequence(&mut seq2);
s.expect_start_state_sync()
.once()
.return_once(|_| Some(Box::new(c)));

let rt = Runtime::new().unwrap();
let old_id = StateSyncArtifactId {
height: Height::from(0),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
};
let id = StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
};

let (handler_tx, handler_rx) = tokio::sync::mpsc::channel(100);
start_state_sync_manager(
&log,
&MetricsRegistry::default(),
rt.handle(),
Arc::new(t) as Arc<_>,
Arc::new(s) as Arc<_>,
handler_rx,
);
rt.block_on(async move {
handler_tx.send((id, NODE_1)).await.unwrap();
handler_tx.send((old_id, NODE_2)).await.unwrap();
finished.notified().await;
});
});
}
}
6 changes: 3 additions & 3 deletions rs/p2p/state_sync_manager/src/ongoing.rs
Expand Up @@ -72,6 +72,7 @@ struct OngoingStateSync<T: Send> {

pub(crate) struct OngoingStateSyncHandle {
pub sender: Sender<NodeId>,
pub artifact_id: StateSyncArtifactId,
pub jh: JoinHandle<()>,
}

Expand All @@ -94,7 +95,7 @@ pub(crate) fn start_ongoing_state_sync<T: Send + 'static>(
let ongoing = OngoingStateSync {
log,
rt: rt.clone(),
artifact_id,
artifact_id: artifact_id.clone(),
metrics,
transport,
cancellation,
Expand All @@ -111,6 +112,7 @@ pub(crate) fn start_ongoing_state_sync<T: Send + 'static>(
let jh = rt.spawn(ongoing.run());
OngoingStateSyncHandle {
sender: new_peers_tx,
artifact_id,
jh,
}
}
Expand Down Expand Up @@ -148,7 +150,6 @@ impl<T: 'static + Send> OngoingStateSync<T> {
// Usually it is discouraged to use await in the event loop.
// In this case it is ok because the function only is async if state sync completed.
self.handle_downloaded_chunk_result(result).await;

self.spawn_chunk_downloads();
}
Err(err) => {
Expand Down Expand Up @@ -383,7 +384,6 @@ mod tests {
use tokio::runtime::Runtime;

use super::*;

#[derive(Clone)]
struct TestMessage;

Expand Down

0 comments on commit 29fc6ad

Please sign in to comment.