Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling parse errors without crashing #43

Merged
merged 7 commits into from
Mar 16, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions lib/messagesequencer.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ function MessageSequencer(options, inner) {
this.pendingMessages = [];

inner.on('message', this.receiveMessage.bind(this));
inner.on('badmessage', this.receiveBadMessage.bind(this));
}

util.inherits(MessageSequencer, EventEmitter);
Expand All @@ -50,23 +51,40 @@ MessageSequencer.prototype.send = function(name, args) {
};

MessageSequencer.prototype.receiveMessage = function(sourceNodeId, name, message, seq) {
this.processMessage([sourceNodeId, name, message, seq]);
};

MessageSequencer.prototype.receiveBadMessage = function(sourceNodeId, name, seq) {
var badMsg = [sourceNodeId, name, null, seq];
badMsg.isBadMessage = true;
this.processMessage(badMsg);
};

MessageSequencer.prototype.processMessage = function(message) {
var seq = message[3];
if (this.nextExpectedMessageNumber === null) {
this.nextExpectedMessageNumber = seq;
}

if (seq >= this.nextExpectedMessageNumber) {
this.pendingMessages.push([sourceNodeId, name, message, seq]);
this.pendingMessages.sort(function (a, b) { return a[3] - b[3]; });

this.addPendingMessage(message);
this.sendPendingMessages();
}
};

MessageSequencer.prototype.addPendingMessage = function(message) {
this.pendingMessages.push(message);
this.pendingMessages.sort(function (a, b) { return a[3] - b[3]; });
};

MessageSequencer.prototype.sendPendingMessages = function() {
while (this.pendingMessages.length > 0 && this.pendingMessages[0][3] === this.nextExpectedMessageNumber)
{
this.emit.apply(this, ['message'].concat(this.pendingMessages[0]));
if (!this.pendingMessages[0].isBadMessage) {
this.emit.apply(this, ['message'].concat(this.pendingMessages[0]));
}
this.pendingMessages.shift();
++this.nextExpectedMessageNumber;
}
};

23 changes: 18 additions & 5 deletions lib/servicebusconnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ ServiceBusConnector.prototype.start = function () {

if (!err) {
var msg = self.unpackMessage(receivedMessage);
self.emit('message', msg.nodeId, msg.name, msg.args, msg.seq);
if (msg.args !== null) {
self.emit('message', msg.nodeId, msg.name, msg.args, msg.seq);
} else {
self.emit('badmessage', msg.nodeId, msg.name, msg.seq);
}
}

if (!self.shouldStop) {
Expand Down Expand Up @@ -93,16 +97,25 @@ ServiceBusConnector.prototype.packMessage = function(name, args) {
brokerProperties: {
CorrelationId: this.nodeId,
Label: name
}};
}
};
}

