Permalink
Browse files

changed read to readArray, add writeArray, and split

  • Loading branch information...
1 parent 9b9e2aa commit 42b5c5c4b8c99da198e06da05fa26e144a4ea65d @dominictarr committed Sep 16, 2011
Showing with 92 additions and 39 deletions.
  1. +92 −39 index.js
View
@@ -1,16 +1,66 @@
-
-//create an event stream and apply function to each .write
-//emitting each response as data
-//unless it's an empty callback
-//
-
-var Stream = require('stream').Stream
-
//filter will reemit the data if cb(err,pass) pass is truthy
// reduce is more tricky
// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has recieved 'end'
+
+var Stream = require('stream').Stream
+
+// writable stream, collects all events into an array
+// and calls back when 'end' occurs
+// mainly I'm using this to test the other functions
+
+exports.writeArray = function (done) {
+ if ('function' !== typeof done)
+ throw new Error('function writeArray (done): done must be function')
+
+ var a = new Stream ()
+ , array = []
+ a.write = function (l) {
+ array.push(l)
+ }
+ a.end = function () {
+ done(null, array)
+ }
+ a.writable = true
+ a.readable = false
+ return a
+}
+
+//return a Stream that reads the properties of an object
+//respecting pause() and resume()
+
+exports.readArray = function (array) {
+ var stream = new Stream()
+ , i = 0
+ , paused = false
+
+ stream.readable = true
+ stream.writable = false
+
+ if(!Array.isArray(array))
+ throw new Error('event-stream.read expects an array')
+
+ stream.resume = function () {
+ paused = false
+ var l = array.length
+ while(i < l && !paused) {
+ stream.emit('data', array[i++])
+ }
+ if(i == l)
+ stream.emit('end'), stream.readible = false
+ }
+ process.nextTick(stream.resume)
+ stream.pause = function () {
+ paused = true
+ }
+ return stream
+}
+
+//create an event stream and apply function to each .write
+//emitting each response as data
+//unless it's an empty callback
+
exports.map = function (mapper) {
var stream = new Stream()
, inputs = 0
@@ -72,36 +122,6 @@ exports.map = function (mapper) {
return stream
}
- //return a Stream that reads the properties of an object
- //respecting pause() and resume()
-
-exports.read = function (array) {
- var stream = new Stream()
- , i = 0
- , paused = false
-
- stream.readable = true
- stream.writable = false
-
- if(!Array.isArray(array))
- throw new Error('event-stream.read expects an array')
-
- stream.open = function () {
- paused = false
- var l = array.length
- while(i < l && !paused) {
- stream.emit('data', array[i++])
- }
- if(i == l)
- stream.emit('end'), stream.readible = false
- }
- stream.pause = function () {
- paused = true
- }
- stream.resume = stream.open
- return stream
-}
-
//
// combine multiple streams together so that they act as a single stream
//
@@ -154,4 +174,37 @@ exports.pipe = function () {
})
return thepipe
-}
+}
+
+exports.split = function (matcher) {
+ var stream = new Stream()
+ , soFar = ''
+
+ if (!matcher)
+ matcher = '\n'
+
+ stream.writable = true
+ stream.write = function (buffer) {
+ buffer = buffer.toString()
+ var l = buffer.length
+ , i = 0
+ while (i < l) {
+ var c = buffer[i].toString()
+ soFar += c
+ if (c == matcher) {
+ var n = soFar;
+ soFar = ''
+ this.emit('data', n)
+ console.log('data',n)
+ }
+ i++
+ }
+ }
+
+ stream.end = function () {
+ stream.emit('data', soFar)
+ stream.emit('end')
+ }
+
+ return stream
+}

0 comments on commit 42b5c5c

Please sign in to comment.