Skip to content

Commit

Permalink
chore(consensus): when deleting an artifact from an LMDB pool, use `c…
Browse files Browse the repository at this point in the history
…ursor::get(last)` instead of `cursor::iter::last` to get the last element in the database.
  • Loading branch information
kpop-dfinity committed Feb 21, 2024
1 parent 8e3b66b commit 7512e73
Showing 1 changed file with 124 additions and 49 deletions.
173 changes: 124 additions & 49 deletions rs/artifact_pool/src/lmdb_pool.rs
Expand Up @@ -597,20 +597,9 @@ impl<Artifact: PoolArtifact> PersistentHeightIndexedPool<Artifact> {
let index_db = self.get_index_db(&key.type_key);
tx.del(index_db, &key.height_key, Some(&key.id_key.0))?;

let min_height;
let max_height;
{
let mut cursor = tx.open_ro_cursor(index_db)?;
let mut iter = cursor.iter_start();
min_height = iter
.next()
.transpose()?
.map(|(key, _)| HeightKey::from(key));
max_height = iter
.last()
.transpose()?
.map(|(key, _)| HeightKey::from(key));
}
let min_height = tx_get_key(tx, index_db, GetOp::First)?;
let max_height = tx_get_key(tx, index_db, GetOp::Last)?;

match (min_height, max_height) {
(Some(min), Some(max)) => self.update_meta(tx, &key.type_key, &Meta { min, max }),
(Some(min), None) => self.update_meta(tx, &key.type_key, &Meta { min, max: min }),
Expand All @@ -634,45 +623,40 @@ impl<Artifact: PoolArtifact> PersistentHeightIndexedPool<Artifact> {
if meta.min >= height_key {
return Ok(artifact_ids);
}

let index_db = self.get_index_db(&type_key);
{
let mut cursor = tx.open_rw_cursor(index_db)?;
loop {
match cursor.iter().next().transpose()? {
None => break,
Some((key, id)) => {
if HeightKey::from(key) >= height_key {
break;
}
artifact_ids.push(ArtifactKey {
type_key,
height_key: HeightKey::from(key),
id_key: IdKey::from(id),
});
cursor.del(WriteFlags::empty())?;
}

while let Some((key, id)) = cursor.iter().next().transpose()? {
if HeightKey::from(key) >= height_key {
break;
}
artifact_ids.push(ArtifactKey {
type_key,
height_key: HeightKey::from(key),
id_key: IdKey::from(id),
});
cursor.del(WriteFlags::empty())?;
}
}

// update meta
let meta = if meta.max <= height_key {
None
} else {
let mut cursor = tx.open_rw_cursor(index_db)?;
cursor
.iter_start()
.next()
.transpose()?
.map(|(key, _)| Meta {
min: HeightKey::from(key),
max: meta.max,
})
tx_get_key(tx, index_db, GetOp::First)?.map(|key| Meta {
min: key,
max: meta.max,
})
};

match meta {
None => tx.del(self.meta, &type_key, None)?,
Some(meta) => self.update_meta(tx, &type_key, &meta)?,
}
}

Ok(artifact_ids)
}

Expand All @@ -688,21 +672,18 @@ impl<Artifact: PoolArtifact> PersistentHeightIndexedPool<Artifact> {
for &type_key in Artifact::TYPE_KEYS {
purged.append(&mut self.tx_purge_index_below(tx, type_key, height_key)?);
}

// delete from artifacts table
let mut cursor = tx.open_rw_cursor(self.artifacts)?;
let height = Height::from(height_key);
loop {
match cursor.iter().next().transpose()? {
None => break,
Some((key, _)) => {
let id_key = IdKey::from(key);
if id_key.height() >= height {
break;
}
cursor.del(WriteFlags::empty())?;
}

while let Some((key, _)) = cursor.iter().next().transpose()? {
if IdKey::from(key).height() >= height {
break;
}
cursor.del(WriteFlags::empty())?;
}

Ok(purged)
}

Expand All @@ -728,6 +709,35 @@ impl<Artifact: PoolArtifact> PersistentHeightIndexedPool<Artifact> {
}
}

#[derive(Copy, Clone)]
enum GetOp {
First,
Last,
}

impl From<GetOp> for c_uint {
fn from(op: GetOp) -> Self {
match op {
GetOp::First => lmdb_sys::MDB_FIRST,
GetOp::Last => lmdb_sys::MDB_LAST,
}
}
}

/// Retrieves the first or the last key from the database.
fn tx_get_key(
tx: &impl Transaction,
database: Database,
op: GetOp,
) -> lmdb::Result<Option<HeightKey>> {
let cursor = tx.open_ro_cursor(database)?;
match cursor.get(/*key=*/ None, /*data=*/ None, op.into()) {
Ok((key, _value)) => Ok(key.map(HeightKey::from)),
Err(lmdb::Error::NotFound) => Ok(None),
Err(err) => Err(err),
}
}

impl InitializablePoolSection for PersistentHeightIndexedPool<ConsensusMessage> {
/// Insert a cup with the original bytes from which that cup was received.
fn insert_cup_with_proto(&self, cup_proto: pb::CatchUpPackage) {
Expand Down Expand Up @@ -2138,8 +2148,8 @@ mod tests {
}

#[test]
fn remove_test() {
run_persistent_pool_test("remove_test", |config, log| {
fn remove_block_proposals_consistency_test() {
run_persistent_pool_test("remove_block_proposals_consistency_test", |config, log| {
let mut pool = PersistentHeightIndexedPool::new_consensus_pool(
config, /*read_only=*/ false, log,
);
Expand Down Expand Up @@ -2179,6 +2189,71 @@ mod tests {
});
}

#[test]
fn remove_block_proposals_bounds_test() {
run_persistent_pool_test("remove_block_proposals_bounds_test", |config, log| {
let mut pool = PersistentHeightIndexedPool::new_consensus_pool(
config, /*read_only=*/ false, log,
);
let block_proposal_1_0 = validated_block_proposal(Height::new(1), Rank(0));
let block_proposal_1_1 = validated_block_proposal(Height::new(1), Rank(1));
let block_proposal_2_1 = validated_block_proposal(Height::new(2), Rank(1));
let block_proposal_3_1 = validated_block_proposal(Height::new(3), Rank(1));

let mut ops = PoolSectionOps::new();
ops.insert(block_proposal_1_0.clone());
ops.insert(block_proposal_1_1.clone());
ops.insert(block_proposal_2_1.clone());
ops.insert(block_proposal_3_1.clone());
pool.mutate(ops);

// Remove a block at Height 1 - the height bounds shouldn't change
let removal_ops = PoolSectionOps {
ops: vec![PoolSectionOp::Remove(block_proposal_1_0.msg.get_id())],
};

pool.mutate(removal_ops);

assert_eq!(
pool.block_proposal().height_range(),
Some(HeightRange::new(Height::new(1), Height::new(3))),
);

// Remove a block at Height 3 - the upper bound should change
let removal_ops = PoolSectionOps {
ops: vec![PoolSectionOp::Remove(block_proposal_3_1.msg.get_id())],
};

pool.mutate(removal_ops);

assert_eq!(
pool.block_proposal().height_range(),
Some(HeightRange::new(Height::new(1), Height::new(2))),
);

// Remove a block at Height 1 - the lower bound should change
let removal_ops = PoolSectionOps {
ops: vec![PoolSectionOp::Remove(block_proposal_1_1.msg.get_id())],
};

pool.mutate(removal_ops);

assert_eq!(
pool.block_proposal().height_range(),
Some(HeightRange::new(Height::new(2), Height::new(2))),
);

// Remove the remaining block, at Height 2 - `height_range` should be `None`
let removal_ops = PoolSectionOps {
ops: vec![PoolSectionOp::Remove(block_proposal_2_1.msg.get_id())],
};

pool.mutate(removal_ops);

assert!(pool.block_proposal().height_range().is_none(),);
});
}

#[test]
fn test_purge_survives_reboot() {
run_persistent_pool_test("test_purge_survives_reboot", |config, log| {
Expand Down

0 comments on commit 7512e73

Please sign in to comment.