Skip to content

Commit

Permalink
Merge pull request #43 from christav/master
Browse files Browse the repository at this point in the history
Handling parse errors without crashing
  • Loading branch information
Chris Tavares committed Mar 16, 2013
2 parents 6f4e69a + 1f7fe79 commit affbd03
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 28 deletions.
26 changes: 22 additions & 4 deletions lib/messagesequencer.js
Expand Up @@ -27,6 +27,7 @@ function MessageSequencer(options, inner) {
this.pendingMessages = [];

inner.on('message', this.receiveMessage.bind(this));
inner.on('badmessage', this.receiveBadMessage.bind(this));
}

util.inherits(MessageSequencer, EventEmitter);
Expand All @@ -50,23 +51,40 @@ MessageSequencer.prototype.send = function(name, args) {
};

MessageSequencer.prototype.receiveMessage = function(sourceNodeId, name, message, seq) {
this.processMessage([sourceNodeId, name, message, seq]);
};

MessageSequencer.prototype.receiveBadMessage = function(sourceNodeId, name, seq) {
var badMsg = [sourceNodeId, name, null, seq];
badMsg.isBadMessage = true;
this.processMessage(badMsg);
};

MessageSequencer.prototype.processMessage = function(message) {
var seq = message[3];
if (this.nextExpectedMessageNumber === null) {
this.nextExpectedMessageNumber = seq;
}

if (seq >= this.nextExpectedMessageNumber) {
this.pendingMessages.push([sourceNodeId, name, message, seq]);
this.pendingMessages.sort(function (a, b) { return a[3] - b[3]; });

this.addPendingMessage(message);
this.sendPendingMessages();
}
};

MessageSequencer.prototype.addPendingMessage = function(message) {
this.pendingMessages.push(message);
this.pendingMessages.sort(function (a, b) { return a[3] - b[3]; });
};

MessageSequencer.prototype.sendPendingMessages = function() {
while (this.pendingMessages.length > 0 && this.pendingMessages[0][3] === this.nextExpectedMessageNumber)
{
this.emit.apply(this, ['message'].concat(this.pendingMessages[0]));
if (!this.pendingMessages[0].isBadMessage) {
this.emit.apply(this, ['message'].concat(this.pendingMessages[0]));
}
this.pendingMessages.shift();
++this.nextExpectedMessageNumber;
}
};

23 changes: 18 additions & 5 deletions lib/servicebusconnector.js
Expand Up @@ -51,7 +51,11 @@ ServiceBusConnector.prototype.start = function () {

if (!err) {
var msg = self.unpackMessage(receivedMessage);
self.emit('message', msg.nodeId, msg.name, msg.args, msg.seq);
if (msg.args !== null) {
self.emit('message', msg.nodeId, msg.name, msg.args, msg.seq);
} else {
self.emit('badmessage', msg.nodeId, msg.name, msg.seq);
}
}

if (!self.shouldStop) {
Expand Down Expand Up @@ -93,16 +97,25 @@ ServiceBusConnector.prototype.packMessage = function(name, args) {
brokerProperties: {
CorrelationId: this.nodeId,
Label: name
}};
}
};
}

