Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[checkpoint] local_fragments should be indexed on sequence number #4502

Merged
merged 1 commit into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,20 +777,23 @@ pub async fn create_fragments<A>(
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let next_cp_seq = checkpoint_db.lock().next_checkpoint();

let mut available_authorities = committee.shuffle_by_stake(None, None);
// Remove ourselves and all validators that we have already diffed with.
let already_fragmented = checkpoint_db.lock().validators_already_fragmented_with();
let already_fragmented = checkpoint_db
.lock()
.validators_already_fragmented_with(next_cp_seq);
// TODO: We can also use AuthorityHealth to pick healthy authorities first.
available_authorities
.retain(|name| name != &active_authority.state.name && !already_fragmented.contains(name));
debug!(
?next_cp_seq,
fragmented_count=?already_fragmented.len(),
to_be_fragmented_count=?available_authorities.len(),
"Going through remaining validators to generate fragments",
);

let next_cp_seq = checkpoint_db.lock().next_checkpoint();

let result = checkpoint_db
.lock()
.attempt_to_construct_checkpoint(committee);
Expand Down
71 changes: 45 additions & 26 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use narwhal_executor::ExecutionIndices;
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::{collections::HashSet, path::Path, sync::Arc};
use std::{path::Path, sync::Arc};
use sui_storage::default_db_options;
use sui_types::messages_checkpoint::{CheckpointProposal, CheckpointProposalContents};
use sui_types::{
Expand Down Expand Up @@ -110,7 +110,7 @@ pub struct CheckpointStoreTables {
// to. These are used for the local node to potentially reconstruct the full
// transaction set.
#[default_options_override_fn = "local_fragments_table_default_config"]
pub local_fragments: DBMap<AuthorityName, CheckpointFragment>,
pub local_fragments: DBMap<(CheckpointSequenceNumber, AuthorityName), CheckpointFragment>,

/// Store the fragments received in order, the counter is purely internal,
/// to allow us to provide a list in order they were received. We only store
Expand Down Expand Up @@ -419,7 +419,17 @@ impl CheckpointStore {
)?
.delete_batch(
&self.tables.local_fragments,
self.tables.local_fragments.keys(),
self.tables
.local_fragments
.iter()
.filter_map(|((seq, name), _)| {
// Delete all keys for checkpoints up to and including the one we are committing now.
if seq <= checkpoint_sequence_number {
Some((seq, name))
} else {
None
}
}),
)?;

// Update the transactions databases.
Expand Down Expand Up @@ -480,14 +490,19 @@ impl CheckpointStore {
} else {
fragment.proposer.authority()
};
if !self.tables.local_fragments.contains_key(other_name)? {
self.tables.local_fragments.insert(other_name, fragment)?;
} else {
// We already have this fragment, so we can ignore it.
if self
.tables
.local_fragments
.contains_key(&(next_checkpoint_seq, *other_name))?
{
// If we already have this fragment, we can ignore it.
return Err(SuiError::GenericAuthorityError {
error: format!("Already processed fragment with {:?}", other_name),
});
}
self.tables
.local_fragments
.insert(&(next_checkpoint_seq, *other_name), fragment)?;

// Send to consensus for sequencing.
if let Some(sender) = &self.sender {
Expand Down Expand Up @@ -535,18 +550,16 @@ impl CheckpointStore {
self.tables.fragments.insert(&seq, &fragment)?;

// If we are one of the two parties in the fragment, also save it in the list of local fragments.
let next_sequence_number = self.next_checkpoint();
if fragment.proposer.summary.sequence_number == next_sequence_number {
if fragment.proposer.authority() == &self.name {
self.tables
.local_fragments
.insert(fragment.other.authority(), &fragment)?;
}
if fragment.other.authority() == &self.name {
self.tables
.local_fragments
.insert(fragment.proposer.authority(), &fragment)?;
}
let fragment_seq = fragment.proposer.summary.sequence_number;
if fragment.proposer.authority() == &self.name {
self.tables
.local_fragments
.insert(&(fragment_seq, *fragment.other.authority()), &fragment)?;
}
if fragment.other.authority() == &self.name {
self.tables
.local_fragments
.insert(&(fragment_seq, *fragment.proposer.authority()), &fragment)?;
}

Ok(())
Expand Down Expand Up @@ -644,17 +657,20 @@ impl CheckpointStore {
}

// Strategy 2 to reconstruct checkpoint -- There is a link between us and the checkpoint set

let local_links: HashSet<_> = self.tables.local_fragments.keys().collect();
let checkpoint_keys: HashSet<_> = reconstructed
let local_links = self.validators_already_fragmented_with(next_sequence_number);
let checkpoint_keys: BTreeSet<_> = reconstructed
.global
.authority_waypoints
.keys()
.cloned()
.collect();

if let Some(auth) = local_links.intersection(&checkpoint_keys).next() {
let fragment = self.tables.local_fragments.get(auth)?.unwrap();
let fragment = self
.tables
.local_fragments
.get(&(next_sequence_number, *auth))?
.unwrap();

// Extract the diff
let diff = if fragment.proposer.authority() == &self.name {
Expand Down Expand Up @@ -788,11 +804,14 @@ impl CheckpointStore {
next_seq % CHECKPOINT_COUNT_PER_EPOCH == 1 && next_seq != 1
}

pub fn validators_already_fragmented_with(&mut self) -> BTreeSet<AuthorityName> {
pub fn validators_already_fragmented_with(
&mut self,
next_seq: CheckpointSequenceNumber,
) -> BTreeSet<AuthorityName> {
self.tables
.local_fragments
.iter()
.map(|(name, _)| name)
.keys()
.filter_map(|(seq, name)| if seq == next_seq { Some(name) } else { None })
.collect()
}

Expand Down