Skip to content

Commit

Permalink
Allow for use without Exchange object, but defaulting to amq.topic
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Apr 22, 2010
1 parent a498466 commit 4bda812
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 10 deletions.
32 changes: 23 additions & 9 deletions README.md
Expand Up @@ -17,15 +17,13 @@ An example of connecting to a server and listening on a queue.

// Wait for connection to become established.
connection.addListener('ready', function () {
// Use the default 'amq.topic' exchange
var exchange = connection.exchange();

// Create a queue and bind to all messages.
var queue = connection.queue('my-queue');
queue.bind(exchange, '#')
// Use the default 'amq.topic' exchange
var q = connection.queue('my-queue');
q.bind(exchange, '#');

// Receive messages
queue.subscribe(function (message) {
q.subscribe(function (message) {
// Print messages to stdout
sys.p(message);
});
Expand Down Expand Up @@ -56,12 +54,25 @@ must be completed before any communication can begin. `net.Connection` does
the handshake automatically and emits the `ready` event when the handshaking
is complete.

To close the connection use `connection.close()`.

### connection.publish(routingKey, body)

Publishes a message to the default 'amq.topic' exchange.


### connection.end()

`amqp.Connection` is derived from `net.Stream` and has all the same methods.
So use `connection.end()` to terminate a connection gracefully.



## Exchange

Events: `'open'`, this is emitted when the exchange is declared and ready to

### exchange.addListener('open', callback)

The open event is emitted when the exchange is declared and ready to
be used.


Expand Down Expand Up @@ -198,11 +209,14 @@ For use with `subscribe({ack: true}, fn)`. Acknowledges the last
message.


### `queue.bind(exchange, routing)`
### `queue.bind([exchange,] routing)`

This method binds a queue to an exchange. Until a queue is
bound it will not receive any messages.

If the `exchange` argument is left out `'amq.topic'` will be used.


### `queue.delete(options)`

Delete the queue. Without options, the queue will be deleted even if it has
Expand Down
26 changes: 25 additions & 1 deletion amqp.js
Expand Up @@ -685,6 +685,8 @@ function Connection (options) {
var state = 'handshake';
var parser;

this._defaultExchange = null;

self.addListener('connect', function () {
// channel 0 is the control channel.
self.channels = [self];
Expand Down Expand Up @@ -1072,6 +1074,12 @@ Connection.prototype.exchange = function (name, options) {
return exchange;
};

// Publishes a message to the amq.topic exchange.
Connection.prototype.publish = function (routingKey, body) {
if (!this._defaultExchange) this._defaultExchange = this.exchange();
return this._defaultExchange.publish(routingKey, body);
};



// Properties:
Expand Down Expand Up @@ -1287,8 +1295,24 @@ Queue.prototype.shift = function () {
};


Queue.prototype.bind = function (exchange, routingKey) {
Queue.prototype.bind = function (/* [exchange,] routingKey */) {
var self = this;

// The first argument, exchange is optional.
// If not supplied the connection will use the default 'amq.topic'
// exchange.

var exchange, routingKey;

if (arguments.length == 2) {
exchange = arguments[0];
routingKey = arguments[1];
} else {
exchange = 'amq.topic';
routingKey = arguments[0];
}


return this._taskPush(methods.queueBindOk, function () {
var exchangeName = exchange instanceof Exchange ? exchange.name : exchange;
self.connection._sendMethod(self.channel, methods.queueBind,
Expand Down
46 changes: 46 additions & 0 deletions test/test-default-exchange.js
@@ -0,0 +1,46 @@
require('./harness');

var recvCount = 0;
var body = "hello world";

connection.addListener('ready', function () {
puts("connected to " + connection.serverProperties.product);

var q = connection.queue('node-default-exchange');

q.bind("#");

q.subscribe(function (msg) {
recvCount++;

switch (msg._routingKey) {
case 'message.msg1':
assert.equal(1, msg.one);
assert.equal(2, msg.two);
break;

case 'message.msg2':
assert.equal('world', msg.hello);
assert.equal('bar', msg.foo);
break;

default:
throw new Error('unexpected routing key: ' + msg._routingKey);
}
})
.addCallback(function () {
puts("publishing 2 msg messages");
connection.publish('message.msg1', {two:2, one:1});
connection.publish('message.msg2', {foo:'bar', hello: 'world'});

setTimeout(function () {
// wait one second to receive the message, then quit
connection.end();
}, 1000);
});
});


process.addListener('exit', function () {
assert.equal(2, recvCount);
});

0 comments on commit 4bda812

Please sign in to comment.