Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBM committed Nov 19, 2023
1 parent a55cd7b commit 083fe33
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 78 deletions.
38 changes: 19 additions & 19 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
conn: &mut T,
qname: &str,
message_id: &str,
seconds_hidden: Duration,
hidden: Duration,
) -> RsmqResult<()> {
let seconds_hidden = get_redis_duration(Some(seconds_hidden), &Duration::from_secs(30));
let hidden = get_redis_duration(Some(hidden), &Duration::from_secs(30));

let queue = self.get_queue(conn, qname, false).await?;

number_in_range(seconds_hidden, 0, 9_999_999_000)?;
number_in_range(hidden, 0, 9_999_999_000)?;

CHANGE_MESSAGE_VISIVILITY
.key(format!("{}{}", self.ns, qname))
.key(message_id)
.key(queue.ts + seconds_hidden)
.key(queue.ts + hidden)
.invoke_async::<_, bool>(conn)
.await?;

Expand All @@ -60,7 +60,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {

/// Creates a new queue. Attributes can be later modified with "set_queue_attributes" method
///
/// seconds_hidden: Time the messages will be hidden when they are received with the "receive_message" method.
/// hidden: Time the messages will be hidden when they are received with the "receive_message" method.
///
/// delay: Time the messages will be delayed before being delivered
///
Expand All @@ -69,18 +69,18 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
&self,
conn: &mut T,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
valid_name_format(qname)?;

let key = format!("{}{}:Q", self.ns, qname);
let seconds_hidden = get_redis_duration(seconds_hidden, &Duration::from_secs(30));
let hidden = get_redis_duration(hidden, &Duration::from_secs(30));
let delay = get_redis_duration(delay, &Duration::ZERO);
let maxsize = maxsize.unwrap_or(65536);

number_in_range(seconds_hidden, 0, 9_999_999_000)?;
number_in_range(hidden, 0, 9_999_999_000)?;
number_in_range(delay, 0, 9_999_999)?;
if let Err(error) = number_in_range(maxsize, 1024, 65536) {
if maxsize != -1 {
Expand All @@ -96,7 +96,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.cmd("HSETNX")
.arg(&key)
.arg("vt")
.arg(seconds_hidden)
.arg(hidden)
.cmd("HSETNX")
.arg(&key)
.arg("delay")
Expand Down Expand Up @@ -283,24 +283,24 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
}))
}

/// Returns a message. The message stays hidden for some time (defined by "seconds_hidden"
/// Returns a message. The message stays hidden for some time (defined by "hidden"
/// argument or the queue settings). After that time, the message will be redelivered.
/// In order to avoid the redelivery, you need to use the "delete_message" after this function.
pub async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&self,
conn: &mut T,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
let queue = self.get_queue(conn, qname, false).await?;

let seconds_hidden = get_redis_duration(seconds_hidden, &queue.vt);
number_in_range(seconds_hidden, 0, 9_999_999_000)?;
let hidden = get_redis_duration(hidden, &queue.vt);
number_in_range(hidden, 0, 9_999_999_000)?;

let result: (bool, String, Vec<u8>, u64, u64) = RECEIVE_MESSAGE
.key(format!("{}{}", self.ns, qname))
.key(queue.ts)
.key(queue.ts + seconds_hidden)
.key(queue.ts + hidden)
.invoke_async(conn)
.await?;

Expand Down Expand Up @@ -387,9 +387,9 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
Ok(queue_uid)
}

/// Modify the queue attributes. Keep in mind that "seconds_hidden" and "delay" can be overwritten when the message is sent. "seconds_hidden" can be changed by the method "change_message_visibility"
/// Modify the queue attributes. Keep in mind that "hidden" and "delay" can be overwritten when the message is sent. "hidden" can be changed by the method "change_message_visibility"
///
/// seconds_hidden: Time the messages will be hidden when they are received with the "receive_message" method.
/// hidden: Time the messages will be hidden when they are received with the "receive_message" method.
///
/// delay: Time the messages will be delayed before being delivered
///
Expand All @@ -398,7 +398,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
&self,
conn: &mut T,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
Expand All @@ -417,8 +417,8 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.arg("modified")
.arg(time.0);

