Skip to content

Commit dc5c83e

Browse files
committed
Oh hey, let's start working on this again! Functioning producer.
1 parent 7cb2b24 commit dc5c83e

File tree

4 files changed

+178
-53
lines changed

4 files changed

+178
-53
lines changed

lib/frame.js

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,56 +3,67 @@ var sys = require('sys'),
33
exceptions = require('./stomp-exceptions');
44

55
Frame = module.exports = function(logger) {
6+
this.me = Math.floor(Math.random()*10000000+1);
67
this.sock = null;
78
this.command = null;
89
this.headers = null;
910
this.body = null;
11+
this.reply = null;
1012
this.session = null;
1113
this.stomp_log = logger;
1214
this.rqueue = new StompQueue();
1315
this.iqueue = new IntermediateQueue();
16+
this.utils = new StompUtils();
1417
};
1518

1619
Frame.prototype.stomp_connect = function(client) {
17-
this.sock = client;
18-
var args = {};
19-
var headers = {};
20-
var next_frame = null;
20+
var self = this,
21+
args = {},
22+
headers = {}
23+
next_frame = null;
2124

25+
this.sock = client;
2226
args['command'] = 'CONNECT';
2327
args['headers'] = headers;
24-
frame_to_send = this.build_frame(args);
25-
this.send_frame(frame_to_send);
28+
frame_to_send = this.build_frame(args, true);
29+
parsed_frame = this.send_frame(frame_to_send);
2630
this.stomp_log.debug('Connected to STOMP');
27-
next_frame = this.get_reply();
28-
this.session = next_frame.headers;
31+
this.stomp_log.debug('headers: ' + sys.inspect(this.headers));
32+
if ('session' in this.headers)
33+
this.session = this.headers['session'];
34+
2935
return this;
3036
};
3137

3238
Frame.prototype.build_frame = function(args, want_receipt) {
39+
var self = this,
40+
receipt_stamp = null;
41+
3342
this.command = args['command'];
3443
this.headers = args['headers'];
3544
this.body = args['body'];
36-
var receipt_stamp = null;
3745

3846
if (want_receipt) {
3947
receipt_stamp = Math.floor(Math.random()*10000000+1);
40-
this.headers['receipt'] = this.session['session'] + '-' + receipt_stamp;
41-
this.stomp_log.debug(want_receipt);
48+
if (this.session != null) {
49+
this.headers['receipt'] = receipt_stamp + "-" + this.session;
50+
}
51+
else {
52+
this.headers['receipt'] = receipt_stamp + "-";
53+
}
4254
}
43-
4455
return this;
4556
};
4657

