diff --git a/rs/protobuf/def/state/sync/v1/manifest.proto b/rs/protobuf/def/state/sync/v1/manifest.proto index 0966b68d081..73728889bd0 100644 --- a/rs/protobuf/def/state/sync/v1/manifest.proto +++ b/rs/protobuf/def/state/sync/v1/manifest.proto @@ -20,3 +20,8 @@ message Manifest { repeated FileInfo file_table = 2; repeated ChunkInfo chunk_table = 3; } + +message MetaManifest { + uint32 version = 1; + repeated bytes sub_manifest_hashes = 2; +} \ No newline at end of file diff --git a/rs/protobuf/src/gen/state/state.sync.v1.rs b/rs/protobuf/src/gen/state/state.sync.v1.rs index d377d83c7a1..a5103a5bfa5 100644 --- a/rs/protobuf/src/gen/state/state.sync.v1.rs +++ b/rs/protobuf/src/gen/state/state.sync.v1.rs @@ -30,3 +30,11 @@ pub struct Manifest { #[prost(message, repeated, tag = "3")] pub chunk_table: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MetaManifest { + #[prost(uint32, tag = "1")] + pub version: u32, + #[prost(bytes = "vec", repeated, tag = "2")] + pub sub_manifest_hashes: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, +} diff --git a/rs/state_manager/src/lib.rs b/rs/state_manager/src/lib.rs index 9bd4dbb62ee..82d0ff10771 100644 --- a/rs/state_manager/src/lib.rs +++ b/rs/state_manager/src/lib.rs @@ -10,7 +10,7 @@ pub mod tree_diff; pub mod tree_hash; use crate::{ - manifest::{build_meta_manifest, compute_bundled_manifest, MAX_SUPPORTED_STATE_SYNC_VERSION}, + manifest::{compute_bundled_manifest, MAX_SUPPORTED_STATE_SYNC_VERSION}, state_sync::chunkable::cache::StateSyncCache, tip::{spawn_tip_thread, TipRequest}, }; @@ -1817,19 +1817,23 @@ impl StateManagerImpl { fn find_checkpoint_by_root_hash( &self, root_hash: &CryptoHashOfState, - ) -> Option<(Height, Manifest)> { + ) -> Option<(Height, Manifest, Arc)> { self.states .read() .states_metadata .iter() - .find_map( - |(h, metadata)| match (metadata.root_hash(), metadata.manifest()) { - (Some(hash), Some(manifest)) if hash == root_hash => { - Some((*h, manifest.clone())) - } - _ => None, - }, - ) + .find_map(|(h, metadata)| { + let bundled_manifest = metadata.bundled_manifest.clone()?; + if &bundled_manifest.root_hash == root_hash { + Some(( + *h, + bundled_manifest.manifest, + bundled_manifest.meta_manifest, + )) + } else { + None + } + }) } fn on_synced_checkpoint( @@ -1837,6 +1841,7 @@ impl StateManagerImpl { state: ReplicatedState, height: Height, manifest: Manifest, + meta_manifest: Arc, root_hash: CryptoHashOfState, ) { if self @@ -1902,10 +1907,6 @@ impl StateManagerImpl { .iter() .map(|f| f.size_bytes as i64) .sum(); - // The computation of meta_manifest is temporary in this replica version. - // In future versions, meta_manifest will also be part of StateSyncMessage - // and can be directly populated here without extra computation. - let meta_manifest = build_meta_manifest(&manifest); states.states_metadata.insert( height, @@ -1914,7 +1915,7 @@ impl StateManagerImpl { bundled_manifest: Some(BundledManifest { root_hash, manifest, - meta_manifest: Arc::new(meta_manifest), + meta_manifest, }), state_sync_file_group: None, }, @@ -2520,7 +2521,7 @@ impl StateManager for StateManagerImpl { // Let's see if we already have this state locally. This might // be the case if we are in subnet recovery mode and // re-introducing some old state with a new height. - if let Some((checkpoint_height, manifest)) = self.find_checkpoint_by_root_hash(&root_hash) { + if let Some((checkpoint_height, manifest, meta_manifest)) = self.find_checkpoint_by_root_hash(&root_hash) { info!(self.log, "Copying checkpoint {} with root hash {:?} under new height {}", checkpoint_height, root_hash, height); @@ -2529,7 +2530,7 @@ impl StateManager for StateManagerImpl { Ok(_) => { let state = load_checkpoint(&self.state_layout, height, &self.metrics, self.own_subnet_type, Arc::clone(&self.get_fd_factory())) .expect("failed to load checkpoint"); - self.on_synced_checkpoint(state, height, manifest, root_hash); + self.on_synced_checkpoint(state, height, manifest, meta_manifest, root_hash); return; } Err(e) => { diff --git a/rs/state_manager/src/manifest.rs b/rs/state_manager/src/manifest.rs index f0968a2619a..44bd6973a7b 100644 --- a/rs/state_manager/src/manifest.rs +++ b/rs/state_manager/src/manifest.rs @@ -15,7 +15,7 @@ use crate::{ use bit_vec::BitVec; use hash::{chunk_hasher, file_hasher, manifest_hasher, ManifestHash}; use ic_crypto_sha::Sha256; -use ic_logger::{error, fatal, ReplicaLogger}; +use ic_logger::{error, fatal, warn, ReplicaLogger}; use ic_replicated_state::PageIndex; use ic_state_layout::{CheckpointLayout, ReadOnly}; use ic_sys::{mmap::ScopedMmap, PAGE_SIZE}; @@ -35,6 +35,8 @@ use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, Weak}; +pub use ic_types::state_sync::DEFAULT_CHUNK_SIZE; + /// Initial version. pub const STATE_SYNC_V1: u32 = 1; @@ -49,8 +51,6 @@ pub const CURRENT_STATE_SYNC_VERSION: u32 = STATE_SYNC_V2; /// The replica will panic if trying to deal with a manifest with a version higher than this. pub const MAX_SUPPORTED_STATE_SYNC_VERSION: u32 = STATE_SYNC_V2; -pub const DEFAULT_CHUNK_SIZE: u32 = 1 << 20; // 1 MiB. - /// When computing a manifest, we recompute the hash of every /// `REHASH_EVERY_NTH_CHUNK` chunk, even if we know it to be unchanged and /// have a hash computed earlier by this replica process. @@ -902,6 +902,15 @@ pub fn compute_manifest( metrics .manifest_size .set(encode_manifest(&manifest).len() as i64); + + if manifest.chunk_table.len() > FILE_GROUP_CHUNK_ID_OFFSET as usize / 2 { + warn!( + log, + "The chunk table is longer than half of the available chunk ID space in state sync. chunk table length: {}, state sync max chunk id: {}", + manifest.chunk_table.len(), + FILE_GROUP_CHUNK_ID_OFFSET - 1, + ); + } Ok(manifest) } diff --git a/rs/state_manager/src/manifest/tests/computation.rs b/rs/state_manager/src/manifest/tests/computation.rs index 935dbd3eb71..1630b5b5485 100644 --- a/rs/state_manager/src/manifest/tests/computation.rs +++ b/rs/state_manager/src/manifest/tests/computation.rs @@ -353,6 +353,56 @@ fn test_validate_sub_manifest() { }), validate_sub_manifest(num, &[], &meta_manifest) ); + + // Test that an altered chunk gives a different hash. + let sub_manifest_0 = encoded_manifest + [0..std::cmp::min(DEFAULT_CHUNK_SIZE as usize, encoded_manifest.len())] + .to_vec(); + let expected_hash = hash_concat!(21u8, b"ic-state-sub-manifest", &sub_manifest_0[..]); + assert_eq!(expected_hash, meta_manifest.sub_manifest_hashes[0]); + + let mut bad_chunk = sub_manifest_0; + let alter_position = bad_chunk.len() / 2; + bad_chunk[alter_position] = bad_chunk[alter_position].wrapping_add(1); + let actual_hash = hash_concat!(21u8, b"ic-state-sub-manifest", &bad_chunk[..]); + + assert_eq!( + Err(ChunkValidationError::InvalidChunkHash { + chunk_ix: 0, + expected_hash: expected_hash.to_vec(), + actual_hash: actual_hash.to_vec() + }), + validate_sub_manifest(0, &bad_chunk, &meta_manifest) + ); +} + +#[test] +fn test_get_sub_manifest_based_on_index() { + let (file_table, chunk_table) = dummy_file_table_and_chunk_table(); + let manifest = Manifest::new(STATE_SYNC_V2, file_table, chunk_table); + let meta_manifest = build_meta_manifest(&manifest); + let encoded_manifest = encode_manifest(&manifest); + + let mut assembled_manifest = Vec::new(); + + // For each element in the meta-manifest, we can get a corresponding sub-manifest as part of the manifest. + let len = meta_manifest.sub_manifest_hashes.len(); + for index in 0..len { + // The same way that `StateSyncMessage` uses to serve a sub-manifest in state sync. + let start = index * DEFAULT_CHUNK_SIZE as usize; + let end = std::cmp::min(start + DEFAULT_CHUNK_SIZE as usize, encoded_manifest.len()); + let sub_manifest = encoded_manifest + .get(start..end) + .expect("failed to get the sub-manifest") + .to_vec(); + assembled_manifest.extend(sub_manifest); + } + assert_eq!(assembled_manifest, encoded_manifest); + + // Test that we get nothing given an out-of-bound index. + let start = len * DEFAULT_CHUNK_SIZE as usize; + let end = std::cmp::min(start + DEFAULT_CHUNK_SIZE as usize, encoded_manifest.len()); + assert!(encoded_manifest.get(start..end).is_none()); } #[test] diff --git a/rs/state_manager/src/state_sync.rs b/rs/state_manager/src/state_sync.rs index b66758a54c1..a4c76786f43 100644 --- a/rs/state_manager/src/state_sync.rs +++ b/rs/state_manager/src/state_sync.rs @@ -99,6 +99,7 @@ impl ArtifactClient for StateSync { msg_id: &StateSyncArtifactId, ) -> Option { let mut file_group_to_populate: Option> = None; + let state_sync_message = self .state_manager .states @@ -108,6 +109,7 @@ impl ArtifactClient for StateSync { .find_map(|(height, metadata)| { if metadata.root_hash() == Some(&msg_id.hash) { let manifest = metadata.manifest()?; + let meta_manifest = metadata.meta_manifest()?; let checkpoint_root = self.state_manager.state_layout.checkpoint(*height).ok()?; let state_sync_file_group = match &metadata.state_sync_file_group { @@ -125,6 +127,7 @@ impl ArtifactClient for StateSync { height: *height, root_hash: msg_id.hash.clone(), checkpoint_root: checkpoint_root.raw_path().to_path_buf(), + meta_manifest, manifest: manifest.clone(), state_sync_file_group, }) @@ -182,12 +185,14 @@ impl ArtifactClient for StateSync { if h > filter.height { let metadata = states.states_metadata.get(&h)?; let manifest = metadata.manifest()?; + let meta_manifest = metadata.meta_manifest()?; let checkpoint_root = self.state_manager.state_layout.checkpoint(h).ok()?; let msg = StateSyncMessage { height: h, root_hash: metadata.root_hash()?.clone(), checkpoint_root: checkpoint_root.raw_path().to_path_buf(), manifest: manifest.clone(), + meta_manifest, state_sync_file_group: Default::default(), }; Some(StateSyncArtifact::message_to_advert(&msg)) @@ -325,10 +330,12 @@ impl ArtifactProcessor for StateSync { self.state_manager.get_fd_factory(), ) .expect("failed to recover checkpoint"); + self.state_manager.on_synced_checkpoint( state, height, message.manifest, + message.meta_manifest, message.root_hash, ); } diff --git a/rs/state_manager/src/state_sync/chunkable.rs b/rs/state_manager/src/state_sync/chunkable.rs index 4caa4e001c2..268bb003948 100644 --- a/rs/state_manager/src/state_sync/chunkable.rs +++ b/rs/state_manager/src/state_sync/chunkable.rs @@ -19,7 +19,9 @@ use ic_types::{ }, malicious_flags::MaliciousFlags, state_sync::{ - decode_manifest, FileGroupChunks, Manifest, FILE_GROUP_CHUNK_ID_OFFSET, MANIFEST_CHUNK, + decode_manifest, decode_meta_manifest, state_sync_chunk_type, FileGroupChunks, Manifest, + MetaManifest, StateSyncChunk, FILE_CHUNK_ID_OFFSET, FILE_GROUP_CHUNK_ID_OFFSET, + MANIFEST_CHUNK_ID_OFFSET, META_MANIFEST_CHUNK, }, CryptoHashOfState, Height, }; @@ -27,7 +29,7 @@ use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::time::Instant; use std::{ - collections::{HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet}, sync::{Arc, Mutex}, }; @@ -37,19 +39,35 @@ pub mod cache; // necessary. const ALWAYS_VALIDATE: bool = false; +type SubManifest = Vec; /// The state of the communication with up-to-date nodes. #[derive(Clone)] enum DownloadState { - /// Haven't received any chunks yet, waiting for the manifest chunk. + /// Haven't received any chunks yet, waiting for the meta-manifest chunk. Blank, + /// In the process of assembling the manifest. + Prep { + /// The received meta-manifest + meta_manifest: MetaManifest, + /// This field stores the sub-manifests received and can be used to reconstruct the whole manifest. + manifest_in_construction: BTreeMap, + /// Set of chunks that still needed to be fetched for the manifest. + manifest_chunks: BTreeSet, + }, /// In the process of loading chunks, have some more to load. Loading { + /// The received meta-manifest + meta_manifest: MetaManifest, /// The received manifest manifest: Manifest, state_sync_file_group: FileGroupChunks, /// Set of chunks that still need to be fetched. For the purpose of this - /// set chunk 0 is the manifest. To get indices into the manifests's - /// chunk table subtract 1. + /// set, chunk 0 is the meta-manifest. + /// + /// To get indices into the manifest's chunk table, subtract 1. Note that + /// it does not apply to file group chunks because they are assigned with + /// a dedicated chunk id range. + /// The manifest chunks are not part of `fetch_chunks` because they are fetched in the `Prep` phase. fetch_chunks: HashSet, }, /// Successfully completed and returned the artifact to P2P, nothing else to @@ -103,7 +121,15 @@ impl Drop for IncompleteState { .with_label_values(&["aborted_blank"]) .observe(elapsed.as_secs_f64()); } + DownloadState::Prep { .. } => { + self.metrics + .state_sync_metrics + .duration + .with_label_values(&["aborted_prep"]) + .observe(elapsed.as_secs_f64()); + } DownloadState::Loading { + meta_manifest: _, manifest: _, state_sync_file_group, fetch_chunks, @@ -141,6 +167,7 @@ impl Drop for IncompleteState { // passing it to the cache might alter the download state let description = match self.state { DownloadState::Blank => "aborted before receiving any chunks", + DownloadState::Prep { .. } => "aborted before receiving the entire manifest", DownloadState::Loading { .. } => "aborted before receiving all the chunks", DownloadState::Complete(_) => "completed successfully", }; @@ -342,10 +369,10 @@ impl IncompleteState { byte_range.start, byte_range.end, src_data.len(), - new_chunk_idx + 1 + new_chunk_idx + FILE_CHUNK_ID_OFFSET ); bad_chunks.push(idx); - corrupted_chunks.lock().unwrap().push(new_chunk_idx + 1); + corrupted_chunks.lock().unwrap().push(new_chunk_idx + FILE_CHUNK_ID_OFFSET); if !validate_data && ALWAYS_VALIDATE { error!( log, @@ -373,11 +400,11 @@ impl IncompleteState { byte_range.start, byte_range.end, err, - new_chunk_idx + 1 + new_chunk_idx + FILE_CHUNK_ID_OFFSET ); bad_chunks.push(idx); - corrupted_chunks.lock().unwrap().push(new_chunk_idx + 1); + corrupted_chunks.lock().unwrap().push(new_chunk_idx + FILE_CHUNK_ID_OFFSET); if !validate_data && ALWAYS_VALIDATE { error!( log, @@ -617,9 +644,9 @@ impl IncompleteState { byte_range.start, byte_range.end, src_map.len(), - *dst_chunk_index + 1 + *dst_chunk_index + FILE_CHUNK_ID_OFFSET ); - corrupted_chunks.lock().unwrap().push(*dst_chunk_index + 1); + corrupted_chunks.lock().unwrap().push(*dst_chunk_index + FILE_CHUNK_ID_OFFSET); continue; } #[cfg(not(target_os = "linux"))] @@ -643,10 +670,10 @@ impl IncompleteState { byte_range.start, byte_range.end, err, - *dst_chunk_index + 1 + *dst_chunk_index + FILE_CHUNK_ID_OFFSET ); - corrupted_chunks.lock().unwrap().push(*dst_chunk_index + 1); + corrupted_chunks.lock().unwrap().push(*dst_chunk_index + FILE_CHUNK_ID_OFFSET); if !validate_data && ALWAYS_VALIDATE { error!( log, @@ -748,6 +775,7 @@ impl IncompleteState { height: Height, root_hash: CryptoHashOfState, manifest: &Manifest, + meta_manifest: &MetaManifest, ) -> Artifact { Artifact::StateSync(StateSyncMessage { height, @@ -757,6 +785,7 @@ impl IncompleteState { .unwrap() .raw_path() .to_path_buf(), + meta_manifest: Arc::new(meta_manifest.clone()), manifest: manifest.clone(), // `state_sync_file_group` and `checkpoint_root` are not included in the integrity hash of this artifact. // Therefore it is OK to pass a default value here as it is only used when fetching chunks. @@ -1007,7 +1036,11 @@ impl IncompleteState { // diff_script contains indices into the manifest chunk table, but p2p // counts the manifest itself as chunk 0, so all other chunk indices are // shifted by 1 - let mut fetch_chunks = diff_script.fetch_chunks.iter().map(|i| *i + 1).collect(); + let mut fetch_chunks = diff_script + .fetch_chunks + .iter() + .map(|i| *i + FILE_CHUNK_ID_OFFSET) + .collect(); let diff_bytes: u64 = diff_script .fetch_chunks @@ -1086,7 +1119,10 @@ impl IncompleteState { .remaining .sub(zeros_chunks as i64); - non_zero_chunks.iter().map(|i| *i + 1).collect() + non_zero_chunks + .iter() + .map(|i| *i + FILE_CHUNK_ID_OFFSET) + .collect() } } } @@ -1094,9 +1130,18 @@ impl IncompleteState { impl Chunkable for IncompleteState { fn chunks_to_download(&self) -> Box> { match self.state { - DownloadState::Blank => Box::new(std::iter::once(MANIFEST_CHUNK)), - DownloadState::Complete(_) => Box::new(std::iter::empty()), + DownloadState::Blank => Box::new(std::iter::once(META_MANIFEST_CHUNK)), + DownloadState::Prep { + meta_manifest: _, + manifest_in_construction: _, + ref manifest_chunks, + } => { + #[allow(clippy::needless_collect)] + let ids: Vec<_> = manifest_chunks.iter().map(|id| ChunkId::new(*id)).collect(); + Box::new(ids.into_iter()) + } DownloadState::Loading { + meta_manifest: _, manifest: _, state_sync_file_group: _, ref fetch_chunks, @@ -1108,11 +1153,12 @@ impl Chunkable for IncompleteState { .collect(); Box::new(ids.into_iter()) } + DownloadState::Complete(_) => Box::new(std::iter::empty()), } } fn add_chunk(&mut self, artifact_chunk: ArtifactChunk) -> Result { - let ix = artifact_chunk.chunk_id.get() as usize; + let ix = artifact_chunk.chunk_id.get(); let payload = match artifact_chunk.artifact_chunk_data { ArtifactChunkData::SemiStructuredChunkData(ref payload) => payload, @@ -1131,33 +1177,134 @@ impl Chunkable for IncompleteState { Ok(*artifact.clone()) } - DownloadState::Blank => { - if artifact_chunk.chunk_id == MANIFEST_CHUNK { - let manifest = decode_manifest(payload).map_err(|err| { + if artifact_chunk.chunk_id == META_MANIFEST_CHUNK { + let meta_manifest = decode_meta_manifest(payload).map_err(|err| { warn!( self.log, - "Failed to decode manifest chunk for state {}: {}", self.height, err + "Failed to decode meta-manifest chunk for state {}: {}", + self.height, + err ); ChunkVerificationFailed })?; + crate::manifest::validate_meta_manifest(&meta_manifest, &self.root_hash) + .map_err(|err| { + warn!(self.log, "Received invalid meta-manifest: {}", err); + ChunkVerificationFailed + })?; + let manifest_chunks_len = meta_manifest.sub_manifest_hashes.len(); + debug!( + self.log, + "Received META_MANIFEST chunk for state {}, got {} more chunks to download for the manifest", + self.height, + manifest_chunks_len + ); + trace!(self.log, "Received meta-manifest:\n{:?}", meta_manifest); + + assert!( + MANIFEST_CHUNK_ID_OFFSET + .checked_add(manifest_chunks_len as u32) + .is_some(), + "Not enough chunk id space for manifest chunks!" + ); + let manifest_chunks = (MANIFEST_CHUNK_ID_OFFSET + ..MANIFEST_CHUNK_ID_OFFSET + manifest_chunks_len as u32) + .collect(); + + self.state = DownloadState::Prep { + meta_manifest, + manifest_in_construction: Default::default(), + manifest_chunks, + }; + + Err(ChunksMoreNeeded) + } else { + warn!( + self.log, + "Received non-meta-manifest chunk {} on blank state {}", ix, self.height + ); + Err(ChunkVerificationFailed) + } + } + DownloadState::Prep { + ref meta_manifest, + ref mut manifest_in_construction, + ref mut manifest_chunks, + } => { + let manifest_chunk_index = match state_sync_chunk_type(ix) { + StateSyncChunk::MetaManifestChunk => { + // Have already seen the meta-manifest chunk + return Err(ChunksMoreNeeded); + } + StateSyncChunk::ManifestChunk(index) => index as usize, + _ => { + // Have not requested such chunks + return Err(ChunkVerificationFailed); + } + }; + debug_assert!(ix >= MANIFEST_CHUNK_ID_OFFSET); + + if !manifest_chunks.contains(&ix) { + return Err(ChunksMoreNeeded); + } + + crate::manifest::validate_sub_manifest( + manifest_chunk_index, + &payload[..], + meta_manifest, + ) + .map_err(|err| { + warn!(self.log, "Received invalid sub-manifest: {}", err); + ChunkVerificationFailed + })?; + manifest_in_construction.insert(ix, payload.clone()); + manifest_chunks.remove(&ix); + + debug!( + self.log, + "Received MANIFEST chunk {} for state {}, got {} more chunks to download", + manifest_chunk_index, + self.height, + manifest_chunks.len() + ); + + if manifest_chunks.is_empty() { + let length: usize = manifest_in_construction.values().map(|x| x.len()).sum(); + let mut encoded_manifest = Vec::with_capacity(length); + // The sub-manifests are stored in a BTreeMap so the manifest can be assembled by adding each sub-manifest in order. + manifest_in_construction + .values() + .for_each(|sub_manifest| encoded_manifest.extend(sub_manifest)); + + // Since manifest version 2, the authenticity of a manifest comes from the meta-manifest hash which is signed in the CUP. + // It implies severe problems if all sub-manifests pass validation but we fail to get a valid manifest from them. + // The replica should panic in such situation otherwise the state sync will stall in the Prep phase. + let manifest = decode_manifest(&encoded_manifest).map_err(|err| { + fatal!( + self.log, + "Received all sub-manifests but failed to decode manifest chunk for state {}: {}", self.height, err + ); + })?; + crate::manifest::validate_manifest(&manifest, &self.root_hash).map_err( |err| { - warn!(self.log, "Received invalid manifest: {}", err); - ChunkVerificationFailed + fatal!(self.log, "Received all sub-manifests but the assembled manifest is invalid: {}", err); }, )?; debug!( self.log, - "Received MANIFEST chunk for state {}, got {} more chunks to download", + "Received MANIFEST chunks for state {}, got {} more chunks to download", self.height, manifest.chunk_table.len() ); trace!(self.log, "Received manifest:\n{}", manifest); + let meta_manifest = meta_manifest.clone(); + let mut fetch_chunks = self.initialize_state_on_disk(&manifest); if fetch_chunks.is_empty() { @@ -1183,6 +1330,7 @@ impl Chunkable for IncompleteState { self.height, self.root_hash.clone(), &manifest, + &meta_manifest, ); self.state = DownloadState::Complete(Box::new(artifact.clone())); @@ -1198,14 +1346,26 @@ impl Chunkable for IncompleteState { warn!( self.log, "The chunk table has {} chunks so file group chunks will not be used due to ID conflicts. \ - Please consider increasing the id offset (currently: {})", + Please consider increasing the ID offset (currently: {})", manifest.chunk_table.len(), FILE_GROUP_CHUNK_ID_OFFSET, ); } + // The chunks in the chunk table should not collide with the manifest chunk IDs. + // The `state_sync_file_group` should either be empty or have no chunks in collision with manifest chunk IDs. + // Otherwise, the up-to-date nodes will not be able to serve the requested chunks correctly. + // + // Such a collision can only happen if there is a bug given the current value of the offset. + // See comments of `MANIFEST_CHUNK_ID_OFFSET` and `FILE_GROUP_CHUNK_ID_OFFSET` for analysis. + assert!(manifest.chunk_table.len() < MANIFEST_CHUNK_ID_OFFSET as usize); + if let Some(last_group_chunk) = state_sync_file_group.last_chunk_id() { + assert!(last_group_chunk < MANIFEST_CHUNK_ID_OFFSET) + } + for (&chunk_id, chunk_table_indices) in state_sync_file_group.iter() { for &chunk_table_index in chunk_table_indices.iter() { - fetch_chunks.remove(&(chunk_table_index as usize + 1)); + fetch_chunks + .remove(&(chunk_table_index as usize + FILE_CHUNK_ID_OFFSET)); } // We decide to fetch all the file group chunks unconditionally for two reasons: // 1. `canister.pbuf` files change between checkpoints and are unlikely to be covered in the copy phase. @@ -1214,6 +1374,7 @@ impl Chunkable for IncompleteState { } let num_fetch_chunks = fetch_chunks.len(); self.state = DownloadState::Loading { + meta_manifest, manifest, state_sync_file_group, fetch_chunks, @@ -1227,23 +1388,15 @@ impl Chunkable for IncompleteState { Err(ChunksMoreNeeded) } } else { - warn!( - self.log, - "Received non-manifest chunk {} on blank state {}", ix, self.height - ); - Err(ChunkVerificationFailed) + Err(ChunksMoreNeeded) } } DownloadState::Loading { + ref meta_manifest, ref manifest, ref mut fetch_chunks, ref state_sync_file_group, } => { - if artifact_chunk.chunk_id == MANIFEST_CHUNK { - // Have already seen the manifest chunk - return Err(ChunksMoreNeeded); - } - debug!( self.log, "Received chunk {} / {} of state {}", @@ -1252,20 +1405,21 @@ impl Chunkable for IncompleteState { self.height ); - if !fetch_chunks.contains(&(ix)) { + if !fetch_chunks.contains(&(ix as usize)) { return Err(ChunksMoreNeeded); } // Each index in `chunk_table_indices` is mapped to a piece of payload bytes // with its corresponding start and end position. - let (chunk_table_indices, payload_pieces) = - if ix < FILE_GROUP_CHUNK_ID_OFFSET as usize { + let (chunk_table_indices, payload_pieces) = match state_sync_chunk_type(ix) { + StateSyncChunk::FileChunk(index) => { // If it is a normal chunk, there is only one index mapped to the whole payload. - (vec![ix as u32 - 1], vec![(0, payload.len())]) - } else { + (vec![index], vec![(0, payload.len())]) + } + StateSyncChunk::FileGroupChunk(index) => { // If it is a file group chunk, divide it into pieces according to the `FileGroupChunks`. let chunk_table_indices = state_sync_file_group - .get(&(ix as u32)) + .get(&index) .ok_or(ChunkVerificationFailed)? .clone(); @@ -1283,7 +1437,12 @@ impl Chunkable for IncompleteState { return Err(ChunkVerificationFailed); } (chunk_table_indices, payload_pieces) - }; + } + _ => { + // meta-manifest/manifest chunks are not expected in the `Loading` phase. + return Err(ChunksMoreNeeded); + } + }; let log = &self.log; let metrics = &self.metrics; @@ -1322,7 +1481,7 @@ impl Chunkable for IncompleteState { ); } - fetch_chunks.remove(&ix); + fetch_chunks.remove(&(ix as usize)); if fetch_chunks.is_empty() { debug!( @@ -1363,6 +1522,7 @@ impl Chunkable for IncompleteState { self.height, self.root_hash.clone(), manifest, + meta_manifest, ); self.state = DownloadState::Complete(Box::new(artifact.clone())); diff --git a/rs/state_manager/src/state_sync/chunkable/cache.rs b/rs/state_manager/src/state_sync/chunkable/cache.rs index f3f9d3903f2..4117bd82ae6 100644 --- a/rs/state_manager/src/state_sync/chunkable/cache.rs +++ b/rs/state_manager/src/state_sync/chunkable/cache.rs @@ -97,7 +97,7 @@ impl StateSyncCache { fetch_chunks: HashSet, state_sync_file_group: FileGroupChunks, ) { - // fetch_chunks, as stored by IncompleteState considers the manifest as chunk 0 + // fetch_chunks, as stored by IncompleteState considers the meta-manifest as chunk 0 // For the cache we store indices into the manifest's chunk table as // missing_chunks. debug_assert!(!fetch_chunks.contains(&0)); @@ -105,7 +105,7 @@ impl StateSyncCache { for i in fetch_chunks.into_iter() { assert_ne!(0, i); if i < FILE_GROUP_CHUNK_ID_OFFSET as usize { - missing_chunks.insert(i - 1); + missing_chunks.insert(i - FILE_CHUNK_ID_OFFSET); } else { // If it's a chunk group, the individual chunks are missing in the manifest, // not the group @@ -118,7 +118,7 @@ impl StateSyncCache { debug_assert!(missing_chunks .iter() - .all(|i| *i + 1 < FILE_GROUP_CHUNK_ID_OFFSET as usize)); + .all(|i| *i + FILE_CHUNK_ID_OFFSET < FILE_GROUP_CHUNK_ID_OFFSET as usize)); // We rename the folder to decouple the cache from active state syncs a bit. // Otherwise we'd have to assume that there won't be an active state sync at @@ -181,6 +181,7 @@ impl StateSyncCache { match std::mem::replace(&mut sync.state, DownloadState::Blank) { DownloadState::Loading { + meta_manifest: _, manifest, state_sync_file_group, fetch_chunks, @@ -192,10 +193,10 @@ impl StateSyncCache { self.push_inner(sync, manifest, fetch_chunks, state_sync_file_group); } } - DownloadState::Complete(_) | DownloadState::Blank => { + DownloadState::Complete(_) | DownloadState::Blank | DownloadState::Prep { .. } => { // Nothing to cache // Sanity check that the folder is gone (if completed, should have been moved to - // a permanent checkpoint, if blank, should never have been created) + // a permanent checkpoint, if blank or prep, should never have been created) if sync.root.exists() { warn!( self.log, diff --git a/rs/state_manager/src/state_sync/chunkable/cache/tests.rs b/rs/state_manager/src/state_sync/chunkable/cache/tests.rs index 61df504629e..1583dcc8dcf 100644 --- a/rs/state_manager/src/state_sync/chunkable/cache/tests.rs +++ b/rs/state_manager/src/state_sync/chunkable/cache/tests.rs @@ -44,11 +44,16 @@ impl TestEnvironment { /// if the contents make sense. fn fake_loading(seed: u32) -> (DownloadState, Manifest, HashSet, FileGroupChunks) { let manifest = Manifest::new(seed, vec![], vec![]); + let meta_manifest = MetaManifest { + version: 0, + sub_manifest_hashes: vec![], + }; let fetch_chunks: HashSet = maplit::hashset! { (seed + 1) as usize, FILE_GROUP_CHUNK_ID_OFFSET as usize }; let state_sync_file_group = FileGroupChunks::new(maplit::btreemap! { FILE_GROUP_CHUNK_ID_OFFSET => vec![3, 4]}); let state = DownloadState::Loading { + meta_manifest, manifest: manifest.clone(), state_sync_file_group: state_sync_file_group.clone(), fetch_chunks: fetch_chunks.clone(), @@ -59,11 +64,16 @@ fn fake_loading(seed: u32) -> (DownloadState, Manifest, HashSet, FileGrou /// Creates a fake DownloadState::Completed for an empty state. fn fake_complete() -> DownloadState { let manifest = Manifest::new(0, vec![], vec![]); + let meta_manifest = MetaManifest { + version: 0, + sub_manifest_hashes: vec![], + }; let artifact = Artifact::StateSync(StateSyncMessage { height: Height::new(0), root_hash: CryptoHashOfState::from(CryptoHash(vec![0; 32])), checkpoint_root: PathBuf::new(), manifest, + meta_manifest: Arc::new(meta_manifest), state_sync_file_group: Default::default(), }); DownloadState::Complete(Box::new(artifact)) @@ -73,11 +83,14 @@ fn ungroup_fetch_chunks( fetch_chunks: &HashSet, file_groups: &FileGroupChunks, ) -> HashSet { - let mut result: HashSet = fetch_chunks.iter().map(|i| i - 1).collect(); + let mut result: HashSet = fetch_chunks + .iter() + .map(|i| i - FILE_CHUNK_ID_OFFSET) + .collect(); // Replace groups by their elements for (key, chunks) in file_groups.iter() { if fetch_chunks.contains(&(*key as usize)) { - result.remove(&(*key as usize - 1)); + result.remove(&(*key as usize - FILE_CHUNK_ID_OFFSET)); result.extend(chunks.iter().map(|i| *i as usize)); } } @@ -117,6 +130,7 @@ fn incomplete_state_for_tests( // if Loading, populate the scratchpad with a file named after the seed // contained in manifest if let DownloadState::Loading { + meta_manifest: _, ref manifest, state_sync_file_group: _, fetch_chunks: _, diff --git a/rs/state_manager/tests/common/mod.rs b/rs/state_manager/tests/common/mod.rs index 803b56f43f4..7ab0cebfd00 100644 --- a/rs/state_manager/tests/common/mod.rs +++ b/rs/state_manager/tests/common/mod.rs @@ -22,12 +22,14 @@ use ic_test_utilities_tmpdir::tmpdir; use ic_types::{ artifact::{Artifact, StateSyncMessage}, chunkable::{ + ArtifactChunk, ArtifactChunkData, ArtifactErrorCode::{ChunkVerificationFailed, ChunksMoreNeeded}, ChunkId, Chunkable, ChunkableArtifact, }, consensus::certification::{Certification, CertificationContent}, crypto::Signed, signature::ThresholdSignature, + state_sync::MANIFEST_CHUNK_ID_OFFSET, xnet::{CertifiedStreamSlice, StreamIndex, StreamSlice}, CanisterId, CryptoHashOfState, Cycles, Height, RegistryVersion, SubnetId, }; @@ -379,41 +381,124 @@ pub fn replace_wasm(state: &mut ReplicatedState, canister_id: CanisterId) { .wasm_binary = WasmBinary::new(wasm); } +#[derive(Debug, PartialEq, Eq)] +pub enum StateSyncErrorCode { + ChunksMoreNeeded, + MetaManifestVerificationFailed, + ManifestVerificationFailed, + OtherChunkVerificationFailed, +} + pub fn pipe_state_sync(src: StateSyncMessage, mut dst: Box) -> StateSyncMessage { - pipe_partial_state_sync(&src, &mut *dst, &Default::default()) + pipe_partial_state_sync(&src, &mut *dst, &Default::default(), false) .expect("State sync not completed.") } -/// Pipe the manifest (chunk 0) from src to dest and return the StateSyncMessage -/// if the state sync completes -pub fn pipe_manifest(src: &StateSyncMessage, dst: &mut dyn Chunkable) -> Option { +fn alter_chunk_data(chunk: &mut ArtifactChunk) { + let mut chunk_data = match &chunk.artifact_chunk_data { + ArtifactChunkData::UnitChunkData(_) => { + panic!( + "Unexpected artifact chunk data type: {:?}", + chunk.artifact_chunk_data + ); + } + ArtifactChunkData::SemiStructuredChunkData(chunk_data) => chunk_data.clone(), + }; + match chunk_data.last_mut() { + Some(last) => { + // Alter the last element of chunk_data. + *last = last.wrapping_add(1); + } + None => { + // chunk_data is originally empty. Reset it to some non-empty value. + chunk_data = vec![9; 100]; + } + } + chunk.artifact_chunk_data = ArtifactChunkData::SemiStructuredChunkData(chunk_data); +} + +/// Pipe the meta-manifest (chunk 0) from src to dest. +/// Alter the chunk data if `use_bad_chunk` is set to true. +pub fn pipe_meta_manifest( + src: &StateSyncMessage, + dst: &mut dyn Chunkable, + use_bad_chunk: bool, +) -> Result { let ids: Vec<_> = dst.chunks_to_download().collect(); - // Only the manifest should be requested + // Only the meta-manifest should be requested assert_eq!(ids, vec! {ChunkId::new(0)}); let id = ids[0]; - let chunk = Box::new(src.clone()) + let mut chunk = Box::new(src.clone()) .get_chunk(id) .unwrap_or_else(|| panic!("Requested unknown chunk {}", id)); + if use_bad_chunk { + alter_chunk_data(&mut chunk); + } + match dst.add_chunk(chunk) { - Ok(Artifact::StateSync(msg)) => Some(msg), + Ok(Artifact::StateSync(msg)) => Ok(msg), Ok(artifact) => { panic!("Unexpected artifact type: {:?}", artifact); } - Err(ChunksMoreNeeded) => None, - Err(ChunkVerificationFailed) => panic!("Encountered invalid chunk {}", id), + Err(ChunksMoreNeeded) => Err(StateSyncErrorCode::ChunksMoreNeeded), + Err(ChunkVerificationFailed) => Err(StateSyncErrorCode::MetaManifestVerificationFailed), + } +} + +/// Pipe the manifest chunks from src to dest and +/// return the StateSyncMessage if the state sync completes. +/// Alter the data of the chunk in the middle position if `use_bad_chunk` is set to true. +pub fn pipe_manifest( + src: &StateSyncMessage, + dst: &mut dyn Chunkable, + use_bad_chunk: bool, +) -> Result { + let ids: Vec<_> = dst.chunks_to_download().collect(); + + // Only the manifest chunks should be requested + let manifest_chunks: HashSet<_> = (MANIFEST_CHUNK_ID_OFFSET + ..MANIFEST_CHUNK_ID_OFFSET + src.meta_manifest.sub_manifest_hashes.len() as u32) + .map(|x| ChunkId::new(x)) + .collect(); + assert!(ids.iter().all(|id| manifest_chunks.contains(id))); + + for (index, id) in ids.iter().enumerate() { + let mut chunk = Box::new(src.clone()) + .get_chunk(*id) + .unwrap_or_else(|| panic!("Requested unknown chunk {}", id)); + + if use_bad_chunk && index == ids.len() / 2 { + alter_chunk_data(&mut chunk); + } + + match dst.add_chunk(chunk) { + Ok(Artifact::StateSync(msg)) => { + return Ok(msg); + } + Ok(artifact) => { + panic!("Unexpected artifact type: {:?}", artifact); + } + Err(ChunksMoreNeeded) => (), + Err(ChunkVerificationFailed) => { + return Err(StateSyncErrorCode::ManifestVerificationFailed) + } + } } + Err(StateSyncErrorCode::ChunksMoreNeeded) } /// Pipe chunks from src to dst, but omit any chunks in omit +/// Alter the data of the chunk in the middle position if `use_bad_chunk` is set to true. pub fn pipe_partial_state_sync( src: &StateSyncMessage, dst: &mut dyn Chunkable, omit: &HashSet, -) -> Option { + use_bad_chunk: bool, +) -> Result { loop { let ids: Vec<_> = dst.chunks_to_download().collect(); @@ -422,28 +507,34 @@ pub fn pipe_partial_state_sync( } let mut omitted_chunks = false; - for id in ids { - if omit.contains(&id) { + for (index, id) in ids.iter().enumerate() { + if omit.contains(id) { omitted_chunks = true; continue; } - let chunk = Box::new(src.clone()) - .get_chunk(id) + let mut chunk = Box::new(src.clone()) + .get_chunk(*id) .unwrap_or_else(|| panic!("Requested unknown chunk {}", id)); + if use_bad_chunk && index == ids.len() / 2 { + alter_chunk_data(&mut chunk); + } + match dst.add_chunk(chunk) { Ok(Artifact::StateSync(msg)) => { - return Some(msg); + return Ok(msg); } Ok(artifact) => { panic!("Unexpected artifact type: {:?}", artifact); } Err(ChunksMoreNeeded) => (), - Err(ChunkVerificationFailed) => panic!("Encountered invalid chunk {}", id), + Err(ChunkVerificationFailed) => { + return Err(StateSyncErrorCode::OtherChunkVerificationFailed) + } } } if omitted_chunks { - return None; + return Err(StateSyncErrorCode::ChunksMoreNeeded); } } unreachable!() diff --git a/rs/state_manager/tests/state_manager.rs b/rs/state_manager/tests/state_manager.rs index a8a71f3d5e8..fad925e1541 100644 --- a/rs/state_manager/tests/state_manager.rs +++ b/rs/state_manager/tests/state_manager.rs @@ -13,7 +13,9 @@ use ic_replicated_state::{ ReplicatedState, Stream, }; use ic_state_machine_tests::{StateMachine, StateMachineBuilder}; -use ic_state_manager::{DirtyPageMap, FileType, PageMapType, StateManagerImpl}; +use ic_state_manager::{ + manifest::build_meta_manifest, DirtyPageMap, FileType, PageMapType, StateManagerImpl, +}; use ic_sys::PAGE_SIZE; use ic_test_utilities::{ consensus::fake::FakeVerifier, @@ -29,11 +31,11 @@ use ic_test_utilities_metrics::{fetch_int_counter_vec, fetch_int_gauge, Labels}; use ic_test_utilities_tmpdir::tmpdir; use ic_types::{ artifact::{Priority, StateSyncArtifactId}, - chunkable::ChunkId, + chunkable::{ChunkId, ChunkableArtifact}, crypto::CryptoHash, ingress::{IngressState, IngressStatus, WasmResult}, messages::CallbackId, - state_sync::FILE_GROUP_CHUNK_ID_OFFSET, + state_sync::{FILE_GROUP_CHUNK_ID_OFFSET, MANIFEST_CHUNK_ID_OFFSET, META_MANIFEST_CHUNK}, time::Time, xnet::{StreamIndex, StreamIndexedQueue}, CanisterId, CryptoHashOfPartialState, CryptoHashOfState, Height, PrincipalId, @@ -1857,6 +1859,63 @@ fn can_do_simple_state_sync_transfer() { }) } +#[test] +fn state_sync_message_returns_none_for_invalid_chunk_requests() { + state_manager_test_with_state_sync(|_, src_state_manager, src_state_sync| { + let (_height, mut state) = src_state_manager.take_tip(); + insert_dummy_canister(&mut state, canister_test_id(100)); + + src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); + let hash = wait_for_checkpoint(&*src_state_manager, height(1)); + let id = StateSyncArtifactId { + height: height(1), + hash, + }; + + let msg = src_state_sync + .get_validated_by_identifier(&id) + .expect("failed to get state sync messages"); + + let normal_chunk_id_end_exclusive = msg.manifest.chunk_table.len() as u32 + 1; + + let file_group_chunk_id_end_exclusive = + FILE_GROUP_CHUNK_ID_OFFSET + msg.state_sync_file_group.keys().count() as u32; + + let sub_manifest_chunk_id_end_exclusive = + MANIFEST_CHUNK_ID_OFFSET + msg.meta_manifest.sub_manifest_hashes.len() as u32; + + let src = Box::new(msg); + + assert!(src.clone().get_chunk(META_MANIFEST_CHUNK).is_some()); + + for i in 1..normal_chunk_id_end_exclusive { + assert!(src.clone().get_chunk(ChunkId::new(i)).is_some()); + } + + assert!(normal_chunk_id_end_exclusive <= FILE_GROUP_CHUNK_ID_OFFSET); + for i in (normal_chunk_id_end_exclusive..FILE_GROUP_CHUNK_ID_OFFSET).step_by(100) { + assert!(src.clone().get_chunk(ChunkId::new(i)).is_none()); + } + + for i in FILE_GROUP_CHUNK_ID_OFFSET..file_group_chunk_id_end_exclusive { + assert!(src.clone().get_chunk(ChunkId::new(i)).is_some()); + } + + assert!(file_group_chunk_id_end_exclusive <= MANIFEST_CHUNK_ID_OFFSET); + for i in (file_group_chunk_id_end_exclusive..MANIFEST_CHUNK_ID_OFFSET).step_by(100) { + assert!(src.clone().get_chunk(ChunkId::new(i)).is_none()); + } + + for i in MANIFEST_CHUNK_ID_OFFSET..sub_manifest_chunk_id_end_exclusive { + assert!(src.clone().get_chunk(ChunkId::new(i)).is_some()); + } + + for i in (sub_manifest_chunk_id_end_exclusive..=u32::MAX).step_by(100) { + assert!(src.clone().get_chunk(ChunkId::new(i)).is_none()); + } + }) +} + #[test] fn can_state_sync_from_cache() { state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { @@ -1942,10 +2001,13 @@ fn can_state_sync_from_cache() { { let mut chunkable = dst_state_sync.create_chunkable_state(&id); - // First fetch chunk 0 (the manifest), and then ask for all chunks afterwards, + // First fetch chunk 0 (the meta-manifest) and manifest chunks, and then ask for all chunks afterwards, // but never receive the chunk for `system_metadata.pbuf` and FILE_GROUP_CHUNK_ID_OFFSET - let completion = pipe_partial_state_sync(&msg, &mut *chunkable, &omit); - assert!(completion.is_none(), "Unexpectedly completed state sync"); + let completion = pipe_partial_state_sync(&msg, &mut *chunkable, &omit, false); + assert!( + matches!(completion, Err(StateSyncErrorCode::ChunksMoreNeeded)), + "Unexpectedly completed state sync" + ); } assert_no_remaining_chunks(dst_metrics); // Second state sync continues from first state and successfully finishes @@ -1958,8 +2020,10 @@ fn can_state_sync_from_cache() { let mut chunkable = dst_state_sync.create_chunkable_state(&id); - let result = pipe_manifest(&msg, &mut *chunkable); - assert!(result.is_none()); + let result = pipe_meta_manifest(&msg, &mut *chunkable, false); + assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded))); + let result = pipe_manifest(&msg, &mut *chunkable, false); + assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded))); let file_group_chunks: HashSet = msg .state_sync_file_group @@ -2009,8 +2073,9 @@ fn can_state_sync_from_cache() { let mut chunkable = dst_state_sync.create_chunkable_state(&id); - // The manifest alone is enough to complete the sync - let dst_msg = pipe_manifest(&msg, &mut *chunkable).unwrap(); + // The meta-manifest and manifest are enough to complete the sync + let _res = pipe_meta_manifest(&msg, &mut *chunkable, false); + let dst_msg = pipe_manifest(&msg, &mut *chunkable, false).unwrap(); dst_state_sync.process_changes( time_source.as_ref(), @@ -2044,6 +2109,227 @@ fn can_state_sync_from_cache() { }) } +#[test] +fn can_state_sync_after_aborting_in_prep_phase() { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { + let (_height, mut state) = src_state_manager.take_tip(); + + // Insert large number of canisters so that the encoded manifest is larger than 1 MiB. + let num_canisters = 2000; + for id in 100..(100 + num_canisters) { + insert_dummy_canister(&mut state, canister_test_id(id)); + } + + let time_source = ic_test_utilities::FastForwardTimeSource::new(); + + src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); + let hash = wait_for_checkpoint(&*src_state_manager, height(1)); + let id = StateSyncArtifactId { + height: height(1), + hash: hash.clone(), + }; + + let state = src_state_manager.get_latest_state().take(); + + let msg = src_state_sync + .get_validated_by_identifier(&id) + .expect("failed to get state sync messages"); + + let meta_manifest = build_meta_manifest(&msg.manifest); + assert!( + meta_manifest.sub_manifest_hashes.len() >= 2, + "The test should run with the manifest chunked in multiple pieces." + ); + + assert_error_counters(src_metrics); + + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { + // Omit the second piece of the manifest. + let omit: HashSet = + maplit::hashset! {ChunkId::new(MANIFEST_CHUNK_ID_OFFSET + 1)}; + + // First state sync is destroyed when fetching the manifest chunks in the Prep phase + { + let mut chunkable = dst_state_sync.create_chunkable_state(&id); + + // First fetch chunk 0 (the meta-manifest) and manifest chunks but never receive chunk(MANIFEST_CHUNK_ID_OFFSET + 1). + let completion = pipe_partial_state_sync(&msg, &mut *chunkable, &omit, false); + assert!( + matches!(completion, Err(StateSyncErrorCode::ChunksMoreNeeded)), + "Unexpectedly completed state sync" + ); + } + assert_no_remaining_chunks(dst_metrics); + // Second state sync starts from scratch and successfully finishes + { + // Same state just higher height + let id = StateSyncArtifactId { + height: height(2), + hash: hash.clone(), + }; + + let mut chunkable = dst_state_sync.create_chunkable_state(&id); + + let result = pipe_meta_manifest(&msg, &mut *chunkable, false); + assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded))); + + // Chunks in the Prep phase are not involved in the cache mechanism. Therefore, all the manifest chunks have to requested again. + let manifest_chunks: HashSet = (MANIFEST_CHUNK_ID_OFFSET + ..(MANIFEST_CHUNK_ID_OFFSET + meta_manifest.sub_manifest_hashes.len() as u32)) + .map(ChunkId::from) + .collect(); + assert_eq!(manifest_chunks, chunkable.chunks_to_download().collect()); + + let result = pipe_manifest(&msg, &mut *chunkable, false); + assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded))); + + let dst_msg = pipe_state_sync(msg.clone(), chunkable); + dst_state_sync.process_changes( + time_source.as_ref(), + vec![UnvalidatedArtifact { + message: dst_msg, + peer_id: node_test_id(0), + timestamp: mock_time(), + }], + ); + + let recovered_state = dst_state_manager + .get_state_at(height(2)) + .expect("Destination state manager didn't receive the state") + .take(); + + assert_eq!(height(2), dst_state_manager.latest_state_height()); + assert_eq!(state, recovered_state); + assert_eq!( + *state.as_ref(), + *dst_state_manager.get_latest_state().take() + ); + assert_eq!(vec![height(2)], heights_to_certify(&*dst_state_manager)); + } + assert_no_remaining_chunks(dst_metrics); + assert_error_counters(dst_metrics); + }) + }) +} + +#[test] +fn state_sync_can_reject_invalid_chunks() { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { + let (_height, mut state) = src_state_manager.take_tip(); + + // Insert large number of canisters so that the encoded manifest is larger than 1 MiB. + let num_canisters = 2000; + for id in 100..(100 + num_canisters) { + insert_dummy_canister(&mut state, canister_test_id(id)); + } + + let time_source = ic_test_utilities::FastForwardTimeSource::new(); + + src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); + let hash = wait_for_checkpoint(&*src_state_manager, height(1)); + let id = StateSyncArtifactId { + height: height(1), + hash, + }; + + let state = src_state_manager.get_latest_state().take(); + + let msg = src_state_sync + .get_validated_by_identifier(&id) + .expect("failed to get state sync messages"); + + assert_error_counters(src_metrics); + + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { + // Provide bad meta-manifest to dst + let mut chunkable = dst_state_sync.create_chunkable_state(&id); + let result = pipe_meta_manifest(&msg, &mut *chunkable, true); + assert!(matches!( + result, + Err(StateSyncErrorCode::MetaManifestVerificationFailed) + )); + + // Provide correct meta-manifest to dst + let result = pipe_meta_manifest(&msg, &mut *chunkable, false); + assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded))); + + // Provide bad sub-manifests to dst + // Each time, alter the chunk in the middle of remaining chunks for the current phase. The first half of chunks should be added correctly in this way. + loop { + let remaining_chunks_before = chunkable.chunks_to_download().count() as i32; + let result = pipe_manifest(&msg, &mut *chunkable, true); + assert!(matches!( + result, + Err(StateSyncErrorCode::ManifestVerificationFailed) + )); + let remaining_chunks_after = chunkable.chunks_to_download().count() as i32; + let added_chunks = remaining_chunks_before - remaining_chunks_after; + // Assert that half of the remaining chunks are added correctly each time. + assert_eq!(added_chunks, remaining_chunks_before / 2); + // If no more chunks are added, break out of the loop. + if added_chunks == 0 { + // Assert that there should be only 1 chunk left in this case. + assert_eq!(remaining_chunks_after, 1); + break; + } + } + + // Provide correct sub-manifests to dst + let result = pipe_manifest(&msg, &mut *chunkable, false); + assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded))); + + // Provide bad chunks to dst + // Each time, alter the chunk in the middle of remaining chunks for the current phase. The first half of chunks should be added correctly in this way. + loop { + let remaining_chunks_before = chunkable.chunks_to_download().count() as i32; + let result = + pipe_partial_state_sync(&msg, &mut *chunkable, &Default::default(), true); + assert!(matches!( + result, + Err(StateSyncErrorCode::OtherChunkVerificationFailed) + )); + let remaining_chunks_after = chunkable.chunks_to_download().count() as i32; + let added_chunks = remaining_chunks_before - remaining_chunks_after; + // Assert that half of the remaining chunks are added correctly each time. + assert_eq!(added_chunks, remaining_chunks_before / 2); + // If no more chunks are added, break out of the loop. + if added_chunks == 0 { + // Assert that there should be only 1 chunk left in this case. + assert_eq!(remaining_chunks_after, 1); + break; + } + } + + // Provide correct chunks to dst + let dst_msg = pipe_state_sync(msg.clone(), chunkable); + dst_state_sync.process_changes( + time_source.as_ref(), + vec![UnvalidatedArtifact { + message: dst_msg, + peer_id: node_test_id(0), + timestamp: mock_time(), + }], + ); + + let recovered_state = dst_state_manager + .get_state_at(height(1)) + .expect("Destination state manager didn't receive the state") + .take(); + + assert_eq!(height(1), dst_state_manager.latest_state_height()); + assert_eq!(state, recovered_state); + assert_eq!( + *state.as_ref(), + *dst_state_manager.get_latest_state().take() + ); + assert_eq!(vec![height(1)], heights_to_certify(&*dst_state_manager)); + + assert_error_counters(dst_metrics); + assert_no_remaining_chunks(dst_metrics); + }) + }) +} + #[test] fn can_state_sync_into_existing_checkpoint() { state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { @@ -2143,8 +2429,11 @@ fn can_group_small_files_in_state_sync() { state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { let mut chunkable = dst_state_sync.create_chunkable_state(&id); - let result = pipe_manifest(&msg, &mut *chunkable); - assert!(result.is_none()); + let result = pipe_meta_manifest(&msg, &mut *chunkable, false); + assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded))); + + let result = pipe_manifest(&msg, &mut *chunkable, false); + assert!(matches!(result, Err(StateSyncErrorCode::ChunksMoreNeeded))); assert!(chunkable .chunks_to_download() diff --git a/rs/types/types/src/artifact.rs b/rs/types/types/src/artifact.rs index a2001dfc6dc..83eeac884e9 100644 --- a/rs/types/types/src/artifact.rs +++ b/rs/types/types/src/artifact.rs @@ -482,6 +482,9 @@ pub struct StateSyncMessage { pub root_hash: CryptoHashOfState, /// Absolute path to the checkpoint root directory. pub checkpoint_root: std::path::PathBuf, + #[serde(serialize_with = "ic_utils::serde_arc::serialize_arc")] + #[serde(deserialize_with = "ic_utils::serde_arc::deserialize_arc")] + pub meta_manifest: Arc, /// The manifest containing the summary of the content. pub manifest: crate::state_sync::Manifest, #[serde(serialize_with = "ic_utils::serde_arc::serialize_arc")] @@ -490,7 +493,7 @@ pub struct StateSyncMessage { } impl ChunkableArtifact for StateSyncMessage { - fn get_chunk(self: Box, _chunk_id: ChunkId) -> Option { + fn get_chunk(self: Box, chunk_id: ChunkId) -> Option { #[cfg(not(target_family = "unix"))] { panic!("This method should only be used when the target OS family is unix."); @@ -499,6 +502,10 @@ impl ChunkableArtifact for StateSyncMessage { #[cfg(target_family = "unix")] { use crate::chunkable::ArtifactChunkData; + use crate::state_sync::{ + encode_manifest, encode_meta_manifest, state_sync_chunk_type, StateSyncChunk, + DEFAULT_CHUNK_SIZE, + }; use std::os::unix::fs::FileExt; let get_single_chunk = |chunk_index: usize| -> Option> { @@ -513,21 +520,45 @@ impl ChunkableArtifact for StateSyncMessage { }; let mut payload: Vec = Vec::new(); - if _chunk_id == crate::state_sync::MANIFEST_CHUNK { - payload = crate::state_sync::encode_manifest(&self.manifest); - } else if _chunk_id.get() < FILE_GROUP_CHUNK_ID_OFFSET - || self.state_sync_file_group.get(&_chunk_id.get()).is_none() - { - payload = get_single_chunk((_chunk_id.get() - 1) as usize)?; - } else { - let chunk_table_indices = self.state_sync_file_group.get(&_chunk_id.get())?; - for chunk_table_index in chunk_table_indices { - payload.extend(get_single_chunk(*chunk_table_index as usize)?); + match state_sync_chunk_type(chunk_id.get()) { + StateSyncChunk::MetaManifestChunk => { + payload = encode_meta_manifest(&self.meta_manifest); + } + StateSyncChunk::ManifestChunk(index) => { + let index = index as usize; + if index < self.meta_manifest.sub_manifest_hashes.len() { + let encoded_manifest = encode_manifest(&self.manifest); + let start = index * DEFAULT_CHUNK_SIZE as usize; + let end = std::cmp::min( + start + DEFAULT_CHUNK_SIZE as usize, + encoded_manifest.len(), + ); + let sub_manifest = encoded_manifest.get(start..end).unwrap_or_else(|| + panic!("We cannot get the {}th piece of the encoded manifest. The manifest and/or meta-manifest must be in abnormal state.", index) + ); + payload = sub_manifest.to_vec(); + } else { + // The chunk request is either malicious or invalid due to the collision between normal file chunks and manifest chunks. + // Neither case could be resolved and a `None` has to be returned in both cases. + return None; + } + } + StateSyncChunk::FileGroupChunk(index) => { + if let Some(chunk_table_indices) = self.state_sync_file_group.get(&index) { + for chunk_table_index in chunk_table_indices { + payload.extend(get_single_chunk(*chunk_table_index as usize)?); + } + } else { + return None; + } + } + StateSyncChunk::FileChunk(index) => { + payload = get_single_chunk(index as usize)?; } } Some(ArtifactChunk { - chunk_id: _chunk_id, + chunk_id, witness: Vec::new(), artifact_chunk_data: ArtifactChunkData::SemiStructuredChunkData(payload), }) diff --git a/rs/types/types/src/state_sync.rs b/rs/types/types/src/state_sync.rs index 3298684dcbb..07e20ad91ed 100644 --- a/rs/types/types/src/state_sync.rs +++ b/rs/types/types/src/state_sync.rs @@ -68,7 +68,7 @@ pub mod proto; use crate::chunkable::ChunkId; -use ic_protobuf::state::sync::v1 as pb; +use ic_protobuf::{proxy::ProtoProxy, state::sync::v1 as pb}; use serde::{Deserialize, Serialize}; use std::{ collections::BTreeMap, @@ -77,8 +77,15 @@ use std::{ sync::Arc, }; -/// Id of the manifest chunk in StateSync artifact. -pub const MANIFEST_CHUNK: ChunkId = ChunkId::new(0); +/// The default chunk size used in manifest computation and state sync. +pub const DEFAULT_CHUNK_SIZE: u32 = 1 << 20; // 1 MiB. + +/// ID of the meta-manifest chunk in StateSync artifact. +pub const META_MANIFEST_CHUNK: ChunkId = ChunkId::new(0); + +/// The IDs of file chunks in state sync start from 1 as chunk id 0 is for the meta-manifest. +/// The chunk id of a file chunk is equal to its index in the chunk table plus 1. +pub const FILE_CHUNK_ID_OFFSET: usize = 1; /// Some small files are grouped into chunks during state sync and /// they need to use a separate range of chunk id to avoid conflicts with normal chunks. @@ -91,6 +98,53 @@ pub const MANIFEST_CHUNK: ChunkId = ChunkId::new(0); // The real number of canisters and size of state are not even close to the assumption so the value of `FILE_GROUP_CHUNK_ID_OFFSET` is chosen safely. pub const FILE_GROUP_CHUNK_ID_OFFSET: u32 = 1 << 30; +/// The IDs of chunks for fetching the manifest in state sync start from this offset. +// +// The value of `MANIFEST_CHUNK_ID_OFFSET` is set as 1 << 31 (2_147_483_648). +// It is within the whole chunk id range (1 << 32) and can avoid collision with normal file chunks and file group chunks. +// First, the length of the chunk table is smaller than 1_073_741_824 from the analysis for `FILE_GROUP_CHUNK_ID_OFFSET`. Second, each file group chunk contains multiple files. +// Therefore the number of file groups is smaller than the length of chunk table, and thus much smaller than 1_073_741_824. +// From another perspective, the number of file group chunks is smaller than 1/128 of the number of canisters because currently it only includes `canister.pbuf` files smaller than 8 KiB. +// Therefore, the space between `FILE_GROUP_CHUNK_ID_OFFSET` and `MANIFEST_CHUNK_ID_OFFSET` is adequate for file group chunks. +pub const MANIFEST_CHUNK_ID_OFFSET: u32 = 1 << 31; + +/// `MANIFEST_CHUNK_ID_OFFSET` should be greater than `FILE_GROUP_CHUNK_ID_OFFSET` to have valid ID range assignment. +#[allow(clippy::assertions_on_constants)] +const _: () = assert!(MANIFEST_CHUNK_ID_OFFSET > FILE_GROUP_CHUNK_ID_OFFSET); + +/// The type and associated index (if applicable) of a chunk in state sync. +#[derive(Debug, PartialEq, Eq)] +pub enum StateSyncChunk { + /// The chunk representing the meta-manifest. + MetaManifestChunk, + /// Nth file chunk (0-based). + FileChunk(u32), + /// Chunk grouping multiple small files (index starting from `FILE_GROUP_CHUNK_ID_OFFSET`). + FileGroupChunk(u32), + /// Nth encoded manifest chunk (0-based). + ManifestChunk(u32), +} + +/// Convert a chunk ID to its chunk type and associated index based on chunk ID range assignment. +/// Note that the conversion only works when `MANIFEST_CHUNK_ID_OFFSET` is greater than `FILE_GROUP_CHUNK_ID_OFFSET`. +pub fn state_sync_chunk_type(chunk_id: u32) -> StateSyncChunk { + const FILE_CHUNK_END_INCLUSIVE: u32 = FILE_GROUP_CHUNK_ID_OFFSET - 1; + const FILE_GROUP_CHUNK_END_INCLUSIVE: u32 = MANIFEST_CHUNK_ID_OFFSET - 1; + match chunk_id { + 0 => StateSyncChunk::MetaManifestChunk, + 1..=FILE_CHUNK_END_INCLUSIVE => { + StateSyncChunk::FileChunk(chunk_id - FILE_CHUNK_ID_OFFSET as u32) + } + FILE_GROUP_CHUNK_ID_OFFSET..=FILE_GROUP_CHUNK_END_INCLUSIVE => { + // Note that key of file group chunks mapping is the exact chunk id so it does not need to be offset. + StateSyncChunk::FileGroupChunk(chunk_id) + } + MANIFEST_CHUNK_ID_OFFSET.. => { + StateSyncChunk::ManifestChunk(chunk_id - MANIFEST_CHUNK_ID_OFFSET) + } + } +} + /// An entry of the file table. #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub struct FileInfo { @@ -164,7 +218,7 @@ impl Deref for Manifest { /// ... /// [8]: ("canister_states/00..11/software.wasm", 93000, ) /// -- CHUNKS -/// -- chunk indices start from 1 because 0 is the ID of the manifest chunk. +/// -- chunk indices start from 1 because 0 is the ID of the ,meta-manifest chunk. /// [ 1]: (0, 1500, 0, ) /// ... /// [45]: (8, 93000, 0, ) @@ -284,27 +338,29 @@ impl fmt::Display for Manifest { /// Serializes the manifest into a byte array. pub fn encode_manifest(manifest: &Manifest) -> Vec { - use prost::Message; - - let pb_manifest = pb::Manifest::from(manifest.clone()); - let mut buf = vec![]; - pb_manifest - .encode(&mut buf) - .expect("failed to encode manifest to protobuf"); - buf + pb::Manifest::proxy_encode(manifest.clone()).expect("Failed to serialize manifest.") } /// Deserializes the manifest from a byte array. pub fn decode_manifest(bytes: &[u8]) -> Result { - use prost::Message; - - let pb_manifest = pb::Manifest::decode(bytes) - .map_err(|err| format!("failed to decode Manifest proto {}", err))?; - pb_manifest - .try_into() + pb::Manifest::proxy_decode(bytes) .map_err(|err| format!("failed to convert Manifest proto into an object: {}", err)) } +pub fn encode_meta_manifest(meta_manifest: &MetaManifest) -> Vec { + pb::MetaManifest::proxy_encode(meta_manifest.clone()) + .expect("Failed to serialize meta-manifest.") +} + +pub fn decode_meta_manifest(bytes: &[u8]) -> Result { + pb::MetaManifest::proxy_decode(bytes).map_err(|err| { + format!( + "failed to convert MetaManifest proto into an object: {}", + err + ) + }) +} + type P2PChunkId = u32; type ManifestChunkTableIndex = u32; @@ -332,7 +388,41 @@ impl FileGroupChunks { self.0.get(chunk_id) } + pub fn last_chunk_id(&self) -> Option { + self.0.last_key_value().map(|(chunk_id, _)| *chunk_id) + } + pub fn is_empty(&self) -> bool { self.0.is_empty() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_state_sync_chunk_type() { + assert_eq!(state_sync_chunk_type(0), StateSyncChunk::MetaManifestChunk); + + (1..FILE_GROUP_CHUNK_ID_OFFSET) + .step_by(100) + .chain(std::iter::once(FILE_GROUP_CHUNK_ID_OFFSET - 1)) + .for_each(|i| assert_eq!(state_sync_chunk_type(i), StateSyncChunk::FileChunk(i - 1))); + + (FILE_GROUP_CHUNK_ID_OFFSET..MANIFEST_CHUNK_ID_OFFSET) + .step_by(100) + .chain(std::iter::once(MANIFEST_CHUNK_ID_OFFSET - 1)) + .for_each(|i| assert_eq!(state_sync_chunk_type(i), StateSyncChunk::FileGroupChunk(i))); + + (MANIFEST_CHUNK_ID_OFFSET..=u32::MAX) + .step_by(100) + .chain(std::iter::once(u32::MAX)) + .for_each(|i| { + assert_eq!( + state_sync_chunk_type(i), + StateSyncChunk::ManifestChunk(i - MANIFEST_CHUNK_ID_OFFSET) + ) + }); + } +} diff --git a/rs/types/types/src/state_sync/proto.rs b/rs/types/types/src/state_sync/proto.rs index 1eac7a316b5..0d0c3c2c819 100644 --- a/rs/types/types/src/state_sync/proto.rs +++ b/rs/types/types/src/state_sync/proto.rs @@ -1,5 +1,5 @@ //! Conversions from Rust to proto structs and back for `StateSync`. -use crate::state_sync::{ChunkInfo, FileInfo, Manifest}; +use crate::state_sync::{ChunkInfo, FileInfo, Manifest, MetaManifest}; use ic_protobuf::proxy::try_decode_hash; use ic_protobuf::proxy::ProxyDecodeError; use ic_protobuf::state::sync::v1 as pb; @@ -46,6 +46,19 @@ impl From for pb::Manifest { } } +impl From for pb::MetaManifest { + fn from(meta_manifest: MetaManifest) -> Self { + Self { + version: meta_manifest.version, + sub_manifest_hashes: meta_manifest + .sub_manifest_hashes + .iter() + .map(|hash| hash.to_vec()) + .collect(), + } + } +} + impl TryFrom for FileInfo { type Error = ProxyDecodeError; @@ -90,3 +103,18 @@ impl TryFrom for Manifest { )) } } + +impl TryFrom for MetaManifest { + type Error = ProxyDecodeError; + + fn try_from(meta_manifest: pb::MetaManifest) -> Result { + Ok(Self { + version: meta_manifest.version, + sub_manifest_hashes: meta_manifest + .sub_manifest_hashes + .into_iter() + .map(try_decode_hash) + .collect::>()?, + }) + } +}