Skip to content

Commit

Permalink
Merge pull request #44 from EspressoSystems/rm/delegated-sending
Browse files Browse the repository at this point in the history
Delegate sending, improve testing infrastructure
  • Loading branch information
rob-maron committed Jun 10, 2024
2 parents c701473 + 07b67b6 commit 737889a
Show file tree
Hide file tree
Showing 48 changed files with 1,781 additions and 1,131 deletions.
389 changes: 304 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cdn-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ path = "src/binaries/bad-broker.rs"

# This dependency is used for the Tokio console
[target.'cfg(tokio_unstable)'.dependencies]
console-subscriber = "0.2"
console-subscriber = "0.3"


[dependencies]
Expand Down
24 changes: 16 additions & 8 deletions cdn-broker/benches/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use std::time::Duration;

use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::reexports::tests::{TestBroker, TestDefinition, TestRun, TestUser};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::{protocols::Connection as _, Bytes};
use cdn_proto::connection::protocols::memory::Memory;
use cdn_proto::connection::Bytes;
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Broadcast, Message};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -49,11 +50,14 @@ fn bench_broadcast_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global.into()]),
TestUser::with_index(1, vec![TestTopic::Global.into()]),
],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Benchmark
Expand All @@ -71,14 +75,18 @@ fn bench_broadcast_broker(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![]],
connected_users: vec![TestUser::with_index(0, vec![])],
connected_brokers: vec![
(vec![], vec![TestTopic::Global as u8]),
(vec![], vec![TestTopic::Global as u8]),
TestBroker {
connected_users: vec![TestUser::with_index(1, vec![TestTopic::Global.into()])],
},
TestBroker {
connected_users: vec![TestUser::with_index(2, vec![TestTopic::Global.into()])],
},
],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Benchmark
Expand Down
48 changes: 31 additions & 17 deletions cdn-broker/benches/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use std::time::Duration;

use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::{protocols::Connection as _, Bytes};
use cdn_broker::reexports::tests::{TestBroker, TestDefinition, TestRun, TestUser};
use cdn_broker::{assert_received, at_index, send_message_as};
use cdn_proto::connection::protocols::memory::Memory;
use cdn_proto::connection::Bytes;
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Direct, Message};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
Expand All @@ -15,7 +16,7 @@ use pprof::criterion::{Output, PProfProfiler};
async fn direct_user_to_self(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![0],
recipient: at_index![0],
message: vec![0; 10000],
});

Expand All @@ -29,7 +30,7 @@ async fn direct_user_to_self(run: &TestRun) {
async fn direct_user_to_user(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![1],
recipient: at_index![1],
message: vec![0; 10000],
});

Expand All @@ -43,7 +44,7 @@ async fn direct_user_to_user(run: &TestRun) {
async fn direct_user_to_broker(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![2],
recipient: at_index![2],
message: vec![0; 10000],
});

Expand All @@ -57,7 +58,7 @@ async fn direct_user_to_broker(run: &TestRun) {
async fn direct_broker_to_user(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![0],
recipient: at_index![0],
message: vec![0; 10000],
});

Expand All @@ -76,11 +77,11 @@ fn bench_direct_user_to_self(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8]],
connected_users: vec![TestUser::with_index(0, vec![TestTopic::Global as u8])],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -99,11 +100,14 @@ fn bench_direct_user_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -122,11 +126,16 @@ fn bench_direct_user_to_broker(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![TestBroker {
connected_users: vec![TestUser::with_index(2, vec![TestTopic::Global as u8])],
}],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -145,11 +154,16 @@ fn bench_direct_broker_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![TestBroker {
connected_users: vec![TestUser::with_index(0, vec![TestTopic::Global as u8])],
}],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand Down
2 changes: 2 additions & 0 deletions cdn-broker/src/binaries/bad-broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
use tokio::{spawn, time::sleep};
#[cfg(not(tokio_unstable))]
use tracing_subscriber::EnvFilter;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -68,6 +69,7 @@ async fn main() -> Result<()> {
public_advertise_endpoint: format!("local_ip:{public_port}"),
private_bind_endpoint: format!("0.0.0.0:{private_port}"),
private_advertise_endpoint: format!("local_ip:{private_port}"),
global_memory_pool_size: None,
};

// Create new `Broker`
Expand Down
9 changes: 9 additions & 0 deletions cdn-broker/src/binaries/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result
use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
#[cfg(not(tokio_unstable))]
use tracing_subscriber::EnvFilter;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -52,6 +53,13 @@ struct Args {
/// The seed for broker key generation
#[arg(short, long, default_value_t = 0)]
key_seed: u64,

/// The size of the global memory pool (in bytes). This is the maximum number of bytes that
/// can be allocated at once for all connections. A connection will block if it
/// tries to allocate more than this amount until some memory is freed.
/// Default is 1GB.
#[arg(long, default_value_t = 1_073_741_824)]
global_memory_pool_size: usize,
}

#[tokio::main]
Expand Down Expand Up @@ -96,6 +104,7 @@ async fn main() -> Result<()> {
public_advertise_endpoint: args.public_advertise_endpoint,
private_bind_endpoint: args.private_bind_endpoint,
private_advertise_endpoint: args.private_advertise_endpoint,
global_memory_pool_size: Some(args.global_memory_pool_size),
};

// Create new `Broker`
Expand Down
2 changes: 2 additions & 0 deletions cdn-broker/src/connections/broadcast/relational_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ impl<K: Hash + PartialEq + Eq + Clone, V: Hash + PartialEq + Eq + Clone> Relatio
}

#[cfg(test)]
// Makes tests more readable
#[allow(clippy::unnecessary_get_then_check)]
pub mod tests {
use super::RelationalMap;

Expand Down
Loading

0 comments on commit 737889a

Please sign in to comment.