Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate when -> bluebird #295

Merged
merged 4 commits into from
Nov 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions channel_api.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
var raw_connect = require('./lib/connect').connect;
var ChannelModel = require('./lib/channel_model').ChannelModel;
var defer = require('when').defer;
var Promise = require('bluebird');

function connect(url, connOptions) {
var opened = defer();
raw_connect(url, connOptions, function(err, conn) {
if (err === null) opened.resolve(new ChannelModel(conn));
else opened.reject(err);
return Promise.fromCallback(function(cb) {
return raw_connect(url, connOptions, cb);
})
.then(function(conn) {
return new ChannelModel(conn);
});
return opened.promise;
};

module.exports.connect = connect;
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorials/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ of the tutorial programs to the callback-oriented API.
To run the tutorial code, you need amqplib installed. Assuming you are
in a clone of the amqplib repository, from the tutorials directory:

npm install ../..
npm install

or to use the latest released version,

Expand Down
7 changes: 3 additions & 4 deletions examples/tutorials/emit_log.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var when = require('when');

amqp.connect('amqp://localhost').then(function(conn) {
return when(conn.createChannel().then(function(ch) {
return conn.createChannel().then(function(ch) {
var ex = 'logs';
var ok = ch.assertExchange(ex, 'fanout', {durable: false})

Expand All @@ -16,5 +15,5 @@ amqp.connect('amqp://localhost').then(function(conn) {
console.log(" [x] Sent '%s'", message);
return ch.close();
});
})).ensure(function() { conn.close(); });
}).then(null, console.warn);
}).finally(function() { conn.close(); });
}).catch(console.warn);
7 changes: 3 additions & 4 deletions examples/tutorials/emit_log_direct.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var when = require('when');

var args = process.argv.slice(2);
var severity = (args.length > 0) ? args[0] : 'info';
var message = args.slice(1).join(' ') || 'Hello World!';

amqp.connect('amqp://localhost').then(function(conn) {
return when(conn.createChannel().then(function(ch) {
return conn.createChannel().then(function(ch) {
var ex = 'direct_logs';
var ok = ch.assertExchange(ex, 'direct', {durable: false});

Expand All @@ -17,5 +16,5 @@ amqp.connect('amqp://localhost').then(function(conn) {
console.log(" [x] Sent %s:'%s'", severity, message);
return ch.close();
});
})).ensure(function() { conn.close(); });
}).then(null, console.warn);
}).finally(function() { conn.close(); });
}).catch(console.warn);
7 changes: 3 additions & 4 deletions examples/tutorials/emit_log_topic.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var when = require('when');

var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'info';
var message = args.slice(1).join(' ') || 'Hello World!';

amqp.connect('amqp://localhost').then(function(conn) {
return when(conn.createChannel().then(function(ch) {
return conn.createChannel().then(function(ch) {
var ex = 'topic_logs';
var ok = ch.assertExchange(ex, 'topic', {durable: false});
return ok.then(function() {
ch.publish(ex, key, new Buffer(message));
console.log(" [x] Sent %s:'%s'", key, message);
return ch.close();
});
})).ensure(function() { conn.close(); })
}).then(null, console.log);
}).finally(function() { conn.close(); })
}).catch(console.log);
9 changes: 4 additions & 5 deletions examples/tutorials/new_task.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@
// Post a new task to the work queue

var amqp = require('amqplib');
var when = require('when');

amqp.connect('amqp://localhost').then(function(conn) {
return when(conn.createChannel().then(function(ch) {
return conn.createChannel().then(function(ch) {
var q = 'task_queue';
var ok = ch.assertQueue(q, {durable: true});

return ok.then(function() {
var msg = process.argv.slice(2).join(' ') || "Hello World!";
ch.sendToQueue(q, new Buffer(msg), {deliveryMode: true});
console.log(" [x] Sent '%s'", msg);
return ch.close();
});
})).ensure(function() { conn.close(); });
}).then(null, console.warn);
}).finally(function() { conn.close(); });
}).catch(console.warn);
5 changes: 2 additions & 3 deletions examples/tutorials/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
"description": "The RabbitMQ tutorials, ported to amqplib",
"main": "send.js",
"dependencies": {
"amqplib": "",
"when": "",
"node-uuid": ""
"amqplib": "../..",
"node-uuid": "*"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
Expand Down
8 changes: 4 additions & 4 deletions examples/tutorials/receive.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ var amqp = require('amqplib');
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {

var ok = ch.assertQueue('hello', {durable: false});

ok = ok.then(function(_qok) {
return ch.consume('hello', function(msg) {
console.log(" [x] Received '%s'", msg.content.toString());
}, {noAck: true});
});

return ok.then(function(_consumeOk) {
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}).then(null, console.warn);
}).catch(console.warn);
2 changes: 1 addition & 1 deletion examples/tutorials/receive_logs.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ amqp.connect('amqp://localhost').then(function(conn) {
console.log(" [x] '%s'", msg.content.toString());
}
});
}).then(null, console.warn);
}).catch(console.warn);
4 changes: 2 additions & 2 deletions examples/tutorials/receive_logs_direct.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var all = require('when').all;
var all = require('bluebird').all;
var basename = require('path').basename;

var severities = process.argv.slice(2);
Expand Down Expand Up @@ -42,4 +42,4 @@ amqp.connect('amqp://localhost').then(function(conn) {
msg.content.toString());
}
});
}).then(null, console.warn);
}).catch(console.warn);
12 changes: 6 additions & 6 deletions examples/tutorials/receive_logs_topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var amqp = require('amqplib');
var basename = require('path').basename;
var all = require('when').all;
var all = require('bluebird').all;

