Skip to content

Commit

Permalink
Merge pull request #3234 from AleoHQ/rest/sync-leniency
Browse files Browse the repository at this point in the history
[Optimize] Add sync leniency for skipping REST solutions and transctions
  • Loading branch information
howardwu committed Apr 26, 2024
2 parents 190e125 + 08a40be commit 9be1039
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 10 deletions.
5 changes: 5 additions & 0 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,11 @@ impl<N: Network> Sync<N> {
self.block_sync.is_block_synced()
}

/// Returns the number of blocks the node is behind the greatest peer height.
pub fn num_blocks_behind(&self) -> u32 {
self.block_sync.num_blocks_behind()
}

/// Returns `true` if the node is in gateway mode.
pub const fn is_gateway_mode(&self) -> bool {
self.block_sync.mode().is_gateway()
Expand Down
10 changes: 5 additions & 5 deletions node/rest/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use super::*;
use snarkos_node_router::messages::UnconfirmedSolution;
use snarkos_node_router::{messages::UnconfirmedSolution, SYNC_LENIENCY};
use snarkvm::{
ledger::puzzle::Solution,
prelude::{block::Transaction, Identifier, Plaintext},
Expand Down Expand Up @@ -329,8 +329,8 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
State(rest): State<Self>,
Json(tx): Json<Transaction<N>>,
) -> Result<ErasedJson, RestError> {
// Do not process the transaction if the node is syncing.
if !rest.routing.is_block_synced() {
// Do not process the transaction if the node is too far behind.
if rest.routing.num_blocks_behind() > SYNC_LENIENCY {
return Err(RestError(format!("Unable to broadcast transaction '{}' (node is syncing)", fmt_id(tx.id()))));
}

Expand Down Expand Up @@ -358,8 +358,8 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
State(rest): State<Self>,
Json(solution): Json<Solution<N>>,
) -> Result<ErasedJson, RestError> {
// Do not process the solution if the node is syncing.
if !rest.routing.is_block_synced() {
// Do not process the solution if the node is too far behind.
if rest.routing.num_blocks_behind() > SYNC_LENIENCY {
return Err(RestError(format!(
"Unable to broadcast solution '{}' (node is syncing)",
fmt_id(solution.id())
Expand Down
12 changes: 8 additions & 4 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ use tokio::task::spawn_blocking;
/// The max number of peers to send in a `PeerResponse` message.
const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;

/// The maximum number of blocks the client can be behind it's latest peer before it skips
/// processing incoming transactions and solutions.
pub const SYNC_LENIENCY: u32 = 10;

#[async_trait]
pub trait Inbound<N: Network>: Reading + Outbound<N> {
/// The maximum number of puzzle requests per interval.
Expand Down Expand Up @@ -214,8 +218,8 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
}
Message::UnconfirmedSolution(message) => {
// Do not process unconfirmed solutions if the node is syncing.
if !self.is_block_synced() {
// Do not process unconfirmed solutions if the node is too far behind.
if self.num_blocks_behind() > SYNC_LENIENCY {
trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
return Ok(());
}
Expand Down Expand Up @@ -244,8 +248,8 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
}
Message::UnconfirmedTransaction(message) => {
// Do not process unconfirmed transactions if the node is syncing.
if !self.is_block_synced() {
// Do not process unconfirmed transactions if the node is too far behind.
if self.num_blocks_behind() > SYNC_LENIENCY {
trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
return Ok(());
}
Expand Down
3 changes: 3 additions & 0 deletions node/router/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
/// Returns `true` if the node is synced up to the latest block (within the given tolerance).
fn is_block_synced(&self) -> bool;

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32;

/// Sends a "Ping" message to the given peer.
fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) {
self.send(peer_ip, Message::Ping(Ping::new(self.router().node_type(), block_locators)));
Expand Down
5 changes: 5 additions & 0 deletions node/router/tests/common/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ impl<N: Network> Outbound<N> for TestRouter<N> {
fn is_block_synced(&self) -> bool {
true
}

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32 {
0
}
}

#[async_trait]
Expand Down
5 changes: 5 additions & 0 deletions node/src/client/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
fn is_block_synced(&self) -> bool {
self.sync.is_block_synced()
}

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32 {
self.sync.num_blocks_behind()
}
}

#[async_trait]
Expand Down
5 changes: 5 additions & 0 deletions node/src/prover/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
fn is_block_synced(&self) -> bool {
true
}

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32 {
0
}
}

#[async_trait]
Expand Down
5 changes: 5 additions & 0 deletions node/src/validator/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Validator<N, C> {
fn is_block_synced(&self) -> bool {
self.sync.is_block_synced()
}

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32 {
self.sync.num_blocks_behind()
}
}

#[async_trait]
Expand Down
13 changes: 12 additions & 1 deletion node/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::{
collections::BTreeMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
time::Instant,
Expand Down Expand Up @@ -106,6 +106,8 @@ pub struct BlockSync<N: Network> {
request_timestamps: Arc<RwLock<BTreeMap<u32, Instant>>>,
/// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance).
is_block_synced: Arc<AtomicBool>,
/// The number of blocks the peer is behind the greatest peer height.
num_blocks_behind: Arc<AtomicU32>,
/// The lock to guarantee advance_with_sync_blocks() is called only once at a time.
advance_with_sync_blocks_lock: Arc<Mutex<()>>,
}
Expand All @@ -122,6 +124,7 @@ impl<N: Network> BlockSync<N> {
responses: Default::default(),
request_timestamps: Default::default(),
is_block_synced: Default::default(),
num_blocks_behind: Default::default(),
advance_with_sync_blocks_lock: Default::default(),
}
}
Expand All @@ -137,6 +140,12 @@ impl<N: Network> BlockSync<N> {
pub fn is_block_synced(&self) -> bool {
self.is_block_synced.load(Ordering::SeqCst)
}

/// Returns the number of blocks the node is behind the greatest peer height.
#[inline]
pub fn num_blocks_behind(&self) -> u32 {
self.num_blocks_behind.load(Ordering::SeqCst)
}
}

#[allow(dead_code)]
Expand Down Expand Up @@ -459,6 +468,8 @@ impl<N: Network> BlockSync<N> {
let num_blocks_behind = greatest_peer_height.saturating_sub(canon_height);
// Determine if the primary is synced.
let is_synced = num_blocks_behind <= max_blocks_behind;
// Update the num blocks behind.
self.num_blocks_behind.store(num_blocks_behind, Ordering::SeqCst);
// Update the sync status.
self.is_block_synced.store(is_synced, Ordering::SeqCst);
// Update the `IS_SYNCED` metric.
Expand Down

0 comments on commit 9be1039

Please sign in to comment.