Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using service bus managed sequences numbers instead of doing it ourselves. #37

Merged
merged 2 commits into from Mar 4, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 16 additions & 33 deletions lib/messagesequencer.js
Expand Up @@ -23,8 +23,8 @@ module.exports = MessageSequencer;
function MessageSequencer(options, inner) { function MessageSequencer(options, inner) {
this.inner = inner; this.inner = inner;
this.started = false; this.started = false;
this.messageMetadata = { seq: 0, next: 1}; this.nextExpectedMessageNumber = null;
this.pendingMessages = {}; this.pendingMessages = [];


inner.on('message', this.receiveMessage.bind(this)); inner.on('message', this.receiveMessage.bind(this));
} }
Expand All @@ -46,44 +46,27 @@ MessageSequencer.prototype.stop = function(callback) {
}; };


MessageSequencer.prototype.send = function(name, args) { MessageSequencer.prototype.send = function(name, args) {
this.inner.send(name, args, this.messageMetadata); this.inner.send(name, args);
this.nextSeq();
}; };


MessageSequencer.prototype.receiveMessage = function(sourceNodeId, name, message, metadata) { MessageSequencer.prototype.receiveMessage = function(sourceNodeId, name, message, seq) {
var processFunc = this.processMessageFromKnownNode; if (this.nextExpectedMessageNumber === null) {
if (!this.pendingMessages[sourceNodeId]) { this.nextExpectedMessageNumber = seq;
processFunc = this.processMessageFromNewNode;
} }
processFunc.call(this, sourceNodeId, name, message, metadata);
this.sendPendingMessages(sourceNodeId);
};


MessageSequencer.prototype.processMessageFromNewNode = function(sourceNodeId, name, message, metadata) { if (seq >= this.nextExpectedMessageNumber) {
this.pendingMessages[sourceNodeId] = { this.pendingMessages.push([sourceNodeId, name, message, seq]);
waitingForMessage: metadata.seq, this.pendingMessages.sort(function (a, b) { return a[3] - b[3]; });
messages: { }
};
this.pendingMessages[sourceNodeId].messages[metadata.seq] = [name, message, metadata];
};


MessageSequencer.prototype.processMessageFromKnownNode = function(sourceNodeId, name, message, metadata) { this.sendPendingMessages();
if (metadata.seq >= this.pendingMessages[sourceNodeId].waitingForMessage) {
this.pendingMessages[sourceNodeId].messages[metadata.seq] = [name, message, metadata];
} }
}; };


MessageSequencer.prototype.sendPendingMessages = function(sourceNodeId) { MessageSequencer.prototype.sendPendingMessages = function() {
var pending = this.pendingMessages[sourceNodeId]; while (this.pendingMessages.length > 0 && this.pendingMessages[0][3] === this.nextExpectedMessageNumber)
var seq = pending.waitingForMessage; {
while (pending.messages[seq]) { this.emit.apply(this, ['message'].concat(this.pendingMessages[0]));
this.emit.apply(this, ['message', sourceNodeId].concat(pending.messages[seq])); this.pendingMessages.shift();
pending.waitingForMessage = pending.messages[seq][2].next; ++this.nextExpectedMessageNumber;
delete pending.messages[seq];
seq = pending.waitingForMessage;
} }
}; };

