Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update incremental table execution to avoid create or replace #1712

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions cli/api/commands/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ export class Builder {
tableMetadata: dataform.ITableMetadata,
runConfig: dataform.IRunConfig
) {
if (table.protected && this.runConfig.fullRefresh) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep this?

throw new Error("Protected datasets cannot be fully refreshed.");
}

return {
...this.toPartialExecutionAction(table),
type: "table",
Expand Down
45 changes: 31 additions & 14 deletions cli/api/dbadapters/execution_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ from (${query}) as insertions`;

if (table.enumType === dataform.TableType.INCREMENTAL) {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.add(Task.statement(this.createOrReplace(table)));
tasks.add(Task.statement(this.createTableStatement(table, runConfig)));
} else {
tasks.add(
Task.statement(
Expand All @@ -159,7 +159,7 @@ from (${query}) as insertions`;
);
}
} else {
tasks.add(Task.statement(this.createOrReplace(table)));
tasks.add(Task.statement(this.createTableStatement(table, runConfig)));
}

this.postOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));
Expand All @@ -182,7 +182,7 @@ from (${query}) as insertions`;
return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`;
}

private createOrReplace(table: dataform.ITable) {
private createTableStatement(table: dataform.ITable, runConfig: dataform.IRunConfig) {
const options = [];
if (table.bigquery && table.bigquery.partitionBy && table.bigquery.partitionExpirationDays) {
options.push(`partition_expiration_days=${table.bigquery.partitionExpirationDays}`);
Expand All @@ -196,17 +196,34 @@ from (${query}) as insertions`;
}
}

return `create or replace ${table.materialized ? "materialized " : ""}${this.tableTypeAsSql(
this.baseTableType(table.enumType)
)} ${this.resolveTarget(table.target)} ${
table.bigquery && table.bigquery.partitionBy
? `partition by ${table.bigquery.partitionBy} `
: ""
}${
table.bigquery && table.bigquery.clusterBy && table.bigquery.clusterBy.length > 0
? `cluster by ${table.bigquery.clusterBy.join(", ")} `
: ""
}${options.length > 0 ? `OPTIONS(${options.join(",")})` : ""}as ${table.query}`;
let protectTable = false;
if (table.enumType === dataform.TableType.INCREMENTAL) {
protectTable = table.protected || !runConfig.fullRefresh;
}

let statement = "create ";
if (!protectTable) {
statement += "or replace ";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the bug - I think we still want to create or replace in the case that it is incremental AND it's not projected AND it's full refresh?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add a test for this case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is projected? Have added the full refresh.

This is tested, but not obviously, across

const expectedExecutionActions: dataform.IExecutionAction[] = [

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*protected!

Copy link
Contributor Author

@Ekrekr Ekrekr Apr 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right! Done.

I had to update the protected logic, such as removing the error that we test for here

test("trying to fully refresh a protected dataset fails", () => {
. But now it should act as the incremental spec suggests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, sorry I think this has got really confusing, a few thoughts.

  1. leave this method as createOrReplace and don't pass in run options or do all that logic.
  2. Add a createIfNotExists method next to it (they can share some private impl if you want, but I wouldn't bother)
  3. Update the logic in publishTasks, only for the incremental tables, to do what you want to do and call createIfNotExists by default, and createOrReplace only when !protected && fullRefresh

The original method was clear what it did, but now you've moved this logic into it, I'd say it's much harder to follow what's going on in publishTasks.

}
if (table.materialized) {
statement += "materialized ";
}
statement += `${this.tableTypeAsSql(this.baseTableType(table.enumType))} `;
if (protectTable) {
statement += "if not exists ";
}
statement += `${this.resolveTarget(table.target)} `;
if (table.bigquery && table.bigquery.partitionBy) {
statement += `partition by ${table.bigquery.partitionBy} `;
}
if (table.bigquery && table.bigquery.clusterBy && table.bigquery.clusterBy.length > 0) {
statement += `cluster by ${table.bigquery.clusterBy.join(", ")} `;
}
if (options.length > 0) {
statement += `OPTIONS(${options.join(",")})`;
}
statement += `as ${table.query}`;
return statement;
}

private createOrReplaceView(target: dataform.ITarget, query: string) {
Expand Down
2 changes: 1 addition & 1 deletion cli/api/dbadapters/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { dataform } from "df/protos/ts";

export function concatenateQueries(statements: string[], modifier?: (mod: string) => string) {
return statements
.filter(statement => !!statement)
.map(statement => statement.trim())
.filter(statement => !!statement)
.map(statement =>
statement.length > 0 && statement.charAt(statement.length - 1) === ";"
? statement.substring(0, statement.length - 1)
Expand Down
5 changes: 4 additions & 1 deletion core/compilers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ export function extractJsBlocks(code: string): { sql: string; js: string } {

return {
sql: cleanSql.trim(),
js: jsBlocks.map(block => block.trim()).join("\n")
js: jsBlocks
.map(block => block.trim())
.filter(block => !!block)
.join("\n")
};
}

Expand Down
172 changes: 122 additions & 50 deletions tests/api/api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,6 @@ suite("@dataform/api", () => {
}).to.throw();
});

test("trying to fully refresh a protected dataset fails", () => {
const testGraph = dataform.CompiledGraph.create(TEST_GRAPH);
testGraph.tables[0].protected = true;
const builder = new Builder(TEST_GRAPH, { fullRefresh: true }, TEST_STATE);
expect(() => builder.build()).to.throw();
});

test("action_types", () => {
const graph: dataform.ICompiledGraph = dataform.CompiledGraph.create({
projectConfig: { warehouse: "bigquery" },
Expand Down Expand Up @@ -233,7 +226,7 @@ suite("@dataform/api", () => {
expect(action.tasks).eql([
dataform.ExecutionTask.create({
type: "statement",
statement: "preOp\n;\ncreate or replace table `schema.a` as foo\n;\npostOp"
statement: "preOp\n;\ncreate table if not exists `schema.a` as foo\n;\npostOp"
})
]);
});
Expand Down Expand Up @@ -388,53 +381,132 @@ suite("@dataform/api", () => {
});

suite("sql_generating", () => {
test("bigquery_incremental", () => {
const graph = dataform.CompiledGraph.create({
projectConfig: { warehouse: "bigquery", defaultDatabase: "deeb", defaultLocation: "US" },
tables: [
{
target: {
schema: "schema",
name: "incremental"
},
type: "incremental",
query: "select 1 as test",
where: "true"
}
]
});
const state = dataform.WarehouseState.create({
tables: [
{
target: {
schema: "schema",
name: "incremental"
suite("incremental", () => {
test("simple action", () => {
const graph = dataform.CompiledGraph.create({
projectConfig: { warehouse: "bigquery", defaultDatabase: "deeb", defaultLocation: "US" },
tables: [
{
target: {
schema: "schema",
name: "incremental"
},
type: "incremental",
query: "select 1 as test",
where: "true"
},
type: dataform.TableMetadata.Type.TABLE,
fields: [
{
name: "existing_field"
}
]
}
]
});
const executionGraph = new Builder(graph, {}, state).build();
expect(
cleanSql(
executionGraph.actions.filter(
n => targetAsReadableString(n.target) === "schema.incremental"
)[0].tasks[0].statement
)
).equals(
cleanSql(
`insert into \`deeb.schema.incremental\` (\`existing_field\`)
{
target: {
schema: "schema",
name: "non_incremental"
},
type: "incremental",
query: "select 1 as test",
where: "true"
}
]
});
const state = dataform.WarehouseState.create({
tables: [
{
target: {
schema: "schema",
name: "incremental"
},
type: dataform.TableMetadata.Type.TABLE,
fields: [
{
name: "existing_field"
}
]
}
]
});

const executionGraph = new Builder(graph, {}, state).build();

expect(
cleanSql(
executionGraph.actions.filter(
n => targetAsReadableString(n.target) === "schema.incremental"
)[0].tasks[0].statement
)
).equals(
cleanSql(
`insert into \`deeb.schema.incremental\` (\`existing_field\`)
select \`existing_field\` from (
select * from (select 1 as test) as subquery
where true
) as insertions`
)
);
)
);
expect(
cleanSql(
executionGraph.actions.filter(
n => targetAsReadableString(n.target) === "schema.non_incremental"
)[0].tasks[0].statement
)
).equals(
cleanSql(`create table if not exists \`deeb.schema.non_incremental\` as select 1 as test`)
);
});

