Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Merge branch 'unsubscribe' of https://github.com/squaremo/node-amqp i…

…nto unsub
  • Loading branch information...
commit 0b6d0635cb03b4b67b3f52e0c9cd7eda9c0d5464 2 parents 964856c + e6192b5
Theo Schlossnagle postwait authored

Showing 3 changed files with 60 additions and 2 deletions. Show diff stats Hide diff stats

  1. +20 0 README.md
  2. +13 2 amqp.js
  3. +27 0 test/test-unsubscribe.js
20 README.md
Source Rendered
@@ -188,6 +188,26 @@ Look at the source code if you need to do this.
188 188
189 189 This method will emit 'basicConsumeOk' when ready.
190 190
  191 +### queue.unsubscribe(consumerTag)
  192 +
  193 +Unsubscribe from a queue, given the consumer tag. The consumer tag is
  194 +supplied to the *promise callback* of `Queue.subscribeRaw` or
  195 +`Queue.subscribe`:
  196 +
  197 + connection.queue('foo', function(queue) {
  198 + var ctag;
  199 + queue.subscribe(function(msg) {...})
  200 + .addCallback(function(ok) { ctag = ok.consumerTag; });
  201 + // ... and in some other callback
  202 + queue.unsubscribe(ctag);
  203 + });
  204 +
  205 +Note that `Queue.unsubscribe` will not requeue messages that have not
  206 +been acknowledged. You need to close the queue or connection for that
  207 +to happen. You may also receive messages after calling `unsubscribe`;
  208 +you will **not** receive messages from the queue after the unsubscribe
  209 +promise callback has been invoked, however.
  210 +
191 211 ### queue.shift()
192 212
193 213 For use with `subscribe({ack: true}, fn)`. Acknowledges the last
15 amqp.js
@@ -1349,7 +1349,7 @@ Channel.prototype._handleTaskReply = function (channel, method, args) {
1349 1349 if (this._tasks[i].reply == method) {
1350 1350 task = this._tasks[i];
1351 1351 this._tasks.splice(i, 1);
1352   - task.promise.emitSuccess();
  1352 + task.promise.emitSuccess(args);
1353 1353 this._tasksFlush();
1354 1354 return true;
1355 1355 }
@@ -1425,6 +1425,18 @@ Queue.prototype.subscribeRaw = function (/* options, messageListener */) {
1425 1425 });
1426 1426 };
1427 1427
  1428 +Queue.prototype.unsubscribe = function(consumerTag) {
  1429 + var self = this;
  1430 + return this._taskPush(methods.basicCancelOk, function () {
  1431 + self.connection._sendMethod(self.channel, methods.basicCancel,
  1432 + { reserved1: 0,
  1433 + consumerTag: consumerTag,
  1434 + noWait: false });
  1435 + })
  1436 + .addCallback(function () {
  1437 + delete self.consumerTagListeners[consumerTag];
  1438 + });
  1439 +};
1428 1440
1429 1441 Queue.prototype.subscribe = function (/* options, messageListener */) {
1430 1442 var self = this;
@@ -1514,7 +1526,6 @@ Queue.prototype.subscribe = function (/* options, messageListener */) {
1514 1526 };
1515 1527 Queue.prototype.subscribeJSON = Queue.prototype.subscribe;
1516 1528
1517   -
1518 1529 /* Acknowledges the last message */
1519 1530 Queue.prototype.shift = function () {
1520 1531 if (this._lastMessage) {
27 test/test-unsubscribe.js
... ... @@ -0,0 +1,27 @@
  1 +require('./harness');
  2 +
  3 +var queueName = 'node-unsubscribe-test';
  4 +var counter = 0;
  5 +
  6 +connection.on('ready', function() {
  7 + var ex = connection.exchange('');
  8 + var q = connection.queue(queueName, {autoDelete: false}, function() {
  9 + ex.publish(queueName, {"msg": 'Message1'});
  10 + var defr = q.subscribe(function() { counter += 1; });
  11 + defr.addCallback(function(ok) {
  12 + // NB there is a race here, since the publish above and this
  13 + // unsubscribe are sent over different channels.
  14 + var defr2 = q.unsubscribe(ok.consumerTag);
  15 + defr2.addCallback(function() {
  16 + connection.publish(queueName, {"msg": 'Message2'});
  17 + // alas I cannot think of a way to synchronise on the queue
  18 + setTimeout(function() { q.destroy().addCallback(function() {
  19 + connection.end(); })}, 500);
  20 + });
  21 + });
  22 + });
  23 +});
  24 +
  25 +process.addListener('exit', function() {
  26 + assert.equal(1, counter);
  27 +});

0 comments on commit 0b6d063

Please sign in to comment.
Something went wrong with that request. Please try again.