Skip to content

Commit

Permalink
chore: [NET-1469][MR-558] Refactor deliver_state_sync of the state sy…
Browse files Browse the repository at this point in the history
…nc client interface
  • Loading branch information
ShuoWangNSL committed Mar 19, 2024
1 parent d51f0f5 commit 22d95d7
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 153 deletions.
4 changes: 0 additions & 4 deletions rs/interfaces/src/p2p/state_sync.rs
Expand Up @@ -68,8 +68,4 @@ pub trait StateSyncClient: Send + Sync {
fn should_cancel(&self, id: &StateSyncArtifactId) -> bool;
/// Get a specific chunk from the specified state.
fn chunk(&self, id: &StateSyncArtifactId, chunk_id: ChunkId) -> Option<Chunk>;
/// Finish a state sync by delivering the `StateSyncMessage` returned in `Chunkable::add_chunks`.
/// This function should be called only once for each completed state sync message.
/// TODO: (NET-1469) In the future the above invariant should be enforced by the API.
fn deliver_state_sync(&self, msg: Self::Message);
}
8 changes: 4 additions & 4 deletions rs/p2p/state_sync_manager/src/lib.rs
Expand Up @@ -287,9 +287,6 @@ mod tests {
let mut seq = Sequence::new();
let mut seq2 = Sequence::new();
s.expect_should_cancel().returning(move |_| false);
s.expect_deliver_state_sync().return_once(move |_| {
finished_c.notify_waiters();
});
s.expect_available_states().return_const(vec![]);
let mut t = MockTransport::default();
t.expect_rpc().times(50).returning(|p, _| {
Expand All @@ -316,7 +313,10 @@ mod tests {
.in_sequence(&mut seq);
c.expect_add_chunk()
.once()
.return_once(|_, _| Ok(()))
.return_once(move |_, _| {
finished_c.notify_waiters();
Ok(())
})
.in_sequence(&mut seq);
c.expect_completed()
.times(49)
Expand Down
15 changes: 4 additions & 11 deletions rs/p2p/state_sync_manager/src/ongoing.rs
Expand Up @@ -138,9 +138,7 @@ impl<T: 'static + Send> OngoingStateSync<T> {
// undercount active downloads for this peer but this is acceptable since everything will be reset anyway every
// 5-10min when state sync restarts.
self.active_downloads.entry(result.peer_id).and_modify(|v| { *v = v.saturating_sub(1) });
// Usually it is discouraged to use await in the event loop.
// In this case it is ok because the function only is async if state sync completed.
self.handle_downloaded_chunk_result(result).await;
self.handle_downloaded_chunk_result(result);
self.spawn_chunk_downloads();
}
Err(err) => {
Expand Down Expand Up @@ -182,23 +180,18 @@ impl<T: 'static + Send> OngoingStateSync<T> {
}

while let Some(Ok((finished, _))) = self.downloading_chunks.join_next().await {
self.handle_downloaded_chunk_result(finished).await;
self.handle_downloaded_chunk_result(finished);
}
}

async fn handle_downloaded_chunk_result(
fn handle_downloaded_chunk_result(
&mut self,
DownloadResult { peer_id, result }: DownloadResult<T>,
) {
self.metrics.record_chunk_download_result(&result);
match result {
// Received chunk
Ok(Some(msg)) => {
let state_sync_c = self.state_sync.clone();
let _ = self
.rt
.spawn_blocking(move || state_sync_c.deliver_state_sync(msg))
.await;
Ok(Some(_)) => {
self.state_sync_finished = true;
}
Ok(None) => {}
Expand Down
27 changes: 10 additions & 17 deletions rs/p2p/state_sync_manager/tests/common.rs
Expand Up @@ -103,7 +103,7 @@ impl State {
.map(|chunk_size| vec![0; *chunk_size])
}

/// Calulcates the artifact Id of the current state by hashing the ChunkId map.
/// Calculates the artifact Id of the current state by hashing the ChunkId map.
pub fn artifact_id(&self) -> StateSyncArtifactId {
let state = self.0.lock().unwrap();
let mut hasher = DefaultHasher::new();
Expand Down Expand Up @@ -229,21 +229,14 @@ impl StateSyncClient for FakeStateSync {

self.global_state.chunk(chunk_id)
}

fn deliver_state_sync(&self, msg: StateSyncMessage) {
if !self.uses_global() {
self.local_state.set_height(msg.height);
} else {
panic!("Node that follows global state should not start state sync");
}
}
}

pub struct FakeChunkable {
local_state: State,
syncing_state: StateSyncArtifactId,
// [meta-manifest, manifests, chunks]
chunk_sets: [HashSet<ChunkId>; 3],
is_completed: bool,
}

impl FakeChunkable {
Expand All @@ -262,6 +255,7 @@ impl FakeChunkable {
local_state,
syncing_state: global_state.artifact_id(),
chunk_sets,
is_completed: false,
}
}
}
Expand Down Expand Up @@ -299,12 +293,17 @@ impl Chunkable<StateSyncMessage> for FakeChunkable {
self.local_state.add_chunk(chunk_id, chunk.len())
}

let elems = self.chunk_sets.iter().map(|set| set.len()).sum::<usize>();
if elems == 0 {
self.local_state.set_height(self.syncing_state.height);
self.is_completed = true;
}

Ok(())
}

fn completed(&self) -> Option<StateSyncMessage> {
let elems = self.chunk_sets.iter().map(|set| set.len()).sum::<usize>();
if elems == 0 {
if self.is_completed {
Some(state_sync_artifact(self.syncing_state.clone()))
} else {
None
Expand Down Expand Up @@ -360,7 +359,6 @@ pub struct SharableMockStateSync {
start_state_sync_calls: Arc<AtomicUsize>,
should_cancel_calls: Arc<AtomicUsize>,
chunk_calls: Arc<AtomicUsize>,
deliver_state_sync_calls: Arc<AtomicUsize>,
}

impl SharableMockStateSync {
Expand All @@ -381,7 +379,6 @@ impl SharableMockStateSync {
self.start_state_sync_calls.store(0, Ordering::SeqCst);
self.should_cancel_calls.store(0, Ordering::SeqCst);
self.chunk_calls.store(0, Ordering::SeqCst);
self.deliver_state_sync_calls.store(0, Ordering::SeqCst);
}
}

Expand All @@ -407,10 +404,6 @@ impl StateSyncClient for SharableMockStateSync {
self.chunk_calls.fetch_add(1, Ordering::SeqCst);
self.mock.lock().unwrap().chunk(id, chunk_id)
}
fn deliver_state_sync(&self, msg: StateSyncMessage) {
self.deliver_state_sync_calls.fetch_add(1, Ordering::SeqCst);
self.mock.lock().unwrap().deliver_state_sync(msg)
}
}

/// Returns tuple of link latency and capacity in bytes for the described link
Expand Down
2 changes: 0 additions & 2 deletions rs/p2p/test_utils/src/mocks.rs
Expand Up @@ -26,8 +26,6 @@ mock! {
fn should_cancel(&self, id: &StateSyncArtifactId) -> bool;

fn chunk(&self, id: &StateSyncArtifactId, chunk_id: ChunkId) -> Option<Chunk>;

fn deliver_state_sync(&self, msg: T);
}
}

Expand Down
7 changes: 3 additions & 4 deletions rs/state_manager/src/lib.rs
Expand Up @@ -42,6 +42,7 @@ use ic_metrics::{buckets::decimal_buckets, MetricsRegistry};
use ic_protobuf::proxy::{ProtoProxy, ProxyDecodeError};
use ic_protobuf::{messaging::xnet::v1, state::v1 as pb};
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
use ic_replicated_state::{
canister_state::execution_state::SandboxMemory,
page_map::{PersistenceError, StorageMetrics},
Expand All @@ -64,6 +65,8 @@ use prost::Message;
use std::convert::{From, TryFrom};
use std::fs::File;
use std::fs::OpenOptions;
use std::os::unix::io::RawFd;
use std::os::unix::prelude::IntoRawFd;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicU64, Ordering},
Expand All @@ -74,10 +77,6 @@ use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
sync::Mutex,
};

use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
use std::os::unix::io::RawFd;
use std::os::unix::prelude::IntoRawFd;
use tempfile::tempfile;
use uuid::Uuid;

Expand Down
88 changes: 48 additions & 40 deletions rs/state_manager/src/state_sync.rs
Expand Up @@ -31,32 +31,72 @@ impl StateSync {
}
}

#[cfg(test)]
fn new_for_testing(
state_manager: Arc<StateManagerImpl>,
state_sync_refs: StateSyncRefs,
log: ReplicaLogger,
) -> Self {
Self {
state_manager,
state_sync_refs,
log,
}
}

/// Returns requested state as a Chunkable artifact for StateSync.
fn create_chunkable_state(
&self,
id: &StateSyncArtifactId,
) -> Option<Box<dyn Chunkable<StateSyncMessage> + Send>> {
info!(self.log, "Starting state sync @{}", id.height);
crate::state_sync::chunkable::IncompleteState::try_new(
chunkable::IncompleteState::try_new(
self.log.clone(),
id.height,
CryptoHashOfState::from(id.hash.clone()),
self.state_manager.state_layout.clone(),
self.state_manager.latest_manifest(),
self.state_manager.metrics.clone(),
self.state_manager.own_subnet_type,
Arc::new(self.clone()),
Arc::new(Mutex::new(scoped_threadpool::Pool::new(
NUMBER_OF_CHECKPOINT_THREADS,
))),
self.state_sync_refs.clone(),
self.state_manager.get_fd_factory(),
self.state_manager.malicious_flags.clone(),
)
.map(|incomplete_state| {
Box::new(incomplete_state) as Box<dyn Chunkable<StateSyncMessage> + Send>
})
}

/// Loads the synced checkpoint and gets the corresponding replicated state.
/// Delivers both to the state manager and updates the internals of the state manager.
fn deliver_state_sync(&self, message: StateSyncMessage) {
let height = message.height;
info!(self.log, "Received state {} at height", message.height);
let ro_layout = self
.state_manager
.state_layout
.checkpoint(height)
.expect("failed to create checkpoint layout");
let state = crate::checkpoint::load_checkpoint_parallel(
&ro_layout,
self.state_manager.own_subnet_type,
&self.state_manager.metrics.checkpoint_metrics,
self.state_manager.get_fd_factory(),
)
.expect("failed to recover checkpoint");

self.state_manager.on_synced_checkpoint(
state,
height,
message.manifest,
message.meta_manifest,
message.root_hash,
);

let height = self.state_manager.states.read().last_advertised;
let ids = self.get_all_validated_ids_by_height(height);
if let Some(ids) = ids.last() {
self.state_manager.states.write().last_advertised = ids.height;
}
}

pub fn get_validated_by_identifier(
&self,
msg_id: &StateSyncArtifactId,
Expand Down Expand Up @@ -273,36 +313,4 @@ impl StateSyncClient for StateSync {
let msg = self.get_validated_by_identifier(id)?;
msg.get_chunk(chunk_id)
}

/// Blocking. Makes synchronous file system calls.
fn deliver_state_sync(&self, message: StateSyncMessage) {
let height = message.height;
info!(self.log, "Received state {} at height", message.height);
let ro_layout = self
.state_manager
.state_layout
.checkpoint(height)
.expect("failed to create checkpoint layout");
let state = crate::checkpoint::load_checkpoint_parallel(
&ro_layout,
self.state_manager.own_subnet_type,
&self.state_manager.metrics.checkpoint_metrics,
self.state_manager.get_fd_factory(),
)
.expect("failed to recover checkpoint");

self.state_manager.on_synced_checkpoint(
state,
height,
message.manifest,
message.meta_manifest,
message.root_hash,
);

let height = self.state_manager.states.read().last_advertised;
let ids = self.get_all_validated_ids_by_height(height);
if let Some(ids) = ids.last() {
self.state_manager.states.write().last_advertised = ids.height;
}
}
}
28 changes: 14 additions & 14 deletions rs/state_manager/src/state_sync/chunkable.rs
Expand Up @@ -5,6 +5,7 @@ use crate::{
MetaManifest, StateSyncChunk, StateSyncMessage, FILE_CHUNK_ID_OFFSET,
FILE_GROUP_CHUNK_ID_OFFSET, MANIFEST_CHUNK_ID_OFFSET, META_MANIFEST_CHUNK,
},
state_sync::StateSync,
StateManagerMetrics, StateSyncMetrics, StateSyncRefs,
CRITICAL_ERROR_STATE_SYNC_CORRUPTED_CHUNKS, LABEL_COPY_CHUNKS, LABEL_COPY_FILES, LABEL_FETCH,
LABEL_FETCH_MANIFEST_CHUNK, LABEL_FETCH_META_MANIFEST_CHUNK, LABEL_FETCH_STATE_CHUNK,
Expand Down Expand Up @@ -78,6 +79,7 @@ enum DownloadState {
pub(crate) struct IncompleteState {
log: ReplicaLogger,
root: PathBuf,
state_sync: Arc<StateSync>,
state_layout: StateLayout,
height: Height,
root_hash: CryptoHashOfState,
Expand Down Expand Up @@ -190,18 +192,12 @@ impl IncompleteState {
log: ReplicaLogger,
height: Height,
root_hash: CryptoHashOfState,
state_layout: StateLayout,
manifest_with_checkpoint_layout: Option<(Manifest, CheckpointLayout<ReadOnly>)>,
metrics: StateManagerMetrics,
own_subnet_type: SubnetType,
state_sync: Arc<StateSync>,
thread_pool: Arc<Mutex<scoped_threadpool::Pool>>,
state_sync_refs: StateSyncRefs,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
malicious_flags: MaliciousFlags,
) -> Option<IncompleteState> {
// The state sync manager in the p2p layer is expected not to start a new state sync when there is an ongoing state sync.
// Here we check it again as a last resort and return `None` if there is already an active state sync reference.
let mut active = state_sync_refs.active.write();
let mut active = state_sync.state_sync_refs.active.write();
if let Some((active_height, _hash)) = active.as_ref() {
warn!(
&log,
Expand All @@ -212,25 +208,27 @@ impl IncompleteState {
return None;
}
*active = Some((height, root_hash.clone()));
let state_layout = state_sync.state_manager.state_layout.clone();
// Create the `IncompleteState` object while holding the write lock on the active state sync reference.
Some(Self {
log,
root: state_layout
.state_sync_scratchpad(height)
.expect("failed to create directory for state sync scratchpad"),
state_sync: state_sync.clone(),
state_layout,
height,
root_hash,
state: DownloadState::Blank,
manifest_with_checkpoint_layout,
metrics,
manifest_with_checkpoint_layout: state_sync.state_manager.latest_manifest(),
metrics: state_sync.state_manager.metrics.clone(),
started_at: Instant::now(),
fetch_started_at: None,
own_subnet_type,
own_subnet_type: state_sync.state_manager.own_subnet_type,
thread_pool,
state_sync_refs: state_sync_refs.clone(),
fd_factory,
malicious_flags,
state_sync_refs: state_sync.state_sync_refs.clone(),
fd_factory: state_sync.state_manager.fd_factory.clone(),
malicious_flags: state_sync.state_manager.malicious_flags.clone(),
})
}

Expand Down Expand Up @@ -1405,6 +1403,7 @@ impl Chunkable<StateSyncMessage> for IncompleteState {
&meta_manifest,
);

self.state_sync.deliver_state_sync(artifact.clone());
self.state = DownloadState::Complete(Box::new(artifact.clone()));
self.state_sync_refs
.cache
Expand Down Expand Up @@ -1586,6 +1585,7 @@ impl Chunkable<StateSyncMessage> for IncompleteState {
meta_manifest,
);

self.state_sync.deliver_state_sync(artifact.clone());
self.state = DownloadState::Complete(Box::new(artifact));
self.state_sync_refs
.cache
Expand Down

0 comments on commit 22d95d7

Please sign in to comment.