Skip to content

Commit

Permalink
Merge pull request #2 from bkearns/master
Browse files Browse the repository at this point in the history
Added qos (with a prefetch shortcut) and exchange bindings.
  • Loading branch information
Antti committed Jun 4, 2015
2 parents 4ce2f71 + 7d83778 commit ff0fef7
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 2 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -1,2 +1,3 @@
language: rust
rust: nightly

5 changes: 4 additions & 1 deletion examples/consumer.rs
Expand Up @@ -15,6 +15,7 @@ fn consumer_function(channel: &mut Channel, deliver: protocol::basic::Deliver, h
println!("Deliver info: {:?}", deliver);
println!("Content headers: {:?}", headers);
println!("Content body: {:?}", body);
println!("Content body(as string): {:?}", String::from_utf8(body).unwrap());
channel.basic_ack(deliver.delivery_tag, false);
}

Expand All @@ -26,11 +27,13 @@ fn main() {
let queue_name = "test_queue";
//queue: &str, passive: bool, durable: bool, exclusive: bool, auto_delete: bool, nowait: bool, arguments: Table
let queue_declare = channel.queue_declare(queue_name, false, true, false, false, false, table::new());
println!("Queue declare: {:?}", queue_declare);

println!("Queue declare: {:?}", queue_declare);
channel.basic_prefetch(10);
//queue: &str, consumer_tag: &str, no_local: bool, no_ack: bool, exclusive: bool, nowait: bool, arguments: Table
println!("Declaring consumer...");
let consumer_name = channel.basic_consume(consumer_function, queue_name, "", false, false, false, false, table::new());

println!("Starting consumer {:?}", consumer_name);
channel.start_consuming();

Expand Down
35 changes: 35 additions & 0 deletions examples/exchange_bind.rs
@@ -0,0 +1,35 @@
extern crate amqp;

use amqp::session::Options;
use amqp::session::Session;
use amqp::protocol;
use amqp::table;
use amqp::basic::Basic;
use std::default::Default;


fn main() {
let mut session = Session::new(Options{.. Default::default()}).ok().unwrap();
let mut channel = session.open_channel(1).ok().unwrap();
println!("Openned channel: {:?}", channel.id);

let exchange1 = "test_exchange";
let exchange2 = "test_exchange2";
let exchange_type = "topic";

//queue: &str, passive: bool, durable: bool, exclusive: bool, auto_delete: bool, nowait: bool, arguments: Table
let exchange_declare1 = channel.exchange_declare(exchange1, exchange_type,
false, true, false, false, false, table::new());

println!("Exchange declare: {:?}", exchange_declare1);
let exchange_declare2 = channel.exchange_declare(exchange2, exchange_type,
false, true, false, false, false, table::new());
println!("Exchange declare: {:?}", exchange_declare2);

let bind_reply = channel.exchange_bind(exchange1, exchange2, "#", table::new());
println!("Exchange bind: {:?}", bind_reply);


channel.close(200, "Bye".to_string());
session.close(200, "Good Bye".to_string());
}
15 changes: 14 additions & 1 deletion src/basic.rs
Expand Up @@ -3,7 +3,7 @@ use channel::ConsumerCallback;
use table::Table;
use framing::{ContentHeaderFrame, FrameType, Frame};
use protocol::{MethodFrame, basic, Method};
use protocol::basic::{BasicProperties, GetOk, Consume, ConsumeOk, Deliver, Publish, Ack, Nack, Reject};
use protocol::basic::{BasicProperties, GetOk, Consume, ConsumeOk, Deliver, Publish, Ack, Nack, Reject, Qos, QosOk};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

enum AckAction {
Expand Down Expand Up @@ -31,6 +31,8 @@ pub trait Basic <'a> {
fn basic_ack(&mut self, delivery_tag: u64, multiple: bool);
fn basic_nack(&mut self, delivery_tag: u64, multiple: bool, requeue: bool);
fn basic_reject(&mut self, delivery_tag: u64, requeue: bool);
fn basic_prefetch(&mut self, prefetch_count: u16);
fn basic_qos(&mut self, prefetch_size: u32, prefetch_count: u16, global: bool);
}

// #[derive(Debug)]
Expand Down Expand Up @@ -190,4 +192,15 @@ impl <'a> Basic<'a> for Channel {
self.send_method_frame(&Reject{delivery_tag: delivery_tag, requeue: requeue});
}

fn basic_prefetch(&mut self, prefetch_count:u16 ){
self.basic_qos(0, prefetch_count, false);
}

fn basic_qos(&mut self, prefetch_size: u32, prefetch_count: u16, global: bool){
let qos=&Qos{prefetch_size: prefetch_size,
prefetch_count: prefetch_count,
global: global};
let reply: QosOk = self.rpc(qos, "basic.qos-ok").ok().unwrap();
println!("reply: {:?}", reply);
}
}
9 changes: 9 additions & 0 deletions src/channel.rs
Expand Up @@ -80,6 +80,15 @@ impl Channel {
self.rpc(&declare,"exchange.declare-ok")
}

pub fn exchange_bind(&mut self, destination: &str, source: &str,
routing_key: &str, arguments: Table) {
let bind = protocol::exchange::Bind {
ticket: 0, destination: destination.to_string(), source: source.to_string(),
routing_key:routing_key.to_string(), nowait: false, arguments: arguments
};
let _reply: protocol::exchange::BindOk = self.rpc(&bind, "exchange.bind-ok").ok().unwrap();
}

pub fn queue_declare(&mut self, queue: &str, passive: bool, durable: bool, exclusive: bool,
auto_delete: bool, nowait: bool, arguments: Table) -> AMQPResult<protocol::queue::DeclareOk> {
let declare = protocol::queue::Declare {
Expand Down

0 comments on commit ff0fef7

Please sign in to comment.