Skip to content

Commit d8d35a5

Browse files
committed
fix: completely deflake the state sync manager tests
1 parent 1650452 commit d8d35a5

File tree

2 files changed

+12
-35
lines changed

2 files changed

+12
-35
lines changed

rs/p2p/state_sync_manager/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ impl<T: 'static + Send> StateSyncManager<T> {
181181
self.metrics.ongoing_state_sync_metrics.clone(),
182182
Arc::new(Mutex::new(chunkable)),
183183
artifact_id.clone(),
184-
self.state_sync.clone(),
185184
self.transport.clone(),
186185
);
187186
// Add peer that initiated this state sync to ongoing state sync.

rs/p2p/state_sync_manager/src/ongoing.rs

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::routes::{build_chunk_handler_request, parse_chunk_handler_response};
2121

2222
use ic_async_utils::JoinMap;
2323
use ic_base_types::NodeId;
24-
use ic_interfaces::p2p::state_sync::{ChunkId, Chunkable, StateSyncArtifactId, StateSyncClient};
24+
use ic_interfaces::p2p::state_sync::{ChunkId, Chunkable, StateSyncArtifactId};
2525
use ic_logger::{error, info, ReplicaLogger};
2626
use ic_quic_transport::{Shutdown, Transport};
2727
use rand::{
@@ -42,7 +42,7 @@ const PARALLEL_CHUNK_DOWNLOADS: usize = 10;
4242
const ONGOING_STATE_SYNC_CHANNEL_SIZE: usize = 200;
4343
const CHUNK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(10);
4444

45-
struct OngoingStateSync<T: Send> {
45+
struct OngoingStateSync {
4646
log: ReplicaLogger,
4747
rt: Handle,
4848
artifact_id: StateSyncArtifactId,
@@ -58,7 +58,6 @@ struct OngoingStateSync<T: Send> {
5858
// Event tasks
5959
downloading_chunks: JoinMap<ChunkId, DownloadResult>,
6060
// State sync
61-
state_sync: Arc<dyn StateSyncClient<Message = T>>,
6261
state_sync_finished: bool,
6362
}
6463

@@ -79,7 +78,6 @@ pub(crate) fn start_ongoing_state_sync<T: Send + 'static>(
7978
metrics: OngoingStateSyncMetrics,
8079
tracker: Arc<Mutex<Box<dyn Chunkable<T> + Send>>>,
8180
artifact_id: StateSyncArtifactId,
82-
state_sync: Arc<dyn StateSyncClient<Message = T>>,
8381
transport: Arc<dyn Transport>,
8482
) -> OngoingStateSyncHandle {
8583
let (new_peers_tx, new_peers_rx) = tokio::sync::mpsc::channel(ONGOING_STATE_SYNC_CHANNEL_SIZE);
@@ -94,7 +92,6 @@ pub(crate) fn start_ongoing_state_sync<T: Send + 'static>(
9492
allowed_downloads: 0,
9593
chunks_to_download: Box::new(std::iter::empty()),
9694
downloading_chunks: JoinMap::new(),
97-
state_sync,
9895
state_sync_finished: false,
9996
};
10097

@@ -110,8 +107,8 @@ pub(crate) fn start_ongoing_state_sync<T: Send + 'static>(
110107
}
111108
}
112109

