Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #30 from Obvious/danfuzz-options

Danfuzz options
  • Loading branch information...
commit 886bcc2083ff390e891f693d9530de9a8811ef39 2 parents d42b35c + a66d960
@danfuzz danfuzz authored
View
43 README.md
@@ -156,10 +156,9 @@ encodings specified by those. This includes:
Common Options
--------------
-Some of the classes (as of this writing, most but not all of them)
-take an optional `options` constructor parameter. If not `undefined`,
-this must be a map from option names to values as specified by the
-class.
+All of the classes in this module take an optional `options`
+constructor parameter. If not `undefined`, this must be a map from
+option names to values as specified by the class.
The following are three commonly-accepted options. Classes all accept
whichever of these make sense.
@@ -227,7 +226,7 @@ This can be used, for example, to produce a stream that is prefixed
or suffixed with a given bit of data (when used in combination with
`Blip`, above).
-### var cat = new Cat(streams, [paused])
+### var cat = new Cat(streams, [options])
Constructs and returns a new cat which is to emit the events from
the given streams (each of which must be an `EventEmitter` and is
@@ -244,10 +243,8 @@ emitted by this instance, after which this instance emits a `close`
event. It will then become closed (emitting no further events, and
producing `false` for `cat.readable`).
-If the optional `paused` argument is specified, it indicates whether
-or not the new instance should start out in the paused state. It defaults
-to `false`. That said, the constructor argument is provided because it's
-pretty common to want to start instances out paused.
+This class recognizes all three of the common options (see above), and
+no others.
The constructed instance obeys the full standard Node stream protocol
for readers.
@@ -519,12 +516,16 @@ The ordering and meaning of the callback arguments are meant to be (a)
compatible with callbacks used with `fs.read()` and (b) somewhat more
informative and unambiguous.
-### var slicer = new Slicer(source, [incomingEncoding])
+### var slicer = new Slicer(source, [options])
+
+Constructs a new slicer, which listens to the given source.
+
+Of the common options, the only one recognized by this class is
+`incomingEncoding`. The class accepts no other options.
+
+This class recognizes all three of the common options (see above), and
+no others.
-Constructs a new slicer, which listens to the given source. The optional
-`incomingEncoding` indicates the initial encoding to use when `data`
-events are received with string payloads (defaults to `undefined`; see
-`setIncomingEncoding()`).
### slicer.readable => boolean
@@ -694,14 +695,12 @@ function createMyStream() {
The Valve will "sanitize" the events coming from your class, while
also providing the rest of the core readable Stream API.
-### var valve = new Valve(source, [paused])
+### var valve = new Valve(source, [options])
Constructs and returns a new valve, which listens to the given source.
-If the optional `paused` argument is specified, it indicates whether
-or not the new instance should start out in the paused state. It defaults
-to `false`. That said, the constructor argument is provided because it's
-pretty common to want to start instances out paused.
+This class recognizes all three of the common options (see above), and
+no others.
The constructed instance obeys the full standard Node stream protocol
for readers.
@@ -727,10 +726,8 @@ containing a string payload.
To Do
-----
-* Use `options` arguments consistently on construction.
-
-* Make `encoding`, `incomingEncoding`, and `paused` all be available
- as constructor options.
+* Consider adding a common option of `pressure: boolean` to indicate
+ whether `pause()` and `resume()` should recurse upstream.
Contributing
View
33 lib/cat.js
@@ -17,6 +17,7 @@ var util = require("util");
var Codec = require("./codec").Codec;
var consts = require("./consts");
+var opts = require("./opts");
var sealer = require("./sealer");
var sourcesanity = require("./sourcesanity");
@@ -25,13 +26,25 @@ var Valve = require("./valve").Valve;
/*
+ * Module variables
+ */
+
+/** Options spec */
+var OPTIONS = {
+ encoding: {},
+ incomingEncoding: {},
+ paused: {}
+};
+
+
+/*
* Helper functions
*/
/**
* Construct a Cat state object.
*/
-function State(emitter, streams, paused) {
+function State(emitter, streams) {
/** Outer event emitter. */
this.emitter = emitter;
@@ -86,7 +99,7 @@ function State(emitter, streams, paused) {
// it's a bad idea to try to be clever and do an `instanceof
// Valve` check, since that might mess up the client; they might
// be using a Valve for independent reasons.
- one = new Valve(one, true);
+ one = new Valve(one, { paused: true });
one.on(consts.DATA, this.onData);
one.on(consts.END, this.onEnd);
one.on(consts.ERROR, this.onError);
@@ -96,10 +109,6 @@ function State(emitter, streams, paused) {
if (specialBlip) {
specialBlip.resume();
}
-
- if (!paused) {
- this.resume();
- }
}
/**
@@ -220,16 +229,12 @@ Object.freeze(State.prototype);
/**
* Construct a Cat instance, which emits the `data` events it receives
* from any number of other `streams` (an array).
- *
- * The optional `paused` argument indicates whether the Cat starts
- * out paused (that is, buffering events). It defaults to `true`
- * (because that's the expected primary use case).
*/
-function Cat(streams, paused) {
- paused = typ.isDefined(paused) ? !!paused : false;
-
+function Cat(streams, options) {
+ options = opts.validate(options, OPTIONS);
stream.Stream.call(this);
- this.cat = sealer.seal(new State(this, streams, paused));
+ this.cat = sealer.seal(new State(this, streams));
+ opts.handleCommon(options, this, true);
}
util.inherits(Cat, stream.Stream);
View
12 lib/opts.js
@@ -123,7 +123,7 @@ function validate(options, allowed) {
* Handles the three common options `encoding`, `incomingEncoding`, and
* `paused`.
*/
-function handleCommon(options, target) {
+function handleCommon(options, target, constructedPaused) {
if (options.encoding) {
target.setEncoding(options.encoding);
}
@@ -132,8 +132,14 @@ function handleCommon(options, target) {
target.setIncomingEncoding(options.incomingEncoding);
}
- if (options.paused) {
- target.pause();
+ if (typ.isDefined(options.paused)) {
+ if (options.paused) {
+ if (!constructedPaused) {
+ target.pause();
+ }
+ } else if (constructedPaused) {
+ target.resume();
+ }
}
}
View
2  lib/sink.js
@@ -42,7 +42,7 @@ var OPTIONS = {
/**
* Construct a Sink state object.
*/
-function State(emitter, source, options) {
+function State(emitter, source) {
sourcesanity.validate(source);
/** "parent" emitter */
View
21 lib/slicer.js
@@ -17,11 +17,22 @@ var typ = require("typ");
var consts = require("./consts");
var Codec = require("./codec").Codec;
var errors = require("./errors");
+var opts = require("./opts");
var sealer = require("./sealer");
var sourcesanity = require("./sourcesanity");
/*
+ * Module variables
+ */
+
+/** Options spec */
+var OPTIONS = {
+ incomingEncoding: {}
+};
+
+
+/*
* Helper functions
*/
@@ -152,14 +163,14 @@ Object.freeze(PendingRead.prototype);
/**
* Constructs a Slicer state object.
*/
-function State(source, incomingEncoding) {
+function State(source) {
sourcesanity.validate(source);
/** upstream source (must be stream-like, if not actually a Stream) */
this.source = source;
/** event receipt encoding handler */
- this.encoder = new Codec(incomingEncoding);
+ this.encoder = new Codec();
/** queue of pending read operations */
this.pendingReads = [];
@@ -341,8 +352,10 @@ Object.freeze(State.prototype);
* `read(..., callback)` interface for consuming the so-collected
* data.
*/
-function Slicer(source, incomingEncoding) {
- this.slicer = sealer.seal(new State(source, incomingEncoding));
+function Slicer(source, options) {
+ options = opts.validate(options, OPTIONS);
+ this.slicer = sealer.seal(new State(source));
+ opts.handleCommon(options, this);
}
/**
View
23 lib/valve.js
@@ -19,18 +19,31 @@ var util = require("util");
var Codec = require("./codec").Codec;
var consts = require("./consts");
var errors = require("./errors");
+var opts = require("./opts");
var sealer = require("./sealer");
var sourcesanity = require("./sourcesanity");
/*
+ * Module variables
+ */
+
+/** Options spec */
+var OPTIONS = {
+ encoding: {},
+ incomingEncoding: {},
+ paused: {}
+};
+
+
+/*
* Helper functions
*/
/**
* Construct a Valve state object.
*/
-function State(emitter, source, paused) {
+function State(emitter, source) {
sourcesanity.validate(source);
/** Outer emitter instance. */
@@ -46,7 +59,7 @@ function State(emitter, source, paused) {
this.decoder = new Codec();
/** Currently paused? */
- this.paused = typ.isDefined(paused) ? !!paused : false;
+ this.paused = false;
/** Buffered up events, to be emitted in order once unpaused. */
this.buffer = [];
@@ -197,9 +210,11 @@ Object.freeze(State.prototype);
* out paused (that is, buffering events). It defaults to `true`
* (because that's the expected primary use case).
*/
-function Valve(source, paused) {
+function Valve(source, options) {
+ options = opts.validate(options, OPTIONS);
stream.Stream.call(this);
- this.valve = sealer.seal(new State(this, source, paused));
+ this.valve = sealer.seal(new State(this, source));
+ opts.handleCommon(options, this);
}
util.inherits(Valve, stream.Stream);
View
2  package.json
@@ -1,6 +1,6 @@
{
"name": "pipette",
- "version": "0.8.5",
+ "version": "0.9.0",
"keywords":
["stream", "pipe", "buffer", "valve", "data", "event", "blip", "cat",
"sink", "slicer", "reader", "read", "dropper"],
View
76 test/cat.js
@@ -25,7 +25,7 @@ var EventCollector = require("./eventcoll").EventCollector;
function makeErrorBlip(error) {
var emitter = new events.EventEmitter();
- var valve = new pipette.Valve(emitter, true);
+ var valve = new pipette.Valve(emitter, { paused: true });
emitter.emit("error", error);
emitter.emit("close");
@@ -44,17 +44,18 @@ function constructor() {
new Cat([]);
new Cat([new events.EventEmitter()]);
- new Cat([], false);
- new Cat([], true);
+ new Cat([], {});
+ new Cat([], { paused: false });
+ new Cat([], { paused: true });
- new Cat([new events.EventEmitter()], false);
- new Cat([new events.EventEmitter()], true);
+ new Cat([new events.EventEmitter()], { encoding: "utf8" });
+ new Cat([new events.EventEmitter()], { incomingEncoding: "ucs2" });
}
/**
* Test expected constructor failures.
*/
-function needStreams() {
+function constructorFailure() {
var good = new pipette.Blip("good");
function f1() {
@@ -99,6 +100,26 @@ function needStreams() {
new Cat([good, bad2]);
}
assert.throws(f7, /Source already ended: index 1/);
+
+ function f8() {
+ new Cat([], { encoding: 12 });
+ }
+ assert.throws(f8, /Bad value for option: encoding/);
+
+ function f9() {
+ new Cat([], { incomingEncoding: "zorch" });
+ }
+ assert.throws(f9, /Bad value for option: incomingEncoding/);
+
+ function f10() {
+ new Cat([], { paused: undefined });
+ }
+ assert.throws(f10, /Bad value for option: paused/);
+
+ function f11() {
+ new Cat([], { zorchSplat: undefined });
+ }
+ assert.throws(f11, /Unknown option: zorchSplat/);
}
/**
@@ -116,7 +137,7 @@ function noDataEvents() {
blips.push(blip);
}
- var cat = new Cat(blips, true);
+ var cat = new Cat(blips, { paused: true });
var coll = new EventCollector();
for (var i = 0; i < count; i++) {
@@ -146,7 +167,7 @@ function basicEventSequence() {
blips.push(new pipette.Blip(makeData(i)));
}
- var cat = new Cat(blips, true);
+ var cat = new Cat(blips, { paused: true });
var coll = new EventCollector();
for (var i = 0; i < count; i++) {
@@ -192,7 +213,7 @@ function basicErrorEventSequence() {
}
}
- var cat = new Cat(blips, true);
+ var cat = new Cat(blips, { paused: true });
var coll = new EventCollector();
for (var i = 0; i < blips.length; i++) {
@@ -222,7 +243,7 @@ function basicErrorEventSequence() {
* afterwards. Also, check that it becomes false after an error.
*/
function readableTransition() {
- var cat = new Cat([], true);
+ var cat = new Cat([], { paused: true });
assert.ok(cat.readable);
cat.resume();
@@ -230,7 +251,7 @@ function readableTransition() {
var blip = makeErrorBlip(new Error("eek"));
var coll = new EventCollector();
- cat = new Cat([blip], true);
+ cat = new Cat([blip], { paused: true });
coll.listenAllCommon(cat);
blip.resume();
@@ -360,10 +381,38 @@ function setIncomingEncodingTiming() {
}
/**
+ * Tests the common constructor options.
+ */
+function commonOptions() {
+ var theData = new Buffer("muffinberry biscuit");
+ var source = new events.EventEmitter();
+ var cat = new Cat([ source ],
+ { encoding: "base64",
+ incomingEncoding: "hex",
+ paused: true });
+ var coll = new EventCollector();
+
+ coll.listenAllCommon(cat);
+
+ source.emit("data", theData.toString("hex"));
+ source.emit("end");
+ source.emit("close");
+ assert.ok(cat.readable);
+ assert.equal(coll.events.length, 0);
+
+ cat.resume();
+ assert.ok(!cat.readable);
+ assert.equal(coll.events.length, 3);
+ coll.assertEvent(0, cat, "data", [theData.toString("base64")]);
+ coll.assertEvent(1, cat, "end");
+ coll.assertEvent(2, cat, "close");
+}
+
+/**
* Ensure that no events get passed after a `destroy()` call.
*/
function afterDestroy() {
- var cat = new Cat([], true);
+ var cat = new Cat([], { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(cat);
@@ -376,7 +425,7 @@ function afterDestroy() {
function test() {
constructor();
- needStreams();
+ constructorFailure();
noDataEvents();
basicEventSequence();
basicErrorEventSequence();
@@ -385,6 +434,7 @@ function test() {
setEncodingTiming();
setIncomingEncoding();
setIncomingEncodingTiming();
+ commonOptions();
afterDestroy();
}
View
18 test/slicer.js
@@ -23,8 +23,11 @@ var emit = require("./emit").emit;
* Makes sure the constructor doesn't fail off the bat.
*/
function constructor() {
- new Slicer(new events.EventEmitter());
- new Slicer(new events.EventEmitter(), "hex");
+ var emitter = new events.EventEmitter();
+
+ new Slicer(emitter);
+ new Slicer(emitter, {});
+ new Slicer(emitter, { incomingEncoding: "hex" });
}
/**
@@ -42,9 +45,14 @@ function constructorFailures() {
assert.throws(f2, /Source not an EventEmitter/);
function f3() {
- new Slicer(new events.EventEmitter(), "bad-encoding");
+ new Slicer(new events.EventEmitter(), { incomingEncoding: "bad-encoding" });
+ }
+ assert.throws(f3, /Bad value for option: incomingEncoding/);
+
+ function f4() {
+ new Slicer(new events.EventEmitter(), { frobnitz: "fizmo" });
}
- assert.throws(f3, /Invalid encoding name/);
+ assert.throws(f4, /Unknown option: frobnitz/);
}
/**
@@ -480,7 +488,7 @@ function constructorEncodings() {
function tryWith(encodingName, dataString) {
var source = new events.EventEmitter();
- var slicer = new Slicer(source, encodingName);
+ var slicer = new Slicer(source, { incomingEncoding: encodingName });
var coll = new CallbackCollector();
if (encodingName === "utf16le") {
View
49 test/valve.js
@@ -24,15 +24,20 @@ var emit = require("./emit").emit;
* Make sure the constructor doesn't fail off the bat.
*/
function constructor() {
- new Valve(new events.EventEmitter());
- new Valve(new events.EventEmitter(), true);
- new Valve(new events.EventEmitter(), false);
+ var emitter = new events.EventEmitter();
+
+ new Valve(emitter);
+ new Valve(emitter, {});
+ new Valve(emitter, { paused: true });
+ new Valve(emitter, { paused: false });
+ new Valve(emitter, { encoding: "utf16le" });
+ new Valve(emitter, { incomingEncoding: "utf16le" });
}
/**
* Test expected constructor failures.
*/
-function needSource() {
+function constructorFailure() {
function f1() {
new Valve();
}
@@ -51,6 +56,28 @@ function needSource() {
new Valve(bad);
}
assert.throws(f3, /Source already ended./);
+
+ var blip = new Blip();
+
+ function f4() {
+ new Valve(blip, { encoding: null });
+ }
+ assert.throws(f4, /Bad value for option: encoding/);
+
+ function f5() {
+ new Valve(blip, { incomingEncoding: {} });
+ }
+ assert.throws(f5, /Bad value for option: incomingEncoding/);
+
+ function f6() {
+ new Valve(blip, { paused: 5.8 });
+ }
+ assert.throws(f6, /Bad value for option: paused/);
+
+ function f7() {
+ new Valve(blip, { frobnitz: undefined });
+ }
+ assert.throws(f7, /Unknown option: frobnitz/);
}
/**
@@ -58,7 +85,7 @@ function needSource() {
*/
function noInitialEvents() {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -76,7 +103,7 @@ function readableTransition() {
function tryWith(name, arg) {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -157,7 +184,7 @@ function bufferDataEvents() {
function tryWith(count) {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -192,7 +219,7 @@ function bufferEnders() {
function tryWith(name, arg) {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -221,7 +248,7 @@ function bufferEnders() {
*/
function eventsAfterResume() {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -426,7 +453,7 @@ function afterDestroy() {
function destroyDuringResume() {
var theData = new Buffer("stuff");
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -443,7 +470,7 @@ function destroyDuringResume() {
function test() {
constructor();
- needSource();
+ constructorFailure();
noInitialEvents();
readableTransition();
eventsAfterClose();
Please sign in to comment.
Something went wrong with that request. Please try again.