diff --git a/Cargo.lock b/Cargo.lock index 2e944bb..347be04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,14 +2,43 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "bitflags" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" + +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + [[package]] name = "buqueue-core" version = "0.1.0" dependencies = [ "bytes", + "chrono", "serde", "serde_json", "thiserror", + "tokio", ] [[package]] @@ -18,18 +47,179 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cc" +version = "1.2.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" +dependencies = [ + "find-msvc-tools", + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "chrono" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "itoa" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "js-sys" +version = "0.3.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "libc" +version = "0.2.183" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" + +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + [[package]] name = "memchr" version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "mio" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + [[package]] name = "proc-macro2" version = "1.0.106" @@ -48,6 +238,27 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.228" @@ -91,6 +302,38 @@ dependencies = [ "zmij", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "syn" version = "2.0.117" @@ -122,12 +365,159 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +dependencies = [ + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasm-bindgen" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/Cargo.toml b/Cargo.toml index 223e1a1..a1ac8b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,8 @@ thiserror = { version = "2.0.18" } bytes = "1.11.1" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" +chrono = "0.4.44" +tokio = { version = "1.50.0", features = ["full"] } [workspace.lints.rust] missing_docs = "warn" diff --git a/crates/buqueue-core/Cargo.toml b/crates/buqueue-core/Cargo.toml index f931673..01c85f5 100644 --- a/crates/buqueue-core/Cargo.toml +++ b/crates/buqueue-core/Cargo.toml @@ -10,6 +10,8 @@ thiserror = { workspace = true } bytes = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +chrono = { workspace = true } +tokio = { workspace = true } [lints] workspace = true diff --git a/crates/buqueue-core/src/delivery.rs b/crates/buqueue-core/src/delivery.rs new file mode 100644 index 0000000..84a8f35 --- /dev/null +++ b/crates/buqueue-core/src/delivery.rs @@ -0,0 +1,236 @@ +//! The `Delivery` type: What you receive from a queue +//! +//! ## Rust 2024 edition notes +//! +//! The `AckHandle` trait uses native aync function in traits +//! No `#[async_trait]` proc-macro anywhere +//! +//! ## Lifecycle +//! +//! ```text +//! consumer.receive() +//! │ +//! ▼ +//! Delivery ──► .payload_json::() ──► your Rust type +//! │ +//! ├──► .ack().await — success: broker discards the message +//! └──► .nack().await — failure: broker redelivers (or DLQ after N attempts) +//! ``` +//! +//! Every `Delivery` must eventually be ack'd and nack'd. Dropping withouth +//! calling either causes redelivery after the broker's visibility timeout +//! intentional crash safety, but always call `nack()` explicitly so the +//! delivery count increments correctly + +use std::{collections::HashMap, pin::Pin, sync::Arc}; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use serde::de::DeserializeOwned; +use crate::error::{BuqueueError, BuqueueResult, ErrorKind}; + +/// A message received from a queue, ready to be processed and acknowledged. +#[derive(Debug)] +pub struct Delivery { + pub(crate) payload: Bytes, + pub(crate) headers: HashMap, + pub(crate) routing_key: Option, + #[allow(clippy::struct_field_names)] + pub(crate) delivery_count: u32, + pub(crate) first_delivered_at: Option>, + pub(crate) ack_handle: Arc, +} + +impl Delivery { + /// Construct a `Delivery`. called by backend implementation only + pub fn new( + payload: Bytes, + headers: HashMap, + routing_key: Option, + delivery_count: u32, + first_delivered_at: Option>, + ack_handle: Arc + ) -> Self { + Self { payload, headers, routing_key, delivery_count, first_delivered_at, ack_handle } + } + + /// Return the raw payload bytes + pub fn payload(&self) -> &Bytes { + &self.payload + } + + /// Deserialise the payload as JSON into the type `T` + /// + /// # Errors + /// + /// Returns `ErrorKind::DeserializationFailed` if the bytes are not valid + /// JSON or the schema deosn't match. Check `schema-version` header if you + /// version your events + pub fn payload_json(&self) -> BuqueueResult { + serde_json::from_slice(&self.payload).map_err(|e| { + BuqueueError::with_source(ErrorKind::DeserializationFailed(e.to_string()), e) + }) + } + + /// Returns the payload as a UTF-8 string slice + /// + /// # Errors + /// + /// Returns `ErrorKind::DeserializationFailed` if the bytes are not valid + /// UTF-8. + pub fn payload_str(&self) -> BuqueueResult<&str> { + str::from_utf8(&self.payload).map_err(|e| { + BuqueueError::with_source(ErrorKind::DeserializationFailed(e.to_string()), e) + }) + } + + /// Returns a single header value by key + pub fn header(&self, key: &str) -> Option<&str> { + self.headers.get(key).map(String::as_str) + } + + /// Returns all headers. + pub fn headers(&self) -> &HashMap{ + &self.headers + } + + /// Returns the routing key, if one was set on the original message + pub fn routing_key(&self) -> Option<&str> { + self.routing_key.as_deref() + } + + /// Returns the number of times this message has been delivered + /// + /// Starts at 1. If > 1, a prevoius processing attemp failed or timed out + pub fn delivery_count(&self) -> u32 { + self.delivery_count + } + + /// Returns the UTC timestamp of the first delivery attempt, if available + pub fn first_delivery_at(&self) -> Option> { + self.first_delivered_at + } + + /// Return `true` if this is not the first delivery attempt + /// + /// Log a warning when this is true, your handler should be idempotent + pub fn is_redelivery(&self) -> bool { + self.delivery_count > 1 + } + + /// Acknowledge: tell the broker this message was processed successfully + /// + /// # Errors + /// + /// Return error from emited by backend + /// + /// The broker will not redeliver it + pub async fn ack(self) -> BuqueueResult<()> { + self.ack_handle.ack().await + } + + /// Negatively acknowledge: tell the broker this message was not processed. + /// + /// # Errors + /// + /// Return error from emited by backend + /// + /// The broker will redeliver it, or route it to the DLQ if + /// `max_receive_count` has been reached. + pub async fn nack(self) -> BuqueueResult<()> { + self.ack_handle.nack().await + } +} + + +/// Internal trait implemeted by each backend to communicate ack/nack to the broker +/// +/// Backend implementors: implement this and wrap it in `Arc` +/// when construction a `Delivery`. Users never interact with this directly +/// +/// Uses native async fn in traits +pub trait AckHandle: Send + Sync + std::fmt::Debug { + /// Send acknowledgement to the broker + fn ack(&self) -> Pin> + Send + '_>>; + + /// Send negative acknowledgement to the broker + fn nack(&self) -> Pin> + Send + '_>>; +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicBool, Ordering}; + + use super::*; + + #[derive(Debug)] + struct SpyAckHandle { + acked: AtomicBool, + nacked: AtomicBool + } + + impl SpyAckHandle { + fn new() -> Arc { + Arc::new(Self { + acked: AtomicBool::new(false), + nacked: AtomicBool::new(false) + }) + } + } + + impl AckHandle for SpyAckHandle { + fn ack(&self) -> Pin> + Send + '_>> { + self.acked.store(true, Ordering::SeqCst); + Box::pin(async move { Ok(()) }) + } + + fn nack(&self) -> Pin> + Send + '_>> { + self.nacked.store(true, Ordering::SeqCst); + Box::pin(async move { Ok(()) }) + } + } + + fn make_delivery(handle: Arc) -> Delivery { + Delivery::new( + Bytes::from_static(b"{\"id\":1}"), + HashMap::new(), + Some("orders.placed".into()), + 1, + None, + handle + ) + } + + #[tokio::test] + async fn ack_calls_hanlde() { + let handle = SpyAckHandle::new(); + let spy = Arc::clone(&handle); + make_delivery(handle).ack().await.unwrap(); + assert!(spy.acked.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn nack_calls_handle() { + let handle = SpyAckHandle::new(); + let spy = Arc::clone(&handle); + make_delivery(handle).nack().await.unwrap(); + assert!(spy.nacked.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn payload_json_decode() { + #[derive(Debug, serde::Deserialize, PartialEq)] + struct E { id: u32 } + let event: E = make_delivery(SpyAckHandle::new()).payload_json().unwrap(); + assert_eq!(event, E { id: 1 }); + } + + #[test] + fn is_redelivery_false_on_first() { + assert!(!make_delivery(SpyAckHandle::new()).is_redelivery()); + } + + #[test] + fn routing_key_accessible() { + assert_eq!(make_delivery(SpyAckHandle::new()).routing_key(), Some("orders.placed")); + } +} \ No newline at end of file diff --git a/crates/buqueue-core/src/lib.rs b/crates/buqueue-core/src/lib.rs index 0b99f4f..b3c1ea2 100644 --- a/crates/buqueue-core/src/lib.rs +++ b/crates/buqueue-core/src/lib.rs @@ -35,3 +35,4 @@ pub mod error; pub mod message; +pub mod delivery; diff --git a/crates/buqueue-core/src/message.rs b/crates/buqueue-core/src/message.rs index 3bbcedc..1c748bd 100644 --- a/crates/buqueue-core/src/message.rs +++ b/crates/buqueue-core/src/message.rs @@ -71,7 +71,7 @@ pub struct Message { /// - SQS FIFO: mapped to `MEssageDedpulicationID` /// - Kafka: used as the idempotent producer key when idempotent is enabled /// - Others: storead as the `bq-dedup-id` header - your consumer can read it and - /// `implement` application level deduplication manually + /// `implement` application level deduplication manually pub(crate) deduplication_id: Option, } @@ -126,7 +126,7 @@ impl Message { /// This is the most common pattern in event-driven systems: /// /// # Errors - /// + /// /// Returns an error if: /// - serialization to JSON fails. /// - build returns an error @@ -153,7 +153,6 @@ impl Message { .build() } - /// Returns the raw payload bytes pub fn payload(&self) -> &Bytes { &self.payload @@ -180,9 +179,9 @@ impl Message { } /// Adds a deduplication ID to this message, consuming and returning it - /// + /// /// # Errors - /// + /// /// Returns an error if the ID is an empty string. /// /// Userful for chaining after `Message::from_json_with_key` @@ -209,7 +208,7 @@ impl Message { /// Adds or overwirtes a header on this message, consuming and returning it /// /// # Errors - /// + /// /// Validates the key the same way the builder does — returns an error /// if the key is empty or starts with the reserved `bq-` prefix. /// @@ -223,7 +222,11 @@ impl Message { /// .with_header("retry-count", "3") /// .unwrap(); /// ``` - pub fn with_header(mut self, key: impl Into, value: impl Into) -> BuqueueResult { + pub fn with_header( + mut self, + key: impl Into, + value: impl Into, + ) -> BuqueueResult { let key = key.into(); let value = value.into(); validate_header_key(&key)?; @@ -238,8 +241,8 @@ const RESERVED_HEADER_PREFIX: &str = "bq-"; fn validate_header_key(key: &str) -> BuqueueResult<()> { if key.is_empty() { return Err(BuqueueError::new(ErrorKind::InvalidConfig( - "header key must not be empty".into() - ))) + "header key must not be empty".into(), + ))); } if key.starts_with(RESERVED_HEADER_PREFIX) { @@ -326,9 +329,9 @@ impl MessageBulder { } /// Validate and Finalise the builder and return the `Message` - /// + /// /// # Errors - /// + /// /// Returns Error if: /// - No payload was set or the payload is empty /// - Any header key is empty @@ -337,15 +340,21 @@ impl MessageBulder { /// - The deduplication ID is an empty string pub fn build(self) -> BuqueueResult { let payload = match self.payload { - None => return Err(BuqueueError::new(ErrorKind::InvalidConfig( - "payload must be set before calling build() - \ - call .payload(bytes) on the builder".into() - ))), - Some(p) if p.is_empty() => return Err(BuqueueError::new(ErrorKind::InvalidConfig( - "payload must not be empty - \ + None => { + return Err(BuqueueError::new(ErrorKind::InvalidConfig( + "payload must be set before calling build() - \ + call .payload(bytes) on the builder" + .into(), + ))); + } + Some(p) if p.is_empty() => { + return Err(BuqueueError::new(ErrorKind::InvalidConfig( + "payload must not be empty - \ if you need a zero-byte signal message, use a 1-bytes payload \ - or envode intent in a header instead".into() - ))), + or envode intent in a header instead" + .into(), + ))); + } Some(p) => p, }; @@ -354,12 +363,14 @@ impl MessageBulder { validate_header_value(value, key)?; } - if let Some(ref id) = self.deduplication_id && id.is_empty() { + if let Some(ref id) = self.deduplication_id + && id.is_empty() + { return Err(BuqueueError::new(ErrorKind::InvalidConfig( "deduplication_id must not be empty".into(), ))); } - + Ok(Message { payload, headers: self.headers, @@ -369,13 +380,15 @@ impl MessageBulder { } } - #[cfg(test)] mod tests { use super::*; #[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] - struct TestEvent { id: u32, name: String } + struct TestEvent { + id: u32, + name: String, + } #[test] fn builder_sets_all_fields() { @@ -397,16 +410,24 @@ mod tests { #[test] fn from_json_sets_content_type() { - let msg = Message::from_json(&TestEvent {id: 1, name: "test".into() }).unwrap(); + let msg = Message::from_json(&TestEvent { + id: 1, + name: "test".into(), + }) + .unwrap(); assert_eq!(msg.header("content-type"), Some("application/json")); } #[test] fn from_json_with_key_sets_routing_key() { let msg = Message::from_json_with_key( - &TestEvent {id: 1, name: "test".into()}, - "orders.placed" - ).unwrap(); + &TestEvent { + id: 1, + name: "test".into(), + }, + "orders.placed", + ) + .unwrap(); assert_eq!(msg.routing_key(), Some("orders.placed")); assert_eq!(msg.header("content-type"), Some("application/json")); @@ -486,7 +507,7 @@ mod tests { assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); assert!(err.to_string().contains("must not be empty")); } - + #[test] fn with_header_rejects_reserved_prefix() { let msg = Message::builder() @@ -505,9 +526,12 @@ mod tests { .build() .unwrap_err(); assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); - assert!(err.to_string().contains("deduplication_id must not be empty")); + assert!( + err.to_string() + .contains("deduplication_id must not be empty") + ); } - + #[test] fn with_deduplication_id_rejects_empty() { let msg = Message::builder() @@ -517,4 +541,4 @@ mod tests { let err = msg.with_deduplication_id("").unwrap_err(); assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); } -} \ No newline at end of file +}