Skip to content

Commit

Permalink
Merge pull request #3087 from AleoHQ/feat/sync-without-bft
Browse files Browse the repository at this point in the history
Optimize validator block sync while outside GC range
  • Loading branch information
howardwu committed Feb 12, 2024
2 parents 0780771 + c904669 commit 66f4cc6
Showing 1 changed file with 64 additions and 6 deletions.
70 changes: 64 additions & 6 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ pub struct Sync<N: Network> {
bft_sender: Arc<OnceCell<BFTSender<N>>>,
/// The spawned handles.
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
/// The response lock.
response_lock: Arc<TMutex<()>>,
/// The sync lock.
lock: Arc<TMutex<()>>,
sync_lock: Arc<TMutex<()>>,
}

impl<N: Network> Sync<N> {
Expand All @@ -71,7 +73,8 @@ impl<N: Network> Sync<N> {
pending: Default::default(),
bft_sender: Default::default(),
handles: Default::default(),
lock: Default::default(),
response_lock: Default::default(),
sync_lock: Default::default(),
}
}

Expand Down Expand Up @@ -99,6 +102,11 @@ impl<N: Network> Sync<N> {
let communication = &self_.gateway;
// let communication = &node.router;
self_.block_sync.try_block_sync(communication).await;

// Sync the storage with the blocks.
if let Err(e) = self_.sync_storage_with_blocks().await {
error!("Unable to sync storage with blocks - {e}");
}
}
}));

Expand Down Expand Up @@ -185,14 +193,16 @@ impl<N: Network> Sync<N> {

// Retrieve the block height.
let block_height = latest_block.height();
// Determine the number of maximum number of blocks that would have been garbage collected.
let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
// Determine the earliest height, conservatively set to the block height minus the max GC rounds.
// By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded.
let gc_height = block_height.saturating_sub(u32::try_from(self.storage.max_gc_rounds())?);
let gc_height = block_height.saturating_sub(max_gc_blocks);
// Retrieve the blocks.
let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;

// Acquire the sync lock.
let _lock = self.lock.lock().await;
let _lock = self.sync_lock.lock().await;

debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height.saturating_add(1));

Expand Down Expand Up @@ -267,8 +277,36 @@ impl<N: Network> Sync<N> {

/// Syncs the storage with the given blocks.
pub async fn sync_storage_with_blocks(&self) -> Result<()> {
// Acquire the response lock.
let _lock = self.response_lock.lock().await;

// Retrieve the latest block height.
let mut current_height = self.ledger.latest_block_height() + 1;

// Retrieve the maximum block height of the peers.
let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
// Determine the number of maximum number of blocks that would have been garbage collected.
let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
// Determine the maximum height that the peer would have garbage collected.
let max_gc_height = tip.saturating_sub(max_gc_blocks);

// Determine if we can sync the ledger without updating the BFT first.
if current_height <= max_gc_height {
// Try to advance the ledger *to tip* without updating the BFT.
while let Some(block) = self.block_sync.process_next_block(current_height) {
info!("Syncing the ledger to block {}...", block.height());
self.sync_ledger_with_block_without_bft(block).await?;
// Update the current height.
current_height += 1;
}
// Sync the storage with the ledger if we should transition to the BFT sync.
if current_height > max_gc_height {
if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
error!("BFT sync (with bootup routine) failed - {e}");
}
}
}

// Try to advance the ledger with sync blocks.
while let Some(block) = self.block_sync.process_next_block(current_height) {
info!("Syncing the BFT to block {}...", block.height());
Expand All @@ -280,10 +318,28 @@ impl<N: Network> Sync<N> {
Ok(())
}

/// Syncs the ledger with the given block without updating the BFT.
async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
// Acquire the sync lock.
let _lock = self.sync_lock.lock().await;

// Check the next block.
self.ledger.check_next_block(&block)?;
// Attempt to advance to the next block.
self.ledger.advance_to_next_block(&block)?;

// Sync the height with the block.
self.storage.sync_height_with_block(block.height());
// Sync the round with the block.
self.storage.sync_round_with_block(block.round());

Ok(())
}

/// Syncs the storage with the given blocks.
pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
// Acquire the sync lock.
let _lock = self.lock.lock().await;
let _lock = self.sync_lock.lock().await;

// If the block authority is a subdag, then sync the batch certificates with the block.
if let Authority::Quorum(subdag) = block.authority() {
Expand Down Expand Up @@ -407,8 +463,10 @@ impl<N: Network> Sync<N> {
/// Shuts down the primary.
pub async fn shut_down(&self) {
info!("Shutting down the sync module...");
// Acquire the response lock.
let _lock = self.response_lock.lock().await;
// Acquire the sync lock.
let _lock = self.lock.lock().await;
let _lock = self.sync_lock.lock().await;
// Abort the tasks.
self.handles.lock().iter().for_each(|handle| handle.abort());
}
Expand Down

0 comments on commit 66f4cc6

Please sign in to comment.