Permalink
Browse files

Re-ordered constructor params - breaking change

  • Loading branch information...
Rick Cotter
Rick Cotter committed Dec 14, 2012
1 parent e81ab18 commit 284a72b4a9ec853c2079bb36c649679a75cee4f8
Showing with 123 additions and 63 deletions.
  1. +3 −2 README.md
  2. +9 −6 index.js
  3. +1 −1 package.json
  4. +59 −0 test/conditionally-enrich-test.js
  5. +2 −54 test/index.js
  6. +49 −0 test/optional-should-test.js
View
@@ -2,17 +2,18 @@
A stream that enables asynchronous enrichment of data with concurrency control while preserving FIFO ordering.
-##Implementation
+##Behavior
* Enrichment is asynchronous.
* Concurrency is controlled via [async.queue](https://github.com/caolan/async).
* Stream writes are buffered until enrichment has completed.
+##v1.0.0 Release Notes
+v1.0.0 Breaking change: constructor parameters are re-ordered and 'shouldEnrichFunc' is now optional.
##Use Cases
Given that enrichment likely takes time which requires buffering the current best use cases are where enrichment is expected to be relatively fast.
Use a [control stream](https://github.com/substack/stream-handbook#control-streams) or other mechanism to tune performance as a pre/post step.
-
##Running Tests
Run `node test` to send a stream of values for enrichment that are checked for being output in order.
View
@@ -3,22 +3,25 @@ var util = require('util');
var async = require('async');
-// EnrichStream
+//
+// EnrichStream v1.0.0
// BeauCoo 2012
// info@beaucoo.com
//
// A stream that enables asynchronous enrichment of data with concurrency control while preserving FIFO ordering.
//
// Controlling enrichment concurrency via https://github.com/caolan/async/#queue
-// 'shouldEnrichFunc' - function(data) { return true/false; } to skip/perform async enrichment
-// 'enrichFunc' - function(data, callback) { callback(null, enrichedData); } to update input data with enriched data
// 'enrichConcurrency' - positive integer
-function EnrichStream(shouldEnrichFunc, enrichFunc, enrichConcurrency) {
+// 'enrichFunc' - function(data, callback) { callback(null, enrichedData); } to update input data with enriched data
+// 'shouldEnrichFunc' (optional) - function(data) { return true/false; } to perform/skip asynchronous enrichment.
+// If not declared or null enrichment occurs for all items.
+//
+function EnrichStream(enrichConcurrency, enrichFunc, shouldEnrichFunc) {
"use strict";
if (!shouldEnrichFunc) {
- shouldEnrichFunc = function (data) {
- return false;
+ shouldEnrichFunc = function () {
+ return true;
};
}
View
@@ -1,6 +1,6 @@
{
"name":"enrich-stream",
- "version":"0.1.0",
+ "version":"1.0.0",
"description":"A stream that enables asynchronous enrichment of data with concurrency control while preserving FIFO ordering.",
"main":"./enrich-stream",
"keywords":[
@@ -0,0 +1,59 @@
+var EnrichStream = require("../index");
+var ArrayStream = require('arraystream');
+var should = require('should');
+
+
+var minMs = 0;
+var maxMs = 10;
+var count = 1000;
+var concurrency = 10;
+
+
+// Create test data of timeout values
+var array = [];
+for (var i = 0; i < count; i++) {
+ var timeout = Math.floor((Math.random() * maxMs) + minMs);
+ var sign = Math.floor((Math.random() * 2) + 1) === 2 ? 1 : -1;
+ array.push(timeout * sign);
+}
+
+
+// Configure enrichment stream
+var enrichStream = new EnrichStream(
+ concurrency,
+ function enrich(timeout, callback) {
+ setTimeout(function () {
+ callback(null, timeout + 1);
+ }, timeout);
+ },
+ function perform(timeout) {
+// console.log("%s %d", ((0 <= timeout) ? "enrich" : "skipped"), timeout);
+ return (0 <= timeout);
+ }
+);
+
+
+// Verify FIFO
+var outIndex = 0;
+enrichStream.on('data',
+ function (timeout) {
+ var enriched = 0 <= timeout;
+ if (enriched) {
+ timeout.should.equal(array[outIndex++] + 1);
+ } else {
+ timeout.should.equal(array[outIndex++]);
+ }
+ }).on('end', function () {
+ outIndex.should.equal(array.length);
+ });
+
+
+ArrayStream.create(array)
+ .pipe(enrichStream)
+ .on('end', function() {
+ console.log("Conditionally enriched %d items", array.length);
+ });
+
+
+
+
View
@@ -1,55 +1,3 @@
-var EnrichStream = require("../index");
-var ArrayStream = require('arraystream');
-var should = require('should');
-
-
-var minMs = 0;
-var maxMs = 10;
-var count = 1000;
-var concurrency = 10;
-
-
-// Create test data of timeout values
-var array = [];
-for (var i = 0; i < count; i++) {
- var timeout = Math.floor((Math.random() * maxMs) + minMs);
- var sign = Math.floor((Math.random() * 2) + 1) === 2 ? 1 : -1;
- array.push(timeout * sign);
-}
-console.log("Array length: " + array.length);
-
-
-// Configure enrichment stream
-var enrichStream = new EnrichStream(
- function perform(timeout) {
-// console.log("%s %d", ((0 <= timeout) ? "enrich" : "skipped"), timeout);
- return (0 <= timeout);
- },
- function enrich(timeout, callback) {
- setTimeout(function () {
- callback(null, timeout + 1);
- }, timeout);
- }, concurrency);
-
-
-// Verify FIFO
-var outIndex = 0;
-enrichStream.on('data',
- function (timeout) {
- var enriched = 0 <= timeout;
- if (enriched) {
- timeout.should.equal(array[outIndex++] + 1);
- } else {
- timeout.should.equal(array[outIndex++]);
- }
- }).on('end', function () {
- outIndex.should.equal(array.length);
- });
-
-
-ArrayStream.create(array)
- .pipe(enrichStream);
-
-
-
+require('./conditionally-enrich-test');
+require('./optional-should-test');
@@ -0,0 +1,49 @@
+var EnrichStream = require("../index");
+var ArrayStream = require('arraystream');
+var should = require('should');
+
+
+var minMs = 0;
+var maxMs = 10;
+var count = 1000;
+var concurrency = 10;
+
+
+// Create test data of timeout values
+var array = [];
+for (var i = 0; i < count; i++) {
+ var timeout = Math.floor((Math.random() * maxMs) + minMs);
+ array.push(timeout);
+}
+
+
+// Configure enrichment stream
+var enrichStream = new EnrichStream(
+ concurrency,
+ function enrich(timeout, callback) {
+ setTimeout(function () {
+ callback(null, timeout + 1);
+ }, timeout);
+ }
+);
+
+
+// Verify FIFO
+var outIndex = 0;
+enrichStream.on('data',
+ function (timeout) {
+ timeout.should.equal(array[outIndex++] + 1);
+ }).on('end', function () {
+ outIndex.should.equal(array.length);
+ });
+
+
+ArrayStream.create(array)
+ .pipe(enrichStream)
+ .on('end', function() {
+ console.log("Optionally enrich %d items", array.length);
+ });
+
+
+
+

0 comments on commit 284a72b

Please sign in to comment.