[Fix] Update fetch redundancy as a factor of committee size #3140

Merged: 14 commits, Mar 3, 2024
96 changes: 82 additions & 14 deletions node/bft/src/helpers/pending.rs
@@ -13,29 +13,43 @@
// limitations under the License.

use crate::MAX_FETCH_TIMEOUT_IN_MS;
+use snarkos_node_bft_ledger_service::LedgerService;
+use snarkvm::{console::network::Network, ledger::committee::Committee};

use parking_lot::{Mutex, RwLock};
use std::{
collections::{HashMap, HashSet},
hash::Hash,
net::SocketAddr,
sync::Arc,
};
use time::OffsetDateTime;
use tokio::sync::oneshot;

-#[cfg(not(test))]
-pub const NUM_REDUNDANT_REQUESTS: usize = 2;
-#[cfg(test)]
-pub const NUM_REDUNDANT_REQUESTS: usize = 10;

const CALLBACK_TIMEOUT_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS as i64 / 1000;

+/// Returns the maximum number of redundant requests for the number of validators in the specified round.
+pub fn max_redundant_requests<N: Network>(ledger: Arc<dyn LedgerService<N>>, round: u64) -> usize {
+// Determine the number of validators in the committee lookback for the given round.
+let num_validators = ledger
+.get_committee_lookback_for_round(round)
+.map(|committee| committee.num_members())
+.ok()
+.unwrap_or(Committee::<N>::MAX_COMMITTEE_SIZE as usize);
+
+// Note: It is adequate to set this value to the availability threshold,
+// as with high probability at least one of the contacted validators will respond
+// honestly (in the best and worst cases, with stake spread across the validators
+// evenly and unevenly, respectively).
+1 + num_validators.saturating_div(3)
+}
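As a sanity check on the formula: the bound is one more than a third of the committee size, i.e. the availability threshold f + 1 for n = 3f + 1 validators. A minimal standalone sketch (the `redundancy_bound` helper and the sample committee sizes are hypothetical, mirroring the body of `max_redundant_requests` above):

```rust
// Hypothetical stand-in for max_redundant_requests, taking the committee size directly.
fn redundancy_bound(committee_size: usize) -> usize {
    // One more than a third of the committee: the availability threshold.
    1 + committee_size.saturating_div(3)
}

fn main() {
    assert_eq!(redundancy_bound(4), 2); // 1 + 4/3: at most 2 in-flight requests
    assert_eq!(redundancy_bound(10), 4); // 1 + 10/3: at most 4
    assert_eq!(redundancy_bound(200), 67); // 1 + 200/3: at most 67
}
```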

#[derive(Debug)]
pub struct Pending<T: PartialEq + Eq + Hash, V: Clone> {
/// The map of pending `items` to `peer IPs` that have the item.
pending: RwLock<HashMap<T, HashSet<SocketAddr>>>,
/// The optional callback queue.
-callbacks: Mutex<HashMap<T, Vec<(oneshot::Sender<V>, i64)>>>,
+/// Each callback has a timeout and a flag indicating if it is associated with a sent request.
+callbacks: Mutex<HashMap<T, Vec<(oneshot::Sender<V>, i64, bool)>>>,
}

impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Default for Pending<T, V> {
@@ -85,12 +99,29 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
self.callbacks.lock().get(&item).map_or(0, |callbacks| callbacks.len())
}

+/// Returns the number of pending sent requests for the specified `item`.
+pub fn num_sent_requests(&self, item: impl Into<T>) -> usize {
+let item = item.into();
+// Clear the callbacks that have expired.
+self.clear_expired_callbacks_for_item(item);
+// Count the live callbacks that correspond to sent requests.
+self.callbacks
+.lock()
+.get(&item)
+.map_or(0, |callbacks| callbacks.iter().filter(|(_, _, request_sent)| *request_sent).count())
+}

/// Inserts the specified `item` and `peer IP` into the pending queue,
/// returning `true` if the `peer IP` was newly-inserted into the entry for the `item`.
///
/// In addition, an optional `callback` may be provided, which is triggered upon removal.
/// Note: The callback, if provided, is **always** inserted into the callback queue.
-pub fn insert(&self, item: impl Into<T>, peer_ip: SocketAddr, callback: Option<oneshot::Sender<V>>) -> bool {
+pub fn insert(
+&self,
+item: impl Into<T>,
+peer_ip: SocketAddr,
+callback: Option<(oneshot::Sender<V>, bool)>,
+) -> bool {
let item = item.into();
// Insert the peer IP into the pending queue.
let result = self.pending.write().entry(item).or_default().insert(peer_ip);
@@ -99,8 +130,12 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
self.clear_expired_callbacks_for_item(item);

// If a callback is provided, insert it into the callback queue.
-if let Some(callback) = callback {
-self.callbacks.lock().entry(item).or_default().push((callback, OffsetDateTime::now_utc().unix_timestamp()));
+if let Some((callback, request_sent)) = callback {
+self.callbacks.lock().entry(item).or_default().push((
+callback,
+OffsetDateTime::now_utc().unix_timestamp(),
+request_sent,
+));
}
// Return the result.
result
@@ -117,7 +152,7 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
if let Some(callbacks) = self.callbacks.lock().remove(&item) {
if let Some(callback_value) = callback_value {
// Send a notification to the callback.
-for (callback, _) in callbacks {
+for (callback, _, _) in callbacks {
callback.send(callback_value.clone()).ok();
}
}
@@ -133,7 +168,7 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
// Fetch the current timestamp.
let now = OffsetDateTime::now_utc().unix_timestamp();
// Remove the callbacks that have expired.
-callbacks.retain(|(_, timestamp)| now - *timestamp <= CALLBACK_TIMEOUT_IN_SECS);
+callbacks.retain(|(_, timestamp, _)| now - *timestamp <= CALLBACK_TIMEOUT_IN_SECS);
}
}
}
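The interaction between callback expiry and the new `request_sent` flag is easiest to see in isolation. A minimal self-contained sketch (simplified types and a hypothetical `MockPending` stand-in; the real struct wraps these maps in `parking_lot` locks, keys them by transmission ID, and reads timestamps from `OffsetDateTime`):

```rust
use std::collections::HashMap;

struct MockPending {
    // item -> one (timestamp, request_sent) pair per registered callback
    callbacks: HashMap<u64, Vec<(i64, bool)>>,
}

impl MockPending {
    fn num_sent_requests(&mut self, item: u64, now: i64, timeout: i64) -> usize {
        // Mirror clear_expired_callbacks_for_item: drop entries older than the timeout.
        if let Some(callbacks) = self.callbacks.get_mut(&item) {
            callbacks.retain(|(timestamp, _)| now - *timestamp <= timeout);
        }
        // Mirror num_sent_requests: count only callbacks whose request was sent.
        self.callbacks
            .get(&item)
            .map_or(0, |callbacks| callbacks.iter().filter(|(_, request_sent)| *request_sent).count())
    }
}

fn main() {
    let mut pending = MockPending { callbacks: HashMap::new() };
    // Three callbacks for item 7: two sent requests, one unsent.
    pending.callbacks.insert(7, vec![(100, true), (100, false), (50, true)]);
    // At now = 105 with a 10-second timeout, the (50, true) entry has expired,
    // and the unsent callback is excluded from the count.
    assert_eq!(pending.num_sent_requests(7, 105, 10), 1);
}
```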
@@ -150,6 +185,8 @@ mod tests {

type CurrentNetwork = snarkvm::prelude::MainnetV0;

+const ITERATIONS: usize = 100;

#[test]
fn test_pending() {
let rng = &mut TestRng::default();
@@ -233,13 +270,13 @@
let (callback_sender_3, _) = oneshot::channel();

// Insert the commitments.
-assert!(pending.insert(commitment_1, addr_1, Some(callback_sender_1)));
-assert!(pending.insert(commitment_1, addr_2, Some(callback_sender_2)));
+assert!(pending.insert(commitment_1, addr_1, Some((callback_sender_1, true))));
+assert!(pending.insert(commitment_1, addr_2, Some((callback_sender_2, true))));

// Sleep for a few seconds.
thread::sleep(Duration::from_secs(CALLBACK_TIMEOUT_IN_SECS as u64 - 1));

-assert!(pending.insert(commitment_1, addr_3, Some(callback_sender_3)));
+assert!(pending.insert(commitment_1, addr_3, Some((callback_sender_3, true))));

// Check that the number of callbacks has not changed.
assert_eq!(pending.num_callbacks(commitment_1), 3);
@@ -256,6 +293,37 @@
// Ensure that the expired callbacks have been removed.
assert_eq!(pending.num_callbacks(commitment_1), 0);
}

+#[test]
+fn test_num_sent_requests() {
+let rng = &mut TestRng::default();
+
+// Initialize the pending queue.
+let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();
+
+for _ in 0..ITERATIONS {
+// Generate a commitment.
+let commitment = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen()));
+// Track the expected number of sent requests.
+let mut expected_num_sent_requests = 0;
+for i in 0..ITERATIONS {
+// Generate a peer address.
+let addr = SocketAddr::from(([127, 0, 0, 1], i as u16));
+// Initialize a callback.
+let (callback_sender, _) = oneshot::channel();
+// Randomly determine if the callback is associated with a sent request.
+let is_sent_request = rng.gen();
+// Increment the expected number of sent requests.
+if is_sent_request {
+expected_num_sent_requests += 1;
+}
+// Insert the commitment.
+assert!(pending.insert(commitment, addr, Some((callback_sender, is_sent_request))));
+}
+// Ensure that the number of sent requests is correct.
+assert_eq!(pending.num_sent_requests(commitment), expected_num_sent_requests);
+}
+}
}
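For reference, the callback plumbing that `remove` drives is a standard tokio oneshot round-trip; the triple below mirrors the new `(sender, timestamp, request_sent)` entries. A minimal sketch (assuming a tokio runtime is available):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // One registered callback entry: (sender, timestamp, request_sent).
    let (tx, rx) = oneshot::channel::<&'static str>();
    let callbacks = vec![(tx, 0i64, true)];
    // On removal with a value, every callback for the item is fired (errors ignored).
    for (callback, _, _) in callbacks {
        callback.send("certificate").ok();
    }
    assert_eq!(rx.await.unwrap(), "certificate");
}
```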

#[cfg(test)]
32 changes: 20 additions & 12 deletions node/bft/src/sync/mod.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use crate::{
-helpers::{fmt_id, BFTSender, Pending, Storage, SyncReceiver, NUM_REDUNDANT_REQUESTS},
+helpers::{fmt_id, max_redundant_requests, BFTSender, Pending, Storage, SyncReceiver},
Gateway,
Transport,
MAX_FETCH_TIMEOUT_IN_MS,
@@ -410,19 +410,27 @@ impl<N: Network> Sync<N> {
) -> Result<BatchCertificate<N>> {
// Initialize a oneshot channel.
let (callback_sender, callback_receiver) = oneshot::channel();
+// Determine how many sent requests are pending.
+let num_sent_requests = self.pending.num_sent_requests(certificate_id);
+// Determine the maximum number of redundant requests.
+let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round());
+// Determine if we should send a certificate request to the peer.
+let should_send_request = num_sent_requests < num_redundant_requests;

// Insert the certificate ID into the pending queue.
-if self.pending.insert(certificate_id, peer_ip, Some(callback_sender)) {
-// Determine how many requests are pending for the certificate.
-let num_pending_requests = self.pending.num_callbacks(certificate_id);
-// If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
-if num_pending_requests <= NUM_REDUNDANT_REQUESTS {
-// Send the certificate request to the peer.
-if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
-bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
-}
-} else {
-trace!("Skipped sending redundant request for certificate {} to '{peer_ip}'", fmt_id(certificate_id));
-}
-}
+self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
+
+// If the number of sent requests is below the redundancy limit, send the certificate request to the peer.
+if should_send_request {
+// Send the certificate request to the peer.
+if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
+bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
+}
+} else {
+debug!(
+"Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
+fmt_id(certificate_id)
+);
+}
// Wait for the certificate to be fetched.
match tokio::time::timeout(core::time::Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await
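Putting the two halves together, the dispatch decision reduces to one comparison per fetch. A minimal sketch (hypothetical free function; the real code reads `num_sent_requests` from `self.pending` and the committee size from the ledger):

```rust
// Hypothetical free function mirroring the gating logic above.
fn should_send_request(num_sent_requests: usize, num_validators: usize) -> bool {
    // max_redundant_requests: one more than a third of the committee.
    let num_redundant_requests = 1 + num_validators.saturating_div(3);
    num_sent_requests < num_redundant_requests
}

fn main() {
    // With 10 validators the limit is 4: the first four fetches go out on the
    // wire; later fetches only register a callback and wait on prior responses.
    assert!(should_send_request(3, 10));
    assert!(!should_send_request(4, 10));
}
```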