Skip to content

Commit

Permalink
📦 NEW: Support options.streaming = true
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Jul 5, 2022
1 parent f2ca7eb commit 57ccc7e
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 91 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"typescript": "4"
},
"engines": {
"node": ">= 16.0.0"
"node": ">= 16.5.0"
},
"ci": {
"version": "16, 18"
Expand Down
25 changes: 16 additions & 9 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventEmitter } from 'events';
import { debuglog } from 'util';
import { Readable, isReadable } from 'stream';
import { pipeline } from 'stream/promises';
import { Blob } from 'buffer';
import { createReadStream } from 'fs';
import { basename } from 'path';
Expand All @@ -10,6 +11,7 @@ import {
import createUserAgent from 'default-user-agent';
import mime from 'mime-types';
import { RequestURL, RequestOptions } from './Request';
import { HttpClientResponseMeta, HttpClientResponse, ReadableStreamWithMeta } from './Response';
import { parseJSON } from './utils';

const debug = debuglog('urllib');
Expand Down Expand Up @@ -77,7 +79,7 @@ export class HttpClient extends EventEmitter {
const requestStartTime = Date.now();
// keep urllib createCallbackResponse style
const resHeaders: Record<string, string> = {};
const res = {
const res: HttpClientResponseMeta = {
status: -1,
statusCode: -1,
statusMessage: '',
Expand All @@ -90,10 +92,6 @@ export class HttpClient extends EventEmitter {
timing: {
contentDownload: 0,
},
// remoteAddress: remoteAddress,
// remotePort: remotePort,
// socketHandledRequests: socketHandledRequests,
// socketHandledResponses: socketHandledResponses,
};

let requestTimeout = 5000;
Expand Down Expand Up @@ -236,9 +234,17 @@ export class HttpClient extends EventEmitter {
res.size = parseInt(res.headers['content-length']);
}

let data: any;
let data: any = null;
let responseBodyStream: ReadableStreamWithMeta | undefined;
if (args.streaming || args.dataType === 'stream') {
data = response.body;
responseBodyStream = Object.assign(response.body!, {
status: res.status,
statusCode: res.statusCode,
statusMessage: res.statusMessage,
headers: res.headers,
});
} else if (args.writeStream) {
await pipeline(response.body!, args.writeStream);
} else if (args.dataType === 'text') {
data = await response.text();
} else if (args.dataType === 'json') {
Expand All @@ -254,14 +260,15 @@ export class HttpClient extends EventEmitter {
}
res.rt = res.timing.contentDownload = Date.now() - requestStartTime;

return {
const clientResponse: HttpClientResponse = {
status: res.status,
data,
headers: res.headers,
url: response.url,
redirected: response.redirected,
res,
res: responseBodyStream ?? res,
};
return clientResponse;
} catch (e: any) {
let err = e;
if (requestTimeoutController.signal.aborted) {
Expand Down
53 changes: 36 additions & 17 deletions src/Response.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
import { OutgoingHttpHeaders, IncomingMessage } from 'http';
import { ReadableStream } from 'stream/web';

export interface HttpClientResponse<T> {
data: T;
export type HttpClientResponseMeta = {
status: number;
headers: OutgoingHttpHeaders;
res: IncomingMessage & {
/**
* https://eggjs.org/en/core/httpclient.html#timing-boolean
*/
timing?: {
queuing: number;
dnslookup: number;
connected: number;
requestSent: number;
waiting: number;
contentDownload: number;
}
statusCode: number;
statusMessage: string;
headers: Record<string, string>;
size: number;
aborted: boolean;
rt: number;
keepAliveSocket: boolean;
requestUrls: string[],
/**
* https://eggjs.org/en/core/httpclient.html#timing-boolean
*/
timing: {
contentDownload: number;
};
}
// remoteAddress: remoteAddress,
// remotePort: remotePort,
// socketHandledRequests: socketHandledRequests,
// socketHandledResponses: socketHandledResponses,
};

export type ReadableStreamWithMeta = ReadableStream & {
status: number;
statusCode: number;
statusMessage: string;
headers: Record<string, string>;
};

export type HttpClientResponse = {
data: any;
status: number;
headers: Record<string, string>;
url: string;
redirected: boolean;
res: ReadableStreamWithMeta | HttpClientResponseMeta;
};
18 changes: 0 additions & 18 deletions test-old/urllib.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -747,24 +747,6 @@ describe('test/urllib.test.js', function () {
});

describe('args.streaming = true', function () {
it('should got streaming the response', function (done) {
urllib.request(config.npmWeb, {
timeout: 25000,
streaming: true
}, function (err, data, res) {
assert(!err);
assert(!data);
var size = 0;
res.on('data', function (chunk) {
size += chunk.length;
});
res.on('end', function () {
assert(size > 0);
done();
});
});
});

it('should work with alias name customResponse', function (done) {
urllib.request(config.npmWeb, {
timeout: 25000,
Expand Down
73 changes: 41 additions & 32 deletions test/fixtures/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createReadStream } from 'fs';
import { setTimeout } from 'timers/promises';
import busboy from 'busboy';
import iconv from 'iconv-lite';
import { readableToBytes } from '../utils';

export async function startServer(options?: {
keepAliveTimeout?: number;
Expand All @@ -14,6 +15,7 @@ export async function startServer(options?: {
res.setHeader('Keep-Alive', 'timeout=' + server.keepAliveTimeout / 1000);
}
const urlObject = new URL(req.url!, `http://${req.headers.host}`);
const pathname = urlObject.pathname;
res.setHeader('X-Foo', 'bar');
res.setHeader('x-href', urlObject.href);
res.setHeader('x-method', req.method ?? '');
Expand All @@ -24,38 +26,47 @@ export async function startServer(options?: {
await setTimeout(parseInt(timeout));
}

if (req.url === '/wrongjson') {
if (pathname === '/mock-bytes') {
const size = urlObject.searchParams.get('size') ?? '1024';
const bytes = Buffer.alloc(parseInt(size));
res.end(bytes);
return;
}

if (pathname === '/wrongjson') {
res.setHeader('content-type', 'application/json');
return res.end(Buffer.from('{"foo":""'));
}

if (req.url === '/html') {
if (pathname === '/html') {
res.setHeader('content-type', 'text/html');
return res.end('<h1>hello</h1>');
}

if (req.url === '/redirect') {
if (pathname === '/redirect') {
res.setHeader('Location', '/redirect-to-url');
res.statusCode = 302;
return res.end();
return res.end('Redirect to /redirect-to-url');
}
if (req.url === '/redirect-301') {
if (pathname === '/redirect-301') {
res.setHeader('Location', '/redirect-301-to-url');
res.statusCode = 301;
return res.end();
return res.end('Redirect to /redirect-301-to-url');
}
if (req.url === '/redirect-full') {
res.setHeader('Location', `http://${req.headers.host}/redirect-full-to-url`);
if (pathname === '/redirect-full') {
const url = `http://${req.headers.host}/redirect-full-to-url`;
res.setHeader('Location', url);
res.statusCode = 302;
return res.end();
return res.end(`Redirect to ${url}`);
}
if (req.url === '/redirect-full-301') {
res.setHeader('Location', `http://${req.headers.host}/redirect-full-301-to-url`);
if (pathname === '/redirect-full-301') {
const url = `http://${req.headers.host}/redirect-full-301-to-url`;
res.setHeader('Location', url);
res.statusCode = 301;
return res.end();
return res.end(`Redirect to ${url}`);
}

if (req.url === '/socket.end.error') {
if (pathname === '/socket.end.error') {
res.write('foo haha\n');
await setTimeout(200);
res.write('foo haha 2');
Expand All @@ -64,49 +75,49 @@ export async function startServer(options?: {
return;
}

if (req.url === '/wrongjson-gbk') {
if (pathname === '/wrongjson-gbk') {
res.setHeader('content-type', 'application/json');
createReadStream(__filename).pipe(res);
return
}
if (req.url === '/json_with_controls_unicode') {
if (pathname === '/json_with_controls_unicode') {
return res.end(Buffer.from('{"foo":"\b\f\n\r\tbar\u000e!1!\u0086!2!\u0000!3!\u001F!4!\u005C!5!end\u005C\\"}'));
}
if (req.url === '/json_with_t') {
if (pathname === '/json_with_t') {
return res.end(Buffer.from('{"foo":"ba\tr\t\t"}'));
}
if (req.url === '/gbk/json') {
if (pathname === '/gbk/json') {
res.setHeader('Content-Type', 'application/json;charset=gbk');
const content = iconv.encode(JSON.stringify({ hello: '你好' }), 'gbk');
return res.end(content);
}
if (req.url === '/gbk/text') {
if (pathname === '/gbk/text') {
res.setHeader('Content-Type', 'text/plain;charset=gbk');
const content = iconv.encode('你好', 'gbk');
return res.end(content);
}
if (req.url === '/errorcharset') {
if (pathname === '/errorcharset') {
res.setHeader('Content-Type', 'text/plain;charset=notfound');
return res.end('你好');
}

if (req.url === '/deflate') {
if (pathname === '/deflate') {
res.setHeader('Content-Encoding', 'deflate');
createReadStream(__filename).pipe(createDeflate()).pipe(res);
return;
}
if (req.url === '/gzip') {
if (pathname === '/gzip') {
res.setHeader('Content-Encoding', 'gzip');
createReadStream(__filename).pipe(createGzip()).pipe(res);
return;
}
if (req.url === '/error-gzip') {
if (pathname === '/error-gzip') {
res.setHeader('Content-Encoding', 'gzip');
createReadStream(__filename).pipe(res);
return;
}

if (req.url === '/multipart' && (req.method === 'POST' || req.method === 'PUT')) {
if (pathname === '/multipart' && (req.method === 'POST' || req.method === 'PUT')) {
const bb = busboy({ headers: req.headers });
const result = {
method: req.method,
Expand Down Expand Up @@ -146,29 +157,27 @@ export async function startServer(options?: {
return;
}

if (req.url === '/raw') {
if (pathname === '/raw') {
req.pipe(res);
return;
}

let requestBody;
const chunks = [];
let requestBody: any;
let requestBytes: Buffer = Buffer.from('');
if (req.method !== 'GET' && req.method !== 'HEAD') {
for await (const chunk of req) {
chunks.push(chunk);
}
requestBytes = await readableToBytes(req);
}

if (req.headers['content-type']?.startsWith('application/x-www-form-urlencoded')) {
const searchParams = new URLSearchParams(Buffer.concat(chunks).toString());
const searchParams = new URLSearchParams(requestBytes.toString());
requestBody = {};
for (const [ field, value ] of searchParams.entries()) {
requestBody[field] = value;
}
} else if (req.headers['content-type']?.startsWith('application/json')) {
requestBody = JSON.parse(Buffer.concat(chunks).toString());
requestBody = JSON.parse(requestBytes.toString());
} else {
requestBody = Buffer.concat(chunks).toString();
requestBody = requestBytes.toString();
}
res.writeHead(200, {
'Content-Type': 'application/json',
Expand Down
24 changes: 10 additions & 14 deletions test/options.dataType.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import assert from 'assert/strict';
import { isReadable } from 'stream';
import { ReadableStream } from 'stream/web';
import urllib from '../src';
import { startServer } from './fixtures/server';
import { readableToBytes } from './utils';

describe('options.dataType.test.ts', () => {
let close: any;
Expand Down Expand Up @@ -122,27 +123,22 @@ describe('options.dataType.test.ts', () => {
});
assert.equal(response.status, 200);
assert.equal(response.headers['content-type'], 'application/json');
assert(isReadable(response.data));
const chunks = [];
for await (const chunk of response.data) {
chunks.push(chunk);
}
const jsonString = Buffer.concat(chunks).toString();
assert(response.res);
const bytes = await readableToBytes(response.res as ReadableStream);
const jsonString = bytes.toString();
assert.equal(JSON.parse(jsonString).method, 'GET');
});

it('should work with streaming = true', async () => {
it('should work with streaming = true, alias to data = stream', async () => {
const response = await urllib.request(_url, {
streaming: true,
});
assert.equal(response.status, 200);
assert.equal(response.headers['content-type'], 'application/json');
assert(isReadable(response.data));
const chunks = [];
for await (const chunk of response.data) {
chunks.push(chunk);
}
const jsonString = Buffer.concat(chunks).toString();
assert(response.res);
assert(response.res);
const bytes = await readableToBytes(response.res as ReadableStream);
const jsonString = bytes.toString();
assert.equal(JSON.parse(jsonString).method, 'GET');
});
});

0 comments on commit 57ccc7e

Please sign in to comment.