Skip to content

Commit

Permalink
Merge branch 'pakhomov/mr-384' into 'master'
Browse files Browse the repository at this point in the history
[MR-384] Make sure we remove checkpoint layout if state sync overruns commit_and_certify

 

See merge request dfinity-lab/public/ic!10266
  • Loading branch information
pakhomov-dfinity committed Jan 24, 2023
2 parents e111b1c + f5375d8 commit ee04eef
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 93 deletions.
196 changes: 108 additions & 88 deletions rs/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,27 @@ fn load_checkpoint(
})
}

#[cfg(debug_assertions)]
fn check_certifications_metadata_snapshots_and_states_metadata_are_consistent(
states: &SharedState,
) {
let certification_heights = states
.certifications_metadata
.keys()
.copied()
.collect::<Vec<_>>();
let snapshot_heights = states
.snapshots
.iter()
.map(|s| s.height)
.filter(|h| h.get() != 0)
.collect::<Vec<_>>();
debug_assert_eq!(certification_heights, snapshot_heights);
for h in states.states_metadata.keys() {
debug_assert!(states.certifications_metadata.contains_key(h));
}
}

fn initialize_tip(
log: &ReplicaLogger,
tip_channel: &Sender<TipRequest>,
Expand Down Expand Up @@ -1965,47 +1986,52 @@ impl StateManagerImpl {
}
}

let hash_tree = hash_lazy_tree(&LazyTree::from(&state));
let certification_metadata = CertificationMetadata {
certified_state_hash: crypto_hash_of_tree(&hash_tree),
hash_tree: Some(Arc::new(hash_tree)),
certification: None,
};

let mut states = self.states.write();
#[cfg(debug_assertions)]
check_certifications_metadata_snapshots_and_states_metadata_are_consistent(&states);
states.disable_state_fetch_below(height);

