Merge pull request #2869 from AleoHQ/feat/max-transmissions
Switches transmissions configs to snarkVM
howardwu committed Jan 30, 2024
2 parents 3e0c683 + 6fe8adf commit 6e3e829
Showing 10 changed files with 124 additions and 194 deletions.
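
In short, the commit replaces uses of the node-local memory-pool constants with values that ship with snarkVM (and, for the per-worker limits, with associated constants on Worker<N>). The call-site mapping, as it appears in the hunks below:

MAX_GC_ROUNDS                      ->  BatchHeader::<N>::MAX_GC_ROUNDS
MAX_TRANSMISSIONS_PER_BATCH        ->  BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
MAX_TRANSMISSIONS_PER_WORKER_PING  ->  Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING
MAX_TRANSMISSIONS_PER_WORKER       ->  Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER
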
172 changes: 58 additions & 114 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
@@ -41,7 +41,9 @@ members = [
]

[workspace.dependencies.snarkvm]
version = "=0.16.11"
git = "https://github.com/AleoHQ/snarkVM.git"
rev = "3300460"
#version = "=0.16.11"
features = [ "circuit", "console", "rocks" ]

[[bin]]
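
The snarkVM dependency moves from a pinned crates.io version to a pinned git revision. For local experiments against a snarkVM checkout, Cargo's source patching is one option (a sketch only, not part of this commit; the relative path is hypothetical):

# Sketch: override the git source with a local checkout while testing.
[patch."https://github.com/AleoHQ/snarkVM.git"]
snarkvm = { path = "../snarkVM" }
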
17 changes: 7 additions & 10 deletions node/bft/examples/simple_node.rs
@@ -20,23 +20,18 @@ use snarkos_node_bft::{
helpers::{init_consensus_channels, init_primary_channels, ConsensusReceiver, PrimarySender, Storage},
Primary,
BFT,
MAX_GC_ROUNDS,
MEMORY_POOL_PORT,
};
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkvm::{
ledger::{
committee::{Committee, MIN_VALIDATOR_STAKE},
narwhal::Data,
},
prelude::{
block::Transaction,
coinbase::{ProverSolution, PuzzleCommitment},
Field,
Network,
Uniform,
committee::{Committee, MIN_VALIDATOR_STAKE},
narwhal::{BatchHeader, Data},
},
prelude::{Field, Network, Uniform},
};

use ::bytes::Bytes;
@@ -116,7 +111,8 @@ pub async fn start_bft(
// Initialize the mock ledger service.
let ledger = Arc::new(MockLedgerService::new(committee));
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), MAX_GC_ROUNDS);
let storage =
Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS);
// Initialize the gateway IP and dev mode.
let (ip, dev) = match peers.get(&node_id) {
Some(ip) => (Some(*ip), None),
@@ -153,7 +149,8 @@ pub async fn start_primary(
// Initialize the mock ledger service.
let ledger = Arc::new(MockLedgerService::new(committee));
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), MAX_GC_ROUNDS);
let storage =
Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS);
// Initialize the gateway IP and dev mode.
let (ip, dev) = match peers.get(&node_id) {
Some(ip) => (Some(*ip), None),
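
Since the GC depth is now an associated constant on snarkVM's BatchHeader<N>, call sites name a concrete network type via turbofish, as both Storage::new calls above do. A minimal, self-contained sketch of that pattern (the type alias mirrors the one used elsewhere in this crate):

use snarkvm::ledger::narwhal::BatchHeader;

type CurrentNetwork = snarkvm::console::network::Testnet3;

fn main() {
    // Naming the network is required because BatchHeader is generic over it.
    let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS;
    println!("max GC rounds: {max_gc_rounds}");
}
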
27 changes: 16 additions & 11 deletions node/bft/src/gateway.rs
@@ -16,11 +16,9 @@ use crate::{
events::{EventCodec, PrimaryPing},
helpers::{assign_to_worker, Cache, PrimarySender, Resolver, SyncSender, WorkerSender},
spawn_blocking,
Worker,
CONTEXT,
MAX_BATCH_DELAY_IN_MS,
MAX_GC_ROUNDS,
MAX_TRANSMISSIONS_PER_BATCH,
MAX_TRANSMISSIONS_PER_WORKER_PING,
MEMORY_POOL_PORT,
};
use snarkos_account::Account;
@@ -54,7 +52,10 @@ use snarkos_node_tcp::{
};
use snarkvm::{
console::prelude::*,
ledger::{committee::Committee, narwhal::Data},
ledger::{
committee::Committee,
narwhal::{BatchHeader, Data},
},
prelude::Address,
};

@@ -212,12 +213,12 @@ impl<N: Network> Gateway<N> {

/// The maximum number of certificate requests to cache.
fn max_cache_certificates(&self) -> usize {
2 * MAX_GC_ROUNDS as usize * self.max_committee_size()
2 * BatchHeader::<N>::MAX_GC_ROUNDS as usize * self.max_committee_size()
}

/// The maximum number of transmission requests to cache.
fn max_cache_transmissions(&self) -> usize {
self.max_cache_certificates() * MAX_TRANSMISSIONS_PER_BATCH
self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
}

/// The maximum number of duplicates for any particular request.
@@ -743,7 +744,7 @@ impl<N: Network> Gateway<N> {
Event::WorkerPing(ping) => {
// Ensure the number of transmissions is not too large.
ensure!(
ping.transmission_ids.len() <= MAX_TRANSMISSIONS_PER_WORKER_PING,
ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
"{CONTEXT} Received too many transmissions"
);
// Retrieve the number of workers.
@@ -977,8 +978,10 @@ impl<N: Network> Reading for Gateway<N> {
type Message = Event<N>;

/// The maximum queue depth of incoming messages for a single peer.
const MESSAGE_QUEUE_DEPTH: usize =
2 * MAX_GC_ROUNDS as usize * Committee::<N>::MAX_COMMITTEE_SIZE as usize * MAX_TRANSMISSIONS_PER_BATCH;
const MESSAGE_QUEUE_DEPTH: usize = 2
* BatchHeader::<N>::MAX_GC_ROUNDS as usize
* Committee::<N>::MAX_COMMITTEE_SIZE as usize
* BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH;

/// Creates a [`Decoder`] used to interpret messages from the network.
/// The `side` param indicates the connection side **from the node's perspective**.
@@ -1010,8 +1013,10 @@ impl<N: Network> Writing for Gateway<N> {
type Message = Event<N>;

/// The maximum queue depth of outgoing messages for a single peer.
const MESSAGE_QUEUE_DEPTH: usize =
2 * MAX_GC_ROUNDS as usize * Committee::<N>::MAX_COMMITTEE_SIZE as usize * MAX_TRANSMISSIONS_PER_BATCH;
const MESSAGE_QUEUE_DEPTH: usize = 2
* BatchHeader::<N>::MAX_GC_ROUNDS as usize
* Committee::<N>::MAX_COMMITTEE_SIZE as usize
* BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH;

/// Creates an [`Encoder`] used to write the outbound messages to the target stream.
/// The `side` parameter indicates the connection side **from the node's perspective**.
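
Each queue depth above is the product of a factor of 2, the GC window, the maximum committee size, and the per-batch transmission cap. A rough sizing sketch using the old node-local values visible in the lib.rs hunk below (50 GC rounds, 250 transmissions per batch) and a hypothetical committee size of 200; the actual snarkVM constants may differ:

// Illustrative numbers only; stand-ins for the snarkVM associated constants.
const GC_ROUNDS: usize = 50; // old MAX_GC_ROUNDS
const MAX_COMMITTEE: usize = 200; // hypothetical committee size
const PER_BATCH: usize = 250; // old MAX_TRANSMISSIONS_PER_BATCH
const MESSAGE_QUEUE_DEPTH: usize = 2 * GC_ROUNDS * MAX_COMMITTEE * PER_BATCH; // 5_000_000 entries per peer
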
11 changes: 4 additions & 7 deletions node/bft/src/helpers/storage.rs
@@ -935,17 +935,14 @@ mod tests {
#[cfg(test)]
pub mod prop_tests {
use super::*;
use crate::{
helpers::{now, storage::tests::assert_storage},
MAX_GC_ROUNDS,
};
use crate::helpers::{now, storage::tests::assert_storage};
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkvm::{
ledger::{
coinbase::PuzzleCommitment,
committee::prop_tests::{CommitteeContext, ValidatorSet},
narwhal::Data,
narwhal::{BatchHeader, Data},
},
prelude::{Signature, Uniform},
};
@@ -970,7 +967,7 @@
type Strategy = BoxedStrategy<Storage<CurrentNetwork>>;

fn arbitrary() -> Self::Strategy {
(any::<CommitteeContext>(), 0..MAX_GC_ROUNDS)
(any::<CommitteeContext>(), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS)
.prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
let ledger = Arc::new(MockLedgerService::new(committee));
Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
@@ -979,7 +976,7 @@
}

fn arbitrary_with(context: Self::Parameters) -> Self::Strategy {
(Just(context), 0..MAX_GC_ROUNDS)
(Just(context), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS)
.prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
let ledger = Arc::new(MockLedgerService::new(committee));
Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
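
With Arbitrary implemented as above, property tests can draw bounded Storage instances straight from the strategy. A usage sketch meant to sit inside the same prop_tests module; the asserted invariant and the max_gc_rounds accessor are illustrative assumptions, not part of this commit:

proptest! {
    #[test]
    fn generated_storage_respects_gc_bound(storage in any::<Storage<CurrentNetwork>>()) {
        // The strategy draws gc_rounds from 0..MAX_GC_ROUNDS; the accessor name is assumed here.
        prop_assert!(storage.max_gc_rounds() <= BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS);
    }
}
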
18 changes: 1 addition & 17 deletions node/bft/src/lib.rs
@@ -54,12 +54,8 @@ pub const MAX_GC_ROUNDS: u64 = 50; // rounds
pub const MAX_LEADER_CERTIFICATE_DELAY_IN_SECS: i64 = 2 * MAX_BATCH_DELAY_IN_MS as i64 / 1000; // seconds
/// The maximum number of seconds before the timestamp is considered expired.
pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds
/// The maximum number of transmissions allowed in a batch.
pub const MAX_TRANSMISSIONS_PER_BATCH: usize = 250; // transmissions
/// The maximum number of transmissions allowed in a worker ping.
pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = MAX_TRANSMISSIONS_PER_BATCH / 10; // transmissions
/// The maximum number of workers that can be spawned.
pub const MAX_WORKERS: u8 = 1; // workers
pub const MAX_WORKERS: u8 = 1; // worker(s)

/// The frequency at which each primary broadcasts a ping to every other node.
pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms
@@ -76,15 +72,3 @@ macro_rules! spawn_blocking {
}
};
}

#[cfg(test)]
mod tests {
use super::*;

type CurrentNetwork = snarkvm::console::network::Testnet3;

#[test]
fn test_max_gc_rounds() {
assert_eq!(MAX_GC_ROUNDS as usize, snarkvm::ledger::narwhal::Subdag::<CurrentNetwork>::MAX_ROUNDS);
}
}
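
The removed test pinned the old crate constant against snarkVM's Subdag::MAX_ROUNDS; with the value now read from snarkVM directly, that cross-check loses its purpose. If an equivalent sanity check were still wanted, a hypothetical sketch (not part of this commit) could look like:

#[cfg(test)]
mod tests {
    type CurrentNetwork = snarkvm::console::network::Testnet3;

    #[test]
    fn max_gc_rounds_matches_subdag_rounds() {
        use snarkvm::ledger::narwhal::{BatchHeader, Subdag};
        // Whether these two upstream constants are equal is an assumption here, mirroring the deleted test.
        assert_eq!(BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as usize, Subdag::<CurrentNetwork>::MAX_ROUNDS);
    }
}
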
3 changes: 1 addition & 2 deletions node/bft/src/primary.rs
@@ -33,7 +33,6 @@ use crate::{
Transport,
Worker,
MAX_BATCH_DELAY_IN_MS,
MAX_TRANSMISSIONS_PER_BATCH,
MAX_WORKERS,
PRIMARY_PING_IN_MS,
WORKER_PING_IN_MS,
@@ -379,7 +378,7 @@ impl<N: Network> Primary<N> {
}

// Determine the required number of transmissions per worker.
let num_transmissions_per_worker = MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
// Initialize the map of transmissions.
let mut transmissions: IndexMap<_, _> = Default::default();
// Initialize a tracker for the number of transactions.
26 changes: 17 additions & 9 deletions node/bft/src/worker.rs
@@ -18,17 +18,15 @@ use crate::{
ProposedBatch,
Transport,
MAX_BATCH_DELAY_IN_MS,
MAX_TRANSMISSIONS_PER_BATCH,
MAX_TRANSMISSIONS_PER_WORKER_PING,
MAX_WORKERS,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkvm::{
console::prelude::*,
ledger::narwhal::{Data, Transmission, TransmissionID},
prelude::{
ledger::{
block::Transaction,
coinbase::{ProverSolution, PuzzleCommitment},
narwhal::{BatchHeader, Data, Transmission, TransmissionID},
},
};

@@ -37,8 +35,6 @@ use parking_lot::Mutex;
use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{sync::oneshot, task::JoinHandle, time::timeout};

const MAX_TRANSMISSIONS_PER_WORKER: usize = MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;

#[derive(Clone)]
pub struct Worker<N: Network> {
/// The worker ID.
@@ -97,6 +93,14 @@ impl<N: Network> Worker<N> {
}

impl<N: Network> Worker<N> {
/// The maximum number of transmissions allowed in a worker.
pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
/// The maximum number of transmissions allowed in a worker ping.
pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10; // transmissions

/// Returns the number of transmissions in the ready queue.
pub fn num_transmissions(&self) -> usize {
self.ready.num_transmissions()
@@ -209,8 +213,12 @@ impl<N: Network> Worker<N> {
/// Broadcasts a worker ping event.
pub(crate) fn broadcast_ping(&self) {
// Retrieve the transmission IDs.
let transmission_ids =
self.ready.transmission_ids().into_iter().take(MAX_TRANSMISSIONS_PER_WORKER_PING).collect::<IndexSet<_>>();
let transmission_ids = self
.ready
.transmission_ids()
.into_iter()
.take(Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
.collect::<IndexSet<_>>();

// Broadcast the ping event.
if !transmission_ids.is_empty() {
@@ -228,7 +236,7 @@ impl<N: Network> Worker<N> {
}
// If the ready queue is full, then skip this transmission.
// Note: We must prioritize the unconfirmed solutions and unconfirmed transactions, not transmissions.
if self.ready.num_transmissions() > MAX_TRANSMISSIONS_PER_WORKER {
if self.ready.num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
return;
}
// Attempt to fetch the transmission from the peer.
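
Both per-worker limits are now derived from the snarkVM batch cap: the ready-queue cutoff divides it by MAX_WORKERS, and the worker-ping limit divides it by 10. A worked example of the arithmetic, using the old cap of 250 as a stand-in (the snarkVM value may differ):

fn main() {
    let max_per_batch: usize = 250; // illustrative stand-in for BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    let max_workers: usize = 1; // MAX_WORKERS in this crate
    let max_per_worker = max_per_batch / max_workers;
    let max_per_worker_ping = max_per_batch / 10;
    // One worker may queue up to 250 transmissions, and a ping advertises at most 25 transmission IDs.
    assert_eq!((max_per_worker, max_per_worker_ping), (250, 25));
}
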
28 changes: 12 additions & 16 deletions node/bft/tests/common/primary.rs
@@ -23,29 +23,21 @@ use snarkos_node_bft::{
Primary,
BFT,
MAX_BATCH_DELAY_IN_MS,
MAX_GC_ROUNDS,
};
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkvm::{
console::algorithms::BHP256,
console::{
account::{Address, PrivateKey},
algorithms::{Hash, BHP256},
},
ledger::{
block::Block,
committee::{Committee, MIN_VALIDATOR_STAKE},
Ledger,
},
prelude::{
narwhal::BatchHeader,
store::{helpers::memory::ConsensusMemory, ConsensusStore},
Address,
CryptoRng,
FromBytes,
Hash,
PrivateKey,
Rng,
TestRng,
ToBits,
ToBytes,
VM,
Ledger,
},
prelude::{CryptoRng, FromBytes, Rng, TestRng, ToBits, ToBytes, VM},
utilities::to_bytes_le,
};

@@ -152,7 +144,11 @@ impl TestNetwork {
let mut rng = TestRng::fixed(id as u64);
let gen_ledger = genesis_ledger(gen_key, committee.clone(), balances.clone(), &mut rng);
let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger));
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), MAX_GC_ROUNDS);
let storage = Storage::new(
ledger.clone(),
Arc::new(BFTMemoryService::new()),
BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS,
);

let (primary, bft) = if config.bft {
let bft = BFT::<CurrentNetwork>::new(account, storage, ledger, None, &[], Some(id as u16)).unwrap();
12 changes: 5 additions & 7 deletions node/consensus/src/lib.rs
@@ -29,16 +29,14 @@ use snarkos_node_bft::{
},
spawn_blocking,
BFT,
MAX_GC_ROUNDS,
MAX_TRANSMISSIONS_PER_BATCH,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_bft_storage_service::BFTPersistentStorage;
use snarkvm::{
ledger::{
block::Transaction,
coinbase::{ProverSolution, PuzzleCommitment},
narwhal::{Data, Subdag, Transmission, TransmissionID},
narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
},
prelude::*,
};
@@ -86,7 +84,7 @@ impl<N: Network> Consensus<N> {
// Initialize the Narwhal transmissions.
let transmissions = Arc::new(BFTPersistentStorage::open(dev)?);
// Initialize the Narwhal storage.
let storage = NarwhalStorage::new(ledger.clone(), transmissions, MAX_GC_ROUNDS);
let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS);
// Initialize the BFT.
let bft = BFT::new(account, storage, ledger.clone(), ip, trusted_validators, dev)?;
// Return the consensus.
@@ -202,7 +200,7 @@ impl<N: Network> Consensus<N> {

// If the memory pool of this node is full, return early.
let num_unconfirmed = self.num_unconfirmed_transmissions();
if num_unconfirmed > N::MAX_SOLUTIONS || num_unconfirmed > MAX_TRANSMISSIONS_PER_BATCH {
if num_unconfirmed > N::MAX_SOLUTIONS || num_unconfirmed > BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
return Ok(());
}
// Retrieve the solutions.
@@ -256,13 +254,13 @@ impl<N: Network> Consensus<N> {

// If the memory pool of this node is full, return early.
let num_unconfirmed = self.num_unconfirmed_transmissions();
if num_unconfirmed > MAX_TRANSMISSIONS_PER_BATCH {
if num_unconfirmed > BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
return Ok(());
}
// Retrieve the transactions.
let transactions = {
// Determine the available capacity.
let capacity = MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed);
let capacity = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed);
// Acquire the lock on the queue.
let mut queue = self.transactions_queue.lock();
// Determine the number of transactions to send.
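
The transaction drain is capped by the remaining batch capacity, computed with saturating_sub so that a momentarily over-full pool yields zero capacity instead of underflowing. A worked example, again using the old cap of 250 as a stand-in (the snarkVM value may differ):

fn main() {
    let max_per_batch: usize = 250; // illustrative stand-in for BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    let num_unconfirmed: usize = 200;
    let capacity = max_per_batch.saturating_sub(num_unconfirmed);
    assert_eq!(capacity, 50); // room for 50 more queued transactions
    // With num_unconfirmed = 300, saturating_sub returns 0 rather than wrapping.
}
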
