Skip to content

Commit

Permalink
🐛 FIX: HttpClientResponse.res should be IncomingMessage (#426)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Dec 16, 2022
1 parent 36fa03f commit 3c731cc
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 37 deletions.
20 changes: 6 additions & 14 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import pump from 'pump';
import FormStream from 'formstream';
import { HttpAgent, CheckAddressFunction } from './HttpAgent';
import { RequestURL, RequestOptions, HttpMethod } from './Request';
import { HttpClientResponseMeta, HttpClientResponse, ReadableWithMeta, BaseResponseMeta, SocketInfo } from './Response';
import { RawResponseWithMeta, HttpClientResponse, SocketInfo } from './Response';
import { parseJSON, sleep, digestAuthHeader, globalId, performanceTime, isReadable } from './utils';
import symbols from './symbols';
import { initDiagnosticsChannel } from './diagnosticsChannel';
Expand Down Expand Up @@ -243,7 +243,7 @@ export class HttpClient extends EventEmitter {
};
// keep urllib createCallbackResponse style
const resHeaders: IncomingHttpHeaders = {};
const res: HttpClientResponseMeta = {
let res = {
status: -1,
statusCode: -1,
headers: resHeaders,
Expand All @@ -254,7 +254,7 @@ export class HttpClient extends EventEmitter {
requestUrls: [],
timing,
socket: socketInfo,
};
} as any as RawResponseWithMeta;

let headersTimeout = 5000;
let bodyTimeout = 5000;
Expand Down Expand Up @@ -471,24 +471,16 @@ export class HttpClient extends EventEmitter {
}

let data: any = null;
let responseBodyStream: ReadableWithMeta | undefined;
if (args.dataType === 'stream') {
// streaming mode will disable retry
args.retry = 0;
const meta: BaseResponseMeta = {
status: res.status,
statusCode: res.statusCode,
headers: res.headers,
timing,
socket: socketInfo,
};
// only auto decompress on request args.compressed = true
if (args.compressed === true && isCompressedContent) {
// gzip or br
const decoder = contentEncoding === 'gzip' ? createGunzip() : createBrotliDecompress();
responseBodyStream = Object.assign(pipeline(response.body, decoder, noop), meta);
res = Object.assign(pipeline(response.body, decoder, noop), res);
} else {
responseBodyStream = Object.assign(response.body, meta);
res = Object.assign(response.body, res);
}
} else if (args.writeStream) {
// streaming mode will disable retry
Expand Down Expand Up @@ -535,7 +527,7 @@ export class HttpClient extends EventEmitter {
url: lastUrl,
redirected: res.requestUrls.length > 1,
requestUrls: res.requestUrls,
res: responseBodyStream ?? res,
res,
};

if (args.retry > 0 && requestContext.retries < args.retry) {
Expand Down
9 changes: 2 additions & 7 deletions src/Response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,20 @@ export type Timing = {
contentDownload: number;
};

export type BaseResponseMeta = {
export type RawResponseWithMeta = Readable & {
status: number;
statusCode: number;
headers: IncomingHttpHeaders;
timing: Timing;
// SocketInfo
socket: SocketInfo;
};

export type HttpClientResponseMeta = BaseResponseMeta & {
size: number;
aborted: boolean;
rt: number;
keepAliveSocket: boolean;
requestUrls: string[];
};

export type ReadableWithMeta = Readable & BaseResponseMeta;

export type HttpClientResponse = {
opaque: unknown;
data: any;
Expand All @@ -63,5 +58,5 @@ export type HttpClientResponse = {
url: string;
redirected: boolean;
requestUrls: string[];
res: ReadableWithMeta | HttpClientResponseMeta;
res: RawResponseWithMeta;
};
4 changes: 2 additions & 2 deletions test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ describe('index.test.ts', () => {
dataType: 'stream',
});
assert.equal(response.status, 200);
let bytes = await readableToBytes(response.res as Readable);
let bytes = await readableToBytes(response.res);
assert.match(bytes.toString(), /mock response stream/);
assert.equal(bytes.length, readFileSync(__filename).length);

Expand All @@ -237,7 +237,7 @@ describe('index.test.ts', () => {
streaming: true,
});
assert.equal(response.status, 200);
bytes = await readableToBytes(response.res as Readable);
bytes = await readableToBytes(response.res);
assert.match(bytes.toString(), /streaming: true,/);
assert.equal(bytes.length, readFileSync(__filename).length);

Expand Down
4 changes: 2 additions & 2 deletions test/options.dataType.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ describe('options.dataType.test.ts', () => {
assert.equal(response.status, 200);
assert.equal(response.headers['content-type'], 'application/json');
assert(response.res);
const bytes = await readableToBytes(response.res as Readable);
const bytes = await readableToBytes(response.res);
const jsonString = bytes.toString();
assert.equal(JSON.parse(jsonString).method, 'GET');
});
Expand All @@ -164,7 +164,7 @@ describe('options.dataType.test.ts', () => {
assert.equal(response.headers['content-type'], 'application/json');
assert(response.res);
assert(response.res);
const bytes = await readableToBytes(response.res as Readable);
const bytes = await readableToBytes(response.res);
const jsonString = bytes.toString();
assert.equal(JSON.parse(jsonString).method, 'GET');
});
Expand Down
2 changes: 1 addition & 1 deletion test/options.stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('options.stream.test.ts', () => {
assert.equal(response.status, 200);
assert.equal(response.headers['content-type'], 'application/json');
assert(response.res);
assert(isReadable(response.res as Readable));
assert(isReadable(response.res));
assert(response.res instanceof Readable);
const response2 = await urllib.request(`${_url}raw`, {
method: 'post',
Expand Down
33 changes: 22 additions & 11 deletions test/options.streaming.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, beforeAll, afterAll } from 'vitest';
import { strict as assert } from 'assert';
import { Readable, pipeline } from 'stream';
import { pipeline } from 'stream';
import { createBrotliDecompress } from 'zlib';
import urllib from '../src';
import { isReadable } from '../src/utils';
Expand All @@ -21,7 +21,7 @@ describe('options.streaming.test.ts', () => {
});

it('should get streaming response', async () => {
const response = await urllib.request(`${_url}streaming_testing`, {
let response = await urllib.request(`${_url}streaming_testing`, {
streaming: true,
});
assert.equal(response.status, 200);
Expand All @@ -34,11 +34,22 @@ describe('options.streaming.test.ts', () => {
// console.log(response.res);
assert(isReadable(response.res as any));
assert.equal(response.res.status, 200);
const bytes = await readableToBytes(response.res as Readable);
const bytes = await readableToBytes(response.res);
const data = JSON.parse(bytes.toString());
assert.equal(data.method, 'GET');
assert.equal(data.url, '/streaming_testing');
assert.equal(data.requestBody, '');

response = await urllib.request(`${_url}streaming_testing`, {
streaming: true,
});
assert.equal(response.status, 200);
let size = 0;
// response.res can be read by await for of
for await (const chunk of response.res) {
size += chunk.length;
}
assert.equal(size, bytes.length);
});

it('should work on streaming=true and compressed=true/false', async () => {
Expand All @@ -54,8 +65,8 @@ describe('options.streaming.test.ts', () => {
assert.equal(response.data, null);
// console.log(response.res);
// response.res stream is decompressed
assert(isReadable(response.res as any));
let bytes = await readableToBytes(response.res as Readable);
assert(isReadable(response.res));
let bytes = await readableToBytes(response.res);
let data = bytes.toString();
assert.match(data, /export async function startServer/);

Expand All @@ -72,9 +83,9 @@ describe('options.streaming.test.ts', () => {
assert.equal(response.data, null);
// console.log(response.res);
// response.res stream is not decompressed
assert(isReadable(response.res as any));
assert(isReadable(response.res));
let decoder = createBrotliDecompress();
bytes = await readableToBytes(pipeline(response.res as Readable, decoder, () => {}));
bytes = await readableToBytes(pipeline(response.res, decoder, () => {}));
data = bytes.toString();
assert.match(data, /export async function startServer/);

Expand All @@ -93,9 +104,9 @@ describe('options.streaming.test.ts', () => {
assert.equal(response.data, null);
// console.log(response.res);
// response.res stream is not decompressed
assert(isReadable(response.res as any));
assert(isReadable(response.res));
decoder = createBrotliDecompress();
bytes = await readableToBytes(pipeline(response.res as Readable, decoder, () => {}));
bytes = await readableToBytes(pipeline(response.res, decoder, () => {}));
data = bytes.toString();
assert.match(data, /export async function startServer/);
});
Expand All @@ -109,7 +120,7 @@ describe('options.streaming.test.ts', () => {
assert.equal(response.data, null);
// console.log(response.headers);
assert(isReadable(response.res as any));
const bytes = await readableToBytes(response.res as Readable);
const bytes = await readableToBytes(response.res);
assert.equal(bytes.length, 1024102400);
});

Expand All @@ -122,7 +133,7 @@ describe('options.streaming.test.ts', () => {
assert.equal(response.data, null);
// console.log(response.headers);
assert(isReadable(response.res as any));
const bytes = await readableToBytes(response.res as Readable);
const bytes = await readableToBytes(response.res);
assert.equal(bytes.length, 1024102400);
});
});

0 comments on commit 3c731cc

Please sign in to comment.