test("full refresh and protected", () => {
const graph = dataform.CompiledGraph.create({
projectConfig: { warehouse: "bigquery", defaultDatabase: "deeb", defaultLocation: "US" },
tables: [
{
target: {
schema: "schema",
name: "protected"
},
type: "incremental",
query: "select 1 as test",
protected: true
},
{
target: {
schema: "schema",
name: "not_protected"
},
type: "incremental",
query: "select 1 as test"
}
]
});
const state = dataform.WarehouseState.create({
tables: [
{
target: {
schema: "schema",
name: "not_protected"
},
type: dataform.TableMetadata.Type.TABLE
}
]
});

const executionGraph = new Builder(graph, { fullRefresh: true }, state).build();

expect(
cleanSql(
executionGraph.actions.filter(
n => targetAsReadableString(n.target) === "schema.protected"
)[0].tasks[0].statement
)
).equals(
cleanSql(`create table if not exists \`deeb.schema.protected\` as select 1 as test`)
);
expect(
cleanSql(
executionGraph.actions.filter(
n => targetAsReadableString(n.target) === "schema.not_protected"
)[0].tasks[0].statement
)
).equals(
cleanSql(`create or replace table \`deeb.schema.not_protected\` as select 1 as test`)
);
});
});

test("bigquery_materialized", () => {
Expand Down