From 911cd1060990a1c61f86ff3abcce4af2ddccec1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Fantone?= Date: Tue, 1 Nov 2016 03:12:40 +0000 Subject: [PATCH 1/4] Migrate when -> bluebird --- channel_api.js | 12 +-- lib/callback_model.js | 19 ++-- lib/channel_model.js | 121 ++++++++++++------------- package.json | 2 +- test/channel.js | 69 +++++++------- test/channel_api.js | 206 +++++++++++++++++++----------------------- test/util.js | 38 ++++---- 7 files changed, 220 insertions(+), 247 deletions(-) diff --git a/channel_api.js b/channel_api.js index 42e70429..52440149 100644 --- a/channel_api.js +++ b/channel_api.js @@ -1,14 +1,14 @@ var raw_connect = require('./lib/connect').connect; var ChannelModel = require('./lib/channel_model').ChannelModel; -var defer = require('when').defer; +var Promise = require('bluebird'); function connect(url, connOptions) { - var opened = defer(); - raw_connect(url, connOptions, function(err, conn) { - if (err === null) opened.resolve(new ChannelModel(conn)); - else opened.reject(err); + return Promise.fromCallback(function(cb) { + return raw_connect(url, connOptions, cb); + }) + .then(function(conn) { + return new ChannelModel(conn); }); - return opened.promise; }; module.exports.connect = connect; diff --git a/lib/callback_model.js b/lib/callback_model.js index 4f44ee04..dca383f8 100644 --- a/lib/callback_model.js +++ b/lib/callback_model.js @@ -5,7 +5,7 @@ 'use strict'; var defs = require('./defs'); -var when = require('when'), defer = when.defer; +var Promise = require('bluebird'); var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var BaseChannel = require('./channel').BaseChannel; @@ -313,15 +313,16 @@ ConfirmChannel.prototype.waitForConfirms = function(k) { unconfirmed.forEach(function(val, index) { if (val === null); // already confirmed else { - var confirmed = defer(); - unconfirmed[index] = function(err) { - if (val) val(err); - if (err === null) confirmed.resolve(); - else confirmed.reject(err); - }; - awaiting.push(confirmed.promise); + var confirmed = new Promise(function(resolve, reject) { + unconfirmed[index] = function(err) { + if (val) val(err); + if (err === null) resolve(); + else reject(err); + }; + }); + awaiting.push(confirmed); } }); - return when.all(awaiting).then(function() { k(); }, + return Promise.all(awaiting).then(function() { k(); }, function(err) { k(err); }); }; diff --git a/lib/channel_model.js b/lib/channel_model.js index 0eceeabc..885a4b3e 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -5,7 +5,7 @@ 'use strict'; var defs = require('./defs'); -var when = require('when'), defer = when.defer; +var Promise = require('bluebird'); var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var BaseChannel = require('./channel').BaseChannel; @@ -29,12 +29,7 @@ module.exports.ChannelModel = ChannelModel; var CM = ChannelModel.prototype; CM.close = function() { - var closed = defer(); - this.connection.close(function (err) { - if (err === null) closed.resolve(); - else closed.reject(err); - }); - return closed.promise; + return Promise.fromCallback(this.connection.close.bind(this.connection)); }; // Channels @@ -59,17 +54,18 @@ var C = Channel.prototype; // response's fields; this is intended to be suitable for implementing // API procedures. C.rpc = function(method, fields, expect) { - var reply = defer(); - this._rpc(method, fields, expect, function(err, f) { - if (err !== null) reply.reject(err); - else reply.resolve(f.fields); + var self = this; + return Promise.fromCallback(function(cb) { + return self._rpc(method, fields, expect, cb); + }) + .then(function(f) { + return f.fields; }); - return reply.promise; }; // Do the remarkably simple channel open handshake C.open = function() { - return when.try(this.allocate.bind(this)).then( + return Promise.try(this.allocate.bind(this)).then( function(ch) { return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, defs.ChannelOpenOk); @@ -77,10 +73,10 @@ C.open = function() { }; C.close = function() { - var closed = defer(); - this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, - closed.resolve) - return closed.promise; + return Promise.fromCallback(function(cb) { + return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, + cb); + }); }; // === Public API, declaring queues and stuff === @@ -151,7 +147,7 @@ C.bindExchange = function(dest, source, pattern, argt) { C.unbindExchange = function(dest, source, pattern, argt) { return this.rpc(defs.ExchangeUnbind, Args.unbindExchange(dest, source, pattern, argt), - defs.ExchangeUnbindOk); + defs.ExchangeUnbindOk); }; // Working with messages @@ -170,58 +166,52 @@ C.consume = function(queue, callback, options) { // NB we want the callback to be run synchronously, so that we've // registered the consumerTag before any messages can arrive. var fields = Args.consume(queue, options); - var reply = defer(); - this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, - function(err, ok) { - if (err === null) { - self.registerConsumer(ok.fields.consumerTag, - callback); - reply.resolve(ok.fields); - } - else reply.reject(err); - }); - return reply.promise; + return Promise.fromCallback(function(cb) { + self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); + }) + .then(function(ok) { + self.registerConsumer(ok.fields.consumerTag, callback); + return ok.fields; + }); }; C.cancel = function(consumerTag) { var self = this; - var reply = defer(); - this._rpc(defs.BasicCancel, Args.cancel(consumerTag), - defs.BasicCancelOk, - function(err, ok) { - if (err === null) { - self.unregisterConsumer(consumerTag); - reply.resolve(ok.fields); - } - else reply.reject(err); - }); - return reply.promise; + return Promise.fromCallback(function(cb) { + self._rpc(defs.BasicCancel, Args.cancel(consumerTag), + defs.BasicCancelOk, + cb); + }) + .then(function(ok) { + self.unregisterConsumer(consumerTag); + return ok.fields; + }); }; C.get = function(queue, options) { - var reply = defer(); var self = this; var fields = Args.get(queue, options); - this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) { - if (err === null) { - if (f.id === defs.BasicGetEmpty) { - reply.resolve(false); - } - else if (f.id === defs.BasicGetOk) { - var fields = f.fields; + return Promise.fromCallback(function(cb) { + return self.sendOrEnqueue(defs.BasicGet, fields, cb); + }) + .then(function(f) { + if (f.id === defs.BasicGetEmpty) { + return false; + } + else if (f.id === defs.BasicGetOk) { + var fields = f.fields; + return new Promise(function(resolve) { self.handleMessage = acceptMessage(function(m) { m.fields = fields; - reply.resolve(m); + resolve(m); }); - } - else { - reply.reject(new Error("Unexpected response to BasicGet: " + - inspect(f))); - } + }); } - else reply.reject(err); - }); - return reply.promise; + else { + throw new Error("Unexpected response to BasicGet: " + + inspect(f)); + } + }) }; C.ack = function(message, allUpTo) { @@ -313,14 +303,15 @@ CC.waitForConfirms = function() { unconfirmed.forEach(function(val, index) { if (val === null); // already confirmed else { - var confirmed = defer(); - unconfirmed[index] = function(err) { - if (val) val(err); - if (err === null) confirmed.resolve(); - else confirmed.reject(err); - }; - awaiting.push(confirmed.promise); + var confirmed = new Promise(function(resolve, reject) { + unconfirmed[index] = function(err) { + if (val) val(err); + if (err === null) resolve(); + else reject(err); + }; + }); + awaiting.push(confirmed); } }); - return when.all(awaiting); + return Promise.all(awaiting); }; diff --git a/package.json b/package.json index 7b8878e2..f4af712d 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "bitsyntax": "~0.0.4", "buffer-more-ints": "0.0.2", "readable-stream": "1.x >=1.1.9", - "when": "~3.6.2" + "bluebird": "^3.4.6" }, "devDependencies": { "mocha": "~1", diff --git a/test/channel.js b/test/channel.js index a57f1e1c..9e275302 100644 --- a/test/channel.js +++ b/test/channel.js @@ -3,7 +3,7 @@ 'use strict'; var assert = require('assert'); -var defer = require('when').defer; +var Promise = require('bluebird'); var Channel = require('../lib/channel').Channel; var Connection = require('../lib/connection').Connection; var util = require('./util'); @@ -76,14 +76,12 @@ var DELIVER_FIELDS = { }; function open(ch) { - var opened = defer(); - ch.allocate(); - ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, - function(err, _) { - if (err === null) opened.resolve(); - else opened.reject(err); - }); - return opened.promise; + return Promise.try(function() { + ch.allocate(); + return Promise.fromCallback(function(cb) { + ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, cb); + }); + }); } suite("channel open and close", function() { @@ -112,10 +110,9 @@ test("open, close", channelTest( function(ch, done) { open(ch) .then(function() { - var closed = defer(); - ch.closeBecause("Bye", defs.constants.REPLY_SUCCESS, - closed.resolve); - return closed.promise; + return new Promise(function(resolve) { + ch.closeBecause("Bye", defs.constants.REPLY_SUCCESS, resolve); + }); }) .then(succeed(done), fail(done)); }, @@ -204,7 +201,7 @@ test("RPC", channelTest( prefetchSize: 0, global: false }; - + ch._rpc(defs.BasicQos, fields, defs.BasicQosOk, wheeboom); ch._rpc(defs.BasicQos, fields, defs.BasicQosOk, wheeboom); ch._rpc(defs.BasicQos, fields, defs.BasicQosOk, wheeboom); @@ -233,7 +230,7 @@ test("Bad RPC", channelTest( assert.strictEqual(505, error.code); succeed(errLatch)(); }); - + open(ch) .then(function() { ch._rpc(defs.BasicRecover, {requeue: true}, defs.BasicRecoverOk, @@ -257,27 +254,34 @@ test("Bad RPC", channelTest( test("RPC on closed channel", channelTest( function(ch, done) { open(ch); - var close = defer(), fail1 = defer(), fail2 = defer(); - ch.on('error', function(error) { - assert.strictEqual(504, error.code); - close.resolve(); + + var close = new Promise(function(resolve) { + ch.on('error', function(error) { + assert.strictEqual(504, error.code); + resolve(); + }); }); - function failureCb(d) { + function failureCb(resolve, reject) { return function(err) { - if (err !== null) d.resolve(); - else d.reject(); + if (err !== null) resolve(); + else reject(); } } - ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk, - failureCb(fail1)); - ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk, - failureCb(fail2)); - close.promise - .then(function() { return fail1.promise; }) - .then(function() { return fail2.promise; }) - .then(succeed(done), fail(done)); + var fail1 = new Promise(function(resolve, reject) { + return ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk, + failureCb(resolve, reject)); + }); + + var fail2 = new Promise(function(resolve, reject) { + return ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk, + failureCb(resolve, reject)); + }); + + Promise.join(close, fail1, fail2) + .then(succeed(done)) + .catch(fail(done)); }, function(send, await, done, ch) { await(defs.BasicRecover)() @@ -289,7 +293,8 @@ test("RPC on closed channel", channelTest( }, ch); return await(defs.ChannelCloseOk); }) - .then(succeed(done), fail(done)); + .then(succeed(done)) + .catch(fail(done)); })); test("publish all < single chunk threshold", channelTest( @@ -453,7 +458,7 @@ test("bad properties send", channelTest( }, function(send, await, done, ch) { done(); - })); + })); test("bad consumer", channelTest( function(ch, done) { diff --git a/test/channel_api.js b/test/channel_api.js index f0f52439..8a258a84 100644 --- a/test/channel_api.js +++ b/test/channel_api.js @@ -6,8 +6,7 @@ var util = require('./util'); var succeed = util.succeed, fail = util.fail; var schedule = util.schedule; var randomString = util.randomString; -var when = require('when'); -var defer = when.defer; +var Promise = require('bluebird'); var URL = process.env.URL || 'amqp://localhost'; @@ -17,22 +16,9 @@ function connect() { // Expect this promise to fail, and flip the results accordingly. function expectFail(promise) { - var rev = defer(); - promise.then(rev.reject.bind(rev), rev.resolve.bind(rev)); - return rev.promise; -} - -// Often, even interdependent operations don't need to be explicitly -// chained together with `.then`, since the channel implicitly -// serialises RPCs. Synchronising on the last operation is sufficient, -// provided all the operations are successful. This procedure removes -// some of the `then` noise, while still failing if any of its -// arguments fail. -function doAll(/* promise... */) { - return when.all(arguments) - .then(function(results) { - return results[results.length - 1]; - }); + return new Promise(function(resolve, reject) { + return promise.then(reject).catch(resolve); + }); } // I'll rely on operations being rejected, rather than the channel @@ -124,15 +110,15 @@ chtest("fail on reasserting exchange with different type", }); chtest("channel break on publishing to non-exchange", function(ch) { - var bork = defer(); - ch.on('error', bork.resolve.bind(bork)); - ch.publish(randomString(), '', new Buffer('foobar')); - return bork.promise; + return new Promise(function(resolve) { + ch.on('error', resolve); + ch.publish(randomString(), '', new Buffer('foobar')); + }); }); chtest("delete queue", function(ch) { var q = 'test.delete-queue'; - return doAll( + return Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.checkQueue(q)) .then(function() { @@ -143,7 +129,7 @@ chtest("delete queue", function(ch) { chtest("delete exchange", function(ch) { var ex = 'test.delete-exchange'; - return doAll( + return Promise.join( ch.assertExchange(ex, 'fanout', EX_OPTS), ch.checkExchange(ex)) .then(function() { @@ -157,23 +143,18 @@ chtest("delete exchange", function(ch) { // Wait for the queue to meet the condition; useful for waiting for // messages to arrive, for example. function waitForQueue(q, condition) { - var ready = defer(); - connect(URL).then(function(c) { + return connect(URL).then(function(c) { return c.createChannel() .then(function(ch) { - function check() { - ch.checkQueue(q).then(function(qok) { - if (condition(qok)) { - c.close(); - ready.resolve(qok); - } - else schedule(check); - }); - } - check(); + return ch.checkQueue(q).then(function(qok) { + if (condition(qok)) { + c.close(); + return qok; + } + else schedule(check); + }); }); }); - return ready.promise; } // Return a promise that resolves when the queue has at least `num` @@ -191,14 +172,14 @@ suite("sendMessage", function() { chtest("send to queue and get from queue", function(ch) { var q = 'test.send-to-q'; var msg = randomString(); - return doAll( - ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q)) + return Promise.join(ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) .then(function() { ch.sendToQueue(q, new Buffer(msg)); - return waitForMessages(q);}) + return waitForMessages(q); + }) .then(function() { - return ch.get(q, {noAck: true});}) + return ch.get(q, {noAck: true}); + }) .then(function(m) { assert(m); assert.equal(msg, m.content.toString()); @@ -208,7 +189,7 @@ chtest("send to queue and get from queue", function(ch) { chtest("send (and get) zero content to queue", function(ch) { var q = 'test.send-to-q'; var msg = new Buffer(0); - return doAll( + return Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) .then(function() { @@ -232,7 +213,7 @@ chtest("route message", function(ch) { var q = 'test.route-message-q'; var msg = randomString(); - return doAll( + return Promise.join( ch.assertExchange(ex, 'fanout', EX_OPTS), ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), @@ -270,7 +251,7 @@ chtest("unbind queue", function(ch) { var viabinding = randomString(); var direct = randomString(); - return doAll( + return Promise.join( ch.assertExchange(ex, 'fanout', EX_OPTS), ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), @@ -303,7 +284,7 @@ chtest("consume via exchange-exchange binding", function(ch) { var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2'; var q = 'test.ex-ex-binding-q'; var rk = 'test.routing.key', msg = randomString(); - return doAll( + return Promise.join( ch.assertExchange(ex1, 'direct', EX_OPTS), ch.assertExchange(ex2, 'fanout', {durable: false, internal: true}), @@ -312,16 +293,16 @@ chtest("consume via exchange-exchange binding", function(ch) { ch.bindExchange(ex2, ex1, rk, {}), ch.bindQueue(q, ex2, '', {})) .then(function() { - var arrived = defer(); - function delivery(m) { - if (m.content.toString() === msg) arrived.resolve(); - else arrived.reject(new Error("Wrong message")); - } - ch.consume(q, delivery, {noAck: true}) - .then(function() { - ch.publish(ex1, rk, new Buffer(msg)); - }); - return arrived.promise; + return new Promise(function(resolve, reject) { + function delivery(m) { + if (m.content.toString() === msg) resolve(); + else reject(new Error("Wrong message")); + } + ch.consume(q, delivery, {noAck: true}) + .then(function() { + ch.publish(ex1, rk, new Buffer(msg)); + }); + }); }); }); @@ -333,7 +314,7 @@ chtest("unbind exchange", function(ch) { var viabinding = randomString(); var direct = randomString(); - return doAll( + return Promise.join( ch.assertExchange(source, 'fanout', EX_OPTS), ch.assertExchange(dest, 'fanout', EX_OPTS), ch.assertQueue(q, QUEUE_OPTS), @@ -365,58 +346,55 @@ chtest("unbind exchange", function(ch) { // This is a bit convoluted. Sorry. chtest("cancel consumer", function(ch) { var q = 'test.consumer-cancel'; - var recv1 = defer(); var ctag; - - doAll( + var recv1 = new Promise(function (resolve, reject) { + Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), // My callback is 'resolve the promise in `arrived`' - ch.consume(q, function() { recv1.resolve(); }, {noAck:true}) + ch.consume(q, resolve, {noAck:true}) .then(function(ok) { ctag = ok.consumerTag; ch.sendToQueue(q, new Buffer('foo')); })); + }); + // A message should arrive because of the consume - return recv1.promise.then(function() { - // replace the promise resolved by the consume callback - recv1 = defer(); - - return doAll( + return recv1.then(function() { + var recv2 = Promise.join( ch.cancel(ctag).then(function() { - ch.sendToQueue(q, new Buffer('bar')); + return ch.sendToQueue(q, new Buffer('bar')); }), // but check a message did arrive in the queue waitForMessages(q)) .then(function() { - ch.get(q, {noAck:true}) - .then(function(m) { - // I'm going to reject it, because I flip succeed/fail - // just below - if (m.content.toString() === 'bar') - recv1.reject(); - }); - return expectFail(recv1.promise); - // i.e., fail on delivery, succeed on get-ok + return ch.get(q, {noAck:true}); + }) + .then(function(m) { + // I'm going to reject it, because I flip succeed/fail + // just below + if (m.content.toString() === 'bar') + throw new Error(); }); + + return expectFail(recv2); }); }); chtest("cancelled consumer", function(ch) { var q = 'test.cancelled-consumer'; - var nullRecv = defer(); - - doAll( - ch.assertQueue(q), - ch.purgeQueue(q), - ch.consume(q, function(msg) { - if (msg === null) nullRecv.resolve(); - else nullRecv.reject(new Error('Message not expected')); - })) - .then(function() { - ch.deleteQueue(q); - }); - return nullRecv.promise; + return new Promise(function(resolve, reject) { + return Promise.join( + ch.assertQueue(q), + ch.purgeQueue(q), + ch.consume(q, function(msg) { + if (msg === null) resolve(); + else reject(new Error('Message not expected')); + })) + .then(function() { + return ch.deleteQueue(q); + }); + }); }); // ack, by default, removes a single message from the queue @@ -424,7 +402,7 @@ chtest("ack", function(ch) { var q = 'test.ack'; var msg1 = randomString(), msg2 = randomString(); - return doAll( + return Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) .then(function() { @@ -454,7 +432,7 @@ chtest("nack", function(ch) { var q = 'test.nack'; var msg1 = randomString(); - return doAll( + return Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) .then(function() { ch.sendToQueue(q, new Buffer(msg1)); @@ -479,7 +457,7 @@ chtest("reject", function(ch) { var q = 'test.reject'; var msg1 = randomString(); - return doAll( + return Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) .then(function() { ch.sendToQueue(q, new Buffer(msg1)); @@ -500,7 +478,7 @@ chtest("reject", function(ch) { chtest("prefetch", function(ch) { var q = 'test.prefetch'; - return doAll( + return Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), ch.prefetch(1)) .then(function() { @@ -509,18 +487,19 @@ chtest("prefetch", function(ch) { return waitForMessages(q, 2); }) .then(function() { - var first = defer(); - return doAll( - ch.consume(q, function(m) { - first.resolve(m); - }, {noAck: false}), - first.promise.then(function(m) { - first = defer(); - ch.ack(m); - return first.promise.then(function(m) { - ch.ack(m); - }) - })); + return new Promise(function(resolve) { + var messageCount = 0; + function receive(msg) { + ch.ack(msg); + if (++messageCount > 1) { + resolve(messageCount); + } + } + return ch.consume(q, receive, {noAck: false}) + }); + }) + .then(function(c) { + return assert.equal(2, c); }); }); @@ -532,7 +511,7 @@ suite("confirms", function() { confirmtest('message is confirmed', function(ch) { var q = 'test.confirm-message'; - return doAll( + return Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) .then(function() { return ch.sendToQueue(q, new Buffer('bleep')); @@ -546,7 +525,7 @@ confirmtest('message is confirmed', function(ch) { // multi-ack. confirmtest('multiple confirms', function(ch) { var q = 'test.multiple-confirms'; - return doAll( + return Promise.join( ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) .then(function() { var multipleRainbows = false; @@ -558,18 +537,15 @@ confirmtest('multiple confirms', function(ch) { var cs = []; function sendAndPushPromise() { - var conf = defer(); - ch.sendToQueue(q, new Buffer('bleep'), {}, - function(err) { - if (err) conf.reject(); - else conf.resolve(); - }); - cs.push(conf.promise); + var conf = Promise.fromCallback(function(cb) { + return ch.sendToQueue(q, new Buffer('bleep'), {}, cb); + }); + cs.push(conf); } for (var i=0; i < num; i++) sendAndPushPromise(); - return when.all(cs).then(function() { + return Promise.all(cs).then(function() { if (multipleRainbows) return true; else if (num > 500) throw new Error( "Couldn't provoke the server" + diff --git a/test/util.js b/test/util.js index 8f0f9a7d..6f279405 100644 --- a/test/util.js +++ b/test/util.js @@ -1,11 +1,11 @@ 'use strict'; +var Promise = require('bluebird'); var crypto = require('crypto'); var Connection = require('../lib/connection').Connection; var PassThrough = require('stream').PassThrough || require('readable-stream/passthrough'); -var defer = require('when').defer; var defs = require('../lib/defs'); var assert = require('assert'); @@ -78,24 +78,24 @@ function runServer(socket, run) { function await(method) { return function() { - var d = defer(); - if (method) { - frames.step(function(e, f) { - if (e !== null) return d.reject(e); - if (f.id === method) - d.resolve(f); - else - d.reject(new Error("Expected method: " + method + - ", got " + f.id)); - }); - } - else { - frames.step(function(e, f) { - if (e !== null) return d.reject(e); - else d.resolve(f); - }); - } - return d.promise; + return new Promise(function(resolve, reject) { + if (method) { + frames.step(function(e, f) { + if (e !== null) return reject(e); + if (f.id === method) + resolve(f); + else + reject(new Error("Expected method: " + method + + ", got " + f.id)); + }); + } + else { + frames.step(function(e, f) { + if (e !== null) return reject(e); + else resolve(f); + }); + } + }); }; } run(send, await); From 1522f6e90daa25a3a468220f222b0b0658567fbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Fantone?= Date: Tue, 1 Nov 2016 12:32:20 +0000 Subject: [PATCH 2/4] Remove when usages from examples --- examples/tutorials/emit_log.js | 7 ++-- examples/tutorials/emit_log_direct.js | 7 ++-- examples/tutorials/emit_log_topic.js | 7 ++-- examples/tutorials/new_task.js | 9 ++-- examples/tutorials/package.json | 5 +-- examples/tutorials/receive.js | 8 ++-- examples/tutorials/receive_logs.js | 2 +- examples/tutorials/receive_logs_direct.js | 4 +- examples/tutorials/receive_logs_topic.js | 12 +++--- examples/tutorials/rpc_client.js | 51 +++++++++++------------ examples/tutorials/rpc_server.js | 2 +- examples/tutorials/send.js | 7 ++-- examples/tutorials/worker.js | 2 +- examples/waitForConfirms.js | 4 +- 14 files changed, 60 insertions(+), 67 deletions(-) diff --git a/examples/tutorials/emit_log.js b/examples/tutorials/emit_log.js index d41a9265..9b5d57e0 100755 --- a/examples/tutorials/emit_log.js +++ b/examples/tutorials/emit_log.js @@ -1,10 +1,9 @@ #!/usr/bin/env node var amqp = require('amqplib'); -var when = require('when'); amqp.connect('amqp://localhost').then(function(conn) { - return when(conn.createChannel().then(function(ch) { + return conn.createChannel().then(function(ch) { var ex = 'logs'; var ok = ch.assertExchange(ex, 'fanout', {durable: false}) @@ -16,5 +15,5 @@ amqp.connect('amqp://localhost').then(function(conn) { console.log(" [x] Sent '%s'", message); return ch.close(); }); - })).ensure(function() { conn.close(); }); -}).then(null, console.warn); + }).finally(function() { conn.close(); }); +}).catch(console.warn); diff --git a/examples/tutorials/emit_log_direct.js b/examples/tutorials/emit_log_direct.js index aa7fd50a..b81231e0 100755 --- a/examples/tutorials/emit_log_direct.js +++ b/examples/tutorials/emit_log_direct.js @@ -1,14 +1,13 @@ #!/usr/bin/env node var amqp = require('amqplib'); -var when = require('when'); var args = process.argv.slice(2); var severity = (args.length > 0) ? args[0] : 'info'; var message = args.slice(1).join(' ') || 'Hello World!'; amqp.connect('amqp://localhost').then(function(conn) { - return when(conn.createChannel().then(function(ch) { + return conn.createChannel().then(function(ch) { var ex = 'direct_logs'; var ok = ch.assertExchange(ex, 'direct', {durable: false}); @@ -17,5 +16,5 @@ amqp.connect('amqp://localhost').then(function(conn) { console.log(" [x] Sent %s:'%s'", severity, message); return ch.close(); }); - })).ensure(function() { conn.close(); }); -}).then(null, console.warn); + }).finally(function() { conn.close(); }); +}).catch(console.warn); diff --git a/examples/tutorials/emit_log_topic.js b/examples/tutorials/emit_log_topic.js index 1e720707..5de3bdcb 100755 --- a/examples/tutorials/emit_log_topic.js +++ b/examples/tutorials/emit_log_topic.js @@ -1,14 +1,13 @@ #!/usr/bin/env node var amqp = require('amqplib'); -var when = require('when'); var args = process.argv.slice(2); var key = (args.length > 0) ? args[0] : 'info'; var message = args.slice(1).join(' ') || 'Hello World!'; amqp.connect('amqp://localhost').then(function(conn) { - return when(conn.createChannel().then(function(ch) { + return conn.createChannel().then(function(ch) { var ex = 'topic_logs'; var ok = ch.assertExchange(ex, 'topic', {durable: false}); return ok.then(function() { @@ -16,5 +15,5 @@ amqp.connect('amqp://localhost').then(function(conn) { console.log(" [x] Sent %s:'%s'", key, message); return ch.close(); }); - })).ensure(function() { conn.close(); }) -}).then(null, console.log); + }).finally(function() { conn.close(); }) +}).catch(console.log); diff --git a/examples/tutorials/new_task.js b/examples/tutorials/new_task.js index fd7f5231..56d976ba 100755 --- a/examples/tutorials/new_task.js +++ b/examples/tutorials/new_task.js @@ -2,18 +2,17 @@ // Post a new task to the work queue var amqp = require('amqplib'); -var when = require('when'); amqp.connect('amqp://localhost').then(function(conn) { - return when(conn.createChannel().then(function(ch) { + return conn.createChannel().then(function(ch) { var q = 'task_queue'; var ok = ch.assertQueue(q, {durable: true}); - + return ok.then(function() { var msg = process.argv.slice(2).join(' ') || "Hello World!"; ch.sendToQueue(q, new Buffer(msg), {deliveryMode: true}); console.log(" [x] Sent '%s'", msg); return ch.close(); }); - })).ensure(function() { conn.close(); }); -}).then(null, console.warn); + }).finally(function() { conn.close(); }); +}).catch(console.warn); diff --git a/examples/tutorials/package.json b/examples/tutorials/package.json index 55f7f59e..71f49afe 100644 --- a/examples/tutorials/package.json +++ b/examples/tutorials/package.json @@ -4,9 +4,8 @@ "description": "The RabbitMQ tutorials, ported to amqplib", "main": "send.js", "dependencies": { - "amqplib": "", - "when": "", - "node-uuid": "" + "amqplib": "../..", + "node-uuid": "*" }, "scripts": { "test": "echo \"Error: no test specified\" && exit 1" diff --git a/examples/tutorials/receive.js b/examples/tutorials/receive.js index 434a621a..b7a7975c 100755 --- a/examples/tutorials/receive.js +++ b/examples/tutorials/receive.js @@ -5,17 +5,17 @@ var amqp = require('amqplib'); amqp.connect('amqp://localhost').then(function(conn) { process.once('SIGINT', function() { conn.close(); }); return conn.createChannel().then(function(ch) { - + var ok = ch.assertQueue('hello', {durable: false}); - + ok = ok.then(function(_qok) { return ch.consume('hello', function(msg) { console.log(" [x] Received '%s'", msg.content.toString()); }, {noAck: true}); }); - + return ok.then(function(_consumeOk) { console.log(' [*] Waiting for messages. To exit press CTRL+C'); }); }); -}).then(null, console.warn); +}).catch(console.warn); diff --git a/examples/tutorials/receive_logs.js b/examples/tutorials/receive_logs.js index 38f35bd8..d592af96 100755 --- a/examples/tutorials/receive_logs.js +++ b/examples/tutorials/receive_logs.js @@ -25,4 +25,4 @@ amqp.connect('amqp://localhost').then(function(conn) { console.log(" [x] '%s'", msg.content.toString()); } }); -}).then(null, console.warn); +}).catch(console.warn); diff --git a/examples/tutorials/receive_logs_direct.js b/examples/tutorials/receive_logs_direct.js index 7122f153..17fc22de 100755 --- a/examples/tutorials/receive_logs_direct.js +++ b/examples/tutorials/receive_logs_direct.js @@ -1,7 +1,7 @@ #!/usr/bin/env node var amqp = require('amqplib'); -var all = require('when').all; +var all = require('bluebird').all; var basename = require('path').basename; var severities = process.argv.slice(2); @@ -42,4 +42,4 @@ amqp.connect('amqp://localhost').then(function(conn) { msg.content.toString()); } }); -}).then(null, console.warn); +}).catch(console.warn); diff --git a/examples/tutorials/receive_logs_topic.js b/examples/tutorials/receive_logs_topic.js index ac3329a1..3e8eb6f0 100755 --- a/examples/tutorials/receive_logs_topic.js +++ b/examples/tutorials/receive_logs_topic.js @@ -2,7 +2,7 @@ var amqp = require('amqplib'); var basename = require('path').basename; -var all = require('when').all; +var all = require('bluebird').all; var keys = process.argv.slice(2); if (keys.length < 1) { @@ -16,29 +16,29 @@ amqp.connect('amqp://localhost').then(function(conn) { return conn.createChannel().then(function(ch) { var ex = 'topic_logs'; var ok = ch.assertExchange(ex, 'topic', {durable: false}); - + ok = ok.then(function() { return ch.assertQueue('', {exclusive: true}); }); - + ok = ok.then(function(qok) { var queue = qok.queue; return all(keys.map(function(rk) { ch.bindQueue(queue, ex, rk); })).then(function() { return queue; }); }); - + ok = ok.then(function(queue) { return ch.consume(queue, logMessage, {noAck: true}); }); return ok.then(function() { console.log(' [*] Waiting for logs. To exit press CTRL+C.'); }); - + function logMessage(msg) { console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString()); } }); -}).then(null, console.warn); +}).catch(console.warn); diff --git a/examples/tutorials/rpc_client.js b/examples/tutorials/rpc_client.js index cb1f6bee..1215cde1 100755 --- a/examples/tutorials/rpc_client.js +++ b/examples/tutorials/rpc_client.js @@ -2,8 +2,7 @@ var amqp = require('amqplib'); var basename = require('path').basename; -var when = require('when'); -var defer = when.defer; +var Promise = require('bluebird'); var uuid = require('node-uuid'); // I've departed from the form of the original RPC tutorial, which @@ -22,33 +21,33 @@ catch (e) { } amqp.connect('amqp://localhost').then(function(conn) { - return when(conn.createChannel().then(function(ch) { - var answer = defer(); - var corrId = uuid(); - function maybeAnswer(msg) { - if (msg.properties.correlationId === corrId) { - answer.resolve(msg.content.toString()); + return conn.createChannel().then(function(ch) { + return new Promise(function(resolve) { + var corrId = uuid(); + function maybeAnswer(msg) { + if (msg.properties.correlationId === corrId) { + resolve(msg.content.toString()); + } } - } - var ok = ch.assertQueue('', {exclusive: true}) - .then(function(qok) { return qok.queue; }); + var ok = ch.assertQueue('', {exclusive: true}) + .then(function(qok) { return qok.queue; }); - ok = ok.then(function(queue) { - return ch.consume(queue, maybeAnswer, {noAck: true}) - .then(function() { return queue; }); - }); - - ok = ok.then(function(queue) { - console.log(' [x] Requesting fib(%d)', n); - ch.sendToQueue('rpc_queue', new Buffer(n.toString()), { - correlationId: corrId, replyTo: queue + ok = ok.then(function(queue) { + return ch.consume(queue, maybeAnswer, {noAck: true}) + .then(function() { return queue; }); }); - return answer.promise; - }); - return ok.then(function(fibN) { - console.log(' [.] Got %d', fibN); + ok = ok.then(function(queue) { + console.log(' [x] Requesting fib(%d)', n); + ch.sendToQueue('rpc_queue', new Buffer(n.toString()), { + correlationId: corrId, replyTo: queue + }); + }); }); - })).ensure(function() { conn.close(); }); -}).then(null, console.warn); + }) + .then(function(fibN) { + console.log(' [.] Got %d', fibN); + }) + .finally(function() { conn.close(); }); +}).catch(console.warn); diff --git a/examples/tutorials/rpc_server.js b/examples/tutorials/rpc_server.js index fb1ba0b1..bdb209be 100755 --- a/examples/tutorials/rpc_server.js +++ b/examples/tutorials/rpc_server.js @@ -36,4 +36,4 @@ amqp.connect('amqp://localhost').then(function(conn) { ch.ack(msg); } }); -}).then(null, console.warn); +}).catch(console.warn); diff --git a/examples/tutorials/send.js b/examples/tutorials/send.js index 0cd33e32..7ff0ecf5 100755 --- a/examples/tutorials/send.js +++ b/examples/tutorials/send.js @@ -1,10 +1,9 @@ #!/usr/bin/env node var amqp = require('amqplib'); -var when = require('when'); amqp.connect('amqp://localhost').then(function(conn) { - return when(conn.createChannel().then(function(ch) { + return conn.createChannel().then(function(ch) { var q = 'hello'; var msg = 'Hello World!'; @@ -20,5 +19,5 @@ amqp.connect('amqp://localhost').then(function(conn) { console.log(" [x] Sent '%s'", msg); return ch.close(); }); - })).ensure(function() { conn.close(); }); -}).then(null, console.warn); + }).finally(function() { conn.close(); }); +}).catch(console.warn); diff --git a/examples/tutorials/worker.js b/examples/tutorials/worker.js index 864010ce..fdb72dfd 100755 --- a/examples/tutorials/worker.js +++ b/examples/tutorials/worker.js @@ -25,4 +25,4 @@ amqp.connect('amqp://localhost').then(function(conn) { }, secs * 1000); } }); -}).then(null, console.warn); +}).catch(console.warn); diff --git a/examples/waitForConfirms.js b/examples/waitForConfirms.js index 0dd5da1b..60cd6d37 100644 --- a/examples/waitForConfirms.js +++ b/examples/waitForConfirms.js @@ -12,11 +12,11 @@ function mkCallback(i) { amqp.connect().then(function(c) { c.createConfirmChannel().then(function(ch) { for (var i=0; i < NUM_MSGS; i++) { - ch.publish('amq.topic', 'whatever', new Buffer('blah'), {}, mkCallback(i)); + ch.publish('amq.topic', 'whatever', new Buffer('blah'), {}, mkCallback(i)); } ch.waitForConfirms().then(function() { console.log('All messages done'); c.close(); - }, console.error); + }).catch(console.error); }); }); From 3fcd53d5e0d8f1c764b68b7acbcf5227be5a7eb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Fantone?= Date: Tue, 1 Nov 2016 14:50:53 +0000 Subject: [PATCH 3/4] Fix check reference not defined --- test/channel_api.js | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/test/channel_api.js b/test/channel_api.js index 8a258a84..877050ac 100644 --- a/test/channel_api.js +++ b/test/channel_api.js @@ -147,11 +147,16 @@ function waitForQueue(q, condition) { return c.createChannel() .then(function(ch) { return ch.checkQueue(q).then(function(qok) { - if (condition(qok)) { - c.close(); - return qok; + function check() { + return ch.checkQueue(q).then(function(qok) { + if (condition(qok)) { + c.close(); + return qok; + } + else schedule(check); + }); } - else schedule(check); + return check(); }); }); }); @@ -373,8 +378,9 @@ chtest("cancel consumer", function(ch) { .then(function(m) { // I'm going to reject it, because I flip succeed/fail // just below - if (m.content.toString() === 'bar') + if (m.content.toString() === 'bar') { throw new Error(); + } }); return expectFail(recv2); From 23b24475403ac2789709f388714c4281b4e4205c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Fantone?= Date: Tue, 1 Nov 2016 14:52:22 +0000 Subject: [PATCH 4/4] Update install cmd on examples --- examples/tutorials/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/tutorials/README.md b/examples/tutorials/README.md index d234d994..333fda3f 100644 --- a/examples/tutorials/README.md +++ b/examples/tutorials/README.md @@ -9,7 +9,7 @@ of the tutorial programs to the callback-oriented API. To run the tutorial code, you need amqplib installed. Assuming you are in a clone of the amqplib repository, from the tutorials directory: - npm install ../.. + npm install or to use the latest released version,