Skip to content

Commit

Permalink
fix(consensus): [CON-1173] fix assert_consistency test utility func…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
kpop-dfinity committed Feb 1, 2024
1 parent dcfc4c2 commit 0794a08
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 46 deletions.
157 changes: 125 additions & 32 deletions rs/artifact_pool/src/lmdb_pool.rs
Expand Up @@ -971,19 +971,7 @@ impl PoolArtifact for ConsensusMessage {
where
<T as TryFrom<Self>>::Error: Debug,
{
let bytes = tx.get(artifacts, &key)?;
let protobuf = log_err!(
pb::ValidatedConsensusArtifact::decode(bytes),
log,
"ConsensusArtifact::load_as protobuf decoding"
)
.ok_or(lmdb::Error::Panic)?;
let artifact: ValidatedConsensusArtifact = log_err!(
protobuf.try_into(),
log,
"ConsensusArtifact::load_as protobuf conversion"
)
.ok_or(lmdb::Error::Panic)?;
let artifact = tx_get_validated_consensus_artifact(key, artifacts, tx, log)?;

let msg = match artifact.msg {
ConsensusMessage::BlockProposal(mut proposal) => {
Expand Down Expand Up @@ -1018,6 +1006,27 @@ impl PoolArtifact for ConsensusMessage {
}
}

fn tx_get_validated_consensus_artifact(
key: &IdKey,
artifacts: Database,
tx: &impl Transaction,
log: &ReplicaLogger,
) -> lmdb::Result<ValidatedConsensusArtifact> {
let bytes = tx.get(artifacts, &key)?;
let protobuf = log_err!(
pb::ValidatedConsensusArtifact::decode(bytes),
log,
"tx_get_validated_consensus_artifact: protobuf decoding"
)
.ok_or(lmdb::Error::Panic)?;
log_err!(
ValidatedConsensusArtifact::try_from(protobuf),
log,
"tx_get_validated_consensus_artifact: protobuf conversion"
)
.ok_or(lmdb::Error::Panic)
}

/// Block payloads are loaded separately on demand.
fn load_block_payload(
db_env: Arc<Environment>,
Expand Down Expand Up @@ -1918,8 +1927,8 @@ mod tests {
use crate::{
consensus_pool::MutablePoolSection,
test_utils::{
fake_random_beacon, finalization_share_ops, notarization_share_ops, random_beacon_ops,
PoolTestHelper,
block_proposal_ops, fake_random_beacon, finalization_share_ops, notarization_share_ops,
random_beacon_ops, PoolTestHelper,
},
};
use ic_test_utilities_logger::with_test_replica_logger;
Expand Down Expand Up @@ -2017,35 +2026,73 @@ mod tests {
fn test_purge_survives_reboot() {
run_persistent_pool_test("test_purge_survives_reboot", |config, log| {
// create a pool and purge at height 10
let height10 = Height::from(10);
const PURGE_HEIGHT: Height = Height::new(10);
const RANDOM_BEACONS_INITIAL_MIN_HEIGHT: u64 = 6;
const RANDOM_BEACONS_INITIAL_MAX_HEIGHT: u64 = 18;
const BLOCK_PROPOSALS_INITIAL_MIN_HEIGHT: u64 = 8;
const BLOCK_PROPOSALS_INITIAL_MAX_HEIGHT: u64 = 15;
{
let mut pool = PersistentHeightIndexedPool::new_consensus_pool(
config.clone(),
false,
/*read_only=*/ false,
log.clone(),
);
// insert a few things
let rb_ops = random_beacon_ops();
// random beacons
let rb_ops = random_beacon_ops(
RANDOM_BEACONS_INITIAL_MIN_HEIGHT..=RANDOM_BEACONS_INITIAL_MAX_HEIGHT,
);
pool.mutate(rb_ops.clone());
let iter = pool.random_beacon().get_all();
let msgs_from_pool = iter;
assert_eq!(msgs_from_pool.count(), rb_ops.ops.len());
assert_eq!(pool.random_beacon().size(), rb_ops.ops.len());
// block proposals
let block_proposal_ops = block_proposal_ops(
BLOCK_PROPOSALS_INITIAL_MIN_HEIGHT..=BLOCK_PROPOSALS_INITIAL_MAX_HEIGHT,
);
pool.mutate(block_proposal_ops.clone());
assert_eq!(pool.block_proposal().size(), block_proposal_ops.ops.len());

// purge at height 10
let mut purge_ops = PoolSectionOps::new();
purge_ops.purge_below(height10);
purge_ops.purge_below(PURGE_HEIGHT);
pool.mutate(purge_ops);

// verify that the artifacts have been purged
assert_eq!(
pool.random_beacon().height_range().map(|r| r.min),
Some(height10)
pool.random_beacon().height_range(),
Some(HeightRange {
min: PURGE_HEIGHT,
max: Height::new(RANDOM_BEACONS_INITIAL_MAX_HEIGHT)
})
);
assert_eq!(
pool.block_proposal().height_range(),
Some(HeightRange {
min: PURGE_HEIGHT,
max: Height::new(BLOCK_PROPOSALS_INITIAL_MAX_HEIGHT)
})
);
assert_consistency(&pool);
}
// create the same pool again, check if purge was persisted
{
let pool = PersistentHeightIndexedPool::new_consensus_pool(config, false, log);
let pool = PersistentHeightIndexedPool::new_consensus_pool(
config, /*read_only=*/ false, log,
);
assert_eq!(
pool.random_beacon().height_range().map(|r| r.min),
Some(height10)
pool.random_beacon().height_range(),
Some(HeightRange {
min: PURGE_HEIGHT,
max: Height::from(RANDOM_BEACONS_INITIAL_MAX_HEIGHT)
})
);
assert_eq!(
pool.block_proposal().height_range(),
Some(HeightRange {
min: PURGE_HEIGHT,
max: Height::from(BLOCK_PROPOSALS_INITIAL_MAX_HEIGHT)
})
);
assert_consistency(&pool);
}
});
}
Expand All @@ -2068,7 +2115,7 @@ mod tests {
pool.mutate(fs_ops.clone());
let ns_ops = notarization_share_ops();
pool.mutate(ns_ops.clone());
pool.mutate(random_beacon_ops());
pool.mutate(random_beacon_ops(/*heights=*/ 3..19));
// min height of finalization shares should be less than 10
assert!(pool.finalization_share().height_range().map(|r| r.min) < Some(height10));
// min height of notarization shares should be less than 13
Expand Down Expand Up @@ -2145,7 +2192,7 @@ mod tests {

let tx = pool.db_env.begin_ro_txn().unwrap();
// get all ids from all indices
let mut ids_index = pool
let ids_index = pool
.indices
.iter()
.flat_map(|(_, db)| {
Expand All @@ -2159,10 +2206,43 @@ mod tests {
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
ids_index.sort();

let block_proposal_ids_index = {
let block_proposal_index_db = pool.get_index_db(&BLOCK_PROPOSAL_KEY);
let mut cursor = tx.open_ro_cursor(block_proposal_index_db).unwrap();
cursor
.iter()
.map(|res| {
let (_, id_key) = res.unwrap();
IdKey::from(id_key)
})
.collect::<Vec<_>>()
};

let block_payload_ids_from_block_proposals = block_proposal_ids_index
.iter()
.map(|block_proposal_id| {
let artifact = tx_get_validated_consensus_artifact(
block_proposal_id,
pool.artifacts,
&tx,
&pool.log,
)
.expect("The payload should exist in the artifacts db");

let ConsensusMessage::BlockProposal(proposal) = artifact.msg else {
panic!("Referenced artifact is not a block proposal");
};

let block = proposal.content.as_ref();
let payload_hash = block.payload.get_hash().clone();

IdKey::from((block.height(), payload_hash.get_ref()))
})
.collect::<Vec<_>>();

// get all ids from artifacts db
let ids_artifacts = {
let mut ids_artifacts = {
let mut cursor = tx.open_ro_cursor(pool.artifacts).unwrap();
cursor
.iter()
Expand All @@ -2173,9 +2253,22 @@ mod tests {
.collect::<Vec<_>>()
};
tx.commit().unwrap();
ids_artifacts.sort();

// they should be equal
assert_eq!(ids_index, ids_artifacts);
assert_eq!(
block_proposal_ids_index.len(),
block_payload_ids_from_block_proposals.len()
);
assert_eq!(
ids_index.len() + block_proposal_ids_index.len(),
ids_artifacts.len()
);

let mut all_id_references = [ids_index, block_payload_ids_from_block_proposals].concat();
all_id_references.sort();

assert_eq!(all_id_references, ids_artifacts);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion rs/artifact_pool/src/rocksdb_pool.rs
Expand Up @@ -1312,7 +1312,7 @@ mod tests {
let mut pool =
PersistentHeightIndexedPool::new_consensus_pool(config.clone(), log.clone());
// insert a few things
let rb_ops = random_beacon_ops();
let rb_ops = random_beacon_ops(/*heights=*/ 3..19);
pool.mutate(rb_ops.clone());
let iter = pool.random_beacon().get_all();
let msgs_from_pool = iter;
Expand Down
28 changes: 16 additions & 12 deletions rs/artifact_pool/src/test_utils.rs
Expand Up @@ -129,10 +129,10 @@ where
T: PoolTestHelper,
{
T::run_persistent_pool_test("test_as_height_indexed_pool", |config, log| {
let rb_ops = random_beacon_ops();
let rb_ops = random_beacon_ops(/*heights=*/ 3..19);
let fz_ops = finalization_ops();
let nz_ops = notarization_ops();
let bp_ops = block_proposal_ops();
let bp_ops = block_proposal_ops(/*heights=*/ 1..18);
let rbs_ops = random_beacon_share_ops();
let nzs_ops = notarization_share_ops();
let fzs_ops = finalization_share_ops();
Expand Down Expand Up @@ -266,7 +266,7 @@ where
T::run_persistent_pool_test(
"test_block_proposal_and_payload_correspondence",
|config, log| {
let insert_ops = block_proposal_ops();
let insert_ops = block_proposal_ops(/*heights=*/ 1..18);
let msgs = insert_ops
.ops
.iter()
Expand Down Expand Up @@ -340,7 +340,7 @@ where
T::run_persistent_pool_test(
"test_iterating_while_inserting_doesnt_see_new_updates",
|config, log| {
let rb_ops = random_beacon_ops();
let rb_ops = random_beacon_ops(/*heights=*/ 3..19);
let mut pool = T::new_consensus_pool(config, log);
pool.mutate(rb_ops);
let iter = pool.random_beacon().get_all();
Expand Down Expand Up @@ -377,7 +377,7 @@ where
T: PoolTestHelper,
{
T::run_persistent_pool_test("test_iterator_can_outlive_the_pool", |config, log| {
let rb_ops = random_beacon_ops();
let rb_ops = random_beacon_ops(/*heights=*/ 3..19);
let iter;

// Create a pool in this inner scope, which will be destroyed
Expand Down Expand Up @@ -414,7 +414,7 @@ where
let path = config
.persistent_pool_validated_persistent_db_path()
.clone();
let rb_ops = random_beacon_ops();
let rb_ops = random_beacon_ops(/*heights=*/ 3..19);
{
let mut pool = T::new_consensus_pool(config, log);
pool.mutate(rb_ops);
Expand Down Expand Up @@ -464,10 +464,12 @@ where
}

// Support functions for the tests
pub(crate) fn random_beacon_ops() -> PoolSectionOps<ValidatedConsensusArtifact> {
pub(crate) fn random_beacon_ops(
heights: impl IntoIterator<Item = u64>,
) -> PoolSectionOps<ValidatedConsensusArtifact> {
let mut ops = PoolSectionOps::new();
for i in 3..19 {
let random_beacon = fake_random_beacon(Height::from(i));
for height in heights {
let random_beacon = fake_random_beacon(Height::from(height));
let msg = ConsensusMessage::RandomBeacon(random_beacon);
ops.insert(ValidatedConsensusArtifact {
msg,
Expand All @@ -477,10 +479,12 @@ pub(crate) fn random_beacon_ops() -> PoolSectionOps<ValidatedConsensusArtifact>
ops
}

fn block_proposal_ops() -> PoolSectionOps<ValidatedConsensusArtifact> {
pub(crate) fn block_proposal_ops(
heights: impl IntoIterator<Item = u64>,
) -> PoolSectionOps<ValidatedConsensusArtifact> {
let mut ops = PoolSectionOps::new();
for i in 1..18 {
let block_proposal = fake_block_proposal(Height::from(i));
for height in heights {
let block_proposal = fake_block_proposal(Height::from(height));
let msg = ConsensusMessage::BlockProposal(block_proposal);
ops.insert(ValidatedConsensusArtifact {
msg,
Expand Down
2 changes: 1 addition & 1 deletion rs/interfaces/src/consensus_pool.rs
Expand Up @@ -148,7 +148,7 @@ impl TryFrom<pb::ValidatedConsensusArtifact> for ValidatedConsensusArtifact {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct HeightRange {
pub min: Height,
pub max: Height,
Expand Down

0 comments on commit 0794a08

Please sign in to comment.