Skip to content

Commit

Permalink
Merge branch 'protocol-deps' into upgrades-1.72.0
Browse files Browse the repository at this point in the history
  • Loading branch information
vkkoskie committed Aug 28, 2023
2 parents 5a70131 + 72b2f4a commit bb41f94
Show file tree
Hide file tree
Showing 20 changed files with 223 additions and 181 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amiquip"
version = "0.4.2"
version = "0.5.0"
authors = ["John Gallagher <johnkgallagher@gmail.com>"]
edition = "2018"
build = "build.rs"
Expand All @@ -21,11 +21,11 @@ default = ["native-tls"]
snafu = { version = "0.7", default-features = false, features = ["std"]}
input_buffer = "0.5"
bytes = "1.1"
amq-protocol = "1.4"
amq-protocol = "7.1"
log = "0.4"
mio = "0.6"
mio-extras = "2.0"
cookie-factory = "0.2"
cookie-factory = "0.3"
crossbeam-channel = "0.5"
indexmap = "1.6"
url = "2.2.2"
Expand Down
7 changes: 7 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# Version 0.5.0 (2023-08-21)

* Updated dependencies to address future incompatiblity lints.
* amq-protocol (now 7.1)
* cookie-factory (now 0.3)
* Deprecated `Channel::qos()` in favor of `Channel::set_qos()`

# Version 0.4.2 (2022-01-12)