MessageSequencer.prototype.nextSeq = function() {
this.messageMetadata = { seq: this.messageMetadata.next, next: this.messageMetadata.next + 1 };
};
14 changes: 6 additions & 8 deletions lib/servicebusconnector.js
Expand Up @@ -51,7 +51,7 @@ ServiceBusConnector.prototype.start = function () {


if (!err) { if (!err) {
var msg = self.unpackMessage(receivedMessage); var msg = self.unpackMessage(receivedMessage);
self.emit('message', msg.nodeId, msg.name, msg.args, msg.metadata); self.emit('message', msg.nodeId, msg.name, msg.args, msg.seq);
} }


if (!self.shouldStop) { if (!self.shouldStop) {
Expand All @@ -76,9 +76,9 @@ ServiceBusConnector.prototype.stop = function (cb) {
this.stopCallback = cb; this.stopCallback = cb;
} }


ServiceBusConnector.prototype.send = function (name, args, metadata) { ServiceBusConnector.prototype.send = function (name, args) {
var self = this; var self = this;
var message = this.packMessage(name, args, metadata); var message = this.packMessage(name, args);
this.serviceBusSender.sendTopicMessage(this.topic, message, function (err) { this.serviceBusSender.sendTopicMessage(this.topic, message, function (err) {


if (err) { if (err) {
Expand All @@ -87,23 +87,21 @@ ServiceBusConnector.prototype.send = function (name, args, metadata) {
}); });
} }


ServiceBusConnector.prototype.packMessage = function(name, args, metadata) { ServiceBusConnector.prototype.packMessage = function(name, args) {
return { return {
body: JSON.stringify(args), body: JSON.stringify(args),
brokerProperties: { brokerProperties: {
CorrelationId: this.nodeId, CorrelationId: this.nodeId,
Label: name Label: name
}, }};
customProperties: metadata
};
} }


ServiceBusConnector.prototype.unpackMessage = function(message) { ServiceBusConnector.prototype.unpackMessage = function(message) {
return { return {
name: message.brokerProperties.Label, name: message.brokerProperties.Label,
nodeId: message.brokerProperties.CorrelationId, nodeId: message.brokerProperties.CorrelationId,
args: JSON.parse(message.body), args: JSON.parse(message.body),
metadata: message.customProperties seq: +message.brokerProperties.SequenceNumber
}; };
} }


Expand Down
123 changes: 46 additions & 77 deletions test/sequencing-tests.js
Expand Up @@ -75,37 +75,6 @@ describe('Message sequencing layer', function () {
innerInterface.send.calledWithMatch('aMessage', [1, 2, 3]).should.be.true; innerInterface.send.calledWithMatch('aMessage', [1, 2, 3]).should.be.true;


}); });

it('should add sequence number and next to first message', function () {
sequencer.send('firstMessage', [4, 5]);

innerInterface.send.calledWithMatch('firstMessage', [4, 5], { seq: 0, next: 1 }).should.be.true;
});
it('should add increasing sequence numbers to next messages', function () {
sequencer.send('msg', 'a');
sequencer.send('msg', 'b');
sequencer.send('msg', 'c');

innerInterface.send.callCount.should.equal(3);
innerInterface.send.calledWithMatch('msg', 'a', {seq: 0}).should.be.true;
innerInterface.send.calledWithMatch('msg', 'b', {seq: 1}).should.be.true;
should.not.exist(innerInterface.send.getCall(1).args[2].first);
innerInterface.send.calledWithMatch('msg', 'c', {seq: 2}).should.be.true;
should.not.exist(innerInterface.send.getCall(2).args[2].first);
});

it('should include next sequence number in message', function () {
sequencer.send('msg', 'q');
sequencer.send('msg', 'r');
sequencer.send('msg', 's');

var firstCall = innerInterface.send.getCall(0);
var secondCall = innerInterface.send.getCall(1);
var thirdCall = innerInterface.send.getCall(2);

firstCall.args[2].next.should.equal(secondCall.args[2].seq);
secondCall.args[2].next.should.equal(thirdCall.args[2].seq);
});
}); });


describe('when receiving in order', function () { describe('when receiving in order', function () {
Expand All @@ -131,14 +100,14 @@ describe('Message sequencing layer', function () {


it('should deliver messages in order', function () { it('should deliver messages in order', function () {
var sourceNode = 'node-1'; var sourceNode = 'node-1';
receiveFunc(sourceNode, 'msg', [1, 2, 3], {first: 1, seq: 0, next: 1}); receiveFunc(sourceNode, 'msg', [1, 2, 3], 0);
receiveFunc(sourceNode, 'msg', [4, 5, 6], {seq: 1, next: 2}); receiveFunc(sourceNode, 'msg', [4, 5, 6], 1);
receiveFunc(sourceNode, 'msg', [7, 8, 9], {seq: 2, next : 3}); receiveFunc(sourceNode, 'msg', [7, 8, 9], 2);


receivedMessages.should.have.length(3); receivedMessages.should.have.length(3);


for(var i = 0, len = receivedMessages.length; i < len; i++) { for(var i = 0, len = receivedMessages.length; i < len; i++) {
receivedMessages[i][3].seq.should.equal(i); receivedMessages[i][3].should.equal(i);
} }
}); });
}); });
Expand Down Expand Up @@ -166,26 +135,26 @@ describe('Message sequencing layer', function () {
}); });


it('should deliver after first message', function () { it('should deliver after first message', function () {
send('sourceNode', 'msg', 'hello', { seq: 1, next: 2}); send('sourceNode', 'msg', 'hello', 1);
receivedMessages.should.have.length(1); receivedMessages.should.have.length(1);
}); });


it('should ignore messages older than first message received', function () { it('should ignore messages older than first message received', function () {
send('sourceNode', 'msg', 'world', {seq: 1, next: 2}); send('sourceNode', 'msg', 'world', 1);
send('sourceNode', 'msg', 'hello', {seq: 0, next:1}); send('sourceNode', 'msg', 'hello', 0);


receivedMessages.should.have.length(1); receivedMessages.should.have.length(1);
receivedMessages[0][2].should.equal('world'); receivedMessages[0][2].should.equal('world');
}); });