if seconds_hidden.is_some() {
let duration = get_redis_duration(seconds_hidden, &Duration::from_secs(30));
if hidden.is_some() {
let duration = get_redis_duration(hidden, &Duration::from_secs(30));
number_in_range(duration, 0, 9_999_999_000)?;
commands = commands
.cmd("HSET")
Expand Down
28 changes: 8 additions & 20 deletions src/multiplexed_facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,22 @@ impl RsmqConnection for MultiplexedRsmq {
&mut self,
qname: &str,
message_id: &str,
seconds_hidden: Duration,
hidden: Duration,
) -> RsmqResult<()> {
self.functions
.change_message_visibility(&mut self.connection.0, qname, message_id, seconds_hidden)
.change_message_visibility(&mut self.connection.0, qname, message_id, hidden)
.await
}

async fn create_queue(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
self.functions
.create_queue(
&mut self.connection.0,
qname,
seconds_hidden,
delay,
maxsize,
)
.create_queue(&mut self.connection.0, qname, hidden, delay, maxsize)
.await
}

Expand Down Expand Up @@ -124,10 +118,10 @@ impl RsmqConnection for MultiplexedRsmq {
async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.functions
.receive_message::<E>(&mut self.connection.0, qname, seconds_hidden)
.receive_message::<E>(&mut self.connection.0, qname, hidden)
.await
}

Expand All @@ -145,18 +139,12 @@ impl RsmqConnection for MultiplexedRsmq {
async fn set_queue_attributes(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
self.functions
.set_queue_attributes(
&mut self.connection.0,
qname,
seconds_hidden,
delay,
maxsize,
)
.set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize)
.await
}
}
28 changes: 8 additions & 20 deletions src/normal_facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,22 @@ impl RsmqConnection for Rsmq {
&mut self,
qname: &str,
message_id: &str,
seconds_hidden: Duration,
hidden: Duration,
) -> RsmqResult<()> {
self.functions
.change_message_visibility(&mut self.connection.0, qname, message_id, seconds_hidden)
.change_message_visibility(&mut self.connection.0, qname, message_id, hidden)
.await
}

async fn create_queue(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
self.functions
.create_queue(
&mut self.connection.0,
qname,
seconds_hidden,
delay,
maxsize,
)
.create_queue(&mut self.connection.0, qname, hidden, delay, maxsize)
.await
}

Expand Down Expand Up @@ -123,10 +117,10 @@ impl RsmqConnection for Rsmq {
async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.functions
.receive_message::<E>(&mut self.connection.0, qname, seconds_hidden)
.receive_message::<E>(&mut self.connection.0, qname, hidden)
.await
}

Expand All @@ -144,18 +138,12 @@ impl RsmqConnection for Rsmq {
async fn set_queue_attributes(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
self.functions
.set_queue_attributes(
&mut self.connection.0,
qname,
seconds_hidden,
delay,
maxsize,
)
.set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize)
.await
}
}
16 changes: 8 additions & 8 deletions src/pooled_facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,26 +105,26 @@ impl RsmqConnection for PooledRsmq {
&mut self,
qname: &str,
message_id: &str,
seconds_hidden: Duration,
hidden: Duration,
) -> RsmqResult<()> {
let mut conn = self.pool.get().await?;

self.functions
.change_message_visibility(&mut conn, qname, message_id, seconds_hidden)
.change_message_visibility(&mut conn, qname, message_id, hidden)
.await
}

async fn create_queue(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()> {
let mut conn = self.pool.get().await?;

self.functions
.create_queue(&mut conn, qname, seconds_hidden, delay, maxsize)
.create_queue(&mut conn, qname, hidden, delay, maxsize)
.await
}

Expand Down Expand Up @@ -162,12 +162,12 @@ impl RsmqConnection for PooledRsmq {
async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
let mut conn = self.pool.get().await?;

self.functions
.receive_message::<E>(&mut conn, qname, seconds_hidden)
.receive_message::<E>(&mut conn, qname, hidden)
.await
}

Expand All @@ -187,14 +187,14 @@ impl RsmqConnection for PooledRsmq {
async fn set_queue_attributes(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
let mut conn = self.pool.get().await?;

self.functions
.set_queue_attributes(&mut conn, qname, seconds_hidden, delay, maxsize)
.set_queue_attributes(&mut conn, qname, hidden, delay, maxsize)
.await
}
}
22 changes: 11 additions & 11 deletions src/trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ use std::time::Duration;
pub trait RsmqConnection {
/// Change the hidden time of a already sent message.
///
/// `seconds_hidden` has a max time of 9_999_999 for compatibility reasons to this library JS version counterpart
/// `hidden` has a max time of 9_999_999 for compatibility reasons to this library JS version counterpart
async fn change_message_visibility(
&mut self,
qname: &str,
message_id: &str,
seconds_hidden: Duration,
hidden: Duration,
) -> RsmqResult<()>;

/// Creates a new queue. Attributes can be later modified with "set_queue_attributes" method
///
/// seconds_hidden: Time the messages will be hidden when they are received with the "receive_message" method. It
/// hidden: Time the messages will be hidden when they are received with the "receive_message" method. It
/// has a max time of 9_999_999 for compatibility reasons to this library JS version counterpart
///
/// delay: Time the messages will be delayed before being delivered
Expand All @@ -28,7 +28,7 @@ pub trait RsmqConnection {
async fn create_queue(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i32>,
) -> RsmqResult<()>;
Expand All @@ -51,15 +51,15 @@ pub trait RsmqConnection {
qname: &str,
) -> RsmqResult<Option<RsmqMessage<E>>>;

/// Returns a message. The message stays hidden for some time (defined by "seconds_hidden" argument or the queue
/// Returns a message. The message stays hidden for some time (defined by "hidden" argument or the queue
/// settings). After that time, the message will be redelivered. In order to avoid the redelivery, you need to use
/// the "delete_message" after this function.
///
/// `seconds_hidden` has a max time of 9_999_999 for compatibility reasons to this library JS version counterpart.
/// `hidden` has a max time of 9_999_999 for compatibility reasons to this library JS version counterpart.
async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>>;

/// Sends a message to the queue. The message will be delayed some time (controlled by the "delayed" argument or
Expand All @@ -71,10 +71,10 @@ pub trait RsmqConnection {
delay: Option<Duration>,
) -> RsmqResult<String>;

/// Modify the queue attributes. Keep in mind that "seconds_hidden" and "delay" can be overwritten when the message
/// is sent. "seconds_hidden" can be changed by the method "change_message_visibility"
/// Modify the queue attributes. Keep in mind that "hidden" and "delay" can be overwritten when the message
/// is sent. "hidden" can be changed by the method "change_message_visibility"
///
/// seconds_hidden: Time the messages will be hidden when they are received with the "receive_message" method. It
/// hidden: Time the messages will be hidden when they are received with the "receive_message" method. It
/// has a max time of 9_999_999 for compatibility reasons to this library JS version counterpart
///
/// delay: Time the messages will be delayed before being delivered
Expand All @@ -84,7 +84,7 @@ pub trait RsmqConnection {
async fn set_queue_attributes(
&mut self,
qname: &str,
seconds_hidden: Option<Duration>,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes>;
Expand Down

0 comments on commit 083fe33

Please sign in to comment.