Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initial

  • Loading branch information...
commit 993a5c4eeedbbdffbbd742bcddab6e7a03d82699 0 parents
@dominictarr authored
15 LICENSE.APACHE2
@@ -0,0 +1,15 @@
+Apache License, Version 2.0
+
+Copyright (c) 2012 Dominic Tarr
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
24 LICENSE.MIT
@@ -0,0 +1,24 @@
+The MIT License
+
+Copyright (c) 2012 Dominic Tarr
+
+Permission is hereby granted, free of charge,
+to any person obtaining a copy of this software and
+associated documentation files (the "Software"), to
+deal in the Software without restriction, including
+without limitation the rights to use, copy, modify,
+merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom
+the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
+ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
63 README.markdown
@@ -0,0 +1,63 @@
+# duplex
+
+Simple base class for [duplex](https://github.com/dominictarr/stream-spec#duplex) streams, that automaticall handles pausing and buffering.
+
+``` js
+
+var duplex = require('duplex')
+
+var d = duplex()
+ .on('write', function (data) {
+ d.sendData(data)
+ })
+ .on('ended', function () {
+ d.sendEnd()
+ })
+```
+
+## API
+
+### on('write', function (data))
+
+Emitted when `write(data)` is called.
+
+### on('ended', function ())
+
+Emitted when `end()` is called
+
+### sendData(data)
+
+Add `data` to the output buffer.
+`'data'` will be emitted if there the stream is not paused.
+
+### sendEnd()
+
+Emit `'end'` after the output buffer drains.
+Will be emitted immediately, it the buffer is empty.
+
+### pause()
+
+Set the readable side of the stream into paused state.
+This will prevent it from emitting 'data' or or 'end'
+until resume is called.
+
+### resume()
+Set the reabable side of the stream into the unpaused state.
+This will allow it to emit `'data'` and `'end'` events.
+If there there is any data in the output buffer,
+It will start draining immediately.
+
+## Automatic Behaviours
+
+`destroy()` is called automitically after both sides of the stream has ended.
+`write()==false` after the stream emits `'pause'`,
+and `write()==true` after the stream emits `'drain'`.
+The user is responsible for emitting `'pause'` and `'drain'`.
+
+The stream will call `resume()` in the next tick, unless `pause()` is called manually.
+if `resume()` is manually called before the next tick, the stream will start emitting data
+immediately.
+
+## License
+
+MIT / APACHE 2
148 index.js
@@ -0,0 +1,148 @@
+
+var Stream = require('stream')
+
+/*
+ lemmy think...
+
+ pause() and resume() must prevent data from being emitted.
+
+ write controls 'drain', and the return value.
+
+ idea:
+
+ write -> emit('write')
+
+ on('pause') --> writePause = true
+ if(writePause) --> wirte() == false
+ on('drain') --> writePause = false
+
+ pause() -> paused = true
+ resume() -> paused = false, drain()
+ sendData(data) -> push onto buffer or emit.
+ sendEnd() -> queue end after buffer clears.
+*/
+
+module.exports = function (write, end) {
+ var stream = new Stream()
+ var buffer = [], ended = false, destroyed = false, emitEnd
+ stream.writable = stream.readable = true
+ stream.paused = false
+ stream.buffer = buffer
+
+ stream.writePause = false
+ stream
+ .on('pause', function () {
+ stream.writePause = true
+ })
+ .on('drain', function () {
+ stream.writePause = false
+ })
+
+ function destroySoon () {
@hij1nx
hij1nx added a note

this is never used/exposed?

@dominictarr Owner

oh, thanks for pointing this out, initially, destroy() called that but then I refactored, but forgot to delete that.

destroy is a must in the stream api, (because dest.destroy() is called on source.emit('close'))
destroySoon is only optional. destroySoon() means the same thing as end() so I just implement end and leave it at that.

@hij1nx
hij1nx added a note

btw. i know this is the wrong venue for this. but are you going to be in krakow to work on the thing we've been discussing? :D

@dominictarr Owner

I want to talk to you about that. the right venue is irc! I'm there now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ process.nextTick(stream.destroy.bind(stream))
+ }
+
+ if(write)
+ stream.on('write', write)
+ if(end)
+ stream.on('ended', end)
+
+ //destroy the stream once both ends are over
+
+ stream.once('end', function () {
+ stream.readable = false
+ if(!stream.writable)
+ stream.destroy()
+ })
+
+ stream.once('ended', function () {
+ stream.writable = false
+ if(!stream.readable)
+ stream.destroy()
+ })
+
+ // this is the default write method,
+ // if you overide it, you are resposible
+ // for pause state.
+
+ stream.sendData = function (data) {
+ if(!stream.paused && !buffer.length)
+ stream.emit('data', data)
+ else
+ buffer.push(data)
+ return !(this.paused || buffer.length)
+ }
+
+ stream.sendEnd = function (data) {
+ if(data) stream.write(data)
+ if(emitEnd) return
+ emitEnd = true
+ //destroy is handled above.
+ stream.drain()
+ }
+
+ stream.write = function (data) {
+ stream.emit('write', data)
+ return !stream.writePaused
+ }
+ stream.end = function () {
+ stream.writable = false
+ if(stream.ended) return
+ stream.ended = true
+ stream.emit('ended')
+ }
+ stream.drain = function () {
+ if(!buffer.length && !emitEnd) return
+ //if the stream is paused after just before emitEnd()
+ //end should be buffered.
+ while(!stream.paused) {
+ if(buffer.length) {
+ stream.emit('data', buffer.shift())
+ if(buffer.length == 0)
+ stream.emit('read-drain')
+ }
+ else if(emitEnd && stream.readable) {
+ stream.readable = false
+ stream.emit('end')
+ return
+ } return true
+ }
+ //if the buffer has emptied. emit drain.
+ }
+ var started = false
+ stream.resume = function () {
+ //this is where I need pauseRead, and pauseWrite.
+ //here the reading side is unpaused,
+ //but the writing side may still be paused.
+ //the whole buffer might not empity at once.
+ //it might pause again.
+ //the stream should never emit data inbetween pause()...resume()
+ //and write should return !buffer.length
+ started = true
+ stream.paused = false
+ stream.drain() //will emit drain if buffer empties.
+ return stream
+ }
+
+ stream.destroy = function () {
+ if(destroyed) return
+ destroyed = ended = true
+ buffer.length = 0
+ this.emit('close')
+ }
+ var pauseCalled = false
+ stream.pause = function () {
+ started = true
+ stream.paused = true
+ return this
+ }
+ stream.paused = true
+ process.nextTick(function () {
+ //unless the user manually
+ if(started) return
+ stream.resume()
+ })
+
+ return stream
+}
+
11 package.json
@@ -0,0 +1,11 @@
+{
+ "author": "Dominic Tarr <dominic.tarr@gmail.com> (http://dominictarr.com)",
+ "name": "duplex",
+ "description": "base class for a duplex stream",
+ "version": "0.0.0",
+ "homepage": "https://github.com/dominictarr/duplex",
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/dominictarr/duplex.git"
+ }
+}
108 test/index.js
@@ -0,0 +1,108 @@
+var assert = require('assert')
+var duplex = require('..')
+
+function test(message, func) {
+ console.log('#', message)
+ func(duplex())
+}
+
+
+test("write(data) -> emit('write', data)", function (d) {
+ var a = null
+ d.on('write', function (d) {
+ a = d
+ })
+ var e = Math.random()
+ d.write(e)
+ assert.equal(a, e)
+})
+
+test("emit('pause') -> write(data) === false", function (d) {
+ var a = null
+ assert(d.write(), true)
+ d.emit('pause')
+ assert(d.write(), false)
+ d.emit('drain')
+ assert(d.write(), true)
+})
+
+test("sendData(data) -> emit('data', data)", function (d) {
+
+ var a = null, e
+ d.on('data', function (d) {
+ a = d
+ })
+ d.pause()
+ assert.equal(a, null)
+ d.sendData(e = Math.random())
+ assert.equal(a, null)
+ d.resume()
+ assert.equal(a, e)
+})
+
+test("sendEnd(data) -> emit('end', data)", function (d) {
+
+ var a = false
+
+ d.on('end', function (d) {
+ a = true
+ })
+
+ d.pause()
+ d.sendEnd()
+ assert.equal(a, false, 'pause should buffer end')
+ d.resume()
+ assert.equal(a, true, 'resume should drain end')
+ assert.equal(d.readable, false, 'emitting end should set readable = false')
+})
+
+test('end() -> emit(\'ended\')', function (d) {
+
+ var a = false
+ d.on('ended', function (d) {
+ a = true
+ })
+
+ assert.equal(d.writable, true,'writable should start true')
+ d.end()
+ assert.equal(d.writable, false,'emitEnd should set writable=false')
+ d.resume()
+ assert.equal(a, true)
+
+})
+
+test('end() & emit(\'end\') -> emit(\'close\')', function(d){
+ var a = false
+
+ d.on('close', function () {
+
+ a = true
+ })
+
+ assert.equal(a, false)
+ d.sendEnd()
+ d.end()
+ d.resume()
+ assert(a, true, 'destroy is called after both ends finish')
+
+})
+
+test('start in nextTick', function () {
+ var a = null, e = null
+ process.nextTick(function () {
+ assert.equal(a, null)
+ })
+ //use this d, so I have a chance to set an earlier nextTick.
+ d = duplex()
+
+ d.on('data', function (data) {
+ a = data
+ })
+
+ d.sendData(e = Math.random())
+
+ process.nextTick(function () {
+ assert.equal(a, e)
+ })
+
+})
@hij1nx

this is never used/exposed?

@dominictarr

oh, thanks for pointing this out, initially, destroy() called that but then I refactored, but forgot to delete that.

destroy is a must in the stream api, (because dest.destroy() is called on source.emit('close'))
destroySoon is only optional. destroySoon() means the same thing as end() so I just implement end and leave it at that.

@hij1nx

btw. i know this is the wrong venue for this. but are you going to be in krakow to work on the thing we've been discussing? :D

@dominictarr

I want to talk to you about that. the right venue is irc! I'm there now.

Please sign in to comment.
Something went wrong with that request. Please try again.