Skip to content

Commit

Permalink
Improve performance and reliability when deploying multiple 2nd gen f…
Browse files Browse the repository at this point in the history
…unctions using single builds (#6376)

* enable single builds v2
  • Loading branch information
blidd-google committed Sep 26, 2023
1 parent 67f7480 commit d04add6
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
- Improve performance and reliability when deploying multiple 2nd gen functions using single builds. (#6376)
- Fixed an issue where `emulators:export` did not check if the target folder is empty. (#6313)
- Fix "Could not find the next executable" on Next.js deployments (#6372)
50 changes: 37 additions & 13 deletions src/deploy/functions/release/fabricator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,22 @@ export class Fabricator {
};

const upserts: Array<Promise<void>> = [];
const scraper = new SourceTokenScraper();
const scraperV1 = new SourceTokenScraper();
const scraperV2 = new SourceTokenScraper();
for (const endpoint of changes.endpointsToCreate) {
this.logOpStart("creating", endpoint);
upserts.push(handle("create", endpoint, () => this.createEndpoint(endpoint, scraper)));
upserts.push(
handle("create", endpoint, () => this.createEndpoint(endpoint, scraperV1, scraperV2))
);
}
for (const endpoint of changes.endpointsToSkip) {
utils.logSuccess(this.getLogSuccessMessage("skip", endpoint));
}
for (const update of changes.endpointsToUpdate) {
this.logOpStart("updating", update.endpoint);
upserts.push(handle("update", update.endpoint, () => this.updateEndpoint(update, scraper)));
upserts.push(
handle("update", update.endpoint, () => this.updateEndpoint(update, scraperV1, scraperV2))
);
}
await utils.allSettled(upserts);

Expand All @@ -167,31 +172,39 @@ export class Fabricator {
return deployResults;
}

async createEndpoint(endpoint: backend.Endpoint, scraper: SourceTokenScraper): Promise<void> {
async createEndpoint(
endpoint: backend.Endpoint,
scraperV1: SourceTokenScraper,
scraperV2: SourceTokenScraper
): Promise<void> {
endpoint.labels = { ...endpoint.labels, ...deploymentTool.labels() };
if (endpoint.platform === "gcfv1") {
await this.createV1Function(endpoint, scraper);
await this.createV1Function(endpoint, scraperV1);
} else if (endpoint.platform === "gcfv2") {
await this.createV2Function(endpoint);
await this.createV2Function(endpoint, scraperV2);
} else {
assertExhaustive(endpoint.platform);
}

await this.setTrigger(endpoint);
}

async updateEndpoint(update: planner.EndpointUpdate, scraper: SourceTokenScraper): Promise<void> {
async updateEndpoint(
update: planner.EndpointUpdate,
scraperV1: SourceTokenScraper,
scraperV2: SourceTokenScraper
): Promise<void> {
update.endpoint.labels = { ...update.endpoint.labels, ...deploymentTool.labels() };
if (update.deleteAndRecreate) {
await this.deleteEndpoint(update.deleteAndRecreate);
await this.createEndpoint(update.endpoint, scraper);
await this.createEndpoint(update.endpoint, scraperV1, scraperV2);
return;
}

if (update.endpoint.platform === "gcfv1") {
await this.updateV1Function(update.endpoint, scraper);
await this.updateV1Function(update.endpoint, scraperV1);
} else if (update.endpoint.platform === "gcfv2") {
await this.updateV2Function(update.endpoint);
await this.updateV2Function(update.endpoint, scraperV2);
} else {
assertExhaustive(update.endpoint.platform);
}
Expand Down Expand Up @@ -276,7 +289,7 @@ export class Fabricator {
}
}

async createV2Function(endpoint: backend.Endpoint): Promise<void> {
async createV2Function(endpoint: backend.Endpoint, scraper: SourceTokenScraper): Promise<void> {
const storageSource = this.sources[endpoint.codebase!]?.storage;
if (!storageSource) {
logger.debug("Precondition failed. Cannot create a GCFv2 function without storage");
Expand Down Expand Up @@ -351,14 +364,19 @@ export class Fabricator {
while (!resultFunction) {
resultFunction = await this.functionExecutor
.run(async () => {
apiFunction.buildConfig.sourceToken = await scraper.getToken();
const op: { name: string } = await gcfV2.createFunction(apiFunction);
return await poller.pollOperation<gcfV2.OutputCloudFunction>({
...gcfV2PollerOptions,
pollerName: `create-${endpoint.codebase}-${endpoint.region}-${endpoint.id}`,
operationResourceName: op.name,
onPoll: scraper.poller,
});
})
.catch(async (err: any) => {
// Abort waiting on source token so other concurrent calls don't get stuck
scraper.abort();

// If the createFunction call returns RPC error code RESOURCE_EXHAUSTED (8),
// we have exhausted the underlying Cloud Run API quota. To retry, we need to
// first delete the GCF function resource, then call createFunction again.
Expand Down Expand Up @@ -463,7 +481,7 @@ export class Fabricator {
}
}

async updateV2Function(endpoint: backend.Endpoint): Promise<void> {
async updateV2Function(endpoint: backend.Endpoint, scraper: SourceTokenScraper): Promise<void> {
const storageSource = this.sources[endpoint.codebase!]?.storage;
if (!storageSource) {
logger.debug("Precondition failed. Cannot update a GCFv2 function without storage");
Expand All @@ -482,16 +500,22 @@ export class Fabricator {
const resultFunction = await this.functionExecutor
.run(
async () => {
apiFunction.buildConfig.sourceToken = await scraper.getToken();
const op: { name: string } = await gcfV2.updateFunction(apiFunction);
return await poller.pollOperation<gcfV2.OutputCloudFunction>({
...gcfV2PollerOptions,
pollerName: `update-${endpoint.codebase}-${endpoint.region}-${endpoint.id}`,
operationResourceName: op.name,
onPoll: scraper.poller,
});
},
{ retryCodes: [...DEFAULT_RETRY_CODES, CLOUD_RUN_RESOURCE_EXHAUSTED_CODE] }
)
.catch(rethrowAs<gcfV2.OutputCloudFunction>(endpoint, "update"));
.catch((err: any) => {
scraper.abort();
logger.error((err as Error).message);
throw new reporter.DeploymentError(endpoint, "update", err);
});

endpoint.uri = resultFunction.serviceConfig?.uri;
const serviceName = resultFunction.serviceConfig?.service;
Expand Down
27 changes: 22 additions & 5 deletions src/deploy/functions/release/sourceTokenScraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { assertExhaustive } from "../../../functional";
import { logger } from "../../../logger";

type TokenFetchState = "NONE" | "FETCHING" | "VALID";
interface TokenFetchResult {
token?: string;
aborted: boolean;
}

/**
* GCF v1 deploys support reusing a build between function deploys.
Expand All @@ -11,8 +15,8 @@ type TokenFetchState = "NONE" | "FETCHING" | "VALID";
*/
export class SourceTokenScraper {
private tokenValidDurationMs;
private resolve!: (token?: string) => void;
private promise: Promise<string | undefined>;
private resolve!: (token: TokenFetchResult) => void;
private promise: Promise<TokenFetchResult>;
private expiry: number | undefined;
private fetchState: TokenFetchState;

Expand All @@ -22,19 +26,29 @@ export class SourceTokenScraper {
this.fetchState = "NONE";
}

abort(): void {
this.resolve({ aborted: true });
}

async getToken(): Promise<string | undefined> {
if (this.fetchState === "NONE") {
this.fetchState = "FETCHING";
return undefined;
} else if (this.fetchState === "FETCHING") {
return this.promise; // wait until we get a source token
const tokenResult = await this.promise;
if (tokenResult.aborted) {
this.promise = new Promise((resolve) => (this.resolve = resolve));
return undefined;
}
return tokenResult.token;
} else if (this.fetchState === "VALID") {
const tokenResult = await this.promise;
if (this.isTokenExpired()) {
this.fetchState = "FETCHING";
this.promise = new Promise((resolve) => (this.resolve = resolve));
return undefined;
}
return this.promise;
return tokenResult.token;
} else {
assertExhaustive(this.fetchState);
}
Expand All @@ -58,7 +72,10 @@ export class SourceTokenScraper {
const [, , , /* projects*/ /* project*/ /* regions*/ region] =
op.metadata?.target?.split("/") || [];
logger.debug(`Got source token ${op.metadata?.sourceToken} for region ${region as string}`);
this.resolve(op.metadata?.sourceToken);
this.resolve({
token: op.metadata?.sourceToken,
aborted: false,
});
this.fetchState = "VALID";
this.expiry = Date.now() + this.tokenValidDurationMs;
}
Expand Down
14 changes: 14 additions & 0 deletions src/gcp/cloudfunctionsv2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export interface BuildConfig {
runtime: runtimes.Runtime;
entryPoint: string;
source: Source;
sourceToken?: string;
environmentVariables: Record<string, string>;

// Output only
Expand Down Expand Up @@ -320,6 +321,11 @@ export async function createFunction(cloudFunction: InputCloudFunction): Promise
GOOGLE_NODE_RUN_SCRIPTS: "",
};

cloudFunction.serviceConfig.environmentVariables = {
...cloudFunction.serviceConfig.environmentVariables,
FUNCTION_TARGET: functionId,
};

try {
const res = await client.post<typeof cloudFunction, Operation>(
components.join("/"),
Expand Down Expand Up @@ -404,6 +410,8 @@ async function listFunctionsInternal(
* Customers can force a field to be deleted by setting that field to `undefined`
*/
export async function updateFunction(cloudFunction: InputCloudFunction): Promise<Operation> {
const components = cloudFunction.name.split("/");
const functionId = components.splice(-1, 1)[0];
// Keys in labels and environmentVariables and secretEnvironmentVariables are user defined, so we don't recurse
// for field masks.
const fieldMasks = proto.fieldMasks(
Expand All @@ -420,6 +428,12 @@ export async function updateFunction(cloudFunction: InputCloudFunction): Promise
GOOGLE_NODE_RUN_SCRIPTS: "",
};
fieldMasks.push("buildConfig.buildEnvironmentVariables");

cloudFunction.serviceConfig.environmentVariables = {
...cloudFunction.serviceConfig.environmentVariables,
FUNCTION_TARGET: functionId,
};

try {
const queryParams = {
updateMask: fieldMasks.join(","),
Expand Down
Loading

0 comments on commit d04add6

Please sign in to comment.