Skip to content
This repository was archived by the owner on Nov 28, 2018. It is now read-only.

Commit 7cb2b24

Browse files
committed
Well, it kinda works. Calling it a night.
1 parent 116659a commit 7cb2b24

File tree

4 files changed

+86
-10
lines changed

4 files changed

+86
-10
lines changed

lib/frame.js

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,30 @@ var sys = require('sys'),
22
utils = require('./stomp-utils'),
33
exceptions = require('./stomp-exceptions');
44

5-
65
Frame = module.exports = function(logger) {
7-
this.connected = false;
86
this.sock = null;
97
this.command = null;
108
this.headers = null;
119
this.body = null;
10+
this.session = null;
1211
this.stomp_log = logger;
12+
this.rqueue = new StompQueue();
13+
this.iqueue = new IntermediateQueue();
1314
};
1415

1516
Frame.prototype.stomp_connect = function(client) {
1617
this.sock = client;
1718
var args = {};
1819
var headers = {};
20+
var next_frame = null;
1921

2022
args['command'] = 'CONNECT';
2123
args['headers'] = headers;
2224
frame_to_send = this.build_frame(args);
2325
this.send_frame(frame_to_send);
2426
this.stomp_log.debug('Connected to STOMP');
25-
this.connected = true;
27+
next_frame = this.get_reply();
28+
this.session = next_frame.headers;
2629
return this;
2730
};
2831

@@ -61,15 +64,18 @@ Frame.prototype.as_string = function() {
6164
Frame.prototype.send_frame = function(frame) {
6265
this.sock.write(frame.as_string());
6366

64-
if ('receipt' in frame.headers) {
67+
if ('receipt' in frame.headers)
6568
return this.get_reply();
66-
}
69+
6770
};
6871

6972
Frame.prototype.parse_frame = function(data) {
7073
var args = [],
7174
headers_str = null;
7275

76+
if (data == null || data == undefined)
77+
return;
78+
7379
this.command = this.parse_command(data);
7480
var _data = data.slice(this.command.length + 1, data.length);
7581
_data = _data.toString('utf8', start=0, end=_data.length);
@@ -114,7 +120,38 @@ Frame.prototype.parse_headers = function(headers_str) {
114120

115121
Frame.prototype.parse_command = function(data) {
116122
var command,
117-
this_string = data.toString('ascii', start=0, end=data.length);
123+
this_string = data.toString('utf8', start=0, end=data.length);
118124
command = this_string.split('\n');
119125
return command[0];
120126
};
127+
128+
Frame.prototype.get_reply = function() {
129+
while (true) {
130+
try {
131+
return this.rqueue.get()
132+
}
133+
catch (error) {
134+
if (error.name == "QueueEmpty") {
135+
frame = this.parse_frame();
136+
if (frame == null || frame == undefined)
137+
return null;
138+
if (frame.command == "MESSAGE")
139+
this.iqueue.put(frame);
140+
else
141+
this.rqueue.put(frame);
142+
}
143+
}
144+
}
145+
};
146+
147+
Frame.prototype.get_message = function() {
148+
while (true) {
149+
frame = this.rqueue.get()
150+
if (frame == null || frame == undefined)
151+
return null;
152+
if (frame.command == "MESSAGE")
153+
return frame;
154+
else
155+
this.rqueue.put(frame);
156+
}
157+
};

lib/stomp-exceptions.js

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
BrokerErrorResponse = exports.BrokerErrorResponse = function(value) {
2-
32
this.value = value;
4-
this.message = "Broker returned error: " + this.value
5-
console.error(this.message + this.value);
3+
this.message = "Broker returned error: " + this.value;
4+
};
65

6+
BrokerErrorResponse.prototype.toString = function() {
7+
return this.messge + this.value;
78
};
89

10+
QueueEmpty = exports.QueueEmpty = function() {
11+
this.name = "QueueEmpty";
12+
this.message = "Queue is Empty";
13+
};
14+
15+
QueueEmpty.prototype.toString = function() {
16+
return this.message;
17+
};

lib/stomp-utils.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
var exceptions = require('./stomp-exceptions');
2+
13
StompLogging = exports.StompLogging = function(should_debug) {
24
this.should_debug = should_debug;
35
};
@@ -37,9 +39,14 @@ StompQueue.prototype.get = function() {
3739

3840
if (++this.queue_space * 2 >= this.queue.length) {
3941
this.queue = this.queue.slice(this.queue_space);
42+
console.log(this.queue);
4043
this.queue_space = 0;
4144
}
4245
}
46+
47+
if (item == undefined)
48+
throw new QueueEmpty();
49+
4350
return item;
4451
};
4552

@@ -48,6 +55,29 @@ StompQueue.prototype.get_oldest_item = function() {
4855

4956
if (this.queue.length)
5057
item = this.queue[this.queue_space];
58+
else
59+
throw new QueueEmpty();
5160

5261
return item;
5362
};
63+
64+
IntermediateQueue = exports.IntermediateQueue = function() {
65+
this._queue = new StompQueue();
66+
};
67+
68+
IntermediateQueue.prototype.put = function(frame) {
69+
if (!"destination" in frame.headers)
70+
return;
71+
this._queue.put(frame);
72+
}
73+
74+
IntermediateQueue.prototype.get = function(frame) {
75+
try {
76+
this._queue.get();
77+
}
78+
catch (error) {
79+
if (error.name == "QueueName") {
80+
return frame.parse_frame();
81+
}
82+
}
83+
}

lib/stomp.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Stomp.prototype.connect = function() {
2929
_stomp.stomp_log.debug('goodbye');
3030
});
3131
client.addListener('error', function (error) {
32-
console.log('error: ' + error);
32+
console.log('error: ' + error.stack);
3333
});
3434
client.addListener('close', function (error) {
3535
_stomp.stomp_log.debug('disconnected');

0 commit comments

Comments
 (0)