Skip to content

Commit

Permalink
Merge branch 'eichhorl/share-retransmission' into 'master'
Browse files Browse the repository at this point in the history
feat(consensus): CON-1087 Don't relay shares during retransmissions

For the redesigned P2P-consensus interface, it is important that consensus respects a certain bound on the number of artifacts it advertizes, otherwise we could get stuck. Relaying shares could make consensus violate the bound, as it was calculated assuming no relaying.

While proactive relaying of shares was disabled in https://gitlab.com/dfinity-lab/public/ic/-/merge_requests/12942, they may still be relayed by nodes answering "retransmission requests" of other nodes catching up, or coming back online.

With this MR, shares not signed by the own node are no longer returned from `get_all_validated_by_filter` which excludes them when answering retransmission requests. This requires us to add the own node ID to consensus and certification pools.

**Additionally:**
- Ensure that overflows when incrementing the filter are handled properly.
- For certifications, make sure to always take the maximum between the filter and the lowest artifact available. Otherwise we might try to lookup all artifacts between height 0 and the current height, even if those artifacts don't exist.
- Extend unit test to exercise these edge cases. 

See merge request dfinity-lab/public/ic!15569
  • Loading branch information
eichhorl committed Oct 30, 2023
2 parents 4f22c3d + 111538c commit 2e14994
Show file tree
Hide file tree
Showing 18 changed files with 401 additions and 50 deletions.
1 change: 1 addition & 0 deletions rs/artifact_pool/benches/load_blocks.rs
Expand Up @@ -27,6 +27,7 @@ where
{
ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
let mut consensus_pool = ConsensusPoolImpl::new_from_cup_without_bytes(
node_test_id(0),
subnet_test_id(0),
make_genesis(ic_types::consensus::dkg::Summary::fake()),
pool_config,
Expand Down
4 changes: 3 additions & 1 deletion rs/artifact_pool/src/bin/consensus_pool_util.rs
Expand Up @@ -10,6 +10,7 @@ use ic_metrics::MetricsRegistry;
use ic_types::{
consensus::{certification::CertificationMessage, CatchUpPackage, ConsensusMessageHashable},
time::current_time,
NodeId, PrincipalId,
};
use prost::Message;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -121,7 +122,8 @@ fn open_certification_pool(path: &str, read_only: bool) -> CertificationPoolImpl
let path = PathBuf::from(path);
let mut config = ArtifactPoolConfig::new(path);
config.persistent_pool_read_only = read_only;
CertificationPoolImpl::new(config, log, MetricsRegistry::new())
let node_id = NodeId::from(PrincipalId::new_node_test_id(0));
CertificationPoolImpl::new(node_id, config, log, MetricsRegistry::new())
}

fn from_str<'a, T: Deserialize<'a>>(json: &'a str) -> Result<T, serde_json::Error> {
Expand Down
154 changes: 133 additions & 21 deletions rs/artifact_pool/src/certification_pool.rs
Expand Up @@ -12,6 +12,7 @@ use ic_metrics::MetricsRegistry;
use ic_types::artifact::ArtifactKind;
use ic_types::consensus::IsShare;
use ic_types::crypto::crypto_hash;
use ic_types::NodeId;
use ic_types::{
artifact::CertificationMessageFilter,
artifact::CertificationMessageId,
Expand All @@ -29,6 +30,7 @@ use std::collections::HashSet;
/// multi-signatures of (height, hash) pairs, where hash corresponds to an
/// execution state.
pub struct CertificationPoolImpl {
node_id: NodeId,
// Unvalidated shares and certifications are stored separately to improve the validation
// performance by checking for full certifications first.
unvalidated_shares: HeightIndex<CertificationShare>,
Expand All @@ -47,6 +49,7 @@ const POOL_CERTIFICATION: &str = "certification";

impl CertificationPoolImpl {
pub fn new(
node_id: NodeId,
config: ArtifactPoolConfig,
log: ReplicaLogger,
metrics_registry: MetricsRegistry,
Expand All @@ -73,6 +76,7 @@ impl CertificationPoolImpl {
};

CertificationPoolImpl {
node_id,
unvalidated_shares: HeightIndex::default(),
unvalidated_certifications: HeightIndex::default(),
persistent_pool,
Expand Down Expand Up @@ -362,17 +366,40 @@ impl ValidatedPoolReader<CertificationArtifact> for CertificationPoolImpl {
&self,
filter: &CertificationMessageFilter,
) -> Box<dyn Iterator<Item = CertificationMessage> + '_> {
// Return all validated certifications and all shares above the filter
let min_height = filter.height.get();
let all_certs = self
.validated_certifications()
.filter(move |cert| cert.height > Height::from(min_height))
.map(CertificationMessage::Certification);
let all_shares = self
.validated_shares()
.filter(move |share| share.height > Height::from(min_height))
.map(CertificationMessage::CertificationShare);
Box::new(all_certs.chain(all_shares))
// In case we received a filter of u64::MAX, don't overflow.
let Some(filter) = filter.height.get().checked_add(1).map(Height::from) else {
return Box::new(std::iter::empty());
};

let certification_range = self.persistent_pool.certifications().height_range();
let share_range = self.persistent_pool.certification_shares().height_range();

let ranges = [certification_range.as_ref(), share_range.as_ref()]
.into_iter()
.flatten();
let Some(min_height) = ranges.clone().map(|range| range.min).min() else {
return Box::new(std::iter::empty());
};
let min = min_height.max(filter);
let max = ranges.map(|range| range.max).max().unwrap_or(min);

// For all heights above the minimum, return the validated certification of the subnet,
// or the share signed by this node if we don't have the aggregate.
let iterator = (min.get()..=max.get()).map(Height::from).flat_map(|h| {
let mut certifications = self.persistent_pool.certifications().get_by_height(h);
if let Some(certification) = certifications.next() {
vec![CertificationMessage::Certification(certification)]
} else {
self.persistent_pool
.certification_shares()
.get_by_height(h)
.filter(|share| share.signed.signature.signer == self.node_id)
.map(CertificationMessage::CertificationShare)
.collect()
}
});

Box::new(iterator)
}
}

Expand Down Expand Up @@ -459,8 +486,12 @@ mod tests {
#[test]
fn test_certification_pool_insert_and_remove() {
ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
let mut pool =
CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new());
let mut pool = CertificationPoolImpl::new(
node_test_id(0),
pool_config,
no_op_logger(),
MetricsRegistry::new(),
);
let share1 = fake_share(1, 0);
let id1 = msg_to_id(&share1);
let share2 = fake_share(2, 1);
Expand Down Expand Up @@ -512,8 +543,12 @@ mod tests {
#[test]
fn test_certification_pool_add_to_validated() {
ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
let mut pool =
CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new());
let mut pool = CertificationPoolImpl::new(
node_test_id(0),
pool_config,
no_op_logger(),
MetricsRegistry::new(),
);
let share_msg = fake_share(7, 0);
let cert_msg = fake_cert(8);
let result = pool.apply_changes(
Expand All @@ -540,8 +575,12 @@ mod tests {
#[test]
fn test_certification_pool_move_to_validated() {
ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
let mut pool =
CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new());
let mut pool = CertificationPoolImpl::new(
node_test_id(0),
pool_config,
no_op_logger(),
MetricsRegistry::new(),
);
let share_msg = fake_share(10, 10);
let cert_msg = fake_cert(20);
pool.insert(to_unvalidated(share_msg.clone()));
Expand Down Expand Up @@ -582,8 +621,12 @@ mod tests {
#[test]
fn test_certification_pool_remove_all() {
ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
let mut pool =
CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new());
let mut pool = CertificationPoolImpl::new(
node_test_id(0),
pool_config,
no_op_logger(),
MetricsRegistry::new(),
);
let share_msg = fake_share(10, 10);
let cert_msg = fake_cert(10);
pool.insert(to_unvalidated(share_msg.clone()));
Expand Down Expand Up @@ -651,8 +694,12 @@ mod tests {
#[test]
fn test_certification_pool_handle_invalid() {
ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
let mut pool =
CertificationPoolImpl::new(pool_config, no_op_logger(), MetricsRegistry::new());
let mut pool = CertificationPoolImpl::new(
node_test_id(0),
pool_config,
no_op_logger(),
MetricsRegistry::new(),
);
let share_msg = fake_share(10, 10);
pool.insert(to_unvalidated(share_msg.clone()));

Expand All @@ -679,4 +726,69 @@ mod tests {
assert!(!result.poll_immediately);
});
}

#[test]
fn test_get_all_validated_by_filter() {
ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
let node = node_test_id(3);
let mut pool = CertificationPoolImpl::new(
node,
pool_config,
no_op_logger(),
MetricsRegistry::new(),
);

let height_offset = 5_000_000_000;
let filter = CertificationMessageFilter {
height: Height::from(height_offset + 10),
};

// Create shares from 5 nodes for 20 heights, only add an aggregate on even heights.
let mut messages = Vec::new();
for h in 1..=20 {
for i in 1..=5 {
messages.push(ChangeAction::AddToValidated(fake_share(
height_offset + h,
i,
)));
}
if h % 2 == 0 {
messages.push(ChangeAction::AddToValidated(fake_cert(height_offset + h)));
}
}

pool.apply_changes(&SysTimeSource::new(), messages);

let get_signer = |m: &CertificationMessage| match m {
CertificationMessage::CertificationShare(x) => x.signed.signature.signer,
_ => panic!("No signer for aggregate artifacts"),
};

let mut heights = HashSet::new();
pool.get_all_validated_by_filter(&filter).for_each(|m| {
assert!(m.height() >= filter.height);
if m.height().get() % 2 == 0 {
assert!(!m.is_share());
}
if m.height().get() % 2 != 0 {
assert!(m.is_share());
}
if m.is_share() {
assert_eq!(get_signer(&m), node);
}
assert!(heights.insert(m.height()));
});
assert_eq!(heights.len(), 10);

let min_filter = CertificationMessageFilter {
height: Height::from(u64::MIN),
};
assert_eq!(pool.get_all_validated_by_filter(&min_filter).count(), 20);

let max_filter = CertificationMessageFilter {
height: Height::from(u64::MAX),
};
assert_eq!(pool.get_all_validated_by_filter(&max_filter).count(), 0);
});
}
}

0 comments on commit 2e14994

Please sign in to comment.