From 3b8cb5ac4c498b8416e331c2b99ae7b8ceadf5a1 Mon Sep 17 00:00:00 2001 From: AJ <34186192+Jurshsmith@users.noreply.github.com> Date: Mon, 8 Jul 2024 19:11:42 +0100 Subject: [PATCH] feat(data-stream): simple connection test (#39) * feat: introduce FuelCore trait This trait describes the interface required for `fuel-core-nats` to extend `fuel-core`. Equally, it will be useful for mocking fuel-core related components in our integration tests. * feat: add test for simple nats connections * feat: introduce NatsConnection Houses NATS connection streams, messages, and other connection details. This would eventually expose custom functions for interacting with the NATS server on a need-to -modularize basis. * feat: further scope nats' connection testing * fix: use nats configuration * feat: test nkey authorization when connecting to nats --------- Co-authored-by: Pedro Nauck --- .env.sample | 2 +- Cargo.lock | 105 ++++++++++++++++++++ Cargo.toml | 5 +- Makefile | 8 +- crates/fuel-core-nats/Cargo.toml | 1 + crates/fuel-core-nats/nats.conf | 2 +- crates/fuel-core-nats/src/lib.rs | 157 ++++++++++++++++++++++-------- crates/fuel-core-nats/src/main.rs | 6 +- 8 files changed, 235 insertions(+), 51 deletions(-) diff --git a/.env.sample b/.env.sample index 7a0e01e..d3ca5d9 100644 --- a/.env.sample +++ b/.env.sample @@ -4,4 +4,4 @@ RELAYER_V2_LISTENING_CONTRACTS=0x768f9459E3339A1F7d59CcF24C80Eb4A711a01FB RELAYER_DA_DEPLOY_HEIGHT=5791365 RELAYER_LOG_PAGE_SIZE=2000 SYNC_HEADER_BATCH_SIZE=100 -NATS_NKEY=generated-nats-secret +NATS_NKEY=generated-nats-nkey-seed diff --git a/Cargo.lock b/Cargo.lock index 97b710b..c477145 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1608,6 +1608,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.9.0" @@ -1687,6 +1693,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dtoa" version = "1.0.9" @@ -2128,6 +2140,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2143,6 +2164,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fuel-asm" version = "0.54.1" @@ -2249,6 +2276,7 @@ dependencies = [ "fuel-core-types", "itertools 0.12.1", "postcard", + "rand", "serde", "serde_json", "serde_with", @@ -2372,6 +2400,7 @@ dependencies = [ "async-nats", "bytes", "clap 4.5.7", + "dotenvy", "fuel-core", "fuel-core-bin", "fuel-core-client", @@ -2500,10 +2529,12 @@ dependencies = [ "fuel-vm", "impl-tools", "itertools 0.12.1", + "mockall", "num_enum", "paste", "postcard", "primitive-types", + "rand", "serde", "strum 0.25.0", "strum_macros 0.25.3", @@ -2538,6 +2569,7 @@ dependencies = [ "fuel-core-services", "fuel-core-storage", "fuel-core-types", + "mockall", "num-rational", "parking_lot", "tokio", @@ -2557,6 +2589,7 @@ dependencies = [ "derivative", "derive_more", "fuel-vm", + "rand", "secrecy", "serde", "tai64", @@ -2684,6 +2717,7 @@ checksum = "38056e7a2b4def574a02e5c9859ffc084e17e565525b5e4a93af6249137d9ca0" dependencies = [ "fuel-derive", "hex", + "rand", "serde", ] @@ -2693,6 +2727,7 @@ version = "0.54.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f050af48f6028f2edcc5469e9522ce12b5be6d0761c3cfe7175dea9784e9596" dependencies = [ + "anyhow", "async-trait", "backtrace", "bitflags 2.5.0", @@ -2711,6 +2746,7 @@ dependencies = [ "paste", "percent-encoding", "primitive-types", + "rand", "serde", "serde_with", "sha3", @@ -4510,6 +4546,33 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "multer" version = "2.1.0" @@ -4711,6 +4774,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -5178,6 +5247,36 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" +version = "2.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +dependencies = [ + "difflib", + "float-cmp", + "itertools 0.10.5", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "prettyplease" version = "0.2.20" @@ -6646,6 +6745,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "textwrap" version = "0.16.1" diff --git a/Cargo.toml b/Cargo.toml index ff48d15..eb2a463 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ version = "0.0.1" [workspace.dependencies] anyhow = "1.0" clap = { version = "4.5", features = ["derive"] } +dotenvy = "0.15" futures = "0.3" tokio = { version = "1.38", features = ["full"] } fuel-core-bin = { version = "0.30", features = [ @@ -26,9 +27,9 @@ fuel-core-bin = { version = "0.30", features = [ "relayer", "rocksdb", ] } -fuel-core = { version = "0.30", features = ["p2p", "relayer", "rocksdb"] } +fuel-core = { version = "0.30", features = ["p2p", "relayer", "rocksdb", "test-helpers"] } fuel-core-client = { version = "0.30" } -fuel-core-types = { version = "0.30" } +fuel-core-types = { version = "0.30", features = ["test-helpers"] } fuel-core-services = "0.30" [profile.release] diff --git a/Makefile b/Makefile index d259f4c..e1d8a6a 100644 --- a/Makefile +++ b/Makefile @@ -37,11 +37,13 @@ stop: start.nats: COMMANDS=docker start.nats: check-commands - docker run -p 4222:4222 -p 8222:8222 -p 6222:6222 --name fuel-core-nats-server -ti nats:latest --js + docker run -p 4222:4222 -p 8222:8222 -p 6222:6222 \ + --mount type=bind,source="$$(pwd)"/crates/fuel-core-nats/nats.conf,target=/etc/nats/nats.conf \ + --name fuel-core-nats-server \ + -ti nats:latest --js --config /etc/nats/nats.conf stop.nats: - docker stop $$(docker ps -q --filter ancestor=nats:latest) - docker rm $$(docker ps -a -q --filter ancestor=nats:latest) + docker rm -f $$(docker ps -a -q --filter ancestor=nats:latest) # Starts fuel-core-nats service start.fuel-core-nats: diff --git a/crates/fuel-core-nats/Cargo.toml b/crates/fuel-core-nats/Cargo.toml index 6018323..28fe195 100644 --- a/crates/fuel-core-nats/Cargo.toml +++ b/crates/fuel-core-nats/Cargo.toml @@ -14,6 +14,7 @@ anyhow = "1.0.86" async-nats = "0.35.1" bytes = "1.6.0" clap = "4.5.4" +dotenvy = { workspace = true } fuel-core = { workspace = true } fuel-core-bin = { workspace = true } fuel-core-client = { workspace = true } diff --git a/crates/fuel-core-nats/nats.conf b/crates/fuel-core-nats/nats.conf index 2dfa37e..2a4875e 100644 --- a/crates/fuel-core-nats/nats.conf +++ b/crates/fuel-core-nats/nats.conf @@ -1,6 +1,6 @@ authorization: { users: [ - { nkey: UDMBGUBEOCJVCOXHSQGYRX4SGS3QEAEDIVNHDBD63YEKHSGPVO53I2NU } + { nkey: UAWFS7BJM4WF3SEEGDX42EGTEC2VPLERMAUYFVUXIPIV664IANONDJR5 } ] } jetstream = { diff --git a/crates/fuel-core-nats/src/lib.rs b/crates/fuel-core-nats/src/lib.rs index a16230f..e864570 100644 --- a/crates/fuel-core-nats/src/lib.rs +++ b/crates/fuel-core-nats/src/lib.rs @@ -32,13 +32,22 @@ use fuel_core_types::{ const NUM_TOPICS: usize = 3; -pub struct Publisher { +#[derive(Debug)] +struct NatsConnection { jetstream: async_nats::jetstream::Context, + #[allow(dead_code)] + /// Messages published to jetstream + jetstream_messages: async_nats::jetstream::stream::Stream, + /// Max publishing payload in connected NATS server + max_payload_size: usize, +} + +pub struct Publisher { chain_id: ChainId, base_asset_id: AssetId, - max_payload_size: usize, - fuel_database: CombinedDatabase, - block_subscription: Receiver + Send + Sync>>, + fuel_core_database: CombinedDatabase, + blocks_subscription: Receiver + Send + Sync>>, + nats: NatsConnection, } impl Publisher { @@ -47,10 +56,24 @@ impl Publisher { nats_nkey: Option, chain_id: ChainId, base_asset_id: AssetId, - fuel_database: CombinedDatabase, - block_subscription: Receiver + Send + Sync>>, + fuel_core_database: CombinedDatabase, + blocks_subscription: Receiver< + Arc + Send + Sync>, + >, ) -> anyhow::Result { - // Connect to the NATS server + Ok(Publisher { + chain_id, + base_asset_id, + fuel_core_database, + blocks_subscription, + nats: Self::connect_to_nats(nats_url, nats_nkey).await?, + }) + } + + async fn connect_to_nats( + nats_url: &str, + nats_nkey: Option, + ) -> anyhow::Result { let client = match nats_nkey { Some(nkey) => async_nats::connect_with_options( nats_url, @@ -65,10 +88,11 @@ impl Publisher { let max_payload_size = client.server_info().max_payload; info!("NATS Publisher: max_payload_size={max_payload_size}"); + // Create a JetStream context let jetstream = async_nats::jetstream::new(client); - // Create a JetStream stream (if it doesn't exist) - let _stream = jetstream + + let jetstream_messages = jetstream .get_or_create_stream(async_nats::jetstream::stream::Config { name: "fuel".to_string(), subjects: vec![ @@ -93,17 +117,15 @@ impl Publisher { ..Default::default() }) .await?; - Ok(Publisher { - max_payload_size, - chain_id, - base_asset_id, + + Ok(NatsConnection { jetstream, - fuel_database, - block_subscription, + jetstream_messages, + max_payload_size, }) } - /// Connect to a NATS server and publish messages + /// Publish messages from node(`fuel-core`) to NATS stream /// receipts.{height}.{contract_id}.{kind} e.g. receipts.9000.*.return /// receipts.{height}.{contract_id}.{topic_1} e.g. receipts.*.my_custom_topic /// receipts.{height}.{contract_id}.{topic_1}.{topic_2} e.g. receipts.*.counter.inrc @@ -126,6 +148,7 @@ impl Publisher { ..Default::default() }; let consumer = self + .nats .jetstream .create_consumer_on_stream(config, "fuel") .await?; @@ -141,7 +164,7 @@ impl Publisher { }; // Fast-forward the stream using the local Fuel node database - if let Some(chain_height) = self.fuel_database.on_chain().latest_height()? { + if let Some(chain_height) = self.fuel_core_database.on_chain().latest_height()? { let chain_height: u32 = chain_height.into(); if chain_height > stream_height + 1 { warn!("NATS Publisher: missing blocks: stream block height={stream_height}, chain block height={chain_height}"); @@ -149,7 +172,7 @@ impl Publisher { for height in stream_height + 1..=chain_height { let block: Block = self - .fuel_database + .fuel_core_database .on_chain() .get_sealed_block_by_height(&height.into())? .unwrap_or_else(|| { @@ -162,7 +185,7 @@ impl Publisher { let chain_id = self.chain_id; for t in block.transactions().iter() { let status: Option = self - .fuel_database + .fuel_core_database .off_chain() .get_tx_status(&t.id(&chain_id))?; match status { @@ -191,7 +214,7 @@ impl Publisher { } // Continue publishing blocks from the block importer subscription - while let Ok(result) = self.block_subscription.recv().await { + while let Ok(result) = self.blocks_subscription.recv().await { let mut receipts_: Vec = vec![]; for t in result.tx_status.iter() { let mut receipts = match &t.result { @@ -212,27 +235,6 @@ impl Publisher { Ok(()) } - /// A wrapper around JetStream::publish() that also checks that the payload size does not exceed NATS server's max_payload_size. - async fn publish( - &self, - subject: String, - payload: bytes::Bytes, - ) -> anyhow::Result<()> { - // Check message size - let payload_size = payload.len(); - if payload_size > self.max_payload_size { - anyhow::bail!( - "{subject} payload size={payload_size} exceeds max_payload_size={}", - self.max_payload_size - ) - } - // Publish - let ack_future = self.jetstream.publish(subject, payload).await?; - // Wait for an ACK - ack_future.await?; - Ok(()) - } - /// Publish the Block, its Transactions, and the given Receipts into NATS. pub async fn publish_block( &self, @@ -356,4 +358,77 @@ impl Publisher { Ok(()) } + + /// A wrapper around JetStream::publish() that also checks that the payload size does not exceed NATS server's max_payload_size. + async fn publish( + &self, + subject: String, + payload: bytes::Bytes, + ) -> anyhow::Result<()> { + // Check message size + let payload_size = payload.len(); + if payload_size > self.nats.max_payload_size { + anyhow::bail!( + "{subject} payload size={payload_size} exceeds max_payload_size={}", + self.nats.max_payload_size + ) + } + // Publish + let ack_future = self.nats.jetstream.publish(subject, payload).await?; + // Wait for an ACK + ack_future.await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use async_nats::jetstream::stream::LastRawMessageErrorKind; + + #[tokio::test] + async fn returns_authorization_error_without_nkey() { + assert!(Publisher::connect_to_nats(NATS_URL, None) + .await + .is_err_and(|e| { + e.source() + .expect("An error source must exist") + .to_string() + .contains("authorization violation: nats: authorization violation") + })); + } + + #[tokio::test] + async fn connects_to_nats_with_nkey() { + setup_test(); + + let nats = Publisher::connect_to_nats(NATS_URL, nkey()) + .await + .expect(&format!("Ensure NATS server is running at {NATS_URL}")); + + assert!(nats + .jetstream_messages + .get_last_raw_message_by_subject("*") + .await + .is_err_and(|err| err.kind() == LastRawMessageErrorKind::NoMessageFound)); + } + + #[tokio::test] + async fn returns_max_payload_size_allowed_on_the_connection() { + setup_test(); + + let nats = Publisher::connect_to_nats(NATS_URL, nkey()) + .await + .expect(&format!("Ensure NATS server is running at {NATS_URL}")); + + assert_eq!(nats.max_payload_size, 8_388_608) + } + + const NATS_URL: &str = "nats://localhost:4222"; + fn setup_test() { + dotenvy::dotenv().ok(); + } + fn nkey() -> Option { + std::env::var("NATS_NKEY").ok() + } } diff --git a/crates/fuel-core-nats/src/main.rs b/crates/fuel-core-nats/src/main.rs index 9192bb9..470aed3 100644 --- a/crates/fuel-core-nats/src/main.rs +++ b/crates/fuel-core-nats/src/main.rs @@ -1,6 +1,5 @@ use clap::Parser; -use fuel_core_bin::cli::run; use fuel_core_services::Service; #[derive(Parser)] @@ -12,10 +11,11 @@ pub struct Cli { default_value = "localhost:4222" )] nats_url: String, + /// The NKEY seed. It is usually prefixed with an 'S' #[arg(long, value_name = "NKEY", env = "NATS_NKEY")] nats_nkey: Option, #[command(flatten)] - fuel_core_config: run::Command, + fuel_core_config: fuel_core_bin::cli::run::Command, } #[tokio::main] @@ -23,7 +23,7 @@ async fn main() -> anyhow::Result<()> { fuel_core_bin::cli::init_logging(); let cli = Cli::parse(); - let service = run::get_service(cli.fuel_core_config)?; + let service = fuel_core_bin::cli::run::get_service(cli.fuel_core_config)?; let chain_config = service.shared.config.snapshot_reader.chain_config(); let chain_id = chain_config.consensus_parameters.chain_id(); let base_asset_id = chain_config.consensus_parameters.base_asset_id();