Skip to content

Commit

Permalink
fix queue size -1 failing to be retrieved
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBM committed Feb 2, 2024
1 parent 42fb50b commit d20da02
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "8.0.1"
authors = [
"David Bonet <webbonet@gmail.com>"
]
edition = "2018"
edition = "2021"
license = "MIT"
description = "Async RSMQ port to rust. RSMQ is a simple redis queue system that works in any redis v2.4+. It contains the same methods as the original one in https://github.com/smrchy/rsmq"
homepage = "https://crates.io/crates/rsmq_async"
Expand Down
16 changes: 8 additions & 8 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {

let time: (u64, u64) = redis::cmd("TIME").query_async(conn).await?;

let result: (Vec<Option<u64>>, u64, u64) = pipe()
let result: (Vec<Option<i64>>, u64, u64) = pipe()
.atomic()
.cmd("HMGET")
.arg(format!("{}:Q", key))
Expand Down Expand Up @@ -228,19 +228,19 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.0
.first()
.and_then(Option::as_ref)
.map(|dur| Duration::from_millis(*dur))
.map(|dur| Duration::from_millis((*dur).try_into().unwrap_or(0)))
.unwrap_or(Duration::ZERO),
delay: result
.0
.get(1)
.and_then(Option::as_ref)
.map(|dur| Duration::from_millis(*dur))
.map(|dur| Duration::from_millis((*dur).try_into().unwrap_or(0)))
.unwrap_or(Duration::ZERO),
maxsize: result.0.get(2).unwrap_or(&Some(0)).unwrap_or(0),
totalrecv: result.0.get(3).unwrap_or(&Some(0)).unwrap_or(0),
totalsent: result.0.get(4).unwrap_or(&Some(0)).unwrap_or(0),
created: result.0.get(5).unwrap_or(&Some(0)).unwrap_or(0),
modified: result.0.get(6).unwrap_or(&Some(0)).unwrap_or(0),
totalrecv: u64::try_from(result.0.get(3).unwrap_or(&Some(0)).unwrap_or(0)).unwrap_or(0),
totalsent: u64::try_from(result.0.get(4).unwrap_or(&Some(0)).unwrap_or(0)).unwrap_or(0),
created: u64::try_from(result.0.get(5).unwrap_or(&Some(0)).unwrap_or(0)).unwrap_or(0),
modified: u64::try_from(result.0.get(6).unwrap_or(&Some(0)).unwrap_or(0)).unwrap_or(0),
msgs: result.1,
hiddenmsgs: result.2,
})
Expand Down Expand Up @@ -473,7 +473,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
let time_millis = (result.1).0 * 1000;

let (hmget_first, hmget_second, hmget_third) =
match (result.0.get(0), result.0.get(1), result.0.get(2)) {
match (result.0.first(), result.0.get(1), result.0.get(2)) {
(Some(Some(v0)), Some(Some(v1)), Some(Some(v2))) => (v0, v1, v2),
_ => return Err(RsmqError::QueueNotFound),
};
Expand Down
2 changes: 1 addition & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub struct RsmqQueueAttributes {
/// since it was sent
pub delay: Duration,
/// Max size of the message in bytes in the queue
pub maxsize: u64,
pub maxsize: i64,
/// Number of messages received by the queue
pub totalrecv: u64,
/// Number of messages sent by the queue
Expand Down
20 changes: 20 additions & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,23 @@ fn change_message_visibility() {
rsmq.delete_queue("queue6").await.unwrap();
})
}

#[test]
fn change_queue_size() {
let rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, None);

rsmq.create_queue("queue6", None, None, None).await.unwrap();

rsmq.set_queue_attributes("queue6", None, None, Some(-1)).await.unwrap();

let attributes = rsmq.get_queue_attributes("queue6").await.unwrap();

assert_eq!(attributes.maxsize, -1);

})
}

0 comments on commit d20da02

Please sign in to comment.