Skip to content

Commit

Permalink
feat: added basic lambda streaming support
Browse files Browse the repository at this point in the history
  • Loading branch information
conico974 committed Apr 12, 2023
1 parent d81ddcc commit 688150d
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 70 deletions.
23 changes: 23 additions & 0 deletions packages/open-next/src/adapters/lambda.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { APIGatewayProxyEventV2, Context } from "aws-lambda";
import * as http from "http";
import { Writable } from "stream";

type Handler = (
event: APIGatewayProxyEventV2,
responseStream: Writable,
context: Context
) => Promise<any>;

interface Metadata {
statusCode: number;
headers: Record<string, string>;
}

declare global {
namespace awslambda {
function streamifyResponse(handler: Handler): any;
module HttpResponseStream {
function from(res: Writable, metadata: Metadata): any;
}
}
}
77 changes: 34 additions & 43 deletions packages/open-next/src/adapters/response.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// @ts-nocheck
import http from "node:http";
import http, { IncomingMessage } from "node:http";
import { Socket } from "node:net";
import { Writable } from "node:stream";

const headerEnd = "\r\n\r\n";

Expand All @@ -17,11 +19,7 @@ function getString(data) {
}

function addData(stream, data) {
if (
Buffer.isBuffer(data) ||
typeof data === "string" ||
data instanceof Uint8Array
) {
if (Buffer.isBuffer(data) || typeof data === "string" || data instanceof Uint8Array) {
stream[BODY].push(Buffer.from(data));
} else {
throw new Error(`response.write() of unexpected type: ${typeof data}`);
Expand All @@ -40,9 +38,9 @@ export class ServerResponse extends http.ServerResponse {
return response;
}

static body(res) {
return Buffer.concat(res[BODY]);
}
// static body(res) {
// return Buffer.concat(res[BODY]);
// }

static headers(res) {
const headers =
Expand Down Expand Up @@ -75,13 +73,31 @@ export class ServerResponse extends http.ServerResponse {
break;
}
}
// console.log(
// "this.responseStream",
// this,
// statusCode,
// headers ?? this[HEADERS],
// this.getHeaders()
// );
if (!this.headersSent) {
try {
this.responseStream = awslambda.HttpResponseStream.from(this.responseStream, {
statusCode,
headers: this.getHeaders(),
});
} catch (e) {
console.log(e);
}
}

super.writeHead(statusCode, reason, obj);
// super.writeHead(statusCode, reason, obj);
}

constructor({ method }) {
constructor({ method }, responseStream: Writable) {
super({ method });

this.responseStream = responseStream;
this[BODY] = [];
this[HEADERS] = {};

Expand All @@ -92,41 +108,16 @@ export class ServerResponse extends http.ServerResponse {
this.assignSocket({
_writableState: {},
writable: true,
on: Function.prototype,
removeListener: Function.prototype,
destroy: Function.prototype,
cork: Function.prototype,
uncork: Function.prototype,
write: (data, encoding, cb) => {
if (typeof encoding === "function") {
cb = encoding;
encoding = null;
}

if (this._header === "" || this._wroteHeader) {
addData(this, data);
} else {
const string = getString(data);
const index = string.indexOf(headerEnd);

if (index !== -1) {
const remainder = string.slice(index + headerEnd.length);

if (remainder) {
addData(this, remainder);
}

this._wroteHeader = true;
}
}

if (typeof cb === "function") {
cb();
}
},
on: responseStream.on.bind(responseStream),
removeListener: responseStream.removeListener.bind(responseStream),
destroy: responseStream.destroy.bind(responseStream),
cork: responseStream.cork.bind(responseStream),
uncork: responseStream.uncork.bind(responseStream),
write: responseStream.write.bind(responseStream),
});

this.once("finish", () => {
responseStream.end();
this.emit("close");
});
}
Expand Down
54 changes: 27 additions & 27 deletions packages/open-next/src/adapters/server-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import NextServer from "next/dist/server/next-server.js";
import { loadConfig } from "./util.js";
import { isBinaryContentType } from "./binary.js";
import { debug } from "./logger.js";
import { Writable } from "node:stream";

setNextjsServerWorkingDirectory();
const nextDir = path.join(__dirname, ".next");
Expand Down Expand Up @@ -43,9 +44,7 @@ const eventParser = {
},
get url() {
const { rawPath, rawQueryString } = event;
return rawQueryString.length > 0
? `${rawPath}?${rawQueryString}`
: rawPath;
return rawQueryString.length > 0 ? `${rawPath}?${rawQueryString}` : rawPath;
},
get body() {
const { body, isBase64Encoded } = event;
Expand Down Expand Up @@ -122,8 +121,9 @@ const eventParser = {
// Handler //
/////////////

export async function handler(
event: APIGatewayProxyEventV2 | CloudFrontRequestEvent
export const handler = awslambda.streamifyResponse(async function (
event: APIGatewayProxyEventV2 | CloudFrontRequestEvent,
responseStream: Writable
) {
debug("handler event", event);

Expand All @@ -141,36 +141,36 @@ export async function handler(
};
debug("IncomingMessage constructor props", reqProps);
const req = new IncomingMessage(reqProps);
const res = new ServerResponse({ method: reqProps.method });
let res = new ServerResponse({ method: reqProps.method }, responseStream);

// Process Next.js request
await processRequest(req, res);

// Format Next.js response to Lambda response
const statusCode = res.statusCode || 200;
const headers = ServerResponse.headers(res);
const isBase64Encoded = isBinaryContentType(
Array.isArray(headers["content-type"])
? headers["content-type"][0]
: headers["content-type"]
);
const encoding = isBase64Encoded ? "base64" : "utf8";
const body = ServerResponse.body(res).toString(encoding);
debug("ServerResponse data", { statusCode, headers, isBase64Encoded, body });
// const statusCode = res.statusCode || 200;
// const headers = ServerResponse.headers(res);
// const isBase64Encoded = isBinaryContentType(
// Array.isArray(headers["content-type"])
// ? headers["content-type"][0]
// : headers["content-type"]
// );
// const encoding = isBase64Encoded ? "base64" : "utf8";
// const body = ServerResponse.body(res).toString(encoding);
// debug("ServerResponse data", { statusCode, headers, isBase64Encoded, body });

//TODO: Find a way to implement this with streaming
// WORKAROUND: `NextServer` does not set cache response headers for HTML pages — https://github.com/serverless-stack/open-next#workaround-nextserver-does-not-set-cache-response-headers-for-html-pages
if (htmlPages.includes(parser.rawPath) && headers["cache-control"]) {
headers["cache-control"] =
"public, max-age=0, s-maxage=31536000, must-revalidate";
}
// if (htmlPages.includes(parser.rawPath) && headers["cache-control"]) {
// headers["cache-control"] = "public, max-age=0, s-maxage=31536000, must-revalidate";
// }=

return isCloudFrontEvent
? // WORKAROUND: public/ static files served by the server function (AWS specific) — https://github.com/serverless-stack/open-next#workaround-public-static-files-served-by-the-server-function-aws-specific
statusCode === 404
? formatCloudFrontFailoverResponse(event as CloudFrontRequestEvent)
: formatCloudFrontResponse({ statusCode, headers, isBase64Encoded, body })
: formatApiv2Response({ statusCode, headers, isBase64Encoded, body });
}
// return isCloudFrontEvent
// ? // WORKAROUND: public/ static files served by the server function (AWS specific) — https://github.com/serverless-stack/open-next#workaround-public-static-files-served-by-the-server-function-aws-specific
// statusCode === 404
// ? formatCloudFrontFailoverResponse(event as CloudFrontRequestEvent)
// : formatCloudFrontResponse({ statusCode, headers, isBase64Encoded, body })
// : formatApiv2Response({ statusCode, headers, isBase64Encoded, body });
});

//////////////////////
// Helper functions //
Expand Down

0 comments on commit 688150d

Please sign in to comment.