From 8639d896730d816917f05e13b0ef897604227745 Mon Sep 17 00:00:00 2001 From: "Marc J. Schmidt" Date: Sat, 9 Jul 2022 15:25:36 +0200 Subject: [PATCH] feat(http): implement stream support --- packages/http/src/http.ts | 22 ++++++++++++++++++++-- packages/http/tests/router.spec.ts | 27 +++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/packages/http/src/http.ts b/packages/http/src/http.ts index f34596282..f7cc9815c 100644 --- a/packages/http/src/http.ts +++ b/packages/http/src/http.ts @@ -19,6 +19,7 @@ import { createWorkflow, WorkflowEvent } from '@deepkit/workflow'; import type { ElementStruct, render } from '@deepkit/template'; import { FrameCategory, Stopwatch } from '@deepkit/stopwatch'; import { getSerializeFunction, hasTypeInformation, ReflectionKind, resolveReceiveType, SerializationError, serialize, serializer, ValidationError } from '@deepkit/type'; +import stream from 'stream'; export function isElementStruct(v: any): v is ElementStruct { return 'object' === typeof v && v.hasOwnProperty('render') && v.hasOwnProperty('attributes') && !v.slice; @@ -380,7 +381,7 @@ export class JSONResponse extends BaseResponse { } } -export type SupportedHttpResult = undefined | null | number | string | Response | JSONResponse | HtmlResponse | HttpResponse | ServerResponse | Redirect | Uint8Array | Error; +export type SupportedHttpResult = undefined | null | number | string | Response | JSONResponse | HtmlResponse | HttpResponse | ServerResponse | stream.Readable | Redirect | Uint8Array | Error; export interface HttpResultFormatterContext { request: HttpRequest; @@ -454,6 +455,10 @@ export class HttpResultFormatter { context.response.end(JSON.stringify(serialize(instance, route ? route.serializationOptions : undefined, serializerToUse, undefined, resolveReceiveType(classType)))); } + handleStream(stream: stream.Readable, context: HttpResultFormatterContext): void { + stream.pipe(context.response); + } + handleBinary(result: Uint8Array, context: HttpResultFormatterContext): void { context.response.end(result); } @@ -472,6 +477,8 @@ export class HttpResultFormatter { this.handleResponse(context); } else if (result instanceof HtmlResponse) { this.handleHtmlResponse(result, context); + } else if (result instanceof stream.Readable) { + this.handleStream(result, context); } else if (result instanceof Uint8Array) { this.handleBinary(result, context); } else if (result instanceof JSONResponse) { @@ -651,6 +658,13 @@ export class HttpListener { const html = await getTemplateRender()(event.injectorContext.getRootInjector(), result, this.stopwatch ? this.stopwatch : undefined); result = new HtmlResponse(html, 200).header('Content-Type', 'text/html; charset=utf-8'); } + if (result instanceof stream.Readable) { + const stream = result as stream.Readable; + await new Promise((resolve, reject) => { + stream.once('readable', resolve) + stream.once('error', reject) + }) + } const responseEvent = new HttpResponseEvent(event.injectorContext, event.request, event.response, result, event.route); responseEvent.controllerActionTime = Date.now() - start; event.next('response', responseEvent); @@ -726,7 +740,7 @@ export class HttpListener { if (event.response.headersSent) return; if (event.result === undefined || event.result === null) return; - if (event.result instanceof HtmlResponse || event.result instanceof ServerResponse || event.result instanceof Redirect) { + if (event.result instanceof HtmlResponse || event.result instanceof ServerResponse || event.result instanceof Redirect || event.result instanceof stream.Readable) { // don't do anything } else if (event.result instanceof JSONResponse) { const schema = (event.result._statusCode && event.route.getSchemaForResponse(event.result._statusCode)) || event.route.returnType; @@ -755,5 +769,9 @@ export class HttpListener { if (event.response.headersSent) return; this.resultFormatter.handle(event.result, event); + await new Promise((resolve, reject) => { + event.response.once('finish', resolve); + event.response.once('error', reject); + }); } } diff --git a/packages/http/tests/router.spec.ts b/packages/http/tests/router.spec.ts index ea06b9b8b..ab8085b4d 100644 --- a/packages/http/tests/router.spec.ts +++ b/packages/http/tests/router.spec.ts @@ -7,6 +7,7 @@ import { HttpBody, HttpBodyValidation, HttpQueries, HttpQuery, HttpRegExp, HttpR import { getClassName, isObject, sleep } from '@deepkit/core'; import { createHttpKernel } from './utils'; import { Group, MinLength, PrimaryKey, Reference } from '@deepkit/type'; +import { Readable } from 'stream'; test('router', async () => { class Controller { @@ -897,3 +898,29 @@ test('BodyValidation', async () => { expect((await httpKernel.request(HttpRequest.POST('/action3').json({ username: 'Peter' }))).json).toEqual({ username: 'Peter' }); expect((await httpKernel.request(HttpRequest.POST('/action3').json({ username: 'Pe' }))).bodyString).toEqual(`{"message":"Invalid: Min length is 3"}`); }); + +test('stream', async () => { + class Controller { + @http.GET() + handle() { + return Readable.from(['test']); + } + } + const httpKernel = createHttpKernel([Controller]); + const response = (await httpKernel.request(HttpRequest.GET('/'))); + expect(response.statusCode).toBe(200); + expect(response.bodyString).toBe('test'); +}); + +test('stream error', async () => { + class Controller { + @http.GET() + handle() { + return new Readable().emit('error', new Error()); + } + } + const httpKernel = createHttpKernel([Controller]); + const response = (await httpKernel.request(HttpRequest.GET('/'))); + expect(response.statusCode).toBe(500); + expect(response.bodyString).toBe('Internal error'); +});