Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance and reliability when deploying multiple 2nd gen functions using single builds #6275

Merged
merged 5 commits into from
Aug 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Improve performance and reliability when deploying multiple 2nd gen functions using single builds. (#6275)
12 changes: 8 additions & 4 deletions src/deploy/functions/release/fabricator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
try {
await fn();
this.logOpSuccess(op, endpoint);
} catch (err: any) {

Check warning on line 125 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Unexpected any. Specify a different type
result.error = err as Error;
}
result.durationMs = timer.stop();
Expand Down Expand Up @@ -172,7 +172,7 @@
if (endpoint.platform === "gcfv1") {
await this.createV1Function(endpoint, scraper);
} else if (endpoint.platform === "gcfv2") {
await this.createV2Function(endpoint);
await this.createV2Function(endpoint, scraper);
} else {
assertExhaustive(endpoint.platform);
}
Expand All @@ -191,7 +191,7 @@
if (update.endpoint.platform === "gcfv1") {
await this.updateV1Function(update.endpoint, scraper);
} else if (update.endpoint.platform === "gcfv2") {
await this.updateV2Function(update.endpoint);
await this.updateV2Function(update.endpoint, scraper);
} else {
assertExhaustive(update.endpoint.platform);
}
Expand All @@ -209,7 +209,7 @@
}

async createV1Function(endpoint: backend.Endpoint, scraper: SourceTokenScraper): Promise<void> {
const sourceUrl = this.sources[endpoint.codebase!]?.sourceUrl;

Check warning on line 212 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Forbidden non-null assertion
if (!sourceUrl) {
logger.debug("Precondition failed. Cannot create a GCF function without sourceUrl");
throw new Error("Precondition failed");
Expand All @@ -228,7 +228,7 @@
const op: { name: string } = await gcf.createFunction(apiFunction);
return poller.pollOperation<gcf.CloudFunction>({
...gcfV1PollerOptions,
pollerName: `create-${endpoint.codebase}-${endpoint.region}-${endpoint.id}`,

Check warning on line 231 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Invalid type "string | undefined" of template literal expression
operationResourceName: op.name,
onPoll: scraper.poller,
});
Expand Down Expand Up @@ -265,7 +265,7 @@
}
} else if (
backend.isBlockingTriggered(endpoint) &&
AUTH_BLOCKING_EVENTS.includes(endpoint.blockingTrigger.eventType as any)

Check warning on line 268 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Unsafe argument of type `any` assigned to a parameter of type `"providers/cloud.auth/eventTypes/user.beforeCreate" | "providers/cloud.auth/eventTypes/user.beforeSignIn"`

Check warning on line 268 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Unexpected any. Specify a different type
) {
// Auth Blocking functions should always be public
await this.executor
Expand All @@ -276,8 +276,8 @@
}
}

