Skip to content

Commit

Permalink
Merge pull request #34 from storkme/master
Browse files Browse the repository at this point in the history
Reconnect semantics, acks&nacks, 'cleaner' disconnects
  • Loading branch information
easternbloc committed Mar 11, 2015
2 parents 8a0314a + 7d0f410 commit 6efc8cd
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .editorconfig
@@ -0,0 +1,5 @@
root = true

[*]
indent_style = space
indent_size = 2
41 changes: 32 additions & 9 deletions README.md
Expand Up @@ -54,17 +54,23 @@ Require returns a constructor for STOMP client instances.
For backwards compatibility, `require('stomp-client').StompClient` is also
supported.

## Stomp(address, port, user, pass, protocolVersion)
## Stomp(address, [port], [user], [pass], [protocolVersion], [reconnectOpts])

- `address`: address to connect to, default is `"127.0.0.1"`
- `port`: port to connect to, default is `61613`
- `user`: user to authenticate as, default is `""`
- `pass`: password to authenticate with, default is `""`
- `protocolVersion`: see below, defaults to `"1.0"`
- `reconnectOpts`: see below, defaults to `{}`

Protocol version negotiation is not currently supported and version `"1.0"` is
the only supported version.

ReconnectOpts should contain an integer `retries` specifying the maximum number
of reconnection attempts, and a `delay` which specifies the reconnection delay.
(reconnection timings are calculated using exponential backoff. The first reconnection
happens immediately, the second reconnection happens at `+delay` ms, the third at `+ 2*delay` ms, etc).

## stomp.connect([callback, [errorCallback]])

Connect to the STOMP server. If the callbacks are provided, they will be
Expand All @@ -76,8 +82,8 @@ If using virtualhosts to namespace your queues, you must pass a `version` header

## stomp.disconnect(callback)

Disconnect from the STOMP server. The callback will be attached on the
`'disconnect'` event.
Disconnect from the STOMP server. The callback will be executed when disconnection is complete.
No reconnections should be attempted, nor errors thrown as a result of this call.

## stomp.subscribe(queue, [headers,] callback)

Expand All @@ -97,23 +103,40 @@ Disconnect from the STOMP server. The callback will be attached on the
- `message`: message to publish, a string or buffer
- `headers`: headers to add to the PUBLISH message

## Event: `'connect'`
## stomp.ack(messageId, subscription, [transaction]),
## stomp.nack(messageId, subscription, [transaction])

Emitted on successful connect to the STOMP server.
- `messageId`: the id of the message to ack/nack
- `subscription`: the id of the subscription
- `transaction`: optional transaction name

## Event: `'disconnect'`
## Property: `stomp.publishable` (boolean)
Returns whether or not the connection is currently writable. During normal operation
this should be true, however if the client is in the process of reconnecting,
this will be false.

Emitted on successful disconnnect from the STOMP server.
## Event: `'connect'`

Also emitted when the server arbitrarily disconnects, which should
be considered a bug.
Emitted on successful connect to the STOMP server.

## Event: `'error'`

Emitted on an error at either the TCP or STOMP protocol layer. An Error object
will be passed. All error objects have a `.message` property, STOMP protocol
errors may also have a `.details` property.

If the error was caused by a failure to reconnect after exceeding the number of
reconnection attempts, the error object will have a `reconnectionFailed` property.

## Event: `'reconnect'`

Emitted when the client has successfully reconnected. The event arguments are
the new `sessionId` and the reconnection attempt number.

## Event: `'reconnecting'`

Emitted when the client has been disconnected for whatever reason, but is going
to attempt to reconnect.

## LICENSE

Expand Down
125 changes: 97 additions & 28 deletions lib/client.js
Expand Up @@ -49,7 +49,7 @@ var StompFrameCommands = {
}
};