ServiceBusConnector.prototype.unpackMessage = function(message) {
return {
var result = {
name: message.brokerProperties.Label,
nodeId: message.brokerProperties.CorrelationId,
args: JSON.parse(message.body),
seq: +message.brokerProperties.SequenceNumber
seq: +message.brokerProperties.SequenceNumber,
args: null
};

try {
result.args = JSON.parse(message.body);
return result;
} catch (ex) {
// Issue unpacking the message, assume it's bad and toss it
return result;
}
}

function createRetryFilter(options) {
Expand Down
85 changes: 78 additions & 7 deletions test/sequencing-tests.js
Expand Up @@ -40,6 +40,7 @@ describe('Message sequencing layer', function () {

it('should register for message events from inner', function() {
innerInterface.on.calledWith('message').should.be.true;
innerInterface.on.calledWith('badmessage').should.be.true;
});

it('should start inner interface when started', function () {
Expand Down Expand Up @@ -86,7 +87,9 @@ describe('Message sequencing layer', function () {
beforeEach(function () {
innerInterface = {
on: function (msg, callback) {
receiveFunc = callback;
if (msg === 'message') {
receiveFunc = callback;
}
},
send: noop
};
Expand Down Expand Up @@ -121,7 +124,9 @@ describe('Message sequencing layer', function () {
beforeEach(function () {
innerInterface = {
on: function (msg, callback) {
send = callback;
if (msg === 'message') {
send = callback;
}
},
start: noop,
send: noop
Expand Down Expand Up @@ -199,7 +204,9 @@ describe('Message sequencing layer', function () {
beforeEach(function () {
innerInterface = {
on: function (msg, callback) {
send = callback;
if (msg === 'message') {
send = callback;
}
},
start: noop,
send: noop
Expand Down Expand Up @@ -278,16 +285,80 @@ describe('Message sequencing layer', function () {
});
});

describe('when receiving a bad message', function () {
var innerInterface;
var sequencer;
var sendMessage;
var sendBadMessage;
var receivedMessages;
var badMessages;
beforeEach(function () {
innerInterface = {
on: function (msg, callback) {
if (msg === 'message') {
sendMessage = callback;
}
if (msg === 'badmessage') {
sendBadMessage = callback;
}
},
start: noop,
send: noop
};

sequencer = new MessageSequencer(createOptions, innerInterface);
receivedMessages = [];
sequencer.on('message', function (sourceNodeId, msg, args, seq) {
receivedMessages.push([sourceNodeId, msg, args, seq]);
});
});

it('should drop skip bad messages', function () {
sendBadMessage('n1', 'badMessage', 1);
sendBadMessage('n2', 'badMessage', 2);

receivedMessages.should.have.length(0);
});

it('should ignore bad message in middle of good messages', function () {
sendMessage('n1', 'goodMessage', 'Hello', 0);
sendBadMessage('n1', 'badMessage', 1);
sendMessage('n1', 'goodMessage', 'world', 2);

receivedMessages.should.have.length(2);
[['n1', 'Hello', 0], ['n1', 'world', 2]].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]);
receivedMessages[i][3].should.equal(testData[2]);
});
});

it('should ignore bad message in out of sequence messages', function () {
sendMessage('n1', 'msg', '0', 0);
sendMessage('n1', 'msg', '4', 4);
sendBadMessage('n1', 'msg', 1);
sendMessage('n1', 'msg', '2', 2);
sendBadMessage('n1', 'msg', 3);

receivedMessages.should.have.length(3);
[['n1', '0', 0], ['n1', '2', 2], ['n1', '4', 4]].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]);
receivedMessages[i][3].should.equal(testData[2]);
});
});
});

describe('when stopping', function () {
var innerInterface;
var sequencer;

beforeEach(function () {
innerInterface = {
on: noop,
start: sinon.spy(),
stop: sinon.spy(),
};
on: noop,
start: sinon.spy(),
stop: sinon.spy(),
};

sequencer = new MessageSequencer(createOptions, innerInterface);
});
Expand Down
60 changes: 48 additions & 12 deletions test/servicebusconnector-tests.js
Expand Up @@ -151,7 +151,7 @@ describe('Service Bus connection layer', function () {
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], 8));
});

it('should pass nodeId from received message', function (done) {
Expand All @@ -160,7 +160,7 @@ describe('Service Bus connection layer', function () {
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], 8));
});

it('should pass message name from received message', function (done) {
Expand All @@ -169,7 +169,7 @@ describe('Service Bus connection layer', function () {
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], 8));
});

it('should pass message arguments from received message', function (done) {
Expand All @@ -181,24 +181,59 @@ describe('Service Bus connection layer', function () {
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], 8));
});

it('should pass sequence number from message properties', function(done) {
connector.on('message', function (nodeId, name, args, seq) {
seq.should.equal(7);
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [1, 5, 9], 7));
});

it('should repoll service bus after message is received', function () {
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], 8));

sb.receiveSubscriptionMessage.calledTwice.should.be.true;
});

it('should not raise event and repoll on receive error', function (done) {
connector.on('message', function (nodeId, name, args, metadata) {
connector.on('message', function (nodeId, name, args, seq) {
done(new Error('Should not be called'));
});

receive(new Error('Fake error'), null);
sb.receiveSubscriptionMessage.calledTwice.should.be.true;
done();
});

it('should not raise message event and repoll on undeserializable message', function (done) {
connector.on('message', function (nodeId, name, args, seq) {
done(new Error('Message received when deserialization fails. This should not happen.'));
});

var msg = packMessage(connector, 'anotherNode', 'aMessage', null, 12);
msg.body = 'This is not valid JSON';
receive(null, msg);
sb.receiveSubscriptionMessage.calledTwice.should.be.true;
done();
});

it('should raise badmessage event on undeserializable message', function (done) {
connector.on('badmessage', function (nodeId, name, seq) {
nodeId.should.equal('anotherNode');
name.should.equal('aMessage');
seq.should.equal(12);
done();
});

var msg = packMessage(connector, 'anotherNode', 'aMessage', null, 12);
msg.body = 'This is not valid JSON';
receive(null, msg);
sb.receiveSubscriptionMessage.calledTwice.should.be.true;
});
});

describe('when receiving with multiple receives at a time', function () {
Expand Down Expand Up @@ -229,25 +264,25 @@ describe('Service Bus connection layer', function () {
});

it('should raise message event when message is received', function (done) {
connector.on('message', function (nodeId, name, args, metadata) {
connector.on('message', function (nodeId, name, args, seq) {
done();
});

receive[0](null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], {seq: 8, next: 11}));
receive[0](null, packMessage(connector, 'anotherNode', 'aMessage', [1, 2, 3], 8));
receive.shift();
});

it('should repoll service bus after message is received', function () {
var originalCalls = sb.receiveSubscriptionMessage.callCount;
receive[0](null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
receive[0](null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], 8));
receive.shift();

sb.receiveSubscriptionMessage.callCount.should.equal(originalCalls + 1);
});

it('should not raise event and repoll on receive error', function (done) {
var originalCalls = sb.receiveSubscriptionMessage.callCount;
connector.on('message', function (nodeId, name, args, metadata) {
connector.on('message', function (nodeId, name, args, seq) {
done(new Error('Should not be called'));
});

Expand Down Expand Up @@ -333,8 +368,9 @@ function makeConnector(serviceBus, callback) {
callback(serviceBus, connector);
}

function packMessage(connector, sourceNode, message, args, metadata) {
var packed = connector.packMessage(message, args, metadata);
function packMessage(connector, sourceNode, message, args, sequenceNumber) {
var packed = connector.packMessage(message, args);
packed.brokerProperties.CorrelationId = sourceNode;
packed.brokerProperties.SequenceNumber = sequenceNumber;
return packed;
}

0 comments on commit affbd03

Please sign in to comment.