4758
Frame.prototype.as_string = function() {
48-
var header_strs = Array(),
59+
var self = this,
60+
header_strs = Array(),
4961
frame = null,
5062
command = this.command,
5163
headers = this.headers,
5264
body = this.body;
5365

5466
for (var header in headers) {
55-
this.stomp_log.debug(header);
5667
header_strs.push(header + ':' + headers[header] + '\n');
5768
}
5869

@@ -62,6 +73,7 @@ Frame.prototype.as_string = function() {
6273
};
6374

6475
Frame.prototype.send_frame = function(frame) {
76+
self = this;
6577
this.sock.write(frame.as_string());
6678

6779
if ('receipt' in frame.headers)
@@ -70,11 +82,12 @@ Frame.prototype.send_frame = function(frame) {
7082
};
7183

7284
Frame.prototype.parse_frame = function(data) {
73-
var args = [],
85+
var self = this,
86+
args = [],
7487
headers_str = null;
7588

76-
if (data == null || data == undefined)
77-
return;
89+
if (!this.utils.really_defined(data))
90+
return null;
7891

7992
this.command = this.parse_command(data);
8093
var _data = data.slice(this.command.length + 1, data.length);
@@ -95,20 +108,23 @@ Frame.prototype.parse_frame = function(data) {
95108
args['body'] = this.body;
96109

97110
this_frame = new Frame(this.sock);
98-
return this_frame.build_frame(args);
111+
return_frame = this_frame.build_frame(args);
112+
return return_frame;
99113
};
100114

101115
Frame.prototype.parse_headers = function(headers_str) {
102-
var these_headers = Array(),
103-
one_header = Array(),
116+
var these_headers = new Array(),
117+
one_header = new Array(),
118+
header_key = null,
119+
header_val = null,
104120
headers_split = headers_str.split('\n');
105121

122+
106123
for (var i = 0; i < headers_split.length; i++) {
107124
one_header = headers_split[i].split(':');
108-
109125
if (one_header.length > 1) {
110-
var header_key = one_header.shift();
111-
var header_val = one_header.join(':');
126+
header_key = one_header.shift();
127+
header_val = one_header.join(':');
112128
these_headers[header_key] = header_val;
113129
}
114130
else {
@@ -126,32 +142,36 @@ Frame.prototype.parse_command = function(data) {
126142
};
127143

128144
Frame.prototype.get_reply = function() {
145+
var self = this;
146+
129147
while (true) {
130148
try {
131-
return this.rqueue.get()
149+
return self.rqueue.get()
132150
}
133151
catch (error) {
134152
if (error.name == "QueueEmpty") {
135-
frame = this.parse_frame();
136-
if (frame == null || frame == undefined)
153+
this_frame = self.reply;
154+
if (!self.utils.really_defined(this_frame))
137155
return null;
138-
if (frame.command == "MESSAGE")
139-
this.iqueue.put(frame);
156+
if (this_frame.command == "MESSAGE")
157+
return self.iqueue.put(this_frame);
140158
else
141-
this.rqueue.put(frame);
159+
return self.rqueue.put(this_frame);
142160
}
143161
}
144162
}
145163
};
146164

147165
Frame.prototype.get_message = function() {
166+
var self = this;
167+
148168
while (true) {
149-
frame = this.rqueue.get()
150-
if (frame == null || frame == undefined)
169+
this_frame = this.rqueue.get()
170+
if (!this.utils.really_defined(this_frame))
151171
return null;
152-
if (frame.command == "MESSAGE")
172+
if (this_frame.command == "MESSAGE")
153173
return frame;
154174
else
155-
this.rqueue.put(frame);
175+
this.rqueue.put(this_frame);
156176
}
157177
};

lib/stomp-utils.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,21 @@ StompLogging.prototype.debug = function(message) {
99
console.log("debug: " + message);
1010
};
1111

12+
StompUtils = exports.StompUtils = function() {
13+
this.available_utils = [];
14+
};
15+
16+
StompUtils.prototype.really_defined = function(var_to_test) {
17+
return !(var_to_test == null || var_to_test == undefined);
18+
};
19+
20+
// Extend associative array
21+
StompUtils.prototype.extend = function(destination, source) {
22+
for (var property in source)
23+
destination[property] = source[property];
24+
return destination;
25+
};
26+
1227
/**
1328
* Queueing implementation, blatantly ripped off from:
1429
* http://safalra.com/web-design/javascript/queues/Queue.js
@@ -68,6 +83,7 @@ IntermediateQueue = exports.IntermediateQueue = function() {
6883
IntermediateQueue.prototype.put = function(frame) {
6984
if (!"destination" in frame.headers)
7085
return;
86+
console.log(frame.headers)
7187
this._queue.put(frame);
7288
}
7389

lib/stomp.js

Lines changed: 91 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,110 @@
11
var net = require('net'),
2+
sys = require('sys'),
23
frame = require('./frame'),
34
utils = require('./stomp-utils'),
45
exceptions = require('./stomp-exceptions');
56

6-
Stomp = module.exports = function(port, host, debug) {
7-
this.port = port;
8-
this.host = host;
7+
function Stomp(port, host, debug, conf) {
8+
this.socket = null;
9+
this.port = port || 61613;
10+
this.host = host || "127.0.0.1";
911
this.socket_connected = false;
1012
this.debug = debug;
1113
this.stomp_log = new StompLogging(debug);
12-
this.frame = new Frame(this.stomp_log);
14+
this.frame_object = new Frame(this.stomp_log);
15+
this.utils = new StompUtils();
1316
this.connected_frame = null;
14-
};
17+
this._subscribed_to = {};
18+
}
19+
20+
Stomp.prototype = new process.EventEmitter();
1521

16-
Stomp.prototype.connect = function() {
22+
Stomp.prototype.connect = function(headers) {
23+
var self = this;
1724
this.stomp_log.debug('Connecting to ' + this.host + ':' + this.port);
18-
var client = net.createConnection(this.port, this.host);
19-
var _stomp = this;
25+
socket = net.createConnection(this.port, this.host);
26+
socket.setTimeout(0);
27+
socket.setNoDelay(true);
2028

21-
client.addListener('connect', function () {
22-
_stomp.stomp_log.debug('Connected to socket');
23-
_stomp.connected_frame = _stomp.frame.stomp_connect(client);
29+
socket.addListener('connect', function () {
30+
self.stomp_log.debug('Connected to socket');
31+
self.connected_frame = self.frame_object.stomp_connect(self.socket);
2432
});
25-
client.addListener('data', function (data) {
26-
_stomp.connected_frame.parse_frame(data);
33+
34+
socket.addListener('data', function (data) {
35+
var reply_frame = self.connected_frame.parse_frame(data)
36+
if ('session' in reply_frame.headers)
37+
self.connected_frame.session = reply_frame.headers['session'];
38+
39+
switch (reply_frame.command) {
40+
case "CONNECTED":
41+
self.emit('connected');
42+
break;
43+
case "MESSAGE":
44+
self.emit('message');
45+
break;
46+
case "RECEIPT":
47+
self.emit('receipt');
48+
break;
49+
case "ERROR":
50+
self.emit('error');
51+
break;
52+
}
2753
});
28-
client.addListener('end', function () {
29-
_stomp.stomp_log.debug('goodbye');
54+
socket.addListener('end', function () {
55+
self.stomp_log.debug('goodbye');
3056
});
31-
client.addListener('error', function (error) {
57+
socket.addListener('error', function (error) {
3258
console.log('error: ' + error.stack);
3359
});
34-
client.addListener('close', function (error) {
35-
_stomp.stomp_log.debug('disconnected');
60+
socket.addListener('close', function (error) {
61+
self.stomp_log.debug('disconnected');
3662
});
63+
64+
this.socket = socket;
65+
};
66+
67+
68+
Stomp.prototype.subscribe = function(headers, callback) {
69+
70+
destination = headers['destination'];
71+
this._send_command('SUBSCRIBE', headers);
72+
this._subscribed_to[destination] = callback;
73+
this.stomp_log.debug('subscribed to: ' + destination);
74+
75+
};
76+
77+
Stomp.prototype.unsubscribe = function(headers) {
78+
79+
destination = headers['destination'];
80+
this._send_command('UNSUBSCRIBE', headers);
81+
this._subscribed_to[destination] = null;
82+
this.stomp_log.debug('no longer subscribed to: ' + destination);
83+
3784
};
85+
86+
Stomp.prototype.send = function(headers) {
87+
88+
destination = headers['destination'];
89+
headers['want_receipt'] = true;
90+
91+
return this._send_command('SEND', headers)
92+
};
93+
94+
Stomp.prototype._send_command = function(command, headers, extra) {
95+
96+
if (!this.utils.really_defined(headers))
97+
headers = {};
98+
99+
frame_conf = {'command': command, 'headers': headers};
100+
this_frame = this.connected_frame.build_frame(frame_conf);
101+
reply = this.connected_frame.send_frame(this_frame);
102+
103+
if ('want_receipt' in headers)
104+
return reply;
105+
106+
return this_frame;
107+
108+
};
109+
110+
module.exports.Stomp = Stomp;

stomp-client.js

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,23 @@
11
#!/usr/bin/env node
22

3-
var Stomp = require('./lib/stomp');
3+
var stomp = require('./lib/stomp');
44

5-
var client = new Stomp(61613, 'localhost', true);
5+
var client = new stomp.Stomp(61613, 'localhost', true, {'login': 'bah', 'password': 'bah'});
6+
7+
var queue = '/queue/test';
8+
9+
var headers = {'destination': queue, 'ack': 'client'};
610

711
client.connect();
12+
13+
client.on('connected', function() {
14+
client.subscribe(headers);
15+
16+
for (var i = 0; i < 10; i++) {
17+
console.log(i);
18+
client.send({'destination': queue,
19+
'body': 'Testing' + i,
20+
'persistent': 'true'
21+
});
22+
}
23+
});

0 commit comments

Comments
 (0)