Skip to content
This repository was archived by the owner on Nov 28, 2018. It is now read-only.

Reconnect #31

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
updated code to emit disconnected event to enable client reconnect
  • Loading branch information
dkhunt27 committed Dec 9, 2013
commit 8994c9c6b0b2ad1c45e9fa91b3365ad211fe6b63
105 changes: 74 additions & 31 deletions lib/stomp.js
Original file line number Diff line number Diff line change
@@ -60,6 +60,8 @@ var Stomp = module.exports = function() {
options.sslOptions = args.ssl_options || {};
options.clientId = args['client-id'] || null;
options.vhost = args.vhost || null;
options.timeout = args.timeout || 120000;
options.keepAlive = args.keepAlive || false;

var debug = args.debug || false;
var StompLogger = require('./stomp.logger.js');
@@ -112,6 +114,7 @@ var Stomp = module.exports = function() {

if (options.ssl) {
log.debug('Connecting to ' + host + ':' + port + ' using SSL');

socket = tls.connect(port, host, options.sslOptions, function() {
log.debug('SSL connection complete');
if (!socket.authorized) {
@@ -121,35 +124,55 @@ var Stomp = module.exports = function() {
return;
}
}

socket.setTimeout(options.timeout);
socket.setKeepAlive(options.keepAlive);

setupListeners();
});
}
else {
log.debug('Connecting to ' + host + ':' + port);
socket = new net.Socket();
socket.connect(port, host);
setupListeners();
socket.setTimeout(options.timeout);
socket.setKeepAlive(options.keepAlive);
socket.connect(port, host);
}
_connected = true;
};

/**
* Removes sockets used to connects STOMP broker. Sets the connected flag to *false* upon completion.
* Emits "disconnected" upon completion.
* Removes sockets used to connects STOMP broker. Calls socket.end which emits an "end" event. The 'on end' handler
* calls disconnectFinish which destroys the socket
*
* @returns {void}
*/
var disconnect = function() {
if(!_initialized) {throw new Error(errMsgNotInitialized);}

log.debug('ending socket');
socket.end();
};

/**
* Finishes the disconnect process (destroys socket). Broken into two phases since disconnect called socket.end which its handler called
* disconnect...created an infinite loop
*
* Emits "disconnected" upon completion.
*
* @returns {void}
*/
var disconnectFinish = function() {
if(!_initialized) {throw new Error(errMsgNotInitialized);}

if (socket.readyState === 'readOnly') {
log.debug('destroying socket');
socket.destroy();
}

log.debug('disconnect called');
log.debug('disconnected emitted');
_connected = false;
emitWrapper("disconnected");
};

/**
@@ -288,25 +311,25 @@ var Stomp = module.exports = function() {
*/
var handleNewFrame = function(frame) {
switch (frame.command) {
case "MESSAGE":
if (isMessage(frame)) {
shouldRunMessageCallback(frame);
emitWrapper('message', frame);
}
break;
case "CONNECTED":
log.debug('Connected to STOMP');
session = frame.headers.session;
emitWrapper('connected');
break;
case "RECEIPT":
emitWrapper('receipt', frame.headers['receipt-id']);
break;
case "ERROR":
emitWrapper('error', frame);
break;
default:
log.debug("Could not parse command: " + frame.command);
case "MESSAGE":
if (isMessage(frame)) {
shouldRunMessageCallback(frame);
emitWrapper('message', frame);
}
break;
case "CONNECTED":
log.debug('Connected to STOMP');
session = frame.headers.session;
emitWrapper('connected');
break;
case "RECEIPT":
emitWrapper('receipt', frame.headers['receipt-id']);
break;
case "ERROR":
emitWrapper('error', frame);
break;
default:
log.debug("Could not parse command: " + frame.command);
}
};

@@ -323,11 +346,12 @@ var Stomp = module.exports = function() {
var setupListeners = function() {

socket.on('drain', function(data) {
log.debug('draining');
log.debug('on drain');
});

var buffer = '';
socket.on('data', function(chunk) {
log.debug('on data');
buffer += chunk;
var frames = buffer.split('\0\n');

@@ -352,21 +376,30 @@ var Stomp = module.exports = function() {
}
});

socket.on('timeout', function() {
log.debug("on timeout");
disconnect();
});


socket.on('end', function() {
log.debug("end");
log.debug("on end");
disconnectFinish();
});

socket.on('error', function(error) {
log.debug('on error');
log.error(error.stack + 'error name: ' + error.name);
emitWrapper("error", error);
});

socket.on('close', function(error) {
log.debug('disconnected');
log.debug('on close');
if (error) {
log.error('Disconnected with error: ' + error);
log.error('Closed with error: ' + error);
}
emitWrapper("disconnected", error);
log.debug('closed emitted');
emitWrapper("closed", error);
});

if (options.ssl) {
@@ -446,6 +479,7 @@ var Stomp = module.exports = function() {
* @returns {Object} returns the headers from the frame
*/
var parseHeaders = function(rawHeaders) {
log.debug('rawHeaders',rawHeaders);
var headers = {};
var headersSplit = rawHeaders.split('\n');

@@ -457,7 +491,16 @@ var Stomp = module.exports = function() {
headers[headerKey] = headerVal;
continue;
}
headers[header[0].trim()] = header[1].trim();

var header0 = header[0];
var header1 = header[1];
if (header0) {
header0 = header0.trim();
}
if (header1) {
header1 = header1.trim();
}
headers[header0] = header1;
}
return headers;
};
@@ -571,7 +614,7 @@ var Stomp = module.exports = function() {
if (frame.headers) {
if (frame.headers.login) {
log.debug('attempting to login with: ' + frame.headers.login);
log.debug('login passcode: ' + frame.headers.passcode); //TODO remove this
//log.debug('login passcode: ' + frame.headers.passcode); //TODO remove this
}
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
"queue",
"protocol"
],
"version": "0.2.0",
"version": "0.2.1",
"homepage": "https://github.com/benjaminws/stomp-js",
"author": "Benjamin W. Smith <benjaminws@just-another.net>",
"contributors": [