Skip to content

Commit

Permalink
blockstore: send duplicate proofs for chained merkle root conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar committed Apr 16, 2024
1 parent 2e91155 commit dff346d
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 87 deletions.
217 changes: 139 additions & 78 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ impl Blockstore {
fn previous_erasure_set(
&self,
erasure_set: ErasureSetId,
erasure_metas: &mut BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
erasure_metas: &BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
) -> Result<Option<ErasureSetId>> {
let (slot, fec_set_index) = erasure_set.store_key();

Expand Down Expand Up @@ -542,12 +542,6 @@ impl Blockstore {
let candidate_erasure_set = ErasureSetId::new(slot, candidate_fec_set_index);
let candidate_erasure_meta: ErasureMeta = deserialize(candidate_erasure_meta.as_ref())?;

// Add this candidate to erasure metas to avoid blockstore lookup in future
erasure_metas.insert(
candidate_erasure_set,
WorkingEntry::Clean(candidate_erasure_meta),
);

// Check if this is actually the consecutive erasure set
let Some(next_fec_set_index) = candidate_erasure_meta.next_fec_set_index() else {
return Err(BlockstoreError::InvalidErasureConfig);
Expand Down Expand Up @@ -1117,6 +1111,70 @@ impl Blockstore {
&mut write_batch,
)?;

for (erasure_set, working_erasure_meta) in erasure_metas.iter() {
if !working_erasure_meta.should_write() {
// Not a new erasure meta
continue;
}
let (slot, _) = erasure_set.store_key();
// First coding shred from this erasure batch, check the forward merkle root chaining
if !self.has_duplicate_shreds_in_slot(slot) {
let erasure_meta = working_erasure_meta.as_ref();
let shred_id = ShredId::new(
slot,
erasure_meta
.first_received_coding_shred_index()
.expect("First received coding index must exist for all erasure metas"),
ShredType::Code,
);
let shred = Shred::new_from_serialized_shred(
self.get_shred_from_just_inserted_or_db(&just_inserted_shreds, shred_id)
.expect("Shred indicated by erasure meta must exist")
.into_owned(),
)
.expect("Shred indicated by erasure meta must be deserializable");

self.check_forward_chained_merkle_root_consistency(
&shred,
erasure_meta,
&just_inserted_shreds,
&mut merkle_root_metas,
&mut duplicate_shreds,
);
}
}

for (erasure_set, working_merkle_root_meta) in merkle_root_metas.iter() {
if !working_merkle_root_meta.should_write() {
// Not a new merkle root meta
continue;
}
let (slot, _) = erasure_set.store_key();
// First shred from this erasure batch, check the backwards merkle root chaining
if !self.has_duplicate_shreds_in_slot(slot) {
let merkle_root_meta = working_merkle_root_meta.as_ref();
let shred_id = ShredId::new(
slot,
merkle_root_meta.first_received_shred_index(),
merkle_root_meta.first_received_shred_type(),
);
let shred = Shred::new_from_serialized_shred(
self.get_shred_from_just_inserted_or_db(&just_inserted_shreds, shred_id)
.expect("Shred indicated by merkle root meta must exist")
.into_owned(),
)
.expect("Shred indicated by merkle root meta must be deserializable");

self.check_backwards_chained_merkle_root_consistency(
&shred,
&just_inserted_shreds,
&erasure_metas,
&merkle_root_metas,
&mut duplicate_shreds,
);
}
}

for (erasure_set, working_erasure_meta) in erasure_metas {
if !working_erasure_meta.should_write() {
// No need to rewrite the column
Expand Down Expand Up @@ -1701,69 +1759,66 @@ impl Blockstore {
/// `duplicate_shreds`.
///
/// This is intended to be used right after `shred`'s `erasure_meta`
/// has been created for the first time and loaded into `erasure_metas`.
#[allow(dead_code)]
/// has been created for the first time.
fn check_forward_chained_merkle_root_consistency(
&self,
shred: &Shred,
erasure_meta: &ErasureMeta,
just_inserted_shreds: &HashMap<ShredId, Shred>,
erasure_metas: &BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
) -> bool {
debug_assert!(erasure_meta.check_coding_shred(shred));
let slot = shred.slot();
let erasure_set = shred.erasure_set();
let erasure_meta_entry = erasure_metas.get(&erasure_set).expect(
"Checking chained merkle root consistency on an erasure set {erasure_set:?}
that is not loaded in memory, programmer error",
);
let erasure_meta = erasure_meta_entry.as_ref();
debug_assert!(erasure_meta.check_coding_shred(shred));

// If a shred from the next fec set has already been inserted, check the chaining
let Some(next_fec_set_index) = erasure_meta.next_fec_set_index() else {
error!("Invalid erasure meta, unable to compute next fec set index {erasure_meta:?}");
return false;
};
let next_erasure_set = ErasureSetId::new(slot, next_fec_set_index);
let next_merkle_root_meta = match merkle_root_metas.entry(next_erasure_set) {
HashMapEntry::Vacant(entry) => self
.merkle_root_meta(next_erasure_set)
.unwrap()
.map(|meta| entry.insert(WorkingEntry::Clean(meta))),
HashMapEntry::Occupied(entry) => Some(entry.into_mut()),
let (next_shred_index, next_shred_type) = if let Some(merkle_root_meta_entry) =
merkle_root_metas.get(&next_erasure_set)
{
(
merkle_root_meta_entry.as_ref().first_received_shred_index(),
merkle_root_meta_entry.as_ref().first_received_shred_type(),
)
} else if let Some(merkle_root_meta) = self.merkle_root_meta(next_erasure_set).unwrap() {
(
merkle_root_meta.first_received_shred_index(),
merkle_root_meta.first_received_shred_type(),
)
} else {
// No shred from the next fec set has been received
return true;
};
if let Some(next_merkle_root_meta) = next_merkle_root_meta.as_deref().map(AsRef::as_ref) {
let next_shred_id = ShredId::new(
slot,
next_merkle_root_meta.first_received_shred_index(),
next_merkle_root_meta.first_received_shred_type(),
);
let next_shred =
Self::get_shred_from_just_inserted_or_db(self, just_inserted_shreds, next_shred_id)
.expect("Shred indicated by merkle root meta must exist")
.into_owned();
let merkle_root = shred.merkle_root().ok();
let chained_merkle_root = shred::layout::get_chained_merkle_root(&next_shred);
let next_shred_id = ShredId::new(slot, next_shred_index, next_shred_type);
let next_shred =
Self::get_shred_from_just_inserted_or_db(self, just_inserted_shreds, next_shred_id)
.expect("Shred indicated by merkle root meta must exist")
.into_owned();
let merkle_root = shred.merkle_root().ok();
let chained_merkle_root = shred::layout::get_chained_merkle_root(&next_shred);

if !self.check_chaining(merkle_root, chained_merkle_root) {
warn!(
"Received conflicting chained merkle roots for slot: {slot},
shred {erasure_set:?} type {:?} has merkle root {merkle_root:?}, however
next fec set shred {next_erasure_set:?} type {:?} chains to merkle root {chained_merkle_root:?}.
Reporting as duplicate",
shred.shred_type(),
next_merkle_root_meta.first_received_shred_type(),
);
if !self.check_chaining(merkle_root, chained_merkle_root) {
warn!(
"Received conflicting chained merkle roots for slot: {slot},
shred {erasure_set:?} type {:?} has merkle root {merkle_root:?}, however
next fec set shred {next_erasure_set:?} type {:?} chains to merkle root {chained_merkle_root:?}.
Reporting as duplicate",
shred.shred_type(),
next_shred_type,
);

if !self.has_duplicate_shreds_in_slot(shred.slot()) {
duplicate_shreds.push(PossibleDuplicateShred::ChainedMerkleRootConflict(
shred.clone(),
next_shred,
));
}
return false;
if !self.has_duplicate_shreds_in_slot(shred.slot()) {
duplicate_shreds.push(PossibleDuplicateShred::ChainedMerkleRootConflict(
shred.clone(),
next_shred,
));
}
return false;
}

true
Expand All @@ -1778,13 +1833,12 @@ impl Blockstore {
///
/// This is intended to be used right after `shred`'s `merkle_root_meta`
/// has been created for the first time.
#[allow(dead_code)]
fn check_backwards_chained_merkle_root_consistency(
&self,
shred: &Shred,
just_inserted_shreds: &HashMap<ShredId, Shred>,
erasure_metas: &mut BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
erasure_metas: &BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
merkle_root_metas: &HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
) -> bool {
let slot = shred.slot();
Expand All @@ -1811,21 +1865,28 @@ impl Blockstore {
// we will verify this chain through the forward check above.
return true;
};
let prev_merkle_root_meta_entry = match merkle_root_metas.entry(prev_erasure_set) {
HashMapEntry::Vacant(entry) => entry.insert(WorkingEntry::Clean(
self.merkle_root_meta(prev_erasure_set)
.unwrap()
.expect("merkle root meta must exist for erasure meta"),
)),
HashMapEntry::Occupied(entry) => entry.into_mut(),
};

let prev_merkle_root_meta = prev_merkle_root_meta_entry.as_ref();
let prev_shred_id = ShredId::new(
slot,
prev_merkle_root_meta.first_received_shred_index(),
prev_merkle_root_meta.first_received_shred_type(),
);
let (prev_shred_index, prev_shred_type) =
if let Some(prev_merkle_root_meta_entry) = merkle_root_metas.get(&prev_erasure_set) {
(
prev_merkle_root_meta_entry
.as_ref()
.first_received_shred_index(),
prev_merkle_root_meta_entry
.as_ref()
.first_received_shred_type(),
)
} else {
let merkle_root_meta = self
.merkle_root_meta(prev_erasure_set)
.unwrap()
.expect("merkle root meta must exist for erasure meta");
(
merkle_root_meta.first_received_shred_index(),
merkle_root_meta.first_received_shred_type(),
)
};
let prev_shred_id = ShredId::new(slot, prev_shred_index, prev_shred_type);
let prev_shred =
Self::get_shred_from_just_inserted_or_db(self, just_inserted_shreds, prev_shred_id)
.expect("Shred indicated by merkle root meta must exist")
Expand All @@ -1841,7 +1902,7 @@ impl Blockstore {
Reporting as duplicate",
shred.erasure_set(),
shred.shred_type(),
prev_merkle_root_meta.first_received_shred_type(),
prev_shred_type,
);

if !self.has_duplicate_shreds_in_slot(shred.slot()) {
Expand Down Expand Up @@ -11044,7 +11105,7 @@ pub mod tests {
// Blockstore is empty
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &mut erasure_metas)
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
None
);
Expand All @@ -11053,7 +11114,7 @@ pub mod tests {
erasure_metas.insert(erasure_set_0, WorkingEntry::Dirty(erasure_meta_0));
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &mut erasure_metas)
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
None
);
Expand All @@ -11065,7 +11126,7 @@ pub mod tests {
.unwrap();
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &mut erasure_metas)
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
None
);
Expand All @@ -11074,7 +11135,7 @@ pub mod tests {
erasure_metas.insert(erasure_set_prev, WorkingEntry::Dirty(erasure_meta_prev));
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &mut erasure_metas)
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
Some(erasure_set_prev)
);
Expand All @@ -11086,7 +11147,7 @@ pub mod tests {
.unwrap();
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &mut erasure_metas)
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
Some(erasure_set_prev)
);
Expand All @@ -11095,22 +11156,22 @@ pub mod tests {
erasure_metas.insert(erasure_set_prev, WorkingEntry::Clean(erasure_meta_prev));
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &mut erasure_metas)
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
Some(erasure_set_prev)
);

// Works even if the previous fec set has index 0
assert_eq!(
blockstore
.previous_erasure_set(erasure_set_prev, &mut erasure_metas)
.previous_erasure_set(erasure_set_prev, &erasure_metas)
.unwrap(),
Some(erasure_set_0)
);
erasure_metas.remove(&erasure_set_0);
assert_eq!(
blockstore
.previous_erasure_set(erasure_set_prev, &mut erasure_metas)
.previous_erasure_set(erasure_set_prev, &erasure_metas)
.unwrap(),
Some(erasure_set_0)
);
Expand All @@ -11129,7 +11190,7 @@ pub mod tests {
);
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &mut erasure_metas)
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
None,
);
Expand All @@ -11142,7 +11203,7 @@ pub mod tests {
.unwrap();
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &mut erasure_metas)
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
None,
);
Expand Down
Loading

0 comments on commit dff346d

Please sign in to comment.