diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..e3207ea --- /dev/null +++ b/.editorconfig @@ -0,0 +1,5 @@ +root = true + +[*] +indent_style = space +indent_size = 2 \ No newline at end of file diff --git a/README.md b/README.md index 67e183e..11a4d5a 100644 --- a/README.md +++ b/README.md @@ -54,17 +54,23 @@ Require returns a constructor for STOMP client instances. For backwards compatibility, `require('stomp-client').StompClient` is also supported. -## Stomp(address, port, user, pass, protocolVersion) +## Stomp(address, [port], [user], [pass], [protocolVersion], [reconnectOpts]) - `address`: address to connect to, default is `"127.0.0.1"` - `port`: port to connect to, default is `61613` - `user`: user to authenticate as, default is `""` - `pass`: password to authenticate with, default is `""` - `protocolVersion`: see below, defaults to `"1.0"` +- `reconnectOpts`: see below, defaults to `{}` Protocol version negotiation is not currently supported and version `"1.0"` is the only supported version. +ReconnectOpts should contain an integer `retries` specifying the maximum number +of reconnection attempts, and a `delay` which specifies the reconnection delay. + (reconnection timings are calculated using exponential backoff. The first reconnection + happens immediately, the second reconnection happens at `+delay` ms, the third at `+ 2*delay` ms, etc). + ## stomp.connect([callback, [errorCallback]]) Connect to the STOMP server. If the callbacks are provided, they will be @@ -76,8 +82,8 @@ If using virtualhosts to namespace your queues, you must pass a `version` header ## stomp.disconnect(callback) -Disconnect from the STOMP server. The callback will be attached on the -`'disconnect'` event. +Disconnect from the STOMP server. The callback will be executed when disconnection is complete. +No reconnections should be attempted, nor errors thrown as a result of this call. ## stomp.subscribe(queue, [headers,] callback) @@ -97,16 +103,21 @@ Disconnect from the STOMP server. The callback will be attached on the - `message`: message to publish, a string or buffer - `headers`: headers to add to the PUBLISH message -## Event: `'connect'` +## stomp.ack(messageId, subscription, [transaction]), +## stomp.nack(messageId, subscription, [transaction]) -Emitted on successful connect to the STOMP server. +- `messageId`: the id of the message to ack/nack +- `subscription`: the id of the subscription +- `transaction`: optional transaction name -## Event: `'disconnect'` +## Property: `stomp.publishable` (boolean) +Returns whether or not the connection is currently writable. During normal operation +this should be true, however if the client is in the process of reconnecting, +this will be false. -Emitted on successful disconnnect from the STOMP server. +## Event: `'connect'` -Also emitted when the server arbitrarily disconnects, which should -be considered a bug. +Emitted on successful connect to the STOMP server. ## Event: `'error'` @@ -114,6 +125,18 @@ Emitted on an error at either the TCP or STOMP protocol layer. An Error object will be passed. All error objects have a `.message` property, STOMP protocol errors may also have a `.details` property. +If the error was caused by a failure to reconnect after exceeding the number of +reconnection attempts, the error object will have a `reconnectionFailed` property. + +## Event: `'reconnect'` + +Emitted when the client has successfully reconnected. The event arguments are +the new `sessionId` and the reconnection attempt number. + +## Event: `'reconnecting'` + +Emitted when the client has been disconnected for whatever reason, but is going +to attempt to reconnect. ## LICENSE diff --git a/lib/client.js b/lib/client.js index b650731..3a6b498 100644 --- a/lib/client.js +++ b/lib/client.js @@ -49,7 +49,7 @@ var StompFrameCommands = { } }; -function StompClient(address, port, user, pass, protocolVersion, vhost) { +function StompClient(address, port, user, pass, protocolVersion, vhost, reconnectOpts) { events.EventEmitter.call(this); this.user = (user || ''); this.pass = (pass || ''); @@ -60,6 +60,9 @@ function StompClient(address, port, user, pass, protocolVersion, vhost) { assert(StompFrameCommands[this.version], 'STOMP version '+this.version+' is not supported'); this._stompFrameEmitter = new StompFrameEmitter(StompFrameCommands[this.version]); this.vhost = vhost || null; + this.reconnectOpts = reconnectOpts || {}; + this._retryNumber = 0; + this._retryDelay = this.reconnectOpts.delay; return this; } @@ -68,16 +71,37 @@ util.inherits(StompClient, events.EventEmitter); StompClient.prototype.connect = function (connectedCallback, errorCallback) { var self = this; + //reset this field. + delete this._disconnectCallback; + if (errorCallback) { self.on('error', errorCallback); } self.stream = net.createConnection(self.port, self.address); - self.stream.on('connect', function() { - self.onConnect(); - }); + self.stream.on('connect', self.onConnect.bind(this)); - self.stream.on('error', self.emit.bind(self, 'error')); + self.stream.on('error', function(err) { + process.nextTick(function() { + //clear all of the stomp frame emitter listeners - we don't need them, we've disconnected. + self._stompFrameEmitter.removeAllListeners(); + }); + if (self._retryNumber < self.reconnectOpts.retries) { + if (self._retryNumber === 0) { + //we're disconnected, but we're going to try and reconnect. + self.emit('reconnecting'); + } + self._reconnectTimer = setTimeout(function() { + self.connect(); + }, self._retryNumber++ * self.reconnectOpts.delay) + } else { + if (self._retryNumber === self.reconnectOpts.retries) { + err.message += ' [reconnect attempts reached]'; + err.reconnectionFailed = true; + } + self.emit('error', err); + } + }); if (connectedCallback) { self.on('connect', connectedCallback); @@ -85,19 +109,17 @@ StompClient.prototype.connect = function (connectedCallback, errorCallback) { return this; }; -StompClient.prototype.disconnect = function (callback, errorCallbackToRemove) { +StompClient.prototype.disconnect = function (callback) { var self = this; - if (this.stream) { - this.on('disconnect', function () { - if (errorCallbackToRemove) { - this.removeListener('error', errorCallbackToRemove); - } - }); + //just a bit of housekeeping. Remove the no-longer-useful reconnect timer. + if (self._reconnectTimer) { + clearTimeout(self._reconnectTimer); + } - if (callback) { - this.on('disconnect', callback); - } + if (this.stream) { + //provide a default no-op function as the callback is optional + this._disconnectCallback = callback || function() {}; var frame = new StompFrame({ command: 'DISCONNECT' @@ -123,15 +145,11 @@ StompClient.prototype.onConnect = function() { }); self.stream.on('end', function() { - self.stream.end(); - process.nextTick(function() { - // XXX(sam) This is a problem, the server can shutdown the connection - // proactively (on death or shutdown for example), and this event won't - // have a listener, most users will only add the listener by calling - // .disconnect(). - self.emit('disconnect'); - }); - + if (self._disconnectCallback) { + self._disconnectCallback(); + } else { + self.stream.emit('error', new Error('Server has gone away')); + } }); frameEmitter.on('MESSAGE', function(frame) { @@ -140,14 +158,20 @@ StompClient.prototype.onConnect = function() { // but until that UNSUBSCRIBE message is processed, we might still get // MESSAGE. Check to make sure we don't call .map() on null. if (subscribed) { - subscribed.map(function(callback) { + subscribed.listeners.map(function(callback) { callback(frame.body, frame.headers); }); } }); frameEmitter.on('CONNECTED', function(frame) { - self.emit('connect', frame.headers.session); + if (self._retryNumber > 0) { + //handle a reconnection differently to the initial connection. + self.emit('reconnect', frame.headers.session, self._retryNumber); + self._retryNumber = 0; + } else { + self.emit('connect', frame.headers.session); + } }); frameEmitter.on('ERROR', function(frame) { @@ -181,6 +205,14 @@ StompClient.prototype.onConnect = function() { command: 'CONNECT', headers: headers }).send(self.stream); + + //if we've just reconnected, we'll need to re-subscribe + for (var queue in self.subscriptions) { + new StompFrame({ + command: 'SUBSCRIBE', + headers: self.subscriptions[queue].headers + }).send(self.stream); + } }; StompClient.prototype.subscribe = function(queue, _headers, _callback) { @@ -201,13 +233,16 @@ StompClient.prototype.subscribe = function(queue, _headers, _callback) { var headers = _extend({}, _headers || _callback); headers.destination = queue; if (!(queue in this.subscriptions)) { - this.subscriptions[queue] = []; + this.subscriptions[queue] = { + listeners: [], + headers: headers + }; new StompFrame({ command: 'SUBSCRIBE', headers: headers }).send(this.stream); } - this.subscriptions[queue].push(callback); + this.subscriptions[queue].listeners.push(callback); return this; }; @@ -235,6 +270,36 @@ StompClient.prototype.publish = function(queue, message, headers) { return this; }; +function sendAckNack(acknack, messageId, subscription, transaction) { + var headers = { + 'message-id': messageId, + 'subscription': subscription + }; + if(transaction) { + headers['transaction'] = transaction; + } + new StompFrame({ + command: acknack, + headers: headers + }).send(this.stream); +} + +StompClient.prototype.ack = function(messageId, subscription, transaction) { + sendAckNack.call(this, 'ACK', messageId, subscription, transaction); + return this; +}; + +StompClient.prototype.nack = function(messageId, subscription, transaction) { + sendAckNack.call(this, 'NACK', messageId, subscription, transaction); + return this; +}; + +Object.defineProperty(StompClient.prototype, 'writable', { + get: function(){ + return this.stream && this.stream.writable; + } +}); + function SecureStompClient(address, port, user, pass, credentials) { events.EventEmitter.call(this); var self = this; @@ -255,3 +320,7 @@ util.inherits(SecureStompClient, StompClient); module.exports = StompClient; module.exports.StompClient = StompClient; module.exports.SecureStompClient = SecureStompClient; + +module.exports.Errors = { + streamNotWritable: 15201 +}; \ No newline at end of file diff --git a/test/client.test.js b/test/client.test.js index 27e8ecd..8730287 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -33,7 +33,7 @@ module.exports = testCase({ setUp: function(callback) { // Mock net object so we never try to send any real data connectionObserver = new Events(); - connectionObserver.destroy = function() {} + connectionObserver.destroy = function() {}; this.stompClient = new StompClient('127.0.0.1', 2098, 'user', 'pass', '1.0'); oldCreateConnection = net.createConnection; @@ -244,7 +244,7 @@ module.exports = testCase({ test.equal(body, messageToBeSent, 'Received message matches the sent one'); test.equal(headers['message-id'], messageId); test.equal(headers.destination, destination); - test.equal(self.stompClient.subscriptions[destination].length, 1, 'ensure callback was added to subscription stack'); + test.equal(self.stompClient.subscriptions[destination].listeners.length, 1, 'ensure callback was added to subscription stack'); // Unsubscribe and ensure queue is cleared of the subscription (and related callback) self.stompClient.unsubscribe(destination, {}); @@ -471,11 +471,7 @@ module.exports = testCase({ 'check disconnect method correctly sends DISCONNECT frame, disconnects TCP stream, and fires callback': function (test) { var self = this; - test.expect(7); - - self.stompClient.on('disconnect', function() { - test.ok(true, 'disconnect event fired'); - }); + test.expect(5); //mock that we received a CONNECTED from the stomp server in our send hook sendHook = function (stompFrame) { @@ -491,10 +487,6 @@ module.exports = testCase({ test.equal(stompFrame.body, ''); }; - self.stompClient.stream.on('end', function() { - test.ok(true, 'tcp stream end event is fired'); - }); - // Set disconnection callback to ensure it is called appropriately self.stompClient.disconnect(function () { test.ok(true, 'disconnect callback executed');