Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Danfuzz options #30

Merged
merged 7 commits into from

1 participant

@danfuzz

Harmonizes all the constructors.

@danfuzz

Just going ahead and merging.

@mikefleming you may want to update [redacted] to use this version, so that we don't end up having two copies of the module in common usage. But not a big deal either; it's not like the old version will stop working.

@danfuzz danfuzz merged commit 886bcc2 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 15, 2012
  1. About to break compatibility.

    Dan Bornstein authored
  2. Remove unused parameter.

    Dan Bornstein authored
  3. Re-complexify `handleCommon()`.

    Dan Bornstein authored
    I'd forgotten that `Cat` would need this.
  4. Switch `Cat` to take options instead of just `paused`.

    Dan Bornstein authored
  5. Convert `Slicer` to take constructor options.

    Dan Bornstein authored
  6. Add a to-do item.

    Dan Bornstein authored
This page is out of date. Refresh to see the latest.
View
43 README.md
@@ -156,10 +156,9 @@ encodings specified by those. This includes:
Common Options
--------------
-Some of the classes (as of this writing, most but not all of them)
-take an optional `options` constructor parameter. If not `undefined`,
-this must be a map from option names to values as specified by the
-class.
+All of the classes in this module take an optional `options`
+constructor parameter. If not `undefined`, this must be a map from
+option names to values as specified by the class.
The following are three commonly-accepted options. Classes all accept
whichever of these make sense.
@@ -227,7 +226,7 @@ This can be used, for example, to produce a stream that is prefixed
or suffixed with a given bit of data (when used in combination with
`Blip`, above).
-### var cat = new Cat(streams, [paused])
+### var cat = new Cat(streams, [options])
Constructs and returns a new cat which is to emit the events from
the given streams (each of which must be an `EventEmitter` and is
@@ -244,10 +243,8 @@ emitted by this instance, after which this instance emits a `close`
event. It will then become closed (emitting no further events, and
producing `false` for `cat.readable`).
-If the optional `paused` argument is specified, it indicates whether
-or not the new instance should start out in the paused state. It defaults
-to `false`. That said, the constructor argument is provided because it's
-pretty common to want to start instances out paused.
+This class recognizes all three of the common options (see above), and
+no others.
The constructed instance obeys the full standard Node stream protocol
for readers.
@@ -519,12 +516,16 @@ The ordering and meaning of the callback arguments are meant to be (a)
compatible with callbacks used with `fs.read()` and (b) somewhat more
informative and unambiguous.
-### var slicer = new Slicer(source, [incomingEncoding])
+### var slicer = new Slicer(source, [options])
+
+Constructs a new slicer, which listens to the given source.
+
+Of the common options, the only one recognized by this class is
+`incomingEncoding`. The class accepts no other options.
+
+This class recognizes all three of the common options (see above), and
+no others.
-Constructs a new slicer, which listens to the given source. The optional
-`incomingEncoding` indicates the initial encoding to use when `data`
-events are received with string payloads (defaults to `undefined`; see
-`setIncomingEncoding()`).
### slicer.readable => boolean
@@ -694,14 +695,12 @@ function createMyStream() {
The Valve will "sanitize" the events coming from your class, while
also providing the rest of the core readable Stream API.
-### var valve = new Valve(source, [paused])
+### var valve = new Valve(source, [options])
Constructs and returns a new valve, which listens to the given source.
-If the optional `paused` argument is specified, it indicates whether
-or not the new instance should start out in the paused state. It defaults
-to `false`. That said, the constructor argument is provided because it's
-pretty common to want to start instances out paused.
+This class recognizes all three of the common options (see above), and
+no others.
The constructed instance obeys the full standard Node stream protocol
for readers.
@@ -727,10 +726,8 @@ containing a string payload.
To Do
-----
-* Use `options` arguments consistently on construction.
-
-* Make `encoding`, `incomingEncoding`, and `paused` all be available
- as constructor options.
+* Consider adding a common option of `pressure: boolean` to indicate
+ whether `pause()` and `resume()` should recurse upstream.
Contributing
View
33 lib/cat.js
@@ -17,6 +17,7 @@ var util = require("util");
var Codec = require("./codec").Codec;
var consts = require("./consts");
+var opts = require("./opts");
var sealer = require("./sealer");
var sourcesanity = require("./sourcesanity");
@@ -25,13 +26,25 @@ var Valve = require("./valve").Valve;
/*
+ * Module variables
+ */
+
+/** Options spec */
+var OPTIONS = {
+ encoding: {},
+ incomingEncoding: {},
+ paused: {}
+};
+
+
+/*
* Helper functions
*/
/**
* Construct a Cat state object.
*/
-function State(emitter, streams, paused) {
+function State(emitter, streams) {
/** Outer event emitter. */
this.emitter = emitter;
@@ -86,7 +99,7 @@ function State(emitter, streams, paused) {
// it's a bad idea to try to be clever and do an `instanceof
// Valve` check, since that might mess up the client; they might
// be using a Valve for independent reasons.
- one = new Valve(one, true);
+ one = new Valve(one, { paused: true });
one.on(consts.DATA, this.onData);
one.on(consts.END, this.onEnd);
one.on(consts.ERROR, this.onError);
@@ -96,10 +109,6 @@ function State(emitter, streams, paused) {
if (specialBlip) {
specialBlip.resume();
}
-
- if (!paused) {
- this.resume();
- }
}
/**
@@ -220,16 +229,12 @@ Object.freeze(State.prototype);
/**
* Construct a Cat instance, which emits the `data` events it receives
* from any number of other `streams` (an array).
- *
- * The optional `paused` argument indicates whether the Cat starts
- * out paused (that is, buffering events). It defaults to `true`
- * (because that's the expected primary use case).
*/
-function Cat(streams, paused) {
- paused = typ.isDefined(paused) ? !!paused : false;
-
+function Cat(streams, options) {
+ options = opts.validate(options, OPTIONS);
stream.Stream.call(this);
- this.cat = sealer.seal(new State(this, streams, paused));
+ this.cat = sealer.seal(new State(this, streams));
+ opts.handleCommon(options, this, true);
}
util.inherits(Cat, stream.Stream);
View
12 lib/opts.js
@@ -123,7 +123,7 @@ function validate(options, allowed) {
* Handles the three common options `encoding`, `incomingEncoding`, and
* `paused`.
*/
-function handleCommon(options, target) {
+function handleCommon(options, target, constructedPaused) {
if (options.encoding) {
target.setEncoding(options.encoding);
}
@@ -132,8 +132,14 @@ function handleCommon(options, target) {
target.setIncomingEncoding(options.incomingEncoding);
}
- if (options.paused) {
- target.pause();
+ if (typ.isDefined(options.paused)) {
+ if (options.paused) {
+ if (!constructedPaused) {
+ target.pause();
+ }
+ } else if (constructedPaused) {
+ target.resume();
+ }
}
}
View
2  lib/sink.js
@@ -42,7 +42,7 @@ var OPTIONS = {
/**
* Construct a Sink state object.
*/
-function State(emitter, source, options) {
+function State(emitter, source) {
sourcesanity.validate(source);
/** "parent" emitter */
View
21 lib/slicer.js
@@ -17,11 +17,22 @@ var typ = require("typ");
var consts = require("./consts");
var Codec = require("./codec").Codec;
var errors = require("./errors");
+var opts = require("./opts");
var sealer = require("./sealer");
var sourcesanity = require("./sourcesanity");
/*
+ * Module variables
+ */
+
+/** Options spec */
+var OPTIONS = {
+ incomingEncoding: {}
+};
+
+
+/*
* Helper functions
*/
@@ -152,14 +163,14 @@ Object.freeze(PendingRead.prototype);
/**
* Constructs a Slicer state object.
*/
-function State(source, incomingEncoding) {
+function State(source) {
sourcesanity.validate(source);
/** upstream source (must be stream-like, if not actually a Stream) */
this.source = source;
/** event receipt encoding handler */
- this.encoder = new Codec(incomingEncoding);
+ this.encoder = new Codec();
/** queue of pending read operations */
this.pendingReads = [];
@@ -341,8 +352,10 @@ Object.freeze(State.prototype);
* `read(..., callback)` interface for consuming the so-collected
* data.
*/
-function Slicer(source, incomingEncoding) {
- this.slicer = sealer.seal(new State(source, incomingEncoding));
+function Slicer(source, options) {
+ options = opts.validate(options, OPTIONS);
+ this.slicer = sealer.seal(new State(source));
+ opts.handleCommon(options, this);
}
/**
View
23 lib/valve.js
@@ -19,18 +19,31 @@ var util = require("util");
var Codec = require("./codec").Codec;
var consts = require("./consts");
var errors = require("./errors");
+var opts = require("./opts");
var sealer = require("./sealer");
var sourcesanity = require("./sourcesanity");
/*
+ * Module variables
+ */
+
+/** Options spec */
+var OPTIONS = {
+ encoding: {},
+ incomingEncoding: {},
+ paused: {}
+};
+
+
+/*
* Helper functions
*/
/**
* Construct a Valve state object.
*/
-function State(emitter, source, paused) {
+function State(emitter, source) {
sourcesanity.validate(source);
/** Outer emitter instance. */
@@ -46,7 +59,7 @@ function State(emitter, source, paused) {
this.decoder = new Codec();
/** Currently paused? */
- this.paused = typ.isDefined(paused) ? !!paused : false;
+ this.paused = false;
/** Buffered up events, to be emitted in order once unpaused. */
this.buffer = [];
@@ -197,9 +210,11 @@ Object.freeze(State.prototype);
* out paused (that is, buffering events). It defaults to `true`
* (because that's the expected primary use case).
*/
-function Valve(source, paused) {
+function Valve(source, options) {
+ options = opts.validate(options, OPTIONS);
stream.Stream.call(this);
- this.valve = sealer.seal(new State(this, source, paused));
+ this.valve = sealer.seal(new State(this, source));
+ opts.handleCommon(options, this);
}
util.inherits(Valve, stream.Stream);
View
2  package.json
@@ -1,6 +1,6 @@
{
"name": "pipette",
- "version": "0.8.5",
+ "version": "0.9.0",
"keywords":
["stream", "pipe", "buffer", "valve", "data", "event", "blip", "cat",
"sink", "slicer", "reader", "read", "dropper"],
View
76 test/cat.js
@@ -25,7 +25,7 @@ var EventCollector = require("./eventcoll").EventCollector;
function makeErrorBlip(error) {
var emitter = new events.EventEmitter();
- var valve = new pipette.Valve(emitter, true);
+ var valve = new pipette.Valve(emitter, { paused: true });
emitter.emit("error", error);
emitter.emit("close");
@@ -44,17 +44,18 @@ function constructor() {
new Cat([]);
new Cat([new events.EventEmitter()]);
- new Cat([], false);
- new Cat([], true);
+ new Cat([], {});
+ new Cat([], { paused: false });
+ new Cat([], { paused: true });
- new Cat([new events.EventEmitter()], false);
- new Cat([new events.EventEmitter()], true);
+ new Cat([new events.EventEmitter()], { encoding: "utf8" });
+ new Cat([new events.EventEmitter()], { incomingEncoding: "ucs2" });
}
/**
* Test expected constructor failures.
*/
-function needStreams() {
+function constructorFailure() {
var good = new pipette.Blip("good");
function f1() {
@@ -99,6 +100,26 @@ function needStreams() {
new Cat([good, bad2]);
}
assert.throws(f7, /Source already ended: index 1/);
+
+ function f8() {
+ new Cat([], { encoding: 12 });
+ }
+ assert.throws(f8, /Bad value for option: encoding/);
+
+ function f9() {
+ new Cat([], { incomingEncoding: "zorch" });
+ }
+ assert.throws(f9, /Bad value for option: incomingEncoding/);
+
+ function f10() {
+ new Cat([], { paused: undefined });
+ }
+ assert.throws(f10, /Bad value for option: paused/);
+
+ function f11() {
+ new Cat([], { zorchSplat: undefined });
+ }
+ assert.throws(f11, /Unknown option: zorchSplat/);
}
/**
@@ -116,7 +137,7 @@ function noDataEvents() {
blips.push(blip);
}
- var cat = new Cat(blips, true);
+ var cat = new Cat(blips, { paused: true });
var coll = new EventCollector();
for (var i = 0; i < count; i++) {
@@ -146,7 +167,7 @@ function basicEventSequence() {
blips.push(new pipette.Blip(makeData(i)));
}
- var cat = new Cat(blips, true);
+ var cat = new Cat(blips, { paused: true });
var coll = new EventCollector();
for (var i = 0; i < count; i++) {
@@ -192,7 +213,7 @@ function basicErrorEventSequence() {
}
}
- var cat = new Cat(blips, true);
+ var cat = new Cat(blips, { paused: true });
var coll = new EventCollector();
for (var i = 0; i < blips.length; i++) {
@@ -222,7 +243,7 @@ function basicErrorEventSequence() {
* afterwards. Also, check that it becomes false after an error.
*/
function readableTransition() {
- var cat = new Cat([], true);
+ var cat = new Cat([], { paused: true });
assert.ok(cat.readable);
cat.resume();
@@ -230,7 +251,7 @@ function readableTransition() {
var blip = makeErrorBlip(new Error("eek"));
var coll = new EventCollector();
- cat = new Cat([blip], true);
+ cat = new Cat([blip], { paused: true });
coll.listenAllCommon(cat);
blip.resume();
@@ -360,10 +381,38 @@ function setIncomingEncodingTiming() {
}
/**
+ * Tests the common constructor options.
+ */
+function commonOptions() {
+ var theData = new Buffer("muffinberry biscuit");
+ var source = new events.EventEmitter();
+ var cat = new Cat([ source ],
+ { encoding: "base64",
+ incomingEncoding: "hex",
+ paused: true });
+ var coll = new EventCollector();
+
+ coll.listenAllCommon(cat);
+
+ source.emit("data", theData.toString("hex"));
+ source.emit("end");
+ source.emit("close");
+ assert.ok(cat.readable);
+ assert.equal(coll.events.length, 0);
+
+ cat.resume();
+ assert.ok(!cat.readable);
+ assert.equal(coll.events.length, 3);
+ coll.assertEvent(0, cat, "data", [theData.toString("base64")]);
+ coll.assertEvent(1, cat, "end");
+ coll.assertEvent(2, cat, "close");
+}
+
+/**
* Ensure that no events get passed after a `destroy()` call.
*/
function afterDestroy() {
- var cat = new Cat([], true);
+ var cat = new Cat([], { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(cat);
@@ -376,7 +425,7 @@ function afterDestroy() {
function test() {
constructor();
- needStreams();
+ constructorFailure();
noDataEvents();
basicEventSequence();
basicErrorEventSequence();
@@ -385,6 +434,7 @@ function test() {
setEncodingTiming();
setIncomingEncoding();
setIncomingEncodingTiming();
+ commonOptions();
afterDestroy();
}
View
18 test/slicer.js
@@ -23,8 +23,11 @@ var emit = require("./emit").emit;
* Makes sure the constructor doesn't fail off the bat.
*/
function constructor() {
- new Slicer(new events.EventEmitter());
- new Slicer(new events.EventEmitter(), "hex");
+ var emitter = new events.EventEmitter();
+
+ new Slicer(emitter);
+ new Slicer(emitter, {});
+ new Slicer(emitter, { incomingEncoding: "hex" });
}
/**
@@ -42,9 +45,14 @@ function constructorFailures() {
assert.throws(f2, /Source not an EventEmitter/);
function f3() {
- new Slicer(new events.EventEmitter(), "bad-encoding");
+ new Slicer(new events.EventEmitter(), { incomingEncoding: "bad-encoding" });
+ }
+ assert.throws(f3, /Bad value for option: incomingEncoding/);
+
+ function f4() {
+ new Slicer(new events.EventEmitter(), { frobnitz: "fizmo" });
}
- assert.throws(f3, /Invalid encoding name/);
+ assert.throws(f4, /Unknown option: frobnitz/);
}
/**
@@ -480,7 +488,7 @@ function constructorEncodings() {
function tryWith(encodingName, dataString) {
var source = new events.EventEmitter();
- var slicer = new Slicer(source, encodingName);
+ var slicer = new Slicer(source, { incomingEncoding: encodingName });
var coll = new CallbackCollector();
if (encodingName === "utf16le") {
View
49 test/valve.js
@@ -24,15 +24,20 @@ var emit = require("./emit").emit;
* Make sure the constructor doesn't fail off the bat.
*/
function constructor() {
- new Valve(new events.EventEmitter());
- new Valve(new events.EventEmitter(), true);
- new Valve(new events.EventEmitter(), false);
+ var emitter = new events.EventEmitter();
+
+ new Valve(emitter);
+ new Valve(emitter, {});
+ new Valve(emitter, { paused: true });
+ new Valve(emitter, { paused: false });
+ new Valve(emitter, { encoding: "utf16le" });
+ new Valve(emitter, { incomingEncoding: "utf16le" });
}
/**
* Test expected constructor failures.
*/
-function needSource() {
+function constructorFailure() {
function f1() {
new Valve();
}
@@ -51,6 +56,28 @@ function needSource() {
new Valve(bad);
}
assert.throws(f3, /Source already ended./);
+
+ var blip = new Blip();
+
+ function f4() {
+ new Valve(blip, { encoding: null });
+ }
+ assert.throws(f4, /Bad value for option: encoding/);
+
+ function f5() {
+ new Valve(blip, { incomingEncoding: {} });
+ }
+ assert.throws(f5, /Bad value for option: incomingEncoding/);
+
+ function f6() {
+ new Valve(blip, { paused: 5.8 });
+ }
+ assert.throws(f6, /Bad value for option: paused/);
+
+ function f7() {
+ new Valve(blip, { frobnitz: undefined });
+ }
+ assert.throws(f7, /Unknown option: frobnitz/);
}
/**
@@ -58,7 +85,7 @@ function needSource() {
*/
function noInitialEvents() {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -76,7 +103,7 @@ function readableTransition() {
function tryWith(name, arg) {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -157,7 +184,7 @@ function bufferDataEvents() {
function tryWith(count) {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -192,7 +219,7 @@ function bufferEnders() {
function tryWith(name, arg) {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -221,7 +248,7 @@ function bufferEnders() {
*/
function eventsAfterResume() {
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -426,7 +453,7 @@ function afterDestroy() {
function destroyDuringResume() {
var theData = new Buffer("stuff");
var source = new events.EventEmitter();
- var valve = new Valve(source, true);
+ var valve = new Valve(source, { paused: true });
var coll = new EventCollector();
coll.listenAllCommon(valve);
@@ -443,7 +470,7 @@ function destroyDuringResume() {
function test() {
constructor();
- needSource();
+ constructorFailure();
noInitialEvents();
readableTransition();
eventsAfterClose();
Something went wrong with that request. Please try again.