Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize] Add sync leniency for skipping REST solutions and transctions #3234

Merged
merged 6 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,11 @@ impl<N: Network> Sync<N> {
self.block_sync.is_block_synced()
}

/// Returns the number of blocks the node is behind the greatest peer height.
pub fn num_blocks_behind(&self) -> u32 {
self.block_sync.num_blocks_behind()
}

/// Returns `true` if the node is in gateway mode.
pub const fn is_gateway_mode(&self) -> bool {
self.block_sync.mode().is_gateway()
Expand Down
10 changes: 5 additions & 5 deletions node/rest/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use super::*;
use snarkos_node_router::messages::UnconfirmedSolution;
use snarkos_node_router::{messages::UnconfirmedSolution, SYNC_LENIENCY};
use snarkvm::{
ledger::puzzle::Solution,
prelude::{block::Transaction, Identifier, Plaintext},
Expand Down Expand Up @@ -329,8 +329,8 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
State(rest): State<Self>,
Json(tx): Json<Transaction<N>>,
) -> Result<ErasedJson, RestError> {
// Do not process the transaction if the node is syncing.
if !rest.routing.is_block_synced() {
// Do not process the transaction if the node is too far behind.
if rest.routing.num_blocks_behind() > SYNC_LENIENCY {
return Err(RestError(format!("Unable to broadcast transaction '{}' (node is syncing)", fmt_id(tx.id()))));
}

Expand Down Expand Up @@ -358,8 +358,8 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
State(rest): State<Self>,
Json(solution): Json<Solution<N>>,
) -> Result<ErasedJson, RestError> {
// Do not process the solution if the node is syncing.
if !rest.routing.is_block_synced() {
// Do not process the solution if the node is too far behind.
if rest.routing.num_blocks_behind() > SYNC_LENIENCY {
return Err(RestError(format!(
"Unable to broadcast solution '{}' (node is syncing)",
fmt_id(solution.id())
Expand Down
12 changes: 8 additions & 4 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ use tokio::task::spawn_blocking;
/// The max number of peers to send in a `PeerResponse` message.
const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;

/// The maximum number of blocks the client can be behind it's latest peer before it skips
/// processing incoming transactions and solutions.
pub const SYNC_LENIENCY: u32 = 10;

#[async_trait]
pub trait Inbound<N: Network>: Reading + Outbound<N> {
/// The maximum number of puzzle requests per interval.
Expand Down Expand Up @@ -214,8 +218,8 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
}
Message::UnconfirmedSolution(message) => {
// Do not process unconfirmed solutions if the node is syncing.
if !self.is_block_synced() {
// Do not process unconfirmed solutions if the node is too far behind.
if self.num_blocks_behind() > SYNC_LENIENCY {
trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
return Ok(());
}
Expand Down Expand Up @@ -244,8 +248,8 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
}
Message::UnconfirmedTransaction(message) => {
// Do not process unconfirmed transactions if the node is syncing.
if !self.is_block_synced() {
// Do not process unconfirmed transactions if the node is too far behind.
if self.num_blocks_behind() > SYNC_LENIENCY {
trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
return Ok(());
}
Expand Down
3 changes: 3 additions & 0 deletions node/router/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
/// Returns `true` if the node is synced up to the latest block (within the given tolerance).
fn is_block_synced(&self) -> bool;

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32;

/// Sends a "Ping" message to the given peer.
fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) {
self.send(peer_ip, Message::Ping(Ping::new(self.router().node_type(), block_locators)));
Expand Down
5 changes: 5 additions & 0 deletions node/router/tests/common/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ impl<N: Network> Outbound<N> for TestRouter<N> {
fn is_block_synced(&self) -> bool {
true
}

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32 {
0
}
}

#[async_trait]
Expand Down
5 changes: 5 additions & 0 deletions node/src/client/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
fn is_block_synced(&self) -> bool {
self.sync.is_block_synced()
}

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32 {
self.sync.num_blocks_behind()
}
}

#[async_trait]
Expand Down
5 changes: 5 additions & 0 deletions node/src/prover/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
fn is_block_synced(&self) -> bool {
true
}

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32 {
0
}
}

#[async_trait]
Expand Down
5 changes: 5 additions & 0 deletions node/src/validator/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Validator<N, C> {
fn is_block_synced(&self) -> bool {
self.sync.is_block_synced()
}

/// Returns the number of blocks this node is behind the greatest peer height.
fn num_blocks_behind(&self) -> u32 {
self.sync.num_blocks_behind()
}
}

#[async_trait]
Expand Down
13 changes: 12 additions & 1 deletion node/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::{
collections::BTreeMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
time::Instant,
Expand Down Expand Up @@ -106,6 +106,8 @@ pub struct BlockSync<N: Network> {
request_timestamps: Arc<RwLock<BTreeMap<u32, Instant>>>,
/// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance).
is_block_synced: Arc<AtomicBool>,
/// The number of blocks the peer is behind the greatest peer height.
num_blocks_behind: Arc<AtomicU32>,
/// The lock to guarantee advance_with_sync_blocks() is called only once at a time.
advance_with_sync_blocks_lock: Arc<Mutex<()>>,
}
Expand All @@ -122,6 +124,7 @@ impl<N: Network> BlockSync<N> {
responses: Default::default(),
request_timestamps: Default::default(),
is_block_synced: Default::default(),
num_blocks_behind: Default::default(),
advance_with_sync_blocks_lock: Default::default(),
}
}
Expand All @@ -137,6 +140,12 @@ impl<N: Network> BlockSync<N> {
pub fn is_block_synced(&self) -> bool {
self.is_block_synced.load(Ordering::SeqCst)
}

/// Returns the number of blocks the node is behind the greatest peer height.
#[inline]
pub fn num_blocks_behind(&self) -> u32 {
self.num_blocks_behind.load(Ordering::SeqCst)
}
}

#[allow(dead_code)]
Expand Down Expand Up @@ -459,6 +468,8 @@ impl<N: Network> BlockSync<N> {
let num_blocks_behind = greatest_peer_height.saturating_sub(canon_height);
// Determine if the primary is synced.
let is_synced = num_blocks_behind <= max_blocks_behind;
// Update the num blocks behind.
self.num_blocks_behind.store(num_blocks_behind, Ordering::SeqCst);
// Update the sync status.
self.is_block_synced.store(is_synced, Ordering::SeqCst);
// Update the `IS_SYNCED` metric.
Expand Down