async createV2Function(endpoint: backend.Endpoint): Promise<void> {
async createV2Function(endpoint: backend.Endpoint, scraper: SourceTokenScraper): Promise<void> {
const storageSource = this.sources[endpoint.codebase!]?.storage;

Check warning on line 280 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Forbidden non-null assertion
if (!storageSource) {
logger.debug("Precondition failed. Cannot create a GCFv2 function without storage");
throw new Error("Precondition failed");
Expand All @@ -293,15 +293,15 @@
.run(async () => {
try {
await pubsub.createTopic({ name: topic });
} catch (err: any) {

Check warning on line 296 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Unexpected any. Specify a different type
// Pub/Sub uses HTTP 409 (CONFLICT) with a status message of
// ALREADY_EXISTS if the topic already exists.
if (err.status === 409) {

Check warning on line 299 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Unsafe member access .status on an `any` value
return;
}
throw new FirebaseError("Unexpected error creating Pub/Sub topic", {
original: err as Error,
status: err.status,

Check warning on line 304 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Unsafe assignment of an `any` value

Check warning on line 304 in src/deploy/functions/release/fabricator.ts

View workflow job for this annotation

GitHub Actions / lint (18)

Unsafe member access .status on an `any` value
});
}
})
Expand Down Expand Up @@ -351,11 +351,13 @@
while (!resultFunction) {
resultFunction = await this.functionExecutor
.run(async () => {
apiFunction.buildConfig.sourceToken = await scraper.getToken();
const op: { name: string } = await gcfV2.createFunction(apiFunction);
return await poller.pollOperation<gcfV2.OutputCloudFunction>({
...gcfV2PollerOptions,
pollerName: `create-${endpoint.codebase}-${endpoint.region}-${endpoint.id}`,
operationResourceName: op.name,
onPoll: scraper.poller,
});
})
.catch(async (err: any) => {
Expand Down Expand Up @@ -463,7 +465,7 @@
}
}

async updateV2Function(endpoint: backend.Endpoint): Promise<void> {
async updateV2Function(endpoint: backend.Endpoint, scraper: SourceTokenScraper): Promise<void> {
const storageSource = this.sources[endpoint.codebase!]?.storage;
if (!storageSource) {
logger.debug("Precondition failed. Cannot update a GCFv2 function without storage");
Expand All @@ -482,11 +484,13 @@
const resultFunction = await this.functionExecutor
.run(
async () => {
apiFunction.buildConfig.sourceToken = await scraper.getToken();
const op: { name: string } = await gcfV2.updateFunction(apiFunction);
return await poller.pollOperation<gcfV2.OutputCloudFunction>({
...gcfV2PollerOptions,
pollerName: `update-${endpoint.codebase}-${endpoint.region}-${endpoint.id}`,
operationResourceName: op.name,
onPoll: scraper.poller,
});
},
{ retryCodes: [...DEFAULT_RETRY_CODES, CLOUD_RUN_RESOURCE_EXHAUSTED_CODE] }
Expand Down
14 changes: 12 additions & 2 deletions src/deploy/functions/release/sourceTokenScraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ type TokenFetchState = "NONE" | "FETCHING" | "VALID";
*/
export class SourceTokenScraper {
private tokenValidDurationMs;
private fetchTimeoutMs;
private resolve!: (token?: string) => void;
private promise: Promise<string | undefined>;
private expiry: number | undefined;
private fetchState: TokenFetchState;

constructor(validDurationMs = 1500000) {
constructor(validDurationMs = 1500000, fetchTimeoutMs = 180_000) {
this.tokenValidDurationMs = validDurationMs;
this.fetchTimeoutMs = fetchTimeoutMs;
this.promise = new Promise((resolve) => (this.resolve = resolve));
this.fetchState = "NONE";
}
Expand All @@ -27,7 +29,15 @@ export class SourceTokenScraper {
this.fetchState = "FETCHING";
return undefined;
} else if (this.fetchState === "FETCHING") {
return this.promise; // wait until we get a source token
const timeout = new Promise<undefined>((resolve) => {
setTimeout(() => {
this.fetchState = "NONE";
resolve(undefined);
}, this.fetchTimeoutMs);
});
// wait until we get a source token, or the timeout occurs
// and we reset the fetch state and return an undefined token.
return Promise.race([this.promise, timeout]);
} else if (this.fetchState === "VALID") {
if (this.isTokenExpired()) {
this.fetchState = "FETCHING";
Expand Down
1 change: 1 addition & 0 deletions src/gcp/cloudfunctionsv2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export interface BuildConfig {
runtime: runtimes.Runtime;
entryPoint: string;
source: Source;
sourceToken?: string;
environmentVariables: Record<string, string>;

// Output only
Expand Down
69 changes: 40 additions & 29 deletions src/test/deploy/functions/release/fabricator.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ describe("Fabricator", () => {
}
);

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(pubsub.createTopic).to.have.been.called;
expect(gcfv2.createFunction).to.have.been.called;
});
Expand All @@ -476,7 +476,7 @@ describe("Fabricator", () => {
}
);

await expect(fab.createV2Function(ep)).to.be.rejectedWith(
await expect(fab.createV2Function(ep, new scraper.SourceTokenScraper())).to.be.rejectedWith(
reporter.DeploymentError,
"create topic"
);
Expand All @@ -500,7 +500,7 @@ describe("Fabricator", () => {
}
);

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(eventarc.getChannel).to.have.been.called;
expect(eventarc.createChannel).to.not.have.been.called;
expect(gcfv2.createFunction).to.have.been.called;
Expand Down Expand Up @@ -530,7 +530,7 @@ describe("Fabricator", () => {
}
);

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(eventarc.createChannel).to.have.been.called;
expect(gcfv2.createFunction).to.have.been.called;
});
Expand Down Expand Up @@ -568,7 +568,7 @@ describe("Fabricator", () => {
}
);

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(eventarc.createChannel).to.have.been.calledOnceWith({ name: channelName });
expect(poller.pollOperation).to.have.been.called;
});
Expand All @@ -594,22 +594,27 @@ describe("Fabricator", () => {
}
);

await expect(fab.createV2Function(ep)).to.eventually.be.rejectedWith(
reporter.DeploymentError,
"upsert eventarc channel"
);
await expect(
fab.createV2Function(ep, new scraper.SourceTokenScraper())
).to.eventually.be.rejectedWith(reporter.DeploymentError, "upsert eventarc channel");
});

it("throws on create function failure", async () => {
gcfv2.createFunction.rejects(new Error("Server failure"));

const ep = endpoint({ httpsTrigger: {} }, { platform: "gcfv2" });
await expect(fab.createV2Function(ep)).to.be.rejectedWith(reporter.DeploymentError, "create");
await expect(fab.createV2Function(ep, new scraper.SourceTokenScraper())).to.be.rejectedWith(
reporter.DeploymentError,
"create"
);

gcfv2.createFunction.resolves({ name: "op", done: false });
poller.pollOperation.rejects(new Error("Fail whale"));

await expect(fab.createV2Function(ep)).to.be.rejectedWith(reporter.DeploymentError, "create");
await expect(fab.createV2Function(ep, new scraper.SourceTokenScraper())).to.be.rejectedWith(
reporter.DeploymentError,
"create"
);
});

it("deletes broken function and retries on cloud run quota exhaustion", async () => {
Expand All @@ -620,7 +625,7 @@ describe("Fabricator", () => {
poller.pollOperation.resolves({ name: "op" });

const ep = endpoint({ httpsTrigger: {} }, { platform: "gcfv2" });
await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper(1500000, 0));

expect(gcfv2.createFunction).to.have.been.calledTwice;
expect(gcfv2.deleteFunction).to.have.been.called;
Expand All @@ -632,7 +637,7 @@ describe("Fabricator", () => {
run.setInvokerCreate.rejects(new Error("Boom"));

const ep = endpoint({ httpsTrigger: {} }, { platform: "gcfv2" });
await expect(fab.createV2Function(ep)).to.be.rejectedWith(
await expect(fab.createV2Function(ep, new scraper.SourceTokenScraper())).to.be.rejectedWith(
reporter.DeploymentError,
"set invoker"
);
Expand All @@ -645,7 +650,7 @@ describe("Fabricator", () => {
run.setInvokerCreate.resolves();
const ep = endpoint({ httpsTrigger: {} }, { platform: "gcfv2" });

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerCreate).to.have.been.calledWith(ep.project, "service", ["public"]);
});

Expand All @@ -662,7 +667,7 @@ describe("Fabricator", () => {
{ platform: "gcfv2" }
);

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerCreate).to.have.been.calledWith(ep.project, "service", ["custom@"]);
});

Expand All @@ -672,7 +677,7 @@ describe("Fabricator", () => {
run.setInvokerCreate.resolves();
const ep = endpoint({ httpsTrigger: { invoker: ["private"] } }, { platform: "gcfv2" });

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerCreate).to.not.have.been.called;
});
});
Expand All @@ -684,7 +689,7 @@ describe("Fabricator", () => {
run.setInvokerCreate.resolves();
const ep = endpoint({ callableTrigger: {} }, { platform: "gcfv2" });

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerCreate).to.have.been.calledWith(ep.project, "service", ["public"]);
});
});
Expand All @@ -696,7 +701,7 @@ describe("Fabricator", () => {
run.setInvokerCreate.resolves();
const ep = endpoint({ taskQueueTrigger: {} }, { platform: "gcfv2" });

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerCreate).to.not.have.been.called;
});

Expand All @@ -712,7 +717,7 @@ describe("Fabricator", () => {
},
{ platform: "gcfv2" }
);
await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerCreate).to.have.been.calledWith(ep.project, "service", ["custom@"]);
});
});
Expand All @@ -727,7 +732,7 @@ describe("Fabricator", () => {
{ platform: "gcfv2" }
);

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerCreate).to.have.been.calledWith(ep.project, "service", ["public"]);
});
});
Expand All @@ -741,7 +746,7 @@ describe("Fabricator", () => {
{ platform: "gcfv2" }
);

await fab.createV2Function(ep);
await fab.createV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerCreate).to.not.have.been.called;
});
});
Expand All @@ -751,11 +756,17 @@ describe("Fabricator", () => {
gcfv2.updateFunction.rejects(new Error("Server failure"));

const ep = endpoint({ httpsTrigger: {} }, { platform: "gcfv2" });
await expect(fab.updateV2Function(ep)).to.be.rejectedWith(reporter.DeploymentError, "update");
await expect(fab.updateV2Function(ep, new scraper.SourceTokenScraper())).to.be.rejectedWith(
reporter.DeploymentError,
"update"
);

gcfv2.updateFunction.resolves({ name: "op", done: false });
poller.pollOperation.rejects(new Error("Fail whale"));
await expect(fab.updateV2Function(ep)).to.be.rejectedWith(reporter.DeploymentError, "update");
await expect(fab.updateV2Function(ep, new scraper.SourceTokenScraper())).to.be.rejectedWith(
reporter.DeploymentError,
"update"
);
});