it('should deliver all messages when missing message is received', function () { it('should deliver all messages when missing message is received', function () {
send('sourceNode', 'msg', 'hello', { seq: 0, next: 1}); send('sourceNode', 'msg', 'hello', 0);
receivedMessages.should.have.length(1); receivedMessages.should.have.length(1);


send('sourceNode', 'msg', 'from node', { seq: 2, next: 3}); send('sourceNode', 'msg', 'from node', 2);
receivedMessages.should.have.length(1); receivedMessages.should.have.length(1);


send('sourceNode', 'msg', 'world', { seq: 1, next: 2}); send('sourceNode', 'msg', 'world', 1);


receivedMessages.should.have.length(3); receivedMessages.should.have.length(3);


Expand All @@ -195,11 +164,11 @@ describe('Message sequencing layer', function () {
}); });


it('should deliver next message if received in order', function () { it('should deliver next message if received in order', function () {
send('sourceNode', 'msg', 'hello', { seq: 0, next: 1}); send('sourceNode', 'msg', 'hello', 0);
send('sourceNode', 'msg', 'from node', {seq: 2, next: 3}); send('sourceNode', 'msg', 'from node', 2);
send('sourceNode', 'msg', 'with affection', {seq: 3, next: 4}); send('sourceNode', 'msg', 'with affection', 3);
send('sourceNode', 'msg', 'and stuff', {seq: 4, next: 5}); send('sourceNode', 'msg', 'and stuff', 4);
send('sourceNode', 'msg', 'world', { seq: 1, next: 2}); send('sourceNode', 'msg', 'world', 1);


receivedMessages.should.have.length(5); receivedMessages.should.have.length(5);
receivedMessages[0][2].should.equal('hello'); receivedMessages[0][2].should.equal('hello');
Expand All @@ -210,10 +179,10 @@ describe('Message sequencing layer', function () {
}); });


it('should ignore messages older than the next one expected', function () { it('should ignore messages older than the next one expected', function () {
send('sourceNode', 'msg', 'from node', { seq: 2, next: 3}); send('sourceNode', 'msg', 'from node', 2);
send('sourceNode', 'msg', 'Hello', {seq: 0, next: 1}); send('sourceNode', 'msg', 'Hello', 0);
send('sourceNode', 'msg', 'with affection', {seq: 3, next: 4}); send('sourceNode', 'msg', 'with affection', 3);
send('sourceNode', 'msg', 'World', { seq: 1, next: 2}); send('sourceNode', 'msg', 'World', 1);


receivedMessages.should.have.length(2); receivedMessages.should.have.length(2);
receivedMessages[0][2].should.equal('from node'); receivedMessages[0][2].should.equal('from node');
Expand Down Expand Up @@ -244,64 +213,64 @@ describe('Message sequencing layer', function () {
}); });


it('should deliver multiple in order messages', function () { it('should deliver multiple in order messages', function () {
send('n1', 'msg', 'a', { seq: 4, next: 5}); send('n1', 'msg', 'a', 4);
receivedMessages.should.have.length(1); receivedMessages.should.have.length(1);
send('n2', 'msg', 'A', { seq: 10, next: 11}); send('n2', 'msg', 'A', 6);
receivedMessages.should.have.length(2); receivedMessages.should.have.length(1);
send('n2', 'msg', 'B', { seq: 11, next: 12}); send('n2', 'msg', 'B', 7);
receivedMessages.should.have.length(3); receivedMessages.should.have.length(1);
send('n1', 'msg', 'b', {seq: 5, next: 6}); send('n1', 'msg', 'b', 5);
receivedMessages.should.have.length(4); receivedMessages.should.have.length(4);


[['n1', 'a'], ['n2', 'A'], ['n2', 'B'], ['n1', 'b']].forEach(function (testData, i) { [['n1', 'a'], ['n1', 'b'], ['n2', 'A'], ['n2', 'B']].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]); receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]); receivedMessages[i][2].should.equal(testData[1]);
}) })
}); });




it('should deliver in order from first node and out of order from second node', function () { it('should deliver in order from all nodes', function () {
send('n1', 'msg', 'a', { seq: 0, next: 1}); send('n1', 'msg', 'a', 0);
receivedMessages.should.have.length(1); receivedMessages.should.have.length(1);


send('n2', 'msg', 'A', { seq: 4, next: 5}); send('n2', 'msg', 'A', 2);
receivedMessages.should.have.length(2); receivedMessages.should.have.length(1);


send('n2', 'msg', 'C', { seq: 6, next: 7}); send('n1', 'msg', 'b', 1);
receivedMessages.should.have.length(2); receivedMessages.should.have.length(3);


send('n1', 'msg', 'b', { seq: 1, next: 2}); send('n2', 'msg', 'C', 4);
receivedMessages.should.have.length(3); receivedMessages.should.have.length(3);


send('n2', 'msg', 'B', { seq: 5, next: 6}); send('n2', 'msg', 'B', 3);
receivedMessages.should.have.length(5); receivedMessages.should.have.length(5);


[['n1', 'a'], ['n2', 'A'], ['n1', 'b'], ['n2', 'B'], ['n2', 'C']].forEach(function (testData, i) { [['n1', 'a'], ['n1', 'b'], ['n2', 'A'], ['n2', 'B'], ['n2', 'C']].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]); receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]); receivedMessages[i][2].should.equal(testData[1]);
}); });
}); });


