Skip to content

Commit

Permalink
Remove normal facade as the redis::aio::Connection is deprecated
Browse files Browse the repository at this point in the history
in favor of redis::aio::MultiplexedConnection.

Keeping the pooled facade for specific workloads.
  • Loading branch information
DavidBM committed Apr 28, 2024
1 parent 8103652 commit df8444c
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 237 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ rand = "0.8"
radix_fmt = "1"
bb8 = "0.8"
thiserror = "1"
redis = { version = "0.23", features = ["tokio-comp", "async-std-comp"] }
redis = { version = "0.25", features = ["tokio-comp", "async-std-comp"] }
async-trait = "0.1"

[dev-dependencies]
net2 = "0.2"
tokio = { version = "1", features = ["rt-multi-thread"]}

[features]
default = ["tokio-comp", "async-std-comp"]
tokio-comp = ["redis/tokio-comp"]
async-std-comp = ["redis/async-std-comp"]
130 changes: 62 additions & 68 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
//! # RSMQ in async Rust
//!
//! RSMQ port to async rust. RSMQ is a simple redis queue system that works in any
//! redis v2.6+. It contains the same methods as the original one
//! in [https://github.com/smrchy/rsmq](https://github.com/smrchy/rsmq)
//!
//! This crate uses async in the implementation. If you want to use it in your sync
//! code you can use tokio/async_std "block_on" method. Async was used in order to
//! simplify the code and allow 1-to-1 port oft he JS code.
//! RSMQ port to async rust. RSMQ is a simple redis queue system that works in any redis v2.6+. It contains the same
//! methods as the original one in [https://github.com/smrchy/rsmq](https://github.com/smrchy/rsmq)
//!
//! [![Crates.io](https://img.shields.io/crates/v/rsmq_async)](https://crates.io/crates/rsmq_async)
//! [![Crates.io](https://img.shields.io/crates/l/rsmq_async)](https://choosealicense.com/licenses/mit/)
Expand All @@ -15,41 +10,35 @@
//!
//! ## Example
//!
//! ```rust,no_run
//!
//! use rsmq_async::{Rsmq, RsmqError, RsmqConnection};
//! ```rust
//! # use rsmq_async::RsmqError; use rsmq_async::{Rsmq, RsmqConnection};
//!
//! # async fn it_works() -> Result<(), RsmqError> {
//! let mut rsmq = Rsmq::new(Default::default()).await?;
//! # async fn it_works() -> Result<(), RsmqError> { let mut rsmq = Rsmq::new(Default::default()).await?;
//!
//! let message = rsmq.receive_message::<String>("myqueue", None).await?;
//!
//! if let Some(message) = message {
//! rsmq.delete_message("myqueue", &message.id).await?;
//! }
//! if let Some(message) = message { rsmq.delete_message("myqueue", &message.id).await?; }
//!
//! # Ok(())
//! # }
//!
//! ```
//!
//! Main object documentation are in: <a href="struct.Rsmq.html">Rsmq</a> and
//! <a href="struct.PooledRsmq.html">PooledRsmq</a> and they both implement the trait
//! <a href="trait.RsmqConnection.html">RsmqConnection</a> where you can see all the RSMQ
//! methods. Make sure you always import the trait <a href="trait.RsmqConnection.html">RsmqConnection</a>.
//! Main object documentation are in: [`Rsmq`] and[`PooledRsmq`] and they both implement the trait
//! [`RsmqConnection`] where you can see all the RSMQ methods. Make sure you always import the trait
//! [`RsmqConnection`].
//!
//! ## Installation
//!
//! Check [https://crates.io/crates/rsmq_async](https://crates.io/crates/rsmq_async)
//!
//!
//! ## Example
//!
//! ```rust,no_run
//! ```rust
//!
//! use rsmq_async::{Rsmq, RsmqConnection};
//!
//! async fn it_works() {
//! let mut rsmq = Rsmq::new(Default::default())
//! async fn it_works() { let mut rsmq = Rsmq::new(Default::default())
//! .await
//! .expect("connection failed");
//!
Expand All @@ -66,83 +55,90 @@
//! .await
//! .expect("cannot receive message");
//!
//! if let Some(message) = message {
//! rsmq.delete_message("myqueue", &message.id).await;
//! }
//! }
//! if let Some(message) = message { rsmq.delete_message("myqueue", &message.id).await; } }
//!
//! ```
//!
//! ## Realtime
//!
//! When [initializing](#initialize) RSMQ you can enable the realtime PUBLISH for
//! new messages. On every new message that gets sent to RSQM via `sendMessage` a
//! Redis PUBLISH will be issued to `{rsmq.ns}:rt:{qname}`. So, you can subscribe
//! to it using redis-rs library directly.
//! When initializing RSMQ you can enable the realtime PUBLISH for new messages. On every new message that gets sent to
//! RSQM via `sendMessage` a Redis PUBLISH will be issued to `{rsmq.ns}:rt:{qname}`. So, you can subscribe to it using
//! redis-rs library directly.
//!
//! ### How to use the realtime option
//!
//! Besides the PUBLISH when a new message is sent to RSMQ nothing else will happen.
//! Your app could use the Redis SUBSCRIBE command to be notified of new messages
//! and issue a `receiveMessage` then. However make sure not to listen with multiple
//! workers for new messages with SUBSCRIBE to prevent multiple simultaneous
//! `receiveMessage` calls.
//! Besides the PUBLISH redis command when a new message is sent to RSMQ nothing else will happen. Your app could use
//! the Redis SUBSCRIBE command to be notified of new messages and issue a `receiveMessage` then. However make sure not
//! to listen with multiple workers for new messages with SUBSCRIBE to prevent multiple simultaneous `receiveMessage`
//! calls.
//!
//! ## Guarantees
//!
//! If you want to implement "at least one delivery" guarantee, you need to receive
//! the messages using "receive_message" and then, once the message is successfully
//! processed, delete it with "delete_message".
//! If you want to implement "at least one delivery" guarantee, you need to receive the messages using "receive_message"
//! and then, once the message is successfully processed, delete it with "delete_message".
//!
//! ## Connection Pool
//!
//! If you want to use a connection pool, just use <a href="struct.PooledRsmq.html">PooledRsmq</a>
//! instad of Rsmq. It implements the RsmqConnection trait as the normal Rsmq.
//! If you want to use a connection pool, just use [`PooledRsmq`] instad of Rsmq. It implements the RsmqConnection trait
//! as the normal Rsmq.
//!
//! If you want to accept any of both implementation, just accept the trait
//! <a href="trait.RsmqConnection.html">RsmqConnection</a>
//! If you want to accept any of both implementation, just accept the trait [`RsmqConnection`]
//!
//! ## Executor compatibility
//!
//! Since version 0.16 [where this pull request was merged](https://github.com/mitsuhiko/redis-rs/issues/280)
//! redis-rs dependency supports tokio and async_std executors. By default it will
//! guess what you are using when creating the connection. You can check
//! [redis-rs](https://github.com/mitsuhiko/redis-rs/blob/master/Cargo.toml) `Cargo.tolm` for
//! the flags `async-std-comp` and `tokio-comp` in order to choose one or the other. If you don't select
//! any it should be able to automatically choose the correct one.
//! By default it will intruct redis-rs library to enable async-std and tokio compatibility and choose Tokio
//! if Tokio is avaialble, async-std if not. If you want to choose, you can change the `Cargo.toml` definition to
//!
//! ```toml
//!
//! rsmq_async = { version = "9", default-features = false, features = ["tokio-comp"] }
//!
//! ```
//!
//! Where `"tokio-comp"` can also be `"async-std-comp"`.
//!
//! ## `Rsmq` vs `PooledRsmq`
//!
//! In almost all workloads you might prefer the `Rsmq` object, as it works with a multiplexed connection.
//!
//! For specific workloads, where you might be sending a lof of data (images, documents, big blobs) you might prefer to
//! use the `PooledRsmq` and configure it with `PoolOptions`.
//!
//! They both use the `redis::aio::MultiplexedConnection`, but the pooled connection can be configured to spawn several
//! of those, so one operation won't block the other.
//!
//! ## Response types
//!
//! There are 3 functions that take generic types:
//!
//! - `pop_message` and `receive_message`: Where the type for the received message is
//! `RsmqMessage<E>` where `E: TryFrom<RedisBytes, Error = Vec<u8>>`. So, If you have custom type, you can implement the trait
//! `TryFrom<RedisBytes>` for `YourCustomType` and use it like: `rsmq.receive_message::<YourCustomType>("myqueue", None)`.
//! Implementations are provided for `String` and `Vec<u8>`.
//! - `send_message` where the message to send needs to implement `Into<RedisBytes> + Send`. So you will
//! need to implement the trait for your type. You can check the implementations for the type RedisBytes and see how
//! we did it. Implementations are provided for `String`, `&str` and `Vec<u8>`.
//! - `pop_message` and `receive_message`: Where the type for the received message is `RsmqMessage<E>` where `E:
//! TryFrom<RedisBytes, Error = Vec<u8>>`. So, If you have custom type, you can implement the trait
//! `TryFrom<RedisBytes>` for `YourCustomType` and use it like: `rsmq.receive_message::<YourCustomType>
//! ("myqueue", None)`. Implementations are provided for `String` and `Vec<u8>`.
//! - `send_message` where the message to send needs to implement `Into<RedisBytes> + Send`. So you will need to
//! implement the trait for your type. You can check the implementations for the type RedisBytes and see how we did
//! it. Implementations are provided for `String`, `&str` and `Vec<u8>`.
//!
//! All this is because strings in Rust are very convenient to use for json messages, so always returning a Vec<u8>
//! may not be the most ergonomic solution. But at the same time, we can just add some already made implementations
//! for it and you can just use it with your type or, if you are sending, let's say, images, just use the method
//! like: `rsmq.receive_message::<Vec<u8>>("myqueue", None)` and transform it later to your type. (Or just implement
//! the TryFrom<RedisBytes> for your type and the transformation will be automatic.)
//! All this is because strings in Rust are very convenient to use for json messages, so always returning a `Vec<u8>`
//! may not be the most ergonomic solution. But at the same time, we can just add some already made implementations for
//! it and you can just use it with your type or, if you are sending, let's say, images, just use the method like:
//! `rsmq.receive_message::<Vec<u8>>("myqueue", None)` and transform it later to your type. (Or just implement the
//! `TryFrom<RedisBytes>` for your type and the transformation will be automatic.)
//!
//! ### Example for implementing a custom type
//!
//! ```rust,ignore
//! impl TryFrom<RedisBytes> for String {
//!
//! // We sacrifice the ability of recovering the original error for the ability of having the
//! // original data. If you know how to conserver both, let me know!
//!
//! impl TryFrom<RedisBytes> for String {
//!
//! type Error = Vec<u8>; // Always set Error as Vec<u8>;
//!
//! fn try_from(bytes: RedisBytes) -> Result<Self, Self::Error> {
//! String::from_utf8(bytes.0).map_err(|e| e.into_bytes())
//! }
//!
//! }
//!
//! ```
//!

