Skip to content

Commit

Permalink
Merge pull request #40 from EspressoSystems/rm/push2
Browse files Browse the repository at this point in the history
Fix state post-refactor
  • Loading branch information
rob-maron committed May 3, 2024
2 parents a681906 + 6eae102 commit bde96db
Show file tree
Hide file tree
Showing 41 changed files with 1,362 additions and 591 deletions.
296 changes: 292 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ tokio = { version = "1", default-features = false, features = [
"rt",
"rt-multi-thread",
"parking_lot",
"tracing"
] }
jf-primitives = { git = "https://github.com/EspressoSystems/jellyfish.git", tag = "0.4.2", default-features = false, features = [
"std",
Expand All @@ -31,7 +32,6 @@ async-std = { version = "1", default-features = false, features = [
] }
rkyv = { version = "0.7", features = ["validation"] }
derivative = "2"
parking_lot = "0.12"
rcgen = { version = "0.12", features = ["x509-parser", "pem"] }

# Dev dependencies (can't be defined explicitly in the workspace)
Expand Down
18 changes: 17 additions & 1 deletion cdn-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ harness = false
name = "broadcast"
harness = false

# The main broker binary
[[bin]]
name = "broker"
path = "src/binaries/broker.rs"

# The "bad" broker is a binary that tries to spam an actual broker with connections
[[bin]]
name = "bad-broker"
path = "src/binaries/bad-broker.rs"

# This dependency is used for the Tokio console
[target.'cfg(tokio_unstable)'.dependencies]
console-subscriber = "0.2"


[dependencies]
jf-primitives.workspace = true
cdn-proto = { path = "../cdn-proto", default-features = false, features = [
Expand All @@ -42,6 +57,7 @@ lazy_static = { workspace = true }
rkyv.workspace = true
derivative.workspace = true
dashmap = { version = "5", default-features = false }
parking_lot.workspace = true
rand.workspace = true
local-ip-address = "0.6"
parking_lot = "0.12"
portpicker = "0.1"
6 changes: 3 additions & 3 deletions cdn-broker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ WORKDIR /build
COPY . .

# Build our example (all of them for caching)
RUN cargo build --profile release -p cdn-broker -p cdn-client -p cdn-marshal
RUN cargo build --profile release --bin broker --bin client --bin marshal

# Use a minimal image for the final build
FROM debian:bookworm as RUNNER
Expand All @@ -18,7 +18,7 @@ RUN apt-get update && apt-get install libcurl4 -y
ENV RUST_LOG=info

# Copy the built binary from the builder image
COPY --from=BUILDER ./build/target/release/cdn-broker /bin/cdn-broker
COPY --from=BUILDER ./build/target/release/broker /bin/broker

# Set the entrypoint
ENTRYPOINT ["cdn-broker"]
ENTRYPOINT ["broker"]
85 changes: 85 additions & 0 deletions cdn-broker/src/binaries/bad-broker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//! `bad-broker` is a simple binary that starts a broker with a random key and
//! attempts to start a broker every 100ms. This is useful for testing the
//! broker's ability to handle multiple brokers connecting to it.
use std::time::Duration;

use cdn_broker::{Broker, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result};
use clap::Parser;
use jf_primitives::signatures::{
bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme,
};
use rand::{rngs::StdRng, SeedableRng};
use tokio::{spawn, time::sleep};
use tracing_subscriber::EnvFilter;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
/// The main component of the push CDN.
struct Args {
/// The discovery client endpoint (including scheme) to connect to.
/// With the local discovery feature, this is a file path.
/// With the remote (redis) discovery feature, this is a redis URL (e.g. `redis://127.0.0.1:6789`).
#[arg(short, long)]
discovery_endpoint: String,
}

#[tokio::main]
async fn main() -> Result<()> {
// Parse command line arguments
let args = Args::parse();

// Initialize tracing
#[cfg(not(tokio_unstable))]
if std::env::var("RUST_LOG_FORMAT") == Ok("json".to_string()) {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.json()
.init();
} else {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
}

#[cfg(tokio_unstable)]
console_subscriber::init();

// Forever, generate a new broker key and start a broker
loop {
// Generate the broker key from the supplied seed
let (private_key, public_key) = BLS::key_gen(&(), &mut StdRng::from_entropy()).unwrap();

// Two random ports
let public_port = portpicker::pick_unused_port().unwrap();
let private_port = portpicker::pick_unused_port().unwrap();

// Create config
let broker_config: Config<ProductionRunDef> = Config {
ca_cert_path: None,
ca_key_path: None,

discovery_endpoint: args.discovery_endpoint.clone(),
metrics_bind_endpoint: None,
keypair: KeyPair {
public_key,
private_key,
},

public_bind_endpoint: format!("0.0.0.0:{public_port}"),
public_advertise_endpoint: format!("local_ip:{public_port}"),
private_bind_endpoint: format!("0.0.0.0:{private_port}"),
private_advertise_endpoint: format!("local_ip:{private_port}"),
};

// Create new `Broker`
// Uses TCP from broker connections and Quic for user connections.
let broker = Broker::new(broker_config).await?;

// Start the main loop, consuming it
let jh = spawn(broker.start());

sleep(Duration::from_millis(300)).await;
jh.abort();
}
}
7 changes: 6 additions & 1 deletion cdn-broker/src/main.rs → cdn-broker/src/binaries/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ async fn main() -> Result<()> {
// Parse command line arguments
let args = Args::parse();

// Initialize tracing
// If we aren't on `tokio_unstable`, use the normal logger
#[cfg(not(tokio_unstable))]
if std::env::var("RUST_LOG_FORMAT") == Ok("json".to_string()) {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
Expand All @@ -73,6 +74,10 @@ async fn main() -> Result<()> {
.init();
}

// If we are using the `tokio_unstable` feature, use the console logger
#[cfg(tokio_unstable)]
console_subscriber::init();

// Generate the broker key from the supplied seed
let (private_key, public_key) =
BLS::key_gen(&(), &mut StdRng::seed_from_u64(args.key_seed)).unwrap();
Expand Down
85 changes: 2 additions & 83 deletions cdn-broker/src/connections/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,9 @@

mod relational_map;

use std::{collections::HashSet, sync::Arc};
use std::collections::HashSet;

use cdn_proto::{
connection::{Bytes, UserPublicKey},
def::RunDef,
discovery::BrokerIdentifier,
message::Topic,
mnemonic,
};
use tokio::spawn;
use tracing::debug;

use crate::Inner;
use cdn_proto::{connection::UserPublicKey, discovery::BrokerIdentifier, message::Topic};

use self::relational_map::RelationalMap;

Expand Down Expand Up @@ -44,74 +34,3 @@ impl BroadcastMap {
Self::default()
}
}

impl<Def: RunDef> Inner<Def> {
/// Send a broadcast message to both users and brokers. First figures out where the message
/// is supposed to go, and then sends it. We have `to_user_only` bounds so we can stop thrashing;
/// if we receive a message from a broker we should only be forwarding it to applicable users.
pub async fn handle_broadcast_message(
self: &Arc<Self>,
mut topics: Vec<Topic>,
message: &Bytes,
to_users_only: bool,
) {
// Deduplicate topics
topics.dedup();

// Aggregate recipients
let mut broker_recipients = HashSet::new();
let mut user_recipients = HashSet::new();

for topic in topics {
// If we can send to brokers, we should do it
if !to_users_only {
broker_recipients.extend(
self.connections
.read()
.await
.broadcast_map
.brokers
.get_keys_by_value(&topic),
);
}
user_recipients.extend(
self.connections
.read()
.await
.broadcast_map
.users
.get_keys_by_value(&topic),
);
}

debug!(
num_brokers = broker_recipients.len(),
num_users = user_recipients.len(),
msg = mnemonic(&**message),
"broadcast",
);

// If we can send to brokers, do so
if !to_users_only {
// Send to all brokers
for broker in broker_recipients {
let self_ = self.clone();
let broker_ = broker.clone();
let message_ = message.clone();
spawn(async move {
let _ = self_.send_to_broker(&broker_, message_).await;
});
}
}

// Send to all aggregated users
for user in user_recipients {
// Send to the corresponding user
let self_ = self.clone();
let message_ = message.clone();
spawn(async move {
let _ = self_.send_to_user(user, message_).await;
});
}
}
}
86 changes: 0 additions & 86 deletions cdn-broker/src/connections/broker.rs

This file was deleted.

Loading

0 comments on commit bde96db

Please sign in to comment.