Permalink
Browse files

Begun

  • Loading branch information...
0 parents commit b0278c070c78f259bad773fa398941408319646d Rick Cotter committed Dec 5, 2012
Showing with 180 additions and 0 deletions.
  1. +2 −0 .gitignore
  2. +24 −0 README.md
  3. +98 −0 index.js
  4. +15 −0 package.json
  5. +41 −0 test/index.js
@@ -0,0 +1,2 @@
+node_modules
+.idea/*
@@ -0,0 +1,24 @@
+#EnrichStream
+
+A stream that provides asynchronous enrichment of data and guarantees FIFO ordering.
+
+##Implementation
+Enrichment is asynchronous.
+Concurrency is controlled via [async.queue](https://github.com/caolan/async).
+Stream writes are buffered until enrichment has completed.
+
+
+##Use
+Given **Future Improvements** the current best use cases are where enrichment is expected to be relatively fast.
+
+
+##Running Tests
+Run `node test` to send a stream of values for enrichment that are checked for being output in order.
+
+
+##Future Improvements
+*Better buffering
+*Optimize draining
+
+
+
@@ -0,0 +1,98 @@
+var Stream = require('stream');
+var util = require('util');
+var async = require('async');
+
+
+//TODO optimize with draining
+
+
+function EnrichStream(enrichFunc, enrichConcurrency) {
+ "use strict";
+
+ if (!enrichFunc) {
+ enrichFunc = function (task, callback) {
+ callback();
+ };
+ }
+
+ var self = this;
+ self.writable = true;
+ self.readable = true;
+
+ var streamingOut = false;
+ var endWanted = false;
+ var workCount = 0;
+ var completedCount = 0;
+ var buffer = [];
+ var queue = async.queue(enrichFunc, enrichConcurrency);
+ var destroyed = false;
+
+
+ function workCompleted() {
+ if (streamingOut || destroyed) {
+ return;
+ }
+
+ streamingOut = true;
+ while (streamingOut && buffer.length && buffer[0].done) {
+ self.emit('data', buffer.shift().data);
+ completedCount++;
+ }
+ streamingOut = false;
+
+ if (endWanted === true && completedCount === workCount) {
+ self.ended = true;
+ self.writable = false;
+ self.emit('end');
+ }
+ }
+
+
+ function getEnrichedFunc(work) {
+ return function (err) {
+ if (err) {
+ console.log(err);
+ }
+
+ work.done = true;
+ workCompleted();
+ };
+ }
+
+
+ this.write = function (data) {
+ workCount++;
+ var work = {data:data, done:false};
+ buffer.push(work);
+ queue.push(data, getEnrichedFunc(work));
+ return true;
+ };
+
+
+ this.end = function (data) {
+ if (data) {
+ this.write(data);
+ }
+
+ endWanted = true;
+ };
+
+
+ this.destroy = function () {
+ if (destroyed) {
+ return;
+ }
+
+ destroyed = true;
+ self.ended = true;
+ self.writable = false;
+ buffer.length = 0;
+ self.emit('close');
+ };
+}
+
+
+util.inherits(EnrichStream, Stream);
+
+
+module.exports = EnrichStream;
@@ -0,0 +1,15 @@
+{
+ "name":"enrich-stream",
+ "description":"A stream that allows asynchronous enrichment of items while preserving FIFO ordering",
+ "engines":{
+ "node":"node >= 0.8.0"
+ },
+ "version":"0.1.0",
+ "dependencies":{
+ "async":"0.1.22"
+ },
+ "devDependencies":{
+ "should":"~1.2.1",
+ "arraystream":"~0.0.5"
+ }
+}
@@ -0,0 +1,41 @@
+var EnrichStream = require("../index");
+var ArrayStream = require('arraystream');
+var should = require('should');
+
+
+var minMs = 1;
+var maxMs = 3;
+var count = 1000;
+var concurrency = 10;
+
+
+var array = [];
+for (var i = 0; i < count; i++) {
+ var timeout = Math.floor((Math.random() * maxMs) + minMs);
+ array.push(timeout);
+}
+console.log("Array length: " + array.length);
+
+
+var enrichStream = new EnrichStream(function (timeout, callback) {
+ setTimeout(function () {
+ callback();
+ }, timeout);
+}, concurrency);
+
+
+var outIndex = 0;
+enrichStream.on('data',
+ function (timeout) {
+ timeout.should.equal(array[outIndex++]);
+ }).on('end', function () {
+ outIndex.should.equal(array.length);
+ });
+
+
+ArrayStream.create(array)
+ .pipe(enrichStream);
+
+
+
+

0 comments on commit b0278c0

Please sign in to comment.