Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

initial

  • Loading branch information...
commit 7ec75dd0c7d76f46a96b8e311c19c2c50ce03e60 1 parent aa3feed
@dominictarr authored
Showing with 211 additions and 0 deletions.
  1. +62 −0 index.js
  2. +26 −0 package.json
  3. +123 −0 test/index.js
View
62 index.js
@@ -0,0 +1,62 @@
+
+var Stream = require('stream')
+
+// from
+//
+// a stream that reads from an source.
+// source may be an array, or a function.
+// from handles pause behaviour for you.
+
+module.exports =
+function from (source) {
+ if(Array.isArray(source))
+ return from (function (i) {
+ if(source.length)
+ this.emit('data', source.shift())
+ else
+ this.emit('end')
+ return true
+ })
+
+ var s = new Stream(), i = 0, ended = false, started = false
+ s.readable = true
+ s.writable = false
+ s.paused = false
+ s.pause = function () {
+ started = true
+ s.paused = true
+ }
+ function next () {
+ var n = 0, r = false
+ if(ended) return
+ while(!ended && !s.paused && source.call(s, i, function () {
+ if(!n++ && !s.ended && !s.paused)
+ next()
+ }))
+ ;
+ }
+ s.resume = function () {
+ started = true
+ s.paused = false
+ next()
+ }
+ s.on('end', function () {
+ ended = true
+ s.readable = false
+ process.nextTick(s.destroy)
+ })
+ s.destroy = function () {
+ ended = true
+ s.emit('close')
+ }
+ /*
+ by default, the stream will start emitting at nextTick
+ if you want, you can pause it, after pipeing.
+ you can also resume before next tick, and that will also
+ work.
+ */
+ process.nextTick(function () {
+ if(!started) s.resume()
+ })
+ return s
+}
View
26 package.json
@@ -0,0 +1,26 @@
+{
+ "name": "from",
+ "version": "0.0.0",
+ "description": "Easy way to make a Readable Stream",
+ "main": "index.js",
+ "scripts": {
+ "test": "asynct test/*.js"
+ },
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/dominictarr/from.git"
+ },
+ "keywords": [
+ "stream",
+ "streams",
+ "readable",
+ "easy"
+ ],
+ "devDependencies": {
+ "asynct": "1",
+ "stream-spec": "0",
+ "assertions": "~2.3.0"
+ },
+ "author": "Dominic Tarr <dominic.tarr@gmail.com> (dominictarr.com)",
+ "license": "MIT"
+}
View
123 test/index.js
@@ -0,0 +1,123 @@
+var from = require('..')
+var spec = require('stream-spec')
+var a = require('assertions')
+
+function read(stream, callback) {
+ var actual = []
+ stream.on('data', function (data) {
+ actual.push(data)
+ })
+ stream.once('end', function () {
+ callback(null, actual)
+ })
+ stream.once('error', function (err) {
+ callback(err)
+ })
+}
+
+function pause(stream) {
+ stream.on('data', function () {
+ if(Math.random() > 0.1) return
+ stream.pause()
+ process.nextTick(function () {
+ stream.resume()
+ })
+ })
+}
+
+exports['simple'] = function (test) {
+
+ var l = 1000
+ , expected = []
+
+ while(l--) expected.push(l * Math.random())
+
+ var t = from(expected.slice())
+
+ spec(t)
+ .readable()
+ .strictPausable()
+ .validateOnExit()
+
+ read(t, function (err, actual) {
+ if(err) test.error(err) //fail
+ a.deepEqual(actual, expected)
+ test.done()
+ })
+
+}
+
+exports['simple pausable'] = function (test) {
+
+ var l = 1000
+ , expected = []
+
+ while(l--) expected.push(l * Math.random())
+
+ var t = from(expected.slice())
+
+ spec(t)
+ .readable()
+ .strictPausable()
+ .validateOnExit()
+
+ pause(t)
+
+ read(t, function (err, actual) {
+ if(err) test.error(err) //fail
+ a.deepEqual(actual, expected)
+ test.done()
+ })
+
+}
+
+exports['simple (not strictly pausable) setTimeout'] = function (test) {
+
+ var l = 10
+ , expected = []
+ while(l--) expected.push(l * Math.random())
+
+
+ var _expected = expected.slice()
+ var t = from(function (i, n) {
+ var self = this
+ setTimeout(function () {
+ if(_expected.length)
+ self.emit('data', _expected.shift())
+ else
+ self.emit('end')
+ n()
+ }, 3)
+ })
+
+ /*
+ using from in this way will not be strictly pausable.
+ it could be extended to buffer outputs, but I think a better
+ way would be to use a PauseStream that implements strict pause.
+ */
+
+ spec(t)
+ .readable()
+ //.strictPausable()
+ .validateOnExit()
+
+ //pause(t)
+ var paused = false
+ var i = setInterval(function () {
+ if(!paused) t.pause()
+ else t.resume()
+ console.log('paused', paused)
+ paused = !paused
+ }, 2)
+
+ t.on('end', function () {
+ clearInterval(i)
+ })
+
+ read(t, function (err, actual) {
+ if(err) test.error(err) //fail
+ a.deepEqual(actual, expected)
+ test.done()
+ })
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.