Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move concurrency and action retries from project config proto to the CLI #1585

Merged
merged 5 commits into from Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions cli/api/commands/run.ts
Expand Up @@ -26,7 +26,7 @@ export interface IExecutedAction {
}

export interface IExecutionOptions {
bigquery?: { jobPrefix?: string };
bigquery?: { jobPrefix?: string; actionRetryLimit?: number };
}

export function run(
Expand Down Expand Up @@ -337,6 +337,7 @@ export class Runner {
const taskStatus = await this.executeTask(client, task, actionResult, {
bigquery: {
labels: action.actionDescriptor?.bigqueryLabels,
actionRetryLimit: this.executionOptions?.bigquery?.actionRetryLimit,
jobPrefix: this.executionOptions?.bigquery?.jobPrefix
}
});
Expand Down Expand Up @@ -417,7 +418,7 @@ export class Runner {
rowLimit: 1,
bigquery: options.bigquery
}),
task.type === "operation" ? 1 : this.graph.projectConfig.idempotentActionRetries + 1 || 1
task.type === "operation" ? 1 : options.bigquery.actionRetryLimit + 1 || 1
);
taskResult.metadata = metadata;
if (task.type === "assertion") {
Expand Down
7 changes: 4 additions & 3 deletions cli/api/dbadapters/bigquery.ts
Expand Up @@ -29,13 +29,14 @@ const BIGQUERY_DATE_RELATED_FIELDS = [
const BIGQUERY_INTERNAL_ERROR_JOB_MAX_ATTEMPTS = 3;

export interface IBigQueryExecutionOptions {
actionRetryLimit?: number;
labels?: { [label: string]: string };
location?: string;
jobPrefix?: string;
}

export class BigQueryDbAdapter implements IDbAdapter {
public static async create(credentials: Credentials, options?: { concurrencyLimit?: number }) {
public static async create(credentials: Credentials, options?: { concurrencyLimit: number }) {
return new BigQueryDbAdapter(credentials, options);
}

Expand All @@ -44,12 +45,12 @@ export class BigQueryDbAdapter implements IDbAdapter {

private readonly clients = new Map<string, BigQuery>();

private constructor(credentials: Credentials, options?: { concurrencyLimit?: number }) {
private constructor(credentials: Credentials, options?: { concurrencyLimit: number }) {
this.bigQueryCredentials = credentials as dataform.IBigQuery;
// Bigquery allows 50 concurrent queries, and a rate limit of 100/user/second by default.
// These limits should be safely low enough for most projects.
this.pool = new PromisePoolExecutor({
concurrencyLimit: options?.concurrencyLimit || 16,
concurrencyLimit: options?.concurrencyLimit,
frequencyWindow: 1000,
frequencyLimit: 30
});
Expand Down
2 changes: 1 addition & 1 deletion cli/api/dbadapters/index.ts
Expand Up @@ -47,7 +47,7 @@ export interface IDbAdapter extends IDbClient {
}

interface ICredentialsOptions {
concurrencyLimit?: number;
concurrencyLimit: number;
disableSslForTestsOnly?: boolean;
}

Expand Down
58 changes: 41 additions & 17 deletions cli/index.ts
Expand Up @@ -209,6 +209,15 @@ const jobPrefixOption: INamedOption<yargs.Options> = {
}
};

const concurrencyQueryLimitOption: INamedOption<yargs.Options> = {
name: "concurrency-query-limit",
option: {
describe: "The limit for the number of concurrent queries that can be run against BigQuery.",
type: "number",
default: 16
}
};

const defaultDatabaseOptionName = "default-database";
const defaultLocationOptionName = "default-location";
const skipInstallOptionName = "skip-install";
Expand All @@ -223,6 +232,8 @@ const runTestsOptionName = "run-tests";
const schemaOptionName = "schema";
const tableOptionName = "table";

const actionRetryLimitName = "action-retry-limit";

const getCredentialsPath = (projectDir: string, credentialsPath: string) =>
actuallyResolve(credentialsPath || path.join(projectDir, CREDENTIALS_FILENAME));

Expand Down Expand Up @@ -493,7 +504,13 @@ export function runCli() {
format: `test [${projectDirMustExistOption.name}]`,
description: "Run the dataform project's unit tests on the configured data warehouse.",
positionalOptions: [projectDirMustExistOption],
options: [credentialsOption, varsOption, timeoutOption, trackOption],
options: [
concurrencyQueryLimitOption,
credentialsOption,
varsOption,
timeoutOption,
trackOption
],
processFn: async argv => {
print("Compiling...\n");
const compiledGraph = await compile({
Expand Down Expand Up @@ -523,7 +540,7 @@ export function runCli() {
const dbadapter = await dbadapters.create(
readCredentials,
compiledGraph.projectConfig.warehouse,
{ concurrencyLimit: compiledGraph.projectConfig.concurrentQueryLimit }
{ concurrencyLimit: argv[concurrencyQueryLimitOption.name] }
);
try {
const testResults = await test(dbadapter, compiledGraph.tests);
Expand Down Expand Up @@ -555,18 +572,27 @@ export function runCli() {
type: "boolean"
}
},
fullRefreshOption,
{
name: actionRetryLimitName,
option: {
describe: "If set, idempotent actions will be retried up to the limit.",
type: "number",
default: 0
}
},
actionsOption,
tagsOption,
concurrencyQueryLimitOption,
credentialsOption,
fullRefreshOption,
includeDepsOption,
includeDependentsOption,
schemaSuffixOverrideOption,
credentialsOption,
jobPrefixOption,
jsonOutputOption,
varsOption,
schemaSuffixOverrideOption,
tagsOption,
trackOption,
timeoutOption,
jobPrefixOption,
trackOption
varsOption
],
processFn: async argv => {
if (!argv[jsonOutputOption.name]) {
Expand Down Expand Up @@ -595,7 +621,7 @@ export function runCli() {
const dbadapter = await dbadapters.create(
readCredentials,
compiledGraph.projectConfig.warehouse,
{ concurrencyLimit: compiledGraph.projectConfig.concurrentQueryLimit }
{ concurrencyLimit: argv[concurrencyQueryLimitOption.name] }
);
try {
const executionGraph = await build(
Expand Down Expand Up @@ -634,13 +660,11 @@ export function runCli() {
if (!argv[jsonOutputOption.name]) {
print("Running...\n");
}
const runner = run(
dbadapter,
executionGraph,
argv[jobPrefixOption.name]
? { bigquery: { jobPrefix: argv[jobPrefixOption.name] } }
: {}
);
let bigqueryOptions: {} = { actionRetryLimit: argv[actionRetryLimitName] };
if (argv[jobPrefixOption.name]) {
bigqueryOptions = { ...bigqueryOptions, jobPrefix: argv[jobPrefixOption.name] };
}
const runner = run(dbadapter, executionGraph, bigqueryOptions);
process.on("SIGINT", () => {
runner.cancel();
});
Expand Down
6 changes: 1 addition & 5 deletions protos/core.proto
Expand Up @@ -23,11 +23,7 @@ message ProjectConfig {
string schema_suffix = 7;
string table_prefix = 11;

// TODO(ekrekr): Move these to be CLI flags instead.
int32 concurrent_query_limit = 13;
int32 idempotent_action_retries = 8;

reserved 3, 4, 6, 10, 12;
reserved 3, 4, 6, 8, 10, 12, 13;
}

message CompileConfig {
Expand Down
27 changes: 12 additions & 15 deletions tests/api/api.spec.ts
Expand Up @@ -1036,10 +1036,7 @@ suite("@dataform/api", () => {
suite("execute with retry", () => {
test("should fail when execution fails too many times for the retry setting", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH = {
...RUN_TEST_GRAPH,
projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 1 }
};
const NEW_TEST_GRAPH = RUN_TEST_GRAPH;
when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null);
when(
mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything())
Expand Down Expand Up @@ -1068,7 +1065,9 @@ suite("@dataform/api", () => {
mockDbAdapterInstance.withClientLock = async callback =>
await callback(mockDbAdapterInstance);

const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH);
const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH, {
bigquery: { actionRetryLimit: 1 }
});

expect(
dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON()
Expand All @@ -1077,10 +1076,7 @@ suite("@dataform/api", () => {

test("should pass when execution fails initially, then passes with the number of allowed retries", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH = {
...RUN_TEST_GRAPH,
projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 2 }
};
const NEW_TEST_GRAPH = RUN_TEST_GRAPH;
when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null);
when(
mockedDbAdapter.execute(NEW_TEST_GRAPH.actions[0].tasks[0].statement, anything())
Expand Down Expand Up @@ -1109,7 +1105,9 @@ suite("@dataform/api", () => {
mockDbAdapterInstance.withClientLock = async callback =>
await callback(mockDbAdapterInstance);

const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH);
const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH, {
bigquery: { actionRetryLimit: 2 }
});

expect(
dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON()
Expand All @@ -1135,10 +1133,7 @@ suite("@dataform/api", () => {

test("should not retry when the task is an operation", async () => {
const mockedDbAdapter = mock(BigQueryDbAdapter);
const NEW_TEST_GRAPH_WITH_OPERATION = {
...RUN_TEST_GRAPH,
projectConfig: { ...RUN_TEST_GRAPH.projectConfig, idempotentActionRetries: 3 }
};
const NEW_TEST_GRAPH_WITH_OPERATION = RUN_TEST_GRAPH;
NEW_TEST_GRAPH_WITH_OPERATION.actions[1].tasks[0].type = "operation";

when(mockedDbAdapter.createSchema(anyString(), anyString())).thenResolve(null);
Expand Down Expand Up @@ -1174,7 +1169,9 @@ suite("@dataform/api", () => {
mockDbAdapterInstance.withClientLock = async callback =>
await callback(mockDbAdapterInstance);

const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH_WITH_OPERATION);
const runner = new Runner(mockDbAdapterInstance, NEW_TEST_GRAPH_WITH_OPERATION, {
bigquery: { actionRetryLimit: 3 }
});

expect(
dataform.RunResult.create(cleanTiming(await runner.execute().result())).toJSON()
Expand Down