Skip to content

Commit

Permalink
Merge branch 'tim/return-all-states' into 'master'
Browse files Browse the repository at this point in the history
fix: Advertise all available states

Problem:
There exist edge cases where advertising the latest state is not enough. In some cases it is necessary to sync the second newest state to produce a CUP for the next state.

Solution:
Send an advert for all states available to a peer. This makes sure that other peers will at some point know about all states a peer has. Since peers only fetch the state of the lastest CUP ordering does not matter.


Follow-up:
Add test that checks the ordering. 

See merge request dfinity-lab/public/ic!13895
  • Loading branch information
tthebst committed Aug 3, 2023
2 parents 643a54a + b281d01 commit 205fc8b
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 18 deletions.
4 changes: 2 additions & 2 deletions rs/interfaces/src/state_sync_client.rs
Expand Up @@ -5,8 +5,8 @@ use ic_types::{
};

pub trait StateSyncClient: Send + Sync {
/// Returns the Id of the latest available state or None if no state is available.
fn latest_state(&self) -> Option<StateSyncArtifactId>;
/// Returns a list of all states available.
fn available_states(&self) -> Vec<StateSyncArtifactId>;
/// Initiates new state sync for the specified Id. Returns None if the state should not be synced.
/// If `Some(..)` is returned a new state sync is initiated.
/// Callers of this interface need to uphold the following: `start_state_sync` is not called again
Expand Down
21 changes: 17 additions & 4 deletions rs/p2p/state_sync_manager/src/lib.rs
Expand Up @@ -170,10 +170,23 @@ impl StateSyncManager {
}

fn handle_advert_tick(&mut self) {
if let Some(state_id) = self.state_sync.latest_state() {
self.metrics
.latest_state_height_broadcasted
.set(state_id.height.get() as i64);
let available_states = self.state_sync.available_states();
self.metrics.lowest_state_broadcasted.set(
available_states
.iter()
.map(|h| h.height.get())
.min()
.unwrap_or_default() as i64,
);
self.metrics.highest_state_broadcasted.set(
available_states
.iter()
.map(|h| h.height.get())
.max()
.unwrap_or_default() as i64,
);

for state_id in available_states {
// Unreliable broadcast of adverts to all current peers.
for peer_id in self.transport.peers() {
let request = build_advert_handler_request(state_id.clone());
Expand Down
13 changes: 9 additions & 4 deletions rs/p2p/state_sync_manager/src/metrics.rs
Expand Up @@ -20,7 +20,8 @@ const CHUNK_DOWNLOAD_STATUS_SUCCESS: &str = "success";
pub(crate) struct StateSyncManagerMetrics {
pub state_syncs_total: IntCounter,
pub adverts_received_total: IntCounter,
pub latest_state_height_broadcasted: IntGauge,
pub highest_state_broadcasted: IntGauge,
pub lowest_state_broadcasted: IntGauge,
pub ongoing_state_sync_metrics: OngoingStateSyncMetrics,
}

Expand All @@ -35,9 +36,13 @@ impl StateSyncManagerMetrics {
"state_sync_manager_adverts_received_total",
"Total number of adverts received.",
),
latest_state_height_broadcasted: metrics_registry.int_gauge(
"state_sync_manager_latest_state_height_broadcasted",
"State height that was last broadcasted.",
highest_state_broadcasted: metrics_registry.int_gauge(
"state_sync_manager_highest_state_broadcasted",
"Highest state height broadcasted.",
),
lowest_state_broadcasted: metrics_registry.int_gauge(
"state_sync_manager_lowest_state_broadcasted",
"Lowest state height broadcasted.",
),
ongoing_state_sync_metrics: OngoingStateSyncMetrics::new(metrics_registry),
}
Expand Down
2 changes: 1 addition & 1 deletion rs/p2p/state_sync_manager/src/ongoing.rs
Expand Up @@ -368,7 +368,7 @@ mod tests {
pub StateSync {}

impl StateSyncClient for StateSync {
fn latest_state(&self) -> Option<StateSyncArtifactId>;
fn available_states(&self) -> Vec<StateSyncArtifactId>;

fn start_state_sync(
&self,
Expand Down
8 changes: 4 additions & 4 deletions rs/p2p/state_sync_manager/tests/common.rs
Expand Up @@ -182,14 +182,14 @@ impl FakeStateSync {
}

impl StateSyncClient for FakeStateSync {
fn latest_state(&self) -> Option<StateSyncArtifactId> {
fn available_states(&self) -> Vec<StateSyncArtifactId> {
if self.disconnected.load(Ordering::SeqCst) {
return None;
return vec![];
}
if self.uses_global() {
Some(self.global_state.artifact_id())
vec![self.global_state.artifact_id()]
} else {
Some(self.local_state.artifact_id())
vec![self.local_state.artifact_id()]
}
}

Expand Down
6 changes: 3 additions & 3 deletions rs/state_manager/src/state_sync.rs
Expand Up @@ -357,16 +357,16 @@ impl ArtifactProcessor<StateSyncArtifact> for StateSync {

impl StateSyncClient for StateSync {
/// Non-blocking.
fn latest_state(&self) -> Option<StateSyncArtifactId> {
fn available_states(&self) -> Vec<StateSyncArtifactId> {
// Using height 0 here is sane because for state sync `get_all_validated_by_filter`
// return at most the number of states present on the node. Currently this is usually 1-2.
let filter = StateSyncFilter {
height: Height::from(0),
};
self.get_all_validated_by_filter(&filter)
.last()
.cloned()
.into_iter()
.map(|a| a.id)
.collect()
}

/// Non-blocking.
Expand Down

0 comments on commit 205fc8b

Please sign in to comment.