diff --git a/cli/api/commands/run.ts b/cli/api/commands/run.ts index 2793542e8..089efd191 100644 --- a/cli/api/commands/run.ts +++ b/cli/api/commands/run.ts @@ -26,7 +26,7 @@ export interface IExecutedAction { } export interface IExecutionOptions { - bigquery?: { jobPrefix?: string }; + bigquery?: { jobPrefix?: string; actionRetryLimit?: number }; } export function run( @@ -337,6 +337,7 @@ export class Runner { const taskStatus = await this.executeTask(client, task, actionResult, { bigquery: { labels: action.actionDescriptor?.bigqueryLabels, + actionRetryLimit: this.executionOptions?.bigquery?.actionRetryLimit, jobPrefix: this.executionOptions?.bigquery?.jobPrefix } }); @@ -417,7 +418,7 @@ export class Runner { rowLimit: 1, bigquery: options.bigquery }), - task.type === "operation" ? 1 : this.graph.projectConfig.idempotentActionRetries + 1 || 1 + task.type === "operation" ? 1 : options.bigquery.actionRetryLimit + 1 || 1 ); taskResult.metadata = metadata; if (task.type === "assertion") { diff --git a/cli/api/dbadapters/bigquery.ts b/cli/api/dbadapters/bigquery.ts index 97bea55b7..d2971ac38 100644 --- a/cli/api/dbadapters/bigquery.ts +++ b/cli/api/dbadapters/bigquery.ts @@ -29,13 +29,14 @@ const BIGQUERY_DATE_RELATED_FIELDS = [ const BIGQUERY_INTERNAL_ERROR_JOB_MAX_ATTEMPTS = 3; export interface IBigQueryExecutionOptions { + actionRetryLimit?: number; labels?: { [label: string]: string }; location?: string; jobPrefix?: string; } export class BigQueryDbAdapter implements IDbAdapter { - public static async create(credentials: Credentials, options?: { concurrencyLimit?: number }) { + public static async create(credentials: Credentials, options?: { concurrencyLimit: number }) { return new BigQueryDbAdapter(credentials, options); } @@ -44,12 +45,12 @@ export class BigQueryDbAdapter implements IDbAdapter { private readonly clients = new Map(); - private constructor(credentials: Credentials, options?: { concurrencyLimit?: number }) { + private constructor(credentials: 
Credentials, options?: { concurrencyLimit: number }) { this.bigQueryCredentials = credentials as dataform.IBigQuery; // Bigquery allows 50 concurrent queries, and a rate limit of 100/user/second by default. // These limits should be safely low enough for most projects. this.pool = new PromisePoolExecutor({ - concurrencyLimit: options?.concurrencyLimit || 16, + concurrencyLimit: options?.concurrencyLimit ?? 16, frequencyWindow: 1000, frequencyLimit: 30 }); diff --git a/cli/api/dbadapters/index.ts b/cli/api/dbadapters/index.ts index 94de7e6a7..6b3b21a83 100644 --- a/cli/api/dbadapters/index.ts +++ b/cli/api/dbadapters/index.ts @@ -47,7 +47,7 @@ export interface IDbAdapter extends IDbClient { } interface ICredentialsOptions { - concurrencyLimit?: number; + concurrencyLimit: number; disableSslForTestsOnly?: boolean; } diff --git a/cli/index.ts b/cli/index.ts index 20b7ae10b..e201d5dcf 100644 --- a/cli/index.ts +++ b/cli/index.ts @@ -209,6 +209,15 @@ const jobPrefixOption: INamedOption = { } }; +const concurrencyQueryLimitOption: INamedOption = { + name: "concurrency-query-limit", + option: { + describe: "The limit for the number of concurrent queries that can be run against BigQuery.", + type: "number", + default: 16 + } +}; + const defaultDatabaseOptionName = "default-database"; const defaultLocationOptionName = "default-location"; const skipInstallOptionName = "skip-install"; @@ -223,6 +232,8 @@ const runTestsOptionName = "run-tests"; const schemaOptionName = "schema"; const tableOptionName = "table"; +const actionRetryLimitName = "action-retry-limit"; + const getCredentialsPath = (projectDir: string, credentialsPath: string) => actuallyResolve(credentialsPath || path.join(projectDir, CREDENTIALS_FILENAME)); @@ -493,7 +504,13 @@ export function runCli() { format: `test [${projectDirMustExistOption.name}]`, description: "Run the dataform project's unit tests on the configured data warehouse.", positionalOptions: [projectDirMustExistOption], - options: [credentialsOption, 
varsOption, timeoutOption, trackOption], + options: [ + concurrencyQueryLimitOption, + credentialsOption, + varsOption, + timeoutOption, + trackOption + ], processFn: async argv => { print("Compiling...\n"); const compiledGraph = await compile({ @@ -523,7 +540,7 @@ export function runCli() { const dbadapter = await dbadapters.create( readCredentials, compiledGraph.projectConfig.warehouse, - { concurrencyLimit: compiledGraph.projectConfig.concurrentQueryLimit } + { concurrencyLimit: argv[concurrencyQueryLimitOption.name] } ); try { const testResults = await test(dbadapter, compiledGraph.tests); @@ -555,18 +572,27 @@ export function runCli() { type: "boolean" } }, - fullRefreshOption, + { + name: actionRetryLimitName, + option: { + describe: "If set, idempotent actions will be retried up to the limit.", + type: "number", + default: 0 + } + }, actionsOption, - tagsOption, + concurrencyQueryLimitOption, + credentialsOption, + fullRefreshOption, includeDepsOption, includeDependentsOption, - schemaSuffixOverrideOption, - credentialsOption, + jobPrefixOption, jsonOutputOption, - varsOption, + schemaSuffixOverrideOption, + tagsOption, + trackOption, timeoutOption, - jobPrefixOption, - trackOption + varsOption ], processFn: async argv => { if (!argv[jsonOutputOption.name]) { @@ -595,7 +621,7 @@ export function runCli() { const dbadapter = await dbadapters.create( readCredentials, compiledGraph.projectConfig.warehouse, - { concurrencyLimit: compiledGraph.projectConfig.concurrentQueryLimit } + { concurrencyLimit: argv[concurrencyQueryLimitOption.name] } ); try { const executionGraph = await build( @@ -634,13 +660,11 @@ export function runCli() { if (!argv[jsonOutputOption.name]) { print("Running...\n"); } - const runner = run( - dbadapter, - executionGraph, - argv[jobPrefixOption.name] - ? 
{ bigquery: { jobPrefix: argv[jobPrefixOption.name] } } - : {} - ); + let bigqueryOptions: {} = { actionRetryLimit: argv[actionRetryLimitName] }; + if (argv[jobPrefixOption.name]) { + bigqueryOptions = { ...bigqueryOptions, jobPrefix: argv[jobPrefixOption.name] }; + } + const runner = run(dbadapter, executionGraph, { bigquery: bigqueryOptions }); process.on("SIGINT", () => { runner.cancel(); }); diff --git a/protos/core.proto b/protos/core.proto index b01e59036..a2733c938 100644 --- a/protos/core.proto +++ b/protos/core.proto @@ -23,11 +23,7 @@ message ProjectConfig { string schema_suffix = 7; string table_prefix = 11; - // TODO(ekrekr): Move these to be CLI flags instead. - int32 concurrent_query_limit = 13; - int32 idempotent_action_retries = 8; - - reserved 3, 4, 6, 10, 12; + reserved 3, 4, 6, 8, 10, 12, 13; } message CompileConfig { diff --git a/tests/api/api.spec.ts b/tests/api/api.spec.ts index c1c32e659..07c7149fd 100644 --- a/tests/api/api.spec.ts +++ b/tests/api/api.spec.ts @@ -1036,10 +1036,7 @@ suite("@dataform/api", () => { suite("execute with retry", () => { test("should fail when execution fails too many times for the retry setting", async () => { const mockedDbAdapter = mock(BigQueryDbAdapter); - const NEW_TEST_GRAPH = { - ...RUN_TEST_GRAPH, - projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 1 } - }; + const NEW_TEST_GRAPH = RUN_TEST_GRAPH; when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null); when( mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything()) @@ -1068,7 +1065,9 @@ suite("@dataform/api", () => { mockDbAdapterInstance.withClientLock = async callback => await callback(mockDbAdapterInstance); - const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH); + const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH, { + bigquery: { actionRetryLimit: 1 } + }); expect( dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON() @@ -1077,10 +1076,7 @@ 
suite("@dataform/api", () => { test("should pass when execution fails initially, then passes with the number of allowed retries", async () => { const mockedDbAdapter = mock(BigQueryDbAdapter); - const NEW_TEST_GRAPH = { - ...RUN_TEST_GRAPH, - projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 2 } - }; + const NEW_TEST_GRAPH = RUN_TEST_GRAPH; when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null); when( mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything()) @@ -1109,7 +1105,9 @@ suite("@dataform/api", () => { mockDbAdapterInstance.withClientLock = async callback => await callback(mockDbAdapterInstance); - const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH); + const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH, { + bigquery: { actionRetryLimit: 2 } + }); expect( dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON() @@ -1135,10 +1133,7 @@ suite("@dataform/api", () => { test("should not retry when the task is an operation", async () => { const mockedDbAdapter = mock(BigQueryDbAdapter); - const NEW_TEST_GRAPH_WITH_OPERATION = { - ...RUN_TEST_GRAPH, - projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 3 } - }; + const NEW_TEST_GRAPH_WITH_OPERATION = RUN_TEST_GRAPH; NEW_TEST_GRAPH_WITH_OPERATION.actions[1].tasks[0].type = "operation"; when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null); @@ -1174,7 +1169,9 @@ suite("@dataform/api", () => { mockDbAdapterInstance.withClientLock = async callback => await callback(mockDbAdapterInstance); - const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH_WITH_OPERATION); + const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH_WITH_OPERATION, { + bigquery: { actionRetryLimit: 3 } + }); expect( dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON()