diff --git a/Cargo.lock b/Cargo.lock index 347be04..86d9803 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,10 +35,12 @@ version = "0.1.0" dependencies = [ "bytes", "chrono", + "futures", "serde", "serde_json", "thiserror", "tokio", + "tracing", ] [[package]] @@ -98,6 +100,94 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -318,6 +408,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" @@ -393,6 +489,37 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", +] + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index a1ac8b4..d4344e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,8 @@ serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" chrono = "0.4.44" tokio = { version = "1.50.0", features = ["full"] } +tracing = "0.1.44" +futures = "0.3.32" [workspace.lints.rust] missing_docs = "warn" diff --git a/crates/buqueue-core/Cargo.toml b/crates/buqueue-core/Cargo.toml index 01c85f5..33103ba 100644 --- a/crates/buqueue-core/Cargo.toml +++ b/crates/buqueue-core/Cargo.toml @@ -12,6 +12,8 @@ serde = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } +futures = { workspace = true } [lints] workspace = true diff --git a/crates/buqueue-core/src/consumer.rs b/crates/buqueue-core/src/consumer.rs new file mode 100644 index 0000000..fa43036 --- /dev/null +++ b/crates/buqueue-core/src/consumer.rs @@ -0,0 +1,353 @@ +//! The `QueueConsumer` trait and its dynamic wrapper `DynConsumer` +//! +//! ## Architecture +//! +//! Mirrors the layers exactly: +//! +//! ```text +//! QueueConsumer — generic trait, impl Future, Send (not Sized — needs &mut self) +//! │ .into_dyn() +//! ▼ +//! ErasedQueueConsumer — dyn-compatible bridge (Pin>), crate-private +//! │ +//! ▼ +//! BaseDynConsumer — the type users actually hold when they want type erasure +//! (= DynConsumer) +//! ``` +//! +//! ## Why `QueueConsumer` is not Sized +//! +//! `QueueProducer` is Sized because producer only take `&self`. So, you can +//! put them behind `Arc`. `QueueConsumer` takes `&mut self` (receive mutates +//! internal state like a channel cursor), so it cannot be `Sized + shared`. +//! The `ref_delegate!` macro therefore can only covers `Box` and `&mut T`, +//! not `Arc`. + +use std::pin::Pin; + +use futures::{Stream, stream}; + +use crate::{delivery::Delivery, error::BuqueueResult, shutdown::ShutdownHandle}; + +// -------- QueueConsumer ------------------------------ + +/// Traits for receiving side of a message queue +/// +/// Implement this for your backend. Use `DynConsumer (via .into_dyn())` +/// when you need type erasure +/// +/// ## Basic usage +/// +/// ```rust,ignore +/// while let Some(delivery) = consumer.receive_graceful().await { +/// let event: MyEvent = delivery.payload_json()?; +/// process(event).await?; +/// delivery.ack().await?; +/// } +/// ``` +/// +/// ## Implementing this trait +/// +/// Only `receive()`[`Self::receive`] is required. All other methods have correct +/// default implementations. +/// +/// ```rust,ignore +/// impl QueueConsumer for MyBackendConsumer { +/// async fn receive(&mut self) -> BuqueueResult { +/// // ... poll your broker ... +/// } +/// } +/// ``` +pub trait QueueConsumer: Send { + /// Block until a message is available and return it. + /// + /// The only method you must implement. The returned `Delivery` must + /// eventually the ack'd and nack'd + fn receive(&mut self) -> impl Future> + Send; + + /// Returns a message immediately if one is availabe, or `None` if empty. + /// + /// Does not block. Useful in tests to assert a queue is drained. + /// The default wraps `receive()` in a zero-duration timeout. + /// Override in your backend for a true non-blocking poll. + fn try_receive(&mut self) -> impl Future>> + Send { + async move { + use tokio::time::{Duration, timeout}; + match timeout(Duration::ZERO, self.receive()).await { + Ok(result) => result.map(Some), + Err(_) => Ok(None), + } + } + } + + /// Request up to `max` messages in one call. + /// + /// Always blocks for the first message, the poll non-blocking until + /// `max` is reached or the queue is drained. Never returns an empty vec. + /// + /// Override in your backend to use Native batch APIs + /// (SQS `ReceiveMessage`, Kafka `poll`, NATS `fetch`, etc.). + fn receive_batch( + &mut self, + max: usize, + ) -> impl Future>> + Send { + async move { + let first = self.receive().await?; + let mut deliveries = vec![first]; + while deliveries.len() < max { + match self.try_receive().await? { + Some(delivery) => deliveries.push(delivery), + None => break, + } + } + Ok(deliveries) + } + } + + /// Returns a `ShutdownHandle` to signal this consumer to shutdown gracefully + /// + /// The default returns a no-noop handle. Override in your backend to wire + /// up real shutdown signalling + fn shutdown_handle(&self) -> ShutdownHandle { + ShutdownHandle::new_noop() + } + + /// Like `receive()`[(`Self::receive`)], but returns `None` after shutdown + /// + /// Races between a new message arriving and shutdown sginal, + /// whicever resolves first wins: + /// + /// ```rust,ignore + /// while let Some(delivery) = consumer.receive_graceful().await { + /// delivery.payload_json::()?.process().await?; + /// delivery.ack().await?; + /// } + /// ``` + fn receive_graceful(&mut self) -> impl Future>> + Send { + async move { + if self.shutdown_handle().is_shutdown() { + return None; + } + let shutdown = self.shutdown_handle(); + tokio::select! { + delivery = self.receive() => Some(delivery), + () = shutdown.wait_for_shutdown() => None, + } + } + } + + /// Convert this consumer into an async `Stream` of deliverie + /// + /// Consumes `self`. Implemented with `futures::stream::unfold`, no unsafe + /// + /// ```rust,ignore + /// use futures::StreamExt; + /// consumer.into_stream() + /// .take(100) + /// .for_each_concurrent(8, |res| async move { + /// res.unwrap().ack().await.unwrap(); + /// }) + /// .await; + /// ``` + fn into_stream(self) -> impl Stream> + Send + where + Self: Sized + 'static, + { + stream::unfold(self, |mut consumer| async move { + if consumer.shutdown_handle().is_shutdown() { + return None; + } + let item = consumer.receive().await; + Some((item, consumer)) + }) + } + + /// Erase this consumer's concrete type, returning a `DynConsumer` + /// + /// Use this when you need to: + /// - Store a consumer in a struct without a generic parameter + /// - Select a backend at runtime and store the result uniformly + /// - Return a consumer from a function without exposing the backend type + /// + /// ```rust,ignore + /// let consumer: DynConsumer = SqsConsumer::new(config).await?.into_dyn(); + /// ``` + fn into_dyn<'a>(self) -> BaseDynConsumer<'a> + where + Self: Sized + 'a, + { + BaseDynConsumer::new(self) + } +} + +// ------- ref_delegate! ------------------------ +// +// Implements QueueConsumer for &mut T and Box where T: QueueConsumer +// Arc is excluded, Consumer take &mut self so shared ownership via Arc +// is not possible without a Mutex, which is the caller's responsibility + +macro_rules! ref_delefate { + ($ty_params:ident, $ty:ty) => { + #[deny(unconditional_recursion)] + impl<$ty_params: QueueConsumer> QueueConsumer for $ty { + fn receive(&mut self) -> impl Future> + Send { + (**self).receive() + } + + fn try_receive( + &mut self, + ) -> impl Future>> + Send { + (**self).try_receive() + } + + fn receive_batch( + &mut self, + max: usize, + ) -> impl Future>> + Send { + (**self).receive_batch(max) + } + + fn shutdown_handle(&self) -> ShutdownHandle { + (**self).shutdown_handle() + } + + fn receive_graceful( + &mut self, + ) -> impl Future>> + Send { + (**self).receive_graceful() + } + } + }; +} + +ref_delefate!(T, &mut T); +ref_delefate!(T, Box); + +// ------ ErasedQueueConsumer ------------------ +// +// The dyn-compatible bridge. Same reasoning as ErasedQueueProducer. +// into_stream and into_dyn are excluded, both require Self: Sized + +pub(crate) trait ErasedQueueConsumer: Send { + fn receive<'a>( + &'a mut self, + ) -> Pin> + Send + 'a>>; + + fn try_receive<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>>; + + fn receive_batch<'a>( + &'a mut self, + max: usize, + ) -> Pin>> + Send + 'a>>; + + fn shutdown_handle(&self) -> ShutdownHandle; + + fn receive_graceful<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>>; +} + +// --- DynConsumerInner ------------------ + +struct DynConsumerInner { + inner: C, +} + +impl ErasedQueueConsumer for DynConsumerInner { + fn receive<'a>( + &'a mut self, + ) -> Pin> + Send + 'a>> { + Box::pin(self.inner.receive()) + } + + fn try_receive<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>> { + Box::pin(self.inner.try_receive()) + } + + fn receive_batch<'a>( + &'a mut self, + max: usize, + ) -> Pin>> + Send + 'a>> { + Box::pin(self.inner.receive_batch(max)) + } + + fn shutdown_handle(&self) -> ShutdownHandle { + self.inner.shutdown_handle() + } + + fn receive_graceful<'a>( + &'a mut self, + ) -> Pin>> + Send + 'a>> { + Box::pin(self.inner.receive_graceful()) + } +} + +// ---- BaseDynConsumer / DynConsumer ---------------------- + +/// A type erased consumer. Obtain one by calling `.into_dyn()`[(`QueueConsumer::into_dyn`)] +/// on any concrete consumer, or via `BackendBuilder::make_dynamic()` +pub struct BaseDynConsumer<'a>(Box); + +/// A `'static` type-erased consumer. The standard way to hold a consumer +/// without a generic parameter. +/// +/// ```rust,ignore +/// struct AppState { +/// consumer: DynConsumer, +/// } +/// ``` +pub type DynConsumer = BaseDynConsumer<'static>; + +impl<'a> BaseDynConsumer<'a> { + fn new(inner: impl QueueConsumer + 'a) -> Self { + Self(Box::new(DynConsumerInner { inner })) + } + + /// Block until a message is available. + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn receive(&mut self) -> BuqueueResult { + self.0.receive().await + } + + /// Non-blocking poll, returns `None` if queue is currently empty + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn try_receive(&mut self) -> BuqueueResult> { + self.0.try_receive().await + } + + /// Returns up to `max` messages + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn receive_batch(&mut self, max: usize) -> BuqueueResult> { + self.0.receive_batch(max).await + } + + /// Returns the shutdown handle for this consumer + #[must_use] + pub fn shutdown_handle(&self) -> ShutdownHandle { + self.0.shutdown_handle() + } + + /// Like `receive`, but returns `None` after shutdown is signalled + pub async fn receive_graceful(&mut self) -> Option> { + self.0.receive_graceful().await + } +} + +impl std::fmt::Debug for BaseDynConsumer<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DynConsumer").finish_non_exhaustive() + } +} diff --git a/crates/buqueue-core/src/delivery.rs b/crates/buqueue-core/src/delivery.rs index 27bed28..6af1230 100644 --- a/crates/buqueue-core/src/delivery.rs +++ b/crates/buqueue-core/src/delivery.rs @@ -34,10 +34,10 @@ pub struct Delivery { pub(crate) payload: Bytes, pub(crate) headers: HashMap, pub(crate) routing_key: Option, - #[allow(clippy::struct_field_names)] - pub(crate) delivery_count: u32, pub(crate) first_delivered_at: Option>, pub(crate) ack_handle: Arc, + #[allow(clippy::struct_field_names)] + pub(crate) delivery_count: u32, } impl Delivery { @@ -54,9 +54,9 @@ impl Delivery { payload, headers, routing_key, - delivery_count, first_delivered_at, ack_handle, + delivery_count, } } diff --git a/crates/buqueue-core/src/lib.rs b/crates/buqueue-core/src/lib.rs index 3a51ea3..ca47ddc 100644 --- a/crates/buqueue-core/src/lib.rs +++ b/crates/buqueue-core/src/lib.rs @@ -33,6 +33,9 @@ #![warn(unsafe_code)] +pub mod consumer; pub mod delivery; pub mod error; pub mod message; +pub mod producer; +pub mod shutdown; diff --git a/crates/buqueue-core/src/producer.rs b/crates/buqueue-core/src/producer.rs new file mode 100644 index 0000000..ce089ad --- /dev/null +++ b/crates/buqueue-core/src/producer.rs @@ -0,0 +1,309 @@ +//! The `QueueProducer` trait: the sending side of a queue. +//! +//! ## Rust 2024 edition notes +//! +//! Uses native async function in traits, no `#[async_trait]` proc-macro + +use crate::{error::BuqueueResult, message::Message}; +use chrono::{DateTime, Utc}; +use std::{future::Future, pin::Pin, sync::Arc}; + +/// A unique identifier for a sent message, as assigned by the broker. +/// +/// The format is backend-specific: +/// - Kafka: `{topic}-{partition}-{offset}` +/// - Nats: `JetStream` sequence number as a string +/// - SQS: the SQS `MessageId` UUID +/// - `RabbitMQ`: monotonically incrementing string per channel +/// - Redis: the XADD entry ID +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct MessageId(pub String); + +impl std::fmt::Display for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for MessageId { + fn from(value: String) -> Self { + Self(value) + } +} + +impl From<&str> for MessageId { + fn from(value: &str) -> Self { + Self(value.to_string()) + } +} + +/// Trait for the sending side of a message queue. +/// +/// All five buqueue backends implement this trait. Write your busniess logic +/// agains this trait to stay backend-generic. +/// +/// ## Implementing this trait +/// +/// ```rust,ignore +/// // No #[async_trait] needed- native async fn in traits, Rust 2024 edition. +/// impl QueueProducer for MyBackendProducer { +/// async fn send(&self, message: Message) -> BuqueueResult { +/// // ... publish to your broker ... +/// } +/// } +/// ``` +/// +/// ## Starting acorss tasks +/// +/// `QueueProcuder` is implemented for `Arc` where `T: QueueProducer`, so +/// you can clone `Arc` into each task without wrapping in a `Mutex`: +/// +/// ```rust,ignore +/// let producer = Arc::new(SqsProducer::new(config).await?); +/// for _ in 0..8 { +/// let p = Arc::clone(&producer); +/// tokio::spwan(async move{ p.send(msg).await }); +/// } +/// ``` +pub trait QueueProducer: Send + Sync + Sized { + /// Send a single message and return its broke-assigned ID. + /// + /// This is the fundamental operation that every backend must implement. + fn send(&self, message: Message) -> impl Future> + Send; + + /// Send multiple messages in one broker operation where supported + /// + /// Returns one `MessageID` per message in the same order as input + /// + /// The default sends messages sequentially. Override in your backed + /// to use native batch APIs + /// + /// If any individual send fails, the error is returned immediately and subsequent + /// messages in the batch are not sent + fn send_batch( + &self, + messages: Vec, + ) -> impl Future>> + Send { + async move { + let mut ids = Vec::with_capacity(messages.len()); + for msg in messages { + ids.push(self.send(msg).await?); + } + Ok(ids) + } + } + + /// Send a message that will not be visible to consmers until `delivery_at`. + /// + /// The message is accepted by the broker immediately but held back until + /// the specified UTC timestamp. Your consumer code is unchanged, it simply + /// receives the message when it becomes visible, with no knowledge it was scheduled. + /// + /// Each backend implemets this differently under the hood: + /// - SQS: native `DelaySeconds` (< 15 min); holding queue for longer delays + /// - NATS: `JetStream` native publish-after timestamp + /// - `RabbitMQ`: `rabbitmq-delayed-message-exchange` plugin, no official support + /// - Redis: sorted-set staging area with a background forwarder task + /// - Kafka: deplay topic with a buqueue-managed forwarder consumer + /// + /// **The default implementation ignores `delivery_at` and sends immediately** + /// It emits a `tracing::warn!` so you know it's not scheduled. + /// Override this in your backend to provide real scheduling + fn send_at( + &self, + message: Message, + delivery_at: DateTime, + ) -> impl Future> + Send { + async move { + tracing::warn!( + %delivery_at, + "send_at() called but this backend has not implemented scheduling -\ + sending immediately. Override send_at() in yout QueueProducer impl." + ); + self.send(message).await + } + } + + /// Erase this producer's concrete type, returning a `DynProducer` + /// + /// Use this when you need to: + /// - Store a producer in a struct without a generic paramenter + /// - Select a backend at runtime and store the result uniformly + /// - Return a producer from a function without exposing the backend type + /// + /// ```rust,ignore + /// let producer: DynProducer = SqsProducer::new(config).await?.into_dyn(); + /// ``` + fn into_dyn<'a>(self) -> BaseDynProducer<'a> + where + Self: 'a, + { + BaseDynProducer::new(self) + } +} + +// -- ref_delegate! ------------ +// +// Implements QueueProcuder for &T, Box, and Arc where T: QueueProducer. +// This means all three wrapper types forward every method to the inner T +// withouth any manual boilerplate. Add new wrapper types here as needed. +// +// #[deny(unconditional_recursion)] guards against accidentally forwarding +// to self instead of **self, which would loop forever at runtime. + +macro_rules! ref_delegate { + ($ty_param:ident, $ty:ty) => { + #[deny(unconditional_recursion)] + impl<$ty_param: QueueProducer> QueueProducer for $ty { + fn send( + &self, + message: Message, + ) -> impl Future> + Send { + (**self).send(message) + } + + fn send_batch( + &self, + messages: Vec, + ) -> impl Future>> + Send { + (**self).send_batch(messages) + } + + fn send_at( + &self, + message: Message, + delivery_at: DateTime, + ) -> impl Future> + Send { + (**self).send_at(message, delivery_at) + } + } + }; +} + +ref_delegate!(T, &T); +ref_delegate!(T, Box); +ref_delegate!(T, Arc); + +// --- ErasedQueueProducer ------------- +// +// The dyn-compatible bridge between QueueProducer and BaseDynProducer. +// +// QueueProducer uses `impl Fturue` which is not dyn-compatible (opaque +// associated types cannot go in a vtable). ErasedQueueProducer uses +// Pin> which is dyn compatible, fixed size, known ABI +// +// This trait is create-private. Users never interact with it directly + +pub(crate) trait ErasedQueueProducer: Send + Sync { + fn send<'a>( + &'a self, + message: Message, + ) -> Pin> + Send + 'a>>; + + fn send_batch<'a>( + &'a self, + messages: Vec, + ) -> Pin>> + Send + 'a>>; + + fn send_at<'a>( + &'a self, + message: Message, + delivery_at: DateTime, + ) -> Pin> + Send + 'a>>; +} + +// -- DynProducerInner ----------- +// +// Wraps a concrete QueueProducer and implements ErasedQueueProducer for it +// by boxing the future. This is the only place Box::pin(...) is needed. + +struct DynProducerInner

{ + inner: P, +} + +impl ErasedQueueProducer for DynProducerInner

{ + fn send<'a>( + &'a self, + message: Message, + ) -> Pin> + Send + 'a>> { + Box::pin(self.inner.send(message)) + } + + fn send_batch<'a>( + &'a self, + messages: Vec, + ) -> Pin>> + Send + 'a>> { + Box::pin(self.inner.send_batch(messages)) + } + + fn send_at<'a>( + &'a self, + message: Message, + delivery_at: DateTime, + ) -> Pin> + Send + 'a>> { + Box::pin(self.inner.send_at(message, delivery_at)) + } +} + +// --- BaseDynProducer / DynProducer --------- + +/// A type erased producer. Obtain one by calling `.into_dyn()`(`QueueProducer::into_dyn`) +/// on any concrete producer, or via `BackendBuilder::make_dynamic()` +/// +/// `DynProducer` is an alias for `BaseDynProducer<'static>`, which is what +/// you want in almost all cases. Use `BaseDynProducer<'a>` only when the +/// underlyinh producer borrows data with a shorter lifetime +pub struct BaseDynProducer<'a>(Box); + +/// A `'static` type erased producer. The standard to hold a producer +/// without a generic parameter. +/// +/// ```rust,ignore +/// struct AppState { +/// producer: DynProducer, +/// } +/// ``` +pub type DynProducer = BaseDynProducer<'static>; + +impl<'a> BaseDynProducer<'a> { + fn new(inner: impl QueueProducer + 'a) -> Self { + Self(Box::new(DynProducerInner { inner })) + } + + /// Send a single message + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn send(&self, message: Message) -> BuqueueResult { + self.0.send(message).await + } + + /// Send a batch of messages + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn send_batch(&self, messages: Vec) -> BuqueueResult> { + self.0.send_batch(messages).await + } + + /// Send a message with a scheduled delivery time + /// + /// # Errors + /// + /// Return Error emitted by backend + pub async fn send_at( + &self, + message: Message, + delivery_at: DateTime, + ) -> BuqueueResult { + self.0.send_at(message, delivery_at).await + } +} + +impl std::fmt::Debug for BaseDynProducer<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DynProducer").finish_non_exhaustive() + } +} diff --git a/crates/buqueue-core/src/shutdown.rs b/crates/buqueue-core/src/shutdown.rs new file mode 100644 index 0000000..0a2a225 --- /dev/null +++ b/crates/buqueue-core/src/shutdown.rs @@ -0,0 +1,152 @@ +//! Graceful shutdown signalling for consumers. +//! +//! A `ShutdownHandle` is returned by `QueueConsumer::shutdown_handle()`. +//! Call `ShutdownHandle::shutdown()` from a SIGTERM handler to signal the +//! consumer to drain and stop. + +use std::sync::Arc; +use tokio::sync::watch; + +// / A handle that signal a consumer to shutdown gracefully +/// +/// Cloneable, you can send copies to multipler tasks. +/// When any copy calls `shutdown()`[(`Self::shutdown`)], all consumers +/// holding a matching handle will see the shutdown signal. +/// +/// ## Usage +/// +/// ```rust,ignore +/// let shutdown = consumer.shutdown_handle(); +/// +/// tokio::spwan(async move { +/// tokio::signal::ctrl_c().await.unwrap(); +/// shutdown.shutdown().await; +/// }); +/// +/// while let Some(delivery) = consumer.receive_graceful().await { +/// // process... +/// } +/// ``` +#[derive(Clone, Debug)] +pub struct ShutdownHandle { + inner: Arc, +} + +/// Internal shared shutdown signal using a `watch` channel. +/// +/// The channel broadcasts a boolean flag: +/// - `false` → system is running +/// - `true` → shutdown requested +/// +/// All `ShutdownHandle` clones share this state, allowing coordinated +/// shutdown across async tasks. +/// +/// This is an internal implementation detail. +#[derive(Debug)] +pub struct ShutdownInner { + sender: watch::Sender, + receiver: watch::Receiver, +} + +impl ShutdownHandle { + /// Create a new , active (not yet shutdown) `ShutdownHandle` pair. + /// + /// The returned handle is the signal sender. Pass it (or a clone) to + /// your shutdown task. The receiver side is embedded inside the handle + /// via `Arc` so consumer can check `is_shutdown()`. + #[must_use] + pub fn new() -> Self { + let (tx, rx) = watch::channel(false); + Self { + inner: Arc::new(ShutdownInner { + sender: tx, + receiver: rx, + }), + } + } + + /// Creats a no-op handle that is never triggered. + /// + /// Used as the deault in `QueueConsumer::shutdown_handle()` for + /// backends that haven't implemented graceful shutdown yet + #[must_use] + pub fn new_noop() -> Self { + Self::new() + } + + /// Signal all consumers holding this handle to shutdown. + /// + /// After this returns, `is_shutdown()`[(`Self::is_shutdown`)] will return + /// `true` on all clones of this handle. + pub fn shutdown(&self) { + // send(true) will neven fail since we hold both end via Arc + let _ = self.inner.sender.send(true); + } + + /// Returns `true` if shutdown has been signaled + #[must_use] + pub fn is_shutdown(&self) -> bool { + *self.inner.receiver.borrow() + } + + /// Await shutdown, suspends the calling task until `shutdown()`[(`Self::is_shutdown`)] + /// is called. Useful for wiring into a select loop: + /// + /// ```rust,ignore + /// tokio::select! { + /// _ = handle.wait_for_shutdown() => { /* clean up */} + /// delivery = consumer.receive() => { /* process */ } + /// } + /// ``` + pub async fn wait_for_shutdown(&self) { + let mut rx = self.inner.receiver.clone(); + // Wait until the value becomes true + let _ = rx.wait_for(|v| *v).await; + } +} + +impl Default for ShutdownHandle { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn start_not_shutdown() { + let handle = ShutdownHandle::new(); + assert!(!handle.is_shutdown()); + } + + #[tokio::test] + async fn shutdown_is_visible_on_clone() { + let handle = ShutdownHandle::new(); + let clone = handle.clone(); + + handle.shutdown(); + + assert!(clone.is_shutdown()); + } + + #[tokio::test] + async fn wait_for_shutdown_resolves_after_signal() { + let handle = ShutdownHandle::new(); + let trigger = handle.clone(); + + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + trigger.shutdown(); + }); + + // This will hang forever if shutdown is never signaled + tokio::time::timeout( + tokio::time::Duration::from_millis(500), + handle.wait_for_shutdown(), + ) + .await + .expect("shutdown should have signaled within 500ms"); + } +}