Skip to content
This repository was archived by the owner on Nov 28, 2018. It is now read-only.

Commit 9b2f5cf

Browse files
committed
added callback to subscribe function that will automatically be called when messages are placed on a given queue
1 parent 4184d0d commit 9b2f5cf

File tree

1 file changed

+14
-5
lines changed

1 file changed

+14
-5
lines changed

lib/stomp.js

+14-5
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ function _setupListeners(stomp) {
141141
buffer += chunk;
142142
var frames = buffer.split('\0\n');
143143

144-
// Temporary fix : NULL,LF is not a guranteed standard, the LF is optional, so lets deal with it. (Rauls)
144+
// Temporary fix : NULL,LF is not a guranteed standard, the LF is optional, so lets deal with it. (Rauls)
145145
if (frames.length == 1) {
146146
frames = buffer.split('\0');
147147
}
@@ -269,8 +269,17 @@ Stomp.prototype.handle_new_frame = function(this_frame) {
269269

270270
switch (this_frame.command) {
271271
case "MESSAGE":
272-
if (utils.really_defined(this_frame.headers['message-id']))
272+
if (utils.really_defined(this_frame.headers['message-id'])) {
273+
// if a subscription to the destination queue exists, fire callback
274+
if (this_frame.headers !== null && this_frame.headers.destination !== null && self._subscribed_to[this_frame.headers.destination] !== null) {
275+
var subscription = self._subscribed_to[this_frame.headers.destination];
276+
if (subscription.enabled && subscription.callback !== null && typeof(subscription.callback) == 'function') {
277+
subscription.callback(this_frame.body, this_frame.headers);
278+
}
279+
}
273280
self.emit('message', this_frame);
281+
}
282+
274283
break;
275284
case "CONNECTED":
276285
log.debug('Connected to STOMP');
@@ -299,12 +308,12 @@ Stomp.prototype.disconnect = function() {
299308
* Subscribe to destination (queue or topic)
300309
* @param {Object} headers
301310
*/
302-
Stomp.prototype.subscribe = function(headers) {
311+
Stomp.prototype.subscribe = function(headers, callback) {
303312
var self = this;
304313
destination = headers['destination'];
305314
headers['session'] = self.session;
306315
send_command(this, 'SUBSCRIBE', headers);
307-
self._subscribed_to[destination] = true;
316+
self._subscribed_to[destination] = { enabled: true, callback: callback };
308317
self.log.debug('subscribed to: ' + destination + ' with headers ' + sys.inspect(headers));
309318
};
310319

@@ -317,7 +326,7 @@ Stomp.prototype.unsubscribe = function(headers) {
317326
destination = headers['destination'];
318327
headers['session'] = self.session;
319328
send_command(this, 'UNSUBSCRIBE', headers);
320-
self._subscribed_to[destination] = false;
329+
self._subscribed_to[destination].enabled = false;
321330
self.log.debug('no longer subscribed to: ' + destination);
322331
};
323332

0 commit comments

Comments
 (0)