Skip to content
This repository was archived by the owner on Nov 28, 2018. It is now read-only.

Make STOMP sockets not global #3

Merged
2 commits merged into from Feb 17, 2011
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions lib/frame.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,24 @@ Frame.prototype.build_frame = function(args, want_receipt) {
Frame.prototype.as_string = function() {
var self = this,
header_strs = [],
frame = null,
frame = "",
command = this.command,
headers = this.headers,
body = this.body;

for (var header in headers) {
header_strs.push(header + ': ' + headers[header]);
header_strs.push(header + ':' + headers[header]);
}

frame = command + '\n' + header_strs.join('\n') + '\n\n' + body + '\x00';
frame += command + "\n";
frame += header_strs.join('\n');
frame += "\n";

if(body)
frame += '\n' + body + '\x00';

frame += '\n\x00';

return frame;
};

Expand Down
44 changes: 23 additions & 21 deletions lib/stomp.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var net = require('net'),
stomp_utils = require('./stomp-utils'),
exceptions = require('./stomp-exceptions'),
utils = new stomp_utils.StompUtils(),
socket = net.Stream(),
log = null;

function parse_command(data) {
Expand Down Expand Up @@ -92,18 +91,19 @@ function parse_frame(chunk) {
};

function _connect(stomp) {
var socket = stomp.socket = net.Stream();
log = stomp.log;
log.debug('Connecting to ' + stomp.host + ':' + stomp.port);
socket.connect(stomp.port, stomp.host);

socket.on('connect', function() {
log.debug('Connected to socket');
if (utils.really_defined(stomp.login)
&& utils.really_defined(stomp.passcode)) {
stomp_connect({login: stomp.login, passcode: stomp.passcode});
if (utils.really_defined(stomp.login) &&
utils.really_defined(stomp.passcode)) {
stomp_connect(stomp, {login: stomp.login, passcode: stomp.passcode});
}
else {
stomp_connect();
stomp_connect(stomp);
}
});

Expand All @@ -117,8 +117,6 @@ function _connect(stomp) {
var frames = buffer.split('\0\n');
var parsed_frame = null;
var _frame = null;
buffer = frames.pop();

while (_frame = frames.shift()) {
parsed_frame = parse_frame(_frame + '\0\n')
stomp.handle_new_frame(parsed_frame);
Expand All @@ -138,10 +136,11 @@ function _connect(stomp) {
log.debug('disconnected');
if (error)
log.error('Disconnected with error: ' + error);
stomp.emit("disconnected");
});
};

function stomp_connect(headers) {
function stomp_connect(stomp, headers) {
var _frame = new frame.Frame(),
args = {},
headers = headers || {};
Expand All @@ -150,18 +149,20 @@ function stomp_connect(headers) {
args['headers'] = headers;

var frame_to_send = _frame.build_frame(args);
send_frame(frame_to_send);

send_frame(stomp, frame_to_send);
log.debug('Connected to STOMP');
};

function _disconnect() {
function _disconnect(stomp) {
var socket = stomp.socket;
socket.end();
if (socket.readyState == 'readOnly')
socket.destroy();
log.debug('disconnect called');
};

function send_command(command, headers, body, want_receipt) {
function send_command(stomp, command, headers, body, want_receipt) {
var want_receipt = want_receipt || false;
if (!utils.really_defined(headers))
headers = {};
Expand All @@ -174,12 +175,13 @@ function send_command(command, headers, body, want_receipt) {

var _frame = new frame.Frame();
var this_frame = _frame.build_frame(args, want_receipt);
send_frame(this_frame);
send_frame(stomp, this_frame);

return this_frame;
};

function send_frame(_frame) {
function send_frame(stomp, _frame) {
var socket = stomp.socket;
var frame_str = _frame.as_string();

if (socket.write(frame_str) === false) {
Expand Down Expand Up @@ -246,7 +248,7 @@ Stomp.prototype.handle_new_frame = function(this_frame) {
* Disconnect from STOMP broker
*/
Stomp.prototype.disconnect = function() {
_disconnect();
_disconnect(this);
}

/**
Expand All @@ -257,7 +259,7 @@ Stomp.prototype.subscribe = function(headers) {
var self = this;
destination = headers['destination'];
headers['session'] = self.session;
send_command('SUBSCRIBE', headers);
send_command(this, 'SUBSCRIBE', headers);
self._subscribed_to[destination] = true;
self.log.debug('subscribed to: ' + destination + ' with headers ' + sys.inspect(headers));
};
Expand All @@ -270,7 +272,7 @@ Stomp.prototype.unsubscribe = function(headers) {
var self = this;
destination = headers['destination'];
headers['session'] = self.session;
send_command('UNSUBSCRIBE', headers);
send_command(this, 'UNSUBSCRIBE', headers);
self._subscribed_to[destination] = false;
self.log.debug('no longer subscribed to: ' + destination);
};
Expand All @@ -281,7 +283,7 @@ Stomp.prototype.unsubscribe = function(headers) {
*/
Stomp.prototype.ack = function(message_id) {
var self = this;
send_command('ACK', {'message-id': message_id});
send_command(this, 'ACK', {'message-id': message_id});
self.log.debug('acknowledged message: ' + message_id);
};

Expand All @@ -292,7 +294,7 @@ Stomp.prototype.ack = function(message_id) {
Stomp.prototype.begin = function() {
var self = this;
transaction_id = Math.floor(Math.random()*99999999999).toString();
send_command('BEGIN', {'transaction': transaction_id});
send_command(this, 'BEGIN', {'transaction': transaction_id});
self.log.debug('begin transaction: ' + transaction_id);
return transaction_id;
};
Expand All @@ -303,7 +305,7 @@ Stomp.prototype.begin = function() {
*/
Stomp.prototype.commit = function(transaction_id) {
var self = this;
send_command('COMMIT', {'transaction': transaction_id});
send_command(this, 'COMMIT', {'transaction': transaction_id});
self.log.debug('commit transaction: ' + transaction_id);
};

Expand All @@ -313,7 +315,7 @@ Stomp.prototype.commit = function(transaction_id) {
*/
Stomp.prototype.abort = function(transaction_id) {
var self = this;
send_command('ABORT', {'transaction': transaction_id});
send_command(this, 'ABORT', {'transaction': transaction_id});
self.log.debug('abort transaction: ' + transaction_id);
};

Expand All @@ -328,7 +330,7 @@ Stomp.prototype.send = function(headers, want_receipt) {
destination = headers['destination'];
body = headers['body'] || null;
headers['session'] = self.session;
return send_command('SEND', headers, body, want_receipt)
return send_command(this, 'SEND', headers, body, want_receipt)
};

module.exports.Stomp = Stomp;