Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Optimize validator block sync while outside GC range #3087

Merged
merged 8 commits into from
Feb 12, 2024
70 changes: 64 additions & 6 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ pub struct Sync<N: Network> {
bft_sender: Arc<OnceCell<BFTSender<N>>>,
/// The spawned handles.
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
/// The response lock.
response_lock: Arc<TMutex<()>>,
/// The sync lock.
lock: Arc<TMutex<()>>,
sync_lock: Arc<TMutex<()>>,
}

impl<N: Network> Sync<N> {
Expand All @@ -71,7 +73,8 @@ impl<N: Network> Sync<N> {
pending: Default::default(),
bft_sender: Default::default(),
handles: Default::default(),
lock: Default::default(),
response_lock: Default::default(),
sync_lock: Default::default(),
}
}

Expand Down Expand Up @@ -99,6 +102,11 @@ impl<N: Network> Sync<N> {
let communication = &self_.gateway;
// let communication = &node.router;
self_.block_sync.try_block_sync(communication).await;

// Sync the storage with the blocks.
if let Err(e) = self_.sync_storage_with_blocks().await {
error!("Unable to sync storage with blocks - {e}");
}
}
}));

Expand Down Expand Up @@ -185,14 +193,16 @@ impl<N: Network> Sync<N> {

// Retrieve the block height.
let block_height = latest_block.height();
// Determine the number of maximum number of blocks that would have been garbage collected.
let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
// Determine the earliest height, conservatively set to the block height minus the max GC rounds.
// By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
let gc_height = block_height.saturating_sub(u32::try_from(self.storage.max_gc_rounds())?);
let gc_height = block_height.saturating_sub(max_gc_blocks);
// Retrieve the blocks.
let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

// Acquire the sync lock.
let _lock = self.lock.lock().await;
let _lock = self.sync_lock.lock().await;

debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

Expand Down Expand Up @@ -267,8 +277,36 @@ impl<N: Network> Sync<N> {

/// Syncs the storage with the given blocks.
pub async fn sync_storage_with_blocks(&self) -> Result<()> {
// Acquire the response lock.
let _lock = self.response_lock.lock().await;

// Retrieve the latest block height.
let mut current_height = self.ledger.latest_block_height() + 1;

// Retrieve the maximum block height of the peers.
let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
// Determine the number of maximum number of blocks that would have been garbage collected.
let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
// Determine the maximum height that the peer would have garbage collected.
let max_gc_height = tip.saturating_sub(max_gc_blocks);

// Determine if we can sync the ledger without updating the BFT first.
if current_height <= max_gc_height {
// Try to advance the ledger *to tip* without updating the BFT.
while let Some(block) = self.block_sync.process_next_block(current_height) {
info!("Syncing the ledger to block {}...", block.height());
self.sync_ledger_with_block_without_bft(block).await?;
// Update the current height.
current_height += 1;
}
// Sync the storage with the ledger if we should transition to the BFT sync.
if current_height > max_gc_height {
if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
error!("BFT sync (with bootup routine) failed - {e}");
}
}
}

// Try to advance the ledger with sync blocks.
while let Some(block) = self.block_sync.process_next_block(current_height) {
info!("Syncing the BFT to block {}...", block.height());
Expand All @@ -280,10 +318,28 @@ impl<N: Network> Sync<N> {
Ok(())
}

/// Syncs the ledger with the given block without updating the BFT.
async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
// Acquire the sync lock.
let _lock = self.sync_lock.lock().await;

// Check the next block.
self.ledger.check_next_block(&block)?;
// Attempt to advance to the next block.
self.ledger.advance_to_next_block(&block)?;

// Sync the height with the block.
self.storage.sync_height_with_block(block.height());
// Sync the round with the block.
self.storage.sync_round_with_block(block.round());

Ok(())
}

/// Syncs the storage with the given blocks.
pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
// Acquire the sync lock.
let _lock = self.lock.lock().await;
let _lock = self.sync_lock.lock().await;

// If the block authority is a subdag, then sync the batch certificates with the block.
if let Authority::Quorum(subdag) = block.authority() {
Expand Down Expand Up @@ -407,8 +463,10 @@ impl<N: Network> Sync<N> {
/// Shuts down the primary.
pub async fn shut_down(&self) {
info!("Shutting down the sync module...");
// Acquire the response lock.
let _lock = self.response_lock.lock().await;
// Acquire the sync lock.
let _lock = self.lock.lock().await;
let _lock = self.sync_lock.lock().await;
// Abort the tasks.
self.handles.lock().iter().for_each(|handle| handle.abort());
}
Expand Down