Skip to content

Commit

Permalink
Merge branch 'rumenov/rmarchffgd' into 'master'
Browse files Browse the repository at this point in the history
refactor: don't use the ArtifactChunk type in the old P2P; remove the UnitChunk variant as well.

 

See merge request dfinity-lab/public/ic!16600
  • Loading branch information
rumenov committed Dec 8, 2023
2 parents 600affb + c64fe88 commit b3adfe5
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 233 deletions.
56 changes: 4 additions & 52 deletions rs/p2p/src/download_management.rs
Expand Up @@ -83,7 +83,6 @@ use ic_types::{
CanisterHttpArtifact, CertificationArtifact, ConsensusArtifact, DkgArtifact, EcdsaArtifact,
IngressArtifact,
},
chunkable::ArtifactChunkData,
chunkable::ChunkId,
chunkable::CHUNKID_UNIT_CHUNK,
crypto::CryptoHash,
Expand Down Expand Up @@ -211,7 +210,7 @@ impl GossipImpl {
// Allowing the rest of the artifact to be downloaded and
// skipping only the affected chunk increase overall
// resilience.
if let Err(error) = gossip_chunk.artifact_chunk {
if let Err(error) = gossip_chunk.artifact {
self.metrics.chunks_not_served_from_peer.inc();
trace!(
self.log,
Expand Down Expand Up @@ -262,12 +261,7 @@ impl GossipImpl {
let artifact_tracker = artifact_tracker.unwrap();

// Feed the chunk to the tracker.
let completed_artifact = match gossip_chunk.artifact_chunk.unwrap().artifact_chunk_data {
ArtifactChunkData::UnitChunkData(artifact) => artifact,
_ => {
return;
}
};
let completed_artifact = gossip_chunk.artifact.unwrap();

// Record metrics.
self.metrics.artifacts_received.inc();
Expand Down Expand Up @@ -1007,7 +1001,6 @@ pub mod tests {
types::ids::{node_test_id, subnet_test_id},
};
use ic_test_utilities_registry::{add_subnet_record, SubnetRecordBuilder};
use ic_types::chunkable::ArtifactErrorCode;
use ic_types::consensus::dkg::{DealingContent, DkgMessageId, Message as DkgMessage};
use ic_types::crypto::{
threshold_sig::ni_dkg::{NiDkgDealing, NiDkgId, NiDkgTag, NiDkgTargetSubnet},
Expand All @@ -1018,7 +1011,7 @@ pub mod tests {
use ic_types::{
artifact,
artifact::{Artifact, ArtifactAttribute, ArtifactPriorityFn, Priority},
chunkable::{ArtifactChunk, ArtifactChunkData, Chunkable, ChunkableArtifact},
chunkable::ChunkableArtifact,
Height, NodeId, PrincipalId,
};
use ic_types::{artifact::ArtifactKind, artifact_kind::ConsensusArtifact, consensus::*};
Expand All @@ -1037,43 +1030,6 @@ pub mod tests {
#[derive(Default)]
pub(crate) struct TestArtifactManager {}

/// The test artifact.
struct TestArtifact {
/// The number of chunks.
num_chunks: u32,
/// The list of artifact chunks.
chunks: Vec<ArtifactChunk>,
}

/// `TestArtifact` implements the `Chunkable` trait.
impl Chunkable for TestArtifact {
/// The method returns an Iterator over the chunks to download.
fn chunks_to_download(&self) -> Box<dyn Iterator<Item = ChunkId>> {
Box::new(
(0..self.num_chunks)
.map(ChunkId::from)
.collect::<Vec<_>>()
.into_iter(),
)
}

/// The method adds the given chunk.
fn add_chunk(
&mut self,
artifact_chunk: ArtifactChunk,
) -> Result<Artifact, ArtifactErrorCode> {
self.chunks.push(artifact_chunk.clone());
if self.chunks.len() == self.num_chunks as usize {
match artifact_chunk.artifact_chunk_data {
ArtifactChunkData::UnitChunkData(artifact) => Ok(artifact),
_ => Err(ArtifactErrorCode::ChunkVerificationFailed),
}
} else {
Err(ArtifactErrorCode::ChunksMoreNeeded)
}
}
}

/// The `TestArtifactManager` implements the `TestArtifact` trait.
impl ArtifactManager for TestArtifactManager {
/// The method ignores the artifact and always returns Ok(()).
Expand Down Expand Up @@ -1555,10 +1511,6 @@ pub mod tests {
integrity_hash: CryptoHash,
) -> GossipChunk {
let payload = Artifact::DkgMessage(receive_check_test_create_message(number));
let artifact_chunk = ArtifactChunk {
chunk_id,
artifact_chunk_data: ArtifactChunkData::UnitChunkData(payload),
};

let request = GossipChunkRequest {
artifact_id,
Expand All @@ -1567,7 +1519,7 @@ pub mod tests {
};
GossipChunk {
request,
artifact_chunk: Ok(artifact_chunk),
artifact: Ok(payload),
}
}

Expand Down
22 changes: 4 additions & 18 deletions rs/p2p/src/gossip_protocol.rs
Expand Up @@ -68,10 +68,7 @@ use ic_logger::{info, replica_logger::ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_protobuf::registry::subnet::v1::GossipConfig;
use ic_registry_client_helpers::subnet::SubnetRegistry;
use ic_types::{
artifact::ArtifactFilter, chunkable::ArtifactChunk, chunkable::ArtifactChunkData,
crypto::CryptoHash, p2p::GossipAdvert, NodeId, SubnetId,
};
use ic_types::{artifact::ArtifactFilter, crypto::CryptoHash, p2p::GossipAdvert, NodeId, SubnetId};
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
Expand Down Expand Up @@ -261,22 +258,11 @@ impl Gossip for GossipImpl {
.with_label_values(&["in_chunk_request"])
.start_timer();

let artifact_chunk = match self
let artifact = match self
.artifact_manager
.get_validated_by_identifier(&chunk_request.artifact_id)
{
Some(artifact) => artifact
.get_chunk(chunk_request.chunk_id)
.map(|a| ArtifactChunk {
chunk_id: chunk_request.chunk_id,
artifact_chunk_data: ArtifactChunkData::UnitChunkData(a),
})
.ok_or_else(|| {
self.gossip_metrics.requested_chunks_not_found.inc();
P2PError {
p2p_error_code: P2PErrorCode::NotFound,
}
}),
Some(artifact) => Ok(artifact.get_chunk()),
None => {
self.gossip_metrics.requested_chunks_not_found.inc();
Err(P2PError {
Expand All @@ -287,7 +273,7 @@ impl Gossip for GossipImpl {

let gossip_chunk = GossipChunk {
request: chunk_request,
artifact_chunk,
artifact,
};
let message = GossipMessage::Chunk(gossip_chunk);
self.transport_send(message, node_id);
Expand Down
26 changes: 8 additions & 18 deletions rs/p2p/src/gossip_types.rs
Expand Up @@ -5,8 +5,8 @@ use ic_protobuf::types::v1 as pb;
use ic_protobuf::types::v1::gossip_chunk::Response;
use ic_protobuf::types::v1::gossip_message::Body;
use ic_types::{
artifact::{ArtifactFilter, ArtifactId},
chunkable::{ArtifactChunk, ChunkId},
artifact::{Artifact, ArtifactFilter, ArtifactId},
chunkable::ChunkId,
crypto::CryptoHash,
p2p::GossipAdvert,
};
Expand All @@ -31,7 +31,7 @@ pub(crate) struct GossipChunk {
/// The request which resulted in the 'artifact_chunk'.
pub(crate) request: GossipChunkRequest,
/// The artifact chunk, encapsulated in a `P2PResult`.
pub(crate) artifact_chunk: P2PResult<ArtifactChunk>,
pub(crate) artifact: P2PResult<Artifact>,
}

/// This is the message exchanged on the wire with other peers. This
Expand Down Expand Up @@ -135,12 +135,9 @@ impl TryFrom<pb::GossipChunkRequest> for GossipChunkRequest {
impl From<GossipChunk> for pb::GossipChunk {
/// The function converts the given chunk into the Protobuf equivalent.
fn from(gossip_chunk: GossipChunk) -> Self {
let GossipChunk {
request,
artifact_chunk,
} = gossip_chunk;
let response = match artifact_chunk {
Ok(artifact_chunk) => Some(Response::Chunk(artifact_chunk.into())),
let GossipChunk { request, artifact } = gossip_chunk;
let response = match artifact {
Ok(artifact_chunk) => Some(Response::Artifact(artifact_chunk.into())),
// Add additional cases as required.
Err(_) => Some(Response::Error(pb::P2pError::NotFound as i32)),
};
Expand All @@ -160,18 +157,11 @@ impl TryFrom<pb::GossipChunk> for GossipChunk {
let request = gossip_chunk.request.ok_or(ProxyDecodeError::MissingField(
"The 'request' field is missing",
))?;
let chunk_id = ChunkId::from(request.chunk_id);
let request = GossipChunkRequest::try_from(request)?;
Ok(Self {
request,
artifact_chunk: match response {
Response::Chunk(c) => {
let artifact_chunk: ArtifactChunk = c.try_into()?;
Ok(ArtifactChunk {
chunk_id,
artifact_chunk_data: artifact_chunk.artifact_chunk_data,
})
}
artifact: match response {
Response::Artifact(c) => Ok(c.try_into()?),
Response::Error(_e) => Err(P2PError {
p2p_error_code: P2PErrorCode::NotFound,
}),
Expand Down
12 changes: 4 additions & 8 deletions rs/p2p/state_sync_manager/tests/common.rs
Expand Up @@ -293,14 +293,10 @@ impl Chunkable for FakeChunkable {

// Add chunk to state if not part of manifest
if !is_manifest_chunk(artifact_chunk.chunk_id) {
if let ArtifactChunkData::SemiStructuredChunkData(data) =
artifact_chunk.artifact_chunk_data
{
self.local_state
.add_chunk(artifact_chunk.chunk_id, data.len())
} else {
panic!("Bug: Wrong artifact data type.")
}
let ArtifactChunkData::SemiStructuredChunkData(data) =
artifact_chunk.artifact_chunk_data;
self.local_state
.add_chunk(artifact_chunk.chunk_id, data.len())
}

let elems = self.chunk_sets.iter().map(|set| set.len()).sum::<usize>();
Expand Down
12 changes: 2 additions & 10 deletions rs/protobuf/def/types/v1/p2p.proto
Expand Up @@ -36,10 +36,10 @@ message GossipChunkRequest {
message GossipChunk {
GossipChunkRequest request = 6;
oneof response {
ArtifactChunk chunk = 3;
Artifact artifact = 7;
P2PError error = 4;
}
reserved 1, 2, 5;
reserved 1, 2, 3, 5;
}

enum P2PError {
Expand Down Expand Up @@ -77,11 +77,3 @@ message Artifact {
FileTreeSyncArtifact file_tree_sync = 7;
}
}

message ArtifactChunk {
oneof data {
bytes chunk = 3;
Artifact artifact = 4;
}
reserved 1, 2;
}
27 changes: 3 additions & 24 deletions rs/protobuf/src/gen/types/types.v1.rs
Expand Up @@ -1570,7 +1570,7 @@ pub struct GossipChunkRequest {
pub struct GossipChunk {
#[prost(message, optional, tag = "6")]
pub request: ::core::option::Option<GossipChunkRequest>,
#[prost(oneof = "gossip_chunk::Response", tags = "3, 4")]
#[prost(oneof = "gossip_chunk::Response", tags = "7, 4")]
pub response: ::core::option::Option<gossip_chunk::Response>,
}
/// Nested message and enum types in `GossipChunk`.
Expand All @@ -1580,8 +1580,8 @@ pub mod gossip_chunk {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Response {
#[prost(message, tag = "3")]
Chunk(super::ArtifactChunk),
#[prost(message, tag = "7")]
Artifact(super::Artifact),
#[prost(enumeration = "super::P2pError", tag = "4")]
Error(i32),
}
Expand Down Expand Up @@ -1651,27 +1651,6 @@ pub mod artifact {
FileTreeSync(super::FileTreeSyncArtifact),
}
}
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::large_enum_variant)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ArtifactChunk {
#[prost(oneof = "artifact_chunk::Data", tags = "3, 4")]
pub data: ::core::option::Option<artifact_chunk::Data>,
}
/// Nested message and enum types in `ArtifactChunk`.
pub mod artifact_chunk {
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::large_enum_variant)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Data {
#[prost(bytes, tag = "3")]
Chunk(::prost::alloc::vec::Vec<u8>),
#[prost(message, tag = "4")]
Artifact(super::Artifact),
}
}
#[derive(
serde::Serialize,
serde::Deserialize,
Expand Down
11 changes: 2 additions & 9 deletions rs/state_manager/src/state_sync/chunkable.rs
Expand Up @@ -1159,15 +1159,8 @@ impl Chunkable for IncompleteState {

fn add_chunk(&mut self, artifact_chunk: ArtifactChunk) -> Result<Artifact, ArtifactErrorCode> {
let ix = artifact_chunk.chunk_id.get();

let payload = match artifact_chunk.artifact_chunk_data {
ArtifactChunkData::SemiStructuredChunkData(ref payload) => payload,
other => {
warn!(self.log, "State sync chunk has wrong shape {:?}", other);
return Err(ChunkVerificationFailed);
}
};

let ArtifactChunkData::SemiStructuredChunkData(ref payload) =
artifact_chunk.artifact_chunk_data;
match &mut self.state {
DownloadState::Complete(ref artifact) => {
debug!(
Expand Down
6 changes: 0 additions & 6 deletions rs/state_manager/tests/common/mod.rs
Expand Up @@ -396,12 +396,6 @@ pub fn pipe_state_sync(src: StateSyncMessage, mut dst: Box<dyn Chunkable>) -> St

fn alter_chunk_data(chunk: &mut ArtifactChunk) {
let mut chunk_data = match &chunk.artifact_chunk_data {
ArtifactChunkData::UnitChunkData(_) => {
panic!(
"Unexpected artifact chunk data type: {:?}",
chunk.artifact_chunk_data
);
}
ArtifactChunkData::SemiStructuredChunkData(chunk_data) => chunk_data.clone(),
};
match chunk_data.last_mut() {
Expand Down

0 comments on commit b3adfe5

Please sign in to comment.