Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/event-handler/src/rest/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ export {
isAPIGatewayProxyEventV2,
isExtendedAPIGatewayProxyResult,
isHttpMethod,
streamify,
} from './utils.js';
81 changes: 80 additions & 1 deletion packages/event-handler/src/rest/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import {
isRegExp,
isString,
} from '@aws-lambda-powertools/commons/typeutils';
import type { APIGatewayProxyEvent, APIGatewayProxyEventV2 } from 'aws-lambda';
import type {
APIGatewayProxyEvent,
APIGatewayProxyEventV2,
StreamifyHandler,
} from 'aws-lambda';
import type { Router } from '../rest/Router.js';
import type { ResolveOptions } from '../types/index.js';
import type {
CompiledRoute,
CompressionOptions,
Expand Down Expand Up @@ -385,3 +391,76 @@ export const getStatusCode = (
}
return fallback;
};

const streamifyResponse =
globalThis.awslambda?.streamifyResponse ??
(<TEvent = unknown, TResult = void>(
handler: StreamifyHandler<TEvent, TResult>
): StreamifyHandler<TEvent, TResult> => {
return (async (event, responseStream, context) => {
await handler(event, responseStream, context);

if ('chunks' in responseStream && Array.isArray(responseStream.chunks)) {
const output = Buffer.concat(responseStream.chunks as Buffer[]);
const nullBytes = Buffer.from([0, 0, 0, 0, 0, 0, 0, 0]);
const separatorIndex = output.indexOf(nullBytes);

const preludeBuffer = output.subarray(0, separatorIndex);
const bodyBuffer = output.subarray(separatorIndex + 8);
const prelude = JSON.parse(preludeBuffer.toString());

return {
body: bodyBuffer.toString(),
headers: prelude.headers,
statusCode: prelude.statusCode,
} as TResult;
}
}) as StreamifyHandler<TEvent, TResult>;
});

/**
* Wraps a Router instance to create a Lambda handler that uses response streaming.
*
* In Lambda runtime, uses `awslambda.streamifyResponse` to enable streaming responses.
* In test/local environments, returns an unwrapped handler that works with mock streams.
*
* @param router - The Router instance to wrap
* @param options - Optional configuration including scope for decorator binding
* @returns A Lambda handler that streams responses
*
* @example
* ```typescript
* import { Router, streamify } from '@aws-lambda-powertools/event-handler/experimental-rest';
*
* const app = new Router();
* app.get('/test', () => ({ message: 'Hello' }));
*
* export const handler = streamify(app);
* ```
*
* @example
* ```typescript
* // With scope for decorators
* class Lambda {
* public scope = 'my-scope';
*
* @app.get('/test')
* public getTest() {
* return { message: `${this.scope}: success` };
* }
*
* public handler = streamify(app, { scope: this });
* }
* ```
*/
export const streamify = (
router: Router,
options?: ResolveOptions
): StreamifyHandler => {
return streamifyResponse(async (event, responseStream, context) => {
await router.resolveStream(event, context, {
responseStream,
scope: options?.scope,
});
});
};
31 changes: 18 additions & 13 deletions packages/event-handler/tests/unit/rest/Router/decorators.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ import {
MethodNotAllowedError,
type NotFoundError,
Router,
streamify,
UnauthorizedError,
} from '../../../../src/rest/index.js';
import type { RequestContext } from '../../../../src/types/rest.js';
import {
createHandler,
createHandlerWithScope,
createStreamHandler,
createTestEvent,
createTestEventV2,
createTestLambdaClass,
createTrackingMiddleware,
MockResponseStream,
parseStreamOutput,
} from '../helpers.js';

describe.each([
Expand Down Expand Up @@ -481,7 +480,7 @@ describe.each([
});

describe('streaming with decorators', () => {
it('preserves scope when using resolveStream with decorators', async () => {
it('preserves scope when using streamify with decorators', async () => {
// Prepare
const app = new Router();

Expand All @@ -495,20 +494,23 @@ describe.each([
};
}

public handler = createStreamHandler(app, this);
public handler = streamify(app, { scope: this });
}

const lambda = new Lambda();
const responseStream = new MockResponseStream();
const handler = lambda.handler.bind(lambda);

// Act
await handler(createTestEvent('/test', 'GET'), context, responseStream);
const result = await handler(
createTestEvent('/test', 'GET'),
responseStream,
context
);

// Assess
const { prelude, body } = parseStreamOutput(responseStream.chunks);
expect(prelude.statusCode).toBe(200);
expect(JSON.parse(body)).toEqual({
expect(result.statusCode).toBe(200);
expect(JSON.parse(result.body)).toEqual({
message: 'streaming-scope: streaming success',
});
});
Expand All @@ -534,20 +536,23 @@ describe.each([
throw new UnauthorizedError('UnauthorizedError!');
}

public handler = createStreamHandler(app, this);
public handler = streamify(app, { scope: this });
}

const lambda = new Lambda();
const responseStream = new MockResponseStream();
const handler = lambda.handler.bind(lambda);

// Act
await handler(createTestEvent('/test', 'GET'), context, responseStream);
const result = await handler(
createTestEvent('/test', 'GET'),
responseStream,
context
);

// Assess
const { prelude, body } = parseStreamOutput(responseStream.chunks);
expect(prelude.statusCode).toBe(401);
expect(JSON.parse(body)).toEqual({
expect(result.statusCode).toBe(401);
expect(JSON.parse(result.body)).toEqual({
statusCode: HttpStatusCodes.UNAUTHORIZED,
error: 'Unauthorized',
message: 'error-scope: UnauthorizedError!',
Expand Down
Loading
Loading