Skip to content

Commit

Permalink
Merge pull request #3088 from AleoHQ/feat/limit-requests
Browse files Browse the repository at this point in the history
Limit the number of duplicate certificate and transmission requests
  • Loading branch information
howardwu committed Feb 12, 2024
2 parents 66f4cc6 + 735ad69 commit 894a75d
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 14 deletions.
93 changes: 88 additions & 5 deletions node/bft/src/helpers/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::MAX_FETCH_TIMEOUT_IN_MS;

use parking_lot::{Mutex, RwLock};
use std::{
collections::{HashMap, HashSet},
hash::Hash,
net::SocketAddr,
};
use time::OffsetDateTime;
use tokio::sync::oneshot;

#[cfg(not(test))]
pub const NUM_REDUNDANT_REQUESTS: usize = 2;
#[cfg(test)]
pub const NUM_REDUNDANT_REQUESTS: usize = 10;

const CALLBACK_TIMEOUT_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS as i64 / 1000;

#[derive(Debug)]
pub struct Pending<T: PartialEq + Eq + Hash, V: Clone> {
/// The map of pending `items` to `peer IPs` that have the item.
pending: RwLock<HashMap<T, HashSet<SocketAddr>>>,
/// TODO (howardwu): Expire callbacks that have not been called after a certain amount of time,
/// or clear the callbacks that are older than a certain round.
/// The optional callback queue.
callbacks: Mutex<HashMap<T, Vec<oneshot::Sender<V>>>>,
callbacks: Mutex<HashMap<T, Vec<(oneshot::Sender<V>, i64)>>>,
}

impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Default for Pending<T, V> {
Expand Down Expand Up @@ -68,6 +76,15 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
self.pending.read().get(&item.into()).cloned()
}

/// Counts the callbacks currently registered for `item`.
///
/// Expired callbacks for `item` are pruned first, so only callbacks that are
/// still within the timeout window contribute to the returned count.
pub fn num_callbacks(&self, item: impl Into<T>) -> usize {
    let key = item.into();
    // Drop stale senders before counting so they are not reported as live.
    self.clear_expired_callbacks_for_item(key);
    // An item with no entry in the map has zero callbacks.
    let registry = self.callbacks.lock();
    match registry.get(&key) {
        Some(live) => live.len(),
        None => 0,
    }
}

/// Inserts the specified `item` and `peer IP` to the pending queue,
/// returning `true` if the `peer IP` was newly-inserted into the entry for the `item`.
///
Expand All @@ -77,9 +94,13 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
let item = item.into();
// Insert the peer IP into the pending queue.
let result = self.pending.write().entry(item).or_default().insert(peer_ip);

// Clear the callbacks that have expired.
self.clear_expired_callbacks_for_item(item);

// If a callback is provided, insert it into the callback queue.
if let Some(callback) = callback {
self.callbacks.lock().entry(item).or_default().push(callback);
self.callbacks.lock().entry(item).or_default().push((callback, OffsetDateTime::now_utc().unix_timestamp()));
}
// Return the result.
result
Expand All @@ -96,14 +117,25 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
if let Some(callbacks) = self.callbacks.lock().remove(&item) {
if let Some(callback_value) = callback_value {
// Send a notification to the callback.
for callback in callbacks {
for (callback, _) in callbacks {
callback.send(callback_value.clone()).ok();
}
}
}
// Return the result.
result
}

/// Removes the callbacks for the specified `item` that have expired.
///
/// A callback is considered expired once more than `CALLBACK_TIMEOUT_IN_SECS`
/// seconds have elapsed since the Unix timestamp stored alongside it. If pruning
/// leaves the `item` without any callbacks, its entry is removed from the map as
/// well, so items whose requests all timed out do not leave empty vectors behind.
pub fn clear_expired_callbacks_for_item(&self, item: impl Into<T>) {
    let item = item.into();
    // Acquire the lock once; it covers both the pruning and the entry removal.
    let mut callbacks = self.callbacks.lock();
    let is_drained = match callbacks.get_mut(&item) {
        Some(entries) => {
            // Fetch the current timestamp.
            let now = OffsetDateTime::now_utc().unix_timestamp();
            // Keep only the callbacks that are still within the timeout window.
            entries.retain(|(_, timestamp)| now - *timestamp <= CALLBACK_TIMEOUT_IN_SECS);
            entries.is_empty()
        }
        // Nothing is registered for this item, so there is nothing to prune or remove.
        None => false,
    };
    // Remove the now-empty entry so the map does not grow with dead keys.
    if is_drained {
        callbacks.remove(&item);
    }
}
}

#[cfg(test)]
Expand All @@ -114,6 +146,8 @@ mod tests {
prelude::{Rng, TestRng},
};

use std::{thread, time::Duration};

type CurrentNetwork = snarkvm::prelude::MainnetV0;

#[test]
Expand Down Expand Up @@ -173,6 +207,55 @@ mod tests {
// Check empty again.
assert!(pending.is_empty());
}

// Verifies that callbacks are dropped from the queue once they are older than
// `CALLBACK_TIMEOUT_IN_SECS`, while younger callbacks for the same item are kept.
// NOTE(review): this test sleeps in real time and compares whole-second timestamps,
// so it takes roughly `2 * CALLBACK_TIMEOUT_IN_SECS + 1` seconds to run.
// NOTE(review): the expected counts assume `CALLBACK_TIMEOUT_IN_SECS` is at least 2;
// the constant is derived from `MAX_FETCH_TIMEOUT_IN_MS`, whose value is not visible here.
#[test]
fn test_expired_callbacks() {
let rng = &mut TestRng::default();

// Initialize the pending queue.
let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();

// Check initially empty.
assert!(pending.is_empty());
assert_eq!(pending.len(), 0);

// Initialize the commitment used as the single pending item.
let commitment_1 = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen()));

