Skip to content

Commit

Permalink
Migrate when -> bluebird
Browse files Browse the repository at this point in the history
  • Loading branch information
nfantone committed Nov 1, 2016
1 parent e4e8b8a commit 911cd10
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 247 deletions.
12 changes: 6 additions & 6 deletions channel_api.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
var raw_connect = require('./lib/connect').connect;
var ChannelModel = require('./lib/channel_model').ChannelModel;
var defer = require('when').defer;
var Promise = require('bluebird');

function connect(url, connOptions) {
var opened = defer();
raw_connect(url, connOptions, function(err, conn) {
if (err === null) opened.resolve(new ChannelModel(conn));
else opened.reject(err);
return Promise.fromCallback(function(cb) {
return raw_connect(url, connOptions, cb);
})
.then(function(conn) {
return new ChannelModel(conn);
});
return opened.promise;
};

module.exports.connect = connect;
Expand Down
19 changes: 10 additions & 9 deletions lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

var defs = require('./defs');
var when = require('when'), defer = when.defer;
var Promise = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand Down Expand Up @@ -313,15 +313,16 @@ ConfirmChannel.prototype.waitForConfirms = function(k) {
unconfirmed.forEach(function(val, index) {
if (val === null); // already confirmed
else {
var confirmed = defer();
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) confirmed.resolve();
else confirmed.reject(err);
};
awaiting.push(confirmed.promise);
var confirmed = new Promise(function(resolve, reject) {
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) resolve();
else reject(err);
};
});
awaiting.push(confirmed);
}
});
return when.all(awaiting).then(function() { k(); },
return Promise.all(awaiting).then(function() { k(); },
function(err) { k(err); });
};
121 changes: 56 additions & 65 deletions lib/channel_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

var defs = require('./defs');
var when = require('when'), defer = when.defer;
var Promise = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand All @@ -29,12 +29,7 @@ module.exports.ChannelModel = ChannelModel;
var CM = ChannelModel.prototype;

CM.close = function() {
var closed = defer();
this.connection.close(function (err) {
if (err === null) closed.resolve();
else closed.reject(err);
});
return closed.promise;
return Promise.fromCallback(this.connection.close.bind(this.connection));
};

// Channels
Expand All @@ -59,28 +54,29 @@ var C = Channel.prototype;
// response's fields; this is intended to be suitable for implementing
// API procedures.
C.rpc = function(method, fields, expect) {
var reply = defer();
this._rpc(method, fields, expect, function(err, f) {
if (err !== null) reply.reject(err);
else reply.resolve(f.fields);
var self = this;
return Promise.fromCallback(function(cb) {
return self._rpc(method, fields, expect, cb);
})
.then(function(f) {
return f.fields;
});
return reply.promise;
};

// Do the remarkably simple channel open handshake
C.open = function() {
return when.try(this.allocate.bind(this)).then(
return Promise.try(this.allocate.bind(this)).then(
function(ch) {
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
defs.ChannelOpenOk);
});
};

C.close = function() {
var closed = defer();
this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
closed.resolve)
return closed.promise;
return Promise.fromCallback(function(cb) {
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
cb);
});
};

// === Public API, declaring queues and stuff ===
Expand Down Expand Up @@ -151,7 +147,7 @@ C.bindExchange = function(dest, source, pattern, argt) {
C.unbindExchange = function(dest, source, pattern, argt) {
return this.rpc(defs.ExchangeUnbind,
Args.unbindExchange(dest, source, pattern, argt),
defs.ExchangeUnbindOk);
defs.ExchangeUnbindOk);
};

// Working with messages
Expand All @@ -170,58 +166,52 @@ C.consume = function(queue, callback, options) {
// NB we want the callback to be run synchronously, so that we've
// registered the consumerTag before any messages can arrive.
var fields = Args.consume(queue, options);
var reply = defer();
this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk,
function(err, ok) {
if (err === null) {
self.registerConsumer(ok.fields.consumerTag,
callback);
reply.resolve(ok.fields);
}
else reply.reject(err);
});
return reply.promise;
return Promise.fromCallback(function(cb) {
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
})
.then(function(ok) {
self.registerConsumer(ok.fields.consumerTag, callback);
return ok.fields;
});
};

C.cancel = function(consumerTag) {
var self = this;
var reply = defer();
this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
defs.BasicCancelOk,
function(err, ok) {
if (err === null) {
self.unregisterConsumer(consumerTag);
reply.resolve(ok.fields);
}
else reply.reject(err);
});
return reply.promise;
return Promise.fromCallback(function(cb) {
self._rpc(defs.BasicCancel, Args.cancel(consumerTag),
defs.BasicCancelOk,
cb);
})
.then(function(ok) {
self.unregisterConsumer(consumerTag);
return ok.fields;
});
};