function StompClient(address, port, user, pass, protocolVersion, vhost) {
function StompClient(address, port, user, pass, protocolVersion, vhost, reconnectOpts) {
events.EventEmitter.call(this);
this.user = (user || '');
this.pass = (pass || '');
Expand All @@ -60,6 +60,9 @@ function StompClient(address, port, user, pass, protocolVersion, vhost) {
assert(StompFrameCommands[this.version], 'STOMP version '+this.version+' is not supported');
this._stompFrameEmitter = new StompFrameEmitter(StompFrameCommands[this.version]);
this.vhost = vhost || null;
this.reconnectOpts = reconnectOpts || {};
this._retryNumber = 0;
this._retryDelay = this.reconnectOpts.delay;
return this;
}

Expand All @@ -68,36 +71,55 @@ util.inherits(StompClient, events.EventEmitter);
StompClient.prototype.connect = function (connectedCallback, errorCallback) {
var self = this;

//reset this field.
delete this._disconnectCallback;

if (errorCallback) {
self.on('error', errorCallback);
}

self.stream = net.createConnection(self.port, self.address);
self.stream.on('connect', function() {
self.onConnect();
});
self.stream.on('connect', self.onConnect.bind(this));

self.stream.on('error', self.emit.bind(self, 'error'));
self.stream.on('error', function(err) {
process.nextTick(function() {
//clear all of the stomp frame emitter listeners - we don't need them, we've disconnected.
self._stompFrameEmitter.removeAllListeners();
});
if (self._retryNumber < self.reconnectOpts.retries) {
if (self._retryNumber === 0) {
//we're disconnected, but we're going to try and reconnect.
self.emit('reconnecting');
}
self._reconnectTimer = setTimeout(function() {
self.connect();
}, self._retryNumber++ * self.reconnectOpts.delay)
} else {
if (self._retryNumber === self.reconnectOpts.retries) {
err.message += ' [reconnect attempts reached]';
err.reconnectionFailed = true;
}
self.emit('error', err);
}
});

if (connectedCallback) {
self.on('connect', connectedCallback);
}
return this;
};

StompClient.prototype.disconnect = function (callback, errorCallbackToRemove) {
StompClient.prototype.disconnect = function (callback) {
var self = this;

if (this.stream) {
this.on('disconnect', function () {
if (errorCallbackToRemove) {
this.removeListener('error', errorCallbackToRemove);
}
});
//just a bit of housekeeping. Remove the no-longer-useful reconnect timer.
if (self._reconnectTimer) {
clearTimeout(self._reconnectTimer);
}

if (callback) {
this.on('disconnect', callback);
}
if (this.stream) {
//provide a default no-op function as the callback is optional
this._disconnectCallback = callback || function() {};

var frame = new StompFrame({
command: 'DISCONNECT'
Expand All @@ -123,15 +145,11 @@ StompClient.prototype.onConnect = function() {
});

self.stream.on('end', function() {
self.stream.end();
process.nextTick(function() {
// XXX(sam) This is a problem, the server can shutdown the connection
// proactively (on death or shutdown for example), and this event won't
// have a listener, most users will only add the listener by calling
// .disconnect().
self.emit('disconnect');
});

if (self._disconnectCallback) {
self._disconnectCallback();
} else {
self.stream.emit('error', new Error('Server has gone away'));
}
});

frameEmitter.on('MESSAGE', function(frame) {
Expand All @@ -140,14 +158,20 @@ StompClient.prototype.onConnect = function() {
// but until that UNSUBSCRIBE message is processed, we might still get
// MESSAGE. Check to make sure we don't call .map() on null.
if (subscribed) {
subscribed.map(function(callback) {
subscribed.listeners.map(function(callback) {
callback(frame.body, frame.headers);
});
}
});

frameEmitter.on('CONNECTED', function(frame) {
self.emit('connect', frame.headers.session);
if (self._retryNumber > 0) {
//handle a reconnection differently to the initial connection.
self.emit('reconnect', frame.headers.session, self._retryNumber);
self._retryNumber = 0;
} else {
self.emit('connect', frame.headers.session);
}
});

frameEmitter.on('ERROR', function(frame) {
Expand Down Expand Up @@ -181,6 +205,14 @@ StompClient.prototype.onConnect = function() {
command: 'CONNECT',
headers: headers
}).send(self.stream);

//if we've just reconnected, we'll need to re-subscribe
for (var queue in self.subscriptions) {
new StompFrame({
command: 'SUBSCRIBE',
headers: self.subscriptions[queue].headers
}).send(self.stream);
}
};

StompClient.prototype.subscribe = function(queue, _headers, _callback) {
Expand All @@ -201,13 +233,16 @@ StompClient.prototype.subscribe = function(queue, _headers, _callback) {
var headers = _extend({}, _headers || _callback);
headers.destination = queue;
if (!(queue in this.subscriptions)) {
this.subscriptions[queue] = [];
this.subscriptions[queue] = {
listeners: [],
headers: headers
};
new StompFrame({
command: 'SUBSCRIBE',
headers: headers
}).send(this.stream);
}
this.subscriptions[queue].push(callback);
this.subscriptions[queue].listeners.push(callback);
return this;
};

Expand Down Expand Up @@ -235,6 +270,36 @@ StompClient.prototype.publish = function(queue, message, headers) {
return this;
};

function sendAckNack(acknack, messageId, subscription, transaction) {
var headers = {
'message-id': messageId,
'subscription': subscription
};
if(transaction) {
headers['transaction'] = transaction;
}
new StompFrame({
command: acknack,
headers: headers
}).send(this.stream);
}

StompClient.prototype.ack = function(messageId, subscription, transaction) {
sendAckNack.call(this, 'ACK', messageId, subscription, transaction);
return this;
};

StompClient.prototype.nack = function(messageId, subscription, transaction) {
sendAckNack.call(this, 'NACK', messageId, subscription, transaction);
return this;
};

Object.defineProperty(StompClient.prototype, 'writable', {
get: function(){
return this.stream && this.stream.writable;
}
});

function SecureStompClient(address, port, user, pass, credentials) {
events.EventEmitter.call(this);
var self = this;
Expand All @@ -255,3 +320,7 @@ util.inherits(SecureStompClient, StompClient);
module.exports = StompClient;
module.exports.StompClient = StompClient;
module.exports.SecureStompClient = SecureStompClient;

module.exports.Errors = {
streamNotWritable: 15201
};
14 changes: 3 additions & 11 deletions test/client.test.js
Expand Up @@ -33,7 +33,7 @@ module.exports = testCase({
setUp: function(callback) {
// Mock net object so we never try to send any real data
connectionObserver = new Events();
connectionObserver.destroy = function() {}
connectionObserver.destroy = function() {};
this.stompClient = new StompClient('127.0.0.1', 2098, 'user', 'pass', '1.0');

oldCreateConnection = net.createConnection;
Expand Down Expand Up @@ -244,7 +244,7 @@ module.exports = testCase({
test.equal(body, messageToBeSent, 'Received message matches the sent one');
test.equal(headers['message-id'], messageId);
test.equal(headers.destination, destination);
test.equal(self.stompClient.subscriptions[destination].length, 1, 'ensure callback was added to subscription stack');
test.equal(self.stompClient.subscriptions[destination].listeners.length, 1, 'ensure callback was added to subscription stack');

// Unsubscribe and ensure queue is cleared of the subscription (and related callback)
self.stompClient.unsubscribe(destination, {});
Expand Down Expand Up @@ -471,11 +471,7 @@ module.exports = testCase({
'check disconnect method correctly sends DISCONNECT frame, disconnects TCP stream, and fires callback': function (test) {
var self = this;

test.expect(7);

self.stompClient.on('disconnect', function() {
test.ok(true, 'disconnect event fired');
});
test.expect(5);

//mock that we received a CONNECTED from the stomp server in our send hook
sendHook = function (stompFrame) {
Expand All @@ -491,10 +487,6 @@ module.exports = testCase({
test.equal(stompFrame.body, '');
};

self.stompClient.stream.on('end', function() {
test.ok(true, 'tcp stream end event is fired');
});

// Set disconnection callback to ensure it is called appropriately
self.stompClient.disconnect(function () {
test.ok(true, 'disconnect callback executed');
Expand Down

0 comments on commit 6efc8cd

Please sign in to comment.