var keys = process.argv.slice(2);
if (keys.length < 1) {
Expand All @@ -16,29 +16,29 @@ amqp.connect('amqp://localhost').then(function(conn) {
return conn.createChannel().then(function(ch) {
var ex = 'topic_logs';
var ok = ch.assertExchange(ex, 'topic', {durable: false});

ok = ok.then(function() {
return ch.assertQueue('', {exclusive: true});
});

ok = ok.then(function(qok) {
var queue = qok.queue;
return all(keys.map(function(rk) {
ch.bindQueue(queue, ex, rk);
})).then(function() { return queue; });
});

ok = ok.then(function(queue) {
return ch.consume(queue, logMessage, {noAck: true});
});
return ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});

function logMessage(msg) {
console.log(" [x] %s:'%s'",
msg.fields.routingKey,
msg.content.toString());
}
});
}).then(null, console.warn);
}).catch(console.warn);
51 changes: 25 additions & 26 deletions examples/tutorials/rpc_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

var amqp = require('amqplib');
var basename = require('path').basename;
var when = require('when');
var defer = when.defer;
var Promise = require('bluebird');
var uuid = require('node-uuid');

// I've departed from the form of the original RPC tutorial, which
Expand All @@ -22,33 +21,33 @@ catch (e) {
}

amqp.connect('amqp://localhost').then(function(conn) {
return when(conn.createChannel().then(function(ch) {
var answer = defer();
var corrId = uuid();
function maybeAnswer(msg) {
if (msg.properties.correlationId === corrId) {
answer.resolve(msg.content.toString());
return conn.createChannel().then(function(ch) {
return new Promise(function(resolve) {
var corrId = uuid();
function maybeAnswer(msg) {
if (msg.properties.correlationId === corrId) {
resolve(msg.content.toString());
}
}
}

var ok = ch.assertQueue('', {exclusive: true})
.then(function(qok) { return qok.queue; });
var ok = ch.assertQueue('', {exclusive: true})
.then(function(qok) { return qok.queue; });

ok = ok.then(function(queue) {
return ch.consume(queue, maybeAnswer, {noAck: true})
.then(function() { return queue; });
});

ok = ok.then(function(queue) {
console.log(' [x] Requesting fib(%d)', n);
ch.sendToQueue('rpc_queue', new Buffer(n.toString()), {
correlationId: corrId, replyTo: queue
ok = ok.then(function(queue) {
return ch.consume(queue, maybeAnswer, {noAck: true})
.then(function() { return queue; });
});
return answer.promise;
});

return ok.then(function(fibN) {
console.log(' [.] Got %d', fibN);
ok = ok.then(function(queue) {
console.log(' [x] Requesting fib(%d)', n);
ch.sendToQueue('rpc_queue', new Buffer(n.toString()), {
correlationId: corrId, replyTo: queue
});
});
});
})).ensure(function() { conn.close(); });
}).then(null, console.warn);
})
.then(function(fibN) {
console.log(' [.] Got %d', fibN);
})
.finally(function() { conn.close(); });
}).catch(console.warn);
2 changes: 1 addition & 1 deletion examples/tutorials/rpc_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ amqp.connect('amqp://localhost').then(function(conn) {
ch.ack(msg);
}
});
}).then(null, console.warn);
}).catch(console.warn);
7 changes: 3 additions & 4 deletions examples/tutorials/send.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var when = require('when');

amqp.connect('amqp://localhost').then(function(conn) {
return when(conn.createChannel().then(function(ch) {
return conn.createChannel().then(function(ch) {
var q = 'hello';
var msg = 'Hello World!';

Expand All @@ -20,5 +19,5 @@ amqp.connect('amqp://localhost').then(function(conn) {
console.log(" [x] Sent '%s'", msg);
return ch.close();
});
})).ensure(function() { conn.close(); });
}).then(null, console.warn);
}).finally(function() { conn.close(); });
}).catch(console.warn);
2 changes: 1 addition & 1 deletion examples/tutorials/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ amqp.connect('amqp://localhost').then(function(conn) {
}, secs * 1000);
}
});
}).then(null, console.warn);
}).catch(console.warn);
4 changes: 2 additions & 2 deletions examples/waitForConfirms.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ function mkCallback(i) {
amqp.connect().then(function(c) {
c.createConfirmChannel().then(function(ch) {
for (var i=0; i < NUM_MSGS; i++) {
ch.publish('amq.topic', 'whatever', new Buffer('blah'), {}, mkCallback(i));
ch.publish('amq.topic', 'whatever', new Buffer('blah'), {}, mkCallback(i));
}
ch.waitForConfirms().then(function() {
console.log('All messages done');
c.close();
}, console.error);
}).catch(console.error);
});
});
19 changes: 10 additions & 9 deletions lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

var defs = require('./defs');
var when = require('when'), defer = when.defer;
var Promise = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand Down Expand Up @@ -313,15 +313,16 @@ ConfirmChannel.prototype.waitForConfirms = function(k) {
unconfirmed.forEach(function(val, index) {
if (val === null); // already confirmed
else {
var confirmed = defer();
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) confirmed.resolve();
else confirmed.reject(err);
};
awaiting.push(confirmed.promise);
var confirmed = new Promise(function(resolve, reject) {
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) resolve();
else reject(err);
};
});
awaiting.push(confirmed);
}
});
return when.all(awaiting).then(function() { k(); },
return Promise.all(awaiting).then(function() { k(); },
function(err) { k(err); });
};
Loading