From 7512e739e53794125ef21aecfb58827719a20ac1 Mon Sep 17 00:00:00 2001 From: Kamil Popielarz Date: Wed, 21 Feb 2024 11:08:31 +0000 Subject: [PATCH] chore(consensus): when deleting an artifact from an LMDB pool, use `cursor::get(last)` instead of `cursor::iter::last` to get the last element in the database. --- rs/artifact_pool/src/lmdb_pool.rs | 173 +++++++++++++++++++++--------- 1 file changed, 124 insertions(+), 49 deletions(-) diff --git a/rs/artifact_pool/src/lmdb_pool.rs b/rs/artifact_pool/src/lmdb_pool.rs index 1eea096ed4f..ad15db26805 100644 --- a/rs/artifact_pool/src/lmdb_pool.rs +++ b/rs/artifact_pool/src/lmdb_pool.rs @@ -597,20 +597,9 @@ impl PersistentHeightIndexedPool { let index_db = self.get_index_db(&key.type_key); tx.del(index_db, &key.height_key, Some(&key.id_key.0))?; - let min_height; - let max_height; - { - let mut cursor = tx.open_ro_cursor(index_db)?; - let mut iter = cursor.iter_start(); - min_height = iter - .next() - .transpose()? - .map(|(key, _)| HeightKey::from(key)); - max_height = iter - .last() - .transpose()? - .map(|(key, _)| HeightKey::from(key)); - } + let min_height = tx_get_key(tx, index_db, GetOp::First)?; + let max_height = tx_get_key(tx, index_db, GetOp::Last)?; + match (min_height, max_height) { (Some(min), Some(max)) => self.update_meta(tx, &key.type_key, &Meta { min, max }), (Some(min), None) => self.update_meta(tx, &key.type_key, &Meta { min, max: min }), @@ -634,45 +623,40 @@ impl PersistentHeightIndexedPool { if meta.min >= height_key { return Ok(artifact_ids); } + let index_db = self.get_index_db(&type_key); { let mut cursor = tx.open_rw_cursor(index_db)?; - loop { - match cursor.iter().next().transpose()? { - None => break, - Some((key, id)) => { - if HeightKey::from(key) >= height_key { - break; - } - artifact_ids.push(ArtifactKey { - type_key, - height_key: HeightKey::from(key), - id_key: IdKey::from(id), - }); - cursor.del(WriteFlags::empty())?; - } + + while let Some((key, id)) = cursor.iter().next().transpose()? { + if HeightKey::from(key) >= height_key { + break; } + artifact_ids.push(ArtifactKey { + type_key, + height_key: HeightKey::from(key), + id_key: IdKey::from(id), + }); + cursor.del(WriteFlags::empty())?; } } + // update meta let meta = if meta.max <= height_key { None } else { - let mut cursor = tx.open_rw_cursor(index_db)?; - cursor - .iter_start() - .next() - .transpose()? - .map(|(key, _)| Meta { - min: HeightKey::from(key), - max: meta.max, - }) + tx_get_key(tx, index_db, GetOp::First)?.map(|key| Meta { + min: key, + max: meta.max, + }) }; + match meta { None => tx.del(self.meta, &type_key, None)?, Some(meta) => self.update_meta(tx, &type_key, &meta)?, } } + Ok(artifact_ids) } @@ -688,21 +672,18 @@ impl PersistentHeightIndexedPool { for &type_key in Artifact::TYPE_KEYS { purged.append(&mut self.tx_purge_index_below(tx, type_key, height_key)?); } + // delete from artifacts table let mut cursor = tx.open_rw_cursor(self.artifacts)?; let height = Height::from(height_key); - loop { - match cursor.iter().next().transpose()? { - None => break, - Some((key, _)) => { - let id_key = IdKey::from(key); - if id_key.height() >= height { - break; - } - cursor.del(WriteFlags::empty())?; - } + + while let Some((key, _)) = cursor.iter().next().transpose()? { + if IdKey::from(key).height() >= height { + break; } + cursor.del(WriteFlags::empty())?; } + Ok(purged) } @@ -728,6 +709,35 @@ impl PersistentHeightIndexedPool { } } +#[derive(Copy, Clone)] +enum GetOp { + First, + Last, +} + +impl From for c_uint { + fn from(op: GetOp) -> Self { + match op { + GetOp::First => lmdb_sys::MDB_FIRST, + GetOp::Last => lmdb_sys::MDB_LAST, + } + } +} + +/// Retrieves the first or the last key from the database. +fn tx_get_key( + tx: &impl Transaction, + database: Database, + op: GetOp, +) -> lmdb::Result> { + let cursor = tx.open_ro_cursor(database)?; + match cursor.get(/*key=*/ None, /*data=*/ None, op.into()) { + Ok((key, _value)) => Ok(key.map(HeightKey::from)), + Err(lmdb::Error::NotFound) => Ok(None), + Err(err) => Err(err), + } +} + impl InitializablePoolSection for PersistentHeightIndexedPool { /// Insert a cup with the original bytes from which that cup was received. fn insert_cup_with_proto(&self, cup_proto: pb::CatchUpPackage) { @@ -2138,8 +2148,8 @@ mod tests { } #[test] - fn remove_test() { - run_persistent_pool_test("remove_test", |config, log| { + fn remove_block_proposals_consistency_test() { + run_persistent_pool_test("remove_block_proposals_consistency_test", |config, log| { let mut pool = PersistentHeightIndexedPool::new_consensus_pool( config, /*read_only=*/ false, log, ); @@ -2179,6 +2189,71 @@ mod tests { }); } + #[test] + fn remove_block_proposals_bounds_test() { + run_persistent_pool_test("remove_block_proposals_bounds_test", |config, log| { + let mut pool = PersistentHeightIndexedPool::new_consensus_pool( + config, /*read_only=*/ false, log, + ); + let block_proposal_1_0 = validated_block_proposal(Height::new(1), Rank(0)); + let block_proposal_1_1 = validated_block_proposal(Height::new(1), Rank(1)); + let block_proposal_2_1 = validated_block_proposal(Height::new(2), Rank(1)); + let block_proposal_3_1 = validated_block_proposal(Height::new(3), Rank(1)); + + let mut ops = PoolSectionOps::new(); + ops.insert(block_proposal_1_0.clone()); + ops.insert(block_proposal_1_1.clone()); + ops.insert(block_proposal_2_1.clone()); + ops.insert(block_proposal_3_1.clone()); + pool.mutate(ops); + + // Remove a block at Height 1 - the height bounds shouldn't change + let removal_ops = PoolSectionOps { + ops: vec![PoolSectionOp::Remove(block_proposal_1_0.msg.get_id())], + }; + + pool.mutate(removal_ops); + + assert_eq!( + pool.block_proposal().height_range(), + Some(HeightRange::new(Height::new(1), Height::new(3))), + ); + + // Remove a block at Height 3 - the upper bound should change + let removal_ops = PoolSectionOps { + ops: vec![PoolSectionOp::Remove(block_proposal_3_1.msg.get_id())], + }; + + pool.mutate(removal_ops); + + assert_eq!( + pool.block_proposal().height_range(), + Some(HeightRange::new(Height::new(1), Height::new(2))), + ); + + // Remove a block at Height 1 - the lower bound should change + let removal_ops = PoolSectionOps { + ops: vec![PoolSectionOp::Remove(block_proposal_1_1.msg.get_id())], + }; + + pool.mutate(removal_ops); + + assert_eq!( + pool.block_proposal().height_range(), + Some(HeightRange::new(Height::new(2), Height::new(2))), + ); + + // Remove the remaining block, at Height 2 - `height_range` should be `None` + let removal_ops = PoolSectionOps { + ops: vec![PoolSectionOp::Remove(block_proposal_2_1.msg.get_id())], + }; + + pool.mutate(removal_ops); + + assert!(pool.block_proposal().height_range().is_none(),); + }); + } + #[test] fn test_purge_survives_reboot() { run_persistent_pool_test("test_purge_survives_reboot", |config, log| {