// Initialize the SocketAddrs.
let addr_1 = SocketAddr::from(([127, 0, 0, 1], 1234));
let addr_2 = SocketAddr::from(([127, 0, 0, 1], 2345));
let addr_3 = SocketAddr::from(([127, 0, 0, 1], 3456));

// Initialize the callbacks. Only the senders are kept; the receivers are
// discarded because this test never sends on the channels.
let (callback_sender_1, _) = oneshot::channel();
let (callback_sender_2, _) = oneshot::channel();
let (callback_sender_3, _) = oneshot::channel();

// Insert the first two callbacks for the commitment.
assert!(pending.insert(commitment_1, addr_1, Some(callback_sender_1)));
assert!(pending.insert(commitment_1, addr_2, Some(callback_sender_2)));

// Sleep until one second before the first two callbacks expire.
thread::sleep(Duration::from_secs(CALLBACK_TIMEOUT_IN_SECS as u64 - 1));

// Insert a third, younger callback for the same commitment.
assert!(pending.insert(commitment_1, addr_3, Some(callback_sender_3)));

// All three callbacks are still within the timeout window.
assert_eq!(pending.num_callbacks(commitment_1), 3);

// Wait for 2 seconds, which pushes the first two callbacks past the timeout.
thread::sleep(Duration::from_secs(2));

// Ensure that the two expired callbacks have been removed and only the third remains.
assert_eq!(pending.num_callbacks(commitment_1), 1);

// Wait for `CALLBACK_TIMEOUT_IN_SECS` seconds, so the third callback expires too.
thread::sleep(Duration::from_secs(CALLBACK_TIMEOUT_IN_SECS as u64));

// Ensure that no callbacks remain for the commitment.
assert_eq!(pending.num_callbacks(commitment_1), 0);
}
}

#[cfg(test)]
Expand Down
15 changes: 11 additions & 4 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use crate::{
helpers::{fmt_id, BFTSender, Pending, Storage, SyncReceiver},
helpers::{fmt_id, BFTSender, Pending, Storage, SyncReceiver, NUM_REDUNDANT_REQUESTS},
Gateway,
Transport,
MAX_FETCH_TIMEOUT_IN_MS,
Expand Down Expand Up @@ -412,9 +412,16 @@ impl<N: Network> Sync<N> {
let (callback_sender, callback_receiver) = oneshot::channel();
// Insert the certificate ID into the pending queue.
if self.pending.insert(certificate_id, peer_ip, Some(callback_sender)) {
// Send the certificate request to the peer.
if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
bail!("Unable to fetch certificate {} - failed to send request", fmt_id(certificate_id))
// Determine how many requests are pending for the certificate.
let num_pending_requests = self.pending.num_callbacks(certificate_id);
// If the number of requests is less than or equal to the redundancy factor, send the certificate request to the peer.
if num_pending_requests <= NUM_REDUNDANT_REQUESTS {
// Send the certificate request to the peer.
if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
}
} else {
trace!("Skipped sending redundant request for certificate {} to '{peer_ip}'", fmt_id(certificate_id));
}
}
// Wait for the certificate to be fetched.
Expand Down
15 changes: 11 additions & 4 deletions node/bft/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::{
events::{Event, TransmissionRequest, TransmissionResponse},
helpers::{fmt_id, Pending, Ready, Storage, WorkerReceiver},
helpers::{fmt_id, Pending, Ready, Storage, WorkerReceiver, NUM_REDUNDANT_REQUESTS},
ProposedBatch,
Transport,
MAX_FETCH_TIMEOUT_IN_MS,
Expand Down Expand Up @@ -386,9 +386,16 @@ impl<N: Network> Worker<N> {
let (callback_sender, callback_receiver) = oneshot::channel();
// Insert the transmission ID into the pending queue.
self.pending.insert(transmission_id, peer_ip, Some(callback_sender));
// Send the transmission request to the peer.
if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
bail!("Unable to fetch transmission - failed to send request")
// Determine how many requests are pending for the transmission.
let num_pending_requests = self.pending.num_callbacks(transmission_id);
// If the number of requests is less than or equal to the redundancy factor, send the transmission request to the peer.
if num_pending_requests <= NUM_REDUNDANT_REQUESTS {
// Send the transmission request to the peer.
if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
bail!("Unable to fetch transmission - failed to send request")
}
} else {
trace!("Skipped sending redundant request for transmission {} to '{peer_ip}'", fmt_id(transmission_id));
}
// Wait for the transmission to be fetched.
match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
Expand Down
2 changes: 1 addition & 1 deletion node/router/messages/src/helpers/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<N: Network> Decoder for MessageCodec<N> {
match Message::read_le(reader) {
Ok(message) => Ok(Some(message)),
Err(error) => {
error!("Failed to deserialize a message: {}", error);
warn!("Failed to deserialize a message - {}", error);
Err(std::io::ErrorKind::InvalidData.into())
}
}
Expand Down

0 comments on commit 894a75d

Please sign in to comment.