if let Some(snapshot) = states.snapshots.back() {
if snapshot.height >= height {
info!(
self.log,
"Completed StateSync for state {} that we already have locally", height
);
return;
}
if states
.snapshots
.iter()
.any(|snapshot| snapshot.height == height)
{
info!(
self.log,
"Completed StateSync for state {} that we already have locally", height
);
return;
}

let hash_tree = hash_lazy_tree(&LazyTree::from(&state));

states.snapshots.push_back(Snapshot {
height,
state: Arc::new(state),
});
states
.snapshots
.make_contiguous()
.sort_by_key(|snapshot| snapshot.height);

self.metrics
.resident_state_count
.set(states.snapshots.len() as i64);

states.certifications_metadata.insert(
height,
CertificationMetadata {
certified_state_hash: crypto_hash_of_tree(&hash_tree),
hash_tree: Some(Arc::new(hash_tree)),
certification: None,
},
);
states
.certifications_metadata
.insert(height, certification_metadata);

let state_size_bytes: i64 = manifest
.file_table
.iter()
.map(|f| f.size_bytes as i64)
.sum();

self.metrics.state_size.set(state_size_bytes);

// The computation of meta_manifest is temporary in this replica version.
// In future versions, meta_manifest will also be part of StateSyncMessage
// and can be directly populated here without extra computation.
Expand All @@ -2025,7 +2051,10 @@ impl StateManagerImpl {
);

let latest_height = update_latest_height(&self.latest_state_height, height);
self.metrics.max_resident_height.set(latest_height as i64);
if latest_height == height.get() {
self.metrics.max_resident_height.set(latest_height as i64);
self.metrics.state_size.set(state_size_bytes);
}

self.release_lock_and_persist_metadata(states);

Expand Down Expand Up @@ -2882,6 +2911,8 @@ impl StateManager for StateManagerImpl {
Self::compute_certification_metadata(&self.metrics, &self.log, &checkpointed_state);

let mut states = self.states.write();
#[cfg(debug_assertions)]
check_certifications_metadata_snapshots_and_states_metadata_are_consistent(&states);

// The following assert validates that we don't have two clients
// modifying TIP at the same time and that each commit_and_certify()
Expand All @@ -2908,82 +2939,71 @@ impl StateManager for StateManagerImpl {
);
}

let (tip_height, tip) = match states.snapshots.back() {
Some(latest_snapshot) if height <= latest_snapshot.height => {
// This state is older than the one we already have. This can
// happen if we state-sync and apply blocks in parallel.
info!(
self.log,
"Skip committing an outdated state {}, latest state is {}",
height,
latest_snapshot.height
);
// The next call to take_tip() will take care of updating the
// tip if needed.
(height, state)
}
_ => {
states.snapshots.push_back(Snapshot {
height,
state: Arc::new(checkpointed_state),
});

if scope == CertificationScope::Full {
let manifest_delta = previous_checkpoint_info.map(
|PreviousCheckpointInfo {
dirty_pages,
base_manifest,
base_height,
}| {
manifest::ManifestDelta {
base_manifest,
base_height,
target_height: height,
dirty_memory_pages: dirty_pages,
}
},
);

states.states_metadata.insert(
height,
StateMetadata {
checkpoint_layout: Some(self.state_layout.checkpoint(height).unwrap()),
bundled_manifest: None,
state_sync_file_group: None,
},
);

// On the NNS subnet we never allow incremental manifest computation
let is_nns =
self.own_subnet_id == state.metadata.network_topology.nns_subnet_id;
if !states
.snapshots
.iter()
.any(|snapshot| snapshot.height == height)
{
states.snapshots.push_back(Snapshot {
height,
state: Arc::new(checkpointed_state),
});
states
.snapshots
.make_contiguous()
.sort_by_key(|snapshot| snapshot.height);

self.compute_manifest_request_sender
.send(ComputeManifestRequest::Compute {
checkpoint_layout: cp_layout.unwrap(),
manifest_delta: if is_nns { None } else { manifest_delta },
})
.expect("failed to send ComputeManifestRequest message");
}
states
.certifications_metadata
.insert(height, certification_metadata);

if scope == CertificationScope::Full {
let manifest_delta = previous_checkpoint_info.map(
|PreviousCheckpointInfo {
dirty_pages,
base_manifest,
base_height,
}| {
manifest::ManifestDelta {
base_manifest,
base_height,
target_height: height,
dirty_memory_pages: dirty_pages,
}
},
);

states
.certifications_metadata
.insert(height, certification_metadata);
states.states_metadata.insert(
height,
StateMetadata {
checkpoint_layout: Some(self.state_layout.checkpoint(height).unwrap()),
bundled_manifest: None,
state_sync_file_group: None,
},
);

update_latest_height(&self.latest_state_height, height);
// On the NNS subnet we never allow incremental manifest computation
let is_nns = self.own_subnet_id == state.metadata.network_topology.nns_subnet_id;

(height, state)
self.compute_manifest_request_sender
.send(ComputeManifestRequest::Compute {
checkpoint_layout: cp_layout.unwrap(),
manifest_delta: if is_nns { None } else { manifest_delta },
})
.expect("failed to send ComputeManifestRequest message");
}
};

let latest_height = update_latest_height(&self.latest_state_height, height);
self.metrics.max_resident_height.set(latest_height as i64);
}

self.metrics
.resident_state_count
.set(states.snapshots.len() as i64);

self.metrics
.max_resident_height
.set(tip_height.get() as i64);

states.tip = Some((tip_height, tip));
// The next call to take_tip() will take care of updating the
// tip if needed.
states.tip = Some((height, state));

if scope == CertificationScope::Full {
self.release_lock_and_persist_metadata(states);
Expand Down
81 changes: 76 additions & 5 deletions rs/state_manager/tests/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2370,12 +2370,65 @@ fn can_recover_from_corruption_on_state_sync() {

#[test]
fn can_commit_below_state_sync() {
state_manager_test(|src_metrics, src_state_manager| {
let (_height, mut state) = src_state_manager.take_tip();
insert_dummy_canister(&mut state, canister_test_id(100));
let time_source = ic_test_utilities::FastForwardTimeSource::new();

src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full);
let (_height, state) = src_state_manager.take_tip();
src_state_manager.commit_and_certify(state, height(2), CertificationScope::Full);

let hash = wait_for_checkpoint(&src_state_manager, height(2));
let id = StateSyncArtifactId {
height: height(2),
hash,
};

let msg = src_state_manager
.get_validated_by_identifier(&id)
.expect("failed to get state sync messages");

assert_error_counters(src_metrics);

state_manager_test(|dst_metrics, dst_state_manager| {
let (tip_height, state) = dst_state_manager.take_tip();
assert_eq!(tip_height, height(0));
let chunkable = dst_state_manager.create_chunkable_state(&id);
let dst_msg = pipe_state_sync(msg, chunkable);
dst_state_manager.process_changes(
time_source.as_ref(),
vec![UnvalidatedArtifact {
message: dst_msg,
peer_id: node_test_id(0),
timestamp: mock_time(),
}],
);
// Check committing an old state doesn't panic
dst_state_manager.commit_and_certify(state, height(1), CertificationScope::Full);
wait_for_checkpoint(&dst_state_manager, height(1));

// take_tip should update the tip to the synced checkpoint
let (tip_height, _state) = dst_state_manager.take_tip();
assert_eq!(tip_height, height(2));
assert_eq!(dst_state_manager.latest_state_height(), height(2));
// state 1 should be removeable
dst_state_manager.remove_states_below(height(2));
assert_eq!(dst_state_manager.checkpoint_heights(), vec![height(2)]);
assert_error_counters(dst_metrics);
})
})
}

#[test]
fn can_state_sync_below_commit() {
state_manager_test(|src_metrics, src_state_manager| {
let (_height, mut state) = src_state_manager.take_tip();
insert_dummy_canister(&mut state, canister_test_id(100));
let time_source = ic_test_utilities::FastForwardTimeSource::new();

src_state_manager.commit_and_certify(state.clone(), height(1), CertificationScope::Full);

let hash = wait_for_checkpoint(&src_state_manager, height(1));
let id = StateSyncArtifactId {
height: height(1),
Expand All @@ -2389,8 +2442,18 @@ fn can_commit_below_state_sync() {
assert_error_counters(src_metrics);

state_manager_test(|dst_metrics, dst_state_manager| {
let chunkable = dst_state_manager.create_chunkable_state(&id);
let (tip_height, state) = dst_state_manager.take_tip();
assert_eq!(tip_height, height(0));
dst_state_manager.commit_and_certify(state, height(1), CertificationScope::Full);

let (_height, state) = dst_state_manager.take_tip();
dst_state_manager.commit_and_certify(state, height(2), CertificationScope::Full);
wait_for_checkpoint(&dst_state_manager, height(2));

let (_height, state) = dst_state_manager.take_tip();
dst_state_manager.remove_states_below(height(2));
assert_eq!(dst_state_manager.checkpoint_heights(), vec![height(2)]);
let chunkable = dst_state_manager.create_chunkable_state(&id);
let dst_msg = pipe_state_sync(msg, chunkable);
dst_state_manager.process_changes(
time_source.as_ref(),
Expand All @@ -2400,11 +2463,19 @@ fn can_commit_below_state_sync() {
timestamp: mock_time(),
}],
);
assert_eq!(
dst_state_manager.checkpoint_heights(),
vec![height(1), height(2)]
);
dst_state_manager.commit_and_certify(state, height(3), CertificationScope::Full);
wait_for_checkpoint(&dst_state_manager, height(3));

dst_state_manager.take_tip();
// Check committing an old state doesn't panic
dst_state_manager.commit_and_certify(state, height(1), CertificationScope::Full);

let (tip_height, _state) = dst_state_manager.take_tip();
assert_eq!(tip_height, height(3));
assert_eq!(dst_state_manager.latest_state_height(), height(3));
// state 1 should be removeable
dst_state_manager.remove_states_below(height(3));
assert_eq!(dst_state_manager.checkpoint_heights(), vec![height(3)]);
assert_error_counters(dst_metrics);
})
})
Expand Down

0 comments on commit ee04eef

Please sign in to comment.