diff --git a/HISTORY.md b/HISTORY.md index f013f331..b275de6c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,8 @@ +unreleased +========== + + * fix back-pressure behavior + 1.0.3 / 2014-05-29 ================== diff --git a/index.js b/index.js index ab277844..3498d3e0 100644 --- a/index.js +++ b/index.js @@ -57,7 +57,8 @@ module.exports = function compression(options) { return function compression(req, res, next){ var write = res.write - , end = res.end + var on = res.on + var end = res.end , compress = true , stream; @@ -97,6 +98,14 @@ module.exports = function compression(options) { : end.call(res); }; + res.on = function(type, listener){ + if (!stream || type !== 'drain') { + return on.call(this, type, listener) + } + + return stream.on(type, listener) + } + onHeaders(res, function(){ // default request filter if (!filter(req, res)) return; @@ -140,15 +149,17 @@ module.exports = function compression(options) { // compression stream.on('data', function(chunk){ - write.call(res, chunk); + if (write.call(res, chunk) === false) { + stream.pause() + } }); stream.on('end', function(){ end.call(res); }); - stream.on('drain', function() { - res.emit('drain'); + on.call(res, 'drain', function() { + stream.resume() }); }); diff --git a/test/test.js b/test/test.js index 2c8427cd..6095014f 100644 --- a/test/test.js +++ b/test/test.js @@ -1,3 +1,4 @@ +var crypto = require('crypto'); var http = require('http'); var request = require('supertest'); var should = require('should'); @@ -133,6 +134,55 @@ describe('compress()', function(){ }) }) + it('should back-pressure', function(done){ + var buf + var client + var resp + var server = createServer({ threshold: 0 }, function (req, res) { + resp = res + res.setHeader('Content-Type', 'text/plain') + res.write('start') + pressure() + }) + var wait = 2 + + crypto.pseudoRandomBytes(1024 * 128, function(err, chunk){ + buf = chunk + pressure() + }) + + function complete(){ + if (--wait === 0) done() + } + + function pressure(){ + if (!buf || !resp || !client) return + + while (resp.write(buf) !== false) { + resp.flush() + } + + resp.on('drain', function(){ + resp.write('end') + resp.end() + }) + resp.on('finish', complete) + client.resume() + } + + request(server) + .get('/') + .request() + .on('response', function (res) { + client = res + res.headers['content-encoding'].should.equal('gzip') + res.pause() + res.on('end', complete) + pressure() + }) + .end() + }) + describe('threshold', function(){ it('should not compress responses below the threshold size', function(done){ var server = createServer({ threshold: '1kb' }, function (req, res) { @@ -222,6 +272,7 @@ describe('compress()', function(){ res.statusCode = typeof res.flush === 'function' ? 200 : 500 + res.flush() res.end() })