This repository has been archived by the owner on Nov 28, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 149
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
inital. first implementation of a map function (takes async callback …
…function and turns it into readible & writable stream
- Loading branch information
0 parents
commit 970993c
Showing
4 changed files
with
103 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| node_modules | ||
| node_modules/* | ||
| npm_debug.log |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
|
|
||
| //create an event stream and apply function to each .write | ||
| //emitting each response as data | ||
| //unless it's an empty callback | ||
| // | ||
|
|
||
| var Stream = require('stream').Stream | ||
|
|
||
| exports.map = function (mapper) { | ||
| var stream = new Stream() | ||
| , inputs = 0 | ||
| , outputs = 0 | ||
| , ended = false | ||
|
|
||
| stream.writable = true | ||
| stream.readible = true | ||
|
|
||
| stream.write = function () { | ||
| inputs ++ | ||
| var args = [].slice.call(arguments) | ||
| , r | ||
| , inNext = false | ||
| function next (err) { | ||
| inNext = true | ||
| outputs ++ | ||
| var args = [].slice.call(arguments) | ||
| if(err) | ||
| return inNext = false, stream.emit('error') | ||
|
|
||
| args.shift() //drop err | ||
|
|
||
| if (args.length){ | ||
| args.unshift('data') | ||
| r = stream.emit.apply(stream, args) | ||
| } | ||
| if(inputs == outputs) { | ||
| stream.emit('drain') //written all the incoming events | ||
| if(ended) | ||
| stream.end() | ||
| } | ||
| inNext = false | ||
| } | ||
| args.push(next) | ||
|
|
||
| try { | ||
| //catch sync errors and handle them like async errors | ||
| return mapper.apply(null,args) | ||
| } catch (err) { | ||
| //if the callback has been called syncronously, and the error | ||
| //has occured in an listener, throw it again. | ||
| if(inNext) | ||
| throw err | ||
| next(err) | ||
| } | ||
| } | ||
|
|
||
| stream.end = function () { | ||
| var args = [].slice.call(arguments) | ||
| //if end was called with args, write it, | ||
| ended = true //write will emit 'end' if ended is true | ||
| if(args.length) | ||
| return stream.write.apply(emitter, args) | ||
| else | ||
| stream.emit('end') | ||
| } | ||
|
|
||
| return stream | ||
| } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| { "name": "event-stream" | ||
| , "version": "0.0.0" | ||
| , "description": "node streams applied to arbitary events" | ||
| , "homepage": "http://github.com/dominictarr/event-stream" | ||
| , "repository": | ||
| { "type": "git" | ||
| , "url": "https://github.com/dominictarr/event-stream.git" } | ||
| , "dependencies": {} | ||
| , "devDependencies": {} | ||
| , "author": "Dominic Tarr <dominic.tarr@gmail.com> (http://bit.ly/dominictarr)" | ||
| , "scripts": { "test": "meta-test test/*.js" } } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| # EventStreams | ||
|
|
||
| EventEmitters in node are a brilliant idea that are under utilized in the node community. | ||
|
|
||
| they are at the heart of all the IO in node core, reading from files, or TCP, or the body | ||
| of an http request or response is handled as a stream of events. | ||
|
|
||
| A stream of events is a bit like an array, but an array layed out in time, rather than in memory. | ||
|
|
||
| You can apply a map function to an array and create a new array, you could apply a similar | ||
| map function to a stream of events to create a new stream. `map` functions, but also `fitler`, `reduce` | ||
| and other functional programming idioms! | ||
|
|
||
| event streams are great because they have a naturally scalable API. | ||
| if the events in a stream can be stringified ane parsed then it will be relatively simple to split heavy | ||
| parts of a stream into seperate processes, and to incorperate middlewares into the stream that might | ||
| buffer or rate-limit or parallelize critical aspects of your event stream. | ||
|
|
||
| Supporting this sort of programming is the purpose of this library. | ||
|
|
||
| an EventStream must: |