fix: premature close with chunked transfer encoding and for async iterators in Node 12

This PR backports the fix from node-fetch#1064 to the `2.x.x` branch following
the [comment here](node-fetch#1064 (comment)).

I had to add some extra Babel config to allow using the `for await...of` syntax in the tests
(see the `.babelrc` diff below). The config is only needed for the tests, as this syntax is
not used in the implementation.
achingbrain committed May 27, 2021
1 parent b5e2e41 commit e185041
Showing 6 changed files with 190 additions and 3 deletions.
6 changes: 4 additions & 2 deletions .babelrc
@@ -14,7 +14,8 @@
      } ]
    ],
    plugins: [
      './build/babel-plugin'
      './build/babel-plugin',
      'transform-async-generator-functions'
    ]
  },
  coverage: {
@@ -31,7 +32,8 @@
    ],
    plugins: [
      [ 'istanbul', { exclude: [ 'src/blob.js', 'build', 'test' ] } ],
      './build/babel-plugin'
      './build/babel-plugin',
      'transform-async-generator-functions'
    ]
  },
  rollup: {
43 changes: 43 additions & 0 deletions README.md
@@ -188,6 +188,49 @@ fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
});
```

In Node.js 14 you can also use async iterators to read `body`; however, be careful to catch
errors -- the longer a response runs, the more likely it is to encounter an error.

```js
const fetch = require('node-fetch');
const response = await fetch('https://httpbin.org/stream/3');
try {
  for await (const chunk of response.body) {
    console.dir(JSON.parse(chunk.toString()));
  }
} catch (err) {
  console.error(err.stack);
}
```

In Node.js 12 you can also use async iterators to read `body`; however, async iterators with streams
did not mature until Node.js 14, so you need to do some extra work to ensure you handle errors
directly from the stream and wait for the response to fully close.

```js
const fetch = require('node-fetch');
const read = async body => {
  let error;
  body.on('error', err => {
    error = err;
  });
  for await (const chunk of body) {
    console.dir(JSON.parse(chunk.toString()));
  }
  return new Promise((resolve, reject) => {
    body.on('close', () => {
      error ? reject(error) : resolve();
    });
  });
};
try {
  const response = await fetch('https://httpbin.org/stream/3');
  await read(response.body);
} catch (err) {
  console.error(err.stack);
}
```

#### Buffer
If you prefer to cache binary data in full, use `buffer()`. (NOTE: `buffer()` is a `node-fetch`-only API.)

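A minimal usage sketch (illustrative only, reusing the Octocat image URL from the earlier example):

```js
const fetch = require('node-fetch');

fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
  .then(res => res.buffer())               // resolve the whole body into a Buffer
  .then(buffer => console.log(`received ${buffer.length} bytes`))
  .catch(err => console.error(err));
```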
2 changes: 2 additions & 0 deletions package.json
@@ -42,6 +42,8 @@
"abortcontroller-polyfill": "^1.3.0",
"babel-core": "^6.26.3",
"babel-plugin-istanbul": "^4.1.6",
"babel-plugin-transform-async-generator-functions": "^6.24.1",
"babel-polyfill": "^6.26.0",
"babel-preset-env": "^1.6.1",
"babel-register": "^6.16.3",
"chai": "^3.5.0",
52 changes: 52 additions & 0 deletions src/index.js
@@ -98,6 +98,30 @@ export default function fetch(url, opts) {
      finalize();
    });

    fixResponseChunkedTransferBadEnding(req, err => {
      response.body.destroy(err);
    });

    /* c8 ignore next 18 */
    if (process.version < 'v14') {
      // Before Node.js 14, pipeline() does not fully support async iterators and does not always
      // properly handle when the socket close/end events are out of order.
      req.on('socket', s => {
        let endedWithEventsCount = 0;
        s.prependListener('end', () => {
          endedWithEventsCount = s._eventsCount;
        });
        s.prependListener('close', hadError => {
          // if end happened before close but the socket didn't emit an error, do it now
          if (response && endedWithEventsCount < s._eventsCount && !hadError) {
            const err = new Error('Premature close');
            err.code = 'ERR_STREAM_PREMATURE_CLOSE';
            response.body.emit('error', err);
          }
        });
      });
    }

    req.on('response', res => {
      clearTimeout(reqTimeout);
@@ -265,6 +289,34 @@ export default function fetch(url, opts) {

};

function fixResponseChunkedTransferBadEnding(request, errorCallback) {
  const LAST_CHUNK = Buffer.from('0\r\n');
  let socket;

  request.on('socket', s => {
    socket = s;
  });

  request.on('response', response => {
    const {headers} = response;
    if (headers['transfer-encoding'] === 'chunked' && !headers['content-length']) {
      let properLastChunkReceived = false;

      socket.on('data', buf => {
        properLastChunkReceived = Buffer.compare(buf.slice(-3), LAST_CHUNK) === 0;
      });

      socket.prependListener('close', () => {
        if (!properLastChunkReceived) {
          const err = new Error('Premature close');
          err.code = 'ERR_STREAM_PREMATURE_CLOSE';
          errorCallback(err);
        }
      });
    }
  });
}

/**
* Redirect code matching
*
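For context on `fixResponseChunkedTransferBadEnding` above: a well-formed chunked HTTP body ends with a zero-length "last chunk" (`0\r\n`, followed by an empty trailer and a final `\r\n`), so a response whose socket closes without that marker was cut off early. A minimal sketch of the idea (illustration only, not part of the diff; the commit compares the tail of each `data` buffer against `LAST_CHUNK`, whereas this sketch simply scans a whole buffer):

```js
// A complete chunked body ends with the zero-length "last chunk";
// a transfer that is cut off mid-stream never sends it.
const complete = Buffer.from('5\r\nhello\r\n0\r\n\r\n');
const truncated = Buffer.from('5\r\nhello\r\n5\r\nwor');

const hasTerminator = buf => buf.includes('0\r\n\r\n');

console.log(hasTerminator(complete));  // true: the body finished cleanly
console.log(hasTerminator(truncated)); // false: the connection dropped mid-chunk
```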
17 changes: 17 additions & 0 deletions test/server.js
@@ -317,6 +317,23 @@ export default class TestServer {
      res.destroy();
    }

    if (p === '/error/premature/chunked') {
      res.writeHead(200, {
        'Content-Type': 'application/json',
        'Transfer-Encoding': 'chunked'
      });

      res.write(`${JSON.stringify({data: 'hi'})}\n`);

      setTimeout(() => {
        res.write(`${JSON.stringify({data: 'bye'})}\n`);
      }, 200);

      setTimeout(() => {
        res.destroy();
      }, 400);
    }

    if (p === '/error/json') {
      res.statusCode = 200;
      res.setHeader('Content-Type', 'application/json');
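To exercise this route by hand, a minimal client sketch (the host and port are assumptions, not taken from the diff; point it at wherever the test server is listening). It prints the chunks that arrive and reports when the connection drops before the final chunk:

```js
const http = require('http');

// Assumes the TestServer above is running locally; the port is a placeholder.
const req = http.get('http://localhost:30001/error/premature/chunked', res => {
  res.on('data', chunk => console.log('chunk:', chunk.toString().trim()));
  res.on('aborted', () => console.error('connection closed before the final chunk'));
  res.on('error', err => console.error('response error:', err.message));
});

req.on('error', err => console.error('request error:', err.message));
```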
73 changes: 72 additions & 1 deletion test/test.js
@@ -1,4 +1,7 @@

import 'babel-core/register'
import 'babel-polyfill'

// test tools
import chai from 'chai';
import chaiPromised from 'chai-as-promised';
@@ -552,6 +555,74 @@ describe('node-fetch', () => {
      .and.have.property('code', 'ECONNRESET');
  });

  it('should handle network-error in chunked response', () => {
    const url = `${base}error/premature/chunked`;
    return fetch(url).then(res => {
      expect(res.status).to.equal(200);
      expect(res.ok).to.be.true;

      return expect(new Promise((resolve, reject) => {
        res.body.on('error', reject);
        res.body.on('close', resolve);
      })).to.eventually.be.rejectedWith(Error, 'Premature close')
        .and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
    });
  });

  it('should handle network-error in chunked response async iterator', () => {
    const url = `${base}error/premature/chunked`;
    return fetch(url).then(res => {
      expect(res.status).to.equal(200);
      expect(res.ok).to.be.true;

      const read = async body => {
        const chunks = [];

        if (process.version < 'v14') {
          // In Node.js 12, some errors don't come out in the async iterator; we have to pick
          // them up from the event-emitter and then throw them after the async iterator
          let error;
          body.on('error', err => {
            error = err;
          });

          for await (const chunk of body) {
            chunks.push(chunk);
          }

          if (error) {
            throw error;
          }

          return new Promise(resolve => {
            body.on('close', () => resolve(chunks));
          });
        }

        for await (const chunk of body) {
          chunks.push(chunk);
        }

        return chunks;
      };

      return expect(read(res.body))
        .to.eventually.be.rejectedWith(Error, 'Premature close')
        .and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
    });
  });

  it('should handle network-error in chunked response in consumeBody', () => {
    const url = `${base}error/premature/chunked`;
    return fetch(url).then(res => {
      expect(res.status).to.equal(200);
      expect(res.ok).to.be.true;

      return expect(res.text())
        .to.eventually.be.rejectedWith(Error, 'Premature close');
    });
  });

  it('should handle DNS-error response', function() {
    const url = 'http://domain.invalid';
    return expect(fetch(url)).to.eventually.be.rejected
@@ -1015,7 +1086,7 @@ describe('node-fetch', () => {
    ))
      .to.eventually.be.fulfilled
      .then((res) => {
        res.body.on('error', (err) => {
        res.body.once('error', (err) => {
          expect(err)
            .to.be.an.instanceof(Error)
            .and.have.property('name', 'AbortError');