Expand All @@ -151,15 +147,13 @@
mod error;
mod functions;
mod multiplexed_facade;
mod normal_facade;
mod pooled_facade;
mod r#trait;
mod types;

pub use error::RsmqError;
pub use error::RsmqResult;
pub use multiplexed_facade::MultiplexedRsmq;
pub use normal_facade::Rsmq;
pub use multiplexed_facade::Rsmq;
pub use pooled_facade::{PoolOptions, PooledRsmq};
pub use r#trait::RsmqConnection;
pub use types::RedisBytes;
Expand Down
16 changes: 8 additions & 8 deletions src/multiplexed_facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ struct RedisConnection(redis::aio::MultiplexedConnection);

impl std::fmt::Debug for RedisConnection {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "RedisAsyncConnnection")
write!(f, "MultiplexedRedisAsyncConnnection")
}
}

#[derive(Debug, Clone)]
pub struct MultiplexedRsmq {
pub struct Rsmq {
connection: RedisConnection,
functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
}

impl MultiplexedRsmq {
impl Rsmq {
/// Creates a new RSMQ instance, including its connection
pub async fn new(options: RsmqOptions) -> RsmqResult<MultiplexedRsmq> {
pub async fn new(options: RsmqOptions) -> RsmqResult<Rsmq> {
let conn_info = redis::ConnectionInfo {
addr: redis::ConnectionAddr::Tcp(options.host, options.port),
redis: redis::RedisConnectionInfo {
Expand All @@ -37,7 +37,7 @@ impl MultiplexedRsmq {

let connection = client.get_multiplexed_async_connection().await?;

Ok(MultiplexedRsmq::new_with_connection(
Ok(Rsmq::new_with_connection(
connection,
options.realtime,
Some(&options.ns),
Expand All @@ -49,8 +49,8 @@ impl MultiplexedRsmq {
connection: redis::aio::MultiplexedConnection,
realtime: bool,
ns: Option<&str>,
) -> MultiplexedRsmq {
MultiplexedRsmq {
) -> Rsmq {
Rsmq {
connection: RedisConnection(connection),
functions: RsmqFunctions {
ns: ns.unwrap_or("rsmq").to_string(),
Expand All @@ -62,7 +62,7 @@ impl MultiplexedRsmq {
}

#[async_trait::async_trait]
impl RsmqConnection for MultiplexedRsmq {
impl RsmqConnection for Rsmq {
async fn change_message_visibility(
&mut self,
qname: &str,
Expand Down

0 comments on commit df8444c

Please sign in to comment.