Permalink
Browse files

Move reducers pipe here.

  • Loading branch information...
Gozala committed Nov 24, 2012
1 parent 7519afd commit f38f6e6299df361b7e08a15d8bf93ce82a3d8432
Showing with 73 additions and 0 deletions.
  1. +20 −0 pipe.js
  2. +53 −0 test/pipe.js
View
20 pipe.js
@@ -0,0 +1,20 @@
+"use strict";
+
+var send = require("./send")
+var reduce = require("reducible/reduce")
+
+function pipe(input, output) {
+ /**
+ Takes reducible `input` and pipes it to the `output` (which is anything
+ that implements `send`). Note that first `end` or `error` from the piped
+ `input`-s will end an `output` causing subsequent `send`s return `reducers`
+ stopping other `input`-s. If you need to `pipe` all values form multiple
+ inputs do `pipe(merge(inputs), output)`, that way `output` will close only
+ once all inputs end. If you can't merge all the inputs up front you can
+ always pipe merged event. That way sending new inputs to that event will
+ automatically pipe all it's items.
+ **/
+ reduce(input, function pipeReducible(value) { send(output, value) })
+}
+
+module.exports = pipe
View
@@ -0,0 +1,53 @@
+"use strict";
+
+var pipe = require("../pipe")
+var event = require("../event")
+var send = require("../send")
+
+var test = require("reducers/test/util/test")
+var lazy = require("reducers/test/util/lazy")
+
+var concat = require("reducers/concat")
+var into = require("reducers/into")
+var delay = require("reducers/delay")
+var merge = require("reducers/merge")
+
+
+var cache = require("cache-reduce")
+
+
+exports["test pipe multiple streams"] = test(function(assert) {
+ var s = event()
+ var actual = into(s)
+ pipe([1, 2, 3], s)
+ pipe([4, 5, 6], s)
+
+
+ assert(actual, [1, 2, 3],
+ "first end causes close on output all subsequent pipes are ignored")
+})
+
+exports["test pipe multiple streams indepenently"] = test(function(assert) {
+ var s = event()
+ var actual = into(s)
+ pipe(delay([1, 2, 3]), s)
+ pipe(delay([4, 5, 6, 7]), s)
+
+ // note that `6` goes through since `end` is also dispatched with a delay.
+ assert(actual, [1, 4, 2, 5, 3, 6],
+ "parallel pipe works until first end")
+})
+
+exports["test pipe multiple streams"] = test(function(assert) {
+ var s = event()
+ var actual = into(s)
+
+ pipe(merge([[1, 2, 3], [4, 5], [ 6, 7 ]]), s)
+
+ // note that `6` goes through since `end` is also dispatched with a delay.
+ assert(actual, [1, 2, 3, 4, 5, 6, 7],
+ "parallel pipe works until first end")
+})
+
+if (require.main === module)
+ require("test").run(exports)

0 comments on commit f38f6e6

Please sign in to comment.