Skip to content

Commit

Permalink
Merge #4311
Browse files Browse the repository at this point in the history
4311: Short-circuit initialization of block and deploy metadata DB r=rafal-ch a=rafal-ch

Make both `initialize_block_metadata_db()` and `initialize_deploy_metadata_db()` no-ops if there is no data to be purged during storage initialization.

Additionally, when purging does happen, the hashes of the purged blocks are logged, like so:
```
2023-10-04T10:12:48.855232Z DEBUG init{; node_id=tls:7ebd..32a6}: [casper_node::components::storage storage.rs:3005] purged metadata for block 032b..0011
2023-10-04T10:12:48.855243Z DEBUG init{; node_id=tls:7ebd..32a6}: [casper_node::components::storage storage.rs:3005] purged metadata for block 2d5c..5602
```

Co-authored-by: Rafał Chabowski <rafal@casperlabs.io>
  • Loading branch information
casperlabs-bors-ng[bot] and rafal-ch committed Oct 10, 2023
2 parents 478d7f1 + b7d6ae7 commit d9b978b
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 38 deletions.
89 changes: 55 additions & 34 deletions node/src/components/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,23 +1118,9 @@ impl Storage {
StorageRequest::PutFinalitySignature {
signature,
responder,
} => {
let mut txn = self.env.begin_rw_txn()?;
let mut block_signatures = txn
.get_value(self.block_metadata_db, &signature.block_hash)?
.unwrap_or_else(|| {
BlockSignatures::new(signature.block_hash, signature.era_id)
});
block_signatures.insert_proof(signature.public_key, signature.signature);
let outcome = txn.put_value(
self.block_metadata_db,
&block_signatures.block_hash,
&block_signatures,
true,
)?;
txn.commit()?;
responder.respond(outcome).ignore()
}
} => responder
.respond(self.put_finality_signature(signature)?)
.ignore(),
StorageRequest::GetBlockSignature {
block_hash,
public_key,
Expand Down Expand Up @@ -1204,6 +1190,25 @@ impl Storage {
})
}

/// Persists a single finality signature in the block metadata database.
///
/// Loads the signature set already recorded for the signature's block — or
/// starts a fresh one when none exists — adds the new proof to it, and writes
/// the updated set back under the block hash, all within one read-write
/// transaction.
///
/// Returns the boolean outcome reported by `put_value`, or a
/// `FatalStorageError` if any database operation fails.
fn put_finality_signature(
    &mut self,
    signature: Box<FinalitySignature>,
) -> Result<bool, FatalStorageError> {
    let mut txn = self.env.begin_rw_txn()?;
    // Existing signature set for this block, if any was stored before.
    let maybe_stored = txn.get_value(self.block_metadata_db, &signature.block_hash)?;
    let mut signatures = match maybe_stored {
        Some(stored) => stored,
        None => BlockSignatures::new(signature.block_hash, signature.era_id),
    };
    signatures.insert_proof(signature.public_key, signature.signature);
    let stored_ok = txn.put_value(
        self.block_metadata_db,
        &signatures.block_hash,
        &signatures,
        true,
    )?;
    txn.commit()?;
    Ok(stored_ok)
}

