Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* @private
*/

var { finished } = require('node:stream')
var Negotiator = require('negotiator')
var bytes = require('bytes')
var compressible = require('compressible')
var debug = require('debug')('compression')
const isFinished = require('on-finished').isFinished
var onHeaders = require('on-headers')
var vary = require('vary')
var zlib = require('zlib')
Expand Down Expand Up @@ -215,7 +217,12 @@ function compression (options) {

// compression
stream.on('data', function onStreamData (chunk) {
if (isFinished(res)) {
debug('response finished')
return
}
if (_write.call(res, chunk) === false) {
debug('pausing compression stream')
stream.pause()
}
})
Expand All @@ -227,6 +234,15 @@ function compression (options) {
_on.call(res, 'drain', function onResponseDrain () {
stream.resume()
})

// In case the stream is paused when the response finishes (e.g. because
// the client cuts the connection), its `drain` event may not get emitted.
// The following handler is here to ensure that the stream gets resumed so
// it ends up emitting its `end` event and calling the original
// `res.end()`.
finished(res, function onResponseFinished () {
stream.resume()
})
Comment on lines +243 to +245
Copy link
Member

@bjohansebas bjohansebas Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't quite understand at what point this becomes necessary, given that finished runs after ('error', 'end', 'finish', and 'close') have already been emitted. Why should we call stream.resume()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, after doing more research, I think resume() is necessary — but it's already being called in the data event, so doing it again would be redundant.

I'm going to leave this here in case anyone has comments about it

Copy link
Author

@martinslota martinslota Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness: There is no test that fails if this piece of code gets removed.

I kept it in place in case the sequence of events goes something like this:

  1. At first, response socket is all well and good but the connection is slow, causing its buffer to fill up. This leads to calling stream.pause(), which is all completely fine.
  2. Before the response buffer gets drained, the socket connection dies. As a result, the drain event on the response does not get emitted.
  3. The stream remains paused and never emits the end event, and the original res.end() never gets called.

I can attempt to write a test that captures this scenario but it might end up being somewhat artificial (i.e. relying on mocking res.write() so it returns false without the response appearing as finished). What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to add a comment explaining why it's necessary

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a tentative comment in 11f8f24.

})

next()
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"compressible": "^2.0.18",
"debug": "^2.6.9",
"negotiator": "^0.6.4",
"on-finished": "^2.4.1",
"on-headers": "^1.0.2",
"vary": "^1.1.2"
},
Expand Down
206 changes: 206 additions & 0 deletions test/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ var assert = require('assert')
var bytes = require('bytes')
var crypto = require('crypto')
var http = require('http')
var net = require('net')
var request = require('supertest')
var zlib = require('zlib')
var http2 = require('http2')
Expand Down Expand Up @@ -953,6 +954,198 @@ describe('compression()', function () {
.expect(200, done)
})
})

