From 5110e0e6821d2f18afba7bc22aaa5afbd0aad805 Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 2 May 2024 16:04:36 -0400 Subject: [PATCH 01/13] fix everything --- Cargo.lock | 296 +++++++++++++++++- Cargo.toml | 1 - cdn-broker/Cargo.toml | 13 +- cdn-broker/src/connections/broadcast/mod.rs | 85 +---- cdn-broker/src/connections/broker.rs | 86 ----- cdn-broker/src/connections/direct/mod.rs | 70 +---- cdn-broker/src/connections/mod.rs | 110 ++++++- cdn-broker/src/connections/user.rs | 64 ---- cdn-broker/src/handlers/mod.rs | 5 - cdn-broker/src/lib.rs | 24 +- cdn-broker/src/main.rs | 7 +- .../broker.rs => tasks/broker/handler.rs} | 178 ++++++++--- cdn-broker/src/tasks/broker/heartbeat.rs | 104 ++++++ .../listener.rs} | 1 - cdn-broker/src/tasks/broker/mod.rs | 5 + cdn-broker/src/tasks/broker/sender.rs | 61 ++++ cdn-broker/src/tasks/{ => broker}/sync.rs | 47 ++- cdn-broker/src/tasks/heartbeat.rs | 90 ------ cdn-broker/src/tasks/mod.rs | 6 +- .../user.rs => tasks/user/handler.rs} | 38 +-- .../{user_listener.rs => user/listener.rs} | 0 cdn-broker/src/tasks/user/mod.rs | 3 + cdn-broker/src/tasks/user/sender.rs | 40 +++ cdn-broker/src/test-binaries/bad-broker.rs | 121 +++++++ cdn-broker/src/tests/mod.rs | 13 +- cdn-client/Cargo.toml | 10 + cdn-client/src/test-binaries/bad-connector.rs | 66 ++++ cdn-client/src/test-binaries/bad-sender.rs | 94 ++++++ cdn-proto/Cargo.toml | 1 - cdn-proto/src/connection/auth/broker.rs | 10 +- cdn-proto/src/connection/protocols/quic.rs | 20 +- cdn-proto/src/connection/protocols/tcp.rs | 10 +- process-compose.yaml | 35 ++- 33 files changed, 1145 insertions(+), 569 deletions(-) delete mode 100644 cdn-broker/src/connections/broker.rs delete mode 100644 cdn-broker/src/connections/user.rs delete mode 100644 cdn-broker/src/handlers/mod.rs rename cdn-broker/src/{handlers/broker.rs => tasks/broker/handler.rs} (51%) create mode 100644 cdn-broker/src/tasks/broker/heartbeat.rs rename cdn-broker/src/tasks/{broker_listener.rs => broker/listener.rs} (93%) create mode 100644 cdn-broker/src/tasks/broker/mod.rs create mode 100644 cdn-broker/src/tasks/broker/sender.rs rename cdn-broker/src/tasks/{ => broker}/sync.rs (73%) delete mode 100644 cdn-broker/src/tasks/heartbeat.rs rename cdn-broker/src/{handlers/user.rs => tasks/user/handler.rs} (76%) rename cdn-broker/src/tasks/{user_listener.rs => user/listener.rs} (100%) create mode 100644 cdn-broker/src/tasks/user/mod.rs create mode 100644 cdn-broker/src/tasks/user/sender.rs create mode 100644 cdn-broker/src/test-binaries/bad-broker.rs create mode 100644 cdn-client/src/test-binaries/bad-connector.rs create mode 100644 cdn-client/src/test-binaries/bad-sender.rs diff --git a/Cargo.lock b/Cargo.lock index 9db4347..5301238 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,6 +510,28 @@ dependencies = [ "async-global-executor", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "async-task" version = "4.7.0" @@ -548,6 +570,51 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -734,6 +801,7 @@ dependencies = [ "async-std", "cdn-proto", "clap", + "console-subscriber", "criterion", "dashmap", "derivative", @@ -741,6 +809,7 @@ dependencies = [ "lazy_static", "local-ip-address", "parking_lot", + "portpicker", "pprof", "prometheus", "rand", @@ -792,7 +861,6 @@ dependencies = [ "lazy_static", "mnemonic", "mockall", - "parking_lot", "pem", "portpicker", "pprof", @@ -948,6 +1016,43 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console-api" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber 0.3.18", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -993,6 +1098,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.5.1" @@ -1031,6 +1145,15 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -1372,6 +1495,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "flate2" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + 
[[package]] name = "flume" version = "0.11.0" @@ -1569,7 +1702,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.2.6", "slab", "tokio", "tokio-util", @@ -1623,6 +1756,19 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.9" @@ -1735,6 +1881,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.28" @@ -1759,6 +1911,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "idna" version = "0.5.0" @@ -1769,6 +1933,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.2.6" @@ -1786,7 +1960,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "321f0f839cd44a4686e9504b0a62b4d69a50b62072144c71c68f5873c167b8d9" dependencies = [ "ahash 0.8.11", - "indexmap", + "indexmap 2.2.6", "is-terminal", "itoa", "log", @@ -2028,6 +2202,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -2604,6 +2784,38 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.60", +] + +[[package]] +name = "prost-types" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -2994,6 +3206,12 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustversion" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47" + [[package]] name = "ryu" version = "1.0.17" @@ -3271,7 
+3489,7 @@ dependencies = [ "futures-util", "hashlink", "hex", - "indexmap", + "indexmap 2.2.6", "log", "memchr", "once_cell", @@ -3515,6 +3733,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "synstructure" version = "0.12.6" @@ -3698,9 +3922,20 @@ dependencies = [ "pin-project-lite", "socket2", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -3748,6 +3983,59 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" diff --git a/Cargo.toml b/Cargo.toml index ccb0696..9b5f556 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,6 @@ async-std = { version = "1", default-features = false, features = [ ] } rkyv = { version = "0.7", features = ["validation"] } derivative = "2" -parking_lot = "0.12" rcgen = { version = "0.12", features = ["x509-parser", "pem"] } # Dev dependencies (can't be defined explicitly in the workspace) diff --git a/cdn-broker/Cargo.toml b/cdn-broker/Cargo.toml index 2dfed19..e75c9d9 100644 --- a/cdn-broker/Cargo.toml +++ b/cdn-broker/Cargo.toml @@ -27,6 +27,16 @@ harness = false name = "broadcast" harness = false +# The "bad" broker is a binary that tries to spam an actual broker with connections +[[bin]] +name = "bad-broker" +path = "src/test-binaries/bad-broker.rs" + +# This dependency is used for the Tokio console +[target.'cfg(tokio_unstable)'.dependencies] +console-subscriber = "0.2" + + [dependencies] jf-primitives.workspace = true cdn-proto = { path = "../cdn-proto", default-features = false, features = [ @@ -42,6 +52,7 @@ lazy_static = { workspace = true } rkyv.workspace = true derivative.workspace = true dashmap = { version = "5", default-features = false } -parking_lot.workspace = true rand.workspace = true local-ip-address = "0.6" +parking_lot = "0.12" +portpicker = "0.1" diff --git a/cdn-broker/src/connections/broadcast/mod.rs b/cdn-broker/src/connections/broadcast/mod.rs index 01721ab..32ba9bb 100644 --- 
a/cdn-broker/src/connections/broadcast/mod.rs +++ b/cdn-broker/src/connections/broadcast/mod.rs @@ -2,19 +2,9 @@ mod relational_map; -use std::{collections::HashSet, sync::Arc}; +use std::collections::HashSet; -use cdn_proto::{ - connection::{Bytes, UserPublicKey}, - def::RunDef, - discovery::BrokerIdentifier, - message::Topic, - mnemonic, -}; -use tokio::spawn; -use tracing::debug; - -use crate::Inner; +use cdn_proto::{connection::UserPublicKey, discovery::BrokerIdentifier, message::Topic}; use self::relational_map::RelationalMap; @@ -44,74 +34,3 @@ impl BroadcastMap { Self::default() } } - -impl Inner { - /// Send a broadcast message to both users and brokers. First figures out where the message - /// is supposed to go, and then sends it. We have `to_user_only` bounds so we can stop thrashing; - /// if we receive a message from a broker we should only be forwarding it to applicable users. - pub async fn handle_broadcast_message( - self: &Arc, - mut topics: Vec, - message: &Bytes, - to_users_only: bool, - ) { - // Deduplicate topics - topics.dedup(); - - // Aggregate recipients - let mut broker_recipients = HashSet::new(); - let mut user_recipients = HashSet::new(); - - for topic in topics { - // If we can send to brokers, we should do it - if !to_users_only { - broker_recipients.extend( - self.connections - .read() - .await - .broadcast_map - .brokers - .get_keys_by_value(&topic), - ); - } - user_recipients.extend( - self.connections - .read() - .await - .broadcast_map - .users - .get_keys_by_value(&topic), - ); - } - - debug!( - num_brokers = broker_recipients.len(), - num_users = user_recipients.len(), - msg = mnemonic(&**message), - "broadcast", - ); - - // If we can send to brokers, do so - if !to_users_only { - // Send to all brokers - for broker in broker_recipients { - let self_ = self.clone(); - let broker_ = broker.clone(); - let message_ = message.clone(); - spawn(async move { - let _ = self_.send_to_broker(&broker_, message_).await; - }); - } - } - - // Send to all aggregated users - for user in user_recipients { - // Send to the corresponding user - let self_ = self.clone(); - let message_ = message.clone(); - spawn(async move { - let _ = self_.send_to_user(user, message_).await; - }); - } - } -} diff --git a/cdn-broker/src/connections/broker.rs b/cdn-broker/src/connections/broker.rs deleted file mode 100644 index 9f64ccb..0000000 --- a/cdn-broker/src/connections/broker.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::sync::Arc; - -use cdn_proto::{ - connection::{protocols::Connection, Bytes}, - def::RunDef, - discovery::BrokerIdentifier, - error::{Error, Result}, -}; -use tokio::spawn; -use tracing::error; - -use crate::Inner; - -impl Inner { - /// Asynchronously send a message to all currently connected brokers. On failure, - /// the broker will be removed. - pub async fn spawn_send_to_brokers(self: &Arc, message: &Bytes) { - // For each broker, - for connection in &self.connections.read().await.brokers { - // Clone things we will need downstream - let message = message.clone(); - let broker_identifier = connection.0.clone(); - let connection = connection.1 .0.clone(); - let connections = self.connections.clone(); - - // Spawn a task to send the message - spawn(async move { - if let Err(err) = connection.send_message_raw(message).await { - // If we fail, remove the broker from our map. 
- error!("failed to send message to broker: {err}"); - connections - .write() - .await - .remove_broker(&broker_identifier, "failed to send message"); - }; - }); - } - } - - /// Send a message to a particular broker. If it fails, it removes the broker from all maps. - /// Awaits on message acknowledgement and returns the error if it does - pub async fn send_to_broker( - self: &Arc, - broker_identifier: &BrokerIdentifier, - message: Bytes, - ) -> Result<()> { - let connections_read_guard = self.connections.read().await; - // If we are connected to them, - if let Some((connection, _)) = connections_read_guard.brokers.get(broker_identifier) { - // Send the message - if let Err(err) = connection.send_message_raw(message).await { - // Remove them if we failed to send it - error!("failed to send message to broker: {err}"); - - // Drop the read guard before acquiring the write lock - drop(connections_read_guard); - - self.connections - .write() - .await - .remove_broker(broker_identifier, "failed to send message"); - - // Return an error - return Err(Error::Connection( - "failed to send message to broker".to_string(), - )); - }; - } else { - // Drop the read guard before acquiring the write lock - drop(connections_read_guard); - - // Remove the broker if they are not connected - self.connections - .write() - .await - .remove_broker(broker_identifier, "not connected"); - - // Return an error - return Err(Error::Connection( - "failed to send message to broker".to_string(), - )); - } - - Ok(()) - } -} diff --git a/cdn-broker/src/connections/direct/mod.rs b/cdn-broker/src/connections/direct/mod.rs index 2703348..ec9c273 100644 --- a/cdn-broker/src/connections/direct/mod.rs +++ b/cdn-broker/src/connections/direct/mod.rs @@ -2,78 +2,10 @@ mod versioned_map; -use std::sync::Arc; - -use cdn_proto::{ - connection::{Bytes, UserPublicKey}, - def::RunDef, - discovery::BrokerIdentifier, - mnemonic, -}; -use tokio::spawn; -use tracing::{debug, warn}; - -use crate::Inner; +use cdn_proto::{connection::UserPublicKey, discovery::BrokerIdentifier}; use self::versioned_map::VersionedMap; /// We define the direct map as just a type alias of a `VersionedMap`, which // deals with version vectors. pub type DirectMap = VersionedMap; - -impl Inner { - /// Send a direct message to either a user or a broker. First figures out where the message - /// is supposed to go, and then sends it. We have `to_user_only` bounds so we can stop thrashing; - /// if we receive a message from a broker we should only be forwarding it to applicable users. 
- pub async fn handle_direct_message( - self: &Arc, - user_public_key: UserPublicKey, - message: Bytes, - to_user_only: bool, - ) { - // Look up from our map - if let Some(broker_identifier) = self - .connections - .read() - .await - .direct_map - .get(&user_public_key) - { - if *broker_identifier == self.connections.read().await.identity { - // We own the user, send it this way - debug!( - user = mnemonic(&user_public_key), - msg = mnemonic(&*message), - "direct", - ); - - // Send to the corresponding user - let self_ = self.clone(); - spawn(async move { - let _ = self_.send_to_user(user_public_key, message).await; - }); - } else { - // If we don't have the stipulation to send it to ourselves only - // This is so we don't thrash between brokers - if !to_user_only { - debug!( - broker = %broker_identifier, - msg = mnemonic(&*message), - "direct", - ); - - // Asynchronously send to the broker responsible - let self_ = self.clone(); - let broker_identifier_ = broker_identifier.clone(); - spawn(async move { - let _ = self_.send_to_broker(&broker_identifier_, message).await; - }); - } - } - } else { - // Warning if the recipient user did not exist. - // TODO: Add sync in here to prevent forking. This is likely a problem. - warn!(id = mnemonic(&user_public_key), "user did not exist in map"); - } - } -} diff --git a/cdn-broker/src/connections/mod.rs b/cdn-broker/src/connections/mod.rs index 81117bd..bd213e6 100644 --- a/cdn-broker/src/connections/mod.rs +++ b/cdn-broker/src/connections/mod.rs @@ -21,17 +21,14 @@ use self::broadcast::BroadcastMap; mod broadcast; mod direct; -mod broker; -mod user; - pub struct Connections { // Our identity. Used for versioned vector conflict resolution. identity: BrokerIdentifier, // The current users connected to us - users: HashMap, AbortHandle)>, + users: HashMap, Vec)>, // The current brokers connected to us - brokers: HashMap, AbortHandle)>, + brokers: HashMap, Vec)>, // The versioned vector for looking up where direct messages should go direct_map: DirectMap, @@ -52,6 +49,88 @@ impl Connections { } } + /// Get the broker identifier for a given user (if it exists) + pub fn get_broker_identifier_of_user(&self, user: &UserPublicKey) -> Option { + self.direct_map.get(user).cloned() + } + + /// Get the broker connection for a given broker identifier (cloned) + pub fn get_broker_connection( + &self, + broker_identifier: &BrokerIdentifier, + ) -> Option> { + self.brokers.get(broker_identifier).map(|(c, _)| c.clone()) + } + + /// Get the connection for a given user public key (cloned) + pub fn get_user_connection(&self, user: &UserPublicKey) -> Option> { + self.users.get(user).map(|(c, _)| c.clone()) + } + + /// Get all broker identifiers that we are connected to + pub fn get_broker_identifiers(&self) -> Vec { + self.brokers.keys().cloned().collect() + } + + /// Add a task to the list of tasks for a broker. This is used to + /// cancel the task if the broker disconnects. + /// TODO: macro this? + pub fn add_broker_task(&mut self, broker_identifier: &BrokerIdentifier, handle: AbortHandle) { + if let Some((_, handles)) = self.brokers.get_mut(broker_identifier) { + // If the broker exists, add the handle to the list of tasks + handles.push(handle); + } else { + // Otherwise, cancel the task + handle.abort(); + } + } + + /// Add a task to the list of tasks for a user. This is used to + /// cancel the task if the user disconnects. + /// TODO: macro this? 
+ pub fn add_user_task(&mut self, user: &UserPublicKey, handle: AbortHandle) { + if let Some((_, handles)) = self.users.get_mut(user) { + // If the user exists, add the handle to the list of tasks + handles.push(handle); + } else { + // Otherwise, cancel the task + handle.abort(); + } + } + + /// Get all users and brokers interested in a list of topics. + pub fn get_interested_by_topic( + &self, + topics: &Vec, + to_users_only: bool, + ) -> (Vec, Vec) { + // Aggregate recipients + let mut broker_recipients = HashSet::new(); + let mut user_recipients = HashSet::new(); + + // For each topic + for topic in topics { + // Get all users interested in the topic + for user in self.broadcast_map.users.get_keys_by_value(topic) { + user_recipients.insert(user); + } + + // If we want to send to brokers as well, + if !to_users_only { + // Get all brokers interested in the topic + for broker in self.broadcast_map.brokers.get_keys_by_value(topic) { + broker_recipients.insert(broker); + } + } + } + + // Return the recipients + ( + broker_recipients.into_iter().collect(), + user_recipients.into_iter().collect(), + ) + } + /// Get the number of users connected to us at any given time. pub fn num_users(&self) -> usize { self.users.len() @@ -129,7 +208,8 @@ impl Connections { // Remove the old broker if it exists self.remove_broker(&broker_identifier, "already existed"); - self.brokers.insert(broker_identifier, (connection, handle)); + self.brokers + .insert(broker_identifier, (connection, vec![handle])); } /// Insert a user into our map. Updates the versioned vector that @@ -150,7 +230,7 @@ impl Connections { // Add to our map. Remove the old one if it exists self.users - .insert(user_public_key.clone(), (connection, handle)); + .insert(user_public_key.clone(), (connection, vec![handle])); // Insert into our direct map self.direct_map @@ -166,13 +246,15 @@ impl Connections { /// from our broadcast map, in case they were subscribed to any topics. pub fn remove_broker(&mut self, broker_identifier: &BrokerIdentifier, reason: &str) { // Remove from broker list, cancelling the previous task if it exists - if let Some(previous_handle) = self.brokers.remove(broker_identifier).map(|(_, h)| h) { + if let Some(task_handles) = self.brokers.remove(broker_identifier).map(|(_, h)| h) { // Decrement the metric for the number of brokers connected metrics::NUM_BROKERS_CONNECTED.dec(); error!(id = %broker_identifier, reason = reason, "broker disconnected"); - // Cancel the broker's task - previous_handle.abort(); + // Cancel all tasks + for handle in task_handles { + handle.abort(); + } }; // Remove from all topics @@ -188,7 +270,7 @@ impl Connections { /// to send us messages for a disconnected user. 
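For context on the task bookkeeping above: `Connections` now stores a `Vec<AbortHandle>` per broker and per user, so the receive loop and any in-flight send tasks are all cancelled when the peer is removed, and `add_broker_task`/`add_user_task` abort a task immediately if its peer has already disconnected. Below is a minimal, self-contained sketch of that same pattern; `PeerId` and `TaskRegistry` are illustrative names, not types from this codebase.

```rust
use std::collections::HashMap;
use tokio::task::AbortHandle;

type PeerId = String;

#[derive(Default)]
struct TaskRegistry {
    // One list of abort handles per connected peer.
    tasks: HashMap<PeerId, Vec<AbortHandle>>,
}

impl TaskRegistry {
    /// Register a task for a peer, or abort it immediately if the peer is gone.
    fn add_task(&mut self, peer: &PeerId, handle: AbortHandle) {
        if let Some(handles) = self.tasks.get_mut(peer) {
            handles.push(handle);
        } else {
            // The peer was already removed; don't leak the task.
            handle.abort();
        }
    }

    /// Remove a peer and cancel every task that was spawned on its behalf.
    fn remove_peer(&mut self, peer: &PeerId) {
        if let Some(handles) = self.tasks.remove(peer) {
            for handle in handles {
                handle.abort();
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut registry = TaskRegistry::default();
    registry.tasks.insert("broker-1".into(), Vec::new());

    // Spawn a send task and keep only its abort handle, mirroring `add_broker_task`.
    let handle = tokio::spawn(async { /* send a message */ }).abort_handle();
    registry.add_task(&"broker-1".into(), handle);

    // Removing the peer cancels the in-flight task as well.
    registry.remove_peer(&"broker-1".into());
}
```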
pub fn remove_user(&mut self, user_public_key: UserPublicKey, reason: &str) { // Remove from user list, returning the previous handle if it exists - if let Some(previous_handle) = self.users.remove(&user_public_key).map(|(_, h)| h) { + if let Some(task_handles) = self.users.remove(&user_public_key).map(|(_, h)| h) { // Decrement the metric for the number of users connected metrics::NUM_USERS_CONNECTED.dec(); warn!( @@ -197,8 +279,10 @@ impl Connections { "user disconnected" ); - // Cancel the user's task - previous_handle.abort(); + // Cancel all tasks + for handle in task_handles { + handle.abort(); + } }; // Remove from user topics diff --git a/cdn-broker/src/connections/user.rs b/cdn-broker/src/connections/user.rs deleted file mode 100644 index ed2f0a7..0000000 --- a/cdn-broker/src/connections/user.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::sync::Arc; - -use cdn_proto::{ - connection::{protocols::Connection, Bytes, UserPublicKey}, - def::RunDef, - error::{Error, Result}, -}; -use tracing::warn; - -use crate::Inner; - -impl Inner { - /// Send a message to a user connected to us. - /// If it fails, the user is removed from our map and an error is returned - /// TODO: function this? - pub async fn send_to_user( - self: &Arc, - user_public_key: UserPublicKey, - message: Bytes, - ) -> Result<()> { - // Acquire the read guard for connections - let connections_read_guard = self.connections.read().await; - // See if the user is connected - if let Some((connection, _)) = connections_read_guard.users.get(&user_public_key) { - // If they are, clone things we will need - let connection = connection.clone(); - let connections = self.connections.clone(); - - // Send the message - if let Err(err) = connection.send_message_raw(message).await { - // Drop the read guard - drop(connections_read_guard); - - // If we fail to send the message, remove the user. - warn!("failed to send message to user: {err}"); - connections - .write() - .await - .remove_user(user_public_key, "failed to send message"); - - // Return an error - return Err(Error::Connection( - "failed to send message to broker".to_string(), - )); - }; - } else { - // Drop the read guard - drop(connections_read_guard); - - // Remove the user if they are not connected - self.connections - .write() - .await - .remove_user(user_public_key, "not connected"); - - // Return an error - return Err(Error::Connection( - "failed to send message to user".to_string(), - )); - } - - Ok(()) - } -} diff --git a/cdn-broker/src/handlers/mod.rs b/cdn-broker/src/handlers/mod.rs deleted file mode 100644 index 8f1a216..0000000 --- a/cdn-broker/src/handlers/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! This file defines the handler module, wherein we define connection handlers for -//! `Arc`. - -pub mod broker; -pub mod user; diff --git a/cdn-broker/src/lib.rs b/cdn-broker/src/lib.rs index c652131..4f7edcc 100644 --- a/cdn-broker/src/lib.rs +++ b/cdn-broker/src/lib.rs @@ -4,7 +4,6 @@ #![forbid(unsafe_code)] mod connections; -mod handlers; pub mod reexports; mod tasks; @@ -29,7 +28,8 @@ use cdn_proto::{ use cdn_proto::{crypto::signature::KeyPair, metrics as proto_metrics}; use connections::Connections; use local_ip_address::local_ip; -use tokio::{select, spawn, sync::RwLock}; +use parking_lot::RwLock; +use tokio::{select, spawn}; use tracing::info; /// The broker's configuration. We need this when we create a new one. 
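Note that this patch also swaps the `tokio::sync::RwLock` around `Connections` for a synchronous `parking_lot::RwLock` (see the `use` changes in `cdn-broker/src/lib.rs` above), which is why the `.await` calls after `.read()`/`.write()` disappear throughout the diff. A minimal sketch of the resulting usage pattern, assuming a toy `State` type in place of `Connections`: copy what you need out of the lock and drop the guard before doing any async work, since the guard is blocking and should not be held across an await point.

```rust
use std::sync::Arc;
use std::time::Duration;

use parking_lot::RwLock;

#[derive(Default)]
struct State {
    num_users: usize,
}

async fn report(state: Arc<RwLock<State>>) {
    // Acquiring the lock is synchronous (no `.await`); copy out the data and
    // let the guard drop at the end of this statement.
    let num_users = state.read().num_users;

    // Only do async work once the guard is gone.
    tokio::time::sleep(Duration::from_millis(10)).await;
    println!("currently serving {num_users} users");
}

#[tokio::main]
async fn main() {
    let state = Arc::new(RwLock::new(State::default()));
    state.write().num_users = 3;
    report(state).await;
}
```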
@@ -229,25 +229,21 @@ impl Broker { pub async fn start(self) -> Result<()> { // Spawn the heartbeat task, which we use to register with `Discovery` every so often. // We also use it to check for new brokers who may have joined. - let heartbeat_task = spawn(self.inner.clone().run_heartbeat_task()); + let inner_ = self.inner.clone(); + let heartbeat_task = spawn(inner_.run_heartbeat_task()); // Spawn the sync task, which updates other brokers with our keys periodically. - let sync_task = spawn(self.inner.clone().run_sync_task()); + let inner_ = self.inner.clone(); + let sync_task = spawn(inner_.run_sync_task()); // Spawn the public (user) listener task // TODO: maybe macro this, since it's repeat code with the private listener task - let user_listener_task = spawn( - self.inner - .clone() - .run_user_listener_task(self.user_listener), - ); + let inner_ = self.inner.clone(); + let user_listener_task = spawn(inner_.clone().run_user_listener_task(self.user_listener)); // Spawn the private (broker) listener task - let broker_listener_task = spawn( - self.inner - .clone() - .run_broker_listener_task(self.broker_listener), - ); + let inner_ = self.inner.clone(); + let broker_listener_task = spawn(inner_.run_broker_listener_task(self.broker_listener)); // Serve the (possible) metrics task if let Some(metrics_bind_endpoint) = self.metrics_bind_endpoint { diff --git a/cdn-broker/src/main.rs b/cdn-broker/src/main.rs index c93446d..8477ad1 100644 --- a/cdn-broker/src/main.rs +++ b/cdn-broker/src/main.rs @@ -61,7 +61,8 @@ async fn main() -> Result<()> { // Parse command line arguments let args = Args::parse(); - // Initialize tracing + // If we aren't on `tokio_unstable`, use the normal logger + #[cfg(not(tokio_unstable))] if std::env::var("RUST_LOG_FORMAT") == Ok("json".to_string()) { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) @@ -73,6 +74,10 @@ async fn main() -> Result<()> { .init(); } + // If we are using the `tokio_unstable` feature, use the console logger + #[cfg(tokio_unstable)] + console_subscriber::init(); + // Generate the broker key from the supplied seed let (private_key, public_key) = BLS::key_gen(&(), &mut StdRng::seed_from_u64(args.key_seed)).unwrap(); diff --git a/cdn-broker/src/handlers/broker.rs b/cdn-broker/src/tasks/broker/handler.rs similarity index 51% rename from cdn-broker/src/handlers/broker.rs rename to cdn-broker/src/tasks/broker/handler.rs index dc7e060..c2c9edb 100644 --- a/cdn-broker/src/handlers/broker.rs +++ b/cdn-broker/src/tasks/broker/handler.rs @@ -1,18 +1,18 @@ //! This file defines the broker connection handler. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use cdn_proto::{ authenticate_with_broker, bail, - connection::{auth::broker::BrokerAuth, protocols::Connection as _, UserPublicKey}, + connection::{auth::broker::BrokerAuth, protocols::Connection as _, Bytes, UserPublicKey}, def::{Connection, RunDef}, discovery::BrokerIdentifier, error::{Error, Result}, - message::Message, - verify_broker, + message::{Message, Topic}, + mnemonic, verify_broker, }; -use tokio::{spawn, sync::Notify}; -use tracing::error; +use tokio::{spawn, time::timeout}; +use tracing::{debug, error}; use crate::{connections::DirectMap, Inner}; @@ -25,31 +25,40 @@ impl Inner { ) { // Depending on which way the direction came in, we will want to authenticate with a different // flow. - let broker_identifier = if is_outbound { - // If we reached out to the other broker first, authenticate first. 
- let broker_endpoint = authenticate_with_broker!(connection, self); - verify_broker!(connection, self); - broker_endpoint - } else { - // If the other broker reached out to us first, authenticate second. - verify_broker!(connection, self); - authenticate_with_broker!(connection, self) - }; - // Create a notifier for the broker receive loop to wait for - let notify_initialized = Arc::new(Notify::new()); - let wait_initialized = notify_initialized.clone(); + // Give us 5 seconds to authenticate with the broker + let broker_identifier = match timeout(Duration::from_secs(5), async { + if is_outbound { + // If we reached out to the other broker first, authenticate first. + let broker_endpoint = authenticate_with_broker!(connection, self); + verify_broker!(connection, self); + broker_endpoint + } else { + // If the other broker reached out to us first, authenticate second. + verify_broker!(connection, self); + authenticate_with_broker!(connection, self) + } + }) + .await + { + Ok(Ok(broker_identifier)) => broker_identifier, + Ok(Err(e)) => { + error!("failed to authenticate with broker: {:?}", e); + return; + } + _ => { + error!("timed out while authenticating with broker"); + return; + } + }; - // Clone the necessary data for the broker receive loop + // Clone things we will need let self_ = self.clone(); - let broker_identifier_ = broker_identifier.clone(); let connection_ = connection.clone(); + let broker_identifier_ = broker_identifier.clone(); - // Spawn the broker receive loop + // Start the receiving end of the broker let receive_handle = spawn(async move { - // Wait for the handler to have finished initialization - wait_initialized.notified().await; - // If we error, come back to the callback so we can remove the connection from the list. if let Err(err) = self_ .broker_receive_loop(&broker_identifier_, connection_) @@ -65,46 +74,41 @@ impl Inner { self_ .connections .write() - .await .remove_broker(&broker_identifier_, "failed to receive message"); }; }) .abort_handle(); - // Add to our broker and remove the old one if it exists - self.connections.write().await.add_broker( - broker_identifier.clone(), - connection, - receive_handle, - ); - - // Notify the broker receive loop that we are initialized - notify_initialized.notify_one(); - - // Send a full user sync - if let Err(err) = self.full_user_sync(&broker_identifier).await { - error!("failed to perform full user sync: {err}"); + // Send a full topic sync + if let Err(err) = self.full_topic_sync(&broker_identifier) { + error!("failed to perform full topic sync: {err}"); return; }; - // Send a full topic sync - if let Err(err) = self.full_topic_sync(&broker_identifier).await { - error!("failed to perform full topic sync: {err}"); + // Send a full user sync + if let Err(err) = self.full_user_sync(&broker_identifier) { + error!("failed to perform full user sync: {err}"); return; }; // If we have `strong-consistency` enabled, send partials #[cfg(feature = "strong-consistency")] { - if let Err(err) = self.partial_topic_sync().await { + if let Err(err) = self.partial_topic_sync() { error!("failed to perform partial topic sync: {err}"); } - if let Err(err) = self.partial_user_sync().await { + if let Err(err) = self.partial_user_sync() { error!("failed to perform partial user sync: {err}"); } } + + // Add to our broker and remove the old one if it exists + self.connections + .write() + .add_broker(broker_identifier, connection, receive_handle); } + /// This is the default loop for handling broker connections pub async fn 
broker_receive_loop( self: &Arc, broker_identifier: &BrokerIdentifier, @@ -122,23 +126,20 @@ impl Inner { Message::Direct(ref direct) => { let user_public_key = UserPublicKey::from(direct.recipient.clone()); - self.handle_direct_message(user_public_key, raw_message, true) - .await; + self.handle_direct_message(&user_public_key, raw_message, true); } // If we receive a broadcast message from a broker, we want to send it to all interested users Message::Broadcast(ref broadcast) => { let topics = broadcast.topics.clone(); - self.handle_broadcast_message(topics, &raw_message, true) - .await; + self.handle_broadcast_message(topics, &raw_message, true); } // If we receive a subscribe message from a broker, we add them as "interested" locally. Message::Subscribe(subscribe) => { self.connections .write() - .await .subscribe_broker_to(broker_identifier, subscribe); } @@ -146,7 +147,6 @@ impl Inner { Message::Unsubscribe(unsubscribe) => { self.connections .write() - .await .unsubscribe_broker_from(broker_identifier, &unsubscribe); } @@ -159,7 +159,7 @@ impl Inner { "failed to deserialize user sync message" ); - self.connections.write().await.apply_user_sync(user_sync); + self.connections.write().apply_user_sync(user_sync); } // Do nothing if we receive an unexpected message @@ -167,4 +167,80 @@ impl Inner { } } } + + /// This function handles direct messages from users and brokers. + pub fn handle_direct_message( + self: &Arc, + user_public_key: &UserPublicKey, + message: Bytes, + to_user_only: bool, + ) { + // Get the corresponding broker for the user + let broker_identifier = self + .connections + .read() + .get_broker_identifier_of_user(user_public_key); + + // If the broker exists, + if let Some(broker_identifier) = broker_identifier { + // If the broker is us, send the message to the user + if broker_identifier == self.identity { + debug!( + user = mnemonic(user_public_key), + msg = mnemonic(&*message), + "direct", + ); + + // Send the message to the user + self.try_send_to_user(user_public_key, message); + } else { + // Otherwise, send the message to the broker (but only if we are not told to send to the user only) + if !to_user_only { + debug!( + broker = %broker_identifier, + msg = mnemonic(&*message), + "direct", + ); + + // Send the message to the broker + self.try_send_to_broker(&broker_identifier, message); + } + } + } + } + + /// This function handles broadcast messages from users and brokers. + pub fn handle_broadcast_message( + self: &Arc, + mut topics: Vec, + message: &Bytes, + to_users_only: bool, + ) { + // Deduplicate topics + topics.dedup(); + + // Get the list of actors interested in the topics + let (interested_brokers, interested_users) = self + .connections + .read() + .get_interested_by_topic(&topics, to_users_only); + + // Debug log the broadcast + debug!( + num_brokers = interested_brokers.len(), + num_users = interested_users.len(), + msg = mnemonic(&**message), + "broadcast", + ); + + // Send the message to all interested brokers + for broker_identifier in interested_brokers { + self.try_send_to_broker(&broker_identifier, message.clone()); + } + + // Send the message to all interested users + for user_public_key in interested_users { + self.try_send_to_user(&user_public_key, message.clone()); + } + } } diff --git a/cdn-broker/src/tasks/broker/heartbeat.rs b/cdn-broker/src/tasks/broker/heartbeat.rs new file mode 100644 index 0000000..6e87202 --- /dev/null +++ b/cdn-broker/src/tasks/broker/heartbeat.rs @@ -0,0 +1,104 @@ +//! 
The heartbeat task periodically posts our state to either Redis or an embeddable file DB. + +use std::{collections::HashSet, sync::Arc, time::Duration}; + +use cdn_proto::{ + connection::protocols::Protocol as _, + def::{Protocol, RunDef}, + discovery::{BrokerIdentifier, DiscoveryClient}, +}; +use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; +use tokio::{ + spawn, + time::{sleep, timeout}, +}; +use tracing::error; + +use crate::Inner; + +impl Inner { + /// This task deals with setting the number of our connected users in Redis or the embedded db. It allows + /// the marshal to correctly choose the broker with the least amount of connections. + pub async fn run_heartbeat_task(self: Arc) { + // Clone the `discovery` client, which needs to be mutable + let mut discovery_client = self.discovery_client.clone(); + + // Run this forever, unless we run into a panic (e.g. the "as" conversion.) + loop { + let num_connections = self.connections.read().num_users() as u64; + + // Register with the discovery service every n seconds, updating our number of connected users + match timeout( + Duration::from_secs(5), + discovery_client.perform_heartbeat(num_connections, Duration::from_secs(60)), + ) + .await + { + Ok(Ok(())) => {} + Ok(Err(err)) => { + error!("failed to perform heartbeat: {}", err); + } + Err(_) => { + error!("timed out trying to perform heartbeat"); + } + } + + // Attempt to get all other brokers + let other_brokers = + match timeout(Duration::from_secs(5), discovery_client.get_other_brokers()).await { + Ok(Ok(brokers)) => brokers, + Ok(Err(err)) => { + error!("failed to get other brokers: {}", err); + continue; + } + Err(_) => { + // This is an important error + error!("timed out trying to get other brokers"); + continue; + } + }; + + // Calculate which brokers to connect to by taking the difference + // Only connect to brokers with a larger identifier + let mut brokers_to_connect_to: Vec = other_brokers + .difference(&HashSet::from_iter(self.connections.read().all_brokers())) + .filter(|broker| broker >= &&self.identity) + .cloned() + .collect(); + + // Shuffle the list (so we don't get stuck in an authentication lock + // on a broker that is down) + brokers_to_connect_to.shuffle(&mut StdRng::from_entropy()); + + // Calculate the difference, spawn tasks to connect to them + for broker in brokers_to_connect_to { + // Extrapolate the endpoint to connect to + let to_connect_endpoint = broker.private_advertise_endpoint.clone(); + + // Clone the inner because we need it for the possible new broker task + let inner = self.clone(); + + // Spawn task to connect to a broker we haven't seen + spawn(async move { + // Connect to the broker + let connection = + // Our TCP protocol is unsecured, so the cert we use does not matter. 
+ // Time out is at protocol level + match Protocol::::connect(&to_connect_endpoint, true).await + { + Ok(connection) => connection, + Err(err) => { + error!("failed to connect to broker: {err}"); + return; + } + }; + + inner.handle_broker_connection(connection, true).await; + }); + + // Sleep for 10 seconds + sleep(Duration::from_secs(10)).await; + } + } + } +} diff --git a/cdn-broker/src/tasks/broker_listener.rs b/cdn-broker/src/tasks/broker/listener.rs similarity index 93% rename from cdn-broker/src/tasks/broker_listener.rs rename to cdn-broker/src/tasks/broker/listener.rs index cf9d1d4..b023269 100644 --- a/cdn-broker/src/tasks/broker_listener.rs +++ b/cdn-broker/src/tasks/broker/listener.rs @@ -9,7 +9,6 @@ use cdn_proto::{ use tokio::spawn; use tracing::error; -// TODO: change connection to be named struct instead of tuple for readability purposes use crate::Inner; impl Inner { diff --git a/cdn-broker/src/tasks/broker/mod.rs b/cdn-broker/src/tasks/broker/mod.rs new file mode 100644 index 0000000..cb0e0fa --- /dev/null +++ b/cdn-broker/src/tasks/broker/mod.rs @@ -0,0 +1,5 @@ +pub mod handler; +pub mod heartbeat; +pub mod listener; +pub mod sender; +pub mod sync; diff --git a/cdn-broker/src/tasks/broker/sender.rs b/cdn-broker/src/tasks/broker/sender.rs new file mode 100644 index 0000000..e3aa157 --- /dev/null +++ b/cdn-broker/src/tasks/broker/sender.rs @@ -0,0 +1,61 @@ +use std::sync::Arc; + +use cdn_proto::connection::protocols::Connection; +use cdn_proto::{connection::Bytes, def::RunDef, discovery::BrokerIdentifier}; +use tokio::spawn; +use tracing::error; + +use crate::Inner; + +impl Inner { + /// Attempts to asynchronously send a message to a broker. + /// If it fails, the broker is removed from the list of connections. + pub fn try_send_to_broker( + self: &Arc, + broker_identifier: &BrokerIdentifier, + message: Bytes, + ) { + // Get the optional connection + let connection = self + .connections + .read() + .get_broker_connection(broker_identifier); + + // If the connection exists, + if let Some(connection) = connection { + // Clone what we need + let self_ = self.clone(); + let broker_identifier_ = broker_identifier.clone(); + + // Send the message + let send_handle = spawn(async move { + if let Err(e) = connection.send_message_raw(message).await { + error!("failed to send message to broker: {:?}", e); + + // Remove the broker if we failed to send the message + self_ + .connections + .write() + .remove_broker(&broker_identifier_, "failed to send message"); + }; + }) + .abort_handle(); + + // Add the send handle to the list of tasks for the broker + self.connections + .write() + .add_broker_task(broker_identifier, send_handle); + } + } + + /// Attempts to asynchronously send a message to all brokers. + /// If it fails, the failing broker is removed from the list of connections. 
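The `try_send_to_broker` above replaces the old awaited `send_to_broker`: callers no longer get a `Result` back; the send runs in a spawned task, and cleanup (removing the broker) happens inside that task if the send fails. A rough, generic sketch of this fire-and-forget pattern follows; `Registry`, `FakeConnection`, and `try_send` are illustrative stand-ins, not items from this crate. The trade-off is that send failures surface only through logs and state cleanup rather than to the caller.

```rust
use std::sync::Arc;
use std::time::Duration;

use parking_lot::RwLock;
use tokio::spawn;

struct Registry {
    connected: Vec<String>,
}

struct FakeConnection;

impl FakeConnection {
    async fn send(&self, _message: &str) -> Result<(), String> {
        // Always fail, to exercise the cleanup path.
        Err("connection reset".into())
    }
}

fn try_send(registry: Arc<RwLock<Registry>>, peer: String, connection: FakeConnection) {
    // Nothing is awaited here; the caller returns immediately.
    spawn(async move {
        if let Err(err) = connection.send("hello").await {
            eprintln!("failed to send to {peer}: {err}");
            // On failure, remove the peer so we stop routing to it.
            registry.write().connected.retain(|p| p != &peer);
        }
    });
}

#[tokio::main]
async fn main() {
    let registry = Arc::new(RwLock::new(Registry {
        connected: vec!["broker-1".to_string()],
    }));
    try_send(registry.clone(), "broker-1".into(), FakeConnection);

    // Give the background task a moment to run in this toy example.
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("still connected: {:?}", registry.read().connected);
}
```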
+ pub fn try_send_to_brokers(self: &Arc, message: &Bytes) { + // Get the optional connection + let brokers = self.connections.read().get_broker_identifiers(); + + for broker in brokers { + self.clone().try_send_to_broker(&broker, message.clone()); + } + } +} diff --git a/cdn-broker/src/tasks/sync.rs b/cdn-broker/src/tasks/broker/sync.rs similarity index 73% rename from cdn-broker/src/tasks/sync.rs rename to cdn-broker/src/tasks/broker/sync.rs index 2b1ab08..3764592 100644 --- a/cdn-broker/src/tasks/sync.rs +++ b/cdn-broker/src/tasks/broker/sync.rs @@ -40,13 +40,12 @@ impl Inner { /// # Errors /// - If we fail to serialize the message /// - If we fail to send the message - pub async fn full_user_sync(self: &Arc, broker: &BrokerIdentifier) -> Result<()> { + pub fn full_user_sync(self: &Arc, broker: &BrokerIdentifier) -> Result<()> { // Get full user sync map - let full_sync_map = self.connections.read().await.get_full_user_sync(); + let full_sync_map = self.connections.read().get_full_user_sync(); // Serialize and send the message to the broker - self.send_to_broker(broker, prepare_sync_message!(full_sync_map)) - .await?; + self.try_send_to_broker(broker, prepare_sync_message!(full_sync_map)); Ok(()) } @@ -56,9 +55,9 @@ impl Inner { /// /// # Errors /// - If we fail to serialize the message - pub async fn partial_user_sync(self: &Arc) -> Result<()> { + pub fn partial_user_sync(self: &Arc) -> Result<()> { // Get partial user sync map - let partial_sync_map = self.connections.write().await.get_partial_user_sync(); + let partial_sync_map = self.connections.write().get_partial_user_sync(); // Return if we haven't had any changes if partial_sync_map.underlying_map.is_empty() { @@ -68,8 +67,8 @@ impl Inner { // Serialize the message let raw_message = prepare_sync_message!(partial_sync_map); - // Send it to all brokers - self.spawn_send_to_brokers(&raw_message).await; + // Send to all brokers + self.try_send_to_brokers(&raw_message); Ok(()) } @@ -79,20 +78,19 @@ impl Inner { /// /// # Errors /// - if we fail to serialize the message - pub async fn full_topic_sync(self: &Arc, broker: &BrokerIdentifier) -> Result<()> { + pub fn full_topic_sync(self: &Arc, broker_identifier: &BrokerIdentifier) -> Result<()> { // Get full list of topics - let topics = self.connections.read().await.get_full_topic_sync(); + let topics = self.connections.read().get_full_topic_sync(); - // Serialize and send the message - self.send_to_broker( - broker, + // Send to broker + self.try_send_to_broker( + broker_identifier, Bytes::from_unchecked(bail!( Message::Subscribe(topics).serialize(), Serialize, "failed to serialize topics" )), - ) - .await?; + ); Ok(()) } @@ -102,9 +100,9 @@ impl Inner { /// /// # Errors /// - If we fail to serialize the message - pub async fn partial_topic_sync(self: &Arc) -> Result<()> { + pub fn partial_topic_sync(self: &Arc) -> Result<()> { // Get partial list of topics - let (additions, removals) = self.connections.write().await.get_partial_topic_sync(); + let (additions, removals) = self.connections.write().get_partial_topic_sync(); // If we have some additions, if !additions.is_empty() { @@ -114,7 +112,9 @@ impl Inner { Serialize, "failed to serialize topics" )); - self.spawn_send_to_brokers(&raw_subscribe_message).await; + + // Send to all brokers + self.try_send_to_brokers(&raw_subscribe_message); } // If we have some removals, @@ -127,7 +127,7 @@ impl Inner { )); // Send to all brokers - self.spawn_send_to_brokers(&raw_unsubscribe_message).await; + 
self.try_send_to_brokers(&raw_unsubscribe_message); } Ok(()) @@ -138,17 +138,16 @@ impl Inner { pub async fn run_sync_task(self: Arc) { loop { // Perform user sync - if let Err(err) = self.partial_user_sync().await { + if let Err(err) = self.partial_user_sync() { error!("failed to perform partial user sync: {err}"); - } + }; // Perform topic sync - if let Err(err) = self.partial_topic_sync().await { - error!("failed to perform partial user sync: {err}"); + if let Err(err) = self.partial_topic_sync() { + error!("failed to perform partial topic sync: {err}"); }; // Sleep - // TODO: parameterize this sleep(Duration::from_secs(10)).await; } } diff --git a/cdn-broker/src/tasks/heartbeat.rs b/cdn-broker/src/tasks/heartbeat.rs deleted file mode 100644 index bc3050c..0000000 --- a/cdn-broker/src/tasks/heartbeat.rs +++ /dev/null @@ -1,90 +0,0 @@ -//! The heartbeat task periodically posts our state to either Redis or an embeddable file DB. - -use std::{collections::HashSet, sync::Arc, time::Duration}; - -use cdn_proto::{ - connection::protocols::Protocol as _, - def::{Protocol, RunDef}, - discovery::{BrokerIdentifier, DiscoveryClient}, -}; -use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; -use tokio::{spawn, time::sleep}; -use tracing::error; - -use crate::Inner; - -impl Inner { - /// This task deals with setting the number of our connected users in Redis or the embedded db. It allows - /// the marshal to correctly choose the broker with the least amount of connections. - pub async fn run_heartbeat_task(self: Arc) { - // Clone the `discovery` client, which needs to be mutable - let mut discovery_client = self.discovery_client.clone(); - - // Run this forever, unless we run into a panic (e.g. the "as" conversion.) - loop { - let num_connections = self.connections.read().await.num_users() as u64; - // Register with the discovery service every n seconds, updating our number of connected users - if let Err(err) = discovery_client - .perform_heartbeat(num_connections, Duration::from_secs(60)) - .await - { - // If we fail, we want to see this - error!("failed to perform heartbeat: {}", err); - } - - // Check for new brokers, spawning tasks to connect to them if necessary - match discovery_client.get_other_brokers().await { - Ok(brokers) => { - // Calculate which brokers to connect to by taking the difference - // Only connect to brokers with a larger identifier - let mut brokers_to_connect_to: Vec = brokers - .difference(&HashSet::from_iter( - self.connections.read().await.all_brokers(), - )) - .filter(|broker| broker >= &&self.identity) - .cloned() - .collect(); - - // Shuffle the list (so we don't get stuck in the authentication lock - // on a broker that is down) - brokers_to_connect_to.shuffle(&mut StdRng::from_entropy()); - - // Calculate the difference, spawn tasks to connect to them - for broker in brokers_to_connect_to { - // TODO: make this into a separate function - // Extrapolate the endpoint to connect to - let to_connect_endpoint = broker.private_advertise_endpoint.clone(); - - // Clone the inner because we need it for the possible new broker task - let inner = self.clone(); - - // Spawn task to connect to a broker we haven't seen - spawn(async move { - // Connect to the broker - let connection = - // Our TCP protocol is unsecured, so the cert we use does not matter. 
- match Protocol::::connect(&to_connect_endpoint, true).await - { - Ok(connection) => connection, - Err(err) => { - error!("failed to connect to broker: {err}"); - return; - } - }; - - inner.handle_broker_connection(connection, true).await; - }); - } - } - - Err(err) => { - // This is an important error as well - error!("failed to get other brokers: {}", err); - } - } - - // Sleep for 10 seconds - sleep(Duration::from_secs(10)).await; - } - } -} diff --git a/cdn-broker/src/tasks/mod.rs b/cdn-broker/src/tasks/mod.rs index 4826df5..f682bfe 100644 --- a/cdn-broker/src/tasks/mod.rs +++ b/cdn-broker/src/tasks/mod.rs @@ -1,7 +1,5 @@ //! This file defines the different types of tasks we have implemented //! for `Arc>`. -pub mod broker_listener; -pub mod heartbeat; -pub mod sync; -pub mod user_listener; +pub mod broker; +pub mod user; diff --git a/cdn-broker/src/handlers/user.rs b/cdn-broker/src/tasks/user/handler.rs similarity index 76% rename from cdn-broker/src/handlers/user.rs rename to cdn-broker/src/tasks/user/handler.rs index c658efb..844e48a 100644 --- a/cdn-broker/src/handlers/user.rs +++ b/cdn-broker/src/tasks/user/handler.rs @@ -6,12 +6,9 @@ use std::time::Duration; use cdn_proto::connection::{protocols::Connection as _, UserPublicKey}; use cdn_proto::def::{Connection, RunDef}; -#[cfg(feature = "strong-consistency")] -use cdn_proto::discovery::DiscoveryClient; use cdn_proto::error::{Error, Result}; use cdn_proto::{connection::auth::broker::BrokerAuth, message::Message, mnemonic}; use tokio::spawn; -use tokio::sync::Notify; use tokio::time::timeout; use tracing::{error, warn}; @@ -39,20 +36,13 @@ impl Inner { let public_key = UserPublicKey::from(public_key); let user_identifier = mnemonic(&public_key); - // Create a notifier for the user receive loop to wait for - let notify_initialized = Arc::new(Notify::new()); - let wait_initialized = notify_initialized.clone(); - - // Clone the necessary data for the broker receive loop + // Clone the necessary data for the receive loop let self_ = self.clone(); let public_key_ = public_key.clone(); let connection_ = connection.clone(); // Spawn the user receive loop let receive_handle = spawn(async move { - // Wait for the handler to have finished initialization - wait_initialized.notified().await; - // If we error, come back to the callback so we can remove the connection from the list. if let Err(err) = self_.user_receive_loop(&public_key_, connection_).await { warn!(id = user_identifier, error = err.to_string(), "user error"); @@ -61,7 +51,6 @@ impl Inner { self_ .connections .write() - .await .remove_user(public_key_, "failed to receive message"); }; }) @@ -70,33 +59,20 @@ impl Inner { // Add our user and remove the old one if it exists self.connections .write() - .await .add_user(&public_key, connection, &topics, receive_handle); - // Notify the user receive loop that we are initialized - notify_initialized.notify_one(); - // If we have `strong-consistency` enabled, #[cfg(feature = "strong-consistency")] { // Send partial topic data - if let Err(err) = self.partial_topic_sync().await { + if let Err(err) = self.partial_topic_sync() { error!("failed to perform partial topic sync: {err}"); } // Send partial user data - if let Err(err) = self.partial_user_sync().await { + if let Err(err) = self.partial_user_sync() { error!("failed to perform partial user sync: {err}"); } - - // We want to perform a heartbeat for every user connection so that the number - // of users connected to brokers is usually evenly distributed. 
- let num_users = self.connections.read().await.num_users() as u64; - let _ = self - .discovery_client - .clone() - .perform_heartbeat(num_users, std::time::Duration::from_secs(60)) - .await; } } @@ -119,16 +95,14 @@ impl Inner { Message::Direct(ref direct) => { let user_public_key = UserPublicKey::from(direct.recipient.clone()); - self.handle_direct_message(user_public_key, raw_message, false) - .await; + self.handle_direct_message(&user_public_key, raw_message, false); } // If we get a broadcast message from a user, send it to both brokers and users. Message::Broadcast(ref broadcast) => { let topics = broadcast.topics.clone(); - self.handle_broadcast_message(topics, &raw_message, false) - .await; + self.handle_broadcast_message(topics, &raw_message, false); } // Subscribe messages from users will just update the state locally @@ -136,7 +110,6 @@ impl Inner { // TODO: add handle functions for this to make it easier to read self.connections .write() - .await .subscribe_user_to(public_key, subscribe); } @@ -144,7 +117,6 @@ impl Inner { Message::Unsubscribe(unsubscribe) => { self.connections .write() - .await .unsubscribe_user_from(public_key, &unsubscribe); } diff --git a/cdn-broker/src/tasks/user_listener.rs b/cdn-broker/src/tasks/user/listener.rs similarity index 100% rename from cdn-broker/src/tasks/user_listener.rs rename to cdn-broker/src/tasks/user/listener.rs diff --git a/cdn-broker/src/tasks/user/mod.rs b/cdn-broker/src/tasks/user/mod.rs new file mode 100644 index 0000000..858c8e0 --- /dev/null +++ b/cdn-broker/src/tasks/user/mod.rs @@ -0,0 +1,3 @@ +pub mod handler; +pub mod listener; +pub mod sender; diff --git a/cdn-broker/src/tasks/user/sender.rs b/cdn-broker/src/tasks/user/sender.rs new file mode 100644 index 0000000..61dd3fc --- /dev/null +++ b/cdn-broker/src/tasks/user/sender.rs @@ -0,0 +1,40 @@ +use std::sync::Arc; + +use cdn_proto::connection::protocols::Connection; +use cdn_proto::connection::UserPublicKey; +use cdn_proto::{connection::Bytes, def::RunDef}; +use tokio::spawn; +use tracing::error; + +use crate::Inner; + +impl Inner { + pub fn try_send_to_user(self: &Arc, user: &UserPublicKey, message: Bytes) { + // Get the optional connection + let connection = self.connections.read().get_user_connection(user); + + // If the connection exists, + if let Some(connection) = connection { + // Clone what we need + let self_ = self.clone(); + let user_ = user.clone(); + + // Send the message + let send_handle = spawn(async move { + if let Err(e) = connection.send_message_raw(message).await { + error!("failed to send message to user: {:?}", e); + + // Remove the broker if we failed to send the message + self_ + .connections + .write() + .remove_user(user_, "failed to send message"); + }; + }) + .abort_handle(); + + // Add the send handle to the list of tasks for the broker + self.connections.write().add_user_task(user, send_handle); + } + } +} diff --git a/cdn-broker/src/test-binaries/bad-broker.rs b/cdn-broker/src/test-binaries/bad-broker.rs new file mode 100644 index 0000000..6df18a4 --- /dev/null +++ b/cdn-broker/src/test-binaries/bad-broker.rs @@ -0,0 +1,121 @@ +//! `bad-broker` is a simple binary that starts a broker with a random key and +//! attempts to start a broker every 100ms. This is useful for testing the +//! broker's ability to handle multiple brokers connecting to it. 
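Before the full listing, a minimal sketch of the churn pattern `bad-broker` exercises: start a broker task, let it run for roughly 100ms, then abort it and repeat. The `println!`/`sleep` body below is a stand-in for building a real `Config` with fresh ports and calling `Broker::start`.

```rust
use std::time::Duration;

use tokio::{spawn, time::sleep};

#[tokio::main]
async fn main() {
    for i in 0..3 {
        // Stand-in for `Broker::new(config).await?.start()` with a random key
        // and freshly picked ports.
        let broker_task = spawn(async move {
            println!("broker {i} started");
            sleep(Duration::from_secs(3600)).await;
        });

        // Let it run briefly, then tear it down, as `bad-broker` does every 100ms.
        sleep(Duration::from_millis(100)).await;
        broker_task.abort();
    }
}
```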
+use std::time::Duration; + +use cdn_broker::{Broker, Config}; +use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result}; +use clap::Parser; +use jf_primitives::signatures::{ + bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme, +}; +use rand::{rngs::StdRng, SeedableRng}; +use tokio::{spawn, time::sleep}; +use tracing_subscriber::EnvFilter; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +/// The main component of the push CDN. +struct Args { + /// The discovery client endpoint (including scheme) to connect to. + /// With the local discovery feature, this is a file path. + /// With the remote (redis) discovery feature, this is a redis URL (e.g. `redis://127.0.0.1:6789`). + #[arg(short, long)] + discovery_endpoint: String, + + /// The endpoint to bind to for externalizing metrics (in `IP:port` form). If not provided, + /// metrics are not exposed. + #[arg(short, long)] + metrics_bind_endpoint: Option, + + /// The user-facing endpoint in `IP:port` form to bind to for connections from users + #[arg(long, default_value = "0.0.0.0:1738")] + public_bind_endpoint: String, + + /// The user-facing endpoint in `IP:port` form to advertise + #[arg(long, default_value = "local_ip:1738")] + public_advertise_endpoint: String, + + /// The broker-facing endpoint in `IP:port` form to bind to for connections from + /// other brokers + #[arg(long, default_value = "0.0.0.0:1739")] + private_bind_endpoint: String, + + /// The broker-facing endpoint in `IP:port` form to advertise + #[arg(long, default_value = "local_ip:1739")] + private_advertise_endpoint: String, + + /// The path to the CA certificate + /// If not provided, a local, pinned CA is used + #[arg(long)] + ca_cert_path: Option, + + /// The path to the CA key + /// If not provided, a local, pinned CA is used + #[arg(long)] + ca_key_path: Option, + + /// The seed for broker key generation + #[arg(short, long, default_value_t = 0)] + key_seed: u64, +} + +#[tokio::main] +async fn main() -> Result<()> { + // Parse command line arguments + let args = Args::parse(); + + // Initialize tracing + #[cfg(not(tokio_unstable))] + if std::env::var("RUST_LOG_FORMAT") == Ok("json".to_string()) { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .json() + .init(); + } else { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + } + + #[cfg(tokio_unstable)] + console_subscriber::init(); + + // Forever, generate a new broker key and start a broker + loop { + // Generate the broker key from the supplied seed + let (private_key, public_key) = BLS::key_gen(&(), &mut StdRng::from_entropy()).unwrap(); + + // Two random ports + let public_port = portpicker::pick_unused_port().unwrap(); + let private_port = portpicker::pick_unused_port().unwrap(); + + // Create config + let broker_config: Config = Config { + ca_cert_path: args.ca_cert_path.clone(), + ca_key_path: args.ca_key_path.clone(), + + discovery_endpoint: args.discovery_endpoint.clone(), + metrics_bind_endpoint: args.metrics_bind_endpoint.clone(), + keypair: KeyPair { + public_key, + private_key, + }, + + public_bind_endpoint: format!("0.0.0.0:{public_port}"), + public_advertise_endpoint: format!("local_ip:{public_port}"), + private_bind_endpoint: format!("0.0.0.0:{private_port}"), + private_advertise_endpoint: format!("local_ip:{private_port}"), + }; + + // Create new `Broker` + // Uses TCP from broker connections and Quic for user connections. 
+ let broker = Broker::new(broker_config).await?; + + // Start the main loop, consuming it + let jh = spawn(broker.start()); + + sleep(Duration::from_millis(100)).await; + jh.abort(); + } +} diff --git a/cdn-broker/src/tests/mod.rs b/cdn-broker/src/tests/mod.rs index 11f9fca..165fe8b 100644 --- a/cdn-broker/src/tests/mod.rs +++ b/cdn-broker/src/tests/mod.rs @@ -180,7 +180,7 @@ impl TestDefinition { .abort_handle(); // Inject our user into the connections - broker_under_test.inner.connections.write().await.add_user( + broker_under_test.inner.connections.write().add_user( &identifier, connection2, topics, @@ -236,12 +236,11 @@ impl TestDefinition { .abort_handle(); // Inject our broker into the connections - broker_under_test - .inner - .connections - .write() - .await - .add_broker(identifier.clone(), connection2, receive_handle); + broker_under_test.inner.connections.write().add_broker( + identifier.clone(), + connection2, + receive_handle, + ); // Send our subscriptions to it let subscribe_message = Message::Subscribe(broker.1.clone()); diff --git a/cdn-client/Cargo.toml b/cdn-client/Cargo.toml index d94dfcb..4d4fb73 100644 --- a/cdn-client/Cargo.toml +++ b/cdn-client/Cargo.toml @@ -8,6 +8,16 @@ description = "Defines client interactions for both marshals and brokers" [features] runtime-async-std = ["dep:async-std"] +# Bad connector attempts to continuously authenticate with a broker +[[bin]] +name = "bad-connector" +path = "src/test-binaries/bad-connector.rs" + +# Bad sender attempts to continuously send messages to a broker +[[bin]] +name = "bad-sender" +path = "src/test-binaries/bad-sender.rs" + [dependencies] tokio = { workspace = true } async-std = { workspace = true, optional = true } diff --git a/cdn-client/src/test-binaries/bad-connector.rs b/cdn-client/src/test-binaries/bad-connector.rs new file mode 100644 index 0000000..64cf64c --- /dev/null +++ b/cdn-client/src/test-binaries/bad-connector.rs @@ -0,0 +1,66 @@ +//! "Bad connector" is a simple example of a client that connects to the broker every +//! 200ms. This is useful for testing the broker's ability to handle many connections. + +use std::time::Duration; + +use cdn_client::{Client, Config}; +use cdn_proto::{crypto::signature::KeyPair, def::ProductionClientConnection, message::Topic}; +use clap::Parser; +use jf_primitives::signatures::{ + bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme, +}; +use rand::{rngs::StdRng, SeedableRng}; +use tokio::time::sleep; +use tracing_subscriber::EnvFilter; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +/// An example user of the Push CDN +struct Args { + /// The remote marshal endpoint to connect to, including the port. 
+ #[arg(short, long)] + marshal_endpoint: String, +} + +#[tokio::main] +async fn main() { + // Parse command line arguments + let args = Args::parse(); + + // Initialize tracing + if std::env::var("RUST_LOG_FORMAT") == Ok("json".to_string()) { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .json() + .init(); + } else { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + } + + // In a loop, + loop { + // Generate a random keypair + let (private_key, public_key) = + BLS::key_gen(&(), &mut StdRng::from_entropy()).expect("failed to generate key"); + + // Build the config, the endpoint being where we expect the marshal to be + let config = Config { + endpoint: args.marshal_endpoint.clone(), + keypair: KeyPair { + public_key, + private_key, + }, + subscribed_topics: vec![Topic::Global], + use_local_authority: true, + }; + + // Create a client, specifying the BLS signature algorithm + // and the `QUIC` protocol. + let client = Client::::new(config); + + client.ensure_initialized().await; + sleep(Duration::from_millis(200)).await; + } +} diff --git a/cdn-client/src/test-binaries/bad-sender.rs b/cdn-client/src/test-binaries/bad-sender.rs new file mode 100644 index 0000000..baac8c3 --- /dev/null +++ b/cdn-client/src/test-binaries/bad-sender.rs @@ -0,0 +1,94 @@ +//! "Bad sender" is a simple example of a client that continuously sends a message to itself. +//! This is useful for testing the broker's ability to handle many messages. + +use cdn_client::{Client, Config}; +use cdn_proto::{crypto::signature::KeyPair, def::ProductionClientConnection, message::Topic}; +use clap::Parser; +use jf_primitives::signatures::{ + bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme, +}; +use rand::{rngs::StdRng, SeedableRng}; +use tracing::info; +use tracing_subscriber::EnvFilter; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +/// An example user of the Push CDN +struct Args { + /// The remote marshal endpoint to connect to, including the port. + #[arg(short, long)] + marshal_endpoint: String, +} + +#[tokio::main] +async fn main() { + // Parse command line arguments + let args = Args::parse(); + + // Initialize tracing + if std::env::var("RUST_LOG_FORMAT") == Ok("json".to_string()) { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .json() + .init(); + } else { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + } + + // Generate a random keypair + let (private_key, public_key) = + BLS::key_gen(&(), &mut StdRng::from_entropy()).expect("failed to generate key"); + + // Build the config, the endpoint being where we expect the marshal to be + let config = Config { + endpoint: args.marshal_endpoint, + keypair: KeyPair { + public_key, + private_key, + }, + subscribed_topics: vec![Topic::Global], + use_local_authority: true, + }; + + // Create a client, specifying the BLS signature algorithm + // and the `QUIC` protocol. 
+ let client = Client::<ProductionClientConnection>::new(config);
+ let message = vec![0u8; 10000];
+
+ // In a loop,
+ loop {
+ // Send a direct message to ourselves
+ if let Err(e) = client
+ .send_direct_message(&public_key, message.clone())
+ .await
+ {
+ println!("failed to send direct message: {e:?}");
+ continue;
+ }
+ info!("successfully sent direct message");
+
+ if let Err(e) = client.receive_message().await {
+ println!("failed to receive direct message: {e:?}");
+ continue;
+ }
+ info!("successfully received direct message");
+
+ // Send a broadcast message to ourselves
+ if let Err(e) = client
+ .send_broadcast_message(vec![Topic::Global], message.clone())
+ .await
+ {
+ println!("failed to send broadcast message: {e:?}");
+ continue;
+ }
+ info!("successfully sent broadcast message");
+
+ if let Err(e) = client.receive_message().await {
+ println!("failed to receive broadcast message: {e:?}");
+ continue;
+ }
+ info!("successfully received broadcast message");
+ }
+}
diff --git a/cdn-proto/Cargo.toml b/cdn-proto/Cargo.toml
index 142b6e3..bffdfd2 100644
--- a/cdn-proto/Cargo.toml
+++ b/cdn-proto/Cargo.toml
@@ -64,4 +64,3 @@ rkyv.workspace = true
mnemonic = "1"
rcgen.workspace = true
derivative.workspace = true
-parking_lot.workspace = true
\ No newline at end of file
diff --git a/cdn-proto/src/connection/auth/broker.rs b/cdn-proto/src/connection/auth/broker.rs
index e55ebe2..4fd1703 100644
--- a/cdn-proto/src/connection/auth/broker.rs
+++ b/cdn-proto/src/connection/auth/broker.rs
@@ -32,10 +32,11 @@ macro_rules! authenticate_with_broker {
($connection: expr, $inner: expr) => {
// Authenticate with the other broker, returning their reconnect endpoint
match BrokerAuth::::authenticate_with_broker(&mut $connection, &$inner.keypair).await {
- Ok(broker_endpoint) => broker_endpoint,
+ Ok(broker_endpoint) => Ok(broker_endpoint),
Err(err) => {
- error!("failed authentication with broker: {err}");
- return;
+ return Err(Error::Connection(
+ format!("failed authentication with broker: {err}"),
+ ));
}
}
};
@@ -53,8 +54,7 @@ macro_rules!
verify_broker { ) .await { - error!("failed to verify broker: {err}"); - return; + return Err(Error::Connection("failed to verify broker".to_string())); }; }; } diff --git a/cdn-proto/src/connection/protocols/quic.rs b/cdn-proto/src/connection/protocols/quic.rs index cec053f..fa4bf45 100644 --- a/cdn-proto/src/connection/protocols/quic.rs +++ b/cdn-proto/src/connection/protocols/quic.rs @@ -14,6 +14,7 @@ use quinn::{ClientConfig, Connecting, Endpoint, ServerConfig, TransportConfig, V use rustls::{Certificate, PrivateKey, RootCertStore}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::Mutex; +use tokio::time::timeout; use super::{ read_length_delimited, write_length_delimited, Connection, Listener, Protocol, @@ -101,18 +102,29 @@ impl Protocol for Quic { // Connect with QUIC endpoint to remote endpoint let connection = bail!( bail!( - endpoint.connect(remote_endpoint, "espresso"), + timeout( + Duration::from_secs(5), + bail!( + endpoint.connect(remote_endpoint, "espresso"), + Connection, + "timed out connecting to remote endpoint" + ) + ) + .await, Connection, "failed quic connect to remote endpoint" - ) - .await, + ), Connection, "failed quic connect to remote endpoint" ); // Open an outgoing bidirectional stream let (mut sender, receiver) = bail!( - connection.open_bi().await, + bail!( + timeout(Duration::from_secs(5), connection.open_bi()).await, + Connection, + "timed out accepting stream" + ), Connection, "failed to accept bidirectional stream" ); diff --git a/cdn-proto/src/connection/protocols/tcp.rs b/cdn-proto/src/connection/protocols/tcp.rs index 7a49611..03d24b0 100644 --- a/cdn-proto/src/connection/protocols/tcp.rs +++ b/cdn-proto/src/connection/protocols/tcp.rs @@ -4,12 +4,14 @@ use std::marker::PhantomData; use std::net::SocketAddr; +use std::time::Duration; use std::{net::ToSocketAddrs, sync::Arc}; use async_trait::async_trait; use rustls::{Certificate, PrivateKey}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::sync::Mutex; +use tokio::time::timeout; use tokio::{ io::AsyncWriteExt, net::{TcpSocket, TcpStream}, @@ -70,9 +72,13 @@ impl Protocol for Tcp { // Connect the stream to the local socket let stream = bail!( - socket.connect(remote_endpoint).await, + bail!( + timeout(Duration::from_secs(5), socket.connect(remote_endpoint)).await, + Connection, + "timed out connecting to tcp endpoint" + ), Connection, - "failed tcp connect to remote endpoint" + "failed to connect to tcp endpoint" ); // Split the connection and create our wrapper diff --git a/process-compose.yaml b/process-compose.yaml index af2c1da..29fe975 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -8,13 +8,13 @@ processes: command: echo 'requirepass changeme!' 
| keydb-server - --save "" --appendonly no marshal_0: - command: cargo run --package cdn-marshal --release -- -d "redis://:changeme!@localhost:6379" + command: cargo run --bin cdn-marshal -- -d "redis://:changeme!@localhost:6379" broker_0: - command: cargo run --package cdn-broker --release -- -d "redis://:changeme!@localhost:6379" + command: cargo run --bin cdn-broker -- -d "redis://:changeme!@localhost:6379" broker_1: - command: cargo run --package cdn-broker --release -- + command: cargo run --bin cdn-broker --release -- --public-bind-endpoint 0.0.0.0:1740 --public-advertise-endpoint local_ip:1740 --private-bind-endpoint 0.0.0.0:1741 @@ -23,6 +23,29 @@ processes: client_0: command: cargo run --bin cdn-client --release -- -m "127.0.0.1:1737" - depends_on: - marshal_0: - condition: process_started \ No newline at end of file + + # Uncomment the following lines to run misbehaving processes and the Tokio console + + # broker_tokio_console: + # command: CARGO_TARGET_DIR="target/unstable" RUSTFLAGS="--cfg tokio_unstable" + # cargo run --bin cdn-broker -- + # -d "redis://:changeme!@localhost:6379" + # --public-bind-endpoint 0.0.0.0:1742 + # --public-advertise-endpoint local_ip:1742 + # --private-bind-endpoint 0.0.0.0:1743 + # --private-advertise-endpoint local_ip:1743 + + # bad_broker: + # command: cargo run --bin bad-broker -d "redis://:changeme!@localhost:6379" + + # bad_connector: + # command: cargo run --bin bad-connector -- -m "127.0.0.1:1737" + # depends_on: + # marshal_0: + # condition: process_started + + # bad_sender: + # command: cargo run --bin bad-sender -- -m "127.0.0.1:1737" + # depends_on: + # marshal_0: + # condition: process_started \ No newline at end of file From 8403f79dcd960d40edaab573f2370be26a3615ee Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 2 May 2024 16:35:52 -0400 Subject: [PATCH 02/13] minor improvements --- Cargo.toml | 1 + cdn-client/src/retry.rs | 4 ++-- process-compose.yaml | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9b5f556..5ccdc85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ tokio = { version = "1", default-features = false, features = [ "rt", "rt-multi-thread", "parking_lot", + "tracing" ] } jf-primitives = { git = "https://github.com/EspressoSystems/jellyfish.git", tag = "0.4.2", default-features = false, features = [ "std", diff --git a/cdn-client/src/retry.rs b/cdn-client/src/retry.rs index 2bfa35f..532cbb4 100644 --- a/cdn-client/src/retry.rs +++ b/cdn-client/src/retry.rs @@ -132,10 +132,10 @@ macro_rules! try_with_reconnect { match $out { Ok(res) => Ok(res), Err(err) => { - error!("connection failed: {err}"); - // Acquire our "semaphore". If another task is doing this, just return an error if let Ok(mut connection_guard) = $self.inner.connection.clone().try_write_owned() { + error!("connection failed: {err}, reconnecting"); + // Clone `inner` so we can use it in the task let inner = $self.inner.clone(); // We are the only ones reconnecting. 
Let's launch the task to reconnect diff --git a/process-compose.yaml b/process-compose.yaml index 29fe975..d46b815 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -36,7 +36,7 @@ processes: # --private-advertise-endpoint local_ip:1743 # bad_broker: - # command: cargo run --bin bad-broker -d "redis://:changeme!@localhost:6379" + # command: cargo run --bin bad-broker -- -d "redis://:changeme!@localhost:6379" # bad_connector: # command: cargo run --bin bad-connector -- -m "127.0.0.1:1737" From 89f4856b69975c3dbced50c6b22492a5d7e93872 Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 2 May 2024 16:38:34 -0400 Subject: [PATCH 03/13] fix an absolute blunder --- cdn-broker/src/tasks/broker/heartbeat.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdn-broker/src/tasks/broker/heartbeat.rs b/cdn-broker/src/tasks/broker/heartbeat.rs index 6e87202..96a517b 100644 --- a/cdn-broker/src/tasks/broker/heartbeat.rs +++ b/cdn-broker/src/tasks/broker/heartbeat.rs @@ -95,10 +95,10 @@ impl Inner { inner.handle_broker_connection(connection, true).await; }); - - // Sleep for 10 seconds - sleep(Duration::from_secs(10)).await; } + + // Sleep for 10 seconds + sleep(Duration::from_secs(10)).await; } } } From e8be39df20931222f39c580e7307800840e43799 Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 2 May 2024 16:49:42 -0400 Subject: [PATCH 04/13] increase to 300ms --- cdn-broker/src/test-binaries/bad-broker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdn-broker/src/test-binaries/bad-broker.rs b/cdn-broker/src/test-binaries/bad-broker.rs index 6df18a4..e657d95 100644 --- a/cdn-broker/src/test-binaries/bad-broker.rs +++ b/cdn-broker/src/test-binaries/bad-broker.rs @@ -115,7 +115,7 @@ async fn main() -> Result<()> { // Start the main loop, consuming it let jh = spawn(broker.start()); - sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(300)).await; jh.abort(); } } From cab76359e12789b262e176ba5fd58003f3b217b8 Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 2 May 2024 19:24:40 -0400 Subject: [PATCH 05/13] fix memory leak blunder --- cdn-broker/src/connections/mod.rs | 68 ++++++++++++++++++--------- cdn-broker/src/tasks/broker/sender.rs | 28 +++++++++-- cdn-broker/src/tasks/user/sender.rs | 24 +++++++++- cdn-broker/src/tests/mod.rs | 4 +- 4 files changed, 97 insertions(+), 27 deletions(-) diff --git a/cdn-broker/src/connections/mod.rs b/cdn-broker/src/connections/mod.rs index bd213e6..a787b71 100644 --- a/cdn-broker/src/connections/mod.rs +++ b/cdn-broker/src/connections/mod.rs @@ -25,10 +25,10 @@ pub struct Connections { // Our identity. Used for versioned vector conflict resolution. identity: BrokerIdentifier, - // The current users connected to us - users: HashMap, Vec)>, - // The current brokers connected to us - brokers: HashMap, Vec)>, + // The current users connected to us, along with their running tasks + users: HashMap, HashMap)>, + // The current brokers connected to us, along with their running tasks + brokers: HashMap, HashMap)>, // The versioned vector for looking up where direct messages should go direct_map: DirectMap, @@ -72,32 +72,54 @@ impl Connections { self.brokers.keys().cloned().collect() } - /// Add a task to the list of tasks for a broker. This is used to - /// cancel the task if the broker disconnects. - /// TODO: macro this? 
- pub fn add_broker_task(&mut self, broker_identifier: &BrokerIdentifier, handle: AbortHandle) { + /// Add a task to the list of tasks for a broker along with a unique ID + /// This is used to cancel the task if the broker disconnects. + pub fn add_broker_task( + &mut self, + broker_identifier: &BrokerIdentifier, + id: u128, + handle: AbortHandle, + ) { if let Some((_, handles)) = self.brokers.get_mut(broker_identifier) { - // If the broker exists, add the handle to the list of tasks - handles.push(handle); + // If the broker exists, add the handle to the map of tasks + handles.insert(id, handle); } else { // Otherwise, cancel the task handle.abort(); } } - /// Add a task to the list of tasks for a user. This is used to - /// cancel the task if the user disconnects. + /// Add a task to the list of tasks for a user along with a unique ID + /// This is used to cancel the task if the user disconnects. /// TODO: macro this? - pub fn add_user_task(&mut self, user: &UserPublicKey, handle: AbortHandle) { + pub fn add_user_task(&mut self, user: &UserPublicKey, id: u128, handle: AbortHandle) { if let Some((_, handles)) = self.users.get_mut(user) { - // If the user exists, add the handle to the list of tasks - handles.push(handle); + // If the user exists, add the handle to the map of tasks + handles.insert(id, handle); } else { // Otherwise, cancel the task handle.abort(); } } + /// Remove a task from the list of tasks for a broker. + /// Does not abort the task. + pub fn remove_broker_task(&mut self, broker_identifier: &BrokerIdentifier, id: u128) { + if let Some((_, handles)) = self.brokers.get_mut(broker_identifier) { + // If the broker exists, remove the handle from the map of tasks + handles.remove(&id); + } + } + + /// Remove a task from the list of tasks for a user. + /// Does not abort the task. + pub fn remove_user_task(&mut self, user: &UserPublicKey, id: u128) { + if let Some((_, handles)) = self.users.get_mut(user) { + // If the broker exists, remove the handle from the map of tasks + handles.remove(&id); + } + } + /// Get all users and brokers interested in a list of topics. pub fn get_interested_by_topic( &self, @@ -208,8 +230,10 @@ impl Connections { // Remove the old broker if it exists self.remove_broker(&broker_identifier, "already existed"); - self.brokers - .insert(broker_identifier, (connection, vec![handle])); + self.brokers.insert( + broker_identifier, + (connection, HashMap::from([(0, handle)])), + ); } /// Insert a user into our map. Updates the versioned vector that @@ -229,8 +253,10 @@ impl Connections { self.remove_user(user_public_key.clone(), "already existed"); // Add to our map. 
Remove the old one if it exists - self.users - .insert(user_public_key.clone(), (connection, vec![handle])); + self.users.insert( + user_public_key.clone(), + (connection, HashMap::from([(0, handle)])), + ); // Insert into our direct map self.direct_map @@ -252,7 +278,7 @@ impl Connections { error!(id = %broker_identifier, reason = reason, "broker disconnected"); // Cancel all tasks - for handle in task_handles { + for (_, handle) in task_handles { handle.abort(); } }; @@ -280,7 +306,7 @@ impl Connections { ); // Cancel all tasks - for handle in task_handles { + for (_, handle) in task_handles { handle.abort(); } }; diff --git a/cdn-broker/src/tasks/broker/sender.rs b/cdn-broker/src/tasks/broker/sender.rs index e3aa157..1ddb23a 100644 --- a/cdn-broker/src/tasks/broker/sender.rs +++ b/cdn-broker/src/tasks/broker/sender.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use cdn_proto::connection::protocols::Connection; use cdn_proto::{connection::Bytes, def::RunDef, discovery::BrokerIdentifier}; use tokio::spawn; +use tokio::sync::Notify; use tracing::error; use crate::Inner; @@ -27,6 +28,13 @@ impl Inner { let self_ = self.clone(); let broker_identifier_ = broker_identifier.clone(); + // Create a random handle identifier + let handle_identifier = rand::random::(); + + // To notify the sender when the task has been added + let notify = Arc::new(Notify::const_new()); + let notified = notify.clone(); + // Send the message let send_handle = spawn(async move { if let Err(e) = connection.send_message_raw(message).await { @@ -37,14 +45,28 @@ impl Inner { .connections .write() .remove_broker(&broker_identifier_, "failed to send message"); + } else { + // Wait for the sender to add the task to the list + notified.notified().await; + + // If we successfully sent the message, remove the task from the list + self_ + .connections + .write() + .remove_broker_task(&broker_identifier_, handle_identifier); }; }) .abort_handle(); // Add the send handle to the list of tasks for the broker - self.connections - .write() - .add_broker_task(broker_identifier, send_handle); + self.connections.write().add_broker_task( + broker_identifier, + handle_identifier, + send_handle, + ); + + // Notify the sender that the task has been added + notify.notify_one(); } } diff --git a/cdn-broker/src/tasks/user/sender.rs b/cdn-broker/src/tasks/user/sender.rs index 61dd3fc..0353217 100644 --- a/cdn-broker/src/tasks/user/sender.rs +++ b/cdn-broker/src/tasks/user/sender.rs @@ -4,6 +4,7 @@ use cdn_proto::connection::protocols::Connection; use cdn_proto::connection::UserPublicKey; use cdn_proto::{connection::Bytes, def::RunDef}; use tokio::spawn; +use tokio::sync::Notify; use tracing::error; use crate::Inner; @@ -19,6 +20,13 @@ impl Inner { let self_ = self.clone(); let user_ = user.clone(); + // Create a random handle identifier + let handle_identifier = rand::random::(); + + // To notify the sender when the task has been added + let notify = Arc::new(Notify::const_new()); + let notified = notify.clone(); + // Send the message let send_handle = spawn(async move { if let Err(e) = connection.send_message_raw(message).await { @@ -29,12 +37,26 @@ impl Inner { .connections .write() .remove_user(user_, "failed to send message"); + } else { + // Wait for the sender to add the task to the list + notified.notified().await; + + // If we successfully sent the message, remove the task from the list + self_ + .connections + .write() + .remove_user_task(&user_, handle_identifier); }; }) .abort_handle(); // Add the send handle to the list of tasks for the 
broker - self.connections.write().add_user_task(user, send_handle); + self.connections + .write() + .add_user_task(user, handle_identifier, send_handle); + + // Notify the sender that the task has been added + notify.notify_one(); } } } diff --git a/cdn-broker/src/tests/mod.rs b/cdn-broker/src/tests/mod.rs index 165fe8b..52f7cd7 100644 --- a/cdn-broker/src/tests/mod.rs +++ b/cdn-broker/src/tests/mod.rs @@ -149,7 +149,7 @@ impl TestDefinition { /// and adds the user to the internal state. /// /// Then, it sends subscription messages to the broker for the topics described in `TestDefinition` - async fn inject_users( + fn inject_users( broker_under_test: &Broker, users: &[Vec], ) -> Vec { @@ -285,7 +285,7 @@ impl TestDefinition { Self::inject_brokers(&broker_under_test, self.connected_brokers).await; // Inject our users - run.connected_users = Self::inject_users(&broker_under_test, &self.connected_users).await; + run.connected_users = Self::inject_users(&broker_under_test, &self.connected_users); // Return our injected brokers and users run From 977ddb1eca6c82bc96f2f161c548f8be9a1798b0 Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 3 May 2024 06:17:10 -0400 Subject: [PATCH 06/13] move binaries around --- cdn-broker/Cargo.toml | 7 ++++++- .../src/{test-binaries => binaries}/bad-broker.rs | 0 cdn-broker/src/{main.rs => binaries/broker.rs} | 0 cdn-broker/src/connections/mod.rs | 6 ++++-- cdn-client/Cargo.toml | 9 +++++++-- .../{test-binaries => binaries}/bad-connector.rs | 0 .../src/{test-binaries => binaries}/bad-sender.rs | 0 cdn-client/src/{main.rs => binaries/client.rs} | 0 cdn-marshal/Cargo.toml | 5 +++++ cdn-marshal/src/{main.rs => binaries/marshal.rs} | 0 process-compose.yaml | 13 +++++++------ 11 files changed, 29 insertions(+), 11 deletions(-) rename cdn-broker/src/{test-binaries => binaries}/bad-broker.rs (100%) rename cdn-broker/src/{main.rs => binaries/broker.rs} (100%) rename cdn-client/src/{test-binaries => binaries}/bad-connector.rs (100%) rename cdn-client/src/{test-binaries => binaries}/bad-sender.rs (100%) rename cdn-client/src/{main.rs => binaries/client.rs} (100%) rename cdn-marshal/src/{main.rs => binaries/marshal.rs} (100%) diff --git a/cdn-broker/Cargo.toml b/cdn-broker/Cargo.toml index e75c9d9..c006b92 100644 --- a/cdn-broker/Cargo.toml +++ b/cdn-broker/Cargo.toml @@ -27,10 +27,15 @@ harness = false name = "broadcast" harness = false +# The main broker binary +[[bin]] +name = "broker" +path = "src/binaries/broker.rs" + # The "bad" broker is a binary that tries to spam an actual broker with connections [[bin]] name = "bad-broker" -path = "src/test-binaries/bad-broker.rs" +path = "src/binaries/bad-broker.rs" # This dependency is used for the Tokio console [target.'cfg(tokio_unstable)'.dependencies] diff --git a/cdn-broker/src/test-binaries/bad-broker.rs b/cdn-broker/src/binaries/bad-broker.rs similarity index 100% rename from cdn-broker/src/test-binaries/bad-broker.rs rename to cdn-broker/src/binaries/bad-broker.rs diff --git a/cdn-broker/src/main.rs b/cdn-broker/src/binaries/broker.rs similarity index 100% rename from cdn-broker/src/main.rs rename to cdn-broker/src/binaries/broker.rs diff --git a/cdn-broker/src/connections/mod.rs b/cdn-broker/src/connections/mod.rs index a787b71..db6b3f1 100644 --- a/cdn-broker/src/connections/mod.rs +++ b/cdn-broker/src/connections/mod.rs @@ -21,14 +21,16 @@ use self::broadcast::BroadcastMap; mod broadcast; mod direct; +type TaskMap = HashMap; + pub struct Connections { // Our identity. Used for versioned vector conflict resolution. 
identity: BrokerIdentifier, // The current users connected to us, along with their running tasks - users: HashMap, HashMap)>, + users: HashMap, TaskMap)>, // The current brokers connected to us, along with their running tasks - brokers: HashMap, HashMap)>, + brokers: HashMap, TaskMap)>, // The versioned vector for looking up where direct messages should go direct_map: DirectMap, diff --git a/cdn-client/Cargo.toml b/cdn-client/Cargo.toml index 4d4fb73..e6ddf78 100644 --- a/cdn-client/Cargo.toml +++ b/cdn-client/Cargo.toml @@ -8,15 +8,20 @@ description = "Defines client interactions for both marshals and brokers" [features] runtime-async-std = ["dep:async-std"] +# The main "tester" binary +[[bin]] +name = "client" +path = "src/binaries/client.rs" + # Bad connector attempts to continuously authenticate with a broker [[bin]] name = "bad-connector" -path = "src/test-binaries/bad-connector.rs" +path = "src/binaries/bad-connector.rs" # Bad sender attempts to continuously send messages to a broker [[bin]] name = "bad-sender" -path = "src/test-binaries/bad-sender.rs" +path = "src/binaries/bad-sender.rs" [dependencies] tokio = { workspace = true } diff --git a/cdn-client/src/test-binaries/bad-connector.rs b/cdn-client/src/binaries/bad-connector.rs similarity index 100% rename from cdn-client/src/test-binaries/bad-connector.rs rename to cdn-client/src/binaries/bad-connector.rs diff --git a/cdn-client/src/test-binaries/bad-sender.rs b/cdn-client/src/binaries/bad-sender.rs similarity index 100% rename from cdn-client/src/test-binaries/bad-sender.rs rename to cdn-client/src/binaries/bad-sender.rs diff --git a/cdn-client/src/main.rs b/cdn-client/src/binaries/client.rs similarity index 100% rename from cdn-client/src/main.rs rename to cdn-client/src/binaries/client.rs diff --git a/cdn-marshal/Cargo.toml b/cdn-marshal/Cargo.toml index 8b6d017..fb5c52e 100644 --- a/cdn-marshal/Cargo.toml +++ b/cdn-marshal/Cargo.toml @@ -10,6 +10,11 @@ global-permits = ["cdn-proto/global-permits"] runtime-async-std = ["dep:async-std"] +# The main marshal binary +[[bin]] +name = "marshal" +path = "src/binaries/marshal.rs" + [dependencies] jf-primitives.workspace = true cdn-proto = { path = "../cdn-proto", default-features = false, features = [ diff --git a/cdn-marshal/src/main.rs b/cdn-marshal/src/binaries/marshal.rs similarity index 100% rename from cdn-marshal/src/main.rs rename to cdn-marshal/src/binaries/marshal.rs diff --git a/process-compose.yaml b/process-compose.yaml index d46b815..6b49e70 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -8,13 +8,13 @@ processes: command: echo 'requirepass changeme!' 
| keydb-server - --save "" --appendonly no marshal_0: - command: cargo run --bin cdn-marshal -- -d "redis://:changeme!@localhost:6379" + command: cargo run --bin marshal -- -d "redis://:changeme!@localhost:6379" broker_0: - command: cargo run --bin cdn-broker -- -d "redis://:changeme!@localhost:6379" + command: cargo run --bin broker -- -d "redis://:changeme!@localhost:6379" broker_1: - command: cargo run --bin cdn-broker --release -- + command: cargo run --bin broker --release -- --public-bind-endpoint 0.0.0.0:1740 --public-advertise-endpoint local_ip:1740 --private-bind-endpoint 0.0.0.0:1741 @@ -22,13 +22,13 @@ processes: -d "redis://:changeme!@localhost:6379" client_0: - command: cargo run --bin cdn-client --release -- -m "127.0.0.1:1737" + command: cargo run --bin client --release -- -m "127.0.0.1:1737" # Uncomment the following lines to run misbehaving processes and the Tokio console # broker_tokio_console: # command: CARGO_TARGET_DIR="target/unstable" RUSTFLAGS="--cfg tokio_unstable" - # cargo run --bin cdn-broker -- + # cargo run --bin broker -- # -d "redis://:changeme!@localhost:6379" # --public-bind-endpoint 0.0.0.0:1742 # --public-advertise-endpoint local_ip:1742 @@ -36,7 +36,8 @@ processes: # --private-advertise-endpoint local_ip:1743 # bad_broker: - # command: cargo run --bin bad-broker -- -d "redis://:changeme!@localhost:6379" + # command: cargo run --bin bad-broker -- + # -d "redis://:changeme!@localhost:6379" # bad_connector: # command: cargo run --bin bad-connector -- -m "127.0.0.1:1737" From f25d5383bc7d88e3ed58c1f6407584aa84160ce4 Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 3 May 2024 06:19:16 -0400 Subject: [PATCH 07/13] remove some args from bad broker binary --- cdn-broker/src/binaries/bad-broker.rs | 42 ++------------------------- 1 file changed, 3 insertions(+), 39 deletions(-) diff --git a/cdn-broker/src/binaries/bad-broker.rs b/cdn-broker/src/binaries/bad-broker.rs index e657d95..e0a551c 100644 --- a/cdn-broker/src/binaries/bad-broker.rs +++ b/cdn-broker/src/binaries/bad-broker.rs @@ -22,42 +22,6 @@ struct Args { /// With the remote (redis) discovery feature, this is a redis URL (e.g. `redis://127.0.0.1:6789`). #[arg(short, long)] discovery_endpoint: String, - - /// The endpoint to bind to for externalizing metrics (in `IP:port` form). If not provided, - /// metrics are not exposed. 
- #[arg(short, long)] - metrics_bind_endpoint: Option, - - /// The user-facing endpoint in `IP:port` form to bind to for connections from users - #[arg(long, default_value = "0.0.0.0:1738")] - public_bind_endpoint: String, - - /// The user-facing endpoint in `IP:port` form to advertise - #[arg(long, default_value = "local_ip:1738")] - public_advertise_endpoint: String, - - /// The broker-facing endpoint in `IP:port` form to bind to for connections from - /// other brokers - #[arg(long, default_value = "0.0.0.0:1739")] - private_bind_endpoint: String, - - /// The broker-facing endpoint in `IP:port` form to advertise - #[arg(long, default_value = "local_ip:1739")] - private_advertise_endpoint: String, - - /// The path to the CA certificate - /// If not provided, a local, pinned CA is used - #[arg(long)] - ca_cert_path: Option, - - /// The path to the CA key - /// If not provided, a local, pinned CA is used - #[arg(long)] - ca_key_path: Option, - - /// The seed for broker key generation - #[arg(short, long, default_value_t = 0)] - key_seed: u64, } #[tokio::main] @@ -92,11 +56,11 @@ async fn main() -> Result<()> { // Create config let broker_config: Config = Config { - ca_cert_path: args.ca_cert_path.clone(), - ca_key_path: args.ca_key_path.clone(), + ca_cert_path: None, + ca_key_path: None, discovery_endpoint: args.discovery_endpoint.clone(), - metrics_bind_endpoint: args.metrics_bind_endpoint.clone(), + metrics_bind_endpoint: None, keypair: KeyPair { public_key, private_key, From 884d033aeac3322bb7bc0e4c74a34811f7db4b6c Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 3 May 2024 06:24:33 -0400 Subject: [PATCH 08/13] fix dockerfiles --- cdn-broker/Dockerfile | 6 +++--- cdn-client/Dockerfile | 6 +++--- cdn-marshal/Dockerfile | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cdn-broker/Dockerfile b/cdn-broker/Dockerfile index b9fd1e1..d14c34c 100644 --- a/cdn-broker/Dockerfile +++ b/cdn-broker/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /build COPY . . # Build our example (all of them for caching) -RUN cargo build --profile release -p cdn-broker -p cdn-client -p cdn-marshal +RUN cargo build --profile release -p broker -p client -p marshal # Use a minimal image for the final build FROM debian:bookworm as RUNNER @@ -18,7 +18,7 @@ RUN apt-get update && apt-get install libcurl4 -y ENV RUST_LOG=info # Copy the built binary from the builder image -COPY --from=BUILDER ./build/target/release/cdn-broker /bin/cdn-broker +COPY --from=BUILDER ./build/target/release/broker /bin/broker # Set the entrypoint -ENTRYPOINT ["cdn-broker"] \ No newline at end of file +ENTRYPOINT ["broker"] \ No newline at end of file diff --git a/cdn-client/Dockerfile b/cdn-client/Dockerfile index db02256..f8431d9 100644 --- a/cdn-client/Dockerfile +++ b/cdn-client/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /build COPY . . 
# Build our example (all of them for caching) -RUN cargo build --profile release -p cdn-broker -p cdn-client -p cdn-marshal +RUN cargo build --profile release -p broker -p client -p marshal # Use a minimal image for the final build FROM debian:bookworm as RUNNER @@ -18,7 +18,7 @@ RUN apt-get update && apt-get install libcurl4 -y ENV RUST_LOG=info # Copy the built binary from the builder image -COPY --from=BUILDER ./build/target/release/cdn-client /bin/cdn-client +COPY --from=BUILDER ./build/target/release/client /bin/client # Set the entrypoint -ENTRYPOINT ["cdn-client"] \ No newline at end of file +ENTRYPOINT ["client"] \ No newline at end of file diff --git a/cdn-marshal/Dockerfile b/cdn-marshal/Dockerfile index 3d00567..6125f55 100644 --- a/cdn-marshal/Dockerfile +++ b/cdn-marshal/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /build COPY . . # Build our example (all of them for caching) -RUN cargo build --profile release -p cdn-broker -p cdn-client -p cdn-marshal +RUN cargo build --profile release -p broker -p client -p marshal # Use a minimal image for the final build FROM debian:bookworm as RUNNER @@ -18,7 +18,7 @@ RUN apt-get update && apt-get install libcurl4 -y ENV RUST_LOG=info # Copy the built binary from the builder image -COPY --from=BUILDER ./build/target/release/cdn-marshal /bin/cdn-marshal +COPY --from=BUILDER ./build/target/release/marshal /bin/marshal # Set the entrypoint -ENTRYPOINT ["cdn-marshal"] \ No newline at end of file +ENTRYPOINT ["marshal"] \ No newline at end of file From 583616ced0239f5256c7f1fc481451efc460a79c Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 3 May 2024 09:29:49 -0400 Subject: [PATCH 09/13] add tests for double connections --- tests/src/connections.rs | 134 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 129 insertions(+), 5 deletions(-) diff --git a/tests/src/connections.rs b/tests/src/connections.rs index d1d1d16..95b7ec2 100644 --- a/tests/src/connections.rs +++ b/tests/src/connections.rs @@ -10,7 +10,7 @@ use cdn_proto::{ connection::UserPublicKey, crypto::signature::{KeyPair, Serializable}, def::{TestingConnection, TestingRunDef}, - discovery::{embedded::Embedded, DiscoveryClient}, + discovery::{embedded::Embedded, BrokerIdentifier, DiscoveryClient}, message::Topic, }; use jf_primitives::signatures::{ @@ -18,7 +18,10 @@ use jf_primitives::signatures::{ }; use rand::RngCore; use rand::{rngs::StdRng, SeedableRng}; -use tokio::{spawn, time::timeout}; +use tokio::{ + spawn, + time::{sleep, timeout}, +}; /// Generate a deterministic keypair from a seed macro_rules! keypair_from_seed { @@ -131,6 +134,16 @@ macro_rules! new_client { }}; } +/// Create a new database client with the given endpoint and identity. +macro_rules! new_db_client { + ($discovery_ep: expr, $as: expr) => {{ + // Create a new DB client + Embedded::new($discovery_ep.clone(), $as) + .await + .expect("failed to initialize db client") + }}; +} + /// Test that an end-to-end connection succeeds #[tokio::test] async fn test_end_to_end() { @@ -183,9 +196,7 @@ async fn test_whitelist() { }; // Create a new DB client - let mut db = Embedded::new(discovery_endpoint, None) - .await - .expect("failed to initialize db client"); + let mut db = new_db_client!(discovery_endpoint, None); // Set the whitelist to only allow client1 db.set_whitelist(vec![client1_public_key.clone()]) @@ -221,3 +232,116 @@ async fn test_whitelist() { "client2 connected when it shouldn't have" ); } + +/// Test for connecting twice to the same broker. +/// Should kick off the first connection. 
+#[tokio::test]
+async fn test_double_connect_same_broker() {
+ // Get a temporary path for the discovery endpoint
+ let discovery_endpoint = get_temp_db_path!();
+
+ // Create and start a new broker
+ new_broker!(0, "8083", "8084", discovery_endpoint);
+
+ // Create and start a new marshal
+ new_marshal!("8085", discovery_endpoint);
+
+ // Create 2 clients with the same keypair
+ let client1 = new_client!(1, vec![Topic::Global], "8085");
+ let client2 = new_client!(1, vec![Topic::Global], "8085");
+
+ // Assert both clients are connected
+ let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else {
+ panic!("failed to connect as client1");
+ };
+ let Ok(()) = timeout(Duration::from_secs(1), client2.ensure_initialized()).await else {
+ panic!("failed to connect as client2");
+ };
+
+ // Wait a moment for the first connection to be closed
+ sleep(Duration::from_millis(50)).await;
+
+ // Attempt to send a message, should fail
+ assert!(client1
+ .send_direct_message(&keypair_from_seed!(1).1, b"hello direct".to_vec())
+ .await
+ .is_err());
+
+ // The second client to connect should have succeeded
+ client2
+ .send_direct_message(&keypair_from_seed!(1).1, b"hello direct".to_vec())
+ .await
+ .expect("failed to send message from second client");
+}
+
+/// Test for connecting twice to different brokers
+/// Should kick off the first connection.
+#[tokio::test]
+async fn test_double_connect_different_broker() {
+ // Get a temporary path for the discovery endpoint
+ let discovery_endpoint = get_temp_db_path!();
+
+ // Create and start two brokers
+ new_broker!(0, "8088", "8089", discovery_endpoint);
+ new_broker!(0, "8086", "8087", discovery_endpoint);
+
+ // Create and start a new marshal
+ new_marshal!("8090", discovery_endpoint);
+
+ // Create 2 clients with the same keypair
+ let client1 = new_client!(1, vec![Topic::Global], "8090");
+ let client2 = new_client!(1, vec![Topic::Global], "8090");
+
+ // Get the brokers
+ let brokers: Vec<BrokerIdentifier> = new_db_client!(discovery_endpoint, None)
+ .get_other_brokers()
+ .await
+ .expect("failed to get brokers")
+ .into_iter()
+ .collect();
+
+ // Create database clients as each broker
+ let mut broker0_db_client = new_db_client!(discovery_endpoint, Some(brokers[0].clone()));
+ let mut broker1_db_client = new_db_client!(discovery_endpoint, Some(brokers[1].clone()));
+
+ // Make sure the first client connects to the first broker by setting the second
+ // broker as having a higher number of connections
+ broker1_db_client
+ .perform_heartbeat(1, Duration::from_secs(60))
+ .await
+ .expect("broker failed to perform heartbeat");
+
+ // Connect the first client
+ let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else {
+ panic!("failed to connect as client1");
+ };
+
+ // Set the number of connections for the first broker to be higher
+ broker0_db_client
+ .perform_heartbeat(2, Duration::from_secs(60))
+ .await
+ .expect("broker failed to perform heartbeat");
+
+ // Connect the second client
+ let Ok(()) = timeout(Duration::from_secs(1), client2.ensure_initialized()).await else {
+ panic!("failed to connect as client2");
+ };
+
+ // Wait a moment for the first connection to be closed
+ sleep(Duration::from_millis(50)).await;
+
+ // Assert the second client can send a message
+ client2
+ .send_direct_message(&keypair_from_seed!(1).1, b"hello direct".to_vec())
+ .await
+ .expect("failed to send message from second client");
+
+ // Assert the first client can't send a message
+ assert!(
+ client1
+ .send_direct_message(&keypair_from_seed!(1).1, b"hello direct".to_vec())
.await + .is_err(), + "second client connected when it shouldn't have" + ); +} From ba3a1b6b9ce56661e99eeefd34bbf2bc15df7f80 Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 3 May 2024 09:34:04 -0400 Subject: [PATCH 10/13] fix dockerfiles, add to test --- cdn-broker/Dockerfile | 2 +- cdn-client/Dockerfile | 2 +- cdn-marshal/Dockerfile | 2 +- tests/src/connections.rs | 20 +++++++++++++++++++- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/cdn-broker/Dockerfile b/cdn-broker/Dockerfile index d14c34c..8fe4821 100644 --- a/cdn-broker/Dockerfile +++ b/cdn-broker/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /build COPY . . # Build our example (all of them for caching) -RUN cargo build --profile release -p broker -p client -p marshal +RUN cargo build --profile release --bin broker --bin client --bin marshal # Use a minimal image for the final build FROM debian:bookworm as RUNNER diff --git a/cdn-client/Dockerfile b/cdn-client/Dockerfile index f8431d9..d12801e 100644 --- a/cdn-client/Dockerfile +++ b/cdn-client/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /build COPY . . # Build our example (all of them for caching) -RUN cargo build --profile release -p broker -p client -p marshal +RUN cargo build --profile release --bin broker --bin client --bin marshal # Use a minimal image for the final build FROM debian:bookworm as RUNNER diff --git a/cdn-marshal/Dockerfile b/cdn-marshal/Dockerfile index 6125f55..1d8b19c 100644 --- a/cdn-marshal/Dockerfile +++ b/cdn-marshal/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /build COPY . . # Build our example (all of them for caching) -RUN cargo build --profile release -p broker -p client -p marshal +RUN cargo build --profile release --bin broker --bin client --bin marshal # Use a minimal image for the final build FROM debian:bookworm as RUNNER diff --git a/tests/src/connections.rs b/tests/src/connections.rs index 95b7ec2..cab5372 100644 --- a/tests/src/connections.rs +++ b/tests/src/connections.rs @@ -11,7 +11,7 @@ use cdn_proto::{ crypto::signature::{KeyPair, Serializable}, def::{TestingConnection, TestingRunDef}, discovery::{embedded::Embedded, BrokerIdentifier, DiscoveryClient}, - message::Topic, + message::{Direct, Message, Topic}, }; use jf_primitives::signatures::{ bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme, @@ -165,6 +165,24 @@ async fn test_end_to_end() { .send_direct_message(&client_public_key, b"hello direct".to_vec()) .await .expect("failed to send message"); + + // The message that we expect to receive + let expected_message = Message::Direct(Direct { + recipient: client_public_key + .serialize() + .expect("failed to serialize public key"), + message: b"hello direct".to_vec(), + }); + + // Assert we have received the message + assert!( + client + .receive_message() + .await + .expect("failed to receive message") + == expected_message, + "wrong message received" + ); } /// Test that the whitelist works From 7a9445ad27d557da61fee10e6a737fd9f6a297e1 Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 3 May 2024 09:39:43 -0400 Subject: [PATCH 11/13] two seconds for connect --- tests/src/connections.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/src/connections.rs b/tests/src/connections.rs index cab5372..4080a67 100644 --- a/tests/src/connections.rs +++ b/tests/src/connections.rs @@ -206,10 +206,10 @@ async fn test_whitelist() { let client2 = new_client!(2, vec![Topic::Global], "8085"); // Assert both clients can connect - let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else 
{ + let Ok(()) = timeout(Duration::from_secs(2), client1.ensure_initialized()).await else { panic!("failed to connect as client1"); }; - let Ok(()) = timeout(Duration::from_secs(1), client2.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(2), client2.ensure_initialized()).await else { panic!("failed to connect as client2"); }; @@ -238,13 +238,13 @@ async fn test_whitelist() { let client2 = new_client!(2, vec![Topic::Global], "8085"); // Assert we can connect as client1 - let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(2), client1.ensure_initialized()).await else { panic!("failed to connect as client1"); }; // Assert we can't connect as client2 assert!( - timeout(Duration::from_secs(1), client2.ensure_initialized()) + timeout(Duration::from_secs(2), client2.ensure_initialized()) .await .is_err(), "client2 connected when it shouldn't have" @@ -269,10 +269,10 @@ async fn test_double_connect_same_broker() { let client2 = new_client!(1, vec![Topic::Global], "8085"); // Assert both clients are connected - let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(2), client1.ensure_initialized()).await else { panic!("failed to connect as client1"); }; - let Ok(()) = timeout(Duration::from_secs(1), client2.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(2), client2.ensure_initialized()).await else { panic!("failed to connect as client2"); }; @@ -330,7 +330,7 @@ async fn test_double_connect_different_broker() { .expect("broker failed to perform heartbeat"); // Connect the first client - let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(2), client1.ensure_initialized()).await else { panic!("failed to connect as client1"); }; @@ -341,7 +341,7 @@ async fn test_double_connect_different_broker() { .expect("broker failed to perform heartbeat"); // Connect the second client - let Ok(()) = timeout(Duration::from_secs(1), client2.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(2), client2.ensure_initialized()).await else { panic!("failed to connect as client2"); }; From 70541a4b4fee1f6f37fdea3d88984cef815685dd Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 3 May 2024 09:46:46 -0400 Subject: [PATCH 12/13] change ports for brokers so they don't overlap --- tests/src/connections.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/src/connections.rs b/tests/src/connections.rs index 4080a67..0b3add5 100644 --- a/tests/src/connections.rs +++ b/tests/src/connections.rs @@ -206,10 +206,10 @@ async fn test_whitelist() { let client2 = new_client!(2, vec![Topic::Global], "8085"); // Assert both clients can connect - let Ok(()) = timeout(Duration::from_secs(2), client1.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { panic!("failed to connect as client1"); }; - let Ok(()) = timeout(Duration::from_secs(2), client2.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(1), client2.ensure_initialized()).await else { panic!("failed to connect as client2"); }; @@ -238,13 +238,13 @@ async fn test_whitelist() { let client2 = new_client!(2, vec![Topic::Global], "8085"); // Assert we can connect as client1 - let Ok(()) = timeout(Duration::from_secs(2), 
client1.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { panic!("failed to connect as client1"); }; // Assert we can't connect as client2 assert!( - timeout(Duration::from_secs(2), client2.ensure_initialized()) + timeout(Duration::from_secs(1), client2.ensure_initialized()) .await .is_err(), "client2 connected when it shouldn't have" @@ -259,20 +259,20 @@ async fn test_double_connect_same_broker() { let discovery_endpoint = get_temp_db_path!(); // Create and start a new broker - new_broker!(0, "8083", "8084", discovery_endpoint); + new_broker!(0, "8086", "8087", discovery_endpoint); // Create and start a new marshal - new_marshal!("8085", discovery_endpoint); + new_marshal!("8088", discovery_endpoint); // Create 2 clients with the same keypair - let client1 = new_client!(1, vec![Topic::Global], "8085"); - let client2 = new_client!(1, vec![Topic::Global], "8085"); + let client1 = new_client!(1, vec![Topic::Global], "8088"); + let client2 = new_client!(1, vec![Topic::Global], "8088"); // Assert both clients are connected - let Ok(()) = timeout(Duration::from_secs(2), client1.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { panic!("failed to connect as client1"); }; - let Ok(()) = timeout(Duration::from_secs(2), client2.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(1), client2.ensure_initialized()).await else { panic!("failed to connect as client2"); }; @@ -300,15 +300,15 @@ async fn test_double_connect_different_broker() { let discovery_endpoint = get_temp_db_path!(); // Create and start two brokers - new_broker!(0, "8088", "8089", discovery_endpoint); - new_broker!(0, "8086", "8087", discovery_endpoint); + new_broker!(0, "8090", "8091", discovery_endpoint); + new_broker!(0, "8092", "8093", discovery_endpoint); // Create and start a new marshal - new_marshal!("8090", discovery_endpoint); + new_marshal!("8094", discovery_endpoint); // Create 2 clients with the same keypair - let client1 = new_client!(1, vec![Topic::Global], "8090"); - let client2 = new_client!(1, vec![Topic::Global], "8090"); + let client1 = new_client!(1, vec![Topic::Global], "8094"); + let client2 = new_client!(1, vec![Topic::Global], "8094"); // Get the brokers let brokers: Vec = new_db_client!(discovery_endpoint, None) @@ -330,7 +330,7 @@ async fn test_double_connect_different_broker() { .expect("broker failed to perform heartbeat"); // Connect the first client - let Ok(()) = timeout(Duration::from_secs(2), client1.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { panic!("failed to connect as client1"); }; @@ -341,7 +341,7 @@ async fn test_double_connect_different_broker() { .expect("broker failed to perform heartbeat"); // Connect the second client - let Ok(()) = timeout(Duration::from_secs(2), client2.ensure_initialized()).await else { + let Ok(()) = timeout(Duration::from_secs(1), client2.ensure_initialized()).await else { panic!("failed to connect as client2"); }; From 6eae102be2ff589ddfca33fe304697be41755d97 Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 3 May 2024 10:06:46 -0400 Subject: [PATCH 13/13] fix ports --- tests/src/connections.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/connections.rs b/tests/src/connections.rs index 0b3add5..839873c 100644 --- a/tests/src/connections.rs +++ b/tests/src/connections.rs @@ -300,8 +300,8 
@@ async fn test_double_connect_different_broker() { let discovery_endpoint = get_temp_db_path!(); // Create and start two brokers - new_broker!(0, "8090", "8091", discovery_endpoint); new_broker!(0, "8092", "8093", discovery_endpoint); + new_broker!(0, "8090", "8091", discovery_endpoint); // Create and start a new marshal new_marshal!("8094", discovery_endpoint);
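
A note on the send-task bookkeeping introduced in the "fix memory leak blunder" patch above: each spawned send gets a random u128 ID, a successful send task removes its own entry, and a Notify handshake keeps the task from deregistering before the spawner has registered it. The following is a minimal, self-contained sketch of that pattern, not code from this repository; the `Tasks` type and its methods are illustrative names only, and it assumes the `tokio`, `parking_lot`, and `rand` crates that the workspace already depends on.

use std::{collections::HashMap, sync::Arc};

use parking_lot::RwLock;
use tokio::{spawn, sync::Notify, task::AbortHandle};

/// A minimal registry of in-flight send tasks, keyed by a random ID so that
/// finished tasks can deregister themselves instead of piling up forever.
#[derive(Clone, Default)]
struct Tasks(Arc<RwLock<HashMap<u128, AbortHandle>>>);

impl Tasks {
    fn spawn_send(&self, message: Vec<u8>) {
        // Unique ID for this send task
        let id = rand::random::<u128>();

        // The task must not deregister itself before it has been registered,
        // so it waits on a `Notify` that we trip right after insertion.
        let notify = Arc::new(Notify::const_new());
        let notified = notify.clone();

        let this = self.clone();
        let handle = spawn(async move {
            // Stand-in for the real (fallible, possibly slow) network send
            drop(message);

            // Wait until the spawner has registered us, then remove our own entry
            notified.notified().await;
            this.0.write().remove(&id);
        })
        .abort_handle();

        // Register the task, then let it clean up after itself
        self.0.write().insert(id, handle);
        notify.notify_one();
    }

    /// Abort everything still in flight (e.g. when the peer disconnects).
    /// Completed tasks have already removed themselves, so nothing lingers.
    fn abort_all(&self) {
        for (_, handle) in self.0.write().drain() {
            handle.abort();
        }
    }
}

#[tokio::main]
async fn main() {
    let tasks = Tasks::default();
    tasks.spawn_send(b"hello".to_vec());
    tasks.abort_all();
}

Under this scheme, aborting whatever remains in the map on disconnect stays correct because completed sends have already dropped their own handles, which is what keeps the per-connection task list from growing without bound.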