113-
impl<T: 'static + Send> OngoingStateSync<T> {
114-
pub async fn run(
110+
impl OngoingStateSync {
111+
pub async fn run<T: 'static + Send>(
115112
mut self,
116113
cancellation: CancellationToken,
117114
tracker: Arc<Mutex<Box<dyn Chunkable<T> + Send>>>,
@@ -165,15 +162,12 @@ impl<T: 'static + Send> OngoingStateSync<T> {
165162
self.metrics
166163
.peers_serving_state
167164
.set(self.active_downloads.len() as i64);
168-
// Conditions on when to exit (in addition to timeout)
169-
if self.state_sync_finished
170-
|| self.active_downloads.is_empty()
171-
|| self.state_sync.should_cancel(&self.artifact_id)
172-
{
173-
info!(self.log, "Stopping ongoing state sync because: finished: {}; no peers: {}; should cancel: {};",
165+
if self.state_sync_finished || self.active_downloads.is_empty() {
166+
info!(
167+
self.log,
168+
"Stopping ongoing state sync because: finished: {}; no peers: {};",
174169
self.state_sync_finished,
175170
self.active_downloads.is_empty(),
176-
self.state_sync.should_cancel(&self.artifact_id)
177171
);
178172
break;
179173
}
@@ -182,6 +176,7 @@ impl<T: 'static + Send> OngoingStateSync<T> {
182176
while let Some(Ok((finished, _))) = self.downloading_chunks.join_next().await {
183177
self.handle_downloaded_chunk_result(finished);
184178
}
179+
self.new_peers_rx.close();
185180
}
186181

187182
fn handle_downloaded_chunk_result(
@@ -215,7 +210,7 @@ impl<T: 'static + Send> OngoingStateSync<T> {
215210
}
216211
}
217212

218-
fn spawn_chunk_downloads(
213+
fn spawn_chunk_downloads<T: 'static + Send>(
219214
&mut self,
220215
cancellation: CancellationToken,
221216
tracker: Arc<Mutex<Box<dyn Chunkable<T> + Send>>>,
@@ -286,7 +281,7 @@ impl<T: 'static + Send> OngoingStateSync<T> {
286281
}
287282
}
288283

289-
async fn download_chunk_task(
284+
async fn download_chunk_task<T: 'static + Send>(
290285
peer_id: NodeId,
291286
client: Arc<dyn Transport>,
292287
tracker: Arc<Mutex<Box<dyn Chunkable<T> + Send>>>,
@@ -374,13 +369,11 @@ pub(crate) enum DownloadChunkError {
374369

375370
#[cfg(test)]
376371
mod tests {
377-
use std::sync::atomic::{AtomicBool, Ordering};
378-
379372
use axum::http::{Response, StatusCode};
380373
use bytes::{Bytes, BytesMut};
381374
use ic_interfaces::p2p::state_sync::AddChunkError;
382375
use ic_metrics::MetricsRegistry;
383-
use ic_p2p_test_utils::mocks::{MockChunkable, MockStateSync, MockTransport};
376+
use ic_p2p_test_utils::mocks::{MockChunkable, MockTransport};
384377
use ic_test_utilities_logger::with_test_replica_logger;
385378
use ic_types::{crypto::CryptoHash, Height};
386379
use ic_types_test_utils::ids::{NODE_1, NODE_2};
@@ -403,10 +396,6 @@ mod tests {
403396
#[test]
404397
fn test_should_cancel() {
405398
with_test_replica_logger(|log| {
406-
let mut s = MockStateSync::default();
407-
s.expect_should_cancel()
408-
.return_once(|_| false)
409-
.return_const(true);
410399
let mut t = MockTransport::default();
411400
t.expect_rpc().returning(|_, _| {
412401
Ok(Response::builder()
@@ -428,7 +417,6 @@ mod tests {
428417
height: Height::from(1),
429418
hash: CryptoHash(vec![]),
430419
},
431-
Arc::new(s),
432420
Arc::new(t),
433421
);
434422

@@ -443,8 +431,6 @@ mod tests {
443431
#[test]
444432
fn test_chunk_verification_failed() {
445433
with_test_replica_logger(|log| {
446-
let mut s = MockStateSync::default();
447-
s.expect_should_cancel().return_const(false);
448434
let mut t = MockTransport::default();
449435
t.expect_rpc().returning(|_, _| {
450436
Ok(Response::builder()
@@ -469,7 +455,6 @@ mod tests {
469455
height: Height::from(1),
470456
hash: CryptoHash(vec![]),
471457
},
472-
Arc::new(s),
473458
Arc::new(t),
474459
);
475460

@@ -486,11 +471,6 @@ mod tests {
486471
#[test]
487472
fn test_add_peer_multiple_times_to_ongoing_state_sync() {
488473
with_test_replica_logger(|log| {
489-
let should_cancel = Arc::new(AtomicBool::default());
490-
let should_cancel_c = should_cancel.clone();
491-
let mut s = MockStateSync::default();
492-
s.expect_should_cancel()
493-
.returning(move |_| should_cancel_c.load(Ordering::SeqCst));
494474
let mut t = MockTransport::default();
495475
t.expect_rpc().returning(|_, _| {
496476
Ok(Response::builder()
@@ -516,14 +496,12 @@ mod tests {
516496
height: Height::from(1),
517497
hash: CryptoHash(vec![]),
518498
},
519-
Arc::new(s),
520499
Arc::new(t),
521500
);
522501

523502
rt.block_on(async move {
524503
ongoing.sender.send(NODE_1).await.unwrap();
525504
ongoing.sender.send(NODE_1).await.unwrap();
526-
should_cancel.store(true, Ordering::SeqCst);
527505
ongoing.sender.send(NODE_1).await.unwrap();
528506
// State sync should exit because NODE_1 got removed.
529507
ongoing.shutdown.shutdown().await;

0 commit comments

Comments
 (0)