C.get = function(queue, options) {
var reply = defer();
var self = this;
var fields = Args.get(queue, options);
this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) {
if (err === null) {
if (f.id === defs.BasicGetEmpty) {
reply.resolve(false);
}
else if (f.id === defs.BasicGetOk) {
var fields = f.fields;
return Promise.fromCallback(function(cb) {
return self.sendOrEnqueue(defs.BasicGet, fields, cb);
})
.then(function(f) {
if (f.id === defs.BasicGetEmpty) {
return false;
}
else if (f.id === defs.BasicGetOk) {
var fields = f.fields;
return new Promise(function(resolve) {
self.handleMessage = acceptMessage(function(m) {
m.fields = fields;
reply.resolve(m);
resolve(m);
});
}
else {
reply.reject(new Error("Unexpected response to BasicGet: " +
inspect(f)));
}
});
}
else reply.reject(err);
});
return reply.promise;
else {
throw new Error("Unexpected response to BasicGet: " +
inspect(f));
}
})
};

C.ack = function(message, allUpTo) {
Expand Down Expand Up @@ -313,14 +303,15 @@ CC.waitForConfirms = function() {
unconfirmed.forEach(function(val, index) {
if (val === null); // already confirmed
else {
var confirmed = defer();
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) confirmed.resolve();
else confirmed.reject(err);
};
awaiting.push(confirmed.promise);
var confirmed = new Promise(function(resolve, reject) {
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) resolve();
else reject(err);
};
});
awaiting.push(confirmed);
}
});
return when.all(awaiting);
return Promise.all(awaiting);
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"bitsyntax": "~0.0.4",
"buffer-more-ints": "0.0.2",
"readable-stream": "1.x >=1.1.9",
"when": "~3.6.2"
"bluebird": "^3.4.6"
},
"devDependencies": {
"mocha": "~1",
Expand Down
69 changes: 37 additions & 32 deletions test/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
'use strict';

var assert = require('assert');
var defer = require('when').defer;
var Promise = require('bluebird');
var Channel = require('../lib/channel').Channel;
var Connection = require('../lib/connection').Connection;
var util = require('./util');
Expand Down Expand Up @@ -76,14 +76,12 @@ var DELIVER_FIELDS = {
};

function open(ch) {
var opened = defer();
ch.allocate();
ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk,
function(err, _) {
if (err === null) opened.resolve();
else opened.reject(err);
});
return opened.promise;
return Promise.try(function() {
ch.allocate();
return Promise.fromCallback(function(cb) {
ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, cb);
});
});
}

suite("channel open and close", function() {
Expand Down Expand Up @@ -112,10 +110,9 @@ test("open, close", channelTest(
function(ch, done) {
open(ch)
.then(function() {
var closed = defer();
ch.closeBecause("Bye", defs.constants.REPLY_SUCCESS,
closed.resolve);
return closed.promise;
return new Promise(function(resolve) {
ch.closeBecause("Bye", defs.constants.REPLY_SUCCESS, resolve);
});
})
.then(succeed(done), fail(done));
},
Expand Down Expand Up @@ -204,7 +201,7 @@ test("RPC", channelTest(
prefetchSize: 0,
global: false
};

ch._rpc(defs.BasicQos, fields, defs.BasicQosOk, wheeboom);
ch._rpc(defs.BasicQos, fields, defs.BasicQosOk, wheeboom);
ch._rpc(defs.BasicQos, fields, defs.BasicQosOk, wheeboom);
Expand Down Expand Up @@ -233,7 +230,7 @@ test("Bad RPC", channelTest(
assert.strictEqual(505, error.code);
succeed(errLatch)();
});

open(ch)
.then(function() {
ch._rpc(defs.BasicRecover, {requeue: true}, defs.BasicRecoverOk,
Expand All @@ -257,27 +254,34 @@ test("Bad RPC", channelTest(
test("RPC on closed channel", channelTest(
function(ch, done) {
open(ch);
var close = defer(), fail1 = defer(), fail2 = defer();
ch.on('error', function(error) {
assert.strictEqual(504, error.code);
close.resolve();

var close = new Promise(function(resolve) {
ch.on('error', function(error) {
assert.strictEqual(504, error.code);
resolve();
});
});

function failureCb(d) {
function failureCb(resolve, reject) {
return function(err) {
if (err !== null) d.resolve();
else d.reject();
if (err !== null) resolve();
else reject();
}
}

ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk,
failureCb(fail1));
ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk,
failureCb(fail2));
close.promise
.then(function() { return fail1.promise; })
.then(function() { return fail2.promise; })
.then(succeed(done), fail(done));
var fail1 = new Promise(function(resolve, reject) {
return ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk,
failureCb(resolve, reject));
});

var fail2 = new Promise(function(resolve, reject) {
return ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk,
failureCb(resolve, reject));
});

Promise.join(close, fail1, fail2)
.then(succeed(done))
.catch(fail(done));
},
function(send, await, done, ch) {
await(defs.BasicRecover)()
Expand All @@ -289,7 +293,8 @@ test("RPC on closed channel", channelTest(
}, ch);
return await(defs.ChannelCloseOk);
})
.then(succeed(done), fail(done));
.then(succeed(done))
.catch(fail(done));
}));

test("publish all < single chunk threshold", channelTest(
Expand Down Expand Up @@ -453,7 +458,7 @@ test("bad properties send", channelTest(
},
function(send, await, done, ch) {
done();
}));
}));

test("bad consumer", channelTest(
function(ch, done) {
Expand Down
Loading

0 comments on commit 911cd10

Please sign in to comment.