Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return errors for partial payload transfers #199

Merged
merged 2 commits into from Sep 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions lib/index.js
Expand Up @@ -382,7 +382,8 @@ internals.Client.prototype._read = function (res, options, callback) {
reader.removeListener('error', onReaderError);
reader.removeListener('finish', onReaderFinish);
res.removeListener('error', onResError);
res.removeListener('close', onResClose);
res.removeListener('close', onResAborted);
res.removeListener('aborted', onResAborted);
res.on('error', Hoek.ignore);

if (err) {
Expand Down Expand Up @@ -438,13 +439,14 @@ internals.Client.prototype._read = function (res, options, callback) {
return finishOnce(err.isBoom ? err : Boom.internal('Payload stream error', err));
};

const onResClose = () => {
const onResAborted = () => {

return finishOnce(Boom.internal('Payload stream closed prematurely'));
};

res.once('error', onResError);
res.once('close', onResClose);
res.once('close', onResAborted);
res.once('aborted', onResAborted);

// Read payload

Expand Down
70 changes: 69 additions & 1 deletion test/index.js
Expand Up @@ -1038,7 +1038,7 @@ describe('read()', () => {
expect(err.output.statusCode).to.equal(400);
});

it('handles responses that close early', async () => {
it('handles "close" emit', async () => {

const res = new Events.EventEmitter();
res.pipe = function () { };
Expand All @@ -1050,6 +1050,74 @@ describe('read()', () => {
expect(err.isBoom).to.equal(true);
});

it('handles requests that close early', async () => {

let readPromise;
const handler = async (req, res) => {

readPromise = Wreck.read(req);
promise.req.abort();
};

const payload = new Stream.Readable();
let written = 0;
payload._read = function () {

if (written < 1) {
this.push(Buffer.alloc(1));
++written;
}
};

const headers = {
'content-length': '123'
};

const server = await internals.server(handler);
const promise = Wreck.request('post', 'http://localhost:' + server.address().port, { payload, headers });
await expect(promise).to.reject();
const err = await expect(readPromise).to.reject(Error, 'Payload stream closed prematurely');
expect(err.isBoom).to.equal(true);
});

it('errors on partial payload transfers', async () => {

const handler = (req, res) => {

res.setHeader('content-length', 2000);
res.writeHead(200);
res.write(internals.payload.slice(0, 1000));
res.end();
};

const server = await internals.server(handler);
const res = await Wreck.request('get', 'http://localhost:' + server.address().port);
expect(res.statusCode).to.equal(200);
expect(res.headers['transfer-encoding']).to.not.exist();
const err = await expect(Wreck.read(res)).to.reject(Error, 'Payload stream closed prematurely');
expect(err.isBoom).to.equal(true);
});

it('errors on partial payload transfers (chunked)', async () => {

const handler = (req, res) => {

res.writeHead(200);
res.write(internals.payload);
setTimeout(() => {

res.destroy(new Error('go away'));
}, 10);
};

const server = await internals.server(handler);
const res = await Wreck.request('get', 'http://localhost:' + server.address().port);
expect(res.statusCode).to.equal(200);
expect(res.headers['transfer-encoding']).to.equal('chunked');
const err = await expect(Wreck.read(res)).to.reject(Error, 'Payload stream closed prematurely');
expect(err.isBoom).to.equal(true);
});

it('times out when stream read takes too long', async () => {

const TestStream = function () {
Expand Down