diff --git a/packages/event-handler/src/rest/index.ts b/packages/event-handler/src/rest/index.ts index 4bbdb73edb..7d9d8cabc1 100644 --- a/packages/event-handler/src/rest/index.ts +++ b/packages/event-handler/src/rest/index.ts @@ -25,4 +25,5 @@ export { isAPIGatewayProxyEventV2, isExtendedAPIGatewayProxyResult, isHttpMethod, + streamify, } from './utils.js'; diff --git a/packages/event-handler/src/rest/utils.ts b/packages/event-handler/src/rest/utils.ts index 409103d79d..466b1b5fb7 100644 --- a/packages/event-handler/src/rest/utils.ts +++ b/packages/event-handler/src/rest/utils.ts @@ -4,7 +4,13 @@ import { isRegExp, isString, } from '@aws-lambda-powertools/commons/typeutils'; -import type { APIGatewayProxyEvent, APIGatewayProxyEventV2 } from 'aws-lambda'; +import type { + APIGatewayProxyEvent, + APIGatewayProxyEventV2, + StreamifyHandler, +} from 'aws-lambda'; +import type { Router } from '../rest/Router.js'; +import type { ResolveOptions } from '../types/index.js'; import type { CompiledRoute, CompressionOptions, @@ -385,3 +391,76 @@ export const getStatusCode = ( } return fallback; }; + +const streamifyResponse = + globalThis.awslambda?.streamifyResponse ?? + (( + handler: StreamifyHandler + ): StreamifyHandler => { + return (async (event, responseStream, context) => { + await handler(event, responseStream, context); + + if ('chunks' in responseStream && Array.isArray(responseStream.chunks)) { + const output = Buffer.concat(responseStream.chunks as Buffer[]); + const nullBytes = Buffer.from([0, 0, 0, 0, 0, 0, 0, 0]); + const separatorIndex = output.indexOf(nullBytes); + + const preludeBuffer = output.subarray(0, separatorIndex); + const bodyBuffer = output.subarray(separatorIndex + 8); + const prelude = JSON.parse(preludeBuffer.toString()); + + return { + body: bodyBuffer.toString(), + headers: prelude.headers, + statusCode: prelude.statusCode, + } as TResult; + } + }) as StreamifyHandler; + }); + +/** + * Wraps a Router instance to create a Lambda handler that uses response streaming. + * + * In Lambda runtime, uses `awslambda.streamifyResponse` to enable streaming responses. + * In test/local environments, returns an unwrapped handler that works with mock streams. + * + * @param router - The Router instance to wrap + * @param options - Optional configuration including scope for decorator binding + * @returns A Lambda handler that streams responses + * + * @example + * ```typescript + * import { Router, streamify } from '@aws-lambda-powertools/event-handler/experimental-rest'; + * + * const app = new Router(); + * app.get('/test', () => ({ message: 'Hello' })); + * + * export const handler = streamify(app); + * ``` + * + * @example + * ```typescript + * // With scope for decorators + * class Lambda { + * public scope = 'my-scope'; + * + * @app.get('/test') + * public getTest() { + * return { message: `${this.scope}: success` }; + * } + * + * public handler = streamify(app, { scope: this }); + * } + * ``` + */ +export const streamify = ( + router: Router, + options?: ResolveOptions +): StreamifyHandler => { + return streamifyResponse(async (event, responseStream, context) => { + await router.resolveStream(event, context, { + responseStream, + scope: options?.scope, + }); + }); +}; diff --git a/packages/event-handler/tests/unit/rest/Router/decorators.test.ts b/packages/event-handler/tests/unit/rest/Router/decorators.test.ts index f48fd194f4..e2479b6ff5 100644 --- a/packages/event-handler/tests/unit/rest/Router/decorators.test.ts +++ b/packages/event-handler/tests/unit/rest/Router/decorators.test.ts @@ -6,19 +6,18 @@ import { MethodNotAllowedError, type NotFoundError, Router, + streamify, UnauthorizedError, } from '../../../../src/rest/index.js'; import type { RequestContext } from '../../../../src/types/rest.js'; import { createHandler, createHandlerWithScope, - createStreamHandler, createTestEvent, createTestEventV2, createTestLambdaClass, createTrackingMiddleware, MockResponseStream, - parseStreamOutput, } from '../helpers.js'; describe.each([ @@ -481,7 +480,7 @@ describe.each([ }); describe('streaming with decorators', () => { - it('preserves scope when using resolveStream with decorators', async () => { + it('preserves scope when using streamify with decorators', async () => { // Prepare const app = new Router(); @@ -495,7 +494,7 @@ describe.each([ }; } - public handler = createStreamHandler(app, this); + public handler = streamify(app, { scope: this }); } const lambda = new Lambda(); @@ -503,12 +502,15 @@ describe.each([ const handler = lambda.handler.bind(lambda); // Act - await handler(createTestEvent('/test', 'GET'), context, responseStream); + const result = await handler( + createTestEvent('/test', 'GET'), + responseStream, + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(200); - expect(JSON.parse(body)).toEqual({ + expect(result.statusCode).toBe(200); + expect(JSON.parse(result.body)).toEqual({ message: 'streaming-scope: streaming success', }); }); @@ -534,7 +536,7 @@ describe.each([ throw new UnauthorizedError('UnauthorizedError!'); } - public handler = createStreamHandler(app, this); + public handler = streamify(app, { scope: this }); } const lambda = new Lambda(); @@ -542,12 +544,15 @@ describe.each([ const handler = lambda.handler.bind(lambda); // Act - await handler(createTestEvent('/test', 'GET'), context, responseStream); + const result = await handler( + createTestEvent('/test', 'GET'), + responseStream, + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(401); - expect(JSON.parse(body)).toEqual({ + expect(result.statusCode).toBe(401); + expect(JSON.parse(result.body)).toEqual({ statusCode: HttpStatusCodes.UNAUTHORIZED, error: 'Unauthorized', message: 'error-scope: UnauthorizedError!', diff --git a/packages/event-handler/tests/unit/rest/Router/streaming.test.ts b/packages/event-handler/tests/unit/rest/Router/streaming.test.ts index e5c92a834e..69a7b63c44 100644 --- a/packages/event-handler/tests/unit/rest/Router/streaming.test.ts +++ b/packages/event-handler/tests/unit/rest/Router/streaming.test.ts @@ -1,13 +1,15 @@ import { Duplex, PassThrough, Readable } from 'node:stream'; import context from '@aws-lambda-powertools/testing-utils/context'; import { describe, expect, it, vi } from 'vitest'; -import { UnauthorizedError } from '../../../../src/rest/errors.js'; -import { Router } from '../../../../src/rest/index.js'; +import { + Router, + streamify, + UnauthorizedError, +} from '../../../../src/rest/index.js'; import { createTestEvent, createTestEventV2, MockResponseStream, - parseStreamOutput, } from '../helpers.js'; describe.each([ @@ -19,18 +21,19 @@ describe.each([ const app = new Router(); app.get('/test', async () => ({ message: 'Hello, World!' })); - // Create a mock ResponseStream + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(200); - expect(JSON.parse(body)).toEqual({ message: 'Hello, World!' }); + expect(result.statusCode).toBe(200); + expect(JSON.parse(result.body)).toEqual({ message: 'Hello, World!' }); }); it('streams a Response object', async () => { @@ -43,34 +46,37 @@ describe.each([ }); }); - // Create a mock ResponseStream + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(201); - expect(JSON.parse(body)).toEqual({ data: 'test' }); + expect(result.statusCode).toBe(201); + expect(JSON.parse(result.body)).toEqual({ data: 'test' }); }); it('handles route not found', async () => { // Prepare const app = new Router(); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/nonexistent', 'GET'), context, { + const result = await handler( + createEvent('/nonexistent', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(404); - const parsedBody = JSON.parse(body); + expect(result.statusCode).toBe(404); + const parsedBody = JSON.parse(result.body); expect(parsedBody.statusCode).toBe(404); }); @@ -86,18 +92,20 @@ describe.each([ app.get('/test', () => ({ message: 'middleware test' })); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(200); - expect(prelude.headers['x-custom-header']).toBe('test-value'); - expect(JSON.parse(body)).toEqual({ message: 'middleware test' }); + expect(result.statusCode).toBe(200); + expect(result.headers['x-custom-header']).toBe('test-value'); + expect(JSON.parse(result.body)).toEqual({ message: 'middleware test' }); }); it('handles thrown errors', async () => { @@ -107,17 +115,19 @@ describe.each([ throw new UnauthorizedError('Access denied'); }); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(401); - const parsedBody = JSON.parse(body); + expect(result.statusCode).toBe(401); + const parsedBody = JSON.parse(result.body); expect(parsedBody.message).toBe('Access denied'); }); @@ -134,17 +144,19 @@ describe.each([ throw new UnauthorizedError('handler error'); }); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(401); - expect(JSON.parse(body)).toEqual({ + expect(result.statusCode).toBe(401); + expect(JSON.parse(result.body)).toEqual({ statusCode: 401, message: 'Custom: handler error', }); @@ -180,51 +192,57 @@ describe.each([ // Prepare const app = new Router(); app.get('/test', handlerFn); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(200); - expect(JSON.parse(body).message).toMatch(/body$/); + expect(result.statusCode).toBe(200); + expect(JSON.parse(result.body).message).toMatch(/body$/); }); it('handles Response with no body', async () => { // Prepare const app = new Router(); app.get('/test', () => new Response(null, { status: 204 })); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(204); - expect(body).toBe(''); + expect(result.statusCode).toBe(204); + expect(result.body).toBe(''); }); it('handles Response with undefined body', async () => { // Prepare const app = new Router(); app.get('/test', () => new Response(undefined, { status: 200 })); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(200); - expect(body).toBe(''); + expect(result.statusCode).toBe(200); + expect(result.body).toBe(''); }); it('handles pipeline errors during streaming', async () => { @@ -237,13 +255,12 @@ describe.each([ }); app.get('/test', () => new Response(errorStream, { status: 200 })); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act & Assess await expect( - app.resolveStream(createEvent('/test', 'GET'), context, { - responseStream, - }) + handler(createEvent('/test', 'GET'), responseStream, context) ).rejects.toThrow('Stream error'); }); @@ -257,20 +274,20 @@ describe.each([ return { userId: params.userId, postId: params.postId }; }); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream( + const result = await handler( createEvent('/users/123/posts/456', 'GET'), - context, - { responseStream } + responseStream, + context ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(200); + expect(result.statusCode).toBe(200); expect(capturedParams).toEqual({ userId: '123', postId: '456' }); - expect(JSON.parse(body)).toEqual({ userId: '123', postId: '456' }); + expect(JSON.parse(result.body)).toEqual({ userId: '123', postId: '456' }); }); it('uses default error handler for unregistered errors', async () => { @@ -281,17 +298,19 @@ describe.each([ throw new Error('Unhandled error'); }); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(500); - const parsedBody = JSON.parse(body); + expect(result.statusCode).toBe(500); + const parsedBody = JSON.parse(result.body); expect(parsedBody.statusCode).toBe(500); expect(parsedBody.error).toBe('Internal Server Error'); expect(parsedBody.message).toBe('Unhandled error'); @@ -302,12 +321,13 @@ describe.each([ it('throws InternalServerError for invalid events', async () => { // Prepare const app = new Router(); + const handler = streamify(app); const invalidEvent = { invalid: 'event' }; const responseStream = new MockResponseStream(); // Act & Assess await expect( - app.resolveStream(invalidEvent, context, { responseStream }) + handler(invalidEvent, responseStream, context) ).rejects.toThrow(); }); @@ -323,16 +343,18 @@ describe.each([ body: Duplex.from(passThrough), })); + const handler = streamify(app); const responseStream = new MockResponseStream(); // Act - await app.resolveStream(createEvent('/test', 'GET'), context, { + const result = await handler( + createEvent('/test', 'GET'), responseStream, - }); + context + ); // Assess - const { prelude, body } = parseStreamOutput(responseStream.chunks); - expect(prelude.statusCode).toBe(200); - expect(JSON.parse(body)).toEqual({ message: 'duplex stream body' }); + expect(result.statusCode).toBe(200); + expect(JSON.parse(result.body)).toEqual({ message: 'duplex stream body' }); }); }); diff --git a/packages/event-handler/tests/unit/rest/helpers.ts b/packages/event-handler/tests/unit/rest/helpers.ts index 9f5e9e6fcc..2ee8a344cb 100644 --- a/packages/event-handler/tests/unit/rest/helpers.ts +++ b/packages/event-handler/tests/unit/rest/helpers.ts @@ -149,25 +149,6 @@ export class MockResponseStream extends HttpResponseStream { } } -// Helper to parse streaming response format -export function parseStreamOutput(chunks: Buffer[]) { - const output = Buffer.concat(chunks); - const nullBytes = Buffer.from([0, 0, 0, 0, 0, 0, 0, 0]); - const separatorIndex = output.indexOf(nullBytes); - - if (separatorIndex === -1) { - return { prelude: null, body: output.toString() }; - } - - const preludeBuffer = output.subarray(0, separatorIndex); - const bodyBuffer = output.subarray(separatorIndex + 8); - - return { - prelude: JSON.parse(preludeBuffer.toString()), - body: bodyBuffer.toString(), - }; -} - // Create a handler function from the Router instance export const createHandler = (app: Router) => { function handler( @@ -208,12 +189,6 @@ export const createHandlerWithScope = (app: Router, scope: unknown) => { return handler; }; -// Create a stream handler function from the Router instance with a custom scope -export const createStreamHandler = - (app: Router, scope: unknown) => - (event: unknown, _context: Context, responseStream: MockResponseStream) => - app.resolveStream(event, _context, { scope, responseStream }); - // Create a test Lambda class with all HTTP method decorators export const createTestLambdaClass = ( app: Router,