Skip to content

Commit

Permalink
Merge pull request #37 from christav/master
Browse files Browse the repository at this point in the history
Using service bus managed sequences numbers instead of doing it ourselves.
  • Loading branch information
Chris Tavares committed Mar 4, 2013
2 parents a8aa437 + be152d2 commit 90dfc4b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 146 deletions.
49 changes: 16 additions & 33 deletions lib/messagesequencer.js
Expand Up @@ -23,8 +23,8 @@ module.exports = MessageSequencer;
function MessageSequencer(options, inner) {
this.inner = inner;
this.started = false;
this.messageMetadata = { seq: 0, next: 1};
this.pendingMessages = {};
this.nextExpectedMessageNumber = null;
this.pendingMessages = [];

inner.on('message', this.receiveMessage.bind(this));
}
Expand All @@ -46,44 +46,27 @@ MessageSequencer.prototype.stop = function(callback) {
};

MessageSequencer.prototype.send = function(name, args) {
this.inner.send(name, args, this.messageMetadata);
this.nextSeq();
this.inner.send(name, args);
};

MessageSequencer.prototype.receiveMessage = function(sourceNodeId, name, message, metadata) {
var processFunc = this.processMessageFromKnownNode;
if (!this.pendingMessages[sourceNodeId]) {
processFunc = this.processMessageFromNewNode;
MessageSequencer.prototype.receiveMessage = function(sourceNodeId, name, message, seq) {
if (this.nextExpectedMessageNumber === null) {
this.nextExpectedMessageNumber = seq;
}
processFunc.call(this, sourceNodeId, name, message, metadata);
this.sendPendingMessages(sourceNodeId);
};

MessageSequencer.prototype.processMessageFromNewNode = function(sourceNodeId, name, message, metadata) {
this.pendingMessages[sourceNodeId] = {
waitingForMessage: metadata.seq,
messages: { }
};
this.pendingMessages[sourceNodeId].messages[metadata.seq] = [name, message, metadata];
};
if (seq >= this.nextExpectedMessageNumber) {
this.pendingMessages.push([sourceNodeId, name, message, seq]);
this.pendingMessages.sort(function (a, b) { return a[3] - b[3]; });

MessageSequencer.prototype.processMessageFromKnownNode = function(sourceNodeId, name, message, metadata) {
if (metadata.seq >= this.pendingMessages[sourceNodeId].waitingForMessage) {
this.pendingMessages[sourceNodeId].messages[metadata.seq] = [name, message, metadata];
this.sendPendingMessages();
}
};

MessageSequencer.prototype.sendPendingMessages = function(sourceNodeId) {
var pending = this.pendingMessages[sourceNodeId];
var seq = pending.waitingForMessage;
while (pending.messages[seq]) {
this.emit.apply(this, ['message', sourceNodeId].concat(pending.messages[seq]));
pending.waitingForMessage = pending.messages[seq][2].next;
delete pending.messages[seq];
seq = pending.waitingForMessage;
MessageSequencer.prototype.sendPendingMessages = function() {
while (this.pendingMessages.length > 0 && this.pendingMessages[0][3] === this.nextExpectedMessageNumber)
{
this.emit.apply(this, ['message'].concat(this.pendingMessages[0]));
this.pendingMessages.shift();
++this.nextExpectedMessageNumber;
}
};

MessageSequencer.prototype.nextSeq = function() {
this.messageMetadata = { seq: this.messageMetadata.next, next: this.messageMetadata.next + 1 };
};
14 changes: 6 additions & 8 deletions lib/servicebusconnector.js
Expand Up @@ -51,7 +51,7 @@ ServiceBusConnector.prototype.start = function () {

if (!err) {
var msg = self.unpackMessage(receivedMessage);
self.emit('message', msg.nodeId, msg.name, msg.args, msg.metadata);
self.emit('message', msg.nodeId, msg.name, msg.args, msg.seq);
}

if (!self.shouldStop) {
Expand All @@ -76,9 +76,9 @@ ServiceBusConnector.prototype.stop = function (cb) {
this.stopCallback = cb;
}

ServiceBusConnector.prototype.send = function (name, args, metadata) {
ServiceBusConnector.prototype.send = function (name, args) {
var self = this;
var message = this.packMessage(name, args, metadata);
var message = this.packMessage(name, args);
this.serviceBusSender.sendTopicMessage(this.topic, message, function (err) {

if (err) {
Expand All @@ -87,23 +87,21 @@ ServiceBusConnector.prototype.send = function (name, args, metadata) {
});
}

ServiceBusConnector.prototype.packMessage = function(name, args, metadata) {
ServiceBusConnector.prototype.packMessage = function(name, args) {
return {
body: JSON.stringify(args),
brokerProperties: {
CorrelationId: this.nodeId,
Label: name
},
customProperties: metadata
};
}};
}

ServiceBusConnector.prototype.unpackMessage = function(message) {
return {
name: message.brokerProperties.Label,
nodeId: message.brokerProperties.CorrelationId,
args: JSON.parse(message.body),
metadata: message.customProperties
seq: +message.brokerProperties.SequenceNumber
};
}

Expand Down
123 changes: 46 additions & 77 deletions test/sequencing-tests.js
Expand Up @@ -75,37 +75,6 @@ describe('Message sequencing layer', function () {
innerInterface.send.calledWithMatch('aMessage', [1, 2, 3]).should.be.true;

});

it('should add sequence number and next to first message', function () {
sequencer.send('firstMessage', [4, 5]);

innerInterface.send.calledWithMatch('firstMessage', [4, 5], { seq: 0, next: 1 }).should.be.true;
});
it('should add increasing sequence numbers to next messages', function () {
sequencer.send('msg', 'a');
sequencer.send('msg', 'b');
sequencer.send('msg', 'c');

innerInterface.send.callCount.should.equal(3);
innerInterface.send.calledWithMatch('msg', 'a', {seq: 0}).should.be.true;
innerInterface.send.calledWithMatch('msg', 'b', {seq: 1}).should.be.true;
should.not.exist(innerInterface.send.getCall(1).args[2].first);
innerInterface.send.calledWithMatch('msg', 'c', {seq: 2}).should.be.true;
should.not.exist(innerInterface.send.getCall(2).args[2].first);
});

it('should include next sequence number in message', function () {
sequencer.send('msg', 'q');
sequencer.send('msg', 'r');
sequencer.send('msg', 's');

var firstCall = innerInterface.send.getCall(0);
var secondCall = innerInterface.send.getCall(1);
var thirdCall = innerInterface.send.getCall(2);

firstCall.args[2].next.should.equal(secondCall.args[2].seq);
secondCall.args[2].next.should.equal(thirdCall.args[2].seq);
});
});

describe('when receiving in order', function () {
Expand All @@ -131,14 +100,14 @@ describe('Message sequencing layer', function () {

it('should deliver messages in order', function () {
var sourceNode = 'node-1';
receiveFunc(sourceNode, 'msg', [1, 2, 3], {first: 1, seq: 0, next: 1});
receiveFunc(sourceNode, 'msg', [4, 5, 6], {seq: 1, next: 2});
receiveFunc(sourceNode, 'msg', [7, 8, 9], {seq: 2, next : 3});
receiveFunc(sourceNode, 'msg', [1, 2, 3], 0);
receiveFunc(sourceNode, 'msg', [4, 5, 6], 1);
receiveFunc(sourceNode, 'msg', [7, 8, 9], 2);

receivedMessages.should.have.length(3);

for(var i = 0, len = receivedMessages.length; i < len; i++) {
receivedMessages[i][3].seq.should.equal(i);
receivedMessages[i][3].should.equal(i);
}
});
});
Expand Down Expand Up @@ -166,26 +135,26 @@ describe('Message sequencing layer', function () {
});

it('should deliver after first message', function () {
send('sourceNode', 'msg', 'hello', { seq: 1, next: 2});
send('sourceNode', 'msg', 'hello', 1);
receivedMessages.should.have.length(1);
});

it('should ignore messages older than first message received', function () {
send('sourceNode', 'msg', 'world', {seq: 1, next: 2});
send('sourceNode', 'msg', 'hello', {seq: 0, next:1});
send('sourceNode', 'msg', 'world', 1);
send('sourceNode', 'msg', 'hello', 0);

receivedMessages.should.have.length(1);
receivedMessages[0][2].should.equal('world');
});

it('should deliver all messages when missing message is received', function () {
send('sourceNode', 'msg', 'hello', { seq: 0, next: 1});
send('sourceNode', 'msg', 'hello', 0);
receivedMessages.should.have.length(1);

send('sourceNode', 'msg', 'from node', { seq: 2, next: 3});
send('sourceNode', 'msg', 'from node', 2);
receivedMessages.should.have.length(1);

send('sourceNode', 'msg', 'world', { seq: 1, next: 2});
send('sourceNode', 'msg', 'world', 1);

receivedMessages.should.have.length(3);

Expand All @@ -195,11 +164,11 @@ describe('Message sequencing layer', function () {
});

it('should deliver next message if received in order', function () {
send('sourceNode', 'msg', 'hello', { seq: 0, next: 1});
send('sourceNode', 'msg', 'from node', {seq: 2, next: 3});
send('sourceNode', 'msg', 'with affection', {seq: 3, next: 4});
send('sourceNode', 'msg', 'and stuff', {seq: 4, next: 5});
send('sourceNode', 'msg', 'world', { seq: 1, next: 2});
send('sourceNode', 'msg', 'hello', 0);
send('sourceNode', 'msg', 'from node', 2);
send('sourceNode', 'msg', 'with affection', 3);
send('sourceNode', 'msg', 'and stuff', 4);
send('sourceNode', 'msg', 'world', 1);

receivedMessages.should.have.length(5);
receivedMessages[0][2].should.equal('hello');
Expand All @@ -210,10 +179,10 @@ describe('Message sequencing layer', function () {
});

it('should ignore messages older than the next one expected', function () {
send('sourceNode', 'msg', 'from node', { seq: 2, next: 3});
send('sourceNode', 'msg', 'Hello', {seq: 0, next: 1});
send('sourceNode', 'msg', 'with affection', {seq: 3, next: 4});
send('sourceNode', 'msg', 'World', { seq: 1, next: 2});
send('sourceNode', 'msg', 'from node', 2);
send('sourceNode', 'msg', 'Hello', 0);
send('sourceNode', 'msg', 'with affection', 3);
send('sourceNode', 'msg', 'World', 1);

receivedMessages.should.have.length(2);
receivedMessages[0][2].should.equal('from node');
Expand Down Expand Up @@ -244,64 +213,64 @@ describe('Message sequencing layer', function () {
});

it('should deliver multiple in order messages', function () {
send('n1', 'msg', 'a', { seq: 4, next: 5});
send('n1', 'msg', 'a', 4);
receivedMessages.should.have.length(1);
send('n2', 'msg', 'A', { seq: 10, next: 11});
receivedMessages.should.have.length(2);
send('n2', 'msg', 'B', { seq: 11, next: 12});
receivedMessages.should.have.length(3);
send('n1', 'msg', 'b', {seq: 5, next: 6});
send('n2', 'msg', 'A', 6);
receivedMessages.should.have.length(1);
send('n2', 'msg', 'B', 7);
receivedMessages.should.have.length(1);
send('n1', 'msg', 'b', 5);
receivedMessages.should.have.length(4);

[['n1', 'a'], ['n2', 'A'], ['n2', 'B'], ['n1', 'b']].forEach(function (testData, i) {
[['n1', 'a'], ['n1', 'b'], ['n2', 'A'], ['n2', 'B']].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]);
})
});


it('should deliver in order from first node and out of order from second node', function () {
send('n1', 'msg', 'a', { seq: 0, next: 1});
it('should deliver in order from all nodes', function () {
send('n1', 'msg', 'a', 0);
receivedMessages.should.have.length(1);

send('n2', 'msg', 'A', { seq: 4, next: 5});
receivedMessages.should.have.length(2);
send('n2', 'msg', 'A', 2);
receivedMessages.should.have.length(1);

send('n2', 'msg', 'C', { seq: 6, next: 7});
receivedMessages.should.have.length(2);
send('n1', 'msg', 'b', 1);
receivedMessages.should.have.length(3);

send('n1', 'msg', 'b', { seq: 1, next: 2});
send('n2', 'msg', 'C', 4);
receivedMessages.should.have.length(3);

send('n2', 'msg', 'B', { seq: 5, next: 6});
send('n2', 'msg', 'B', 3);
receivedMessages.should.have.length(5);

[['n1', 'a'], ['n2', 'A'], ['n1', 'b'], ['n2', 'B'], ['n2', 'C']].forEach(function (testData, i) {
[['n1', 'a'], ['n1', 'b'], ['n2', 'A'], ['n2', 'B'], ['n2', 'C']].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]);
});
});

it('should deliver in order when both nodes are out of order', function () {
send('n1', 'msg', 'a', { seq: 0, next: 1});
send('n1', 'msg', 'a', 0);
receivedMessages.should.have.length(1);

send('n2', 'msg', 'A', { seq: 4, next: 5});
receivedMessages.should.have.length(2);
send('n2', 'msg', 'A', 3);
receivedMessages.should.have.length(1);

send('n2', 'msg', 'C', { seq: 6, next: 7});
receivedMessages.should.have.length(2);
send('n2', 'msg', 'C', 5);
receivedMessages.should.have.length(1);

send('n1', 'msg', 'c', { seq: 2, next: 3});
receivedMessages.should.have.length(2);
send('n1', 'msg', 'c', 2);
receivedMessages.should.have.length(1);

send('n2', 'msg', 'B', { seq: 5, next: 6});
receivedMessages.should.have.length(4);
send('n2', 'msg', 'B', 4);
receivedMessages.should.have.length(1);

send('n1', 'msg', 'b', { seq: 1, next: 2});
send('n1', 'msg', 'b', 1);
receivedMessages.should.have.length(6);

[['n1', 'a'], ['n2', 'A'], ['n2', 'B'], ['n2', 'C'], ['n1', 'b'], ['n1', 'c']].forEach(function (testData, i) {
[['n1', 'a'], ['n1', 'b'], ['n1', 'c'], ['n2', 'A'], ['n2', 'B'], ['n2', 'C']].forEach(function (testData, i) {
receivedMessages[i][0].should.equal(testData[0]);
receivedMessages[i][2].should.equal(testData[1]);
});
Expand Down
35 changes: 7 additions & 28 deletions test/servicebusconnector-tests.js
Expand Up @@ -80,23 +80,13 @@ describe('Service Bus connection layer', function () {
unpacked.args.b.should.equal(5);
});

it('should store metadata as custom properties in message', function () {
var packed = connector.packMessage('msg', 'hello', { seq: 1, next: 2, other: 'something'});

should.exist(packed.customProperties);
packed.customProperties.seq.should.equal(1);
packed.customProperties.next.should.equal(2);
packed.customProperties.other.should.equal('something');
});

it('should round trip metadata through unpack', function () {
var packed = connector.packMessage('msg', 'hello', { seq: 1, next: 2, other: 'something'});
it('should pull sequence number from broker properties', function () {
var packed = connector.packMessage('msg', 'world');
packed.brokerProperties.SequenceNumber = 53;
var unpacked = connector.unpackMessage(packed);

should.exist(unpacked.metadata);
unpacked.metadata.seq.should.equal(1);
unpacked.metadata.next.should.equal(2);
unpacked.metadata.other.should.equal('something');
should.exist(unpacked.seq);
unpacked.seq.should.equal(53);
});
});

Expand All @@ -117,14 +107,12 @@ describe('Service Bus connection layer', function () {
sb.sendTopicMessage.calledOnce.should.be.true;
});

it('should pack message that was sent, including metadata', function () {
connector.send('msg', 'hello', {seq: 6, next: 7});
it('should pack message that was sent', function () {
connector.send('msg', 'hello');

var sentMessage = sb.sendTopicMessage.firstCall.args[1];

sentMessage.body.should.equal('"hello"');
sentMessage.customProperties.seq.should.equal(6);
sentMessage.customProperties.next.should.equal(7);
});

it('should emit sberror event if service bus send fails', function () {
Expand Down Expand Up @@ -195,15 +183,6 @@ describe('Service Bus connection layer', function () {

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
});
it('should pass message metadata from received message', function (done) {
connector.on('message', function (nodeId, name, args, metadata) {
metadata.seq.should.equal(8);
metadata.next.should.equal(11);
done();
});

receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
});

it('should repoll service bus after message is received', function () {
receive(null, packMessage(connector, 'anotherNode', 'aMessage', [3, 1, 4], {seq: 8, next: 11}));
Expand Down

0 comments on commit 90dfc4b

Please sign in to comment.