Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

WIP:

- Adding basic protocol version support
- FrameEmitter emits and additional event using frame command as identifier
- Refactored file locations and added NPM package info/dependencies
- Added some tests (more to follow)
  • Loading branch information...
commit e04e21cb3be0e0c06f38415b92c92219496fc8f1 1 parent 5a64a38
Ryan Grenz grenzr authored
1  .gitignore
View
@@ -0,0 +1 @@
+/node_modules
91 client.js → lib/client.js
View
@@ -1,36 +1,45 @@
var net = require('net');
-var fs = require('fs');
var sys = require('sys');
var events = require('events');
-var crypto = require('crypto');
var StompFrame = require('./frame').StompFrame;
var StompFrameEmitter = require('./parser').StompFrameEmitter;
-var StompServerCommands = [
- 'CONNECTED',
- 'MESSAGE',
- 'RECEIPT',
- 'ERROR',
-];
+var StompServerCommands = {
+ '1.0': [
+ 'CONNECTED',
+ 'MESSAGE',
+ 'RECEIPT',
+ 'ERROR'
+ ]
+};
-function StompClient(address, port, user, pass) {
+function StompClient(address, port, user, pass, protocolVersion) {
events.EventEmitter.call(this);
+ this.user = (user || '');
+ this.pass = (pass || '');
+ this.address = (address || '127.0.0.1');
+ //TODO Check what the default stomp port is
+ this.port = (port || 2098);
+ this.version = (protocolVersion || '1.0');
+ this.subscriptions = {};
+ this._stompFrameEmitter = new StompFrameEmitter(StompServerCommands[this.version]);
+ return this;
+}
+
+sys.inherits(StompClient, events.EventEmitter);
+
+StompClient.prototype.connect = function() {
var self = this;
- self.user = user;
- self.pass = pass;
- self.subscriptions = {};
- self.stream = net.createConnection(port, address);
+ self.stream = net.createConnection(self.port, self.address);
self.stream.on('connect', function() {
self.onConnect();
});
};
-sys.inherits(StompClient, events.EventEmitter);
-
StompClient.prototype.onConnect = function() {
var self = this;
- // First set up the frame parser
- var frameEmitter = new StompFrameEmitter(StompServerCommands);
+ // First set up the frame par ser
+ var frameEmitter = self._stompFrameEmitter;
self.stream.on('data', function (data) {
frameEmitter.handleData(data);
@@ -41,20 +50,30 @@ StompClient.prototype.onConnect = function() {
self.emit('disconnect');
});
- // Listen for events on it
- frameEmitter.on('frame', function(frame) {
- console.log('Received Frame: ' + frame);
- if (frame.command == 'MESSAGE') {
- self.subscriptions[frame.headers.destination].map(function(callback) {
- callback(frame.body, frame.headers);
- });
- }
- if (frame.command == 'CONNECTED') {
- self.emit('connect', frame.headers.session);
- }
+ // // Listen for events on it
+ // frameEmitter.on('frame', function(frame) {
+ // console.log('Received Frame: ' + frame);
+ // if (frame.command == 'MESSAGE') {
+ // self.subscriptions[frame.headers.destination].map(function(callback) {
+ // callback(frame.body, frame.headers);
+ // });
+ // }
+ // if (frame.command == 'CONNECTED') {
+ // self.emit('connect', frame.headers.session);
+ // }
+ // });
+
+ frameEmitter.on('MESSAGE', function(frame) {
+ self.subscriptions[frame.headers.destination].map(function(callback) {
+ callback(frame.body, frame.headers);
+ });
});
- frameEmitter.on('error', function(err) {
+ frameEmitter.on('CONNECTED', function(frame) {
+ self.emit('connect', frame.headers.session);
+ });
+
+ frameEmitter.on('parseError', function(err) {
console.log('Error Parsing Message: ' + err['message']);
});
@@ -63,8 +82,8 @@ StompClient.prototype.onConnect = function() {
command: 'CONNECT',
headers: {
'login': self.user,
- 'passcode': self.pass,
- },
+ 'passcode': self.pass
+ }
}).send(self.stream);
};
@@ -74,8 +93,8 @@ StompClient.prototype.subscribe = function(queue, callback) {
new StompFrame({
command: 'SUBSCRIBE',
headers: {
- destination: queue,
- },
+ destination: queue
+ }
}).send(this.stream);
}
this.subscriptions[queue].push(callback);
@@ -85,9 +104,9 @@ StompClient.prototype.publish = function(queue, message) {
new StompFrame({
command: 'SEND',
headers: {
- destination: queue,
+ destination: queue
},
- body: message,
+ body: message
}).send(this.stream);
};
@@ -104,7 +123,7 @@ function SecureStompClient(address, port, user, pass, credentials) {
self.stream.on('secure', function() {
self.onConnect();
});
-};
+}
sys.inherits(SecureStompClient, StompClient);
6 frame.js → lib/frame.js
View
@@ -1,18 +1,18 @@
function StompFrame(frame) {
- if (frame == undefined) {
+ if (frame === undefined) {
frame = {};
}
this.command = frame.command || '';
this.headers = frame.headers || {};
this.body = frame.body || '';
this.contentLength = -1;
-};
+}
StompFrame.prototype.toString = function() {
return JSON.stringify({
command: this.command,
headers: this.headers,
- body: this.body,
+ body: this.body
});
};
16 parser.js → lib/parser.js
View
@@ -6,7 +6,7 @@ var ParserStates = {
COMMAND: 0,
HEADERS: 1,
BODY: 2,
- ERROR: 3,
+ ERROR: 3
};
function StompFrameEmitter(commands) {
@@ -16,7 +16,7 @@ function StompFrameEmitter(commands) {
this.frames = [];
this.buffer = '';
this.commands = commands;
-};
+}
sys.inherits(StompFrameEmitter, events.EventEmitter);
@@ -59,18 +59,18 @@ StompFrameEmitter.prototype.popLine = function () {
};
StompFrameEmitter.prototype.error = function (err) {
- this.emit('error', err);
+ this.emit('parseError', err);
this.state = ParserStates.ERROR;
};
StompFrameEmitter.prototype.parseCommand = function() {
while (this.hasLine()) {
var line = this.popLine();
- if (line != '') {
+ if (line !== '') {
if (this.commands.indexOf(line) == -1) {
this.error({
message: 'No such command',
- details: 'Unrecognized Command \'' + line + '\'',
+ details: 'Unrecognized Command \'' + line + '\''
});
break;
}
@@ -84,7 +84,7 @@ StompFrameEmitter.prototype.parseCommand = function() {
StompFrameEmitter.prototype.parseHeaders = function() {
while (this.hasLine()) {
var line = this.popLine();
- if (line == '') {
+ if (line === '') {
this.incrementState();
break;
}
@@ -93,7 +93,7 @@ StompFrameEmitter.prototype.parseHeaders = function() {
if (kv.length != 2) {
this.error({
message: 'Error parsing header',
- details: 'No ":" in line "' + line + '"',
+ details: 'No ":" in line "' + line + '"'
});
break;
}
@@ -124,6 +124,8 @@ StompFrameEmitter.prototype.parseBody = function() {
this.frame.appendToBody(this.buffer.slice(0, index));
// Emit the frame and reset
this.emit('frame', this.frame);
+ this.emit(this.frame.command, this.frame);
+
this.frame = new StompFrame();
this.incrementState();
this.buffer = this.buffer.substr(index + 1);
0  server.js → lib/server.js
View
File renamed without changes
21 package.json
View
@@ -0,0 +1,21 @@
+{
+ "author": "Ben Marvell, Ryan Grenz",
+ "contributors": [
+ { "name": "Ben Marvell", "email": "Ben.Marvell@bskyb.com" },
+ { "name": "Ryan Grenz", "email": "Ryan.Grenz@bskyb.com" }
+ ]
+ "name": "stomp-client",
+ "description": "A STOMP protocol implementation in node.js",
+ "version": "0.0.0",
+ "repository": {
+ "url": ""
+ },
+ "main" : "lib/client.js",
+ "engines": {
+ "node": "~0.4.x"
+ },
+ "dependencies": {},
+ "devDependencies": {
+ "nodeunit": "0.6.4"
+ }
+}
99 test/client.test.js
View
@@ -0,0 +1,99 @@
+var sys = require('sys'),
+ Events = require('events').EventEmitter,
+ nodeunit = require('nodeunit'),
+ testCase = require('nodeunit').testCase;
+
+var StompClient = require('../lib/client').StompClient;
+
+//mockage
+var net = require('net');
+var StompFrame = require('../lib/frame').StompFrame;
+
+var assertBack = function() {};
+
+// Override StompFrame send function to allow inspection of frame data inside a test
+StompFrame.prototype.send = function(stream) {
+ assertBack(this);
+};
+
+// Mock net object so we never try to send any real data
+var connectionObserver = new Events();
+
+net.createConnection = function() {
+ return connectionObserver;
+};
+
+connectionObserver.write = function(data) {
+ //Supress writes
+};
+
+module.exports = testCase({
+
+ setUp: function(callback) {
+ this.stompClient = new StompClient('127.0.0.1', 2098, 'user', 'pass', '1.0');
+ callback();
+ },
+
+ tearDown: function(callback) {
+ delete this.stompClient;
+ connectionObserver = new Events();
+ assertBack = function() {};
+ callback();
+ },
+
+ 'check default properties are correctly set on a basic StompClient': function(test) {
+ var stompClient = new StompClient();
+
+ test.equal(stompClient.user, '');
+ test.equal(stompClient.pass, '');
+ test.equal(stompClient.address, '127.0.0.1');
+ test.equal(stompClient.port, 2098);
+ test.equal(stompClient.version, '1.0');
+
+ test.done();
+ },
+
+
+
+ 'check CONNECT StompFrame is correctly formed': function(test) {
+ assertBack = function(stompFrame) {
+ test.equal(stompFrame.command, 'CONNECT');
+ test.deepEqual(stompFrame.headers, {
+ login: 'user',
+ passcode: 'pass'
+ });
+ test.equal(stompFrame.body, '');
+ test.equal(stompFrame.contentLength, -1);
+
+ test.done();
+ };
+
+ //start the test
+ this.stompClient.connect();
+ connectionObserver.emit('connect');
+ },
+
+ 'check inbound CONNECTED frame parses correctly': function(test) {
+ var self = this;
+ var testId = '1234';
+ assertBack = function() {
+ self.stompClient.stream.emit('data', 'CONNECTED\nsession:' + testId + '\n\n\0');
+ };
+
+ this.stompClient._stompFrameEmitter.on('CONNECTED', function (stompFrame) {
+ test.equal(testId, stompFrame.headers.session);
+ test.done();
+ });
+
+
+ //start the test
+ this.stompClient.connect();
+ connectionObserver.emit('connect');
+ },
+
+ '': function(test) {
+ test.done();
+ }
+
+
+});
Please sign in to comment.
Something went wrong with that request. Please try again.