Skip to content

Commit

Permalink
Ensure http response streams are paused (#438)
Browse files Browse the repository at this point in the history
Responses returned by http#request are yielded in paused mode. By
attaching a 'data' listener to the request the http instrumentation was
switching the stream to flowing mode which could cause the request to
drain before the calling framework had a chance to attach their own
listeners.

This also removes a test for tracking span duration for http requests
with responses that were never checked. We are actually not able to
properly close spans in this case without changing the behavior of the
requests so this test should not pass.

Fixes #437 

PR-URL: #438
  • Loading branch information
matthewloring committed Mar 15, 2017
1 parent 72ab677 commit ca4b67b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 15 deletions.
20 changes: 18 additions & 2 deletions src/plugins/plugin-http.js
Expand Up @@ -76,8 +76,24 @@ function patchRequest (http, api) {
var req = request.call(this, options, function(res) {
api.wrapEmitter(res);
var numBytes = 0;
res.on('data', function (chunk) {
numBytes += chunk.length;
var listenerAttached = false;
// Responses returned by http#request are yielded in paused mode. Attaching
// a 'data' listener to the request will switch the stream to flowing mode
// which could cause the request to drain before the calling framework has
// a chance to attach their own listeners. To avoid this, we attach our listener
// lazily.
// This approach to tracking data size will not observe data read by
// explicitly calling `read` on the request. We expect this to be very
// uncommon as it is not mentioned in any of the official documentation.
shimmer.wrap(res, 'on', function onWrap(on) {
return function on_trace(eventName, cb) {
if (eventName === 'data' && !listenerAttached) {
on.call(this, 'data', function(chunk) {
numBytes += chunk.length;
});
}
on.apply(this, arguments);
};
});
res.on('end', function () {
requestLifecycleSpan
Expand Down
39 changes: 26 additions & 13 deletions test/plugins/test-trace-http.js
Expand Up @@ -17,6 +17,7 @@

var common = require('./common.js');
var constants = require('../../src/constants.js');
var stream = require('stream');
var TraceLabels = require('../../src/trace-labels.js');

var agent = require('../..').start({ samplingRate: 0 });
Expand Down Expand Up @@ -60,19 +61,6 @@ describe('test-trace-http', function() {
);
});

it('should accurately measure get time, no callback', function(done) {
server.listen(common.serverPort, common.runInTransaction.bind(null, agent,
function(endTransaction) {
http.get({port: common.serverPort, headers: {}});
setTimeout(function() {
endTransaction();
common.assertDurationCorrect(agent, common.serverWait);
done();
}, common.serverWait * 1.5);
})
);
});

it('should not trace api requests', function(done) {
server.listen(common.serverPort, common.runInTransaction.bind(null, agent,
function(endTransaction) {
Expand Down Expand Up @@ -102,6 +90,31 @@ describe('test-trace-http', function() {
);
});

it('should leave request streams in paused mode', function(done) {
server.listen(common.serverPort, common.runInTransaction.bind(null, agent,
function(endTransaction) {
var start = Date.now();
http.get({port: common.serverPort}, function(res) {
var result = '';
var writable = new stream.Writable();
writable._write = function(chunk, encoding, next) {
result += chunk;
next();
};
writable.on('finish', function() {
endTransaction();
assert.equal(common.serverRes, result);
common.assertDurationCorrect(agent, Date.now() - start);
done();
});
setImmediate(function() {
res.pipe(writable);
});
});
})
);
});

it('should accurately measure get time, string url', function(done) {
server.listen(common.serverPort, common.runInTransaction.bind(null, agent,
function(endTransaction) {
Expand Down

0 comments on commit ca4b67b

Please sign in to comment.