diff --git a/packages/open-next/src/adapters/lambda.ts b/packages/open-next/src/adapters/lambda.ts new file mode 100644 index 00000000..97e461ad --- /dev/null +++ b/packages/open-next/src/adapters/lambda.ts @@ -0,0 +1,23 @@ +import { APIGatewayProxyEventV2, Context } from "aws-lambda"; +import * as http from "http"; +import { Writable } from "stream"; + +type Handler = ( + event: APIGatewayProxyEventV2, + responseStream: Writable, + context: Context +) => Promise; + +interface Metadata { + statusCode: number; + headers: Record; +} + +declare global { + namespace awslambda { + function streamifyResponse(handler: Handler): any; + module HttpResponseStream { + function from(res: Writable, metadata: Metadata): any; + } + } +} diff --git a/packages/open-next/src/adapters/response.ts b/packages/open-next/src/adapters/response.ts index 5cb19d78..d783bcd3 100644 --- a/packages/open-next/src/adapters/response.ts +++ b/packages/open-next/src/adapters/response.ts @@ -1,5 +1,7 @@ // @ts-nocheck -import http from "node:http"; +import http, { IncomingMessage } from "node:http"; +import { Socket } from "node:net"; +import { Writable } from "node:stream"; const headerEnd = "\r\n\r\n"; @@ -17,11 +19,7 @@ function getString(data) { } function addData(stream, data) { - if ( - Buffer.isBuffer(data) || - typeof data === "string" || - data instanceof Uint8Array - ) { + if (Buffer.isBuffer(data) || typeof data === "string" || data instanceof Uint8Array) { stream[BODY].push(Buffer.from(data)); } else { throw new Error(`response.write() of unexpected type: ${typeof data}`); @@ -40,9 +38,9 @@ export class ServerResponse extends http.ServerResponse { return response; } - static body(res) { - return Buffer.concat(res[BODY]); - } + // static body(res) { + // return Buffer.concat(res[BODY]); + // } static headers(res) { const headers = @@ -75,13 +73,31 @@ export class ServerResponse extends http.ServerResponse { break; } } + // console.log( + // "this.responseStream", + // this, + // statusCode, + // headers ?? this[HEADERS], + // this.getHeaders() + // ); + if (!this.headersSent) { + try { + this.responseStream = awslambda.HttpResponseStream.from(this.responseStream, { + statusCode, + headers: this.getHeaders(), + }); + } catch (e) { + console.log(e); + } + } - super.writeHead(statusCode, reason, obj); + // super.writeHead(statusCode, reason, obj); } - constructor({ method }) { + constructor({ method }, responseStream: Writable) { super({ method }); + this.responseStream = responseStream; this[BODY] = []; this[HEADERS] = {}; @@ -92,41 +108,16 @@ export class ServerResponse extends http.ServerResponse { this.assignSocket({ _writableState: {}, writable: true, - on: Function.prototype, - removeListener: Function.prototype, - destroy: Function.prototype, - cork: Function.prototype, - uncork: Function.prototype, - write: (data, encoding, cb) => { - if (typeof encoding === "function") { - cb = encoding; - encoding = null; - } - - if (this._header === "" || this._wroteHeader) { - addData(this, data); - } else { - const string = getString(data); - const index = string.indexOf(headerEnd); - - if (index !== -1) { - const remainder = string.slice(index + headerEnd.length); - - if (remainder) { - addData(this, remainder); - } - - this._wroteHeader = true; - } - } - - if (typeof cb === "function") { - cb(); - } - }, + on: responseStream.on.bind(responseStream), + removeListener: responseStream.removeListener.bind(responseStream), + destroy: responseStream.destroy.bind(responseStream), + cork: responseStream.cork.bind(responseStream), + uncork: responseStream.uncork.bind(responseStream), + write: responseStream.write.bind(responseStream), }); this.once("finish", () => { + responseStream.end(); this.emit("close"); }); } diff --git a/packages/open-next/src/adapters/server-adapter.ts b/packages/open-next/src/adapters/server-adapter.ts index 04eb925b..dce7b60d 100644 --- a/packages/open-next/src/adapters/server-adapter.ts +++ b/packages/open-next/src/adapters/server-adapter.ts @@ -14,6 +14,7 @@ import NextServer from "next/dist/server/next-server.js"; import { loadConfig } from "./util.js"; import { isBinaryContentType } from "./binary.js"; import { debug } from "./logger.js"; +import { Writable } from "node:stream"; setNextjsServerWorkingDirectory(); const nextDir = path.join(__dirname, ".next"); @@ -43,9 +44,7 @@ const eventParser = { }, get url() { const { rawPath, rawQueryString } = event; - return rawQueryString.length > 0 - ? `${rawPath}?${rawQueryString}` - : rawPath; + return rawQueryString.length > 0 ? `${rawPath}?${rawQueryString}` : rawPath; }, get body() { const { body, isBase64Encoded } = event; @@ -122,8 +121,9 @@ const eventParser = { // Handler // ///////////// -export async function handler( - event: APIGatewayProxyEventV2 | CloudFrontRequestEvent +export const handler = awslambda.streamifyResponse(async function ( + event: APIGatewayProxyEventV2 | CloudFrontRequestEvent, + responseStream: Writable ) { debug("handler event", event); @@ -141,36 +141,36 @@ export async function handler( }; debug("IncomingMessage constructor props", reqProps); const req = new IncomingMessage(reqProps); - const res = new ServerResponse({ method: reqProps.method }); + let res = new ServerResponse({ method: reqProps.method }, responseStream); // Process Next.js request await processRequest(req, res); // Format Next.js response to Lambda response - const statusCode = res.statusCode || 200; - const headers = ServerResponse.headers(res); - const isBase64Encoded = isBinaryContentType( - Array.isArray(headers["content-type"]) - ? headers["content-type"][0] - : headers["content-type"] - ); - const encoding = isBase64Encoded ? "base64" : "utf8"; - const body = ServerResponse.body(res).toString(encoding); - debug("ServerResponse data", { statusCode, headers, isBase64Encoded, body }); + // const statusCode = res.statusCode || 200; + // const headers = ServerResponse.headers(res); + // const isBase64Encoded = isBinaryContentType( + // Array.isArray(headers["content-type"]) + // ? headers["content-type"][0] + // : headers["content-type"] + // ); + // const encoding = isBase64Encoded ? "base64" : "utf8"; + // const body = ServerResponse.body(res).toString(encoding); + // debug("ServerResponse data", { statusCode, headers, isBase64Encoded, body }); + //TODO: Find a way to implement this with streaming // WORKAROUND: `NextServer` does not set cache response headers for HTML pages — https://github.com/serverless-stack/open-next#workaround-nextserver-does-not-set-cache-response-headers-for-html-pages - if (htmlPages.includes(parser.rawPath) && headers["cache-control"]) { - headers["cache-control"] = - "public, max-age=0, s-maxage=31536000, must-revalidate"; - } + // if (htmlPages.includes(parser.rawPath) && headers["cache-control"]) { + // headers["cache-control"] = "public, max-age=0, s-maxage=31536000, must-revalidate"; + // }= - return isCloudFrontEvent - ? // WORKAROUND: public/ static files served by the server function (AWS specific) — https://github.com/serverless-stack/open-next#workaround-public-static-files-served-by-the-server-function-aws-specific - statusCode === 404 - ? formatCloudFrontFailoverResponse(event as CloudFrontRequestEvent) - : formatCloudFrontResponse({ statusCode, headers, isBase64Encoded, body }) - : formatApiv2Response({ statusCode, headers, isBase64Encoded, body }); -} + // return isCloudFrontEvent + // ? // WORKAROUND: public/ static files served by the server function (AWS specific) — https://github.com/serverless-stack/open-next#workaround-public-static-files-served-by-the-server-function-aws-specific + // statusCode === 404 + // ? formatCloudFrontFailoverResponse(event as CloudFrontRequestEvent) + // : formatCloudFrontResponse({ statusCode, headers, isBase64Encoded, body }) + // : formatApiv2Response({ statusCode, headers, isBase64Encoded, body }); +}); ////////////////////// // Helper functions //