Skip to content

Commit

Permalink
refactor: move the StateSyncArtifactId into the P2P interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
rumenov committed Jan 17, 2024
1 parent 5c80fa2 commit 24abd67
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 99 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 27 additions & 1 deletion rs/interfaces/src/p2p/state_sync.rs
@@ -1,8 +1,34 @@
//! The file contains the synchronous interface used from P2P, to drive the StateSync protocol.
use ic_types::artifact::StateSyncArtifactId;
use ic_protobuf::p2p::v1 as p2p_pb;
use ic_types::{crypto::CryptoHash, Height};
use phantom_newtype::Id;
use thiserror::Error;

/// Identifier of a state sync artifact.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StateSyncArtifactId {
pub height: Height,
pub hash: CryptoHash,
}

impl From<StateSyncArtifactId> for p2p_pb::StateSyncId {
fn from(id: StateSyncArtifactId) -> Self {
Self {
height: id.height.get(),
hash: id.hash.0,
}
}
}

impl From<p2p_pb::StateSyncId> for StateSyncArtifactId {
fn from(id: p2p_pb::StateSyncId) -> Self {
Self {
height: Height::from(id.height),
hash: CryptoHash(id.hash),
}
}
}

pub type Chunk = Vec<u8>;

/// Error codes returned by the `Chunkable` interface.
Expand Down
3 changes: 2 additions & 1 deletion rs/p2p/state_sync_manager/BUILD.bazel
Expand Up @@ -13,7 +13,7 @@ DEPENDENCIES = [
"//rs/monitoring/metrics",
"//rs/protobuf",
"//rs/p2p/quic_transport",
"//rs/types/types",
"//rs/types/base_types",
"@crate_index//:axum_0_7_0",
"@crate_index//:base64",
"@crate_index//:bytes",
Expand All @@ -34,6 +34,7 @@ DEV_DEPENDENCIES = [
"//rs/state_manager",
"//rs/p2p/test_utils",
"//rs/test_utilities/logger",
"//rs/types/types",
"//rs/types/types_test_utils",
"@crate_index//:mockall",
"@crate_index//:turmoil",
Expand Down
3 changes: 2 additions & 1 deletion rs/p2p/state_sync_manager/Cargo.toml
Expand Up @@ -16,7 +16,7 @@ ic-logger = { path = "../../monitoring/logger" }
ic-metrics = { path = "../../monitoring/metrics" }
ic-protobuf = { path = "../../protobuf" }
ic-quic-transport = { path = "../quic_transport" }
ic-types = { path = "../../types/types" }
ic-base-types = { path = "../../types/base_types" }
prometheus = { workspace = true }
prost = { workspace = true }
rand = "0.8.5"
Expand All @@ -33,6 +33,7 @@ ic-memory-transport = { path = "../memory_transport" }
ic-p2p-test-utils = { path = "../test_utils" }
ic-state-manager = { path = "../../state_manager" }
ic-test-utilities-logger = { path = "../../test_utilities/logger" }
ic-types = { path = "../../types/types" }
ic-types-test-utils = { path = "../../types/types_test_utils" }
mockall = { workspace = true }
turmoil = { workspace = true }
10 changes: 5 additions & 5 deletions rs/p2p/state_sync_manager/src/lib.rs
Expand Up @@ -21,11 +21,11 @@ use std::{
};

use axum::{routing::any, Router};
use ic_interfaces::p2p::state_sync::StateSyncClient;
use ic_base_types::NodeId;
use ic_interfaces::p2p::state_sync::{StateSyncArtifactId, StateSyncClient};
use ic_logger::{info, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_quic_transport::Transport;
use ic_types::{artifact::StateSyncArtifactId, NodeId};
use metrics::{StateSyncManagerHandlerMetrics, StateSyncManagerMetrics};
use ongoing::OngoingStateSyncHandle;
use routes::{
Expand Down Expand Up @@ -249,7 +249,7 @@ mod tests {
use ic_metrics::MetricsRegistry;
use ic_p2p_test_utils::mocks::{MockChunkable, MockStateSync, MockTransport};
use ic_test_utilities_logger::with_test_replica_logger;
use ic_types::{crypto::CryptoHash, CryptoHashOfState, Height};
use ic_types::{crypto::CryptoHash, Height};
use ic_types_test_utils::ids::{NODE_1, NODE_2};
use mockall::Sequence;
use prost::Message;
Expand Down Expand Up @@ -330,11 +330,11 @@ mod tests {
let rt = Runtime::new().unwrap();
let old_id = StateSyncArtifactId {
height: Height::from(0),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
};
let id = StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
};

let (handler_tx, handler_rx) = tokio::sync::mpsc::channel(100);
Expand Down
12 changes: 6 additions & 6 deletions rs/p2p/state_sync_manager/src/ongoing.rs
Expand Up @@ -20,10 +20,10 @@ use crate::metrics::OngoingStateSyncMetrics;
use crate::routes::{build_chunk_handler_request, parse_chunk_handler_response};

use ic_async_utils::JoinMap;
use ic_interfaces::p2p::state_sync::{ChunkId, Chunkable, StateSyncClient};
use ic_base_types::NodeId;
use ic_interfaces::p2p::state_sync::{ChunkId, Chunkable, StateSyncArtifactId, StateSyncClient};
use ic_logger::{error, info, ReplicaLogger};
use ic_quic_transport::Transport;
use ic_types::{artifact::StateSyncArtifactId, NodeId};
use rand::{
distributions::{Distribution, WeightedIndex},
rngs::SmallRng,
Expand Down Expand Up @@ -378,7 +378,7 @@ mod tests {
use ic_metrics::MetricsRegistry;
use ic_p2p_test_utils::mocks::{MockChunkable, MockStateSync, MockTransport};
use ic_test_utilities_logger::with_test_replica_logger;
use ic_types::{crypto::CryptoHash, CryptoHashOfState, Height};
use ic_types::{crypto::CryptoHash, Height};
use ic_types_test_utils::ids::{NODE_1, NODE_2};
use prost::Message;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -422,7 +422,7 @@ mod tests {
Arc::new(Mutex::new(Box::new(c))),
StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
},
Arc::new(s),
Arc::new(t),
Expand Down Expand Up @@ -464,7 +464,7 @@ mod tests {
Arc::new(Mutex::new(Box::new(c))),
StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
},
Arc::new(s),
Arc::new(t),
Expand Down Expand Up @@ -511,7 +511,7 @@ mod tests {
Arc::new(Mutex::new(Box::new(c))),
StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
},
Arc::new(s),
Arc::new(t),
Expand Down
3 changes: 2 additions & 1 deletion rs/p2p/state_sync_manager/src/routes/advert.rs
Expand Up @@ -7,9 +7,10 @@ use axum::{
Extension,
};
use bytes::BytesMut;
use ic_base_types::NodeId;
use ic_interfaces::p2p::state_sync::StateSyncArtifactId;
use ic_logger::ReplicaLogger;
use ic_protobuf::p2p::v1 as pb;
use ic_types::{artifact::StateSyncArtifactId, NodeId};
use prost::Message;

pub const STATE_SYNC_ADVERT_PATH: &str = "/state-sync/advert";
Expand Down
3 changes: 1 addition & 2 deletions rs/p2p/state_sync_manager/src/routes/chunk.rs
Expand Up @@ -8,10 +8,9 @@ use axum::{
http::{Request, Response, StatusCode},
};
use bytes::BytesMut;
use ic_interfaces::p2p::state_sync::{Chunk, ChunkId, StateSyncClient};
use ic_interfaces::p2p::state_sync::{Chunk, ChunkId, StateSyncArtifactId, StateSyncClient};
use ic_logger::ReplicaLogger;
use ic_protobuf::p2p::v1 as pb;
use ic_types::artifact::StateSyncArtifactId;
use prost::Message;

pub const STATE_SYNC_CHUNK_PATH: &str = "/state-sync/chunk";
Expand Down
12 changes: 7 additions & 5 deletions rs/p2p/state_sync_manager/tests/common.rs
Expand Up @@ -9,15 +9,17 @@ use std::{
time::Duration,
};

use ic_interfaces::p2p::state_sync::{AddChunkError, Chunk, ChunkId, Chunkable, StateSyncClient};
use ic_interfaces::p2p::state_sync::{
AddChunkError, Chunk, ChunkId, Chunkable, StateSyncArtifactId, StateSyncClient,
};
use ic_logger::ReplicaLogger;
use ic_memory_transport::TransportRouter;
use ic_metrics::MetricsRegistry;
use ic_p2p_test_utils::mocks::{MockChunkable, MockStateSync};
use ic_state_manager::state_sync::types::{Manifest, MetaManifest, StateSyncMessage};
use ic_types::{
artifact::StateSyncArtifactId, crypto::CryptoHash, state_sync::StateSyncVersion,
CryptoHashOfState, Height, NodeId, PrincipalId,
crypto::CryptoHash, state_sync::StateSyncVersion, CryptoHashOfState, Height, NodeId,
PrincipalId,
};
use tokio::{runtime::Handle, task::JoinHandle};

Expand Down Expand Up @@ -108,7 +110,7 @@ impl State {
state.chunks.hash(&mut hasher);
StateSyncArtifactId {
height: state.height,
hash: CryptoHashOfState::from(CryptoHash(hasher.finish().to_be_bytes().to_vec())),
hash: CryptoHash(hasher.finish().to_be_bytes().to_vec()),
}
}

Expand Down Expand Up @@ -429,7 +431,7 @@ fn state_sync_artifact(id: StateSyncArtifactId) -> StateSyncMessage {

StateSyncMessage {
height: id.height,
root_hash: id.hash,
root_hash: CryptoHashOfState::from(id.hash),
checkpoint_root: PathBuf::new(),
manifest,
meta_manifest: Arc::new(meta_manifest),
Expand Down
20 changes: 9 additions & 11 deletions rs/p2p/state_sync_manager/tests/test.rs
Expand Up @@ -11,7 +11,7 @@ use crate::common::{
SharableMockChunkable, State,
};
use common::SharableMockStateSync;
use ic_interfaces::p2p::state_sync::{AddChunkError, ChunkId};
use ic_interfaces::p2p::state_sync::{AddChunkError, ChunkId, StateSyncArtifactId};
use ic_logger::info;
use ic_memory_transport::TransportRouter;
use ic_p2p_test_utils::{
Expand All @@ -23,9 +23,7 @@ use ic_p2p_test_utils::{
ConnectivityChecker,
};
use ic_test_utilities_logger::with_test_replica_logger;
use ic_types::{
artifact::StateSyncArtifactId, crypto::CryptoHash, CryptoHashOfState, Height, RegistryVersion,
};
use ic_types::{crypto::CryptoHash, Height, RegistryVersion};
use ic_types_test_utils::ids::{NODE_1, NODE_2, NODE_3};
use tokio::sync::Notify;
use turmoil::Builder;
Expand Down Expand Up @@ -327,11 +325,11 @@ fn test_single_advert_between_two_nodes() {
let received_advert_n2_a1 = Arc::new(AtomicBool::new(false));
let state_sync_id_1 = StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
};
let state_sync_id_2 = StateSyncArtifactId {
height: Height::from(2),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
};
let state_sync_id_2_clone = state_sync_id_2.clone();
let state_sync_id_1_clone = state_sync_id_1.clone();
Expand Down Expand Up @@ -442,11 +440,11 @@ fn test_multiple_advert_between_two_nodes() {
let received_advert_n2_a2 = Arc::new(AtomicBool::new(false));
let state_sync_id_1 = StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
};
let state_sync_id_2 = StateSyncArtifactId {
height: Height::from(2),
hash: CryptoHashOfState::new(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
};
let state_sync_id_2_clone = state_sync_id_2.clone();
let state_sync_id_1_clone = state_sync_id_1.clone();
Expand Down Expand Up @@ -646,7 +644,7 @@ fn test_state_sync_abortion() {
vec![
StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::from(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
};
10
]
Expand Down Expand Up @@ -701,7 +699,7 @@ fn test_state_sync_abortion() {
.return_once(|| {
vec![StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::from(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
}]
});
s1.get_mut()
Expand Down Expand Up @@ -749,7 +747,7 @@ fn test_state_sync_abortion() {
.return_once(|| {
vec![StateSyncArtifactId {
height: Height::from(1),
hash: CryptoHashOfState::from(CryptoHash(vec![])),
hash: CryptoHash(vec![]),
}]
});
s2.get_mut().expect_available_states().return_const(vec![]);
Expand Down
4 changes: 2 additions & 2 deletions rs/p2p/test_utils/src/mocks.rs
Expand Up @@ -4,11 +4,11 @@ use axum::http::{Request, Response};
use bytes::Bytes;
use ic_interfaces::p2p::{
consensus::{PriorityFnAndFilterProducer, ValidatedPoolReader},
state_sync::{AddChunkError, Chunk, ChunkId, Chunkable, StateSyncClient},
state_sync::{AddChunkError, Chunk, ChunkId, Chunkable, StateSyncArtifactId, StateSyncClient},
};
use ic_quic_transport::{ConnId, SendError, Transport};
use ic_types::artifact::PriorityFn;
use ic_types::{artifact::StateSyncArtifactId, NodeId};
use ic_types::NodeId;
use mockall::mock;

mock! {
Expand Down
18 changes: 10 additions & 8 deletions rs/state_manager/src/state_sync.rs
Expand Up @@ -7,9 +7,11 @@ use crate::{
state_sync::types::{FileGroupChunks, StateSyncMessage},
StateSyncRefs, EXTRA_CHECKPOINTS_TO_KEEP, NUMBER_OF_CHECKPOINT_THREADS,
};
use ic_interfaces::p2p::state_sync::{Chunk, ChunkId, Chunkable, StateSyncClient};
use ic_interfaces::p2p::state_sync::{
Chunk, ChunkId, Chunkable, StateSyncArtifactId, StateSyncClient,
};
use ic_logger::{info, warn, ReplicaLogger};
use ic_types::{artifact::StateSyncArtifactId, Height};
use ic_types::{CryptoHashOfState, Height};
use std::sync::{Arc, Mutex};

#[derive(Clone)]
Expand Down Expand Up @@ -37,7 +39,7 @@ impl StateSync {
Box::new(crate::state_sync::chunkable::IncompleteState::new(
self.log.clone(),
id.height,
id.hash.clone(),
CryptoHashOfState::from(id.hash.clone()),
self.state_manager.state_layout.clone(),
self.state_manager.latest_manifest(),
self.state_manager.metrics.clone(),
Expand All @@ -64,7 +66,7 @@ impl StateSync {
.states_metadata
.iter()
.find_map(|(height, metadata)| {
if metadata.root_hash() == Some(&msg_id.hash) {
if metadata.root_hash().map(|v| v.get_ref()) == Some(&msg_id.hash) {
let manifest = metadata.manifest()?;
let meta_manifest = metadata.meta_manifest()?;
let checkpoint_root =
Expand All @@ -82,7 +84,7 @@ impl StateSync {

Some(StateSyncMessage {
height: *height,
root_hash: msg_id.hash.clone(),
root_hash: CryptoHashOfState::from(msg_id.hash.clone()),
checkpoint_root: checkpoint_root.raw_path().to_path_buf(),
meta_manifest,
manifest: manifest.clone(),
Expand Down Expand Up @@ -142,7 +144,7 @@ impl StateSync {
};
Some(StateSyncArtifactId {
height: msg.height,
hash: msg.root_hash.clone(),
hash: msg.root_hash.clone().get(),
})
} else {
None
Expand All @@ -165,7 +167,7 @@ impl StateSync {
if let Some(recorded_root_hash) = self.state_sync_refs.get(&artifact_id.height) {
// If this advert@h is for an ongoing state sync, we check if the hash is the
// same as the hash that consensus gave us.
if recorded_root_hash != artifact_id.hash {
if recorded_root_hash.get_ref() != &artifact_id.hash {
warn!(
self.log,
"Received an advert for state @{} with a hash that does not match the hash of the state we are fetching: expected {:?}, got {:?}",
Expand All @@ -190,7 +192,7 @@ impl StateSync {
return match artifact_id.height.cmp(max_sync_height) {
Ordering::Less => false,
// Drop the advert if the hashes do not match.
Ordering::Equal if *hash != artifact_id.hash => {
Ordering::Equal if hash.get_ref() != &artifact_id.hash => {
warn!(
self.log,
"Received an advert for state {} with a hash that does not match the hash passed to fetch_state: expected {:?}, got {:?}",
Expand Down

0 comments on commit 24abd67

Please sign in to comment.