Skip to content

Commit

Permalink
Merge branch 'tim/verify-id' into 'master'
Browse files Browse the repository at this point in the history
fix(consensus_manager): [NET-1614] verify that downloaded artifact matches advertised Id

After downloading an artifact with an RPC it should be checked that the ID of the downloaded artifact matches the advertised ID. 

See merge request dfinity-lab/public/ic!16595
  • Loading branch information
tthebst committed Dec 12, 2023
2 parents a37b22a + 8ef4154 commit 9c3c37f
Showing 1 changed file with 86 additions and 4 deletions.
90 changes: 86 additions & 4 deletions rs/p2p/consensus_manager/src/receiver.rs
Expand Up @@ -14,7 +14,7 @@ use axum::{
use bytes::Bytes;
use crossbeam_channel::Sender as CrossbeamSender;
use ic_interfaces::p2p::consensus::{PriorityFnAndFilterProducer, ValidatedPoolReader};
use ic_logger::{error, ReplicaLogger};
use ic_logger::{error, warn, ReplicaLogger};
use ic_peer_manager::SubnetTopology;
use ic_protobuf::{p2p::v1 as pb, proxy::ProtoProxy};
use ic_quic_transport::{ConnId, Transport};
Expand Down Expand Up @@ -303,6 +303,7 @@ where
self.metrics.download_task_started_total.inc();
self.artifact_processor_tasks.spawn_on(
Self::process_advert(
self.log.clone(),
id,
attr,
None,
Expand Down Expand Up @@ -399,6 +400,7 @@ where

self.artifact_processor_tasks.spawn_on(
Self::process_advert(
self.log.clone(),
id.clone(),
attribute,
artifact.map(|a| (a, peer_id)),
Expand Down Expand Up @@ -485,6 +487,7 @@ where
/// - The priority function evaluates the advert to [`Priority::Drop`] -> [`DownloadResult::PriorityIsDrop`]
/// - The set of peers advertising the artifact, `peer_rx`, becomes empty -> [`DownloadResult::AllPeersDeletedTheArtifact`]
async fn download_artifact(
log: ReplicaLogger,
id: &Artifact::Id,
attr: &Artifact::Attribute,
// Only first peer for specific artifact ID is considered for push
Expand All @@ -506,7 +509,8 @@ where
.await?;

match artifact {
// Artifact was pushed by peer.
// Artifact was pushed by peer. In this case we don't need check that the artifact ID corresponds
// to the artifact because we earlier derived the ID from the artifact.
Some((artifact, peer_id)) => Ok((artifact, peer_id)),

// Fetch artifact
Expand All @@ -532,8 +536,15 @@ where
Ok(Ok(response)) if response.status() == StatusCode::OK => {
let body = response.into_body();
if let Ok(message) = Artifact::PbMessage::proxy_decode(&body) {
result = Ok((message, peer));
break;
if &Artifact::message_to_advert(&message).id == id {
result = Ok((message, peer));
break;
} else {
warn!(
log,
"Peer {} responded with wrong artifact for advert", peer
);
}
}
}
_ => {
Expand Down Expand Up @@ -566,6 +577,7 @@ where
/// This future completes waits for all peers that advertise the artifact to delete it.
/// The artifact is deleted from the unvalidated pool upon completion.
async fn process_advert(
log: ReplicaLogger,
id: Artifact::Id,
attr: Artifact::Attribute,
// Only first peer for specific artifact ID is considered for push
Expand All @@ -582,6 +594,7 @@ where
) {
let _timer = metrics.download_task_duration.start_timer();
let download_result = Self::download_artifact(
log,
&id,
&attr,
artifact,
Expand Down Expand Up @@ -659,6 +672,7 @@ where
}
}

#[derive(Debug, PartialEq, Eq)]
enum DownloadStopped {
AllPeersDeletedTheArtifact,
PriorityIsDrop,
Expand Down Expand Up @@ -686,6 +700,7 @@ mod tests {
use std::{backtrace::Backtrace, sync::Mutex};

use axum::http::Response;
use ic_logger::replica_logger::no_op_logger;
use ic_metrics::MetricsRegistry;
use ic_p2p_test_utils::{
consensus::U64Artifact,
Expand Down Expand Up @@ -1808,4 +1823,71 @@ mod tests {
.expect_err("Task should not close because it is stash.");
});
}

/// Verify that downloads with AdvertId != ArtifactId are not added to the pool.
#[test]
fn invalid_artifact_not_accepted() {
// Abort process if a thread panics. This catches detached tokio tasks that panic.
// https://github.com/tokio-rs/tokio/issues/4516
std::panic::set_hook(Box::new(|info| {
let stacktrace = Backtrace::force_capture();
println!("Got panic. @info:{}\n@stackTrace:{}", info, stacktrace);
std::process::abort();
}));
let rt = tokio::runtime::Runtime::new().unwrap();
let mut mock_transport = MockTransport::new();
let mut seq = Sequence::new();
// Respond with artifact that does not correspond to the advertised ID
mock_transport
.expect_rpc()
.once()
.returning(|_, _| {
Ok(Response::builder()
.body(Bytes::from(
<<U64Artifact as ArtifactKind>::PbMessage>::proxy_encode(1_u64),
))
.unwrap())
})
.in_sequence(&mut seq);
// Respond with artifact that does correspond to the advertised ID
mock_transport
.expect_rpc()
.once()
.returning(|_, _| {
// Respond with artifact that does correspond to the advertised ID
Ok(Response::builder()
.body(Bytes::from(
<<U64Artifact as ArtifactKind>::PbMessage>::proxy_encode(0_u64),
))
.unwrap())
})
.in_sequence(&mut seq);

let mut pc = PeerCounter::new();
pc.insert(NODE_1);
let (_peer_tx, mut peer_rx) = watch::channel(pc);
let pfn = |_: &_, _: &_| Priority::Fetch;
let (_pfn_tx, pfn_rx) = watch::channel(Box::new(pfn) as Box<_>);

rt.block_on(async {
assert_eq!(
ConsensusManagerReceiver::<
U64Artifact,
MockValidatedPoolReader,
(AdvertUpdate<U64Artifact>, NodeId, ConnId),
>::download_artifact(
no_op_logger(),
&0,
&(),
None,
&mut peer_rx,
pfn_rx,
Arc::new(mock_transport),
ConsensusManagerMetrics::new::<U64Artifact>(&MetricsRegistry::default()),
)
.await,
Ok((0, NODE_1))
)
});
}
}

0 comments on commit 9c3c37f

Please sign in to comment.