Skip to content

Commit

Permalink
Use Buffer#copy instead of inspecting every byte, and make the code a…
Browse files Browse the repository at this point in the history
… bit easier otherwise by using closures to encapsulate the parser state.
  • Loading branch information
squaremo authored and postwait committed Jan 10, 2012
1 parent c136497 commit 8e6c757
Showing 1 changed file with 81 additions and 93 deletions.
174 changes: 81 additions & 93 deletions amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,85 @@ function AMQPParser (version, type) {
}
})(); // end anon scope

this.frameHeader = new Buffer(7);
this.frameHeader.used = 0;
var frameHeader = new Buffer(7);
frameHeader.used = 0;
var frameBuffer, frameType, frameChannel;

var self = this;

function header(data) {
var fh = frameHeader;
var needed = fh.length - fh.used;
data.copy(fh, fh.used, 0, fh.length);
fh.used += data.length; // sloppy
if (fh.used >= fh.length) {
fh.read = 0;
frameType = fh[fh.read++];
frameChannel = parseInt(fh, 2);
var frameSize = parseInt(fh, 4);
fh.used = 0; // for reuse
if (frameSize > maxFrameBuffer) {
self.throwError("Oversized frame " + frameSize);
}
frameBuffer = new Buffer(frameSize);
frameBuffer.used = 0;
return frame(data.slice(needed));
}
else { // need more!
return header;
}
}

function frame(data) {
var fb = frameBuffer;
var needed = fb.length - fb.used;
data.copy(fb, 0, fb.used, fb.length);
if (data.length > needed) {
return frameEnd(data.slice(needed));
}
else if (data.length == needed) {
return frameEnd;
}
else {
return frame;
}
}

function frameEnd(data) {
if (data.length > 0) {
if (data[0] === 206) {
switch (frameType) {
case 1:
self._parseMethodFrame(frameChannel, frameBuffer);
break;
case 2:
self._parseHeaderFrame(frameChannel, frameBuffer);
break;
case 3:
if (self.onContent) {
self.onContent(frameChannel, frameBuffer);
}
break;
case 8:
debug("hearbeat");
if (self.onHeartBeat) self.onHeartBeat();
break;
default:
self.throwError("Unhandled frame type " + frameType);
break;
}
return header(data.slice(1));
}
else {
self.throwError("Missing frame end marker");
}
}
else {
return frameEnd;
}
}

self.parse = header;
}

// If there's an error in the parser, call the onError handler or throw
Expand All @@ -150,98 +227,9 @@ AMQPParser.prototype.throwError = function (error) {
// parsing.
AMQPParser.prototype.execute = function (data) {
// This function only deals with dismantling and buffering the frames.
// It delegats to other functions for parsing the frame-body.
// It delegates to other functions for parsing the frame-body.
debug('execute: ' + data.toString());
for (var i = 0; i < data.length; i++) {
switch (this.state) {
case 'frameHeader':
// Here we buffer the frame header. Remember, this is a fully
// interruptible parser - it could be (although unlikely)
// that we receive only several octets of the frame header
// in one packet.
this.frameHeader[this.frameHeader.used++] = data[i];

if (this.frameHeader.used == this.frameHeader.length) {
// Finished buffering the frame header - parse it
//var h = this.frameHeader.unpack("oonN", 0);

this.frameHeader.read = 0;
this.frameType = this.frameHeader[this.frameHeader.read++];
this.frameChannel = parseInt(this.frameHeader, 2);
this.frameSize = parseInt(this.frameHeader, 4);

this.frameHeader.used = 0; // for reuse

debug("got frame: " + JSON.stringify([ this.frameType
, this.frameChannel
, this.frameSize
]));

if (this.frameSize > maxFrameBuffer) {
this.throwError("Oversized frame " + this.frameSize);
}

// TODO use a free list and keep a bunch of 8k buffers around
this.frameBuffer = new Buffer(this.frameSize);
this.frameBuffer.used = 0;
this.state = 'bufferFrame';
}
break;

case 'bufferFrame':
// Buffer the entire frame. I would love to avoid this, but doing
// otherwise seems to be extremely painful.

// Copy the incoming data byte-by-byte to the buffer.
// FIXME This is slow! Can be improved with a memcpy binding.
if(this.frameSize > 0)
this.frameBuffer[this.frameBuffer.used++] = data[i];
else
i--; // the frame ending is actuall this frame (rewind 1)

if (this.frameBuffer.used == this.frameSize) {
// Finished buffering the frame. Parse the frame.
switch (this.frameType) {
case 1:
this._parseMethodFrame(this.frameChannel, this.frameBuffer);
break;

case 2:
this._parseHeaderFrame(this.frameChannel, this.frameBuffer);
break;

case 3:
if (this.onContent) {
this.onContent(this.frameChannel, this.frameBuffer);
}
break;

case 8:
debug("hearbeat");
if (this.onHeartBeat) this.onHeartBeat();
break;

default:
this.throwError("Unhandled frame type " + this.frameType);
break;
}
this.state = 'frameEnd';
}
break;

case 'frameEnd':
// Frames are terminated by a single octet.
if (data[i] != 206 /* constants.frameEnd */) {
debug('data[' + i + '] = ' + data[i].toString(16));
debug('data = ' + data.toString());
debug('frameHeader: ' + this.frameHeader.toString());
debug('frameBuffer: ' + this.frameBuffer.toString());
this.throwError("Oversized frame");
}
this.state = 'frameHeader';
break;
}
}
this.parse = this.parse(data);
};


Expand Down

0 comments on commit 8e6c757

Please sign in to comment.