Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

big tidy up so that sinks can also be throughs

  • Loading branch information...
commit 5a77767b9ccadb33e5a187f777482c9f99eb37f6 1 parent 7f1fab9
Dominic Tarr authored
Showing with 186 additions and 128 deletions.
  1. +12 −83 index.js
  2. +57 −0 maybe.js
  3. +3 −44 sinks.js
  4. +0 −1  sources.js
  5. +22 −0 throughs.js
  6. +92 −0 util.js
95 index.js
View
@@ -2,96 +2,25 @@
var sources = require('./sources')
var sinks = require('./sinks')
var throughs = require('./throughs')
-
+var u = require('./util')
+
for(var k in sources)
- exports[k] = Source(sources[k])
+ exports[k] = u.Source(sources[k])
for(var k in throughs)
- exports[k] = Through(throughs[k])
+ exports[k] = u.Through(throughs[k])
for(var k in sinks)
- exports[k] = Sink(sinks[k])
-
-exports.Duplex =
-exports.Through = exports.pipeable = Through
-exports.Source = exports.pipeableSource = Source
-exports.Sink = exports.pipeableSink = Sink
-
-exports.addPipe = addPipe
-exports.addReaderPipe
- = addReaderPipe
+ exports[k] = u.Sink(sinks[k])
-function addPipe(read) {
- if('function' !== typeof read)
- return read
+var maybe = require('./maybe')(exports)
- read.pipe = read.pipe || function (reader) {
- if('function' != typeof reader)
- throw new Error('must pipe to reader')
- return addPipe(reader(read))
- }
- read.type = 'Source'
- return read
-}
+for(var k in maybe)
+ exports[k] = maybe[k]
-function Source (createRead) {
- function s() {
- var args = [].slice.call(arguments)
- return addPipe(createRead.apply(null, args))
- }
- s.type = 'Source'
- return s
-}
-
-function addReaderPipe(reader) {
- var piped = []
- function _reader (read) {
- read = reader(read)
- while(piped.length)
- read = piped.shift()(read)
- return read
- //pipeing to from this reader should compose...
- }
- _reader.pipe = function (read) {
- piped.push(read)
- return reader
- }
- return _reader
-}
-
-function Through (createRead) {
- return function () {
- var args = [].slice.call(arguments)
- var piped = []
- function reader (read) {
- args.unshift(read)
- read = createRead.apply(null, args)
- while(piped.length)
- read = piped.shift()(read)
- return read
- //pipeing to from this reader should compose...
- }
- reader.pipe = function (read) {
- piped.push(read)
- if(read.type === 'Source')
- throw new Error('cannot pipe ' + reader.type + ' to Source')
- reader.type = read.type === 'Sink' ? 'Sink' : 'Through'
- return reader
- }
- reader.type = 'Through'
- return reader
- }
-}
+exports.Duplex =
+exports.Through = exports.pipeable = u.Through
+exports.Source = exports.pipeableSource = u.Source
+exports.Sink = exports.pipeableSink = u.Sink
-function Sink(createReader) {
- return function () {
- var args = [].slice.call(arguments)
- function s (read) {
- args.unshift(read)
- return createReader.apply(null, args)
- }
- s.type = 'Sink'
- return s
- }
-}
57 maybe.js
View
@@ -0,0 +1,57 @@
+var u = require('./util')
+var prop = u.prop
+var id = u.id
+var maybeSink = u.maybeSink
+
+module.exports = function (pull) {
+
+ var exports = {}
+ var drain = pull.drain
+
+ var find =
+ exports.find = function (test, cb) {
+ return maybeSink(function (cb) {
+ var ended = false
+ if(!cb)
+ cb = test, test = id
+ else
+ test = prop(test) || id
+
+ return drain(function (data) {
+ if(test(data)) {
+ ended = true
+ cb(null, data)
+ return false
+ }
+ }, function (err) {
+ if(ended) return //already called back
+ cb(err === true ? null : err, null)
+ })
+
+ }, cb)
+ }
+
+ var reduce = exports.reduce =
+ function (reduce, acc, cb) {
+ console.log('maybeSink---', reduce, acc, cb)
+
+ return maybeSink(function (cb) {
+ return drain(function (data) {
+ acc = reduce(acc, data)
+ }, function (err) {
+ cb(err, acc)
+ })
+
+ }, cb)
+ }
+
+ var collect = exports.collect = exports.writeArray =
+ function (cb) {
+ return reduce(function (arr, item) {
+ arr.push(item)
+ return arr
+ }, [], cb)
+ }
+
+ return exports
+}
47 sinks.js
View
@@ -1,7 +1,3 @@
-var u = require('./util')
-var prop = u.prop
-var id = u.id
-
var drain = exports.drain = function (read, op, done) {
;(function next() {
var sync = true, returned = false, loop = true
@@ -30,50 +26,13 @@ var drain = exports.drain = function (read, op, done) {
})()
}
-var find =
-exports.find = function (read, test, cb) {
- var ended = false
- if(!cb)
- cb = test, test = id
- else
- test = prop(test) || id
- drain(read, function (data) {
- if(test(data)) {
- ended = true
- cb(null, data)
- return false
- }
- }, function (err) {
- if(ended) return //already called back
- cb(err === true ? null : err, null)
- })
-}
-
-var reduce = exports.reduce =
-function (read, reduce, acc, cb) {
- drain(read, function (data) {
- acc = reduce(acc, data)
- }, function (err) {
- cb(err, acc)
- })
-}
-
-var collect = exports.collect = exports.writeArray =
-function (read, cb) {
- return reduce(read, function (arr, item) {
- arr.push(item)
- return arr
- }, [], cb)
-}
-
-//if the source callsback sync, then loop
-//rather than recurse
-
var onEnd = exports.onEnd = function (read, done) {
return drain(read, null, done)
}
var log = exports.log = function (read, done) {
- return drain(read, console.log.bind(console), done)
+ return drain(read, function (data) {
+ console.log(data)
+ }, done)
}
1  sources.js
View
@@ -87,7 +87,6 @@ function (start, createStream) {
var reads = []
reads.unshift(once(start))
-// reads.unshift(createStream(start))
return function next (end, cb) {
if(!reads.length)
22 throughs.js
View
@@ -1,5 +1,7 @@
var u = require('./util')
var sources = require('./sources')
+var sinks = require('./sinks')
+
var prop = u.prop
var id = u.id
var tester = u.tester
@@ -232,6 +234,26 @@ function (read, head) {
}
+//var drainIf = exports.drainIf = function (op, done) {
+// sinks.drain(
+//}
+
+var _reduce = exports._reduce = function (read, reduce, initial) {
+ return function (close, cb) {
+ if(close) return read(close, cb)
+ if(ended) return cb(ended)
+
+ sinks.drain(function (item) {
+ initial = reduce(initial, item)
+ }, function (err, data) {
+ ended = err || true
+ if(!err) cb(null, initial)
+ else cb(ended)
+ })
+ (read)
+ }
+}
+
var nextTick = process.nextTick
var highWaterMark = exports.highWaterMark =
92 util.js
View
@@ -20,4 +20,96 @@ exports.tester = function (test) {
return exports.prop(test) || exports.id
}
+exports.addPipe = addPipe
+
+function addPipe(read) {
+ if('function' !== typeof read)
+ return read
+
+ read.pipe = read.pipe || function (reader) {
+ if('function' != typeof reader)
+ throw new Error('must pipe to reader')
+ return addPipe(reader(read))
+ }
+ read.type = 'Source'
+ return read
+}
+
+var Source =
+exports.Source =
+function Source (createRead) {
+ function s() {
+ var args = [].slice.call(arguments)
+ return addPipe(createRead.apply(null, args))
+ }
+ s.type = 'Source'
+ return s
+}
+
+
+var Through =
+exports.Through =
+function (createRead) {
+ return function () {
+ var args = [].slice.call(arguments)
+ var piped = []
+ function reader (read) {
+ args.unshift(read)
+ read = createRead.apply(null, args)
+ while(piped.length)
+ read = piped.shift()(read)
+ return read
+ //pipeing to from this reader should compose...
+ }
+ reader.pipe = function (read) {
+ piped.push(read)
+ if(read.type === 'Source')
+ throw new Error('cannot pipe ' + reader.type + ' to Source')
+ reader.type = read.type === 'Sink' ? 'Sink' : 'Through'
+ return reader
+ }
+ reader.type = 'Through'
+ return reader
+ }
+}
+
+var Sink =
+exports.Sink =
+function Sink(createReader) {
+ return function () {
+ var args = [].slice.call(arguments)
+ if(!createReader)
+ throw new Error('must be createReader function')
+ function s (read) {
+ args.unshift(read)
+ return createReader.apply(null, args)
+ }
+ s.type = 'Sink'
+ return s
+ }
+}
+
+
+exports.maybeSink =
+exports.maybeDrain =
+function (createSink, cb) {
+ if(!cb)
+ return Through(function (read) {
+ var ended
+ return function (close, cb) {
+ if(close) return read(close, cb)
+ if(ended) return cb(ended)
+
+ createSink(function (err, data) {
+ ended = err || true
+ if(!err) cb(null, data)
+ else cb(ended)
+ }) (read)
+ }
+ })()
+
+ return Sink(function (read) {
+ return createSink(cb) (read)
+ })()
+}
Please sign in to comment.
Something went wrong with that request. Please try again.