ServiceBusConnector.prototype.unpackMessage = function(message) {
return {
var result = {
name: message.brokerProperties.Label,
nodeId: message.brokerProperties.CorrelationId,
args: JSON.parse(message.body),
seq: +message.brokerProperties.SequenceNumber
seq: +message.brokerProperties.SequenceNumber,
args: null
};

try {
result.args = JSON.parse(message.body);
return result;
} catch (ex) {
// Issue unpacking the message, assume it's bad and toss it
return result;
}
}

function createRetryFilter(options) {
Expand Down
85 changes: 78 additions & 7 deletions test/sequencing-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ describe('Message sequencing layer', function () {

it('should register for message events from inner', function() {
innerInterface.on.calledWith('message').should.be.true;
innerInterface.on.calledWith('badmessage').should.be.true;
});

it('should start inner interface when started', function () {
Expand Down Expand Up @@ -86,7 +87,9 @@ describe('Message sequencing layer', function () {
beforeEach(function () {
innerInterface = {
on: function (msg, callback) {
receiveFunc = callback;
if (msg === 'message') {
receiveFunc = callback;
}
},
send: noop
};
Expand Down Expand Up @@ -121,7 +124,9 @@ describe('Message sequencing layer', function () {
beforeEach(function () {
innerInterface = {
on: function (msg, callback) {
send = callback;
if (msg === 'message') {
send = callback;
}
},
start: noop,
send: noop
Expand Down Expand Up @@ -199,7 +204,9 @@ describe('Message sequencing layer', function () {
beforeEach(function () {
innerInterface = {
on: function (msg, callback) {
send = callback;
if (msg === 'message') {
send = callback;
}
},
start: noop,
send: noop
Expand Down Expand Up @@ -278,16 +285,80 @@ describe('Message sequencing layer', function () {
});
});

describe('when receiving a bad message', function () {
var innerInterface;
var sequencer;
var sendMessage;
var sendBadMessage;
var receivedMessages;
var badMessages;
beforeEach(function () {
innerInterface = {
on: function (msg, callback) {
if (msg === 'message') {
sendMessage = callback;
}
if (msg === 'badmessage') {
sendBadMessage = callback;
}
},
start: noop,
send: noop
};

sequencer = new MessageSequencer(createOptions, innerInterface);
receivedMessages = [];
sequencer.on('message', function (sourceNodeId, msg, args, seq) {
receivedMessages.push([sourceNodeId, msg, args, seq]);
});
});

it('should drop skip bad messages', function () {
sendBadMessage('n1', 'badMessage', 1);
sendBadMessage('n2', 'badMessage', 2);

receivedMessages.should.have.length(0);
});

it('should ignore bad message in middle of good messages', function () {
sendMessage('n1', 'goodMessage', 'Hello', 0);
sendBadMessage('n1', 'badMessage', 1);
sendMessage('n1', 'goodMessage', 'world', 2);

receivedMessages.should.have.length(2);
[['n1', 'Hello', 0], ['n1', 'world', 2]].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]);
receivedMessages[i][3].should.equal(testData[2]);
});
});

it('should ignore bad message in out of sequence messages', function () {
sendMessage('n1', 'msg', '0', 0);
sendMessage('n1', 'msg', '4', 4);
sendBadMessage('n1', 'msg', 1);
sendMessage('n1', 'msg', '2', 2);
sendBadMessage('n1', 'msg', 3);

receivedMessages.should.have.length(3);
[['n1', '0', 0], ['n1', '2', 2], ['n1', '4', 4]].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]);
receivedMessages[i][3].should.equal(testData[2]);
});
});
});

describe('when stopping', function () {
var innerInterface;
var sequencer;

beforeEach(function () {
innerInterface = {
on: noop,
start: sinon.spy(),
stop: sinon.spy(),
};
on: noop,
start: sinon.spy(),
stop: sinon.spy(),
};

sequencer = new MessageSequencer(createOptions, innerInterface);
});
Expand Down
60 changes: 48 additions & 12 deletions test/servicebusconnector-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ describe('Service Bus connection layer', function () {
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], 8));
});

it('should pass nodeId from received message', function (done) {
Expand All @@ -160,7 +160,7 @@ describe('Service Bus connection layer', function () {
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], 8));
});

it('should pass message name from received message', function (done) {
Expand All @@ -169,7 +169,7 @@ describe('Service Bus connection layer', function () {
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], 8));
});

it('should pass message arguments from received message', function (done) {
Expand All @@ -181,24 +181,59 @@ describe('Service Bus connection layer', function () {
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], 8));
});

it('should pass sequence number from message properties', function(done) {
connector.on('message', function (nodeId, name, args, seq) {
seq.should.equal(7);
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 5, 9], 7));
});

it('should repoll service bus after message is received', function () {
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], 8));

sb.receiveSubscriptionMessage.calledTwice.should.be.true;
});

it('should not raise event and repoll on receive error', function (done) {
connector.on('message', function (nodeId, name, args, metadata) {
connector.on('message', function (nodeId, name, args, seq) {
done(new Error('Should not be called'));
});

receive(new Error('Fake error'), null);
sb.receiveSubscriptionMessage.calledTwice.should.be.true;
done();
});

it('should not raise message event and repoll on undeserializable message', function (done) {
connector.on('message', function (nodeId, name, args, seq) {
done(new Error('Message received when deserialization fails. This should not happen.'));
});

var msg = packMessage(connector, 'anotherNode', 'aMessage', null, 12);
msg.body = 'This is not valid JSON';
receive(null, msg);
sb.receiveSubscriptionMessage.calledTwice.should.be.true;
done();
});

it('should raise badmessage event on undeserializable message', function (done) {
connector.on('badmessage', function (nodeId, name, seq) {
nodeId.should.equal('anotherNode');
name.should.equal('aMessage');
seq.should.equal(12);
done();
});

var msg = packMessage(connector, 'anotherNode', 'aMessage', null, 12);
msg.body = 'This is not valid JSON';
receive(null, msg);
sb.receiveSubscriptionMessage.calledTwice.should.be.true;
});
});

describe('when receiving with multiple receives at a time', function () {
Expand Down Expand Up @@ -229,25 +264,25 @@ describe('Service Bus connection layer', function () {
});

it('should raise message event when message is received', function (done) {
connector.on('message', function (nodeId, name, args, metadata) {
connector.on('message', function (nodeId, name, args, seq) {
done();
});

receive[0](null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], {seq: 8, next: 11}));
receive[0](null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], 8));
receive.shift();
});

it('should repoll service bus after message is received', function () {
var originalCalls = sb.receiveSubscriptionMessage.callCount;
receive[0](null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
receive[0](null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], 8));
receive.shift();

sb.receiveSubscriptionMessage.callCount.should.equal(originalCalls + 1);
});

it('should not raise event and repoll on receive error', function (done) {
var originalCalls = sb.receiveSubscriptionMessage.callCount;
connector.on('message', function (nodeId, name, args, metadata) {
connector.on('message', function (nodeId, name, args, seq) {
done(new Error('Should not be called'));
});

Expand Down Expand Up @@ -333,8 +368,9 @@ function makeConnector(serviceBus, callback) {
callback(serviceBus, connector);
}

function packMessage(connector, sourceNode, message, args, metadata) {
var packed = connector.packMessage(message, args, metadata);
function packMessage(connector, sourceNode, message, args, sequenceNumber) {
var packed = connector.packMessage(message, args);
packed.brokerProperties.CorrelationId = sourceNode;
packed.brokerProperties.SequenceNumber = sequenceNumber;
return packed;
}