Skip to content

Commit

Permalink
Merge branch 'shuo/chunking_the_manifest' into 'master'
Browse files Browse the repository at this point in the history
feat: [MR-363] chunking the manifest in state sync

This MR implements the new state sync mechanism, i.e., requesting the meta-manifest first and then requesting the manifest itself in chunks.

See merge request dfinity-lab/public/ic!9976
  • Loading branch information
ShuoWangNSL committed Mar 24, 2023
2 parents 2670aa6 + f33e3cd commit 60ef2e1
Show file tree
Hide file tree
Showing 14 changed files with 916 additions and 132 deletions.
5 changes: 5 additions & 0 deletions rs/protobuf/def/state/sync/v1/manifest.proto
Expand Up @@ -20,3 +20,8 @@ message Manifest {
repeated FileInfo file_table = 2;
repeated ChunkInfo chunk_table = 3;
}

// A condensed summary of a `Manifest` that is fetched first during state
// sync. It carries the hashes of the fixed-size sub-manifest chunks, which
// lets a syncing node validate each manifest chunk independently as it
// arrives.
message MetaManifest {
// Version of the state sync protocol that produced this meta-manifest.
uint32 version = 1;
// Hash of each sub-manifest chunk, in chunk-index order. The manifest is
// reassembled by concatenating the chunks and validated against these
// hashes.
repeated bytes sub_manifest_hashes = 2;
}
8 changes: 8 additions & 0 deletions rs/protobuf/src/gen/state/state.sync.v1.rs
Expand Up @@ -30,3 +30,11 @@ pub struct Manifest {
#[prost(message, repeated, tag = "3")]
pub chunk_table: ::prost::alloc::vec::Vec<ChunkInfo>,
}
// NOTE(review): prost-generated type mirroring `state/sync/v1/manifest.proto`;
// keep field tags and ordering in sync with the .proto definition.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
/// Condensed summary of a `Manifest`, fetched first during state sync so the
/// manifest itself can be requested and validated chunk by chunk.
pub struct MetaManifest {
/// State sync protocol version that produced this meta-manifest.
#[prost(uint32, tag = "1")]
pub version: u32,
/// Hash of each sub-manifest chunk, in chunk-index order.
#[prost(bytes = "vec", repeated, tag = "2")]
pub sub_manifest_hashes: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
}
35 changes: 18 additions & 17 deletions rs/state_manager/src/lib.rs
Expand Up @@ -10,7 +10,7 @@ pub mod tree_diff;
pub mod tree_hash;

