Skip to content

Commit

Permalink
disabled deep trampoline by default
Browse files Browse the repository at this point in the history
  • Loading branch information
bjouhier committed Mar 19, 2012
1 parent 9758545 commit fba1190
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
2 changes: 2 additions & 0 deletions API.md
Expand Up @@ -92,6 +92,8 @@ All stream wrappers derive from this wrapper.
creates a wrapper.
* `emitter = wrapper.emitter`
returns the underlying emitter. The emitter stream can be used to attach additional observers.
* `closed = wrapper.closed`
returns true if the `close` event has been received.
* `emitter = wrapper.unwrap()`
unwraps and returns the underlying emitter.
The wrapper should not be used after this call.
Expand Down
7 changes: 4 additions & 3 deletions lib/callbacks/runtime.js
Expand Up @@ -7,6 +7,7 @@
__g.context = __g.context || {};
__g.trampos = __g.trampos || [];
__g.depth = __g.depth || 0;
__g.deepTrampo = false;

function trampoline() {
__g.depth++;
Expand Down Expand Up @@ -36,7 +37,7 @@
} finally {
frame.active = false;
__g.frame = frame.prev;
if (--__g.depth === 0) trampoline();
if (--__g.depth === 0 || !__g.deepTrampo) trampoline();
}
}

Expand Down Expand Up @@ -74,7 +75,7 @@
__setEF(err, frame);
return _(err);
}
frame.active = true;
frame.active = __g.deepTrampo;
return fn(null, result);
}
} catch (ex) {
Expand All @@ -83,7 +84,7 @@
} finally {
frame.active = false;
__g.frame = oldFrame;
if (--__g.depth === 0) trampoline();
if (--__g.depth === 0 && __g.deepTrampo) trampoline();
}
}
}
Expand Down
37 changes: 22 additions & 15 deletions lib/streams/server/streams.js
Expand Up @@ -34,7 +34,7 @@ function wrapProperties(constr, writable, props) {
if (writable) desc.set = function(val) {
this.emitter[name] = val;
};
Object.defineProperty(constr.prototype, name, desc);
constr.prototype[name] === undefined && Object.defineProperty(constr.prototype, name, desc);
});
}

Expand Down Expand Up @@ -71,6 +71,7 @@ function wrapEvents(constr, events) {

function Emitter(emitter) {
var self = this;
var closed = false;
emitter.on('close', function() {
_onClose();
});
Expand All @@ -79,18 +80,18 @@ function Emitter(emitter) {
self.autoClosed = [];

function trackClose() {
emitter = null;
closed = true;
self.autoClosed.forEach(function(fn) { fn.call(self); });
}
var _onClose = trackClose;

self.close = function(_) {
return (function(callback) {
if (!emitter) return callback();
if (closed) return callback();
var close = emitter.close || emitter.destroySoon;
if (typeof close !== "function") return callback();
_onClose = function(err) {
emitter = null;
closed = true;
_onClose = trackClose;
callback(err);
}
Expand All @@ -102,21 +103,27 @@ function Emitter(emitter) {
}
/// * `emitter = wrapper.emitter`
/// returns the underlying emitter. The emitter stream can be used to attach additional observers.
Object.defineProperty(self, "emitter", {
self.emitter === undefined && Object.defineProperty(self, "emitter", {
get: function() {
return emitter;
}
});
/// * `closed = wrapper.closed`
/// returns true if the `close` event has been received.
self.closed === undefined && Object.defineProperty(self, "closed", {
get: function() {
return closed;
}
});
/// * `emitter = wrapper.unwrap()`
/// unwraps and returns the underlying emitter.
/// The wrapper should not be used after this call.
self.unwrap = function() {
var result = emitter;
emitter && emitter.events.forEach(function(event) {
emitter.events.forEach(function(event) {
emitter.removeAllListeners(event);
});
emitter = null;
return result;
closed = true;
return emitter;
}
}

Expand Down Expand Up @@ -161,7 +168,7 @@ function ReadableStream(emitter, options) {
else if (chunk) {
_chunks.push(chunk);
_current += chunk.length;
if (_current > _high && !_paused && !_done && !_error && self.emitter) {
if (_current > _high && !_paused && !_done && !_error && !self.closed) {
emitter.pause();
_paused = true;
}
Expand All @@ -174,15 +181,15 @@ function ReadableStream(emitter, options) {
if (_chunks.length > 0) {
var chunk = _chunks.splice(0, 1)[0];
_current -= chunk.length;
if (_current <= _low && _paused && !_done && !_error && self.emitter) {
if (_current <= _low && _paused && !_done && !_error && !self.closed) {
emitter.resume();
_paused = false;
}
return callback(null, chunk);
} else if (_done) {
if (_paused) { // resume it for keep-alive
try {
self.emitter && emitter.resume();
!self.closed && emitter.resume();
_paused = false;
} catch (e) { // socket may be closed
}
Expand Down Expand Up @@ -213,7 +220,7 @@ function ReadableStream(emitter, options) {
/// returns `this` for chaining.
self.setEncoding = function(enc) {
_encoding = enc;
if (enc && this.emitter) emitter.setEncoding(enc);
if (enc) emitter.setEncoding(enc);
return self;
}
/// * `data = stream.read(_, [len])`
Expand All @@ -223,7 +230,7 @@ function ReadableStream(emitter, options) {
/// Without `len`, the read calls returns the data chunks as they have been emitted by the underlying stream.
/// Once the end of stream has been reached, the `read` call returns `null`.
self.read = function(_, len) {
if (!self.emitter && !_chunks.length) return null;
if (self.closed && !_chunks.length) return null;
if (len == null) return readChunk(_);
if (len < 0) len = Infinity;
if (len == 0) return _encoding ? "" : new Buffer(0);
Expand Down Expand Up @@ -416,7 +423,7 @@ function Server(emitter) {
Emitter.call(self, emitter);

self.listen = function(_, args) {
if (!self.emitter) throw new Error("cannot listen: server is closed");
if (self.closed) throw new Error("cannot listen: server is closed");
args = Array.prototype.slice.call(arguments, 1);
return (function(callback) {
function reply(err, result) {
Expand Down

0 comments on commit fba1190

Please sign in to comment.