Skip to content
This repository was archived by the owner on Nov 28, 2018. It is now read-only.

Commit cc11fae

Browse files
committed
A couple of bug fixes, but it all works. Move the event emiter logic to a seperate method. Add some more concise examples and update the README. Oh, and hey look, a license
1 parent 24513c6 commit cc11fae

File tree

7 files changed

+189
-56
lines changed

7 files changed

+189
-56
lines changed

README

-1
This file was deleted.

README.md

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
stomp-js
2+
========
3+
4+
## Overview
5+
6+
An exercise with node.js to implement the STOMP protocol.
7+
The design & implementation is heavily inspired by [python-stomp](http://bitbucket.org/benjaminws/python-stomp/), with an evented twist.
8+
9+
This is merely the first iteration; a complete spike. I will be rewriting it from the ground up with BDD.
10+
11+
## Examples
12+
13+
### Consumer
14+
15+
#!/usr/bin/env node
16+
17+
var stomp = require('./lib/stomp');
18+
19+
var stomp_args = {
20+
port: 61613,
21+
host: 'localhost',
22+
debug: true
23+
}
24+
// Could also add..
25+
//{login: 'bah', password: 'bah'}
26+
27+
var client = new stomp.Stomp(stomp_args);
28+
29+
var headers = {
30+
destination: '/queue/test_stomp',
31+
ack: 'client'
32+
};
33+
34+
var messages = [];
35+
36+
client.connect();
37+
38+
client.on('connected', function() {
39+
client.subscribe(headers);
40+
console.log('Connected');
41+
});
42+
43+
client.on('message', function(message) {
44+
messages.push(message);
45+
});
46+
47+
client.on('error', function(error_frame) {
48+
console.log(error_frame.body);
49+
});
50+
51+
process.on('SIGINT', function() {
52+
console.log('\nConsumed ' + messages.length + ' messages');
53+
client.disconnect();
54+
});
55+
56+
### Producer
57+
58+
#!/usr/bin/env node
59+
60+
var stomp = require('./lib/stomp');
61+
62+
var num = process.argv[2];
63+
64+
var stomp_args = {
65+
port: 61613,
66+
host: 'localhost',
67+
debug: true
68+
}
69+
// Could also add..
70+
//{login: 'bah', password: 'bah'}
71+
72+
var client = new stomp.Stomp(stomp_args);
73+
74+
var queue = '/queue/test_stomp';
75+
76+
client.connect();
77+
78+
client.on('connected', function() {
79+
if (!num) num = 10;
80+
81+
for (var i = 0; i < num; i++) {
82+
console.log(i);
83+
client.send({
84+
'destination': queue,
85+
'body': 'Testing' + i,
86+
'persistent': 'true'
87+
});
88+
}
89+
client.disconnect();
90+
});
91+
92+
client.on('error', function(error_frame) {
93+
console.log(error_frame.body);
94+
client.disconnect();
95+
});
96+
97+
process.on('SIGINT', function() {
98+
client.disconnect();
99+
});

lib/frame.js

+8-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ var sys = require('sys'),
22
utils = require('./stomp-utils'),
33
exceptions = require('./stomp-exceptions');
44

5-
Frame = module.exports = function(logger) {
5+
function Frame(logger) {
66
this.sock = null;
77
this.command = null;
88
this.headers = null;
@@ -27,7 +27,6 @@ Frame.prototype.stomp_connect = function(client) {
2727
frame_to_send = this.build_frame(args, true);
2828
parsed_frame = this.send_frame(frame_to_send);
2929
this.stomp_log.debug('Connected to STOMP');
30-
this.stomp_log.debug('headers: ' + sys.inspect(this.headers));
3130
if ('session' in this.headers)
3231
this.session = this.headers['session'];
3332

@@ -63,11 +62,10 @@ Frame.prototype.as_string = function() {
6362
body = this.body;
6463

6564
for (var header in headers) {
66-
header_strs.push(header + ':' + headers[header] + '\n');
65+
header_strs.push(header + ': ' + headers[header]);
6766
}
6867

6968
frame = command + '\n' + header_strs.join('\n') + '\n\n' + body + '\x00';
70-
7169
return frame;
7270
};
7371

@@ -103,8 +101,9 @@ Frame.prototype.parse_frame = function(data) {
103101
args['headers'] = this.headers;
104102
args['body'] = this.body;
105103

106-
this_frame = new Frame(this.sock);
107-
return_frame = this_frame.build_frame(args);
104+
var this_frame = new Frame(this.sock);
105+
var return_frame = this_frame.build_frame(args);
106+
108107
return return_frame;
109108
};
110109

@@ -166,8 +165,10 @@ Frame.prototype.get_message = function() {
166165
if (!this.utils.really_defined(this_frame))
167166
return null;
168167
if (this_frame.command == "MESSAGE")
169-
return frame;
168+
return this_frame;
170169
else
171170
this.rqueue.put(this_frame);
172171
}
173172
};
173+
174+
module.exports.Frame = Frame;

lib/stomp-utils.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ StompQueue.prototype.get = function() {
5454

5555
if (++this.queue_space * 2 >= this.queue.length) {
5656
this.queue = this.queue.slice(this.queue_space);
57-
console.log(this.queue);
57+
console.log("Queue: " + this.queue);
5858
this.queue_space = 0;
5959
}
6060
}
@@ -83,7 +83,7 @@ IntermediateQueue = exports.IntermediateQueue = function() {
8383
IntermediateQueue.prototype.put = function(frame) {
8484
if (!"destination" in frame.headers)
8585
return;
86-
console.log(frame.headers)
86+
console.log("Queue headers: " + frame.headers)
8787
this._queue.put(frame);
8888
}
8989

lib/stomp.js

+51-37
Original file line numberDiff line numberDiff line change
@@ -4,66 +4,78 @@ var net = require('net'),
44
utils = require('./stomp-utils'),
55
exceptions = require('./stomp-exceptions');
66

7-
function Stomp(port, host, debug) {
7+
function Stomp(args) {
88
this.socket = null;
9-
this.port = port || 61613;
10-
this.host = host || "127.0.0.1";
9+
this.port = args['port'] || 61613;
10+
this.host = args['host'] || "127.0.0.1";
1111
this.socket_connected = false;
12-
this.debug = debug;
13-
this.stomp_log = new StompLogging(debug);
14-
this.frame_object = new Frame(this.stomp_log);
12+
this.debug = args['debug'];
13+
this.stomp_log = new StompLogging(this.debug);
14+
this.frame_object = new frame.Frame(this.stomp_log);
1515
this.utils = new StompUtils();
1616
this.connected_frame = null;
17-
this._subscribed_to = [];
18-
}
17+
this._subscribed_to = {};
18+
};
1919

2020
Stomp.prototype = new process.EventEmitter();
2121

2222
Stomp.prototype.connect = function(headers) {
2323
var self = this;
2424
this.stomp_log.debug('Connecting to ' + this.host + ':' + this.port);
2525
socket = net.createConnection(this.port, this.host);
26+
socket.setEncoding('ascii');
2627
socket.setTimeout(0);
2728
socket.setNoDelay(true);
2829

29-
socket.addListener('connect', function () {
30+
socket.addListener('connect', function() {
3031
self.stomp_log.debug('Connected to socket');
3132
self.connected_frame = self.frame_object.stomp_connect(self.socket);
3233
});
3334

34-
socket.addListener('data', function (data) {
35-
var reply_frame = self.connected_frame.parse_frame(data)
36-
if ('session' in reply_frame.headers)
37-
self.connected_frame.session = reply_frame.headers['session'];
38-
39-
switch (reply_frame.command) {
40-
case "CONNECTED":
41-
self.emit('connected');
42-
break;
43-
case "MESSAGE":
44-
self.emit('message', reply_frame);
45-
break;
46-
case "RECEIPT":
47-
self.emit('receipt', reply_frame.headers['receipt']);
48-
break;
49-
case "ERROR":
50-
self.emit('error', reply_frame);
51-
break;
35+
socket.addListener('drain', function() {
36+
37+
});
38+
39+
socket.addListener('data', function(data) {
40+
var frames = data.split('\0\n');
41+
var reply_frame = null;
42+
var frame = null;
43+
while (frame = frames.shift()) {
44+
reply_frame = self.connected_frame.parse_frame(frame + '\0\n')
45+
self.handle_new_frame(reply_frame);
5246
}
5347
});
54-
socket.addListener('end', function () {
48+
socket.addListener('end', function() {
5549
self.stomp_log.debug('goodbye');
5650
});
57-
socket.addListener('error', function (error) {
51+
socket.addListener('error', function(error) {
5852
console.log('error: ' + error.stack);
5953
});
60-
socket.addListener('close', function (error) {
54+
socket.addListener('close', function(error) {
6155
self.stomp_log.debug('disconnected');
6256
});
6357

6458
this.socket = socket;
6559
};
6660

61+
Stomp.prototype.handle_new_frame = function(this_frame) {
62+
63+
switch (this_frame.command) {
64+
case "MESSAGE":
65+
this.emit('message', this_frame);
66+
break;
67+
case "CONNECTED":
68+
this.emit('connected');
69+
break;
70+
case "RECEIPT":
71+
this.emit('receipt', this_frame.headers['receipt']);
72+
break;
73+
case "ERROR":
74+
this.emit('error', this_frame);
75+
break;
76+
}
77+
};
78+
6779
Stomp.prototype.disconnect = function() {
6880

6981
this.socket.end();
@@ -77,7 +89,7 @@ Stomp.prototype.subscribe = function(headers, callback) {
7789
destination = headers['destination'];
7890
this._send_command('SUBSCRIBE', headers);
7991
this._subscribed_to[destination] = callback;
80-
this.stomp_log.debug('subscribed to: ' + destination);
92+
this.stomp_log.debug('subscribed to: ' + destination + ' with headers ' + sys.inspect(headers));
8193

8294
};
8395

@@ -100,23 +112,25 @@ Stomp.prototype.ack = function(message_id) {
100112
Stomp.prototype.send = function(headers) {
101113

102114
destination = headers['destination'];
115+
body = headers['body'] || null;
103116
headers['want_receipt'] = true;
104117

105-
return this._send_command('SEND', headers)
118+
return this._send_command('SEND', headers, body)
106119
};
107120

108-
Stomp.prototype._send_command = function(command, headers, extra) {
121+
Stomp.prototype._send_command = function(command, headers, body) {
109122

110123
if (!this.utils.really_defined(headers))
111-
headers = [];
124+
headers = {};
112125

113126
frame_conf = {
114127
'command': command,
115-
'headers': headers
128+
'headers': headers,
129+
'body': body
116130
};
117131

118-
this_frame = this.connected_frame.build_frame(frame_conf);
119-
reply = this.connected_frame.send_frame(this_frame);
132+
var this_frame = this.connected_frame.build_frame(frame_conf);
133+
var reply = this.connected_frame.send_frame(this_frame);
120134

121135
if ('want_receipt' in headers)
122136
return reply;

stomp-consumer.js

100644100755
+18-5
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,19 @@
22

33
var stomp = require('./lib/stomp');
44

5-
//{'login': 'bah', 'password': 'bah'}
6-
var client = new stomp.Stomp(61613, 'localhost', true);
5+
var stomp_args = {
6+
port: 61613,
7+
host: 'localhost',
8+
debug: true
9+
}
10+
// Could also add..
11+
//{login: 'bah', password: 'bah'}
12+
13+
var client = new stomp.Stomp(stomp_args);
714

815
var headers = {
9-
'destination': '/queue/test',
10-
'ack': 'auto'
16+
destination: '/queue/test_stomp',
17+
ack: 'client'
1118
};
1219

1320
var messages = [];
@@ -20,14 +27,20 @@ client.on('connected', function() {
2027
});
2128

2229
client.on('message', function(message) {
30+
if (!client.utils.really_defined(message.headers['message-id'])) {
31+
console.log(message);
32+
return;
33+
}
34+
client.ack(message.headers['message-id']);
2335
messages.push(message);
2436
});
2537

2638
client.on('error', function(error_frame) {
2739
console.log(error_frame.body);
40+
client.disconnect();
2841
});
2942

3043
process.on('SIGINT', function() {
31-
console.log(messages.length);
44+
console.log('\nConsumed ' + messages.length + ' messages');
3245
client.disconnect();
3346
});

0 commit comments

Comments
 (0)