it("throws on set invoker failure", async () => {
Expand All @@ -764,7 +775,7 @@ describe("Fabricator", () => {
run.setInvokerUpdate.rejects(new Error("Boom"));

const ep = endpoint({ httpsTrigger: { invoker: ["private"] } }, { platform: "gcfv2" });
await expect(fab.updateV2Function(ep)).to.be.rejectedWith(
await expect(fab.updateV2Function(ep, new scraper.SourceTokenScraper())).to.be.rejectedWith(
reporter.DeploymentError,
"set invoker"
);
Expand All @@ -783,7 +794,7 @@ describe("Fabricator", () => {
{ platform: "gcfv2" }
);

await fab.updateV2Function(ep);
await fab.updateV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerUpdate).to.have.been.calledWith(ep.project, "service", ["custom@"]);
});

Expand All @@ -800,7 +811,7 @@ describe("Fabricator", () => {
{ platform: "gcfv2" }
);

await fab.updateV2Function(ep);
await fab.updateV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerUpdate).to.have.been.calledWith(ep.project, "service", ["custom@"]);
});

Expand All @@ -817,7 +828,7 @@ describe("Fabricator", () => {
{ platform: "gcfv2" }
);

await fab.updateV2Function(ep);
await fab.updateV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerUpdate).to.have.been.calledWith(ep.project, "service", ["public"]);
});

Expand All @@ -827,7 +838,7 @@ describe("Fabricator", () => {
run.setInvokerUpdate.resolves();
const ep = endpoint({ httpsTrigger: {} }, { platform: "gcfv2" });

await fab.updateV2Function(ep);
await fab.updateV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerUpdate).to.not.have.been.called;
});

Expand All @@ -840,7 +851,7 @@ describe("Fabricator", () => {
{ platform: "gcfv2" }
);

await fab.updateV2Function(ep);
await fab.updateV2Function(ep, new scraper.SourceTokenScraper());
expect(run.setInvokerUpdate).to.not.have.been.called;
});
});
Expand Down
6 changes: 6 additions & 0 deletions src/test/deploy/functions/release/sourceTokenScraper.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ describe("SourceTokenScraper", () => {
await expect(scraper.getToken()).to.eventually.equal("magic token #2");
});

it("resets fetch state after timeout and returns undefined token", async () => {
const scraper = new SourceTokenScraper(100000, 10);
await expect(scraper.getToken()).to.eventually.be.undefined;
await expect(scraper.getToken()).to.eventually.be.undefined;
});

it("concurrent requests for source token", async () => {
const scraper = new SourceTokenScraper();

Expand Down
Loading