diff --git a/rs/interfaces/src/p2p/state_sync.rs b/rs/interfaces/src/p2p/state_sync.rs
index b79ae84bc7c..42f71c9e2b8 100644
--- a/rs/interfaces/src/p2p/state_sync.rs
+++ b/rs/interfaces/src/p2p/state_sync.rs
@@ -68,8 +68,4 @@ pub trait StateSyncClient: Send + Sync {
     fn should_cancel(&self, id: &StateSyncArtifactId) -> bool;
     /// Get a specific chunk from the specified state.
     fn chunk(&self, id: &StateSyncArtifactId, chunk_id: ChunkId) -> Option<Chunk>;
-    /// Finish a state sync by delivering the `StateSyncMessage` returned in `Chunkable::add_chunks`.
-    /// This function should be called only once for each completed state sync message.
-    /// TODO: (NET-1469) In the future the above invariant should be enforced by the API.
-    fn deliver_state_sync(&self, msg: Self::Message);
 }
diff --git a/rs/p2p/state_sync_manager/src/lib.rs b/rs/p2p/state_sync_manager/src/lib.rs
index 4c283933814..95e50f06598 100644
--- a/rs/p2p/state_sync_manager/src/lib.rs
+++ b/rs/p2p/state_sync_manager/src/lib.rs
@@ -287,9 +287,6 @@ mod tests {
         let mut seq = Sequence::new();
         let mut seq2 = Sequence::new();
         s.expect_should_cancel().returning(move |_| false);
-        s.expect_deliver_state_sync().return_once(move |_| {
-            finished_c.notify_waiters();
-        });
         s.expect_available_states().return_const(vec![]);
         let mut t = MockTransport::default();
         t.expect_rpc().times(50).returning(|p, _| {
@@ -316,7 +313,10 @@ mod tests {
             .in_sequence(&mut seq);
         c.expect_add_chunk()
             .once()
-            .return_once(|_, _| Ok(()))
+            .return_once(move |_, _| {
+                finished_c.notify_waiters();
+                Ok(())
+            })
             .in_sequence(&mut seq);
         c.expect_completed()
             .times(49)
diff --git a/rs/p2p/state_sync_manager/src/ongoing.rs b/rs/p2p/state_sync_manager/src/ongoing.rs
index 07dde40d3bd..eff4a388520 100644
--- a/rs/p2p/state_sync_manager/src/ongoing.rs
+++ b/rs/p2p/state_sync_manager/src/ongoing.rs
@@ -138,9 +138,7 @@ impl OngoingStateSync {
                     // undercount active downloads for this peer but this is acceptable since everything will be reset anyway every
                     // 5-10min when state sync restarts.
                     self.active_downloads.entry(result.peer_id).and_modify(|v| { *v = v.saturating_sub(1) });
-                    // Usually it is discouraged to use await in the event loop.
-                    // In this case it is ok because the function only is async if state sync completed.
-                    self.handle_downloaded_chunk_result(result).await;
+                    self.handle_downloaded_chunk_result(result);
                     self.spawn_chunk_downloads();
                 }
                 Err(err) => {
@@ -182,23 +180,18 @@ impl OngoingStateSync {
         }

         while let Some(Ok((finished, _))) = self.downloading_chunks.join_next().await {
-            self.handle_downloaded_chunk_result(finished).await;
+            self.handle_downloaded_chunk_result(finished);
         }
     }

-    async fn handle_downloaded_chunk_result(
+    fn handle_downloaded_chunk_result(
         &mut self,
         DownloadResult { peer_id, result }: DownloadResult,
     ) {
         self.metrics.record_chunk_download_result(&result);
         match result {
             // Received chunk
-            Ok(Some(msg)) => {
-                let state_sync_c = self.state_sync.clone();
-                let _ = self
-                    .rt
-                    .spawn_blocking(move || state_sync_c.deliver_state_sync(msg))
-                    .await;
+            Ok(Some(_)) => {
                 self.state_sync_finished = true;
             }
             Ok(None) => {}
diff --git a/rs/p2p/state_sync_manager/tests/common.rs b/rs/p2p/state_sync_manager/tests/common.rs
index d01b4bc00df..189fa312e36 100644
--- a/rs/p2p/state_sync_manager/tests/common.rs
+++ b/rs/p2p/state_sync_manager/tests/common.rs
@@ -103,7 +103,7 @@ impl State {
             .map(|chunk_size| vec![0; *chunk_size])
     }

-    /// Calulcates the artifact Id of the current state by hashing the ChunkId map.
+    /// Calculates the artifact Id of the current state by hashing the ChunkId map.
     pub fn artifact_id(&self) -> StateSyncArtifactId {
         let state = self.0.lock().unwrap();
         let mut hasher = DefaultHasher::new();
@@ -229,14 +229,6 @@ impl StateSyncClient for FakeStateSync {

         self.global_state.chunk(chunk_id)
     }
-
-    fn deliver_state_sync(&self, msg: StateSyncMessage) {
-        if !self.uses_global() {
-            self.local_state.set_height(msg.height);
-        } else {
-            panic!("Node that follows global state should not start state sync");
-        }
-    }
 }

 pub struct FakeChunkable {
@@ -244,6 +236,7 @@ pub struct FakeChunkable {
     local_state: State,
     syncing_state: StateSyncArtifactId,
     // [meta-manifest, manifests, chunks]
     chunk_sets: [HashSet<ChunkId>; 3],
+    is_completed: bool,
 }

@@ -262,6 +255,7 @@ impl FakeChunkable {
             local_state,
             syncing_state: global_state.artifact_id(),
             chunk_sets,
+            is_completed: false,
         }
     }
 }
@@ -299,12 +293,17 @@ impl Chunkable<StateSyncMessage> for FakeChunkable {
             self.local_state.add_chunk(chunk_id, chunk.len())
         }

+        let elems = self.chunk_sets.iter().map(|set| set.len()).sum::<usize>();
+        if elems == 0 {
+            self.local_state.set_height(self.syncing_state.height);
+            self.is_completed = true;
+        }
+
         Ok(())
     }

     fn completed(&self) -> Option<StateSyncMessage> {
-        let elems = self.chunk_sets.iter().map(|set| set.len()).sum::<usize>();
-        if elems == 0 {
+        if self.is_completed {
             Some(state_sync_artifact(self.syncing_state.clone()))
         } else {
             None
@@ -360,7 +359,6 @@ pub struct SharableMockStateSync {
     start_state_sync_calls: Arc<AtomicUsize>,
     should_cancel_calls: Arc<AtomicUsize>,
     chunk_calls: Arc<AtomicUsize>,
-    deliver_state_sync_calls: Arc<AtomicUsize>,
 }

 impl SharableMockStateSync {
@@ -381,7 +379,6 @@ impl SharableMockStateSync {
         self.start_state_sync_calls.store(0, Ordering::SeqCst);
         self.should_cancel_calls.store(0, Ordering::SeqCst);
         self.chunk_calls.store(0, Ordering::SeqCst);
-        self.deliver_state_sync_calls.store(0, Ordering::SeqCst);
     }
 }

@@ -407,10 +404,6 @@ impl StateSyncClient for SharableMockStateSync {
         self.chunk_calls.fetch_add(1, Ordering::SeqCst);
         self.mock.lock().unwrap().chunk(id, chunk_id)
     }
-    fn deliver_state_sync(&self, msg: StateSyncMessage) {
-        self.deliver_state_sync_calls.fetch_add(1, Ordering::SeqCst);
-        self.mock.lock().unwrap().deliver_state_sync(msg)
-    }
 }

 /// Returns tuple of link latency and capacity in bytes for the described link
diff --git a/rs/p2p/test_utils/src/mocks.rs b/rs/p2p/test_utils/src/mocks.rs
index d44fe350724..97bec89c201 100644
--- a/rs/p2p/test_utils/src/mocks.rs
+++ b/rs/p2p/test_utils/src/mocks.rs
@@ -26,8 +26,6 @@ mock! {
         fn should_cancel(&self, id: &StateSyncArtifactId) -> bool;

         fn chunk(&self, id: &StateSyncArtifactId, chunk_id: ChunkId) -> Option<Chunk>;
-
-        fn deliver_state_sync(&self, msg: T);
     }
 }
diff --git a/rs/state_manager/src/lib.rs b/rs/state_manager/src/lib.rs
index dbcf40bcef0..c1b48271ece 100644
--- a/rs/state_manager/src/lib.rs
+++ b/rs/state_manager/src/lib.rs
@@ -42,6 +42,7 @@ use ic_metrics::{buckets::decimal_buckets, MetricsRegistry};
 use ic_protobuf::proxy::{ProtoProxy, ProxyDecodeError};
 use ic_protobuf::{messaging::xnet::v1, state::v1 as pb};
 use ic_registry_subnet_type::SubnetType;
+use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
 use ic_replicated_state::{
     canister_state::execution_state::SandboxMemory,
     page_map::{PersistenceError, StorageMetrics},
@@ -64,6 +65,8 @@ use prost::Message;
 use std::convert::{From, TryFrom};
 use std::fs::File;
 use std::fs::OpenOptions;
+use std::os::unix::io::RawFd;
+use std::os::unix::prelude::IntoRawFd;
 use std::path::{Path, PathBuf};
 use std::sync::{
     atomic::{AtomicU64, Ordering},
@@ -74,10 +77,6 @@ use std::{
     collections::{BTreeMap, BTreeSet, VecDeque},
     sync::Mutex,
 };
-
-use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
-use std::os::unix::io::RawFd;
-use std::os::unix::prelude::IntoRawFd;
 use tempfile::tempfile;
 use uuid::Uuid;
diff --git a/rs/state_manager/src/state_sync.rs b/rs/state_manager/src/state_sync.rs
index 8dc0f027364..ca049b88946 100644
--- a/rs/state_manager/src/state_sync.rs
+++ b/rs/state_manager/src/state_sync.rs
@@ -31,32 +31,72 @@ impl StateSync {
         }
     }

+    #[cfg(test)]
+    fn new_for_testing(
+        state_manager: Arc<StateManagerImpl>,
+        state_sync_refs: StateSyncRefs,
+        log: ReplicaLogger,
+    ) -> Self {
+        Self {
+            state_manager,
+            state_sync_refs,
+            log,
+        }
+    }
+
     /// Returns requested state as a Chunkable artifact for StateSync.
     fn create_chunkable_state(
         &self,
         id: &StateSyncArtifactId,
     ) -> Option<Box<dyn Chunkable<StateSyncMessage> + Send>> {
         info!(self.log, "Starting state sync @{}", id.height);
-        crate::state_sync::chunkable::IncompleteState::try_new(
+        chunkable::IncompleteState::try_new(
             self.log.clone(),
             id.height,
             CryptoHashOfState::from(id.hash.clone()),
-            self.state_manager.state_layout.clone(),
-            self.state_manager.latest_manifest(),
-            self.state_manager.metrics.clone(),
-            self.state_manager.own_subnet_type,
+            Arc::new(self.clone()),
             Arc::new(Mutex::new(scoped_threadpool::Pool::new(
                 NUMBER_OF_CHECKPOINT_THREADS,
             ))),
-            self.state_sync_refs.clone(),
-            self.state_manager.get_fd_factory(),
-            self.state_manager.malicious_flags.clone(),
         )
         .map(|incomplete_state| {
             Box::new(incomplete_state) as Box<dyn Chunkable<StateSyncMessage> + Send>
         })
     }

+    /// Loads the synced checkpoint and recovers the corresponding replicated state,
+    /// then delivers both to the state manager and updates its internal bookkeeping.
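+    /// Invoked internally from `Chunkable::add_chunks` once the final chunk has been
+    /// applied, replacing the external `StateSyncClient::deliver_state_sync` entry point.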
+    fn deliver_state_sync(&self, message: StateSyncMessage) {
+        let height = message.height;
+        info!(self.log, "Received state at height {}", message.height);
+        let ro_layout = self
+            .state_manager
+            .state_layout
+            .checkpoint(height)
+            .expect("failed to create checkpoint layout");
+        let state = crate::checkpoint::load_checkpoint_parallel(
+            &ro_layout,
+            self.state_manager.own_subnet_type,
+            &self.state_manager.metrics.checkpoint_metrics,
+            self.state_manager.get_fd_factory(),
+        )
+        .expect("failed to recover checkpoint");
+
+        self.state_manager.on_synced_checkpoint(
+            state,
+            height,
+            message.manifest,
+            message.meta_manifest,
+            message.root_hash,
+        );
+
+        let height = self.state_manager.states.read().last_advertised;
+        let ids = self.get_all_validated_ids_by_height(height);
+        if let Some(ids) = ids.last() {
+            self.state_manager.states.write().last_advertised = ids.height;
+        }
+    }
+
     pub fn get_validated_by_identifier(
         &self,
         msg_id: &StateSyncArtifactId,
@@ -273,36 +313,4 @@ impl StateSyncClient for StateSync {
         let msg = self.get_validated_by_identifier(id)?;
         msg.get_chunk(chunk_id)
     }
-
-    /// Blocking. Makes synchronous file system calls.
-    fn deliver_state_sync(&self, message: StateSyncMessage) {
-        let height = message.height;
-        info!(self.log, "Received state {} at height", message.height);
-        let ro_layout = self
-            .state_manager
-            .state_layout
-            .checkpoint(height)
-            .expect("failed to create checkpoint layout");
-        let state = crate::checkpoint::load_checkpoint_parallel(
-            &ro_layout,
-            self.state_manager.own_subnet_type,
-            &self.state_manager.metrics.checkpoint_metrics,
-            self.state_manager.get_fd_factory(),
-        )
-        .expect("failed to recover checkpoint");
-
-        self.state_manager.on_synced_checkpoint(
-            state,
-            height,
-            message.manifest,
-            message.meta_manifest,
-            message.root_hash,
-        );
-
-        let height = self.state_manager.states.read().last_advertised;
-        let ids = self.get_all_validated_ids_by_height(height);
-        if let Some(ids) = ids.last() {
-            self.state_manager.states.write().last_advertised = ids.height;
-        }
-    }
 }
diff --git a/rs/state_manager/src/state_sync/chunkable.rs b/rs/state_manager/src/state_sync/chunkable.rs
index be19bc82394..caedb4d2055 100644
--- a/rs/state_manager/src/state_sync/chunkable.rs
+++ b/rs/state_manager/src/state_sync/chunkable.rs
@@ -5,6 +5,7 @@ use crate::{
         MetaManifest, StateSyncChunk, StateSyncMessage, FILE_CHUNK_ID_OFFSET,
         FILE_GROUP_CHUNK_ID_OFFSET, MANIFEST_CHUNK_ID_OFFSET, META_MANIFEST_CHUNK,
     },
+    state_sync::StateSync,
    StateManagerMetrics, StateSyncMetrics, StateSyncRefs,
    CRITICAL_ERROR_STATE_SYNC_CORRUPTED_CHUNKS, LABEL_COPY_CHUNKS, LABEL_COPY_FILES, LABEL_FETCH,
    LABEL_FETCH_MANIFEST_CHUNK, LABEL_FETCH_META_MANIFEST_CHUNK, LABEL_FETCH_STATE_CHUNK,
@@ -78,6 +79,7 @@ enum DownloadState {
 pub(crate) struct IncompleteState {
     log: ReplicaLogger,
     root: PathBuf,
+    state_sync: Arc<StateSync>,
     state_layout: StateLayout,
     height: Height,
     root_hash: CryptoHashOfState,
@@ -190,18 +192,12 @@ impl IncompleteState {
         log: ReplicaLogger,
         height: Height,
         root_hash: CryptoHashOfState,
-        state_layout: StateLayout,
-        manifest_with_checkpoint_layout: Option<(Manifest, CheckpointLayout<ReadOnly>)>,
-        metrics: StateManagerMetrics,
-        own_subnet_type: SubnetType,
+        state_sync: Arc<StateSync>,
         thread_pool: Arc<Mutex<scoped_threadpool::Pool>>,
-        state_sync_refs: StateSyncRefs,
-        fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
-        malicious_flags: MaliciousFlags,
     ) -> Option<Self> {
         // The state sync manager in the p2p layer is expected not to start a new state sync when there is an ongoing state sync.
         // Here we check it again as a last resort and return `None` if there is already an active state sync reference.
-        let mut active = state_sync_refs.active.write();
+        let mut active = state_sync.state_sync_refs.active.write();
         if let Some((active_height, _hash)) = active.as_ref() {
             warn!(
                 &log,
                 "Attempt to start state sync @{} while state sync @{} is still active",
                 height,
                 active_height,
             );
             return None;
         }
         *active = Some((height, root_hash.clone()));
+        let state_layout = state_sync.state_manager.state_layout.clone();
         // Create the `IncompleteState` object while holding the write lock on the active state sync reference.
         Some(Self {
             log,
             root: state_layout
                 .state_sync_scratchpad(height)
                 .expect("failed to create directory for state sync scratchpad"),
+            state_sync: state_sync.clone(),
             state_layout,
             height,
             root_hash,
             state: DownloadState::Blank,
-            manifest_with_checkpoint_layout,
-            metrics,
+            manifest_with_checkpoint_layout: state_sync.state_manager.latest_manifest(),
+            metrics: state_sync.state_manager.metrics.clone(),
             started_at: Instant::now(),
             fetch_started_at: None,
-            own_subnet_type,
+            own_subnet_type: state_sync.state_manager.own_subnet_type,
             thread_pool,
-            state_sync_refs: state_sync_refs.clone(),
-            fd_factory,
-            malicious_flags,
+            state_sync_refs: state_sync.state_sync_refs.clone(),
+            fd_factory: state_sync.state_manager.fd_factory.clone(),
+            malicious_flags: state_sync.state_manager.malicious_flags.clone(),
         })
     }
@@ -1405,6 +1403,7 @@ impl Chunkable<StateSyncMessage> for IncompleteState {
                     &meta_manifest,
                 );

+                self.state_sync.deliver_state_sync(artifact.clone());
                 self.state = DownloadState::Complete(Box::new(artifact.clone()));
                 self.state_sync_refs
                     .cache
@@ -1586,6 +1585,7 @@ impl Chunkable<StateSyncMessage> for IncompleteState {
                     meta_manifest,
                 );

+                self.state_sync.deliver_state_sync(artifact.clone());
                 self.state = DownloadState::Complete(Box::new(artifact));
                 self.state_sync_refs
                     .cache
diff --git a/rs/state_manager/src/state_sync/chunkable/cache/tests.rs b/rs/state_manager/src/state_sync/chunkable/cache/tests.rs
index 22ce04742cc..56c01b84959 100644
--- a/rs/state_manager/src/state_sync/chunkable/cache/tests.rs
+++ b/rs/state_manager/src/state_sync/chunkable/cache/tests.rs
@@ -1,7 +1,10 @@
 use super::*;
+use crate::StateManagerImpl;
+use ic_config::state_manager::Config;
 use ic_metrics::MetricsRegistry;
-use ic_replicated_state::page_map::TestPageAllocatorFileDescriptorImpl;
+use ic_test_utilities_consensus::fake::FakeVerifier;
 use ic_test_utilities_logger::with_test_replica_logger;
+use ic_test_utilities_types::ids::subnet_test_id;
 use ic_types::{
     crypto::CryptoHash,
     state_sync::StateSyncVersion::{self, *},
@@ -14,7 +17,7 @@ const NUM_THREADS: u32 = 3;

 /// `IncompleteState`
 struct TestEnvironment {
     log: ReplicaLogger,
-    metrics: StateManagerMetrics,
+    state_sync: Arc<StateSync>,
     state_layout: StateLayout,
     cache: Arc<parking_lot::RwLock<StateSyncCache>>,
     _root_dir: TempDir,
 }
@@ -24,17 +27,33 @@ impl TestEnvironment {
     fn new(log: ReplicaLogger) -> Self {
         let root_dir = tempfile::TempDir::new().expect("failed to create a temporary directory");
         let cache = Arc::new(parking_lot::RwLock::new(StateSyncCache::new(log.clone())));
-        let metrics = StateManagerMetrics::new(&MetricsRegistry::new(), log.clone());
-        let state_layout = StateLayout::try_new(
+        let state_sync_refs = StateSyncRefs {
+            active: Arc::new(parking_lot::RwLock::new(Default::default())),
+            cache: Arc::clone(&cache),
+        };
+
+        let config = Config::new(root_dir.path().into());
+        let state_manager = Arc::new(StateManagerImpl::new(
+            Arc::new(FakeVerifier::new()),
+            subnet_test_id(42),
+            SubnetType::Application,
             log.clone(),
-            root_dir.path().to_owned(),
             &MetricsRegistry::new(),
-        )
-        .unwrap();
+            &config,
+            None,
+            ic_types::malicious_flags::MaliciousFlags::default(),
+        ));
+
+        let state_layout = state_manager.state_layout.clone();
+        let state_sync = Arc::new(StateSync::new_for_testing(
+            state_manager,
+            state_sync_refs,
+            log.clone(),
+        ));

         Self {
             log,
-            metrics,
+            state_sync,
             state_layout,
             cache,
             _root_dir: root_dir,
         }
@@ -111,22 +130,12 @@ fn incomplete_state_for_tests(
     state: DownloadState,
 ) -> IncompleteState {
     let hash = CryptoHashOfState::from(CryptoHash(vec![0; 32]));
-    let state_sync_refs = StateSyncRefs {
-        active: Arc::new(parking_lot::RwLock::new(Default::default())),
-        cache: Arc::clone(&env.cache),
-    };

     let mut result = IncompleteState::try_new(
         env.log.clone(),
         height,
         hash,
-        env.state_layout.clone(),
-        None,
-        env.metrics.clone(),
-        SubnetType::Application,
+        env.state_sync.clone(),
         Arc::new(Mutex::new(scoped_threadpool::Pool::new(NUM_THREADS))),
-        state_sync_refs,
-        Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
-        MaliciousFlags::default(),
     )
     .expect("there exists an ongoing state sync");
diff --git a/rs/state_manager/tests/state_manager.rs b/rs/state_manager/tests/state_manager.rs
index 2ddb6b41d6a..a12bb26ee28 100644
--- a/rs/state_manager/tests/state_manager.rs
+++ b/rs/state_manager/tests/state_manager.rs
@@ -479,8 +479,7 @@ fn rejoining_node_doesnt_accumulate_states() {
             .expect("failed to get state sync messages");

         let chunkable = set_fetch_state_and_start_start_sync(&dst_state_manager, &dst_state_sync, &id);
-        let dst_msg = pipe_state_sync(msg.clone(), chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg.clone(), chunkable);
         assert_eq!(
             src_state_manager.get_latest_state().take(),
             dst_state_manager.get_latest_state().take()
@@ -2167,8 +2166,7 @@ fn can_do_simple_state_sync_transfer() {
         let chunkable = set_fetch_state_and_start_start_sync(&dst_state_manager, &dst_state_sync, &id);

-        let dst_msg = pipe_state_sync(msg, chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg, chunkable);

         let recovered_state = dst_state_manager
             .get_state_at(height(1))
@@ -2278,7 +2276,7 @@ fn test_start_and_cancel_state_sync() {
         drop(chunkable);

         // starting state sync for state @3 should succeed after the old one is cancelled.
-        let chunkable = dst_state_sync
+        let mut chunkable = dst_state_sync
             .start_state_sync(&id3)
             .expect("failed to start state sync");
         // The dst state manager has not reached the state @3 yet. It is not requested to fetch a newer state either.
@@ -2288,15 +2286,18 @@ fn test_start_and_cancel_state_sync() {
         let msg = src_state_sync
             .get_validated_by_identifier(&id3)
             .expect("failed to get state sync messages");
-        let dst_msg = pipe_state_sync(msg, chunkable);
-        // Although the dst state manager finishes downloading chunks, the state sync is not delivered yet.
-        // We should not cancel this ongoing state sync for state @3.
+        let omit: HashSet<ChunkId> =
+            maplit::hashset! {ChunkId::new(FILE_GROUP_CHUNK_ID_OFFSET)};
+        let completion = pipe_partial_state_sync(&msg, &mut *chunkable, &omit, false);
+        assert!(
+            matches!(completion, Err(StateSyncErrorCode::ChunksMoreNeeded)),
+            "Unexpectedly completed state sync"
+        );
+        // The state sync is not finished yet. We should not cancel this ongoing state sync for state @3.
         assert!(!dst_state_sync.should_cancel(&id3));

-        // the state sync should finish successfully.
-        dst_state_sync.deliver_state_sync(dst_msg);
-
+        pipe_state_sync(msg, chunkable);

         let recovered_state = dst_state_manager
             .get_state_at(height(3))
             .expect("Destination state manager didn't receive the state")
@@ -2501,8 +2502,7 @@ fn can_state_sync_from_cache() {
         assert_eq!(fetch_chunks, chunkable.chunks_to_download().collect());

         // Download chunk 1
-        let dst_msg = pipe_state_sync(msg2.clone(), chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg2.clone(), chunkable);

         let recovered_state = dst_state_manager
             .get_state_at(height(2))
@@ -2595,9 +2595,7 @@ fn can_state_sync_from_cache_alone() {
         // This is because the omitted file `canister_states/00000000000000640101/software.wasm` in the first state sync
        // is the same as the other one. As a result, it will be copied and does not need to be fetched.
         let _res = pipe_meta_manifest(&msg, &mut *chunkable, false);
-        let dst_msg = pipe_manifest(&msg, &mut *chunkable, false).unwrap();
-
-        dst_state_sync.deliver_state_sync(dst_msg);
+        let _res = pipe_manifest(&msg, &mut *chunkable, false).unwrap();

         let recovered_state = dst_state_manager
             .get_state_at(height(1))
@@ -2692,8 +2690,7 @@ fn can_state_sync_after_aborting_in_prep_phase() {
         let result = pipe_manifest(&msg, &mut *chunkable, false);
         assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded)));

-        let dst_msg = pipe_state_sync(msg.clone(), chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg.clone(), chunkable);

         let recovered_state = dst_state_manager
             .get_state_at(height(2))
@@ -2802,8 +2799,7 @@ fn state_sync_can_reject_invalid_chunks() {
         }

         // Provide correct chunks to dst
-        let dst_msg = pipe_state_sync(msg.clone(), chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg.clone(), chunkable);

         let recovered_state = dst_state_manager
             .get_state_at(height(1))
@@ -2854,8 +2850,7 @@ fn can_state_sync_into_existing_checkpoint() {
             CertificationScope::Full,
         );

-        let dst_msg = pipe_state_sync(msg, chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg, chunkable);

         assert_no_remaining_chunks(dst_metrics);
         assert_error_counters(dst_metrics);
@@ -2926,9 +2921,7 @@ fn can_group_small_files_in_state_sync() {
             .chunks_to_download()
             .any(|chunk_id| chunk_id.get() == FILE_GROUP_CHUNK_ID_OFFSET));

-        let dst_msg = pipe_state_sync(msg, chunkable);
-
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg, chunkable);

         let recovered_state = dst_state_manager
             .get_state_at(height(1))
@@ -2978,8 +2971,7 @@ fn can_commit_after_prev_state_is_gone() {
         let chunkable = set_fetch_state_and_start_start_sync(&dst_state_manager, &dst_state_sync, &id);

-        let dst_msg = pipe_state_sync(msg, chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg, chunkable);

         dst_state_manager.remove_states_below(height(2));

@@ -3034,8 +3026,7 @@ fn can_commit_without_prev_hash_mismatch_after_taking_tip_at_the_synced_height()
         let chunkable = set_fetch_state_and_start_start_sync(&dst_state_manager, &dst_state_sync, &id);

-        let dst_msg = pipe_state_sync(msg, chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg, chunkable);

         assert_eq!(height(3), dst_state_manager.latest_state_height());
         let (tip_height, tip) = dst_state_manager.take_tip();
@@ -3080,8 +3071,7 @@ fn can_state_sync_based_on_old_checkpoint() {
         let chunkable = set_fetch_state_and_start_start_sync(&dst_state_manager, &dst_state_sync, &id);

-        let dst_msg = pipe_state_sync(msg, chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg, chunkable);

         let expected_state = src_state_manager.get_latest_state();

@@ -3292,8 +3282,7 @@ fn can_recover_from_corruption_on_state_sync() {
         let chunkable = set_fetch_state_and_start_start_sync(&dst_state_manager, &dst_state_sync, &id);

-        let dst_msg = pipe_state_sync(msg, chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg, chunkable);

         let expected_state = src_state_manager.get_latest_state();

@@ -3338,8 +3327,7 @@ fn can_commit_below_state_sync() {
         assert_eq!(tip_height, height(0));
         let chunkable = set_fetch_state_and_start_start_sync(&dst_state_manager, &dst_state_sync, &id);
-        let dst_msg = pipe_state_sync(msg, chunkable);
-        dst_state_sync.deliver_state_sync(dst_msg);
+        pipe_state_sync(msg, chunkable);

         // Check committing an old state doesn't panic
         dst_state_manager.commit_and_certify(state, height(1), CertificationScope::Full);
         dst_state_manager.flush_tip_channel();
@@ -3392,10 +3380,9 @@ fn can_state_sync_below_commit() {
         let (_height, state) = dst_state_manager.take_tip();
         dst_state_manager.remove_states_below(height(2));
         assert_eq!(dst_state_manager.checkpoint_heights(), vec![height(2)]);
+        // Perform the state sync after the state manager reaches height 2.
+        pipe_state_sync(msg, chunkable);

-        let dst_msg = pipe_state_sync(msg, chunkable);
-        // the state sync finishes after the state manager reaches height 2.
-        dst_state_sync.deliver_state_sync(dst_msg);
         assert_eq!(
             dst_state_manager.checkpoint_heights(),
             vec![height(1), height(2)]