Skip to content

Commit

Permalink
Move concurrency and action retries from project config proto to the …
Browse files Browse the repository at this point in the history
…CLI (#1585)

* Move concurrency and action retries from project config proto to the CLI

* Fix typing

* Fix tests

* Fix optional
  • Loading branch information
Ekrekr committed Nov 24, 2023
1 parent d544851 commit 98d8195
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 43 deletions.
5 changes: 3 additions & 2 deletions cli/api/commands/run.ts
Expand Up @@ -26,7 +26,7 @@ export interface IExecutedAction {
}

export interface IExecutionOptions {
bigquery?: { jobPrefix?: string };
bigquery?: { jobPrefix?: string; actionRetryLimit?: number };
}

export function run(
Expand Down Expand Up @@ -337,6 +337,7 @@ export class Runner {
const taskStatus = await this.executeTask(client, task, actionResult, {
bigquery: {
labels: action.actionDescriptor?.bigqueryLabels,
actionRetryLimit: this.executionOptions?.bigquery?.actionRetryLimit,
jobPrefix: this.executionOptions?.bigquery?.jobPrefix
}
});
Expand Down Expand Up @@ -417,7 +418,7 @@ export class Runner {
rowLimit: 1,
bigquery: options.bigquery
}),
task.type === "operation" ? 1 : this.graph.projectConfig.idempotentActionRetries + 1 || 1
task.type === "operation" ? 1 : options.bigquery.actionRetryLimit + 1 || 1
);
taskResult.metadata = metadata;
if (task.type === "assertion") {
Expand Down
7 changes: 4 additions & 3 deletions cli/api/dbadapters/bigquery.ts
Expand Up @@ -29,13 +29,14 @@ const BIGQUERY_DATE_RELATED_FIELDS = [
const BIGQUERY_INTERNAL_ERROR_JOB_MAX_ATTEMPTS = 3;

/**
 * Optional per-execution settings handed to the BigQuery adapter alongside a statement.
 * Every field may be omitted.
 */
export interface IBigQueryExecutionOptions {
  /** Job prefix supplied by the caller (presumably applied to BigQuery job IDs; confirm in the adapter). */
  jobPrefix?: string;
  /** BigQuery location (presumably where the job should run; confirm in the adapter). */
  location?: string;
  /** String-valued labels keyed by label name. */
  labels?: Record<string, string>;
  /** Retry limit for actions; the CLI flag describes it as the limit for retrying idempotent actions. */
  actionRetryLimit?: number;
}

export class BigQueryDbAdapter implements IDbAdapter {
public static async create(credentials: Credentials, options?: { concurrencyLimit?: number }) {
public static async create(credentials: Credentials, options?: { concurrencyLimit: number }) {
return new BigQueryDbAdapter(credentials, options);
}

Expand All @@ -44,12 +45,12 @@ export class BigQueryDbAdapter implements IDbAdapter {

private readonly clients = new Map<string, BigQuery>();

private constructor(credentials: Credentials, options?: { concurrencyLimit?: number }) {
private constructor(credentials: Credentials, options?: { concurrencyLimit: number }) {
this.bigQueryCredentials = credentials as dataform.IBigQuery;
// Bigquery allows 50 concurrent queries, and a rate limit of 100/user/second by default.
// These limits should be safely low enough for most projects.
this.pool = new PromisePoolExecutor({
concurrencyLimit: options?.concurrencyLimit || 16,
concurrencyLimit: options?.concurrencyLimit,
frequencyWindow: 1000,
frequencyLimit: 30
});
Expand Down
2 changes: 1 addition & 1 deletion cli/api/dbadapters/index.ts
Expand Up @@ -47,7 +47,7 @@ export interface IDbAdapter extends IDbClient {
}

interface ICredentialsOptions {
concurrencyLimit?: number;
concurrencyLimit: number;
disableSslForTestsOnly?: boolean;
}

Expand Down
58 changes: 41 additions & 17 deletions cli/index.ts
Expand Up @@ -209,6 +209,15 @@ const jobPrefixOption: INamedOption<yargs.Options> = {
}
};

// CLI flag capping how many queries may run concurrently against BigQuery.
// It is numeric and falls back to 16 when the flag is not supplied.
const concurrencyQueryLimitOption: INamedOption<yargs.Options> = {
  option: {
    type: "number",
    default: 16,
    describe: "The limit for the number of concurrent queries that can be run against BigQuery."
  },
  name: "concurrency-query-limit"
};

const defaultDatabaseOptionName = "default-database";
const defaultLocationOptionName = "default-location";
const skipInstallOptionName = "skip-install";
Expand All @@ -223,6 +232,8 @@ const runTestsOptionName = "run-tests";
const schemaOptionName = "schema";
const tableOptionName = "table";

const actionRetryLimitName = "action-retry-limit";

/**
 * Picks the credentials file path and passes it through `actuallyResolve`.
 *
 * @param projectDir Project directory used for the fallback location.
 * @param credentialsPath Explicit credentials path; when falsy, the fallback is
 *   `CREDENTIALS_FILENAME` joined onto `projectDir`.
 * @returns Whatever `actuallyResolve` returns for the chosen path.
 */
const getCredentialsPath = (projectDir: string, credentialsPath: string) => {
  // Only build the fallback path when no explicit path was provided.
  const unresolvedPath = credentialsPath
    ? credentialsPath
    : path.join(projectDir, CREDENTIALS_FILENAME);
  return actuallyResolve(unresolvedPath);
};

Expand Down Expand Up @@ -493,7 +504,13 @@ export function runCli() {
format: `test [${projectDirMustExistOption.name}]`,
description: "Run the dataform project's unit tests on the configured data warehouse.",
positionalOptions: [projectDirMustExistOption],
options: [credentialsOption, varsOption, timeoutOption, trackOption],
options: [
concurrencyQueryLimitOption,
credentialsOption,
varsOption,
timeoutOption,
trackOption
],
processFn: async argv => {
print("Compiling...\n");
const compiledGraph = await compile({
Expand Down Expand Up @@ -523,7 +540,7 @@ export function runCli() {
const dbadapter = await dbadapters.create(
readCredentials,
compiledGraph.projectConfig.warehouse,
{ concurrencyLimit: compiledGraph.projectConfig.concurrentQueryLimit }
{ concurrencyLimit: argv[concurrencyQueryLimitOption.name] }
);
try {
const testResults = await test(dbadapter, compiledGraph.tests);
Expand Down Expand Up @@ -555,18 +572,27 @@ export function runCli() {
type: "boolean"
}
},
fullRefreshOption,
{
name: actionRetryLimitName,
option: {
describe: "If set, idempotent actions will be retried up to the limit.",
type: "number",
default: 0
}
},
actionsOption,
tagsOption,
concurrencyQueryLimitOption,
credentialsOption,
fullRefreshOption,
includeDepsOption,
includeDependentsOption,
schemaSuffixOverrideOption,
credentialsOption,
jobPrefixOption,
jsonOutputOption,
varsOption,
schemaSuffixOverrideOption,
tagsOption,
trackOption,
timeoutOption,
jobPrefixOption,
trackOption
varsOption
],
processFn: async argv => {
if (!argv[jsonOutputOption.name]) {
Expand Down Expand Up @@ -595,7 +621,7 @@ export function runCli() {
const dbadapter = await dbadapters.create(
readCredentials,
compiledGraph.projectConfig.warehouse,
{ concurrencyLimit: compiledGraph.projectConfig.concurrentQueryLimit }
{ concurrencyLimit: argv[concurrencyQueryLimitOption.name] }
);
try {
const executionGraph = await build(
Expand Down Expand Up @@ -634,13 +660,11 @@ export function runCli() {
if (!argv[jsonOutputOption.name]) {
print("Running...\n");
}
const runner = run(
dbadapter,
executionGraph,
argv[jobPrefixOption.name]
? { bigquery: { jobPrefix: argv[jobPrefixOption.name] } }
: {}
);
// Gather the BigQuery execution settings that come from CLI flags. The retry limit is
// always forwarded; the job prefix is added only when the flag was supplied.
const bigqueryOptions: { actionRetryLimit?: number; jobPrefix?: string } = {
  actionRetryLimit: argv[actionRetryLimitName]
};
if (argv[jobPrefixOption.name]) {
  bigqueryOptions.jobPrefix = argv[jobPrefixOption.name];
}
// The runner reads these settings from the `bigquery` key of its execution options
// (`executionOptions?.bigquery?.actionRetryLimit` / `.jobPrefix`), so they must be nested
// under that key. Passing the flat object would silently drop both flags.
const runner = run(dbadapter, executionGraph, { bigquery: bigqueryOptions });
process.on("SIGINT", () => {
runner.cancel();
});
Expand Down
6 changes: 1 addition & 5 deletions protos/core.proto
Expand Up @@ -23,11 +23,7 @@ message ProjectConfig {
string schema_suffix = 7;
string table_prefix = 11;

// TODO(ekrekr): Move these to be CLI flags instead.
int32 concurrent_query_limit = 13;
int32 idempotent_action_retries = 8;

reserved 3, 4, 6, 10, 12;
reserved 3, 4, 6, 8, 10, 12, 13;
}

message CompileConfig {
Expand Down
27 changes: 12 additions & 15 deletions tests/api/api.spec.ts
Expand Up @@ -1036,10 +1036,7 @@ suite("@dataform/api", () => {
suite("execute with retry", () => {
test("should fail when execution fails too many times for the retry setting", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH = {
...RUN_TEST_GRAPH,
projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 1 }
};
const NEW_TEST_GRAPH = RUN_TEST_GRAPH;
when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null);
when(
mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything())
Expand Down Expand Up @@ -1068,7 +1065,9 @@ suite("@dataform/api", () => {
mockDbAdapterInstance.withClientLock = async callback =>
await callback(mockDbAdapterInstance);

const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH);
const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH, {
bigquery: { actionRetryLimit: 1 }
});

expect(
dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON()
Expand All @@ -1077,10 +1076,7 @@ suite("@dataform/api", () => {

test("should pass when execution fails initially, then passes with the number of allowed retries", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH = {
...RUN_TEST_GRAPH,
projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 2 }
};
const NEW_TEST_GRAPH = RUN_TEST_GRAPH;
when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null);
when(
mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything())
Expand Down Expand Up @@ -1109,7 +1105,9 @@ suite("@dataform/api", () => {
mockDbAdapterInstance.withClientLock = async callback =>
await callback(mockDbAdapterInstance);

const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH);
const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH, {
bigquery: { actionRetryLimit: 2 }
});

expect(
dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON()
Expand All @@ -1135,10 +1133,7 @@ suite("@dataform/api", () => {

test("should not retry when the task is an operation", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH_WITH_OPERATION = {
...RUN_TEST_GRAPH,
projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 3 }
};
const NEW_TEST_GRAPH_WITH_OPERATION = RUN_TEST_GRAPH;
NEW_TEST_GRAPH_WITH_OPERATION.actions[1].tasks[0].type = "operation";

when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null);
Expand Down Expand Up @@ -1174,7 +1169,9 @@ suite("@dataform/api", () => {
mockDbAdapterInstance.withClientLock = async callback =>
await callback(mockDbAdapterInstance);

const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH_WITH_OPERATION);
const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH_WITH_OPERATION, {
bigquery: { actionRetryLimit: 3 }
});

expect(
dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON()
Expand Down

0 comments on commit 98d8195

Please sign in to comment.