Skip to content

Commit

Permalink
[feature] add support for objectMode
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreasMadsen committed Apr 5, 2013
1 parent 4df166f commit e118614
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
24 changes: 18 additions & 6 deletions README.md
@@ -1,6 +1,6 @@
#endpoint

> stream v2 compatible stream to single buffer module
> Converts a stream intro a buffer or array of objects
## Installation

Expand All @@ -10,20 +10,32 @@ npm install endpoint

## Documentation

`endpoint` is a `WriteStream` there converts a stream to a buffer, it also
collectes stream errors.
`endpoint` is a `WriteStream` there converts a stream to a buffer or an array
of object, it also collectes stream errors.

**Collect a buffer stream intro a single buffer:**

```JavaScript
var fs = require('fs';
var endpoint = require('endpoint');

fs.createReadStream(__filename).pipe(endpoint(function (err, buffer) {
BufferStream.pipe(endpoint(function (err, buffer) {
console.log('error:', err);
console.log('buffer'; buffer);
}));
```

While the data chunks are being collected the total current buffer can be
**Collect an object stream intro an array of object:**

```JavaScript
var endpoint = require('endpoint');

ObjectStream.pipe(endpoint({objectMode: true}, function (err, array) {
console.log('error:', err);
console.log('array'; array);
}));
```

In both situations the currently colllected buffer or array of object can be
accesses by `this.buffer`.

##License
Expand Down
28 changes: 20 additions & 8 deletions endpoint.js
Expand Up @@ -2,10 +2,18 @@
var stream = require('stream');
var util = require('util');

function Endpoint(callback) {
if (!(this instanceof Endpoint)) return new Endpoint(callback);
function Endpoint(options, callback) {
if (!(this instanceof Endpoint)) return new Endpoint(options, callback);

stream.Writable.call(this);
// `options` defaults to {}
if (typeof options === 'function') {
callback = options;
options = {};
}

stream.Writable.call(this, options);

this._objectMode = !!options.objectMode;

// will keep a long list of buffers
this._buffers = [];
Expand All @@ -31,17 +39,21 @@ function Endpoint(callback) {
module.exports = Endpoint;
util.inherits(Endpoint, stream.Writable);

Endpoint.prototype._write = function (chunk, encodeing, callback) {
this._buffers.push(chunk);
Endpoint.prototype._write = function (data, encodeing, callback) {
this._buffers.push(data);

return callback(null);
};

Object.defineProperty(Endpoint.prototype, "buffer", {
get: function () {
var total = Buffer.concat(this._buffers);
this._buffers = [ total ];
return total;
if (this._objectMode) {
return this._buffers;
} else {
var total = Buffer.concat(this._buffers);
this._buffers = [ total ];
return total;
}
},
enumerable: true,
configurable: true
Expand Down
19 changes: 19 additions & 0 deletions test.js
Expand Up @@ -40,3 +40,22 @@ test('simple error handling', function (t) {
point.emit('error', fakeError);
point.end();
});

test('simple write and end', function (t) {
var point = endpoint({objectMode: true}, function (err, data) {
t.equal(err, null);

t.ok(Array.isArray(data));
t.deepEqual(data, [[1], [2], [3]]);

t.ok(Array.isArray(point.buffer));
t.deepEqual(point.buffer, [[1], [2], [3]]);

t.end();
});

point.write([1]);
point.write([2]);
point.write([3]);
point.end();
});

0 comments on commit e118614

Please sign in to comment.