Skip to content

Commit

Permalink
feat(http): implement stream support
Browse files Browse the repository at this point in the history
  • Loading branch information
marcj committed Jul 9, 2022
1 parent 7e38414 commit 8639d89
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
22 changes: 20 additions & 2 deletions packages/http/src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { createWorkflow, WorkflowEvent } from '@deepkit/workflow';
import type { ElementStruct, render } from '@deepkit/template';
import { FrameCategory, Stopwatch } from '@deepkit/stopwatch';
import { getSerializeFunction, hasTypeInformation, ReflectionKind, resolveReceiveType, SerializationError, serialize, serializer, ValidationError } from '@deepkit/type';
import stream from 'stream';

export function isElementStruct(v: any): v is ElementStruct {
return 'object' === typeof v && v.hasOwnProperty('render') && v.hasOwnProperty('attributes') && !v.slice;
Expand Down Expand Up @@ -380,7 +381,7 @@ export class JSONResponse extends BaseResponse {
}
}

export type SupportedHttpResult = undefined | null | number | string | Response | JSONResponse | HtmlResponse | HttpResponse | ServerResponse | Redirect | Uint8Array | Error;
export type SupportedHttpResult = undefined | null | number | string | Response | JSONResponse | HtmlResponse | HttpResponse | ServerResponse | stream.Readable | Redirect | Uint8Array | Error;

export interface HttpResultFormatterContext {
request: HttpRequest;
Expand Down Expand Up @@ -454,6 +455,10 @@ export class HttpResultFormatter {
context.response.end(JSON.stringify(serialize(instance, route ? route.serializationOptions : undefined, serializerToUse, undefined, resolveReceiveType(classType))));
}

handleStream(stream: stream.Readable, context: HttpResultFormatterContext): void {
stream.pipe(context.response);
}

handleBinary(result: Uint8Array, context: HttpResultFormatterContext): void {
context.response.end(result);
}
Expand All @@ -472,6 +477,8 @@ export class HttpResultFormatter {
this.handleResponse(context);
} else if (result instanceof HtmlResponse) {
this.handleHtmlResponse(result, context);
} else if (result instanceof stream.Readable) {
this.handleStream(result, context);
} else if (result instanceof Uint8Array) {
this.handleBinary(result, context);
} else if (result instanceof JSONResponse) {
Expand Down Expand Up @@ -651,6 +658,13 @@ export class HttpListener {
const html = await getTemplateRender()(event.injectorContext.getRootInjector(), result, this.stopwatch ? this.stopwatch : undefined);
result = new HtmlResponse(html, 200).header('Content-Type', 'text/html; charset=utf-8');
}
if (result instanceof stream.Readable) {
const stream = result as stream.Readable;
await new Promise((resolve, reject) => {
stream.once('readable', resolve)
stream.once('error', reject)
})
}
const responseEvent = new HttpResponseEvent(event.injectorContext, event.request, event.response, result, event.route);
responseEvent.controllerActionTime = Date.now() - start;
event.next('response', responseEvent);
Expand Down Expand Up @@ -726,7 +740,7 @@ export class HttpListener {
if (event.response.headersSent) return;
if (event.result === undefined || event.result === null) return;

if (event.result instanceof HtmlResponse || event.result instanceof ServerResponse || event.result instanceof Redirect) {
if (event.result instanceof HtmlResponse || event.result instanceof ServerResponse || event.result instanceof Redirect || event.result instanceof stream.Readable) {
// don't do anything
} else if (event.result instanceof JSONResponse) {
const schema = (event.result._statusCode && event.route.getSchemaForResponse(event.result._statusCode)) || event.route.returnType;
Expand Down Expand Up @@ -755,5 +769,9 @@ export class HttpListener {
if (event.response.headersSent) return;

this.resultFormatter.handle(event.result, event);
await new Promise((resolve, reject) => {
event.response.once('finish', resolve);
event.response.once('error', reject);
});
}
}
27 changes: 27 additions & 0 deletions packages/http/tests/router.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { HttpBody, HttpBodyValidation, HttpQueries, HttpQuery, HttpRegExp, HttpR
import { getClassName, isObject, sleep } from '@deepkit/core';
import { createHttpKernel } from './utils';
import { Group, MinLength, PrimaryKey, Reference } from '@deepkit/type';
import { Readable } from 'stream';

test('router', async () => {
class Controller {
Expand Down Expand Up @@ -897,3 +898,29 @@ test('BodyValidation', async () => {
expect((await httpKernel.request(HttpRequest.POST('/action3').json({ username: 'Peter' }))).json).toEqual({ username: 'Peter' });
expect((await httpKernel.request(HttpRequest.POST('/action3').json({ username: 'Pe' }))).bodyString).toEqual(`{"message":"Invalid: Min length is 3"}`);
});

test('stream', async () => {
class Controller {
@http.GET()
handle() {
return Readable.from(['test']);
}
}
const httpKernel = createHttpKernel([Controller]);
const response = (await httpKernel.request(HttpRequest.GET('/')));
expect(response.statusCode).toBe(200);
expect(response.bodyString).toBe('test');
});

test('stream error', async () => {
class Controller {
@http.GET()
handle() {
return new Readable().emit('error', new Error());
}
}
const httpKernel = createHttpKernel([Controller]);
const response = (await httpKernel.request(HttpRequest.GET('/')));
expect(response.statusCode).toBe(500);
expect(response.bodyString).toBe('Internal error');
});

0 comments on commit 8639d89

Please sign in to comment.