Skip to content

Commit

Permalink
Merge pull request #10 from bkw/streamSpec
Browse files Browse the repository at this point in the history
implement full spec for writable streams
  • Loading branch information
bkw committed Oct 19, 2012
2 parents 1454069 + 996091b commit 274ed0d
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 38 deletions.
112 changes: 75 additions & 37 deletions lib/Reader.js
@@ -1,12 +1,12 @@
// vim: ts=2:sw=2

var util = require('util');
var EventEmitter = require('events').EventEmitter;
var Stream = require('stream').Stream;

module.exports = Reader;
util.inherits(Reader, EventEmitter);
util.inherits(Reader, Stream);
function Reader(options) {
EventEmitter.call(this);
Stream.call(this);

if (Buffer.isBuffer(options)) {
options = {buffer: options};
Expand All @@ -17,9 +17,9 @@ function Reader(options) {
this.writable = true;

this._buffer = options.buffer || new Buffer(0);
this._offset = options.offset || 0;
this._compact = options.compact || false;
this._paused = false;
this._setOffset(options.offset || 0);
}

Reader.prototype.write = function(newBuffer) {
Expand All @@ -28,10 +28,17 @@ Reader.prototype.write = function(newBuffer) {
// existing buffer has enough space in the beginning?
var reuseBuffer = (this._offset >= newBuffer.length);

if (!this.writable) {
var err = new Error('stream not writable');
err.code = 'EPIPE';
this.emit('error', err);
return false;
}

if (reuseBuffer) {
// move unread bytes forward to make room for the new
this._buffer.copy(this._buffer, this._offset - newBuffer.length, this._offset);
this._offset -= newBuffer.length;
this._moveOffset(- newBuffer.length);

// add the new bytes at the end
newBuffer.copy(this._buffer, this._buffer.length - newBuffer.length);
Expand All @@ -47,7 +54,7 @@ Reader.prototype.write = function(newBuffer) {
// copy the old and new buffer into it
oldBuffer.copy(this._buffer, 0, this._offset);
newBuffer.copy(this._buffer, bytesAhead);
this._offset = 0;
this._setOffset(0);
}

return !this._paused;
Expand All @@ -59,6 +66,7 @@ Reader.prototype.pause = function() {

Reader.prototype.resume = function() {
this._paused = false;
this.emit('drain');
};

Reader.prototype.bytesAhead = function() {
Expand All @@ -70,71 +78,59 @@ Reader.prototype.bytesBuffered = function() {
};

Reader.prototype.uint8 = function() {
return this._buffer.readUInt8(this._offset++);
return this._buffer.readUInt8(this._moveOffset(1));
};

Reader.prototype.int8 = function() {
return this._buffer.readInt8(this._offset++);
return this._buffer.readInt8(this._moveOffset(1));
};

Reader.prototype.uint16BE = function() {
this._offset += 2;
return this._buffer.readUInt16BE(this._offset - 2);
return this._buffer.readUInt16BE(this._moveOffset(2));
};

Reader.prototype.int16BE = function() {
this._offset += 2;
return this._buffer.readInt16BE(this._offset - 2);
return this._buffer.readInt16BE(this._moveOffset(2));
};

Reader.prototype.uint16LE = function() {
this._offset += 2;
return this._buffer.readUInt16LE(this._offset - 2);
return this._buffer.readUInt16LE(this._moveOffset(2));
};

Reader.prototype.int16LE = function() {
this._offset += 2;
return this._buffer.readInt16LE(this._offset - 2);
return this._buffer.readInt16LE(this._moveOffset(2));
};

Reader.prototype.uint32BE = function() {
this._offset += 4;
return this._buffer.readUInt32BE(this._offset - 4);
return this._buffer.readUInt32BE(this._moveOffset(4));
};

Reader.prototype.int32BE = function() {
this._offset += 4;
return this._buffer.readInt32BE(this._offset - 4);
return this._buffer.readInt32BE(this._moveOffset(4));
};

Reader.prototype.uint32LE = function() {
this._offset += 4;
return this._buffer.readUInt32LE(this._offset - 4);
return this._buffer.readUInt32LE(this._moveOffset(4));
};

Reader.prototype.int32LE = function() {
this._offset += 4;
return this._buffer.readInt32LE(this._offset - 4);
return this._buffer.readInt32LE(this._moveOffset(4));
};

Reader.prototype.float32BE = function() {
this._offset += 4;
return this._buffer.readFloatBE(this._offset - 4);
return this._buffer.readFloatBE(this._moveOffset(4));
};

Reader.prototype.float32LE = function() {
this._offset += 4;
return this._buffer.readFloatLE(this._offset - 4);
return this._buffer.readFloatLE(this._moveOffset(4));
};

Reader.prototype.double64BE = function() {
this._offset += 8;
return this._buffer.readDoubleBE(this._offset - 8);
return this._buffer.readDoubleBE(this._moveOffset(8));
};

Reader.prototype.double64LE = function() {
this._offset += 8;
return this._buffer.readDoubleLE(this._offset - 8);
return this._buffer.readDoubleLE(this._moveOffset(8));
};

Reader.prototype.ascii = function(bytes) {
Expand All @@ -152,12 +148,12 @@ Reader.prototype._string = function(encoding, bytes) {
}

var offset = this._offset;
this._offset += bytes;
this._moveOffset(bytes);

var value = this._buffer.toString(encoding, offset, this._offset);

if (nullTerminated) {
this._offset++;
this._moveOffset(1);
}

return value;
Expand All @@ -173,17 +169,17 @@ Reader.prototype._nullDistance = function() {
};

Reader.prototype.buffer = function(bytes) {
this._offset += bytes;
this._moveOffset(bytes);
var ret = new Buffer(bytes);
this._buffer.copy(ret, 0, this._offset - bytes, this._offset);
return ret;
};

Reader.prototype.skip = function(bytes) {
if (bytes > this.bytesAhead() || bytes < 0) {
if (bytes < 0) {
this.emit('error', new Error('tried to skip outsite of the buffer'));
}
this._offset += bytes;
this._moveOffset(bytes);
};

Reader.prototype.compact = function() {
Expand All @@ -192,5 +188,47 @@ Reader.prototype.compact = function() {
}

this._buffer = this._buffer.slice(this._offset);
this._setOffset(0);
};

Reader.prototype.end = function(newBuffer) {
if (undefined !== newBuffer) {
this.write(newBuffer);
}
this.writable = false;
if (0 === this.bytesAhead()) {
this.destroy();
}
return true;
};

Reader.prototype.destroy = function() {
this._buffer = null;
this._offset = 0;
this.writable = false;
this.emit('close');
};


Reader.prototype._setOffset = function(offset) {
var that = this;
if ((offset < 0) || (offset > this.bytesBuffered())) {
this.emit('error', new Error('tried to read outsite of buffer boundary'));
}
this._offset = offset;

// handle end()
if (! this.writable && (this.bytesAhead() === 0)) {
// delay since a read may be in progress
process.nextTick(function () {
that.destroy();
});
}
return offset;
};

Reader.prototype._moveOffset = function(relativeOffset) {
var oldOffset = this._offset;
this._setOffset(oldOffset + relativeOffset);
return oldOffset;
};
4 changes: 3 additions & 1 deletion package.json
Expand Up @@ -19,7 +19,9 @@
"optionalDependencies": {},
"devDependencies": {
"utest": "0.0.6",
"urun": "0.0.6"
"urun": "0.0.6",
"stream-tester": "0.0.2",
"stream-spec": "~0.3.1"
},
"engines": {
"node": "*"
Expand Down
76 changes: 76 additions & 0 deletions test/unit/test-Reader.js
Expand Up @@ -42,6 +42,16 @@ test('WritableStream interface', {
assert.equal(reader.write(new Buffer(0)), true);
},

'resume: emits drain': function() {
var reader = new Reader();
var drainEmitted = false;
reader.on('drain', function () {
drainEmitted = true;
});
reader.resume();
assert.equal(drainEmitted, true, 'calling resume emits drain event');
},

'write: collects buffer data': function() {
var reader = new Reader();

Expand All @@ -53,6 +63,27 @@ test('WritableStream interface', {
assert.equal(reader.uint8(), 3);
},

'passes stream-spec' : function () {
var spec = require('stream-spec');
var reader = new Reader();
var tester = require('stream-tester');
var randomWriter = tester.createRandomStream(
function () { return 'line ' + Math.random() + '\n';},
5000
);

spec(reader)
.writable()
.drainable()
.validateOnExit();

randomWriter.pipe(reader);
randomWriter.on('end', function () {
// consume buffer to emit end()
reader.buffer(reader.bytesAhead());
});
},

'write: uses existing buffer if possible': function() {
var reader = new Reader();

Expand All @@ -77,6 +108,51 @@ test('WritableStream interface', {
assert.equal(reader.uint8(), 4);
assert.equal(reader.uint8(), 5);
},

'end: writes optional buffer' : function() {
var reader = new Reader();
reader.end(new Buffer([1]));
assert.equal(reader.uint8(), 1);
},

'end: emits close' : function() {
var reader = new Reader();
var closeEmitted = false;
reader.on('close', function () {
closeEmitted = true;
});
reader.end();
assert.equal(closeEmitted, true, 'calling end emits close event');
},

'end: does not emit close with unread buffer' : function() {
var reader = new Reader();
var closeEmitted = false;
reader.on('close', function () {
closeEmitted = true;
});
reader.end(new Buffer([1]));
assert.equal(
closeEmitted,
false,
'calling end with non-empty buffer does not emit close event'
);
},

'end: does emit close after buffer is read' : function() {
var reader = new Reader(new Buffer([1]));
var closeEmitted = false;
reader.on('close', function () {
closeEmitted = true;
});
reader.end();
reader.buffer(1);
process.nextTick(function () {
assert.equal(
closeEmitted, true, 'emptying buffer after end emits close event'
);
});
},
});

test('Read Methods', {
Expand Down

0 comments on commit 274ed0d

Please sign in to comment.