use crate::{
manifest::{build_meta_manifest, compute_bundled_manifest, MAX_SUPPORTED_STATE_SYNC_VERSION},
manifest::{compute_bundled_manifest, MAX_SUPPORTED_STATE_SYNC_VERSION},
state_sync::chunkable::cache::StateSyncCache,
tip::{spawn_tip_thread, TipRequest},
};
Expand Down Expand Up @@ -1817,26 +1817,31 @@ impl StateManagerImpl {
fn find_checkpoint_by_root_hash(
&self,
root_hash: &CryptoHashOfState,
) -> Option<(Height, Manifest)> {
) -> Option<(Height, Manifest, Arc<MetaManifest>)> {
self.states
.read()
.states_metadata
.iter()
.find_map(
|(h, metadata)| match (metadata.root_hash(), metadata.manifest()) {
(Some(hash), Some(manifest)) if hash == root_hash => {
Some((*h, manifest.clone()))
}
_ => None,
},
)
.find_map(|(h, metadata)| {
let bundled_manifest = metadata.bundled_manifest.clone()?;
if &bundled_manifest.root_hash == root_hash {
Some((
*h,
bundled_manifest.manifest,
bundled_manifest.meta_manifest,
))
} else {
None
}
})
}

fn on_synced_checkpoint(
&self,
state: ReplicatedState,
height: Height,
manifest: Manifest,
meta_manifest: Arc<MetaManifest>,
root_hash: CryptoHashOfState,
) {
if self
Expand Down Expand Up @@ -1902,10 +1907,6 @@ impl StateManagerImpl {
.iter()
.map(|f| f.size_bytes as i64)
.sum();
// The computation of meta_manifest is temporary in this replica version.
// In future versions, meta_manifest will also be part of StateSyncMessage
// and can be directly populated here without extra computation.
let meta_manifest = build_meta_manifest(&manifest);

states.states_metadata.insert(
height,
Expand All @@ -1914,7 +1915,7 @@ impl StateManagerImpl {
bundled_manifest: Some(BundledManifest {
root_hash,
manifest,
meta_manifest: Arc::new(meta_manifest),
meta_manifest,
}),
state_sync_file_group: None,
},
Expand Down Expand Up @@ -2520,7 +2521,7 @@ impl StateManager for StateManagerImpl {
// Let's see if we already have this state locally. This might
// be the case if we are in subnet recovery mode and
// re-introducing some old state with a new height.
if let Some((checkpoint_height, manifest)) = self.find_checkpoint_by_root_hash(&root_hash) {
if let Some((checkpoint_height, manifest, meta_manifest)) = self.find_checkpoint_by_root_hash(&root_hash) {
info!(self.log,
"Copying checkpoint {} with root hash {:?} under new height {}",
checkpoint_height, root_hash, height);
Expand All @@ -2529,7 +2530,7 @@ impl StateManager for StateManagerImpl {
Ok(_) => {
let state = load_checkpoint(&self.state_layout, height, &self.metrics, self.own_subnet_type, Arc::clone(&self.get_fd_factory()))
.expect("failed to load checkpoint");
self.on_synced_checkpoint(state, height, manifest, root_hash);
self.on_synced_checkpoint(state, height, manifest, meta_manifest, root_hash);
return;
}
Err(e) => {
Expand Down
15 changes: 12 additions & 3 deletions rs/state_manager/src/manifest.rs
Expand Up @@ -15,7 +15,7 @@ use crate::{
use bit_vec::BitVec;
use hash::{chunk_hasher, file_hasher, manifest_hasher, ManifestHash};
use ic_crypto_sha::Sha256;
use ic_logger::{error, fatal, ReplicaLogger};
use ic_logger::{error, fatal, warn, ReplicaLogger};
use ic_replicated_state::PageIndex;
use ic_state_layout::{CheckpointLayout, ReadOnly};
use ic_sys::{mmap::ScopedMmap, PAGE_SIZE};
Expand All @@ -35,6 +35,8 @@ use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, Weak};

pub use ic_types::state_sync::DEFAULT_CHUNK_SIZE;

/// Initial version.
pub const STATE_SYNC_V1: u32 = 1;

Expand All @@ -49,8 +51,6 @@ pub const CURRENT_STATE_SYNC_VERSION: u32 = STATE_SYNC_V2;
/// The replica will panic if trying to deal with a manifest with a version higher than this.
pub const MAX_SUPPORTED_STATE_SYNC_VERSION: u32 = STATE_SYNC_V2;

pub const DEFAULT_CHUNK_SIZE: u32 = 1 << 20; // 1 MiB.

/// When computing a manifest, we recompute the hash of every
/// `REHASH_EVERY_NTH_CHUNK` chunk, even if we know it to be unchanged and
/// have a hash computed earlier by this replica process.
Expand Down Expand Up @@ -902,6 +902,15 @@ pub fn compute_manifest(
metrics
.manifest_size
.set(encode_manifest(&manifest).len() as i64);

if manifest.chunk_table.len() > FILE_GROUP_CHUNK_ID_OFFSET as usize / 2 {
warn!(
log,
"The chunk table is longer than half of the available chunk ID space in state sync. chunk table length: {}, state sync max chunk id: {}",
manifest.chunk_table.len(),
FILE_GROUP_CHUNK_ID_OFFSET - 1,
);
}
Ok(manifest)
}

Expand Down
50 changes: 50 additions & 0 deletions rs/state_manager/src/manifest/tests/computation.rs
Expand Up @@ -353,6 +353,56 @@ fn test_validate_sub_manifest() {
}),
validate_sub_manifest(num, &[], &meta_manifest)
);

// Test that an altered chunk gives a different hash.
let sub_manifest_0 = encoded_manifest
[0..std::cmp::min(DEFAULT_CHUNK_SIZE as usize, encoded_manifest.len())]
.to_vec();
let expected_hash = hash_concat!(21u8, b"ic-state-sub-manifest", &sub_manifest_0[..]);
assert_eq!(expected_hash, meta_manifest.sub_manifest_hashes[0]);

let mut bad_chunk = sub_manifest_0;
let alter_position = bad_chunk.len() / 2;
bad_chunk[alter_position] = bad_chunk[alter_position].wrapping_add(1);
let actual_hash = hash_concat!(21u8, b"ic-state-sub-manifest", &bad_chunk[..]);

assert_eq!(
Err(ChunkValidationError::InvalidChunkHash {
chunk_ix: 0,
expected_hash: expected_hash.to_vec(),
actual_hash: actual_hash.to_vec()
}),
validate_sub_manifest(0, &bad_chunk, &meta_manifest)
);
}

#[test]
fn test_get_sub_manifest_based_on_index() {
    let (file_table, chunk_table) = dummy_file_table_and_chunk_table();
    let manifest = Manifest::new(STATE_SYNC_V2, file_table, chunk_table);
    let meta_manifest = build_meta_manifest(&manifest);
    let encoded_manifest = encode_manifest(&manifest);

    let chunk_size = DEFAULT_CHUNK_SIZE as usize;
    let num_sub_manifests = meta_manifest.sub_manifest_hashes.len();

    // Reassemble the encoded manifest from its sub-manifest slices, using the
    // same slicing scheme that `StateSyncMessage` uses to serve a
    // sub-manifest: chunk `i` covers bytes
    // `[i * chunk_size, min((i + 1) * chunk_size, len))`.
    let mut assembled_manifest = Vec::new();
    for index in 0..num_sub_manifests {
        let start = index * chunk_size;
        let end = std::cmp::min(start + chunk_size, encoded_manifest.len());
        let sub_manifest = encoded_manifest
            .get(start..end)
            .expect("failed to get the sub-manifest");
        assembled_manifest.extend_from_slice(sub_manifest);
    }
    assert_eq!(assembled_manifest, encoded_manifest);

    // An index one past the last sub-manifest must yield no slice at all.
    let start = num_sub_manifests * chunk_size;
    let end = std::cmp::min(start + chunk_size, encoded_manifest.len());
    assert!(encoded_manifest.get(start..end).is_none());
}

#[test]
Expand Down
7 changes: 7 additions & 0 deletions rs/state_manager/src/state_sync.rs
Expand Up @@ -99,6 +99,7 @@ impl ArtifactClient<StateSyncArtifact> for StateSync {
msg_id: &StateSyncArtifactId,
) -> Option<StateSyncMessage> {
let mut file_group_to_populate: Option<Arc<FileGroupChunks>> = None;

let state_sync_message = self
.state_manager
.states
Expand All @@ -108,6 +109,7 @@ impl ArtifactClient<StateSyncArtifact> for StateSync {
.find_map(|(height, metadata)| {
if metadata.root_hash() == Some(&msg_id.hash) {
let manifest = metadata.manifest()?;
let meta_manifest = metadata.meta_manifest()?;
let checkpoint_root =
self.state_manager.state_layout.checkpoint(*height).ok()?;
let state_sync_file_group = match &metadata.state_sync_file_group {
Expand All @@ -125,6 +127,7 @@ impl ArtifactClient<StateSyncArtifact> for StateSync {
height: *height,
root_hash: msg_id.hash.clone(),
checkpoint_root: checkpoint_root.raw_path().to_path_buf(),
meta_manifest,
manifest: manifest.clone(),
state_sync_file_group,
})
Expand Down Expand Up @@ -182,12 +185,14 @@ impl ArtifactClient<StateSyncArtifact> for StateSync {
if h > filter.height {
let metadata = states.states_metadata.get(&h)?;
let manifest = metadata.manifest()?;
let meta_manifest = metadata.meta_manifest()?;
let checkpoint_root = self.state_manager.state_layout.checkpoint(h).ok()?;
let msg = StateSyncMessage {
height: h,
root_hash: metadata.root_hash()?.clone(),
checkpoint_root: checkpoint_root.raw_path().to_path_buf(),
manifest: manifest.clone(),
meta_manifest,
state_sync_file_group: Default::default(),
};
Some(StateSyncArtifact::message_to_advert(&msg))
Expand Down Expand Up @@ -325,10 +330,12 @@ impl ArtifactProcessor<StateSyncArtifact> for StateSync {
self.state_manager.get_fd_factory(),
)
.expect("failed to recover checkpoint");

self.state_manager.on_synced_checkpoint(
state,
height,
message.manifest,
message.meta_manifest,
message.root_hash,
);
}
Expand Down

0 comments on commit 60ef2e1

Please sign in to comment.