Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Functions work queue #1725

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/commands/emulators-start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import * as utils from "../utils";
module.exports = new Command("emulators:start")
.before(beforeEmulatorCommand)
.description("start the local Firebase emulators")
.option(
"--functionsMode <mode>",
"Control functions execution mode. Valid options are: default, debug"
)
.option(
"--only <list>",
"only run specific emulators. " +
Expand Down
310 changes: 176 additions & 134 deletions src/emulator/functionsEmulator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import * as clc from "cli-color";
import * as http from "http";

import * as getProjectId from "../getProjectId";
import * as functionsConfig from "../functionsConfig";
import * as utils from "../utils";
import * as logger from "../logger";
import * as track from "../track";
Expand All @@ -29,6 +28,7 @@ import { EmulatorRegistry } from "./registry";
import { EventEmitter } from "events";
import * as stream from "stream";
import { EmulatorLogger, Verbosity } from "./emulatorLogger";
import { WorkQueue, WorkMode } from "./workQueue";

const EVENT_INVOKE = "functions:invoke";

Expand Down Expand Up @@ -83,7 +83,154 @@ export class FunctionsEmulator implements EmulatorInstance {
return `http://${host}:${port}/${projectId}/${region}/${name}`;
}

static async handleBackgroundTrigger(
bundleTemplate: FunctionsRuntimeBundle,
nodeBinary: string,
req: express.Request,
res: express.Response
) {
const method = req.method;
const triggerId = req.params.trigger_name;

EmulatorLogger.log("DEBUG", `Accepted request ${method} ${req.url} --> ${triggerId}`);

const reqBody = (req as RequestWithRawBody).rawBody;
const proto = JSON.parse(reqBody.toString());

const runtime = FunctionsEmulator.startFunctionRuntime(
bundleTemplate,
triggerId,
EmulatedTriggerType.BACKGROUND,
nodeBinary,
proto
);

runtime.events.on("log", (el: EmulatorLog) => {
if (el.level === "FATAL") {
res.send(el.text);
}
});

// This "waiter" must be established before we block on "ready" since we expect
// this log entry to happen during the readying.
const triggerLogPromise = waitForLog(runtime.events, "SYSTEM", "triggers-parsed");

EmulatorLogger.log("DEBUG", `[functions] Waiting for runtime to be ready!`);
await runtime.ready;
EmulatorLogger.log("DEBUG", JSON.stringify(runtime.metadata));

const triggerLog = await triggerLogPromise;
const triggerMap: EmulatedTriggerMap = triggerLog.data.triggers;

const trigger = triggerMap[triggerId];
const service = getFunctionService(trigger.definition);
track(EVENT_INVOKE, service);

await runtime.exit;
return res.json({ status: "acknowledged" });
}

static async handleHttpsTrigger(
bundleTemplate: FunctionsRuntimeBundle,
nodeBinary: string,
req: express.Request,
res: express.Response
) {
const method = req.method;
const triggerId = req.params.trigger_name;

logger.debug(`Accepted request ${method} ${req.url} --> ${triggerId}`);

const reqBody = (req as RequestWithRawBody).rawBody;

const runtime = FunctionsEmulator.startFunctionRuntime(
bundleTemplate,
triggerId,
EmulatedTriggerType.HTTPS,
nodeBinary
);

runtime.events.on("log", (el: EmulatorLog) => {
if (el.level === "FATAL") {
res.status(500).send(el.text);
}
});

await runtime.ready;
logger.debug(JSON.stringify(runtime.metadata));
track(EVENT_INVOKE, "https");

EmulatorLogger.log(
"DEBUG",
`[functions] Runtime ready! Sending request! ${JSON.stringify(runtime.metadata)}`
);

// We do this instead of just 302'ing because many HTTP clients don't respect 302s so it may cause unexpected
// situations - not to mention CORS troubles and this enables us to use a socketPath (IPC socket) instead of
// consuming yet another port which is probably faster as well.
const runtimeReq = http.request(
{
method,
path: req.url || "/",
headers: req.headers,
socketPath: runtime.metadata.socketPath,
},
(runtimeRes: http.IncomingMessage) => {
function forwardStatusAndHeaders(): void {
res.status(runtimeRes.statusCode || 200);
if (!res.headersSent) {
Object.keys(runtimeRes.headers).forEach((key) => {
const val = runtimeRes.headers[key];
if (val) {
res.setHeader(key, val);
}
});
}
}

runtimeRes.on("data", (buf) => {
forwardStatusAndHeaders();
res.write(buf);
});

runtimeRes.on("close", () => {
forwardStatusAndHeaders();
res.end();
});

runtimeRes.on("end", () => {
forwardStatusAndHeaders();
res.end();
});
}
);

runtimeReq.on("error", () => {
res.end();
});

// If the original request had a body, forward that over the connection.
// TODO: Why is this not handled by the pipe?
if (reqBody) {
runtimeReq.write(reqBody);
runtimeReq.end();
}

// Pipe the incoming request over the socket.
req
.pipe(
runtimeReq,
{ end: true }
)
.on("error", () => {
res.end();
});

await runtime.exit;
}

static createHubServer(
queue: WorkQueue,
bundleTemplate: FunctionsRuntimeBundle,
nodeBinary: string
): express.Application {
Expand Down Expand Up @@ -114,144 +261,24 @@ export class FunctionsEmulator implements EmulatorInstance {
// A trigger named "foo" needs to respond at "foo" as well as "foo/*" but not "fooBar".
const httpsFunctionRoutes = [httpsFunctionRoute, `${httpsFunctionRoute}/*`];

const backgroundHandler = async (req: express.Request, res: express.Response) => {
const method = req.method;
const triggerId = req.params.trigger_name;

EmulatorLogger.log("DEBUG", `Accepted request ${method} ${req.url} --> ${triggerId}`);

const reqBody = (req as RequestWithRawBody).rawBody;
const proto = JSON.parse(reqBody.toString());

const runtime = FunctionsEmulator.startFunctionRuntime(
bundleTemplate,
triggerId,
EmulatedTriggerType.BACKGROUND,
nodeBinary,
proto
);

runtime.events.on("log", (el: EmulatorLog) => {
if (el.level === "FATAL") {
res.send(el.text);
}
// TODO: Should bundleTemplate and nodeBinary be held as global state?
const backgroundHandler: express.RequestHandler = async (
req: express.Request,
res: express.Response
) => {
queue.submit(() => {
return this.handleBackgroundTrigger(bundleTemplate, nodeBinary, req, res);
});

// This "waiter" must be established before we block on "ready" since we expect
// this log entry to happen during the readying.
const triggerLogPromise = waitForLog(runtime.events, "SYSTEM", "triggers-parsed");

EmulatorLogger.log("DEBUG", `[functions] Waiting for runtime to be ready!`);
await runtime.ready;
EmulatorLogger.log("DEBUG", JSON.stringify(runtime.metadata));

const triggerLog = await triggerLogPromise;
const triggerMap: EmulatedTriggerMap = triggerLog.data.triggers;

const trigger = triggerMap[triggerId];
const service = getFunctionService(trigger.definition);
track(EVENT_INVOKE, service);

await runtime.exit;
return res.json({ status: "acknowledged" });
};

// Define a common handler function to use for GET and POST requests.
const httpsHandler: express.RequestHandler = async (
req: express.Request,
res: express.Response
) => {
const method = req.method;
const triggerId = req.params.trigger_name;

logger.debug(`Accepted request ${method} ${req.url} --> ${triggerId}`);

const reqBody = (req as RequestWithRawBody).rawBody;

const runtime = FunctionsEmulator.startFunctionRuntime(
bundleTemplate,
triggerId,
EmulatedTriggerType.HTTPS,
nodeBinary
);

runtime.events.on("log", (el: EmulatorLog) => {
if (el.level === "FATAL") {
res.status(500).send(el.text);
}
queue.submit(() => {
return this.handleHttpsTrigger(bundleTemplate, nodeBinary, req, res);
});

await runtime.ready;
logger.debug(JSON.stringify(runtime.metadata));
track(EVENT_INVOKE, "https");

EmulatorLogger.log(
"DEBUG",
`[functions] Runtime ready! Sending request! ${JSON.stringify(runtime.metadata)}`
);

// We do this instead of just 302'ing because many HTTP clients don't respect 302s so it may cause unexpected
// situations - not to mention CORS troubles and this enables us to use a socketPath (IPC socket) instead of
// consuming yet another port which is probably faster as well.
const runtimeReq = http.request(
{
method,
path: req.url || "/",
headers: req.headers,
socketPath: runtime.metadata.socketPath,
},
(runtimeRes: http.IncomingMessage) => {
function forwardStatusAndHeaders(): void {
res.status(runtimeRes.statusCode || 200);
if (!res.headersSent) {
Object.keys(runtimeRes.headers).forEach((key) => {
const val = runtimeRes.headers[key];
if (val) {
res.setHeader(key, val);
}
});
}
}

runtimeRes.on("data", (buf) => {
forwardStatusAndHeaders();
res.write(buf);
});

runtimeRes.on("close", () => {
forwardStatusAndHeaders();
res.end();
});

runtimeRes.on("end", () => {
forwardStatusAndHeaders();
res.end();
});
}
);

runtimeReq.on("error", () => {
res.end();
});

// If the original request had a body, forward that over the connection.
// TODO: Why is this not handled by the pipe?
if (reqBody) {
runtimeReq.write(reqBody);
runtimeReq.end();
}

// Pipe the incoming request over the socket.
req
.pipe(
runtimeReq,
{ end: true }
)
.on("error", () => {
res.end();
});

await runtime.exit;
};

// The ordering here is important. The longer routes (background)
Expand Down Expand Up @@ -426,6 +453,7 @@ You can probably fix this by running "npm install ${
readonly projectId: string = "";
nodeBinary: string = "";

private queue: WorkQueue;
private server?: http.Server;
private functionsDir: string = "";
private triggers: EmulatedTriggerDefinition[] = [];
Expand All @@ -441,15 +469,28 @@ You can probably fix this by running "npm install ${

// TODO: Would prefer not to have static state but here we are!
EmulatorLogger.verbosity = this.args.quiet ? Verbosity.QUIET : Verbosity.DEBUG;

if (this.options.functionsMode === "debug") {
EmulatorLogger.logLabeled(
"WARN",
"functions",
"You have started the functions emulator in debug mode, functions will execute one at a time"
);
this.queue = new WorkQueue(WorkMode.SEQUENTIAL);
} else {
this.queue = new WorkQueue(WorkMode.CONCURRENT);
}
}

async start(): Promise<void> {
this.nodeBinary = await this.askInstallNodeVersion(this.functionsDir);
const { host, port } = this.getInfo();
this.server = FunctionsEmulator.createHubServer(this.getBaseBundle(), this.nodeBinary).listen(
port,
host
);
this.server = FunctionsEmulator.createHubServer(
this.queue,
this.getBaseBundle(),
this.nodeBinary
).listen(port, host);
this.queue.start();
}

async connect(): Promise<void> {
Expand Down Expand Up @@ -665,6 +706,7 @@ You can probably fix this by running "npm install ${
}

async stop(): Promise<void> {
this.queue.stop();
Promise.resolve(this.server && this.server.close());
}

Expand Down
Loading