From 166fdd1e36941e14bd4bf563d3a87f2a701ca3d3 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Tue, 24 Mar 2026 10:20:33 +0530 Subject: [PATCH 1/3] feat: queueProducer in --- .github/workflows/ci.yml | 2 +- crates/buqueue-core/src/delivery.rs | 59 ++++++++++-------- crates/buqueue-core/src/lib.rs | 3 +- crates/buqueue-core/src/producer.rs | 92 +++++++++++++++++++++++++++++ 4 files changed, 130 insertions(+), 26 deletions(-) create mode 100644 crates/buqueue-core/src/producer.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9a24782..149c8e1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,7 @@ name: CI on: push: - branches: [**] + branches: ["main"] pull_request: jobs: diff --git a/crates/buqueue-core/src/delivery.rs b/crates/buqueue-core/src/delivery.rs index 84a8f35..97b7cd4 100644 --- a/crates/buqueue-core/src/delivery.rs +++ b/crates/buqueue-core/src/delivery.rs @@ -22,11 +22,11 @@ //! intentional crash safety, but always call `nack()` explicitly so the //! delivery count increments correctly -use std::{collections::HashMap, pin::Pin, sync::Arc}; +use crate::error::{BuqueueError, BuqueueResult, ErrorKind}; use bytes::Bytes; use chrono::{DateTime, Utc}; use serde::de::DeserializeOwned; -use crate::error::{BuqueueError, BuqueueResult, ErrorKind}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; /// A message received from a queue, ready to be processed and acknowledged. 
#[derive(Debug)] @@ -34,10 +34,10 @@ pub struct Delivery { pub(crate) payload: Bytes, pub(crate) headers: HashMap, pub(crate) routing_key: Option, - #[allow(clippy::struct_field_names)] - pub(crate) delivery_count: u32, pub(crate) first_delivered_at: Option>, pub(crate) ack_handle: Arc, + #[allow(clippy::struct_field_names)] + pub(crate) delivery_count: u32, } impl Delivery { @@ -48,9 +48,16 @@ impl Delivery { routing_key: Option, delivery_count: u32, first_delivered_at: Option>, - ack_handle: Arc + ack_handle: Arc, ) -> Self { - Self { payload, headers, routing_key, delivery_count, first_delivered_at, ack_handle } + Self { + payload, + headers, + routing_key, + delivery_count, + first_delivered_at, + ack_handle, + } } /// Return the raw payload bytes @@ -61,7 +68,7 @@ impl Delivery { /// Deserialise the payload as JSON into the type `T` /// /// # Errors - /// + /// /// Returns `ErrorKind::DeserializationFailed` if the bytes are not valid /// JSON or the schema deosn't match. Check `schema-version` header if you /// version your events @@ -72,9 +79,9 @@ impl Delivery { } /// Returns the payload as a UTF-8 string slice - /// + /// /// # Errors - /// + /// /// Returns `ErrorKind::DeserializationFailed` if the bytes are not valid /// UTF-8. pub fn payload_str(&self) -> BuqueueResult<&str> { @@ -89,7 +96,7 @@ impl Delivery { } /// Returns all headers. 
- pub fn headers(&self) -> &HashMap{ + pub fn headers(&self) -> &HashMap { &self.headers } @@ -111,7 +118,7 @@ impl Delivery { } /// Return `true` if this is not the first delivery attempt - /// + /// /// Log a warning when this is true, your handler should be idempotent pub fn is_redelivery(&self) -> bool { self.delivery_count > 1 @@ -120,9 +127,9 @@ impl Delivery { /// Acknowledge: tell the broker this message was processed successfully /// /// # Errors - /// + /// /// Return error from emited by backend - /// + /// /// The broker will not redeliver it pub async fn ack(self) -> BuqueueResult<()> { self.ack_handle.ack().await @@ -131,9 +138,9 @@ impl Delivery { /// Negatively acknowledge: tell the broker this message was not processed. /// /// # Errors - /// + /// /// Return error from emited by backend - /// + /// /// The broker will redeliver it, or route it to the DLQ if /// `max_receive_count` has been reached. pub async fn nack(self) -> BuqueueResult<()> { @@ -141,7 +148,6 @@ impl Delivery { } } - /// Internal trait implemeted by each backend to communicate ack/nack to the broker /// /// Backend implementors: implement this and wrap it in `Arc` @@ -165,14 +171,14 @@ mod tests { #[derive(Debug)] struct SpyAckHandle { acked: AtomicBool, - nacked: AtomicBool + nacked: AtomicBool, } impl SpyAckHandle { fn new() -> Arc { Arc::new(Self { acked: AtomicBool::new(false), - nacked: AtomicBool::new(false) + nacked: AtomicBool::new(false), }) } } @@ -191,12 +197,12 @@ mod tests { fn make_delivery(handle: Arc) -> Delivery { Delivery::new( - Bytes::from_static(b"{\"id\":1}"), + Bytes::from_static(b"{\"id\":1}"), HashMap::new(), - Some("orders.placed".into()), + Some("orders.placed".into()), 1, None, - handle + handle, ) } @@ -219,7 +225,9 @@ mod tests { #[tokio::test] async fn payload_json_decode() { #[derive(Debug, serde::Deserialize, PartialEq)] - struct E { id: u32 } + struct E { + id: u32, + } let event: E = make_delivery(SpyAckHandle::new()).payload_json().unwrap(); 
assert_eq!(event, E { id: 1 }); } @@ -231,6 +239,9 @@ mod tests { #[test] fn routing_key_accessible() { - assert_eq!(make_delivery(SpyAckHandle::new()).routing_key(), Some("orders.placed")); + assert_eq!( + make_delivery(SpyAckHandle::new()).routing_key(), + Some("orders.placed") + ); } -} \ No newline at end of file +} diff --git a/crates/buqueue-core/src/lib.rs b/crates/buqueue-core/src/lib.rs index b3c1ea2..b492400 100644 --- a/crates/buqueue-core/src/lib.rs +++ b/crates/buqueue-core/src/lib.rs @@ -33,6 +33,7 @@ #![warn(unsafe_code)] +pub mod delivery; pub mod error; pub mod message; -pub mod delivery; +pub mod producer; diff --git a/crates/buqueue-core/src/producer.rs b/crates/buqueue-core/src/producer.rs new file mode 100644 index 0000000..99d8a9e --- /dev/null +++ b/crates/buqueue-core/src/producer.rs @@ -0,0 +1,92 @@ +//! The `QueueProducer` trait: the sending side of a queue. +//! +//! ## Rust 2024 edition notes +//! +//! Uses native async function in traits, no `#[async_trait]` proc-macro + +use crate::{error::BuqueueResult, message::Message}; + + +/// A unique identifier for a sent message, as assigned by the broker. +/// +/// The format is backend-specific: +/// - Kafka: `{topic}-{partition}-{offset}` +/// - Nats: `JetStream` sequence number as a string +/// - SQS: the SQS `MessageId` UUID +/// - `RabbitMQ`: monotonically incrementing string per channel +/// - Redis: the XADD entry ID +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct MessageId(pub String); + +impl std::fmt::Display for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for MessageId { + fn from(value: String) -> Self { + Self(value) + } +} + +impl From<&str> for MessageId { + fn from(value: &str) -> Self { + Self(value.to_string()) + } +} + +/// Trait for the sending side of a message queue. +/// +/// All five buqueue backends implement this trait. 
Write your busniess logic +/// agains this trait to stay backend-generic. +/// +/// ## Implementing this trait +/// +/// ```rust,ignore +/// // No #[async_trait] needed- native async fn in traits, Rust 2024 edition. +/// impl QueueProducer for MyBackendProducer { +/// async fn send(&self, message: Message) -> BuqueueResult { +/// // ... publish to your broker ... +/// } +/// } +/// ``` +pub trait QueueProducer: Send + Sync { + /// Send a single message and return its broke-assigned ID. + /// + /// This is the fundamental operation that every backend must implement. + fn send(&self, message: Message) -> impl Future> + Send; + + /// Send multiple messages in one broker operation where supported + /// + /// Returns one `MessageID` per message in the same order as input + /// + /// The default sends messages sequentially. Override in your backed + /// to use native batch APIs + /// + /// If any individual send fails, the error is returned immediately and subsequent + /// messages in the batch are not sent + fn send_batch( + &self, + messages: Vec, + ) -> impl Future>> + Send { + async move { + let mut ids = Vec::with_capacity(messages.len()); + for msg in messages { + ids.push(self.send(msg).await?); + } + Ok(ids) + } + } + + /// Send a message that will not be visible to consmers until `delivery_at`. + /// + /// The message is accepted by the broker immediately but held back until + /// the specified UTC timestamp. Your consumer code is unchanged, it simply + /// receives the message when it becomes visible, with no knowledge it was scheduled. 
+ /// + /// Each backend implemets this differently under the hood: + /// - SQS: native `DelaySeconds` (< 15 min); holding queue for longer delays + /// - NATS: `JetStream` native publish-after timestamp + /// - `RabbitMQ`: +} \ No newline at end of file From eb3f4e22bfc0f9852036729004f0793f2ecc2ae6 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 8 Apr 2026 16:45:11 +0530 Subject: [PATCH 2/3] feat: added core feature for producer and consumer backend --- Cargo.lock | 127 +++++++++++ Cargo.toml | 2 + crates/buqueue-core/Cargo.toml | 2 + crates/buqueue-core/src/consumer.rs | 337 ++++++++++++++++++++++++++++ crates/buqueue-core/src/delivery.rs | 2 +- crates/buqueue-core/src/lib.rs | 2 + crates/buqueue-core/src/producer.rs | 251 +++++++++++++++++++-- crates/buqueue-core/src/shutdown.rs | 155 +++++++++++++ 8 files changed, 860 insertions(+), 18 deletions(-) create mode 100644 crates/buqueue-core/src/consumer.rs create mode 100644 crates/buqueue-core/src/shutdown.rs diff --git a/Cargo.lock b/Cargo.lock index 347be04..86d9803 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,10 +35,12 @@ version = "0.1.0" dependencies = [ "bytes", "chrono", + "futures", "serde", "serde_json", "thiserror", "tokio", + "tracing", ] [[package]] @@ -98,6 +100,94 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + 
"futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -318,6 +408,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" @@ -393,6 +489,37 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", +] + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index a1ac8b4..d4344e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,8 @@ serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" chrono = "0.4.44" tokio = { version = "1.50.0", features = ["full"] } +tracing = "0.1.44" +futures = "0.3.32" [workspace.lints.rust] missing_docs = "warn" diff --git a/crates/buqueue-core/Cargo.toml b/crates/buqueue-core/Cargo.toml index 01c85f5..33103ba 100644 --- a/crates/buqueue-core/Cargo.toml +++ b/crates/buqueue-core/Cargo.toml @@ -12,6 +12,8 @@ serde = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } +futures = { workspace = true } [lints] workspace = true diff --git a/crates/buqueue-core/src/consumer.rs b/crates/buqueue-core/src/consumer.rs new file mode 100644 index 0000000..e1163e7 --- /dev/null +++ b/crates/buqueue-core/src/consumer.rs @@ -0,0 +1,337 @@ +//! 
The `QueueConsumer` trait and its dynamic wrapper `DynConsumer` +//! +//! ## Architecture +//! +//! Mirrors the layers exactly: +//! +//! ```text +//! QueueConsumer — generic trait, impl Future, Send (not Sized — needs &mut self) +//! │ .into_dyn() +//! ▼ +//! ErasedQueueConsumer — dyn-compatible bridge (Pin>), crate-private +//! │ +//! ▼ +//! BaseDynConsumer — the type users actually hold when they want type erasure +//! (= DynConsumer) +//! ``` +//! +//! ## Why `QueueConsumer` is not Sized +//! +//! `QueueProducer` is Sized because producer only take `&self`. So, you can +//! put them behind `Arc`. `QueueConsumer` takes `&mut self` (receive mutates +//! internal state like a channel cursor), so it cannot be `Sized + shared`. +//! The `ref_delegate!` macro therefore can only covers `Box` and `&mut T`, +//! not `Arc`. + +use std::pin::Pin; + +use futures::{Stream, stream}; + +use crate::{delivery::Delivery, error::BuqueueResult, shutdown::ShutdownHandle}; + +// -------- QueueConsumer ------------------------------ + +/// Traits for receiving side of a message queue +/// +/// Implement this for your backend. Use `DynConsumer (via .into_dyn())` +/// when you need type erasure +/// +/// ## Basic usage +/// +/// ```rust,ignore +/// while let Some(delivery) = consumer.receive_graceful().await { +/// let event: MyEvent = delivery.payload_json()?; +/// process(event).await?; +/// delivery.ack().await?; +/// } +/// ``` +/// +/// ## Implementing this trait +/// +/// Only `receive()`[`Self::receive`] is required. All other methods have correct +/// default implementations. +/// +/// ```rust,ignore +/// impl QueueConsumer for MyBackendConsumer { +/// async fn receive(&mut self) -> BuqueueResult { +/// // ... poll your broker ... +/// } +/// } +/// ``` +pub trait QueueConsumer: Send { + /// Block until a message is available and return it. + /// + /// The only method you must implement. 
The returned `Delivery` must + /// eventually the ack'd and nack'd + fn receive(&mut self) -> impl Future> + Send; + + /// Returns a message immediately if one is availabe, or `None` if empty. + /// + /// Does not block. Useful in tests to assert a queue is drained. + /// The default wraps `receive()` in a zero-duration timeout. + /// Override in your backend for a true non-blocking poll. + fn try_receive(&mut self) -> impl Future>> + Send { + async move { + use tokio::time::{Duration, timeout}; + match timeout(Duration::ZERO, self.receive()).await { + Ok(result) => result.map(Some), + Err(_) => Ok(None), + } + } + } + + /// Request up to `max` messages in one call. + /// + /// Always blocks for the first message, the poll non-blocking until + /// `max` is reached or the queue is drained. Never returns an empty vec. + /// + /// Override in your backend to use Native batch APIs + /// (SQS `ReceiveMessage`, Kafka `poll`, NATS `fetch`, etc.). + fn receive_batch( + &mut self, + max: usize, + ) -> impl Future>> + Send { + async move { + let first = self.receive().await?; + let mut deliveries = vec![first]; + while deliveries.len() < max { + match self.try_receive().await? { + Some(delivery) => deliveries.push(delivery), + None => break, + } + } + Ok(deliveries) + } + } + + /// Returns a `ShutdownHandle` to signal this consumer to shutdown gracefully + /// + /// The default returns a no-noop handle. 
Override in your backend to wire + /// up real shutdown signalling + fn shutdown_handle(&self) -> ShutdownHandle { + ShutdownHandle::new_noop() + } + + /// Like `receive()`[(`Self::receive`)], but returns `None` after shutdown + /// + /// Races between a new message arriving and shutdown sginal, + /// whicever resolves first wins: + /// + /// ```rust,ignore + /// while let Some(delivery) = consumer.receive_graceful().await { + /// delivery.payload_json::()?.process().await?; + /// delivery.ack().await?; + /// } + /// ``` + fn receive_graceful(&mut self) -> impl Future>> + Send { + async move { + if self.shutdown_handle().is_shutdown() { + return None; + } + let shutdown = self.shutdown_handle(); + tokio::select! { + delivery = self.receive() => Some(delivery), + () = shutdown.wait_for_shutdown() => None, + } + } + } + + /// Convert this consumer into an async `Stream` of deliverie + /// + /// Consumes `self`. Implemented with `futures::stream::unfold`, no unsafe + /// + /// ```rust,ignore + /// use futures::StreamExt; + /// consumer.into_stream() + /// .take(100) + /// .for_each_concurrent(8, |res| async move { + /// res.unwrap().ack().await.unwrap(); + /// }) + /// .await; + /// ``` + fn into_stream(self) -> impl Stream> + Send + where + Self: Sized + 'static, + { + stream::unfold(self, |mut consumer| async move { + if consumer.shutdown_handle().is_shutdown() { + return None; + } + let item = consumer.receive().await; + Some((item, consumer)) + }) + } +} + +// ------- ref_delegate! ------------------------ +// +// Implements QueueConsumer for &mut T and Box where T: QueueConsumer +// Arc is excluded, Consumer take &mut self so shared ownership via Arc +// is not possible without a Mutex, which is the caller's responsibility + +macro_rules! 
ref_delefate { + ($ty_params:ident, $ty:ty) => { + #[deny(unconditional_recursion)] + impl<$ty_params: QueueConsumer> QueueConsumer for $ty { + fn receive(&mut self) -> impl Future> + Send { + (**self).receive() + } + + fn try_receive( + &mut self, + ) -> impl Future>> + Send { + (**self).try_receive() + } + + fn receive_batch( + &mut self, + max: usize, + ) -> impl Future>> + Send { + (**self).receive_batch(max) + } + + fn shutdown_handle(&self) -> ShutdownHandle { + (**self).shutdown_handle() + } + + fn receive_graceful( + &mut self, + ) -> impl Future>> + Send { + (**self).receive_graceful() + } + } + }; +} + +ref_delefate!(T, &mut T); +ref_delefate!(T, Box); + +// ------ ErasedQueueConsumer ------------------ +// +// The dyn-compatible bridge. Same reasoning as ErasedQueueProducer. +// into_stream and into_dyn are excluded, both require Self: Sized + +pub(crate) trait ErasedQueueConsumer: Send { + fn receive<'a>( + &'a mut self, + ) -> Pin> + Send + 'a>>; + + fn try_receive<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>>; + + fn receive_batch<'a>( + &'a mut self, + max: usize, + ) -> Pin>> + Send + 'a>>; + + fn shutdown_handle(&self) -> ShutdownHandle; + + fn receive_graceful<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>>; +} + +// --- DynConsumerInner ------------------ + +struct DynConsumerInner { + inner: C +} + +impl ErasedQueueConsumer for DynConsumerInner { + fn receive<'a>( + &'a mut self, + ) -> Pin> + Send + 'a>> { + Box::pin(self.inner.receive()) + } + + fn try_receive<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>> { + Box::pin(self.inner.try_receive()) + } + + fn receive_batch<'a>( + &'a mut self, + max: usize, + ) -> Pin>> + Send + 'a>> { + Box::pin(self.inner.receive_batch(max)) + } + + fn shutdown_handle(&self) -> ShutdownHandle { + self.inner.shutdown_handle() + } + + fn receive_graceful<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>> { + Box::pin(self.inner.receive_graceful()) + } +} + + +// ---- BaseDynConsumer / DynConsumer 
---------------------- + +/// A type erased consumer. Obtain one by calling `.into_dyn()`[(`QueueConsumer::into_dyn`)] +/// on any concrete consumer, or via `BackendBuilder::make_dynamic()` +pub struct BaseDynConsumer<'a>(Box); + +/// A `'static` type-erased consumer. The standard way to hold a consumer +/// without a generic parameter. +/// +/// ```rust,ignore +/// struct AppState { +/// consumer: DynConsumer, +/// } +/// ``` +pub type DynConsumer = BaseDynConsumer<'static>; + +impl <'a> BaseDynConsumer<'a> { + fn new(inner: impl QueueConsumer + 'a) -> Self { + Self(Box::new(DynConsumerInner { inner })) + } + + /// Block until a message is available. + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn receive(&mut self) -> BuqueueResult { + self.0.receive().await + } + + /// Non-blocking poll, returns `None` if queue is currently empty + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn try_receive(&mut self) -> BuqueueResult> { + self.0.try_receive().await + } + + /// Returns up to `max` messages + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn receive_batch(&mut self, max: usize) -> BuqueueResult> { + self.0.receive_batch(max).await + } + + /// Returns the shutdown handle for this consumer + #[must_use] + pub fn shutdown_handle(&self) -> ShutdownHandle { + self.0.shutdown_handle() + } + + /// Like `receive`, but returns `None` after shutdown is signalled + pub async fn receive_graceful(&mut self) -> Option> { + self.0.receive_graceful().await + } +} + +impl std::fmt::Debug for BaseDynConsumer<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DynConsumer").finish_non_exhaustive() + } +} \ No newline at end of file diff --git a/crates/buqueue-core/src/delivery.rs b/crates/buqueue-core/src/delivery.rs index 97b7cd4..6af1230 100644 --- a/crates/buqueue-core/src/delivery.rs +++ b/crates/buqueue-core/src/delivery.rs @@ -54,9 +54,9 @@ impl 
Delivery { payload, headers, routing_key, - delivery_count, first_delivered_at, ack_handle, + delivery_count, } } diff --git a/crates/buqueue-core/src/lib.rs b/crates/buqueue-core/src/lib.rs index b492400..ca47ddc 100644 --- a/crates/buqueue-core/src/lib.rs +++ b/crates/buqueue-core/src/lib.rs @@ -33,7 +33,9 @@ #![warn(unsafe_code)] +pub mod consumer; pub mod delivery; pub mod error; pub mod message; pub mod producer; +pub mod shutdown; diff --git a/crates/buqueue-core/src/producer.rs b/crates/buqueue-core/src/producer.rs index 99d8a9e..ce089ad 100644 --- a/crates/buqueue-core/src/producer.rs +++ b/crates/buqueue-core/src/producer.rs @@ -1,14 +1,15 @@ //! The `QueueProducer` trait: the sending side of a queue. -//! +//! //! ## Rust 2024 edition notes -//! +//! //! Uses native async function in traits, no `#[async_trait]` proc-macro use crate::{error::BuqueueResult, message::Message}; - +use chrono::{DateTime, Utc}; +use std::{future::Future, pin::Pin, sync::Arc}; /// A unique identifier for a sent message, as assigned by the broker. -/// +/// /// The format is backend-specific: /// - Kafka: `{topic}-{partition}-{offset}` /// - Nats: `JetStream` sequence number as a string @@ -37,12 +38,12 @@ impl From<&str> for MessageId { } /// Trait for the sending side of a message queue. -/// +/// /// All five buqueue backends implement this trait. Write your busniess logic /// agains this trait to stay backend-generic. -/// +/// /// ## Implementing this trait -/// +/// /// ```rust,ignore /// // No #[async_trait] needed- native async fn in traits, Rust 2024 edition. 
/// impl QueueProducer for MyBackendProducer { @@ -51,19 +52,32 @@ impl From<&str> for MessageId { /// } /// } /// ``` -pub trait QueueProducer: Send + Sync { +/// +/// ## Starting acorss tasks +/// +/// `QueueProcuder` is implemented for `Arc` where `T: QueueProducer`, so +/// you can clone `Arc` into each task without wrapping in a `Mutex`: +/// +/// ```rust,ignore +/// let producer = Arc::new(SqsProducer::new(config).await?); +/// for _ in 0..8 { +/// let p = Arc::clone(&producer); +/// tokio::spwan(async move{ p.send(msg).await }); +/// } +/// ``` +pub trait QueueProducer: Send + Sync + Sized { /// Send a single message and return its broke-assigned ID. - /// + /// /// This is the fundamental operation that every backend must implement. fn send(&self, message: Message) -> impl Future> + Send; /// Send multiple messages in one broker operation where supported /// /// Returns one `MessageID` per message in the same order as input - /// + /// /// The default sends messages sequentially. Override in your backed /// to use native batch APIs - /// + /// /// If any individual send fails, the error is returned immediately and subsequent /// messages in the batch are not sent fn send_batch( @@ -80,13 +94,216 @@ pub trait QueueProducer: Send + Sync { } /// Send a message that will not be visible to consmers until `delivery_at`. - /// + /// /// The message is accepted by the broker immediately but held back until /// the specified UTC timestamp. Your consumer code is unchanged, it simply /// receives the message when it becomes visible, with no knowledge it was scheduled. 
- /// + /// /// Each backend implemets this differently under the hood: - /// - SQS: native `DelaySeconds` (< 15 min); holding queue for longer delays - /// - NATS: `JetStream` native publish-after timestamp - /// - `RabbitMQ`: -} \ No newline at end of file + /// - SQS: native `DelaySeconds` (< 15 min); holding queue for longer delays + /// - NATS: `JetStream` native publish-after timestamp + /// - `RabbitMQ`: `rabbitmq-delayed-message-exchange` plugin, no official support + /// - Redis: sorted-set staging area with a background forwarder task + /// - Kafka: deplay topic with a buqueue-managed forwarder consumer + /// + /// **The default implementation ignores `delivery_at` and sends immediately** + /// It emits a `tracing::warn!` so you know it's not scheduled. + /// Override this in your backend to provide real scheduling + fn send_at( + &self, + message: Message, + delivery_at: DateTime, + ) -> impl Future> + Send { + async move { + tracing::warn!( + %delivery_at, + "send_at() called but this backend has not implemented scheduling -\ + sending immediately. Override send_at() in yout QueueProducer impl." + ); + self.send(message).await + } + } + + /// Erase this producer's concrete type, returning a `DynProducer` + /// + /// Use this when you need to: + /// - Store a producer in a struct without a generic paramenter + /// - Select a backend at runtime and store the result uniformly + /// - Return a producer from a function without exposing the backend type + /// + /// ```rust,ignore + /// let producer: DynProducer = SqsProducer::new(config).await?.into_dyn(); + /// ``` + fn into_dyn<'a>(self) -> BaseDynProducer<'a> + where + Self: 'a, + { + BaseDynProducer::new(self) + } +} + +// -- ref_delegate! ------------ +// +// Implements QueueProcuder for &T, Box, and Arc where T: QueueProducer. +// This means all three wrapper types forward every method to the inner T +// withouth any manual boilerplate. Add new wrapper types here as needed. 
+// +// #[deny(unconditional_recursion)] guards against accidentally forwarding +// to self instead of **self, which would loop forever at runtime. + +macro_rules! ref_delegate { + ($ty_param:ident, $ty:ty) => { + #[deny(unconditional_recursion)] + impl<$ty_param: QueueProducer> QueueProducer for $ty { + fn send( + &self, + message: Message, + ) -> impl Future> + Send { + (**self).send(message) + } + + fn send_batch( + &self, + messages: Vec, + ) -> impl Future>> + Send { + (**self).send_batch(messages) + } + + fn send_at( + &self, + message: Message, + delivery_at: DateTime, + ) -> impl Future> + Send { + (**self).send_at(message, delivery_at) + } + } + }; +} + +ref_delegate!(T, &T); +ref_delegate!(T, Box); +ref_delegate!(T, Arc); + +// --- ErasedQueueProducer ------------- +// +// The dyn-compatible bridge between QueueProducer and BaseDynProducer. +// +// QueueProducer uses `impl Fturue` which is not dyn-compatible (opaque +// associated types cannot go in a vtable). ErasedQueueProducer uses +// Pin> which is dyn compatible, fixed size, known ABI +// +// This trait is create-private. Users never interact with it directly + +pub(crate) trait ErasedQueueProducer: Send + Sync { + fn send<'a>( + &'a self, + message: Message, + ) -> Pin> + Send + 'a>>; + + fn send_batch<'a>( + &'a self, + messages: Vec, + ) -> Pin>> + Send + 'a>>; + + fn send_at<'a>( + &'a self, + message: Message, + delivery_at: DateTime, + ) -> Pin> + Send + 'a>>; +} + +// -- DynProducerInner ----------- +// +// Wraps a concrete QueueProducer and implements ErasedQueueProducer for it +// by boxing the future. This is the only place Box::pin(...) is needed. + +struct DynProducerInner

{ + inner: P, +} + +impl ErasedQueueProducer for DynProducerInner

{ + fn send<'a>( + &'a self, + message: Message, + ) -> Pin> + Send + 'a>> { + Box::pin(self.inner.send(message)) + } + + fn send_batch<'a>( + &'a self, + messages: Vec, + ) -> Pin>> + Send + 'a>> { + Box::pin(self.inner.send_batch(messages)) + } + + fn send_at<'a>( + &'a self, + message: Message, + delivery_at: DateTime, + ) -> Pin> + Send + 'a>> { + Box::pin(self.inner.send_at(message, delivery_at)) + } +} + +// --- BaseDynProducer / DynProducer --------- + +/// A type erased producer. Obtain one by calling `.into_dyn()`(`QueueProducer::into_dyn`) +/// on any concrete producer, or via `BackendBuilder::make_dynamic()` +/// +/// `DynProducer` is an alias for `BaseDynProducer<'static>`, which is what +/// you want in almost all cases. Use `BaseDynProducer<'a>` only when the +/// underlyinh producer borrows data with a shorter lifetime +pub struct BaseDynProducer<'a>(Box); + +/// A `'static` type erased producer. The standard to hold a producer +/// without a generic parameter. +/// +/// ```rust,ignore +/// struct AppState { +/// producer: DynProducer, +/// } +/// ``` +pub type DynProducer = BaseDynProducer<'static>; + +impl<'a> BaseDynProducer<'a> { + fn new(inner: impl QueueProducer + 'a) -> Self { + Self(Box::new(DynProducerInner { inner })) + } + + /// Send a single message + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn send(&self, message: Message) -> BuqueueResult { + self.0.send(message).await + } + + /// Send a batch of messages + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn send_batch(&self, messages: Vec) -> BuqueueResult> { + self.0.send_batch(messages).await + } + + /// Send a message with a scheduled delivery time + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn send_at( + &self, + message: Message, + delivery_at: DateTime, + ) -> BuqueueResult { + self.0.send_at(message, delivery_at).await + } +} + +impl std::fmt::Debug for BaseDynProducer<'_> { + fn 
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DynProducer").finish_non_exhaustive() + } +} diff --git a/crates/buqueue-core/src/shutdown.rs b/crates/buqueue-core/src/shutdown.rs new file mode 100644 index 0000000..aafd605 --- /dev/null +++ b/crates/buqueue-core/src/shutdown.rs @@ -0,0 +1,155 @@ +//! Graceful shutdown signalling for consumers. +//! +//! A `ShutdownHandle` is returned by `QueueConsumer::shutdown_handle()`. +//! Call `ShutdownHandle::shutdown()` from a SIGTERM handler to signal the +//! consumer to drain and stop. + +use std::sync::Arc; + +use tokio::sync::watch; + + +/// A handle that signal a consumer to shutdown gracefully +/// +/// Cloneable, you can send copies to multipler tasks. +/// When any copy calls `shutdown()`[(`Self::shutdown`)], all consumers +/// holding a matching handle will see the shutdown signal. +/// +/// ## Usage +/// +/// ```rust,ignore +/// let shutdown = consumer.shutdown_handle(); +/// +/// tokio::spawn(async move { +/// tokio::signal::ctrl_c().await.unwrap(); +/// shutdown.shutdown().await; +/// }); +/// +/// while let Some(delivery) = consumer.receive_graceful().await { +/// // process... +/// } +/// ``` +#[derive(Clone, Debug)] +pub struct ShutdownHandle { + inner: Arc, +} + +/// Internal shared shutdown signal using a `watch` channel. +/// +/// The channel broadcasts a boolean flag: +/// - `false` → system is running +/// - `true` → shutdown requested +/// +/// All `ShutdownHandle` clones share this state, allowing coordinated +/// shutdown across async tasks. +/// +/// This is an internal implementation detail. +#[derive(Debug)] +pub struct ShutdownInner { + sender: watch::Sender, + receiver: watch::Receiver, +} + +impl ShutdownHandle { + /// Create a new, active (not yet shutdown) `ShutdownHandle` pair. + /// + /// The returned handle is the signal sender. Pass it (or a clone) to + /// your shutdown task. 
The receiver side is embedded inside the handle + /// via `Arc` so consumers can check `is_shutdown()`. + #[must_use] + pub fn new() -> Self { + let (tx, rx) = watch::channel(false); + Self { + inner: Arc::new(ShutdownInner { + sender: tx, + receiver: rx, + }), + } + } + + /// Creates a no-op handle that is never triggered. + /// + /// Used as the default in `QueueConsumer::shutdown_handle()` for + /// backends that haven't implemented graceful shutdown yet + #[must_use] + pub fn new_noop() -> Self { + Self::new() + } + + /// Signals all consumers holding this handle to shut down. + /// + /// After this returns, `is_shutdown()`[(`Self::is_shutdown`)] will return + /// `true` on all clones of this handle. + pub fn shutdown(&self) { + // send(true) will never fail since we hold both ends via Arc + let _ = self.inner.sender.send(true); + } + + /// Returns `true` if shutdown has been signaled + #[must_use] + pub fn is_shutdown(&self) -> bool { + *self.inner.receiver.borrow() + } + + /// Await shutdown, suspends the calling task until `shutdown()`[(`Self::shutdown`)] + /// is called. Useful for wiring into a select loop: + /// + /// ```rust,ignore + /// tokio::select! 
{ + /// _ = handle.wait_for_shutdown() => { /* clean up */} + /// delivery = consumer.receive() => { /* process */ } + /// } + /// ``` + pub async fn wait_for_shutdown(&self) { + let mut rx = self.inner.receiver.clone(); + // Wait until the value becomes true + let _ = rx.wait_for(|v| *v).await; + } +} + +impl Default for ShutdownHandle { + fn default() -> Self { + Self::new() + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn start_not_shutdown() { + let handle = ShutdownHandle::new(); + assert!(!handle.is_shutdown()); + } + + #[tokio::test] + async fn shutdown_is_visible_on_clone() { + let handle = ShutdownHandle::new(); + let clone = handle.clone(); + + handle.shutdown(); + + assert!(clone.is_shutdown()); + } + + #[tokio::test] + async fn wait_for_shutdown_resolves_after_signal() { + let handle = ShutdownHandle::new(); + let trigger = handle.clone(); + + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + trigger.shutdown(); + }); + + // This will hang forever if shutdown is never signaled + tokio::time::timeout( + tokio::time::Duration::from_millis(500), + handle.wait_for_shutdown(), + ) + .await + .expect("shutdown should have signaled within 500ms"); + } +} \ No newline at end of file From d7565c95b35ddcb7bd19e53d17d00e2d6b2f4ef0 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 8 Apr 2026 16:58:22 +0530 Subject: [PATCH 3/3] into_dyn() --- crates/buqueue-core/src/consumer.rs | 44 ++++++++++++++++++++--------- crates/buqueue-core/src/shutdown.rs | 21 ++++++-------- 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/crates/buqueue-core/src/consumer.rs b/crates/buqueue-core/src/consumer.rs index e1163e7..fa43036 100644 --- a/crates/buqueue-core/src/consumer.rs +++ b/crates/buqueue-core/src/consumer.rs @@ -161,6 +161,23 @@ pub trait QueueConsumer: Send { Some((item, consumer)) }) } + + /// Erase this consumer's concrete type, returning a `DynConsumer` + /// + /// Use this 
when you need to: + /// - Store a consumer in a struct without a generic parameter + /// - Select a backend at runtime and store the result uniformly + /// - Return a consumer from a function without exposing the backend type + /// + /// ```rust,ignore + /// let consumer: DynConsumer = SqsConsumer::new(config).await?.into_dyn(); + /// ``` + fn into_dyn<'a>(self) -> BaseDynConsumer<'a> + where + Self: Sized + 'a, + { + BaseDynConsumer::new(self) + } } // ------- ref_delegate! ------------------------ @@ -235,26 +252,26 @@ pub(crate) trait ErasedQueueConsumer: Send { // --- DynConsumerInner ------------------ struct DynConsumerInner { - inner: C + inner: C, } -impl ErasedQueueConsumer for DynConsumerInner { +impl ErasedQueueConsumer for DynConsumerInner { fn receive<'a>( - &'a mut self, - ) -> Pin> + Send + 'a>> { + &'a mut self, + ) -> Pin> + Send + 'a>> { Box::pin(self.inner.receive()) } fn try_receive<'a>( - &'a mut self, - ) -> Pin>> + Send + 'a>> { + &'a mut self, + ) -> Pin>> + Send + 'a>> { Box::pin(self.inner.try_receive()) } fn receive_batch<'a>( - &'a mut self, - max: usize, - ) -> Pin>> + Send + 'a>> { + &'a mut self, + max: usize, + ) -> Pin>> + Send + 'a>> { Box::pin(self.inner.receive_batch(max)) } @@ -263,13 +280,12 @@ impl ErasedQueueConsumer for DynConsumerInner { } fn receive_graceful<'a>( - &'a mut self, - ) -> Pin>> + Send + 'a>> { + &'a mut self, + ) -> Pin>> + Send + 'a>> { Box::pin(self.inner.receive_graceful()) } } - // ---- BaseDynConsumer / DynConsumer ---------------------- /// A type erased consumer. 
Obtain one by calling `.into_dyn()`[(`QueueConsumer::into_dyn`)] @@ -286,7 +302,7 @@ pub struct BaseDynConsumer<'a>(Box); /// ``` pub type DynConsumer = BaseDynConsumer<'static>; -impl <'a> BaseDynConsumer<'a> { +impl<'a> BaseDynConsumer<'a> { fn new(inner: impl QueueConsumer + 'a) -> Self { Self(Box::new(DynConsumerInner { inner })) } @@ -334,4 +350,4 @@ impl std::fmt::Debug for BaseDynConsumer<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DynConsumer").finish_non_exhaustive() } -} \ No newline at end of file +} diff --git a/crates/buqueue-core/src/shutdown.rs b/crates/buqueue-core/src/shutdown.rs index aafd605..0a2a225 100644 --- a/crates/buqueue-core/src/shutdown.rs +++ b/crates/buqueue-core/src/shutdown.rs @@ -1,15 +1,13 @@ //! Graceful shutdown signalling for consumers. //! //! A `ShutdownHandle` is returned by `QueueConsumer::shutdown_handle()`. -//! Call `ShutdownHandle::shutdown()` from a SIGTERM handler to signal the +//! Call `ShutdownHandle::shutdown()` from a SIGTERM handler to signal the //! consumer to drain and stop. use std::sync::Arc; - use tokio::sync::watch; - -/// A handle that signal a consumer to shutdown gracefully +/// A handle that signals a consumer to shut down gracefully /// /// Cloneable, you can send copies to multipler tasks. /// When any copy calls `shutdown()`[(`Self::shutdown`)], all consumers @@ -24,7 +22,7 @@ use tokio::sync::watch; /// tokio::signal::ctrl_c().await.unwrap(); /// shutdown.shutdown().await; /// }); -/// +/// /// while let Some(delivery) = consumer.receive_graceful().await { /// // process... 
/// } @@ -59,10 +57,10 @@ impl ShutdownHandle { #[must_use] pub fn new() -> Self { let (tx, rx) = watch::channel(false); - Self { - inner: Arc::new(ShutdownInner { - sender: tx, - receiver: rx, + Self { + inner: Arc::new(ShutdownInner { + sender: tx, + receiver: rx, }), } } @@ -113,7 +111,6 @@ impl Default for ShutdownHandle { } } - #[cfg(test)] mod tests { use super::*; @@ -146,10 +143,10 @@ mod tests { // This will hang forever if shutdown is never signaled tokio::time::timeout( - tokio::time::Duration::from_millis(500), + tokio::time::Duration::from_millis(500), handle.wait_for_shutdown(), ) .await .expect("shutdown should have signaled within 500ms"); } -} \ No newline at end of file +}