Skip to content

Commit

Permalink
fix(consensus): [CON-1128] Make certification lookups more efficient …
Browse files Browse the repository at this point in the history
…(Part 2)
  • Loading branch information
dist1ll committed Jan 20, 2024
1 parent 29314b0 commit 1dce834
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 117 deletions.
233 changes: 130 additions & 103 deletions rs/artifact_pool/src/certification_pool.rs
Expand Up @@ -24,17 +24,17 @@ use ic_types::{
Height,
};
use prometheus::IntCounter;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};

/// Certification pool contains 2 types of artifacts: partial and
/// multi-signatures of (height, hash) pairs, where hash corresponds to an
/// execution state.
pub struct CertificationPoolImpl {
node_id: NodeId,
// Unvalidated shares and certifications are stored separately to improve the validation
// performance by checking for full certifications first.
unvalidated_shares: HeightIndex<CertificationShare>,
unvalidated_certifications: HeightIndex<Certification>,

unvalidated_share_index: HeightIndex<CertificationMessageHash>,
unvalidated_cert_index: HeightIndex<CertificationMessageHash>,
unvalidated: BTreeMap<CertificationMessageHash, CertificationMessage>,

pub persistent_pool: Box<dyn MutablePoolSection + Send + Sync>,

Expand Down Expand Up @@ -79,8 +79,9 @@ impl CertificationPoolImpl {

CertificationPoolImpl {
node_id,
unvalidated_shares: HeightIndex::default(),
unvalidated_certifications: HeightIndex::default(),
unvalidated_share_index: HeightIndex::default(),
unvalidated_cert_index: HeightIndex::default(),
unvalidated: BTreeMap::default(),
persistent_pool,
invalidated_artifacts: metrics_registry.int_counter(
"certification_invalidated_artifacts",
Expand Down Expand Up @@ -120,6 +121,27 @@ impl CertificationPoolImpl {
}
}

/// Removes all unvalidated artifacts below the given height
fn remove_all_unvalidated_below(&mut self, height: Height) {
// remove from unvalidated pool
let range = (
std::ops::Bound::Included(Height::from(0)),
std::ops::Bound::Excluded(height),
);
self.unvalidated_share_index
.range(range)
.chain(self.unvalidated_cert_index.range(range))
.for_each(|(_, ids)| {
for id in ids {
self.unvalidated.remove(id);
}
});

// purge indices
self.unvalidated_share_index.remove_all_below(height);
self.unvalidated_cert_index.remove_all_below(height);
}

fn update_metrics(&self) {
// Validated artifacts metrics
self.validated_pool_metrics
Expand All @@ -135,52 +157,48 @@ impl CertificationPoolImpl {
self.unvalidated_pool_metrics
.pool_artifacts
.with_label_values(&[CERTIFICATION_ARTIFACT_TYPE])
.set(self.unvalidated_certifications.size() as i64);
.set(self.unvalidated_cert_index.size() as i64);
self.unvalidated_pool_metrics
.pool_artifacts
.with_label_values(&[CERTIFICATION_SHARE_ARTIFACT_TYPE])
.set(self.unvalidated_shares.size() as i64);
.set(self.unvalidated_share_index.size() as i64);
}
}

impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
type ChangeSet = ChangeSet;

fn insert(&mut self, msg: UnvalidatedArtifact<CertificationMessage>) {
let height = msg.message.height();
let label = msg.message.label();
match &msg.message {
CertificationMessage::CertificationShare(share) => {
if self.unvalidated_shares.insert(height, share) {
self.unvalidated_pool_metrics
.received_artifact_bytes
.with_label_values(&[label])
.observe(std::mem::size_of_val(share) as f64);
}
}
CertificationMessage::Certification(cert) => {
if self.unvalidated_certifications.insert(height, cert) {
self.unvalidated_pool_metrics
.received_artifact_bytes
.with_label_values(&[label])
.observe(std::mem::size_of_val(cert) as f64);
}
}
let label = msg.message.label().to_owned();
let hash = CertificationMessageHash::from(&msg.message);
let size = std::mem::size_of_val(&msg.message) as f64;

if match hash {
CertificationMessageHash::Certification(_) => self
.unvalidated_cert_index
.insert(msg.message.height(), &hash),
CertificationMessageHash::CertificationShare(_) => self
.unvalidated_share_index
.insert(msg.message.height(), &hash),
} {
self.unvalidated.insert(hash, msg.message);
self.unvalidated_pool_metrics
.received_artifact_bytes
.with_label_values(&[&label])
.observe(size);
}
}

fn remove(&mut self, id: &CertificationMessageId) {
// TODO(CON-1128): this implementation is inefficient as we compute all hashes
// every time.
match &id.hash {
CertificationMessageHash::Certification(hash) => {
self.unvalidated_certifications
.retain(id.height, |c| *hash != crypto_hash(c));
}
CertificationMessageHash::CertificationShare(hash) => {
self.unvalidated_shares
.retain(id.height, |s| *hash != crypto_hash(s));
}
if match id.hash {
CertificationMessageHash::Certification(_) => self
.unvalidated_cert_index
.retain(id.height, |c| c != &id.hash),
CertificationMessageHash::CertificationShare(_) => self
.unvalidated_share_index
.retain(id.height, |c| c != &id.hash),
} {
self.unvalidated.remove(&id.hash);
}
}

Expand All @@ -203,44 +221,31 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
if !msg.is_share() {
adverts.push(CertificationArtifact::message_to_advert(&msg));
}
let height = msg.height();
let label = msg.label().to_owned();

self.remove(&CertificationMessageId::from(&msg));
self.validated_pool_metrics
.received_artifact_bytes
.with_label_values(&[&label])
.observe(std::mem::size_of_val(&msg) as f64);

match msg {
CertificationMessage::CertificationShare(share) => {
self.unvalidated_shares.remove(height, &share);
self.validated_pool_metrics
.received_artifact_bytes
.with_label_values(&[&label])
.observe(std::mem::size_of_val(&share) as f64);
self.persistent_pool
.insert(CertificationMessage::CertificationShare(share));
}
CertificationMessage::Certification(cert) => {
self.unvalidated_certifications.remove(height, &cert);
self.validated_pool_metrics
.received_artifact_bytes
.with_label_values(&[&label])
.observe(std::mem::size_of_val(&cert) as f64);
self.insert_validated_certification(cert);
}
};
}

ChangeAction::RemoveFromUnvalidated(msg) => {
let height = msg.height();
match msg {
CertificationMessage::CertificationShare(share) => {
self.unvalidated_shares.remove(height, &share)
}
CertificationMessage::Certification(cert) => {
self.unvalidated_certifications.remove(height, &cert)
}
};
self.remove(&CertificationMessageId::from(&msg));
}

ChangeAction::RemoveAllBelow(height) => {
self.unvalidated_shares.remove_all_below(height);
self.unvalidated_certifications.remove_all_below(height);
self.remove_all_unvalidated_below(height);
purged.append(&mut self.persistent_pool.purge_below(height));
}

Expand All @@ -250,15 +255,7 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
self.log,
"Invalid certification message ({:?}): {:?}", reason, msg
);
let height = msg.height();
match msg {
CertificationMessage::CertificationShare(share) => {
self.unvalidated_shares.remove(height, &share);
}
CertificationMessage::Certification(cert) => {
self.unvalidated_certifications.remove(height, &cert);
}
};
self.remove(&CertificationMessageId::from(&msg));
}
});