describe('when the client closes the connection before consuming the response', function () {
it('should call the original res.end() if connection is cut early on', function (done) {
var server = http.createServer(function (req, res) {
var originalResEnd = res.end
var originalResEndCalledTimes = 0
res.end = function () {
originalResEndCalledTimes++
return originalResEnd.apply(this, arguments)
}

compression({ threshold: 0 })(req, res, function () {
socket.end()

res.setHeader('Content-Type', 'text/plain')
res.write('hello, ')
setTimeout(function () {
res.end('world!')

setTimeout(function () {
server.close(function () {
if (originalResEndCalledTimes === 1) {
done()
} else {
done(new Error('The original res.end() was called ' + originalResEndCalledTimes + ' times'))
}
})
}, 5)
Comment on lines +976 to +984
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting for the condition to assert could be extracted into a generic helper, and it could also poll a couple of times if that is desired. I left this code in its naked form for now, and it is also duplicated in the other added test cases. I'm happy to improve this further as needed.

}, 5)
})
})

server.listen()

var port = server.address().port
var socket = openSocketWithRequest(port)
})

it('should call the original res.end() if connection is cut right after setting headers', function (done) {
var server = http.createServer(function (req, res) {
var originalResEnd = res.end
var originalResEndCalledTimes = 0
res.end = function () {
originalResEndCalledTimes++
return originalResEnd.apply(this, arguments)
}

compression({ threshold: 0 })(req, res, function () {
res.setHeader('Content-Type', 'text/plain')
socket.end()

res.write('hello, ')
setTimeout(function () {
res.end('world!')

setTimeout(function () {
server.close(function () {
if (originalResEndCalledTimes === 1) {
done()
} else {
done(new Error('The original res.end() was called ' + originalResEndCalledTimes + ' times'))
}
})
}, 5)
}, 5)
})
})

server.listen()

var port = server.address().port
var socket = openSocketWithRequest(port)
})

it('should call the original res.end() if connection is cut after an initial write', function (done) {
var server = http.createServer(function (req, res) {
var originalResEnd = res.end
var originalResEndCalledTimes = 0
res.end = function () {
originalResEndCalledTimes++
return originalResEnd.apply(this, arguments)
}

compression({ threshold: 0 })(req, res, function () {
res.setHeader('Content-Type', 'text/plain')
res.write('hello, ')
socket.end()

setTimeout(function () {
res.end('world!')

setTimeout(function () {
server.close(function () {
if (originalResEndCalledTimes === 1) {
done()
} else {
done(new Error('The original res.end() was called ' + originalResEndCalledTimes + ' times'))
}
})
}, 5)
}, 5)
})
})

server.listen()

var port = server.address().port
var socket = openSocketWithRequest(port)
})

it('should call the original res.end() if connection is cut just after response body was generated', function (done) {
var server = http.createServer(function (req, res) {
var originalResEnd = res.end
var originalResEndCalledTimes = 0
res.end = function () {
originalResEndCalledTimes++
return originalResEnd.apply(this, arguments)
}

compression({ threshold: 0 })(req, res, function () {
res.setHeader('Content-Type', 'text/plain')
res.write('hello, ')
res.end('world!')
socket.end()

setTimeout(function () {
server.close(function () {
if (originalResEndCalledTimes === 1) {
done()
} else {
done(new Error('The original res.end() was called ' + originalResEndCalledTimes + ' times'))
}
})
}, 5)
})
})

server.listen()

var port = server.address().port
var socket = openSocketWithRequest(port)
})

it('should not trigger write errors if connection is cut just after response body was generated', function (done) {
var requestCount = 0

var server = http.createServer(function (req, res) {
requestCount += 1

var originalWrite = res.write
var writeError = null
res.write = function (chunk, callback) {
return originalWrite.call(this, chunk, function (error) {
if (error) {
writeError = error
}
return callback?.(error)
})
}

var originalResEnd = res.end
res.end = function () {
setTimeout(function () {
if (writeError !== null) {
server.close(function () {
done(new Error(`Write error occurred: ${writeError}`))
})
} else {
if (requestCount < 50) {
socket = openSocketWithRequest(port)
} else {
server.close(done)
}
}
}, 0)
return originalResEnd.apply(this, arguments)
}

compression({ threshold: 0 })(req, res, function () {
res.setHeader('Content-Type', 'text/plain')
res.write('hello, ')
res.end('world!')
socket.end()
})
})

server.listen()

var port = server.address().port
var socket = openSocketWithRequest(port)
})
})
})

function createServer (opts, fn) {
Expand Down Expand Up @@ -1056,3 +1249,16 @@ function unchunk (encoding, onchunk, onend) {
stream.on('end', onend)
}
}

function openSocketWithRequest (port) {
var socket = net.connect(port, function onConnect () {
socket.write('GET / HTTP/1.1\r\n')
socket.write('Accept-Encoding: gzip\r\n')
socket.write('Host: localhost:' + port + '\r\n')
socket.write('Content-Type: text/plain\r\n')
socket.write('Content-Length: 0\r\n')
socket.write('Connection: keep-alive\r\n')
socket.write('\r\n')
})
return socket
}