Skip to content

Commit

Permalink
Revise the handling of exceptions thrown in callbacks
Browse files Browse the repository at this point in the history
On second thoughts, exceptions thrown in callbacks *should* crash the
program, since they are likely to indicate bugs. Redirecting them to
an error event is conflating them with operational errors -- dropped
connections and the like.

It's possible to use domains (available since Node.JS 0.8) to redirect
otherwise uncaught exceptions, as demonstrated by the tests.
  • Loading branch information
squaremo committed May 13, 2014
1 parent c007e43 commit ae5a495
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 28 deletions.
24 changes: 9 additions & 15 deletions lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,17 @@ CallbackModel.prototype.createChannel = function(cb) {
return ch;
};

// Wrap an RPC callback so that it translates errors thrown in the
// callback to channel errors. This also accounts for missing
// callbacks (usually a result of no function being passed to an RPC
// method), and makes sure the callback is invoked with either `(null,
// value)` or `(error)`, i.e., never two non-null values.
// Wrap an RPC callback to make sure the callback is invoked with
// either `(null, value)` or `(error)`, i.e., never two non-null
// values. Also substitutes a stub if the callback is `undefined` or
// otherwise falsey, for convenience in methods for which the callback
// is optional (that is, most of them).
function callbackWrapper(ch, cb) {
return (cb !== undefined) ? function(err, ok) {
try {
if (err === null) {
cb(null, ok);
}
else cb(err);
}
catch (e) {
ch.closeWithError("Application error",
defs.constants.INTERNAL_ERROR, e);
return (cb) ? function(err, ok) {
if (err === null) {
cb(null, ok);
}
else cb(err);
} : function() {};
}

Expand Down
46 changes: 33 additions & 13 deletions test/callback_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ var api = require('../callback_api');
var util = require('./util');
var schedule = util.schedule;
var randomString = util.randomString;
var domain = require('domain');

var URL = process.env.URL || 'amqp://localhost';

function connect(cb) {
api.connect(URL, {}, cb);
}

// Suitable for supplying a `done` value
// Construct a node-style callback from a `done` function
function doneCallback(done) {
return function(err, _) {
if (err == null) done();
Expand Down Expand Up @@ -224,45 +225,64 @@ confirm_channel_test('Receive confirmation', function(ch, done) {

suite("Error handling", function() {

channel_test('Channel open callback throws an error', function(ch, done) {
ch.on('error', failCallback(done));
// TODO: refactor {error_test, channel_test}
function error_test(name, fun) {
test(name, function(done) {
var dom = domain.createDomain();
dom.run(function() {
connect(kCallback(function(c) {
// Seems like there were some unironed wrinkles in 0.8's
// implementation of domains; explicitly adding the connection
// to the domain makes sure any exception thrown in the course
// of processing frames is handled by the domain.
dom.add(c);
c.createChannel(kCallback(function(ch) {
fun(ch, done, dom);
}, done));
}, done));
});
});
}

error_test('Channel open callback throws an error', function(ch, done, dom) {
dom.on('error', failCallback(done));
throw new Error('Error in open callback');
});

channel_test('RPC callback throws error', function(ch, done) {
ch.on('error', failCallback(done));
error_test('RPC callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.prefetch(0, false, function(err, ok) {
throw new Error('Spurious callback error');
});
});

channel_test('Get callback throws error', function(ch, done) {
ch.on('error', failCallback(done));
error_test('Get callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.assertQueue('test.cb.get-with-error', {}, function(err, ok) {
ch.get('test.cb.get-with-error', {noAck: true}, function() {
throw new Error('Spurious callback error');
});
});
});

channel_test('Consume callback throws error', function(ch, done) {
ch.on('error', failCallback(done));
error_test('Consume callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.assertQueue('test.cb.consume-with-error', {}, function(err, ok) {
ch.consume('test.cb.consume-with-error', ignore, {noAck: true}, function() {
throw new Error('Spurious callback error');
});
});
});

channel_test('Get from non-queue invokes error k', function(ch, done) {
error_test('Get from non-queue invokes error k', function(ch, done, dom) {
var both = twice(failCallback(done));
ch.on('error', both.first);
dom.on('error', both.first);
ch.get('', {}, both.second);
});

channel_test('Consume from non-queue invokes error k', function(ch, done) {
error_test('Consume from non-queue invokes error k', function(ch, done, dom) {
var both = twice(failCallback(done));
ch.on('error', both.first);
dom.on('error', both.first);
ch.consume('', both.second);
});

Expand Down

0 comments on commit ae5a495

Please sign in to comment.