Skip to content

Commit

Permalink
remove backpressure support (ref #10)
Browse files Browse the repository at this point in the history
  • Loading branch information
gustav-olsen-groupone committed Aug 5, 2016
1 parent 4bb1eb8 commit 000717c
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 126 deletions.
20 changes: 4 additions & 16 deletions lib/hijackResponse.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
var Readable = require('stream').Readable

module.exports = function hijackResponse (res, cb, options) {
options = options || {}
var disableBackpressure = options.disableBackpressure
var readableOptions = options.readableOptions || {}
module.exports = function hijackResponse (res, cb) {
var writeHead = res.writeHead
var write = res.write
var end = res.end
var originalResponse = res
var hijacking = true
var hijackedResponse = new Readable(readableOptions)
var hijackedResponse = new Readable()
hijackedResponse.__proto__ = originalResponse // eslint-disable-line no-proto
hijackedResponse.emit = hijackedResponse.emit

Expand All @@ -18,13 +15,7 @@ module.exports = function hijackResponse (res, cb, options) {
hijackedResponse[method] = Readable.prototype[method].bind(hijackedResponse)
})

hijackedResponse._read = function () {
// emitting drain on read seems to be the most reliable thing we can do when
// bridging a writeable into a readable.
if (!disableBackpressure) {
res.emit('drain')
}
}
hijackedResponse._read = function () {}

res.write = function (rawChunk, encoding) {
if (!res.headersSent && res.writeHead !== writeHead) res._implicitHeader()
Expand All @@ -37,10 +28,7 @@ module.exports = function hijackResponse (res, cb, options) {
chunk = new Buffer(rawChunk, encoding)
}
}
var returnValue = hijackedResponse.push(chunk)
if (!disableBackpressure) {
return returnValue
}
hijackedResponse.push(chunk)
} else {
write.call(originalResponse, rawChunk, encoding)
}
Expand Down
2 changes: 1 addition & 1 deletion test/express.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ describe('Express Integration Tests', function () {
.use(function (req, res, next) {
hijackResponse(res, passError(next, function (res) {
res.pipe(res)
}), { disableBackpressure: true })
}))
next()
})
.use(express.static(__dirname)),
Expand Down
109 changes: 0 additions & 109 deletions test/hijackResponse.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,115 +106,6 @@ describe('hijackResponse', function () {
}, 'to yield response', 'foofoofoofoofoobarbazqux')
})

it('should support backpressure working with a good stream', function () {
// This test aims to force the bastardized readable stream that is the
// hijackedResponse to buffer up to it's highWaterMark, and not any further
// and do multiple drains etc during the test. That is achieved by setting
// the same highWaterMark on both the fs.ReadStream, the delayed Transform
// stream and the hijackedResponse it self.

var bufferMax = 0
var drains = 0
var highWaterMark = 100
var streamOptions = { highWaterMark: highWaterMark }

var filePath = require('path').resolve(__dirname, '..', 'package.json')
var readStream = require('fs').createReadStream(filePath, streamOptions)

var Transform = require('stream').Transform
var delayedIdentityStream = new Transform(streamOptions)

return expect(function (res, handleError) {
hijackResponse(res, passError(handleError, function (res) {
delayedIdentityStream._transform = function (chunk, encoding, cb) {
setTimeout(function () {
bufferMax = Math.max(bufferMax, res._readableState.length)
cb(null, chunk)
}, 1)
}

res.pipe(delayedIdentityStream).pipe(res)
}), { readableOptions: streamOptions })

res.on('drain', function () { drains += 1 })
readStream.pipe(res)
}, 'to yield response', 200).then(function (res) {
return expect(bufferMax, 'to equal', highWaterMark)
}).then(function () {
return expect(drains, 'to be greater than', 0)
})
})

it('should support backpressure working with an old stream', function () {
// This test aims to force the bastardized readable stream that is the
// hijackedResponse to buffer up everything that has been written to it
// and play nicely with new downstream streams

var highWaterMark = 5
var streamOptions = { highWaterMark: highWaterMark }

var stream = require('stream')

var onTransformBufferLengths = []

var delayedIdentityStream = new stream.Transform(streamOptions)

return expect(function (res, handleError) {
hijackResponse(res, passError(handleError, function (res) {
delayedIdentityStream._transform = function (chunk, encoding, cb) {
onTransformBufferLengths.push({
hijacked: res._readableState.length,
delayed: this._readableState.length
})
setTimeout(function () {
cb(null, chunk)
}, 3)
}
res.pipe(delayedIdentityStream).pipe(res)
}), { readableOptions: streamOptions })

res.setHeader('Content-Type', 'text/plain')

var mockedReadStream = new stream.Stream()
mockedReadStream.readable = true

mockedReadStream.pipe(res)

mockedReadStream.emit('data', 'foo 12345\n')
mockedReadStream.emit('data', 'bar 12345\n')
mockedReadStream.emit('data', 'baz 12345\n')
mockedReadStream.emit('data', 'qux 12345\n')
mockedReadStream.emit('end')
}, 'to yield response', {
body: [
'foo 12345',
'bar 12345',
'baz 12345',
'qux 12345',
''
].join('\n')
}).then(function () {
expect(onTransformBufferLengths.length, 'to be greater than', 1)
onTransformBufferLengths.forEach(function (obj, i, arr) {
if (i + 1 === arr.length) {
return expect(obj, 'to satisfy', {
hijacked: 0,
delayed: 0
})
} else if (i === 0) {
return expect(obj, 'to satisfy', {
hijacked: expect.it('to be greater than', 0),
delayed: 0
})
} else {
return expect(obj, 'to satisfy', {
hijacked: expect.it('to be less than', arr[i - 1].hijacked),
delayed: 0
})
}
})
})
})
it('should write the last chunk', function () {
return expect(function (res, handleError) {
hijackResponse(res, passError(handleError, function (res) {
Expand Down

0 comments on commit 000717c

Please sign in to comment.