Expand Down Expand Up @@ -315,22 +312,40 @@ impl CertificationPool for CertificationPoolImpl {
&self,
height: Height,
) -> Box<dyn Iterator<Item = &CertificationShare> + '_> {
self.unvalidated_shares.lookup(height)
Box::new(self.unvalidated_share_index.lookup(height).map(|id| {
let CertificationMessage::CertificationShare(share) = self
.unvalidated
.get(id)
.expect("value must exist if hash exists")
else {
panic!("hash is share, but value is not");
};
share
}))
}

fn unvalidated_certifications_at_height(
&self,
height: Height,
) -> Box<dyn Iterator<Item = &Certification> + '_> {
self.unvalidated_certifications.lookup(height)
Box::new(self.unvalidated_cert_index.lookup(height).map(|id| {
let CertificationMessage::Certification(cert) = self
.unvalidated
.get(id)
.expect("value must exist if hash exists")
else {
panic!("hash is certification, but value is not");
};
cert
}))
}

fn all_heights_with_artifacts(&self) -> Vec<Height> {
let mut heights: Vec<Height> = self
.unvalidated_shares
.unvalidated_share_index
.heights()
.cloned()
.chain(self.unvalidated_certifications.heights().cloned())
.chain(self.unvalidated_cert_index.heights().cloned())
.chain(self.validated_shares().map(|share| share.height))
.chain(
self.validated_certifications()
Expand All @@ -351,22 +366,7 @@ impl CertificationPool for CertificationPoolImpl {

impl ValidatedPoolReader<CertificationArtifact> for CertificationPoolImpl {
fn contains(&self, id: &CertificationMessageId) -> bool {
// TODO(CON-1128): this is a very inefficient implementation as we compute all hashes
// every time.
match &id.hash {
CertificationMessageHash::CertificationShare(hash) => {
self.unvalidated_shares
.lookup(id.height)
.any(|share| &crypto_hash(share) == hash)
|| self.persistent_pool.get(id).is_some()
}
CertificationMessageHash::Certification(hash) => {
self.unvalidated_certifications
.lookup(id.height)
.any(|cert| &crypto_hash(cert) == hash)
|| self.persistent_pool.get(id).is_some()
}
}
self.unvalidated.contains_key(&id.hash) || self.persistent_pool.get(id).is_some()
}

fn get_validated_by_identifier(
Expand Down Expand Up @@ -499,17 +499,7 @@ mod tests {
}

fn msg_to_id(msg: &CertificationMessage) -> CertificationMessageId {
CertificationMessageId {
hash: match msg {
CertificationMessage::Certification(c) => {
CertificationMessageHash::Certification(crypto_hash(c))
}
CertificationMessage::CertificationShare(s) => {
CertificationMessageHash::CertificationShare(crypto_hash(s))
}
},
height: msg.height(),
}
CertificationMessageId::from(msg)
}

fn to_unvalidated(message: CertificationMessage) -> UnvalidatedArtifact<CertificationMessage> {
Expand Down Expand Up @@ -646,6 +636,11 @@ mod tests {
.count(),
0
);
// INVARIANT: The sizes the unvalidated pool and the height index must be equal
assert_eq!(
pool.unvalidated_share_index.size() + pool.unvalidated_cert_index.size(),
pool.unvalidated.values().len()
)
});
}

Expand All @@ -668,8 +663,8 @@ mod tests {
]);
let share_msg = fake_share(10, 30);
let cert_msg = fake_cert(10);
pool.insert(to_unvalidated(share_msg));
pool.insert(to_unvalidated(cert_msg));
pool.insert(to_unvalidated(share_msg.clone()));
pool.insert(to_unvalidated(cert_msg.clone()));

assert_eq!(pool.all_heights_with_artifacts().len(), 1);
assert_eq!(pool.shares_at_height(Height::from(10)).count(), 1);
Expand Down Expand Up @@ -713,6 +708,11 @@ mod tests {
.count(),
0
);
// INVARIANT: The sizes the unvalidated pool and the height index must be equal
assert_eq!(
pool.unvalidated_share_index.size() + pool.unvalidated_cert_index.size(),
pool.unvalidated.values().len()
)
});
}

