From 9d225c94662737ae5c31e3ce7bd7db2bb1fc4d39 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Tue, 10 Mar 2026 23:16:54 +0530 Subject: [PATCH 1/2] add message structs --- .github/workflows/ci.yml | 4 +- Cargo.lock | 7 ++ Cargo.toml | 4 +- crates/buqueue-core/Cargo.toml | 1 + crates/buqueue-core/src/message.rs | 104 ++++++++++++++++++++++++++++- 5 files changed, 114 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3c30f0a..9a24782 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,7 @@ name: CI on: push: - branches: [main] + branches: [**] pull_request: jobs: @@ -11,9 +11,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - - uses: Swatinem/rust-cache@v2 - run: cargo check --workspace diff --git a/Cargo.lock b/Cargo.lock index 229f56d..5634f87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,9 +6,16 @@ version = 4 name = "buqueue-core" version = "0.1.0" dependencies = [ + "bytes", "thiserror", ] +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + [[package]] name = "proc-macro2" version = "1.0.106" diff --git a/Cargo.toml b/Cargo.toml index b211182..fcdc45a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,11 +11,11 @@ edition="2024" [workspace.dependencies] buqueue-core = { path = "buqueue-core" } thiserror = { version = "2.0.18" } - +bytes = "1.11.1" [workspace.lints.rust] missing_docs = "warn" [workspace.lints.clippy] all = "warn" -pedantic = "warn" \ No newline at end of file +pedantic = "warn" diff --git a/crates/buqueue-core/Cargo.toml b/crates/buqueue-core/Cargo.toml index f37be34..077476d 100644 --- a/crates/buqueue-core/Cargo.toml +++ b/crates/buqueue-core/Cargo.toml @@ -7,6 +7,7 @@ license = "MIT" [dependencies] thiserror = { workspace = true } +bytes = { workspace = true } [lints] workspace = true diff --git a/crates/buqueue-core/src/message.rs b/crates/buqueue-core/src/message.rs index 56884b9..1d3fa4a 100644 --- a/crates/buqueue-core/src/message.rs +++ b/crates/buqueue-core/src/message.rs @@ -1 +1,103 @@ -// Todo \ No newline at end of file +//! The `Message` type: what you build and send into a queue +//! +//! `Message` carries three things: +//! - `payload`: raw bytes - you choose the encoding, +//! - `headers`: aribtary key - value metadata, propagated by all backends +//! - `metadata`: routing key, dedepulication ID, content type +//! +//! ## Building a message +//! +//! Use `Message::from_json` for the common case or `Message::builder` +//! when you need full control +//! +//! ```rust +//! use buqueue_core::message::Message; +//! use bytes::Bytes; +//! +//! // JSON shortcut: sets content-type header automatically +//! #[derive(serde::{Serialize, Deserialize})] +//! struct MyEvent { id: u32 } +//! +//! let msg = Message::from_json(&MyEvent { id: 1 }).unwrap(); +//! +//! // Full builder: any encoding, full header control +//! let msg = Message::builder() +//! .payload(Bytes::from_static(b"hello")) +//! .routing_key("orders.placed") +//! .header("schema-version", "2") +//! .deduplication_id("order-99") +//! .build(); +//! ``` + +use bytes::Bytes; +use std::collections::HashMap; + +/// A message to be sent into a queue +/// +/// Built via `Message::builder()` or convenience contructors +/// `Message::form_json` and `Message::from_json_with_key` +#[derive(Debug, Clone)] +pub struct Message { + /// Raw payload bytes. The encoding is entirely depends + /// buqueue never inspects or transfors the payloads + pub(crate) payload: Bytes, + + /// Arbitary key-value headers + /// + /// These propagate through all five backends: + /// - Kafka: record headers + /// - NATS: message headers + /// - SQS: message attributes + /// - RabbitMQ: AMQP headers + /// - Redis: stream entry fields prefixed with `hdr:` + pub(crate) headers: HashMap, + + /// Optional routing key + /// + /// Maps to the backends nattive routing concept: + /// - Kafka: message key (detemines partition) + /// - NATS: subject + /// - SQS: MessageGroupID (FIFO quques only; ingnored for standard) + /// - RabbitMQ: AMQP routing key + /// - Redis: stored as a stream field + pub(crate) rotuing_key: Option, + + /// Optional deduplication ID. + /// + /// - SQS FIFO: mapped to `MEssageDedpulicationID` + /// - Kafka: used as the idempotent producer key when idempotent is enabled + /// - Others: storead as the `bq-dedup-id` header - your consumer can read it and + /// implement application level deduplication manually + pub(crate) deduplication_id: Option, +} + +impl Message { + /// Start building a `Message` with the fluent builder API + /// + /// ```rust + /// use buqueue_core::message::Message; + /// use bytes::Bytes; + /// + /// let msg = Message::builder() + /// .payload(Bytes::from_static(b"raw bytes")) + /// .routing_key("orders.placed") + /// .header("schema-version", "2") + /// .build(); + /// ``` + pub fn builder() -> MessageBulder { + MessageBulder::default() + } +} + +/// Builder +/// +/// Fluent builder for `Message`. +/// +/// Obtain one with `Message::builder()` +#[derive(Debug, Default)] +pub struct MessageBulder { + payload: Bytes, + headers: HashMap, + routing_key: Option, + deduplication_id: Option, +} From e68a5bede6763282a79ab84ac2d859d6b35cbbc1 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Sun, 22 Mar 2026 19:48:18 +0530 Subject: [PATCH 2/2] feat: add Message and Message builder with unit test --- Cargo.lock | 63 +++++ Cargo.toml | 2 + crates/buqueue-core/Cargo.toml | 2 + crates/buqueue-core/src/message.rs | 433 ++++++++++++++++++++++++++++- 4 files changed, 492 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5634f87..2e944bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7,6 +7,8 @@ name = "buqueue-core" version = "0.1.0" dependencies = [ "bytes", + "serde", + "serde_json", "thiserror", ] @@ -16,6 +18,18 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + [[package]] name = "proc-macro2" version = "1.0.106" @@ -34,6 +48,49 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + [[package]] name = "syn" version = "2.0.117" @@ -70,3 +127,9 @@ name = "unicode-ident" version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index fcdc45a..223e1a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,8 @@ edition="2024" buqueue-core = { path = "buqueue-core" } thiserror = { version = "2.0.18" } bytes = "1.11.1" +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.149" [workspace.lints.rust] missing_docs = "warn" diff --git a/crates/buqueue-core/Cargo.toml b/crates/buqueue-core/Cargo.toml index 077476d..f931673 100644 --- a/crates/buqueue-core/Cargo.toml +++ b/crates/buqueue-core/Cargo.toml @@ -8,6 +8,8 @@ license = "MIT" [dependencies] thiserror = { workspace = true } bytes = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } [lints] workspace = true diff --git a/crates/buqueue-core/src/message.rs b/crates/buqueue-core/src/message.rs index 1d3fa4a..3bbcedc 100644 --- a/crates/buqueue-core/src/message.rs +++ b/crates/buqueue-core/src/message.rs @@ -15,7 +15,7 @@ //! use bytes::Bytes; //! //! // JSON shortcut: sets content-type header automatically -//! #[derive(serde::{Serialize, Deserialize})] +//! #[derive(serde::Serialize, serde::Deserialize)] //! struct MyEvent { id: u32 } //! //! let msg = Message::from_json(&MyEvent { id: 1 }).unwrap(); @@ -25,18 +25,22 @@ //! .payload(Bytes::from_static(b"hello")) //! .routing_key("orders.placed") //! .header("schema-version", "2") -//! .deduplication_id("order-99") +//! .dedpulication_id("order-99") //! .build(); //! ``` use bytes::Bytes; +use serde::Serialize; use std::collections::HashMap; +use crate::error::{BuqueueError, BuqueueResult, ErrorKind}; + /// A message to be sent into a queue /// /// Built via `Message::builder()` or convenience contructors /// `Message::form_json` and `Message::from_json_with_key` #[derive(Debug, Clone)] +#[allow(dead_code)] pub struct Message { /// Raw payload bytes. The encoding is entirely depends /// buqueue never inspects or transfors the payloads @@ -48,7 +52,7 @@ pub struct Message { /// - Kafka: record headers /// - NATS: message headers /// - SQS: message attributes - /// - RabbitMQ: AMQP headers + /// - `RabbitMQ`: AMQP headers /// - Redis: stream entry fields prefixed with `hdr:` pub(crate) headers: HashMap, @@ -57,17 +61,17 @@ pub struct Message { /// Maps to the backends nattive routing concept: /// - Kafka: message key (detemines partition) /// - NATS: subject - /// - SQS: MessageGroupID (FIFO quques only; ingnored for standard) - /// - RabbitMQ: AMQP routing key + /// - SQS: `MessageGroupID` (FIFO quques only; ingnored for standard) + /// - `RabbitMQ`: AMQP routing key /// - Redis: stored as a stream field - pub(crate) rotuing_key: Option, + pub(crate) routing_key: Option, /// Optional deduplication ID. /// /// - SQS FIFO: mapped to `MEssageDedpulicationID` /// - Kafka: used as the idempotent producer key when idempotent is enabled /// - Others: storead as the `bq-dedup-id` header - your consumer can read it and - /// implement application level deduplication manually + /// `implement` application level deduplication manually pub(crate) deduplication_id: Option, } @@ -84,9 +88,179 @@ impl Message { /// .header("schema-version", "2") /// .build(); /// ``` + #[must_use] pub fn builder() -> MessageBulder { MessageBulder::default() } + + /// Build a `Message` by serialising `value` as JSON + /// + /// Sets the `content-type` header to `application/json` automatically + /// + /// # Errors + /// + /// Returns an error if: + /// - serialization to JSON fails. + /// - build returns an error + /// + /// ```rust + /// # use buqueue_core::message::Message; + /// # #[derive(serde::Serialize)] + /// # struct Order { id: u32 } + /// let msg = Message::from_json(&Order { id: 1}).unwrap(); + /// assert_eq!(msg.header("content-type"), Some("application/json")); + /// ``` + pub fn from_json(value: &T) -> BuqueueResult { + let payload = serde_json::to_vec(value).map_err(|e| { + BuqueueError::with_source(ErrorKind::SerializationFailed(e.to_string()), e) + })?; + + Self::builder() + .payload(Bytes::from(payload)) + .header("content-type", "application/json") + .build() + } + + /// Build a `Message` by serialising `value` to JSON, with a routing key + /// + /// This is the most common pattern in event-driven systems: + /// + /// # Errors + /// + /// Returns an error if: + /// - serialization to JSON fails. + /// - build returns an error + /// + /// ```rust + /// # use buqueue_core::message::Message; + /// # #[derive(serde::Serialize)] + /// # struct Order { id: u32 } + /// let msg = Message::from_json_with_key(&Order { id: 1 }, "Orders.Placed").unwrap(); + /// assert_eq!(msg.routing_key(), Some("Orders.Placed")); + /// ``` + pub fn from_json_with_key( + value: &T, + routing_key: impl Into, + ) -> BuqueueResult { + let payload = serde_json::to_vec(value).map_err(|e| { + BuqueueError::with_source(ErrorKind::SerializationFailed(e.to_string()), e) + })?; + + Self::builder() + .payload(Bytes::from(payload)) + .routing_key(routing_key) + .header("content-type", "application/json") + .build() + } + + + /// Returns the raw payload bytes + pub fn payload(&self) -> &Bytes { + &self.payload + } + + /// Returns the value of a header by key, if present + pub fn header(&self, key: &str) -> Option<&str> { + self.headers.get(key).map(String::as_str) + } + + /// Returns all headers as a referenace to the underlying map + pub fn headers(&self) -> &HashMap { + &self.headers + } + + /// Return the routing key, if one was set + pub fn routing_key(&self) -> Option<&str> { + self.routing_key.as_deref() + } + + /// Returns the deduplication Id, if one was set + pub fn deduplication_id(&self) -> Option<&str> { + self.deduplication_id.as_deref() + } + + /// Adds a deduplication ID to this message, consuming and returning it + /// + /// # Errors + /// + /// Returns an error if the ID is an empty string. + /// + /// Userful for chaining after `Message::from_json_with_key` + /// + /// ```rust + /// # use buqueue_core::message::Message; + /// # #[derive(serde::Serialize)] + /// # struct Order { id: u32 } + /// let msg = Message::from_json_with_key(&Order { id: 99 }, "orders.placed") + /// .unwrap() + /// .with_deduplication_id("order-99"); + /// ``` + pub fn with_deduplication_id(mut self, id: impl Into) -> BuqueueResult { + let id = id.into(); + if id.is_empty() { + return Err(BuqueueError::new(ErrorKind::InvalidConfig( + "deduplication_id must not be empty".into(), + ))); + } + self.deduplication_id = Some(id); + Ok(self) + } + + /// Adds or overwirtes a header on this message, consuming and returning it + /// + /// # Errors + /// + /// Validates the key the same way the builder does — returns an error + /// if the key is empty or starts with the reserved `bq-` prefix. + /// + /// ```rust + /// # use buqueue_core::message::Message; + /// # use bytes::Bytes; + /// let msg = Message::builder() + /// .payload(Bytes::from_static(b"hello")) + /// .build() + /// .unwrap() + /// .with_header("retry-count", "3") + /// .unwrap(); + /// ``` + pub fn with_header(mut self, key: impl Into, value: impl Into) -> BuqueueResult { + let key = key.into(); + let value = value.into(); + validate_header_key(&key)?; + validate_header_value(&value, &key)?; + self.headers.insert(key, value); + Ok(self) + } +} + +const RESERVED_HEADER_PREFIX: &str = "bq-"; + +fn validate_header_key(key: &str) -> BuqueueResult<()> { + if key.is_empty() { + return Err(BuqueueError::new(ErrorKind::InvalidConfig( + "header key must not be empty".into() + ))) + } + + if key.starts_with(RESERVED_HEADER_PREFIX) { + return Err(BuqueueError::new(ErrorKind::InvalidConfig(format!( + "header key {key:?} uses the reserved \"bq-\" prefix -\ + this prefix is used internally by buqueue \ + Choose a different prefix for your own headers." + )))); + } + + Ok(()) +} + +fn validate_header_value(value: &str, key: &str) -> BuqueueResult<()> { + if value.is_empty() { + return Err(BuqueueError::new(ErrorKind::InvalidConfig(format!( + "header value for key {key:?} must not be empty — \ + omit the header entirely if you have no value to set" + )))); + } + Ok(()) } /// Builder @@ -95,9 +269,252 @@ impl Message { /// /// Obtain one with `Message::builder()` #[derive(Debug, Default)] +#[allow(dead_code)] pub struct MessageBulder { - payload: Bytes, + payload: Option, headers: HashMap, routing_key: Option, deduplication_id: Option, } + +impl MessageBulder { + /// Set the rwa payload bytes + /// + /// The encoding is entirely up to you - buqueue never inspects or modifies it + /// Must not be empty, `build()` will return an error if it is + #[must_use] + pub fn payload>(mut self, payload: B) -> Self { + self.payload = Some(payload.into()); + self + } + + /// Add a header key-value pair + /// + /// Calling `.header()` multiple times is fine, header accumulate + /// Calling it twice with the same key will overwrite the previous value. + /// + /// Validation is deferred to `build()`: + /// - Keys must not be empty + /// - Keys must not start with the reserved `bq-` prefix + /// - Values must not be empty + /// + /// Notes: `key` and `value` accept any `Into` independently, can mix `&str` and `String` + #[must_use] + pub fn header(mut self, key: impl Into, value: impl Into) -> Self { + self.headers.insert(key.into(), value.into()); + self + } + + /// Set the routing key + /// + /// Maps to each backend's native routing primitive: + /// kafa message key, NATS subject, `RabbitMQ` routing key, SQS `MessageGroupID` (FIFO only) + #[must_use] + pub fn routing_key(mut self, key: impl Into) -> Self { + self.routing_key = Some(key.into()); + self + } + + /// Set the deduplication ID + /// + /// Used for SQS FIFO exactly-once dedupilcation and Kafka idempotent prodicers. + /// Stored as the `bq-dedup-id` header on backends that dob't have native support + #[must_use] + pub fn dedpulication_id(mut self, id: impl Into) -> Self { + self.deduplication_id = Some(id.into()); + self + } + + /// Validate and Finalise the builder and return the `Message` + /// + /// # Errors + /// + /// Returns Error if: + /// - No payload was set or the payload is empty + /// - Any header key is empty + /// - Any header key start with the reserved `bq-` prefix + /// - Any header value is empty + /// - The deduplication ID is an empty string + pub fn build(self) -> BuqueueResult { + let payload = match self.payload { + None => return Err(BuqueueError::new(ErrorKind::InvalidConfig( + "payload must be set before calling build() - \ + call .payload(bytes) on the builder".into() + ))), + Some(p) if p.is_empty() => return Err(BuqueueError::new(ErrorKind::InvalidConfig( + "payload must not be empty - \ + if you need a zero-byte signal message, use a 1-bytes payload \ + or envode intent in a header instead".into() + ))), + Some(p) => p, + }; + + for (key, value) in &self.headers { + validate_header_key(key)?; + validate_header_value(value, key)?; + } + + if let Some(ref id) = self.deduplication_id && id.is_empty() { + return Err(BuqueueError::new(ErrorKind::InvalidConfig( + "deduplication_id must not be empty".into(), + ))); + } + + Ok(Message { + payload, + headers: self.headers, + routing_key: self.routing_key, + deduplication_id: self.deduplication_id, + }) + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] + struct TestEvent { id: u32, name: String } + + #[test] + fn builder_sets_all_fields() { + let msg = Message::builder() + .payload(Bytes::from_static(b"hello")) + .routing_key("order.placed") + .header("schema-version", "3") + .header("source-service", "checkout") + .dedpulication_id("order-99") + .build() + .unwrap(); + + assert_eq!(msg.payload(), &Bytes::from_static(b"hello")); + assert_eq!(msg.routing_key(), Some("order.placed")); + assert_eq!(msg.header("schema-version"), Some("3")); + assert_eq!(msg.header("source-service"), Some("checkout")); + assert_eq!(msg.deduplication_id(), Some("order-99")); + } + + #[test] + fn from_json_sets_content_type() { + let msg = Message::from_json(&TestEvent {id: 1, name: "test".into() }).unwrap(); + assert_eq!(msg.header("content-type"), Some("application/json")); + } + + #[test] + fn from_json_with_key_sets_routing_key() { + let msg = Message::from_json_with_key( + &TestEvent {id: 1, name: "test".into()}, + "orders.placed" + ).unwrap(); + + assert_eq!(msg.routing_key(), Some("orders.placed")); + assert_eq!(msg.header("content-type"), Some("application/json")); + } + + #[test] + fn header_overwrite() { + let msg = Message::builder() + .payload(Bytes::from_static(b"x")) + .header("x-key", "first") + .header("x-key", "second") + .build() + .unwrap(); + + assert_eq!(msg.header("x-key"), Some("second")); + } + + #[test] + fn mixed_key_value_types_compile() { + let owned_value = String::from("v1"); + let msg = Message::builder() + .payload(Bytes::from_static(b"x")) + .header("x-key", owned_value) + .build() + .unwrap(); + + assert_eq!(msg.header("x-key"), Some("v1")); + } + + #[test] + fn error_on_missing_payload() { + let err = Message::builder().build().unwrap_err(); + assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); + assert!(err.to_string().contains("payload must be set")); + } + + #[test] + fn error_on_empty_payload() { + let err = Message::builder() + .payload(Bytes::new()) + .build() + .unwrap_err(); + assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); + assert!(err.to_string().contains("payload must not be empty")); + } + + #[test] + fn error_on_reserved_bq_prefix() { + let err = Message::builder() + .payload(Bytes::from_static(b"x")) + .header("", "value") + .build() + .unwrap_err(); + + assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); + assert!(err.to_string().contains("header key must not be empty")); + } + + #[test] + fn error_on_empty_header_key() { + let err = Message::builder() + .payload(Bytes::from_static(b"x")) + .header("", "value") + .build() + .unwrap_err(); + assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); + assert!(err.to_string().contains("header key must not be empty")); + } + + #[test] + fn error_on_empty_header_value() { + let err = Message::builder() + .payload(Bytes::from_static(b"x")) + .header("x-key", "") + .build() + .unwrap_err(); + assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); + assert!(err.to_string().contains("must not be empty")); + } + + #[test] + fn with_header_rejects_reserved_prefix() { + let msg = Message::builder() + .payload(Bytes::from_static(b"x")) + .build() + .unwrap(); + let err = msg.with_header("bq-custom", "value").unwrap_err(); + assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); + } + + #[test] + fn error_on_empty_deduplication_id() { + let err = Message::builder() + .payload(Bytes::from_static(b"x")) + .dedpulication_id("") + .build() + .unwrap_err(); + assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); + assert!(err.to_string().contains("deduplication_id must not be empty")); + } + + #[test] + fn with_deduplication_id_rejects_empty() { + let msg = Message::builder() + .payload(Bytes::from_static(b"x")) + .build() + .unwrap(); + let err = msg.with_deduplication_id("").unwrap_err(); + assert!(matches!(err.kind, ErrorKind::InvalidConfig(_))); + } +} \ No newline at end of file