Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

respect pause and resume #19

Merged
merged 1 commit into from

2 participants

@Raynos

Note I call

var md = duplex().resume()

Because a bunch of your tests assume that md is in a resumed state initially.

It's upto you whether you want to change md to be paused and resume on next tick.

@dominictarr dominictarr merged commit 4524b46 into from
@dominictarr
Owner

cool!

merged into mux-demux@3.4.0

Yeah, it's easier to test if you can make the module synchronous,
since this module doesn't actually do any IO directly.

@dominictarr
Owner

we should make the sum streams be duplex also.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Nov 22, 2012
  1. @Raynos

    respect pause and resume

    Raynos authored
This page is out of date. Refresh to see the latest.
Showing with 38 additions and 16 deletions.
  1. +15 −15 index.js
  2. +2 −1  package.json
  3. +21 −0 test/pause.js
View
30 index.js
@@ -2,6 +2,7 @@
var through = require('through')
, extend = require('xtend')
+ , duplex = require('duplex')
, serializer = require('stream-serializer')
function MuxDemux (opts, onConnection) {
@@ -17,17 +18,19 @@ function MuxDemux (opts, onConnection) {
}
var streams = {}, streamCount = 0
- var md = through(function (data) {
+ var md = duplex().resume()
+
+ md.on('_data', function (data) {
var id = data.shift()
var event = data[0]
var s = streams[id]
if(!s) {
if(event == 'close')
return
- if(event != 'new')
+ if(event != 'new')
return outer.emit('unknown', id)
md.emit('connection', createStream(id, data[1].meta, data[1].opts))
- }
+ }
else if (event === 'pause')
s.paused = true
else if (event === 'resume') {
@@ -72,9 +75,6 @@ function MuxDemux (opts, onConnection) {
//end the stream once sub-streams have ended.
//(waits for them to close, like on a tcp server)
- md.pause = function () {}
- md.resume = function () {}
-
function createStream(id, meta, opts) {
streamCount ++
var s = through(function (data) {
@@ -83,27 +83,27 @@ function MuxDemux (opts, onConnection) {
err.stream = this
return outer.emit("error", err)
}
-
- md.emit('data', [s.id, 'data', data])
+
+ md._data([s.id, 'data', data])
}, function () {
- md.emit('data', [s.id, 'end'])
+ md._data([s.id, 'end'])
if (this.readable && !opts.allowHalfOpen && !this.ended) {
this.emit("end")
}
})
s.pause = function () {
- md.emit('data', [s.id, 'pause'])
+ md._data([s.id, 'pause'])
}
s.resume = function () {
- md.emit('data', [s.id, 'resume'])
+ md._data([s.id, 'resume'])
}
s.error = function (message) {
- md.emit('data', [s.id, 'error', message])
+ md._data([s.id, 'error', message])
}
s.once('close', function () {
delete streams[id]
streamCount --
- md.emit('data', [s.id, 'close'])
+ md._data([s.id, 'close'])
if(streamCount === 0)
md.emit('zero')
})
@@ -123,7 +123,7 @@ function MuxDemux (opts, onConnection) {
outer.close = function (cb) {
md.once('zero', function () {
- md.emit('end')
+ md._end()
if(cb) cb()
})
return this
@@ -155,7 +155,7 @@ function MuxDemux (opts, onConnection) {
opts.readable = opts.writable = true
var s = createStream(createID(), meta, opts)
var _opts = {writable: opts.readable, readable: opts.writable}
- md.emit('data', [s.id, 'new', {meta: meta, opts: _opts}])
+ md._data([s.id, 'new', {meta: meta, opts: _opts}])
return s
}
outer.createWriteStream = function (meta) {
View
3  package.json
@@ -10,7 +10,8 @@
"dependencies": {
"xtend": "~1.0.3",
"stream-serializer": "~0.0.3",
- "through": "~1.1.0"
+ "through": "~1.1.0",
+ "duplex": "~1.0.0"
},
"scripts": {
"test": "tap test/*.js"
View
21 test/pause.js
@@ -0,0 +1,21 @@
+var MuxDemux = require("..")
+var assert = require("assert")
+
+var mdm1 = MuxDemux()
+var mdm2 = MuxDemux()
+var called = false
+
+mdm2.on("connection", function (stream) {
+ assert.equal(stream.meta, "foo")
+ called = true
+})
+
+mdm1.pause()
+
+mdm1.createStream("foo")
+
+mdm1.pipe(mdm2).pipe(mdm1)
+
+mdm1.resume()
+
+assert.equal(called, true)
Something went wrong with that request. Please try again.