Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

add pipable, and improve readme

  • Loading branch information...
commit b0333ed89207eae7dd1516bf0f36b6c840ecb8c7 1 parent 42b5c5c
@dominictarr authored
Showing with 173 additions and 53 deletions.
  1. +25 −0 examples/pretty.js
  2. +91 −7 index.js
  3. 0  out
  4. +57 −46 readme.markdown
View
25 examples/pretty.js
@@ -0,0 +1,25 @@
+
var inspect = require('util').inspect

// When run directly (not require()d), act as a command-line JSON
// pretty-printer:
//
//   curl -sS registry.npmjs.org/event-stream | node pretty.js
//
if(!module.parent) {
  var es = require('..')               //load event-stream
  es.pipe(                             //pipe joins streams together
    process.openStdin(),               //read raw bytes from stdin
    es.split(),                        //chop the byte stream into lines
    es.map(function (line, callback) { //turn this async function into a stream
      var parsed
      try {
        parsed = JSON.parse(line)      //try to parse input into json
      } catch (err) {
        callback(null, line)           //not JSON? pass the line through as-is
        return
      }
      callback(null, inspect(parsed))  //render it nicely
    }),
    process.stdout                     //pipe it to stdout !
  )
}
View
98 index.js
@@ -5,12 +5,12 @@
var Stream = require('stream').Stream
-
+ , es = exports
// writable stream, collects all events into an array
// and calls back when 'end' occurs
// mainly I'm using this to test the other functions
-exports.writeArray = function (done) {
+es.writeArray = function (done) {
if ('function' !== typeof done)
throw new Error('function writeArray (done): done must be function')
@@ -30,7 +30,7 @@ exports.writeArray = function (done) {
//return a Stream that reads the properties of an object
//respecting pause() and resume()
-exports.readArray = function (array) {
+es.readArray = function (array) {
var stream = new Stream()
, i = 0
, paused = false
@@ -61,7 +61,7 @@ exports.readArray = function (array) {
//emitting each response as data
//unless it's an empty callback
-exports.map = function (mapper) {
+es.map = function (mapper) {
var stream = new Stream()
, inputs = 0
, outputs = 0
@@ -126,7 +126,7 @@ exports.map = function (mapper) {
// combine multiple streams together so that they act as a single stream
//
-exports.pipe = function () {
+es.pipe = function () {
var streams = [].slice.call(arguments)
, first = streams[0]
@@ -176,7 +176,7 @@ exports.pipe = function () {
return thepipe
}
-exports.split = function (matcher) {
+es.split = function (matcher) {
var stream = new Stream()
, soFar = ''
@@ -195,7 +195,6 @@ exports.split = function (matcher) {
var n = soFar;
soFar = ''
this.emit('data', n)
- console.log('data',n)
}
i++
}
@@ -208,3 +207,88 @@ exports.split = function (matcher) {
return stream
}
+
+//
+// helper to make your module into a unix pipe
+// simply add
+//
+// if(!module.parent)
+// require('event-stream').pipeable(asyncFunctionOrStreams)
+//
+// asyncFunctionOrStreams may be one or more Streams or if it is a function,
+// it will be automatically wrapped in es.map
+//
+// then pipe stuff into it from the command line!
+//
+// curl registry.npmjs.org/event-stream | node hello-pipeable.js | grep whatever
+//
+// etc!
+//
+// also, start pipeable running as a server!
+//
+// > node hello-pipeable.js --port 44444
+//
+
//
// Resolve the pipeable() arguments into an array of streams.
// Each argument is a factory: call it, and if it yields a plain
// async function, wrap it with es.map so it behaves as a stream;
// otherwise assume it already returned a Stream and use it as-is.
// (Called once per connection so no state is shared between pipes.)
//
var setup = function (args) {
  return args.map(function (f) {
    var x = f() // invoke the factory to obtain a stream or map function
    if('function' === typeof x)
      return es.map(x) // bare async function -> map stream
    return x
  })
}
+
//
// Turn the supplied streams / async functions into a unix pipe
// (stdin -> streams -> stdout) or, with --port, into a server.
//
// Each argument is a factory returning a Stream or an async function
// (the latter is wrapped via es.map by setup above). Command-line
// options are parsed with optimist:
//
//   -h, --help    print usage (to stderr) and do nothing else
//   --port PORT   serve the pipeline over a socket instead of stdio
//   --host HOST   server host (default 'localhost')
//   --protocol P  module used for createServer (default 'http')
//
// Returns the combined pipe stream in the stdio case; returns
// nothing in the help and server cases.
//
es.pipeable = function () {
  var opts = require('optimist').argv
  var args = [].slice.call(arguments)

  if(opts.h || opts.help) {
    var name = process.argv[1]
    // usage goes to stderr so it can never be mistaken for pipe output
    console.error([
      'Usage:',
      '',
      'node ' + name + ' [options]',
      ' --port PORT turn this stream into a server',
      ' --host HOST host of server (localhost is default)',
      ' --protocol protocol http|net will require(protocol).createServer(...',
      ' --help display this message',
      '',
      ' if --port is not set, will stream input from stdin',
      '',
      'also, pipe from or to files:',
      '',
      ' node '+name+ ' < file #pipe from file into this stream',
      ' node '+name+ ' < infile > outfile #pipe from file into this stream',
      '',
    ].join('\n'))

  } else if (!opts.port) {
    // no port: behave as a classic unix filter over stdio
    var streams = setup(args)
    streams.unshift(es.split())          // break input on newlines first
    streams.unshift(process.openStdin())
    streams.push(process.stdout)

    return es.pipe.apply(null, streams)

  } else {
    // --port given: expose the same pipeline over a socket server
    opts.host = opts.host || 'localhost'
    opts.protocol = opts.protocol || 'http'

    var protocol = require(opts.protocol)

    // build a fresh pipeline per connection so no state is shared
    var server = protocol.createServer(function (instream, outstream) {
      var streams = setup(args)
      streams.unshift(es.split())
      streams.unshift(instream)
      streams.push(outstream || instream) // 'net' hands us one duplex socket
      es.pipe.apply(null, streams)
    })

    server.listen(opts.port, opts.host)

    console.error(process.argv[1] +' is listening for "' + opts.protocol + '" on ' + opts.host + ':' + opts.port)
  }
}
View
0  out
No changes.
View
103 readme.markdown
@@ -1,75 +1,86 @@
# EventStreams
-EventEmitters in node are a brilliant idea that unfortunatly are under utilized by the node community.
-Yes, that is right. _under utilized_. there are many more things that EventEmitters could be used for, especially,
-the `Stream`s, a subclass of EventEmitters.
+Streams are node's best and sadly misunderstood idea,
+this is a toolkit to make creating and working with streams <em>easy</em>.
-A stream of events is a bit like an array, but an array layed out in time, rather than in memory.
+`Stream` is a subclass of `EventEmitter`, it adds one very useful function:
-You can apply a map function to an array and create a new array, you could apply a similar
-map function to a stream of events to create a new stream. `map` functions, but also `fitler`, `reduce`
-and other functional programming idioms!
+``` js
+ readableStream.pipe(writableStream)
-event streams are great because they have a naturally scalable API.
-if the events in a stream can be stringified ane parsed then it will be relatively simple to split heavy
-parts of a stream into seperate processes, and to incorperate middlewares into the stream that might
-buffer or rate-limit or parallelize critical aspects of your event stream.
+ //if a stream is both readable and writable it can be piped on again
+
+ readableStream.pipe(readableWritableStream)
+ readableWritableStream.pipe(writableStream)
-Supporting this sort of programming is the purpose of this library.
+ // just like on the command line!
+ // readableStream | readableWritableStream | writableStream
+ //
+```
-[test are in event-stream_tests](https://github.com/dominictarr/event-stream_tests)
+the `event-stream` functions are just like the array functions,
+because Streams are like Arrays, but laid out in time, rather than in memory.
-[node Stream documentation](http://nodejs.org/api/streams.html)
+### for example:
-##Examples
+``` js
-###map
+//pretty.js
-Turns an asyncronous function it into an readable/writable EventStream
-it can be used to perform a transformation upon a stream before writing it to something.
+if(!module.parent) {
+ var es = require('..') //load event-stream
+ es.pipe( //pipe joins streams together
+ process.openStdin(), //open stdin
+ es.split(), //split stream to break on newlines
+ es.map(function (data, callback) {//turn this async function into a stream
+ callback(null
+ , inspect(JSON.parse(data))) //render it nicely
+ }),
+ process.stdout // pipe it to stdout !
+ )
+ }
+
+//curl -sS registry.npmjs.org/event-stream | node pretty.js
-If error, `callback(error)` like normal. If you `callback()` (no args) the stream will not emit
-anything from that map.
+```
+
+[tests are in event-stream_tests](https://github.com/dominictarr/event-stream_tests)
-`map` does not guarantee mapped output order will be the same an written input.
+[node Stream documentation](http://nodejs.org/api/streams.html)
-`map` will hold off on emitting `end` until all of it's map callbacks are complete.
+##map
-Each map MUST call the callback. it may callback with data, with an error or with no arguments,
+create a readable and writable stream from an asynchronous function.
``` js
+var es = require('event-stream')
- //callback mapped data
-
- callback(null, data) //may use multiple args, but first is always error
-
- //drop this peice of data
-
- callback()
-
- //if there was on error
-
- callback (error) //the event stream will emit 'error' instead of data for this step.
+es.map(function (data, callback) {
+ //transform data
+ // ...
+ callback(null, data)
+})
```
-if a callback is not called map will think that it is still being worked on.
-
-If the callback is called more than once, every call but the first will be ignored.
+Each map MUST call the callback. It may call back with data, with an error, or with no arguments:
-''' js
+ * `callback()` drop this data.
+ this makes the map work like `filter`,
+ note:`callback(null,null)` is not the same, and will emit `null`
-var es = require('event-stream')
+ * `callback(null, newData)` turn data into newData
+
+ * `callback(error)` emit an error for this item.
- es.map(function (data, callback) {
- //do something to data
- callback(null, data)
- })
-'''
+>Note: if a callback is not called map will think that it is still being worked on,
+>every call must be answered or the stream will not know when to end.
+>
+>also, if the callback is called more than once, every call but the first will be ignored.
-###read
+###readArray
-Makes an readable `EventStream` from an `Array`.
+makes a readable stream from an array.
Just emit each item as a data event, respecting `pause` and `resume`.
Please sign in to comment.
Something went wrong with that request. Please try again.