* Fix compilation error with default features disabled (#36)
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ Add this to your `Cargo.toml`:

```toml
[dependencies]
amiquip = "0.4"
amiquip = "0.5"
```

For usage, see the [documentation](https://docs.rs/amiquip/) and
[examples](https://github.com/jgallagher/amiquip/tree/master/examples).

## Minimum Support Rust Version

The minimum supported Rust version for amiquip 0.4.2 is currently Rust 1.46.0,
The minimum supported Rust version for amiquip 0.5.0 is currently Rust 1.46.0,
but that may change with a patch release (and could change with a patch release
to a dependency without our knowledge).

Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub_receive_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn main() -> Result<()> {
println!("created exclusive queue {}", queue.name());

// Bind our queue to the logs exchange.
queue.bind(&exchange, "", FieldTable::new())?;
queue.bind(&exchange, "", FieldTable::default())?;

// Start a consumer. Use no_ack: true so the server doesn't wait for us to ack
// the messages it sends us.
Expand Down
2 changes: 1 addition & 1 deletion examples/routing_receive_logs_direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn main() -> Result<()> {

for severity in args.skip(1) {
// Bind to each requested log severity.
queue.bind(&exchange, severity, FieldTable::new())?;
queue.bind(&exchange, severity, FieldTable::default())?;
}

// Start a consumer. Use no_ack: true so the server doesn't wait for us to ack
Expand Down
5 changes: 3 additions & 2 deletions examples/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use amiquip::{
AmqpProperties, Channel, Connection, Consumer, ConsumerMessage, ConsumerOptions, Exchange,
Publish, Queue, QueueDeclareOptions, Result,
};
use amq_protocol::types::ShortString;
use uuid::Uuid;

struct FibonacciRpcClient<'a> {
Expand Down Expand Up @@ -36,12 +37,12 @@ impl<'a> FibonacciRpcClient<'a> {
}

fn call(&self, n: u64) -> Result<String> {
let correlation_id = format!("{}", Uuid::new_v4());
let correlation_id: ShortString = format!("{}", Uuid::new_v4()).into();
self.exchange.publish(Publish::with_properties(
format!("{}", n).as_bytes(),
"rpc_queue",
AmqpProperties::default()
.with_reply_to(self.queue.name().to_string())
.with_reply_to(self.queue.name().into())
.with_correlation_id(correlation_id.clone()),
))?;
for message in self.consumer.receiver().iter() {
Expand Down
2 changes: 1 addition & 1 deletion examples/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn main() -> Result<()> {

exchange.publish(Publish::with_properties(
response.as_bytes(),
reply_to,
reply_to.to_string(),
AmqpProperties::default().with_correlation_id(corr_id),
))?;
consumer.ack(delivery)?;
Expand Down
2 changes: 1 addition & 1 deletion examples/topics_receive_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn main() -> Result<()> {
}

for binding_key in args.skip(1) {
queue.bind(&exchange, binding_key, FieldTable::new())?;
queue.bind(&exchange, binding_key, FieldTable::default())?;
}

// Start a consumer. Use no_ack: true so the server doesn't wait for us to ack
Expand Down
2 changes: 1 addition & 1 deletion examples/work_queues_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() -> Result<()> {
)?;

// Set QOS to only send us 1 message at a time.
channel.qos(0, 1, false)?;
channel.set_qos(1, false)?;

// Start a consumer.
let consumer = queue.consume(ConsumerOptions::default())?;
Expand Down
126 changes: 57 additions & 69 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use amq_protocol::protocol::queue::Purge as QueuePurge;
use amq_protocol::protocol::queue::PurgeOk as QueuePurgeOk;
use amq_protocol::protocol::queue::Unbind as QueueUnbind;
use amq_protocol::protocol::queue::UnbindOk as QueueUnbindOk;
use amq_protocol::types::FieldTable;
use amq_protocol::types::{FieldTable, ShortString};
use crossbeam_channel::Receiver;
use std::cell::RefCell;
use std::fmt::Debug;
Expand Down Expand Up @@ -127,24 +127,28 @@ impl Channel {

/// Specify the prefetching window.
///
/// If `prefetch_size` is greater than 0, instructs the server to go ahead and send messages up
/// to `prefetch_size` in bytes even before previous deliveries have been acknowledged. If
/// `prefetch_count` is greater than 0, instructs the server to go ahead and send up to
/// `prefetch_count` messages even before previous deliveries have been acknowledged. If either
/// field is 0, that field is ignored. If both are 0, prefetching is disabled. If both are
/// nonzero, messages will only be sent before previous deliveries are acknowledged if that
/// send would satisfy both prefetch limits. If a consumer is started with `no_ack` set to
/// true, prefetch limits are ignored and messages are sent as quickly as possible.
/// `prefetch_size` is no longer supported and will be ignored. Use `set_qos` instead.
#[deprecated(since = "0.5.0", note = "amq-protocol no longer exposes prefetch_size")]
pub fn qos(&self, prefetch_size: u32, prefetch_count: u16, global: bool) -> Result<()> {
let _ = prefetch_size;
self.set_qos(prefetch_count, global)
}

/// Specify the prefetching window.
///
/// According to the AMQP spec, setting `global` to true means to apply these prefetch settings
/// If `prefetch_count` is greater than 0, instructs the server to go ahead and send up to
/// `prefetch_count` messages even before previous deliveries have been acknowledged. If zero,
/// the field is ignored, and prefetching is disabled. If a consumer is started with `no_ack`
/// set to `true`, prefetch limits are ignored and messages are sent as quickly as possible.
///
/// According to the AMQP spec, setting `global` to `true` means to apply these prefetch settings
/// to all channels in the entire connection, and `global` false means the settings apply only
/// to this channel. RabbitMQ does not interpret `global` the same way; for it, `global: true`
/// means the settings apply to all consumers on this channel, and `global: false` means the
/// settings apply only to consumers created on this channel after this call to `qos`, not
/// settings apply only to consumers created on this channel after this call to `set_qos`, not
/// affecting previously-created consumers.
pub fn qos(&self, prefetch_size: u32, prefetch_count: u16, global: bool) -> Result<()> {
pub fn set_qos(&self, prefetch_count: u16, global: bool) -> Result<()> {
self.call::<_, QosOk>(AmqpBasic::Qos(Qos {
prefetch_size,
prefetch_count,
global,
}))
Expand All @@ -164,18 +168,15 @@ impl Channel {
/// and then [`Exchange::publish`](struct.Exchange.html#method.publish) to avoid this.
pub fn basic_publish<S: Into<String>>(&self, exchange: S, publish: Publish) -> Result<()> {
let mut inner = self.inner.borrow_mut();
inner.call_nowait(AmqpBasic::Publish(AmqpPublish {
ticket: 0,
exchange: exchange.into(),
routing_key: publish.routing_key,
let amqp_publish = AmqpPublish {
exchange: exchange.into().into(), // S -> String -> ShortString
routing_key: publish.routing_key.into(),
mandatory: publish.mandatory,
immediate: publish.immediate,
}))?;
inner.send_content(
publish.body,
AmqpPublish::get_class_id(),
&publish.properties,
)
};
let class_id = amqp_publish.get_amqp_class_id();
inner.call_nowait(AmqpBasic::Publish(amqp_publish))?;
inner.send_content(publish.body, class_id, &publish.properties)
}

/// Open a crossbeam channel to receive publisher confirmations from the server.
Expand Down Expand Up @@ -251,7 +252,7 @@ impl Channel {
let ok = self.call::<_, QueueDeclareOk>(declare)?;
Ok(Queue::new(
self,
ok.queue,
ok.queue.to_string(),
Some(ok.message_count),
Some(ok.consumer_count),
))
Expand Down Expand Up @@ -290,13 +291,13 @@ impl Channel {
durable: false,
exclusive: false,
auto_delete: false,
arguments: FieldTable::new(),
arguments: FieldTable::default(),
};
let declare = AmqpQueue::Declare(options.into_declare(queue.into(), true, false));
let ok = self.call::<_, QueueDeclareOk>(declare)?;
Ok(Queue::new(
self,
ok.queue,
ok.queue.to_string(),
Some(ok.message_count),
Some(ok.consumer_count),
))
Expand All @@ -314,8 +315,7 @@ impl Channel {
/// to you on demand instead of polling with `get`.
pub fn basic_get<S: Into<String>>(&self, queue: S, no_ack: bool) -> Result<Option<Get>> {
self.inner.borrow_mut().get(AmqpGet {
ticket: 0,
queue: queue.into(),
queue: queue.into().into(), // S -> String -> ShortString
no_ack,
})
}
Expand All @@ -334,9 +334,8 @@ impl Channel {
// 2. The I/O loop allocates the channel to send deliveries when it
// receives consume-ok.
let (tag, rx) = self.inner.borrow_mut().consume(Consume {
ticket: 0,
queue: queue.into(),
consumer_tag: String::new(),
queue: queue.into().into(), // S -> String -> ShortString
consumer_tag: ShortString::default(),
no_local: options.no_local,
no_ack: options.no_ack,
exclusive: options.exclusive,
Expand All @@ -360,10 +359,9 @@ impl Channel {
arguments: FieldTable,
) -> Result<()> {
let bind = AmqpQueue::Bind(QueueBind {
ticket: 0,
queue: queue.into(),
exchange: exchange.into(),
routing_key: routing_key.into(),
queue: queue.into().into(),
exchange: exchange.into().into(),
routing_key: routing_key.into().into(),
nowait: false,
arguments,
});
Expand All @@ -384,10 +382,9 @@ impl Channel {
arguments: FieldTable,
) -> Result<()> {
let bind = AmqpQueue::Bind(QueueBind {
ticket: 0,
queue: queue.into(),
exchange: exchange.into(),
routing_key: routing_key.into(),
queue: queue.into().into(),
exchange: exchange.into().into(),
routing_key: routing_key.into().into(),
nowait: true,
arguments,
});
Expand All @@ -408,10 +405,9 @@ impl Channel {
arguments: FieldTable,
) -> Result<()> {
let unbind = AmqpQueue::Unbind(QueueUnbind {
ticket: 0,
queue: queue.into(),
exchange: exchange.into(),
routing_key: routing_key.into(),
queue: queue.into().into(),
exchange: exchange.into().into(),
routing_key: routing_key.into().into(),
arguments,
});
self.call::<_, QueueUnbindOk>(unbind).map(|_| ())
Expand All @@ -425,8 +421,7 @@ impl Channel {
/// [`Queue::purge`](struct.Queue.html#method.purge) to avoid this.
pub fn queue_purge<S: Into<String>>(&self, queue: S) -> Result<u32> {
let purge = AmqpQueue::Purge(QueuePurge {
ticket: 0,
queue: queue.into(),
queue: queue.into().into(), // S -> String -> ShortString
nowait: false,
});
self.call::<_, QueuePurgeOk>(purge)
Expand All @@ -440,8 +435,7 @@ impl Channel {
/// [`Queue::purge_nowait`](struct.Queue.html#method.purge_nowait) to avoid this.
pub fn queue_purge_nowait<S: Into<String>>(&self, queue: S) -> Result<()> {
let purge = AmqpQueue::Purge(QueuePurge {
ticket: 0,
queue: queue.into(),
queue: queue.into().into(),
nowait: true,
});
self.call_nowait(purge)
Expand Down Expand Up @@ -522,7 +516,7 @@ impl Channel {
durable: false,
auto_delete: false,
internal: false,
arguments: FieldTable::new(),
arguments: FieldTable::default(),
};
let declare =
AmqpExchange::Declare(options.into_declare(type_, exchange.clone(), true, false));
Expand All @@ -548,10 +542,9 @@ impl Channel {
arguments: FieldTable,
) -> Result<()> {
let bind = AmqpExchange::Bind(ExchangeBind {
ticket: 0,
destination: destination.into(),
source: source.into(),
routing_key: routing_key.into(),
destination: destination.into().into(),
source: source.into().into(),
routing_key: routing_key.into().into(),
nowait: false,
arguments,
});
Expand All @@ -576,10 +569,9 @@ impl Channel {
arguments: FieldTable,
) -> Result<()> {
let bind = AmqpExchange::Bind(ExchangeBind {
ticket: 0,
destination: destination.into(),
source: source.into(),
routing_key: routing_key.into(),
destination: destination.into().into(),
source: source.into().into(),
routing_key: routing_key.into().into(),
nowait: true,
arguments,
});
Expand All @@ -604,10 +596,9 @@ impl Channel {
arguments: FieldTable,
) -> Result<()> {
let unbind = AmqpExchange::Unbind(ExchangeUnbind {
ticket: 0,
destination: destination.into(),
source: source.into(),
routing_key: routing_key.into(),
destination: destination.into().into(),
source: source.into().into(),
routing_key: routing_key.into().into(),
nowait: false,
arguments,
});
Expand All @@ -634,10 +625,9 @@ impl Channel {
arguments: FieldTable,
) -> Result<()> {
let unbind = AmqpExchange::Unbind(ExchangeUnbind {
ticket: 0,
destination: destination.into(),
source: source.into(),
routing_key: routing_key.into(),
destination: destination.into().into(),
source: source.into().into(),
routing_key: routing_key.into().into(),
nowait: true,
arguments,
});
Expand All @@ -652,8 +642,7 @@ impl Channel {
/// `if_unused` was true and it has queue bindings), it will close this channel.
pub fn exchange_delete<S: Into<String>>(&self, exchange: S, if_unused: bool) -> Result<()> {
let delete = AmqpExchange::Delete(ExchangeDelete {
ticket: 0,
exchange: exchange.into(),
exchange: exchange.into().into(),
if_unused,
nowait: false,
});
Expand All @@ -672,8 +661,7 @@ impl Channel {
if_unused: bool,
) -> Result<()> {
let delete = AmqpExchange::Delete(ExchangeDelete {
ticket: 0,
exchange: exchange.into(),
exchange: exchange.into().into(),
if_unused,
nowait: true,
});
Expand Down Expand Up @@ -732,7 +720,7 @@ impl Channel {
// to not supproting nowait consume - we want the cancel-ok to clean
// up channels in the I/O loop.
self.call::<_, CancelOk>(AmqpBasic::Cancel(Cancel {
consumer_tag: consumer.consumer_tag().to_string(),
consumer_tag: consumer.consumer_tag().into(),
nowait: false,
}))
.map(|_ok| ())
Expand Down
Loading

0 comments on commit bb41f94

Please sign in to comment.