Expand Down Expand Up @@ -746,6 +746,33 @@ mod tests {

let result = pool.apply_changes(vec![]);
assert!(!result.poll_immediately);
// INVARIANT: The sizes the unvalidated pool and the height index must be equal
assert_eq!(
pool.unvalidated_share_index.size() + pool.unvalidated_cert_index.size(),
pool.unvalidated.values().len()
)
});
}

#[test]
fn test_certification_pool_contains_unvalidated() {
ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
let mut pool = CertificationPoolImpl::new(
node_test_id(0),
pool_config,
no_op_logger(),
MetricsRegistry::new(),
);
let share_msg = fake_share(7, 0);
let cert_msg = fake_cert(8);

assert!(!pool.contains(&CertificationMessageId::from(&share_msg)));
assert!(!pool.contains(&CertificationMessageId::from(&cert_msg)));

pool.insert(to_unvalidated(share_msg.clone()));

assert!(pool.contains(&CertificationMessageId::from(&share_msg)));
assert!(!pool.contains(&CertificationMessageId::from(&cert_msg)));
});
}

Expand Down

2 comments on commit 1dce834

@massimoalbarello
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @dist1ll,

these changes should have already been included in a previous proposal. I didn't notice an in-between proposal that reverted those changes but maybe I just missed it.

What happened there? Why was it added, then removed (I guess) and now added again?

@dist1ll
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@massimoalbarello

There was some mixup in the aggregation of the release notes, so this got accidentally included. I missed removing that commit.

Please sign in to comment.