From 111538c251558ddda56f3997b4b37d353a6c8e5c Mon Sep 17 00:00:00 2001 From: Leo Eichhorn Date: Mon, 30 Oct 2023 17:08:03 +0000 Subject: [PATCH] feat(consensus): CON-1087 Don't relay shares during retransmissions --- rs/artifact_pool/benches/load_blocks.rs | 1 + .../src/bin/consensus_pool_util.rs | 4 +- rs/artifact_pool/src/certification_pool.rs | 154 +++++++++-- rs/artifact_pool/src/consensus_pool.rs | 252 ++++++++++++++++-- rs/artifact_pool/src/consensus_pool_cache.rs | 2 + rs/consensus/benches/validate_payload.rs | 1 + rs/consensus/mocks/src/lib.rs | 1 + rs/consensus/src/certification/certifier.rs | 9 +- rs/consensus/tests/framework/driver.rs | 1 + rs/consensus/tests/framework/types.rs | 1 + rs/consensus/tests/payload.rs | 7 +- rs/p2p/tests/framework/p2p_runner.rs | 1 + rs/recovery/src/steps.rs | 8 +- rs/replay/src/player.rs | 4 + rs/replay/src/validator.rs | 1 + rs/replica/setup_ic_network/src/lib.rs | 1 + rs/replica/src/setup_ic_stack.rs | 1 + .../artifact_pool/src/consensus_pool.rs | 2 + 18 files changed, 401 insertions(+), 50 deletions(-) diff --git a/rs/artifact_pool/benches/load_blocks.rs b/rs/artifact_pool/benches/load_blocks.rs index c2db1f3441f..5e9fc25d739 100644 --- a/rs/artifact_pool/benches/load_blocks.rs +++ b/rs/artifact_pool/benches/load_blocks.rs @@ -27,6 +27,7 @@ where { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { let mut consensus_pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_test_id(0), make_genesis(ic_types::consensus::dkg::Summary::fake()), pool_config, diff --git a/rs/artifact_pool/src/bin/consensus_pool_util.rs b/rs/artifact_pool/src/bin/consensus_pool_util.rs index 50b3b9b42e0..f6b4516bb47 100644 --- a/rs/artifact_pool/src/bin/consensus_pool_util.rs +++ b/rs/artifact_pool/src/bin/consensus_pool_util.rs @@ -10,6 +10,7 @@ use ic_metrics::MetricsRegistry; use ic_types::{ consensus::{certification::CertificationMessage, CatchUpPackage, ConsensusMessageHashable}, time::current_time, + NodeId, PrincipalId, }; use prost::Message; use serde::{Deserialize, Serialize}; @@ -121,7 +122,8 @@ fn open_certification_pool(path: &str, read_only: bool) -> CertificationPoolImpl let path = PathBuf::from(path); let mut config = ArtifactPoolConfig::new(path); config.persistent_pool_read_only = read_only; - CertificationPoolImpl::new(config, log, MetricsRegistry::new()) + let node_id = NodeId::from(PrincipalId::new_node_test_id(0)); + CertificationPoolImpl::new(node_id, config, log, MetricsRegistry::new()) } fn from_str<'a, T: Deserialize<'a>>(json: &'a str) -> Result { diff --git a/rs/artifact_pool/src/certification_pool.rs b/rs/artifact_pool/src/certification_pool.rs index 6212c7aa600..310ff39d9c7 100644 --- a/rs/artifact_pool/src/certification_pool.rs +++ b/rs/artifact_pool/src/certification_pool.rs @@ -12,6 +12,7 @@ use ic_metrics::MetricsRegistry; use ic_types::artifact::ArtifactKind; use ic_types::consensus::IsShare; use ic_types::crypto::crypto_hash; +use ic_types::NodeId; use ic_types::{ artifact::CertificationMessageFilter, artifact::CertificationMessageId, @@ -29,6 +30,7 @@ use std::collections::HashSet; /// multi-signatures of (height, hash) pairs, where hash corresponds to an /// execution state. pub struct CertificationPoolImpl { + node_id: NodeId, // Unvalidated shares and certifications are stored separately to improve the validation // performance by checking for full certifications first. unvalidated_shares: HeightIndex, @@ -47,6 +49,7 @@ const POOL_CERTIFICATION: &str = "certification"; impl CertificationPoolImpl { pub fn new( + node_id: NodeId, config: ArtifactPoolConfig, log: ReplicaLogger, metrics_registry: MetricsRegistry, @@ -73,6 +76,7 @@ impl CertificationPoolImpl { }; CertificationPoolImpl { + node_id, unvalidated_shares: HeightIndex::default(), unvalidated_certifications: HeightIndex::default(), persistent_pool, @@ -362,17 +366,40 @@ impl ValidatedPoolReader for CertificationPoolImpl { &self, filter: &CertificationMessageFilter, ) -> Box + '_> { - // Return all validated certifications and all shares above the filter - let min_height = filter.height.get(); - let all_certs = self - .validated_certifications() - .filter(move |cert| cert.height > Height::from(min_height)) - .map(CertificationMessage::Certification); - let all_shares = self - .validated_shares() - .filter(move |share| share.height > Height::from(min_height)) - .map(CertificationMessage::CertificationShare); - Box::new(all_certs.chain(all_shares)) + // In case we received a filter of u64::MAX, don't overflow. + let Some(filter) = filter.height.get().checked_add(1).map(Height::from) else { + return Box::new(std::iter::empty()); + }; + + let certification_range = self.persistent_pool.certifications().height_range(); + let share_range = self.persistent_pool.certification_shares().height_range(); + + let ranges = [certification_range.as_ref(), share_range.as_ref()] + .into_iter() + .flatten(); + let Some(min_height) = ranges.clone().map(|range| range.min).min() else { + return Box::new(std::iter::empty()); + }; + let min = min_height.max(filter); + let max = ranges.map(|range| range.max).max().unwrap_or(min); + + // For all heights above the minimum, return the validated certification of the subnet, + // or the share signed by this node if we don't have the aggregate. + let iterator = (min.get()..=max.get()).map(Height::from).flat_map(|h| { + let mut certifications = self.persistent_pool.certifications().get_by_height(h); + if let Some(certification) = certifications.next() { + vec![CertificationMessage::Certification(certification)] + } else { + self.persistent_pool + .certification_shares() + .get_by_height(h) + .filter(|share| share.signed.signature.signer == self.node_id) + .map(CertificationMessage::CertificationShare) + .collect() + } + }); + + Box::new(iterator) } } @@ -459,8 +486,12 @@ mod tests { #[test] fn test_certification_pool_insert_and_remove() { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { - let mut pool = - CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new()); + let mut pool = CertificationPoolImpl::new( + node_test_id(0), + pool_config, + no_op_logger(), + MetricsRegistry::new(), + ); let share1 = fake_share(1, 0); let id1 = msg_to_id(&share1); let share2 = fake_share(2, 1); @@ -512,8 +543,12 @@ mod tests { #[test] fn test_certification_pool_add_to_validated() { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { - let mut pool = - CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new()); + let mut pool = CertificationPoolImpl::new( + node_test_id(0), + pool_config, + no_op_logger(), + MetricsRegistry::new(), + ); let share_msg = fake_share(7, 0); let cert_msg = fake_cert(8); let result = pool.apply_changes( @@ -540,8 +575,12 @@ mod tests { #[test] fn test_certification_pool_move_to_validated() { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { - let mut pool = - CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new()); + let mut pool = CertificationPoolImpl::new( + node_test_id(0), + pool_config, + no_op_logger(), + MetricsRegistry::new(), + ); let share_msg = fake_share(10, 10); let cert_msg = fake_cert(20); pool.insert(to_unvalidated(share_msg.clone())); @@ -582,8 +621,12 @@ mod tests { #[test] fn test_certification_pool_remove_all() { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { - let mut pool = - CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new()); + let mut pool = CertificationPoolImpl::new( + node_test_id(0), + pool_config, + no_op_logger(), + MetricsRegistry::new(), + ); let share_msg = fake_share(10, 10); let cert_msg = fake_cert(10); pool.insert(to_unvalidated(share_msg.clone())); @@ -651,8 +694,12 @@ mod tests { #[test] fn test_certification_pool_handle_invalid() { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { - let mut pool = - CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new()); + let mut pool = CertificationPoolImpl::new( + node_test_id(0), + pool_config, + no_op_logger(), + MetricsRegistry::new(), + ); let share_msg = fake_share(10, 10); pool.insert(to_unvalidated(share_msg.clone())); @@ -679,4 +726,69 @@ mod tests { assert!(!result.poll_immediately); }); } + + #[test] + fn test_get_all_validated_by_filter() { + ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { + let node = node_test_id(3); + let mut pool = CertificationPoolImpl::new( + node, + pool_config, + no_op_logger(), + MetricsRegistry::new(), + ); + + let height_offset = 5_000_000_000; + let filter = CertificationMessageFilter { + height: Height::from(height_offset + 10), + }; + + // Create shares from 5 nodes for 20 heights, only add an aggregate on even heights. + let mut messages = Vec::new(); + for h in 1..=20 { + for i in 1..=5 { + messages.push(ChangeAction::AddToValidated(fake_share( + height_offset + h, + i, + ))); + } + if h % 2 == 0 { + messages.push(ChangeAction::AddToValidated(fake_cert(height_offset + h))); + } + } + + pool.apply_changes(&SysTimeSource::new(), messages); + + let get_signer = |m: &CertificationMessage| match m { + CertificationMessage::CertificationShare(x) => x.signed.signature.signer, + _ => panic!("No signer for aggregate artifacts"), + }; + + let mut heights = HashSet::new(); + pool.get_all_validated_by_filter(&filter).for_each(|m| { + assert!(m.height() >= filter.height); + if m.height().get() % 2 == 0 { + assert!(!m.is_share()); + } + if m.height().get() % 2 != 0 { + assert!(m.is_share()); + } + if m.is_share() { + assert_eq!(get_signer(&m), node); + } + assert!(heights.insert(m.height())); + }); + assert_eq!(heights.len(), 10); + + let min_filter = CertificationMessageFilter { + height: Height::from(u64::MIN), + }; + assert_eq!(pool.get_all_validated_by_filter(&min_filter).count(), 20); + + let max_filter = CertificationMessageFilter { + height: Height::from(u64::MAX), + }; + assert_eq!(pool.get_all_validated_by_filter(&max_filter).count(), 0); + }); + } } diff --git a/rs/artifact_pool/src/consensus_pool.rs b/rs/artifact_pool/src/consensus_pool.rs index 20b83c92d1b..5aa74510bc3 100644 --- a/rs/artifact_pool/src/consensus_pool.rs +++ b/rs/artifact_pool/src/consensus_pool.rs @@ -20,6 +20,7 @@ use ic_interfaces::{ use ic_logger::{warn, ReplicaLogger}; use ic_metrics::buckets::linear_buckets; use ic_protobuf::types::v1 as pb; +use ic_types::NodeId; use ic_types::{ artifact::ArtifactKind, artifact::ConsensusMessageFilter, artifact::ConsensusMessageId, artifact_kind::ConsensusArtifact, consensus::*, Height, SubnetId, Time, @@ -248,6 +249,7 @@ impl PoolMetrics { } pub struct ConsensusPoolImpl { + node_id: NodeId, validated: Box, unvalidated: Box + Send + Sync>, validated_metrics: PoolMetrics, @@ -362,6 +364,7 @@ impl ConsensusPoolImpl { /// the validated pool, the one that is greater (with respect to /// height and registry version) will be used. pub fn new( + node_id: NodeId, subnet_id: SubnetId, cup_proto: pb::CatchUpPackage, config: ArtifactPoolConfig, @@ -370,7 +373,7 @@ impl ConsensusPoolImpl { ) -> ConsensusPoolImpl { let mut pool = UncachedConsensusPoolImpl::new(config.clone(), log.clone()); Self::init_genesis(cup_proto, pool.validated.as_mut()); - let mut pool = Self::from_uncached(pool, registry.clone(), log.clone()); + let mut pool = Self::from_uncached(node_id, pool, registry.clone(), log.clone()); // If the back up directory is set, instantiate the backup component // and create a subdirectory with the subnet id as directory name. pool.backup = config.backup_config.map(|config| { @@ -419,12 +422,14 @@ impl ConsensusPoolImpl { /// Can be used to instantiate an empty pool without a CUP. pub fn from_uncached( + node_id: NodeId, uncached: UncachedConsensusPoolImpl, registry: ic_metrics::MetricsRegistry, log: ReplicaLogger, ) -> ConsensusPoolImpl { let cache = Arc::new(ConsensusCacheImpl::new(&uncached)); ConsensusPoolImpl { + node_id, validated: uncached.validated, unvalidated: uncached.unvalidated, invalidated_artifacts: registry.int_counter( @@ -440,13 +445,21 @@ impl ConsensusPoolImpl { } pub fn new_from_cup_without_bytes( + node_id: NodeId, subnet_id: SubnetId, catch_up_package: CatchUpPackage, config: ArtifactPoolConfig, registry: ic_metrics::MetricsRegistry, log: ReplicaLogger, ) -> ConsensusPoolImpl { - Self::new(subnet_id, (&catch_up_package).into(), config, registry, log) + Self::new( + node_id, + subnet_id, + (&catch_up_package).into(), + config, + registry, + log, + ) } /// Get a copy of ConsensusPoolCache. @@ -715,12 +728,13 @@ impl ValidatedPoolReader for ConsensusPoolImpl { self.validated.get(id) } - // Return an iterator of all artifacts that is required to make progress + // Return an iterator of all artifacts that are required to make progress // above the given height filter. fn get_all_validated_by_filter( &self, filter: &ConsensusMessageFilter, ) -> Box + '_> { + let node_id = self.node_id; let max_catch_up_height = self .validated .catch_up_package() @@ -730,7 +744,14 @@ impl ValidatedPoolReader for ConsensusPoolImpl { // Since random beacon of previous height is required, min_random_beacon_height // should be one less than the normal min height. let min_random_beacon_height = max_catch_up_height.max(filter.height); - let min = min_random_beacon_height.increment(); + // In case we received a filter of u64::MAX, don't overflow. + let Some(min) = min_random_beacon_height + .get() + .checked_add(1) + .map(Height::from) + else { + return Box::new(std::iter::empty()); + }; let max_finalized_height = self .validated .finalization() @@ -802,23 +823,19 @@ impl ValidatedPoolReader for ConsensusPoolImpl { ..=max_random_tape_height .max(max_random_tape_share_height) .get(); - let random_tapes = tape_range - .clone() - .map(move |h| self.validated.random_tape().get_by_height(Height::from(h))); - let random_tape_shares = tape_range.map(move |h| { - self.validated - .random_tape_share() - .get_by_height(Height::from(h)) + let random_tape_iterator = tape_range.map(Height::from).flat_map(move |h| { + let mut tapes = self.validated.random_tape().get_by_height(h); + if let Some(tape) = tapes.next() { + vec![tape.into_message()] + } else { + self.validated + .random_tape_share() + .get_by_height(h) + .filter(|x| x.signature.signer == node_id) + .map(|x| x.into_message()) + .collect() + } }); - let random_tape_iterator = - random_tapes - .zip(random_tape_shares) - .flat_map(|(mut tape, shares)| { - tape.next().map_or_else( - || shares.map(|x| x.into_message()).collect::>(), - |x| vec![x.into_message()], - ) - }); Box::new( self.validated @@ -844,6 +861,7 @@ impl ValidatedPoolReader for ConsensusPoolImpl { min: min_finalized_share_height, max: max_finalized_share_height, }) + .filter(move |x| x.signature.signer == node_id) .map(|x| x.into_message()), ) .chain( @@ -862,6 +880,7 @@ impl ValidatedPoolReader for ConsensusPoolImpl { min: min_notarization_share_height, max: max_notarization_share_height, }) + .filter(move |x| x.signature.signer == node_id) .map(|x| x.into_message()), ) .chain( @@ -880,6 +899,7 @@ impl ValidatedPoolReader for ConsensusPoolImpl { min: min_random_beacon_share_height, max: max_random_beacon_share_height, }) + .filter(move |x| x.signature.signer == node_id) .map(|x| x.into_message()), ) .chain( @@ -928,8 +948,8 @@ mod tests { use ic_types::{ batch::ValidationContext, consensus::{BlockProposal, RandomBeacon}, - crypto::{CryptoHash, CryptoHashOf}, - RegistryVersion, + crypto::{crypto_hash, CryptoHash, CryptoHashOf}, + RegistryVersion, ReplicaVersion, }; use prost::Message; use std::{collections::HashMap, convert::TryFrom, fs, io::Read, path::Path, sync::RwLock}; @@ -940,6 +960,7 @@ mod tests { let time_source = FastForwardTimeSource::new(); let time_0 = time_source.get_relative_time(); let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_test_id(0), make_genesis(ic_types::consensus::dkg::Summary::fake()), pool_config, @@ -997,6 +1018,7 @@ mod tests { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { let time_source = FastForwardTimeSource::new(); let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_test_id(0), make_genesis(ic_types::consensus::dkg::Summary::fake()), pool_config, @@ -1073,6 +1095,7 @@ mod tests { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { let time_source = FastForwardTimeSource::new(); let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_test_id(0), make_genesis(ic_types::consensus::dkg::Summary::fake()), pool_config, @@ -1137,6 +1160,7 @@ mod tests { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { let time_source = FastForwardTimeSource::new(); let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_test_id(0), make_genesis(ic_types::consensus::dkg::Summary::fake()), pool_config, @@ -1162,11 +1186,192 @@ mod tests { }); } + #[test] + fn test_get_all_validated_by_filter() { + ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { + let time_source = FastForwardTimeSource::new(); + let node = node_test_id(3); + let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node, + subnet_test_id(0), + make_genesis(ic_types::consensus::dkg::Summary::fake()), + pool_config, + ic_metrics::MetricsRegistry::new(), + no_op_logger(), + ); + + let height_offset = 5_000_000_000; + let filter = ConsensusMessageFilter { + height: Height::from(height_offset + 10), + }; + + let fake_block = |height: Height| { + Block::new( + CryptoHashOf::from(CryptoHash(vec![])), + Payload::new( + ic_types::crypto::crypto_hash, + (ic_types::consensus::dkg::Summary::fake(), None).into(), + ), + Height::from(height), + Rank(0), + ValidationContext { + registry_version: RegistryVersion::from(99), + certified_height: Height::from(42), + time: mock_time(), + }, + ) + }; + + let fake_proposal = |height: Height, node_id: NodeId| { + BlockProposal::fake(fake_block(height), node_id) + .into_message() + .into_message() + }; + + let fake_finalization = |height: Height| { + Finalization::fake(FinalizationContent { + version: ReplicaVersion::default(), + height, + block: crypto_hash(&fake_block(height)), + }) + .into_message() + }; + + let fake_finalization_share = |height: Height, node_id: NodeId| { + FinalizationShare::fake(&fake_block(height), node_id).into_message() + }; + + let fake_notarization = |height: Height| { + Notarization::fake(NotarizationContent { + version: ReplicaVersion::default(), + height, + block: crypto_hash(&fake_block(height)), + }) + .into_message() + }; + + let fake_notarization_share = |height: Height, node_id: NodeId| { + NotarizationShare::fake(&fake_block(height), node_id).into_message() + }; + + let fake_beacon = |height: Height| { + RandomBeacon::fake(RandomBeaconContent { + version: ReplicaVersion::default(), + height, + parent: CryptoHashOf::from(CryptoHash(vec![])), + }) + .into_message() + }; + + let fake_beacon_share = |height: Height, node_id: NodeId| { + RandomBeaconShare::fake( + &fake_beacon(height.decrement()).try_into().unwrap(), + node_id, + ) + .into_message() + }; + + let fake_tape = |height: Height| { + RandomTape::fake(RandomTapeContent { + version: ReplicaVersion::default(), + height, + }) + .into_message() + }; + + let fake_tape_share = |height: Height, node_id: NodeId| { + RandomTapeShare::fake(height, node_id).into_message() + }; + + // Create shares from 5 nodes for 20 heights, only add aggregates below height 15. + let mut messages = Vec::new(); + for h in 1..=20 { + let height = Height::from(height_offset + h); + for i in 1..=5 { + let node_id = node_test_id(i); + messages.extend([ + fake_proposal(height, node_id), + fake_finalization_share(height, node_id), + fake_notarization_share(height, node_id), + fake_beacon_share(height, node_id), + fake_tape_share(height, node_id), + ]); + } + if h <= 15 { + messages.extend([ + fake_finalization(height), + fake_notarization(height), + fake_beacon(height), + fake_tape(height), + ]); + } + } + messages.push( + CatchUpPackage::fake(CatchUpContent::new( + HashedBlock::new(crypto_hash, fake_block(Height::from(height_offset))), + HashedRandomBeacon::new( + crypto_hash, + RandomBeacon::fake(RandomBeaconContent { + version: ReplicaVersion::default(), + height: Height::from(height_offset), + parent: CryptoHashOf::from(CryptoHash(vec![])), + }), + ), + CryptoHashOf::from(CryptoHash(vec![])), + )) + .into_message(), + ); + + pool.apply_changes( + time_source.as_ref(), + messages + .into_iter() + .map(|m| ChangeAction::AddToValidated(m)) + .collect(), + ); + + let get_signer = |m: &ConsensusMessage| match m { + ConsensusMessage::RandomBeaconShare(x) => x.signature.signer, + ConsensusMessage::RandomTapeShare(x) => x.signature.signer, + ConsensusMessage::NotarizationShare(x) => x.signature.signer, + ConsensusMessage::FinalizationShare(x) => x.signature.signer, + ConsensusMessage::CatchUpPackageShare(x) => x.signature.signer, + _ => panic!("No signer for aggregate artifacts"), + }; + + pool.get_all_validated_by_filter(&filter).for_each(|m| { + assert!(m.height() >= filter.height); + if m.height().get() <= height_offset + 15 { + assert!(!m.is_share()); + } + if m.is_share() { + assert_eq!(get_signer(&m), node); + } + }); + + let min_filter = ConsensusMessageFilter { + height: Height::from(u64::MIN), + }; + + assert_eq!( + pool.get_all_validated_by_filter(&min_filter).count(), + // 1 CUP, 15 heights of aggregates, 5 heights of shares, 20 heights of proposals + 1 + 15 * 4 + 5 * 4 + 20 * 5 + ); + + let max_filter = ConsensusMessageFilter { + height: Height::from(u64::MAX), + }; + assert_eq!(pool.get_all_validated_by_filter(&max_filter).count(), 0); + }); + } + #[test] fn test_metrics() { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { let time_source = FastForwardTimeSource::new(); let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_test_id(0), make_genesis(ic_types::consensus::dkg::Summary::fake()), pool_config, @@ -1316,6 +1521,7 @@ mod tests { .join(subnet_id.to_string()) .join(ic_types::ReplicaVersion::default().to_string()); let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_id, make_genesis(ic_types::consensus::dkg::Summary::fake()), pool_config, @@ -1664,6 +1870,7 @@ mod tests { let subnet_id = subnet_test_id(0); let path = backup_dir.path().join(format!("{:?}", subnet_id)); let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_id, make_genesis(ic_types::consensus::dkg::Summary::fake()), pool_config, @@ -1859,6 +2066,7 @@ mod tests { let state_manager = FakeStateManager::new(); let state_manager = Arc::new(state_manager); let mut pool = TestConsensusPool::new( + node_test_id(0), subnet_id, pool_config, time_source, diff --git a/rs/artifact_pool/src/consensus_pool_cache.rs b/rs/artifact_pool/src/consensus_pool_cache.rs index 59f40d1c2a2..2a0608310b7 100644 --- a/rs/artifact_pool/src/consensus_pool_cache.rs +++ b/rs/artifact_pool/src/consensus_pool_cache.rs @@ -512,6 +512,7 @@ mod test { let state_manager = FakeStateManager::new(); let state_manager = Arc::new(state_manager); let mut pool = TestConsensusPool::new( + node_test_id(0), subnet_id, pool_config, time_source, @@ -592,6 +593,7 @@ mod test { )]; let mut pool = TestConsensusPool::new( + node_test_id(0), subnet_test_id(1), pool_config, FastForwardTimeSource::new(), diff --git a/rs/consensus/benches/validate_payload.rs b/rs/consensus/benches/validate_payload.rs index 71d66d35377..11c9b8f8b0d 100644 --- a/rs/consensus/benches/validate_payload.rs +++ b/rs/consensus/benches/validate_payload.rs @@ -105,6 +105,7 @@ where let committee = vec![node_test_id(0)]; let summary = dkg::Summary::fake(); let mut consensus_pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_test_id(0), subnet_test_id(0), make_genesis(summary), pool_config.clone(), diff --git a/rs/consensus/mocks/src/lib.rs b/rs/consensus/mocks/src/lib.rs index 001116cd077..13af128dd22 100644 --- a/rs/consensus/mocks/src/lib.rs +++ b/rs/consensus/mocks/src/lib.rs @@ -107,6 +107,7 @@ pub fn dependencies_with_subnet_records_with_raw_state_manager( log, ))); let pool = TestConsensusPool::new( + replica_config.node_id, subnet_id, pool_config, time_source.clone(), diff --git a/rs/consensus/src/certification/certifier.rs b/rs/consensus/src/certification/certifier.rs index 38e8f596d69..f66f04928c5 100644 --- a/rs/consensus/src/certification/certifier.rs +++ b/rs/consensus/src/certification/certifier.rs @@ -727,6 +727,7 @@ mod tests { add_expectations(state_manager.clone(), 1, 4); let metrics_registry = MetricsRegistry::new(); let mut cert_pool = CertificationPoolImpl::new( + replica_config.node_id, pool_config, ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), @@ -790,6 +791,7 @@ mod tests { add_expectations(state_manager.clone(), 1, 4); let metrics_registry = MetricsRegistry::new(); let mut cert_pool = CertificationPoolImpl::new( + replica_config.node_id, pool_config, ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), @@ -923,6 +925,7 @@ mod tests { add_expectations(state_manager.clone(), 3, 5); let metrics_registry = MetricsRegistry::new(); let mut cert_pool = CertificationPoolImpl::new( + replica_config.node_id, pool_config, ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), @@ -999,6 +1002,7 @@ mod tests { add_expectations(state_manager.clone(), 3, 5); let metrics_registry = MetricsRegistry::new(); let mut cert_pool = CertificationPoolImpl::new( + replica_config.node_id, pool_config, ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), @@ -1067,6 +1071,7 @@ mod tests { add_expectations(state_manager.clone(), 3, 5); let metrics_registry = MetricsRegistry::new(); let cert_pool = CertificationPoolImpl::new( + replica_config.node_id, pool_config, ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), @@ -1137,7 +1142,7 @@ mod tests { add_expectations(state_manager.clone(), 3, 5); let metrics_registry = MetricsRegistry::new(); let certifier = CertifierImpl::new( - replica_config, + replica_config.clone(), membership, crypto, state_manager.clone(), @@ -1146,6 +1151,7 @@ mod tests { log, ); let mut cert_pool = CertificationPoolImpl::new( + replica_config.node_id, pool_config, ic_logger::replica_logger::no_op_logger(), metrics_registry, @@ -1250,6 +1256,7 @@ mod tests { add_expectations(state_manager.clone(), 4, 5); let metrics_registry = MetricsRegistry::new(); let mut cert_pool = CertificationPoolImpl::new( + replica_config.node_id, pool_config, ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), diff --git a/rs/consensus/tests/framework/driver.rs b/rs/consensus/tests/framework/driver.rs index 15d605df20d..2cb5fbc4488 100644 --- a/rs/consensus/tests/framework/driver.rs +++ b/rs/consensus/tests/framework/driver.rs @@ -39,6 +39,7 @@ impl<'a> ConsensusDriver<'a> { ) -> ConsensusDriver<'a> { let ingress_pool = RefCell::new(TestIngressPool::new(node_id, pool_config.clone())); let certification_pool = Arc::new(RwLock::new(CertificationPoolImpl::new( + node_id, pool_config, logger.clone(), metrics_registry, diff --git a/rs/consensus/tests/framework/types.rs b/rs/consensus/tests/framework/types.rs index e75adb4fed2..a0a61c0c020 100644 --- a/rs/consensus/tests/framework/types.rs +++ b/rs/consensus/tests/framework/types.rs @@ -177,6 +177,7 @@ impl ConsensusDependencies { let metrics_registry = MetricsRegistry::new(); let consensus_pool = Arc::new(RwLock::new(ConsensusPoolImpl::new_from_cup_without_bytes( + replica_config.node_id, replica_config.subnet_id, cup, pool_config.clone(), diff --git a/rs/consensus/tests/payload.rs b/rs/consensus/tests/payload.rs index e2c76d99151..037338b080c 100644 --- a/rs/consensus/tests/payload.rs +++ b/rs/consensus/tests/payload.rs @@ -83,11 +83,9 @@ fn consensus_produces_expected_batches() { *router.next_batch_height.write().unwrap() = Height::from(1); // skip genesis block let router = Arc::new(router); + let node_id = node_test_id(0); let subnet_id = subnet_test_id(0); - let replica_config = ReplicaConfig { - node_id: node_test_id(0), - subnet_id, - }; + let replica_config = ReplicaConfig { node_id, subnet_id }; let fake_crypto = CryptoReturningOk::default(); let fake_crypto = Arc::new(fake_crypto); let metrics_registry = MetricsRegistry::new(); @@ -109,6 +107,7 @@ fn consensus_produces_expected_batches() { let summary = dkg::make_genesis_summary(&*registry_client, replica_config.subnet_id, None); let consensus_pool = Arc::new(RwLock::new( consensus_pool::ConsensusPoolImpl::new_from_cup_without_bytes( + node_id, subnet_id, make_genesis(summary), pool_config.clone(), diff --git a/rs/p2p/tests/framework/p2p_runner.rs b/rs/p2p/tests/framework/p2p_runner.rs index a1288e30939..cddc94526e3 100755 --- a/rs/p2p/tests/framework/p2p_runner.rs +++ b/rs/p2p/tests/framework/p2p_runner.rs @@ -107,6 +107,7 @@ fn execute_test( let cup = make_catch_up_package_with_empty_transcript(registry.clone(), subnet_id); let consensus_pool = Arc::new(RwLock::new(ConsensusPoolImpl::new( + node_id, subnet_id, pb::CatchUpPackage::from(&cup), artifact_pool_config.clone(), diff --git a/rs/recovery/src/steps.rs b/rs/recovery/src/steps.rs index 3f339cf0b8b..3d557fdea89 100644 --- a/rs/recovery/src/steps.rs +++ b/rs/recovery/src/steps.rs @@ -12,7 +12,7 @@ use crate::{ IC_JSON5_PATH, IC_REGISTRY_LOCAL_STORE, IC_STATE, IC_STATE_EXCLUDES, NEW_IC_STATE, READONLY, }; use ic_artifact_pool::certification_pool::CertificationPoolImpl; -use ic_base_types::CanisterId; +use ic_base_types::{CanisterId, NodeId, PrincipalId}; use ic_config::artifact_pool::ArtifactPoolConfig; use ic_interfaces::certification::CertificationPool; use ic_metrics::MetricsRegistry; @@ -136,6 +136,7 @@ impl Step for MergeCertificationPoolsStep { .flat_map(|r| r.map_err(|e| warn!(self.logger, "Failed to read dir: {:?}", e))) .map(|dir| { let pool = CertificationPoolImpl::new( + NodeId::from(PrincipalId::new_anonymous()), ArtifactPoolConfig::new(dir.path()), self.logger.clone().into(), MetricsRegistry::new(), @@ -148,6 +149,7 @@ impl Step for MergeCertificationPoolsStep { // Analyze and move full certifications let new_pool = CertificationPoolImpl::new( + NodeId::from(PrincipalId::new_anonymous()), ArtifactPoolConfig::new(self.work_dir.join("data/ic_consensus_pool")), self.logger.clone().into(), MetricsRegistry::new(), @@ -1068,11 +1070,13 @@ mod tests { let tmp = tempfile::tempdir().expect("Could not create a temp dir"); let work_dir = tmp.path().to_path_buf(); let pool1 = CertificationPoolImpl::new( + node_test_id(0), ArtifactPoolConfig::new(work_dir.join("certifications/ip1")), logger.clone().into(), MetricsRegistry::new(), ); let pool2 = CertificationPoolImpl::new( + node_test_id(0), ArtifactPoolConfig::new(work_dir.join("certifications/ip2")), logger.clone().into(), MetricsRegistry::new(), @@ -1132,6 +1136,7 @@ mod tests { step.exec().expect("Failed to execute step."); let new_pool = CertificationPoolImpl::new( + node_test_id(0), ArtifactPoolConfig::new(work_dir.join("data/ic_consensus_pool")), logger.clone().into(), MetricsRegistry::new(), @@ -1192,6 +1197,7 @@ mod tests { step.exec().expect("Failed to execute step."); let new_pool = CertificationPoolImpl::new( + node_test_id(0), ArtifactPoolConfig::new(work_dir.join("data/ic_consensus_pool")), logger.clone().into(), MetricsRegistry::new(), diff --git a/rs/replay/src/player.rs b/rs/replay/src/player.rs index f9f36d537b3..5cc09977323 100644 --- a/rs/replay/src/player.rs +++ b/rs/replay/src/player.rs @@ -172,6 +172,7 @@ impl Player { backup::read_cup_file(&cup_file).expect("CUP of the starting block should be valid"); // This would create a new pool with just the genesis CUP. let pool = ConsensusPoolImpl::new_from_cup_without_bytes( + NodeId::from(PrincipalId::new_anonymous()), subnet_id, initial_cup, artifact_pool_config, @@ -206,6 +207,7 @@ impl Player { // recovery. artifact_pool_config.persistent_pool_read_only = true; let consensus_pool = ConsensusPoolImpl::from_uncached( + NodeId::from(PrincipalId::new_anonymous()), UncachedConsensusPoolImpl::new(artifact_pool_config, log.clone()), MetricsRegistry::new(), log.clone(), @@ -310,6 +312,7 @@ impl Player { )); let certification_pool = consensus_pool.as_ref().map(|_| { CertificationPoolImpl::new( + NodeId::from(PrincipalId::new_anonymous()), ArtifactPoolConfig::from(cfg.artifact_pool.clone()), log.clone(), metrics_registry.clone(), @@ -1344,6 +1347,7 @@ mod tests { fn test_get_share_certified_hashes() { let tmp = tempfile::tempdir().expect("Could not create a temp dir"); let pool = CertificationPoolImpl::new( + node_test_id(0), ArtifactPoolConfig::new(tmp.path().to_path_buf()), no_op_logger(), MetricsRegistry::new(), diff --git a/rs/replay/src/validator.rs b/rs/replay/src/validator.rs index d5a48a1a6b1..2cd0a4d3c30 100644 --- a/rs/replay/src/validator.rs +++ b/rs/replay/src/validator.rs @@ -209,6 +209,7 @@ impl ReplayValidator { // This creates a new pool with just the genesis CUP. let mut pool = ConsensusPoolImpl::new_from_cup_without_bytes( + self.replica_cfg.node_id, self.replica_cfg.subnet_id, cup, artifact_pool_config, diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index 8d79201ca40..3e585f03c5d 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -636,6 +636,7 @@ fn init_artifact_pools( let ecdsa_pool = Arc::new(RwLock::new(ecdsa_pool)); let certification_pool = Arc::new(RwLock::new(CertificationPoolImpl::new( + node_id, config, log.clone(), registry.clone(), diff --git a/rs/replica/src/setup_ic_stack.rs b/rs/replica/src/setup_ic_stack.rs index 7109ea2de28..89aae1892b1 100755 --- a/rs/replica/src/setup_ic_stack.rs +++ b/rs/replica/src/setup_ic_stack.rs @@ -124,6 +124,7 @@ pub fn construct_ic_stack( ); let consensus_pool = Arc::new(RwLock::new(ConsensusPoolImpl::new( + node_id, subnet_id, pb::CatchUpPackage::from(&catch_up_package), artifact_pool_config.clone(), diff --git a/rs/test_utilities/artifact_pool/src/consensus_pool.rs b/rs/test_utilities/artifact_pool/src/consensus_pool.rs index d2684d65ccf..fdae33f2511 100644 --- a/rs/test_utilities/artifact_pool/src/consensus_pool.rs +++ b/rs/test_utilities/artifact_pool/src/consensus_pool.rs @@ -159,6 +159,7 @@ impl TestConsensusPool { /// Creates a new test pool. `registry_version_for_genesis` is used to /// create the genesis block with data from the provided registry. pub fn new( + node_id: NodeId, subnet_id: SubnetId, pool_config: ArtifactPoolConfig, time_source: Arc, @@ -183,6 +184,7 @@ impl TestConsensusPool { )); let summary = ic_consensus::dkg::make_genesis_summary(&*registry_client, subnet_id, None); let pool = ConsensusPoolImpl::new_from_cup_without_bytes( + node_id, subnet_id, ic_test_utilities::consensus::make_genesis(summary), pool_config,