Skip to content

Commit

Permalink
Merge branch 'mainnet' of https://github.com/AleoHQ/snarkOS into feat…
Browse files Browse the repository at this point in the history
…/simple-proposal-transmission-check
  • Loading branch information
raychu86 committed Mar 3, 2024
2 parents e59535e + c6ab661 commit c414dbf
Show file tree
Hide file tree
Showing 24 changed files with 739 additions and 153 deletions.
117 changes: 59 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ default-features = false

[workspace.dependencies.snarkvm]
git = "https://github.com/AleoHQ/snarkVM.git"
rev = "1cebaaf"
rev = "46f2625"
#version = "=0.16.18"
features = [ "circuit", "console", "rocks" ]

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
* [2.1 Requirements](#21-requirements)
* [2.2 Installation](#22-installation)
* [3. Run an Aleo Node](#3-run-an-aleo-node)
* [3a. Run an Aleo Client](#3a-run-an-aleo-client)
* [3b. Run an Aleo Prover](#3b-run-an-aleo-prover)
* [3.1 Run an Aleo Client](#31-run-an-aleo-client)
* [3.2 Run an Aleo Prover](#32-run-an-aleo-prover)
* [4. FAQs](#4-faqs)
* [5. Command Line Interface](#5-command-line-interface)
* [6. Development Guide](#6-development-guide)
Expand Down Expand Up @@ -96,7 +96,7 @@ Please ensure ports `4130/tcp` and `3030/tcp` are open on your router and OS fir

## 3. Run an Aleo Node

## 3a. Run an Aleo Client
## 3.1 Run an Aleo Client

Start by following the instructions in the [Build Guide](#2-build-guide).

Expand All @@ -105,7 +105,7 @@ Next, to start a client node, from the `snarkOS` directory, run:
./run-client.sh
```

## 3b. Run an Aleo Prover
## 3.2 Run an Aleo Prover

Start by following the instructions in the [Build Guide](#2-build-guide).

Expand Down
13 changes: 11 additions & 2 deletions node/bft/events/src/certificate_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,20 @@ pub mod prop_tests {
(Just(committee.clone()), any::<Selector>(), vec(any_transmission(), 0..16))
.prop_map(|(committee, selector, transmissions)| {
let mut rng = TestRng::default();
let CommitteeContext(_, ValidatorSet(validators)) = committee;
let CommitteeContext(committee, ValidatorSet(validators)) = committee;
let signer = selector.select(validators);
let transmission_ids = transmissions.into_iter().map(|(id, _)| id).collect();

BatchHeader::new(&signer.private_key, 0, now(), transmission_ids, Default::default(), &mut rng).unwrap()
BatchHeader::new(
&signer.private_key,
0,
now(),
committee.id(),
transmission_ids,
Default::default(),
&mut rng,
)
.unwrap()
})
.boxed()
}
Expand Down
14 changes: 2 additions & 12 deletions node/bft/events/src/helpers/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,11 @@ impl<N: Network> Decoder for EventCodec<N> {
Some(bytes) => bytes,
None => return Ok(None),
};
#[cfg(feature = "metrics")]
let num_bytes = bytes.len() as f64;

// Convert the bytes to an event, or fail if it is not valid.
let reader = bytes.reader();
match Event::read_le(reader) {
Ok(event) => {
#[cfg(feature = "metrics")]
metrics::histogram_label(
metrics::tcp::TCP_GATEWAY,
"event",
String::from(event.name().clone()),
num_bytes,
);
Ok(Some(event))
}
Ok(event) => Ok(Some(event)),
Err(error) => {
error!("Failed to deserialize an event: {}", error);
Err(std::io::ErrorKind::InvalidData.into())
Expand Down
5 changes: 4 additions & 1 deletion node/bft/ledger-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ edition = "2021"

[features]
default = [ ]
ledger = [ "rand", "tokio", "tracing" ]
ledger = [ "parking_lot", "rand", "tokio", "tracing" ]
ledger-write = [ ]
mock = [ "parking_lot", "tracing" ]
prover = [ ]
Expand All @@ -32,6 +32,9 @@ version = "0.1"
version = "2.1"
features = [ "serde", "rayon" ]

[dependencies.lru]
version = "0.12"

[dependencies.parking_lot]
version = "0.12"
optional = true
Expand Down
21 changes: 19 additions & 2 deletions node/bft/ledger-service/src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use snarkvm::{
};

use indexmap::IndexMap;
use lru::LruCache;
use parking_lot::Mutex;
use std::{
fmt,
ops::Range,
Expand All @@ -35,18 +37,23 @@ use std::{
},
};

/// The capacity of the LRU holiding the recently queried committees.
const COMMITTEE_CACHE_SIZE: usize = 16;

/// A core ledger service.
pub struct CoreLedgerService<N: Network, C: ConsensusStorage<N>> {
ledger: Ledger<N, C>,
coinbase_verifying_key: Arc<CoinbaseVerifyingKey<N>>,
committee_cache: Arc<Mutex<LruCache<u64, Committee<N>>>>,
shutdown: Arc<AtomicBool>,
}

impl<N: Network, C: ConsensusStorage<N>> CoreLedgerService<N, C> {
/// Initializes a new core ledger service.
pub fn new(ledger: Ledger<N, C>, shutdown: Arc<AtomicBool>) -> Self {
let coinbase_verifying_key = Arc::new(ledger.coinbase_puzzle().coinbase_verifying_key().clone());
Self { ledger, coinbase_verifying_key, shutdown }
let committee_cache = Arc::new(Mutex::new(LruCache::new(COMMITTEE_CACHE_SIZE.try_into().unwrap())));
Self { ledger, coinbase_verifying_key, committee_cache, shutdown }
}
}

Expand Down Expand Up @@ -127,9 +134,19 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
/// Returns the committee for the given round.
/// If the given round is in the future, then the current committee is returned.
fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>> {
// Check if the committee is already in the cache.
if let Some(committee) = self.committee_cache.lock().get(&round) {
return Ok(committee.clone());
}

match self.ledger.get_committee_for_round(round)? {
// Return the committee if it exists.
Some(committee) => Ok(committee),
Some(committee) => {
// Insert the committee into the cache.
self.committee_cache.lock().push(round, committee.clone());
// Return the committee.
Ok(committee)
}
// Return the current committee if the round is in the future.
None => {
// Retrieve the current committee.
Expand Down
6 changes: 6 additions & 0 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ impl<N: Network> BFT<N> {
&self,
leader_certificate: BatchCertificate<N>,
) -> Result<()> {
// Fetch the leader round.
let latest_leader_round = leader_certificate.round();
// Determine the list of all previous leader certificates since the last committed round.
// The order of the leader certificates is from **newest** to **oldest**.
let mut leader_certificates = vec![leader_certificate.clone()];
Expand Down Expand Up @@ -621,6 +623,10 @@ impl<N: Network> BFT<N> {
}
}
}

// Perform garbage collection based on the latest committed leader round.
self.storage().garbage_collect_certificates(latest_leader_round);

Ok(())
}

Expand Down
96 changes: 82 additions & 14 deletions node/bft/src/helpers/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,43 @@
// limitations under the License.

use crate::MAX_FETCH_TIMEOUT_IN_MS;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkvm::{console::network::Network, ledger::committee::Committee};

use parking_lot::{Mutex, RwLock};
use std::{
collections::{HashMap, HashSet},
hash::Hash,
net::SocketAddr,
sync::Arc,
};
use time::OffsetDateTime;
use tokio::sync::oneshot;

#[cfg(not(test))]
pub const NUM_REDUNDANT_REQUESTS: usize = 2;
#[cfg(test)]
pub const NUM_REDUNDANT_REQUESTS: usize = 10;

const CALLBACK_TIMEOUT_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS as i64 / 1000;

/// Returns the maximum number of redundant requests for the number of validators in the specified round.
pub fn max_redundant_requests<N: Network>(ledger: Arc<dyn LedgerService<N>>, round: u64) -> usize {
// Determine the number of validators in the committee lookback for the given round.
let num_validators = ledger
.get_committee_lookback_for_round(round)
.map(|committee| committee.num_members())
.ok()
.unwrap_or(Committee::<N>::MAX_COMMITTEE_SIZE as usize);

// Note: It is adequate to set this value to the availability threshold,
// as with high probability one will respond honestly (in the best and worst case
// with stake spread across the validators evenly and unevenly, respectively).
1 + num_validators.saturating_div(3)
}

#[derive(Debug)]
pub struct Pending<T: PartialEq + Eq + Hash, V: Clone> {
/// The map of pending `items` to `peer IPs` that have the item.
pending: RwLock<HashMap<T, HashSet<SocketAddr>>>,
/// The optional callback queue.
callbacks: Mutex<HashMap<T, Vec<(oneshot::Sender<V>, i64)>>>,
/// Each callback has a timeout and a flag indicating if it is associated with a sent request.
callbacks: Mutex<HashMap<T, Vec<(oneshot::Sender<V>, i64, bool)>>>,
}

impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Default for Pending<T, V> {
Expand Down Expand Up @@ -85,12 +99,29 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
self.callbacks.lock().get(&item).map_or(0, |callbacks| callbacks.len())
}

/// Returns the number of pending sent requests for the specified `item`.
pub fn num_sent_requests(&self, item: impl Into<T>) -> usize {
let item = item.into();
// Clear the callbacks that have expired.
self.clear_expired_callbacks_for_item(item);
// Return the number of live callbacks.
self.callbacks
.lock()
.get(&item)
.map_or(0, |callbacks| callbacks.iter().filter(|(_, _, request_sent)| *request_sent).count())
}

/// Inserts the specified `item` and `peer IP` to the pending queue,
/// returning `true` if the `peer IP` was newly-inserted into the entry for the `item`.
///
/// In addition, an optional `callback` may be provided, that is triggered upon removal.
/// Note: The callback, if provided, is **always** inserted into the callback queue.
pub fn insert(&self, item: impl Into<T>, peer_ip: SocketAddr, callback: Option<oneshot::Sender<V>>) -> bool {
pub fn insert(
&self,
item: impl Into<T>,
peer_ip: SocketAddr,
callback: Option<(oneshot::Sender<V>, bool)>,
) -> bool {
let item = item.into();
// Insert the peer IP into the pending queue.
let result = self.pending.write().entry(item).or_default().insert(peer_ip);
Expand All @@ -99,8 +130,12 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
self.clear_expired_callbacks_for_item(item);

// If a callback is provided, insert it into the callback queue.
if let Some(callback) = callback {
self.callbacks.lock().entry(item).or_default().push((callback, OffsetDateTime::now_utc().unix_timestamp()));
if let Some((callback, request_sent)) = callback {
self.callbacks.lock().entry(item).or_default().push((
callback,
OffsetDateTime::now_utc().unix_timestamp(),
request_sent,
));
}
// Return the result.
result
Expand All @@ -117,7 +152,7 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
if let Some(callbacks) = self.callbacks.lock().remove(&item) {
if let Some(callback_value) = callback_value {
// Send a notification to the callback.
for (callback, _) in callbacks {
for (callback, _, _) in callbacks {
callback.send(callback_value.clone()).ok();
}
}
Expand All @@ -133,7 +168,7 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
// Fetch the current timestamp.
let now = OffsetDateTime::now_utc().unix_timestamp();
// Remove the callbacks that have expired.
callbacks.retain(|(_, timestamp)| now - *timestamp <= CALLBACK_TIMEOUT_IN_SECS);
callbacks.retain(|(_, timestamp, _)| now - *timestamp <= CALLBACK_TIMEOUT_IN_SECS);
}
}
}
Expand All @@ -150,6 +185,8 @@ mod tests {

type CurrentNetwork = snarkvm::prelude::MainnetV0;

const ITERATIONS: usize = 100;

#[test]
fn test_pending() {
let rng = &mut TestRng::default();
Expand Down Expand Up @@ -233,13 +270,13 @@ mod tests {
let (callback_sender_3, _) = oneshot::channel();

// Insert the commitments.
assert!(pending.insert(commitment_1, addr_1, Some(callback_sender_1)));
assert!(pending.insert(commitment_1, addr_2, Some(callback_sender_2)));
assert!(pending.insert(commitment_1, addr_1, Some((callback_sender_1, true))));
assert!(pending.insert(commitment_1, addr_2, Some((callback_sender_2, true))));

// Sleep for a few seconds.
thread::sleep(Duration::from_secs(CALLBACK_TIMEOUT_IN_SECS as u64 - 1));

assert!(pending.insert(commitment_1, addr_3, Some(callback_sender_3)));
assert!(pending.insert(commitment_1, addr_3, Some((callback_sender_3, true))));

// Check that the number of callbacks has not changed.
assert_eq!(pending.num_callbacks(commitment_1), 3);
Expand All @@ -256,6 +293,37 @@ mod tests {
// Ensure that the expired callbacks have been removed.
assert_eq!(pending.num_callbacks(commitment_1), 0);
}

#[test]
fn test_num_sent_requests() {
let rng = &mut TestRng::default();

// Initialize the ready queue.
let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();

for _ in 0..ITERATIONS {
// Generate a commitment.
let commitment = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen()));
// Check if the number of sent requests is correct.
let mut expected_num_sent_requests = 0;
for i in 0..ITERATIONS {
// Generate a peer address.
let addr = SocketAddr::from(([127, 0, 0, 1], i as u16));
// Initialize a callback.
let (callback_sender, _) = oneshot::channel();
// Randomly determine if the callback is associated with a sent request.
let is_sent_request = rng.gen();
// Increment the expected number of sent requests.
if is_sent_request {
expected_num_sent_requests += 1;
}
// Insert the commitment.
assert!(pending.insert(commitment, addr, Some((callback_sender, is_sent_request))));
}
// Ensure that the number of sent requests is correct.
assert_eq!(pending.num_sent_requests(commitment), expected_num_sent_requests);
}
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions node/bft/src/helpers/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ mod prop_tests {
&signer.private_key,
committee.starting_round(),
now(),
committee.id(),
transmission_map.keys().cloned().collect(),
Default::default(),
&mut rng,
Expand Down
5 changes: 5 additions & 0 deletions node/bft/src/helpers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ impl<N: Network> Storage<N> {
fn update_current_round(&self, next_round: u64) {
// Update the current round.
self.current_round.store(next_round, Ordering::SeqCst);
}

/// Update the storage by performing garbage collection based on the next round.
pub(crate) fn garbage_collect_certificates(&self, next_round: u64) {
// Fetch the current GC round.
let current_gc_round = self.gc_round();
// Compute the next GC round.
Expand Down Expand Up @@ -1107,6 +1110,7 @@ pub mod prop_tests {
selector: Selector,
) {
let CommitteeContext(committee, ValidatorSet(validators)) = context;
let committee_id = committee.id();

// Initialize the storage.
let ledger = Arc::new(MockLedgerService::new(committee));
Expand All @@ -1128,6 +1132,7 @@ pub mod prop_tests {
&signer.private_key,
0,
now(),
committee_id,
transmission_map.keys().cloned().collect(),
Default::default(),
&mut rng,
Expand Down

0 comments on commit c414dbf

Please sign in to comment.