it('should deliver in order when both nodes are out of order', function () { it('should deliver in order when both nodes are out of order', function () {
send('n1', 'msg', 'a', { seq: 0, next: 1}); send('n1', 'msg', 'a', 0);
receivedMessages.should.have.length(1); receivedMessages.should.have.length(1);


send('n2', 'msg', 'A', { seq: 4, next: 5}); send('n2', 'msg', 'A', 3);
receivedMessages.should.have.length(2); receivedMessages.should.have.length(1);


send('n2', 'msg', 'C', { seq: 6, next: 7}); send('n2', 'msg', 'C', 5);
receivedMessages.should.have.length(2); receivedMessages.should.have.length(1);


send('n1', 'msg', 'c', { seq: 2, next: 3}); send('n1', 'msg', 'c', 2);
receivedMessages.should.have.length(2); receivedMessages.should.have.length(1);


send('n2', 'msg', 'B', { seq: 5, next: 6}); send('n2', 'msg', 'B', 4);
receivedMessages.should.have.length(4); receivedMessages.should.have.length(1);


send('n1', 'msg', 'b', { seq: 1, next: 2}); send('n1', 'msg', 'b', 1);
receivedMessages.should.have.length(6); receivedMessages.should.have.length(6);


[['n1', 'a'], ['n2', 'A'], ['n2', 'B'], ['n2', 'C'], ['n1', 'b'], ['n1', 'c']].forEach(function (testData, i) { [['n1', 'a'], ['n1', 'b'], ['n1', 'c'], ['n2', 'A'], ['n2', 'B'], ['n2', 'C']].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]); receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]); receivedMessages[i][2].should.equal(testData[1]);
}); });
Expand Down
35 changes: 7 additions & 28 deletions test/servicebusconnector-tests.js
Expand Up @@ -80,23 +80,13 @@ describe('Service Bus connection layer', function () {
unpacked.args.b.should.equal(5); unpacked.args.b.should.equal(5);
}); });


it('should store metadata as custom properties in message', function () { it('should pull sequence number from broker properties', function () {
var packed = connector.packMessage('msg', 'hello', { seq: 1, next: 2, other: 'something'}); var packed = connector.packMessage('msg', 'world');

packed.brokerProperties.SequenceNumber = 53;
should.exist(packed.customProperties);
packed.customProperties.seq.should.equal(1);
packed.customProperties.next.should.equal(2);
packed.customProperties.other.should.equal('something');
});

it('should round trip metadata through unpack', function () {
var packed = connector.packMessage('msg', 'hello', { seq: 1, next: 2, other: 'something'});
var unpacked = connector.unpackMessage(packed); var unpacked = connector.unpackMessage(packed);


should.exist(unpacked.metadata); should.exist(unpacked.seq);
unpacked.metadata.seq.should.equal(1); unpacked.seq.should.equal(53);
unpacked.metadata.next.should.equal(2);
unpacked.metadata.other.should.equal('something');
}); });
}); });


Expand All @@ -117,14 +107,12 @@ describe('Service Bus connection layer', function () {
sb.sendTopicMessage.calledOnce.should.be.true; sb.sendTopicMessage.calledOnce.should.be.true;
}); });


it('should pack message that was sent, including metadata', function () { it('should pack message that was sent', function () {
connector.send('msg', 'hello', {seq: 6, next: 7}); connector.send('msg', 'hello');


var sentMessage = sb.sendTopicMessage.firstCall.args[1]; var sentMessage = sb.sendTopicMessage.firstCall.args[1];


sentMessage.body.should.equal('"hello"'); sentMessage.body.should.equal('"hello"');
sentMessage.customProperties.seq.should.equal(6);
sentMessage.customProperties.next.should.equal(7);
}); });


it('should emit sberror event if service bus send fails', function () { it('should emit sberror event if service bus send fails', function () {
Expand Down Expand Up @@ -195,15 +183,6 @@ describe('Service Bus connection layer', function () {


receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11})); receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
}); });
it('should pass message metadata from received message', function (done) {
connector.on('message', function (nodeId, name, args, metadata) {
metadata.seq.should.equal(8);
metadata.next.should.equal(11);
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
});


it('should repoll service bus after message is received', function () { it('should repoll service bus after message is received', function () {
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11})); receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
Expand Down