/// Handles a [`BlockCompletedAnnouncement`].
fn handle_mark_block_completed_request(
&mut self,
Expand Down Expand Up @@ -2981,21 +2986,32 @@ fn initialize_block_metadata_db(
block_metadata_db: &Database,
deleted_block_hashes: &HashSet<&[u8]>,
) -> Result<(), FatalStorageError> {
info!("initializing block metadata database");
let mut txn = env.begin_rw_txn()?;
let mut cursor = txn.open_rw_cursor(*block_metadata_db)?;
let block_count_to_be_deleted = deleted_block_hashes.len();
info!(
block_count_to_be_deleted,
"initializing block metadata database"
);

for row in cursor.iter() {
let (raw_key, _) = row?;
if deleted_block_hashes.contains(raw_key) {
cursor.del(WriteFlags::empty())?;
continue;
if !deleted_block_hashes.is_empty() {
let mut txn = env.begin_rw_txn()?;
let mut cursor = txn.open_rw_cursor(*block_metadata_db)?;

for row in cursor.iter() {
let (raw_key, _) = row?;
if deleted_block_hashes.contains(raw_key) {
cursor.del(WriteFlags::empty())?;
let digest = Digest::try_from(raw_key);
debug!(
"purged metadata for block {}",
digest.map_or("<unknown>".to_string(), |digest| digest.to_string())
);
continue;
}
}
drop(cursor);
txn.commit()?;
}

drop(cursor);
txn.commit()?;

info!("block metadata database initialized");
Ok(())
}
Expand All @@ -3006,15 +3022,20 @@ fn initialize_deploy_metadata_db(
deploy_metadata_db: &Database,
deleted_deploy_hashes: &HashSet<DeployHash>,
) -> Result<(), LmdbExtError> {
info!("initializing deploy metadata database");
let deploy_count_to_be_deleted = deleted_deploy_hashes.len();
info!(
deploy_count_to_be_deleted,
"initializing deploy metadata database"
);

let mut txn = env.begin_rw_txn()?;
deleted_deploy_hashes.iter().for_each(|deleted_deploy_hash| {
if !deleted_deploy_hashes.is_empty() {
let mut txn = env.begin_rw_txn()?;
deleted_deploy_hashes.iter().for_each(|deleted_deploy_hash| {
if txn.del(*deploy_metadata_db, deleted_deploy_hash, None).is_err() {
debug!(%deleted_deploy_hash, "not purging from 'deploy_metadata_db' because not existing");
}
});
txn.commit()?;
}});
txn.commit()?;
}

info!("deploy metadata database initialized");
Ok(())
Expand Down
127 changes: 123 additions & 4 deletions node/src/components/storage/tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Unit tests for the storage component.

use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
fs::{self, File},
iter,
iter::{self, FromIterator},
rc::Rc,
sync::Arc,
};
Expand All @@ -18,8 +18,8 @@ use casper_types::{
};

use super::{
move_storage_files_to_network_subdir, should_move_storage_files_to_network_subdir, Config,
Storage,
initialize_block_metadata_db, move_storage_files_to_network_subdir,
should_move_storage_files_to_network_subdir, Config, Storage,
};
use crate::{
components::fetcher::{FetchItem, FetchResponse},
Expand Down Expand Up @@ -1849,3 +1849,122 @@ fn unbonding_purse_serialization_roundtrip() {
// Explicitly assert that the `new_validator` is not `None`
assert!(deserialized.new_validator().is_some())
}

/// Asserts that storage holds exactly the `expected` finality signatures for
/// `block_hash`; an empty `expected` means no signatures may be present.
//
// `FinalitySignature` contains a `OnceCell`, which makes clippy flag it as a
// mutable key type for `BTreeSet`. The cell's contents are never changed over
// the course of the test, so key ordering and hashing remain stable.
#[allow(clippy::mutable_key_type)]
fn assert_signatures(storage: &Storage, block_hash: BlockHash, expected: Vec<FinalitySignature>) {
    let mut txn = storage.env.begin_ro_txn().unwrap();
    let maybe_signatures = storage
        .get_block_signatures(&mut txn, &block_hash)
        .expect("should be able to read signatures");
    // Missing entry and empty entry are treated the same: no signatures.
    let stored: BTreeSet<_> = match maybe_signatures {
        Some(signatures) => signatures.finality_signatures().collect(),
        None => BTreeSet::new(),
    };
    let wanted: BTreeSet<_> = expected.into_iter().collect();
    assert_eq!(stored, wanted);
}

/// End-to-end test of `initialize_block_metadata_db`: stores finality
/// signatures for several blocks, then checks that initialization purges
/// metadata only for the requested block hashes and leaves the rest intact.
#[test]
fn should_initialize_block_metadata_db() {
    let mut harness = ComponentHarness::default();
    let mut storage = storage_fixture(&harness);

    // Fixture: three blocks with two finality signatures each...
    let block_1 = Block::random(&mut harness.rng);
    let fs_1_1 =
        FinalitySignature::random_for_block(*block_1.hash(), block_1.header().era_id().into());
    let fs_1_2 =
        FinalitySignature::random_for_block(*block_1.hash(), block_1.header().era_id().into());

    let block_2 = Block::random(&mut harness.rng);
    let fs_2_1 =
        FinalitySignature::random_for_block(*block_2.hash(), block_2.header().era_id().into());
    let fs_2_2 =
        FinalitySignature::random_for_block(*block_2.hash(), block_2.header().era_id().into());

    let block_3 = Block::random(&mut harness.rng);
    let fs_3_1 =
        FinalitySignature::random_for_block(*block_3.hash(), block_3.header().era_id().into());
    let fs_3_2 =
        FinalitySignature::random_for_block(*block_3.hash(), block_3.header().era_id().into());

    // ...and one block with no signatures at all.
    let block_4 = Block::random(&mut harness.rng);

    // Persist all six signatures. The returned outcomes are deliberately
    // ignored; `assert_signatures` below verifies the stored state directly.
    let _ = storage.put_finality_signature(Box::new(fs_1_1.clone()));
    let _ = storage.put_finality_signature(Box::new(fs_1_2.clone()));
    let _ = storage.put_finality_signature(Box::new(fs_2_1.clone()));
    let _ = storage.put_finality_signature(Box::new(fs_2_2.clone()));
    let _ = storage.put_finality_signature(Box::new(fs_3_1.clone()));
    let _ = storage.put_finality_signature(Box::new(fs_3_2.clone()));

    // Sanity check: everything written is readable before any purging.
    assert_signatures(
        &storage,
        *block_1.hash(),
        vec![fs_1_1.clone(), fs_1_2.clone()],
    );
    assert_signatures(
        &storage,
        *block_2.hash(),
        vec![fs_2_1.clone(), fs_2_2.clone()],
    );
    assert_signatures(
        &storage,
        *block_3.hash(),
        vec![fs_3_1.clone(), fs_3_2.clone()],
    );
    assert_signatures(&storage, *block_4.hash(), vec![]);

    // Purging empty set of blocks should not change state.
    let to_be_purged = HashSet::new();
    let _ = initialize_block_metadata_db(&storage.env, &storage.block_metadata_db, &to_be_purged);
    assert_signatures(&storage, *block_1.hash(), vec![fs_1_1, fs_1_2]);
    assert_signatures(
        &storage,
        *block_2.hash(),
        vec![fs_2_1.clone(), fs_2_2.clone()],
    );
    assert_signatures(
        &storage,
        *block_3.hash(),
        vec![fs_3_1.clone(), fs_3_2.clone()],
    );

    // Purging for block_1 should leave sigs for block_2 and block_3 intact.
    let to_be_purged = HashSet::from_iter([block_1.hash().as_ref()]);
    let _ = initialize_block_metadata_db(&storage.env, &storage.block_metadata_db, &to_be_purged);
    assert_signatures(&storage, *block_1.hash(), vec![]);
    assert_signatures(
        &storage,
        *block_2.hash(),
        vec![fs_2_1.clone(), fs_2_2.clone()],
    );
    assert_signatures(
        &storage,
        *block_3.hash(),
        vec![fs_3_1.clone(), fs_3_2.clone()],
    );
    assert_signatures(&storage, *block_4.hash(), vec![]);

    // Purging for block_4 (which has no signatures) should not modify state.
    let to_be_purged = HashSet::from_iter([block_4.hash().as_ref()]);
    let _ = initialize_block_metadata_db(&storage.env, &storage.block_metadata_db, &to_be_purged);
    assert_signatures(&storage, *block_1.hash(), vec![]);
    assert_signatures(&storage, *block_2.hash(), vec![fs_2_1, fs_2_2]);
    assert_signatures(&storage, *block_3.hash(), vec![fs_3_1, fs_3_2]);
    assert_signatures(&storage, *block_4.hash(), vec![]);

    // Purging for all blocks should leave no signatures.
    let to_be_purged = HashSet::from_iter([
        block_1.hash().as_ref(),
        block_2.hash().as_ref(),
        block_3.hash().as_ref(),
        block_4.hash().as_ref(),
    ]);

    let _ = initialize_block_metadata_db(&storage.env, &storage.block_metadata_db, &to_be_purged);
    assert_signatures(&storage, *block_1.hash(), vec![]);
    assert_signatures(&storage, *block_2.hash(), vec![]);
    assert_signatures(&storage, *block_3.hash(), vec![]);
    assert_signatures(&storage, *block_4.hash(), vec![]);
}

0 comments on commit d9b978b

Please sign in to comment.