Skip to content

Commit

Permalink
Soft Delete bug (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 18, 2024
1 parent 9eef6e5 commit 6b979d7
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 13 deletions.
6 changes: 3 additions & 3 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,11 @@ MERGE INTO %s %s USING %s AS %s ON %s`,
if softDelete {
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,
// WHEN MATCHED %sTHEN UPDATE SET %s
idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
// WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s)
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","),
// WHEN NOT MATCHED THEN INSERT (%s)
strings.Join(sql.QuoteColumns(cols, bd), ","),
// VALUES (%s);
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, bd), ","),
)}, nil
Expand Down
4 changes: 2 additions & 2 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestBigQueryDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
assert.Equal(t, []string{
"MERGE INTO customers.orders tgt USING {SUB_QUERY} AS stg ON tgt.`order_id` = stg.`order_id`",
"WHEN MATCHED THEN UPDATE SET `order_id`=stg.`order_id`,`name`=stg.`name`,`__artie_delete`=stg.`__artie_delete`",
"WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"},
"WHEN NOT MATCHED THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"},
strings.Split(strings.TrimSpace(statements[0]), "\n"))
}

Expand Down Expand Up @@ -295,7 +295,7 @@ func TestBigQueryDialect_BuildMergeQueries_IdempotentKey(t *testing.T) {
assert.Equal(t, []string{
"MERGE INTO customers.orders tgt USING {SUB_QUERY} AS stg ON tgt.`order_id` = stg.`order_id`",
"WHEN MATCHED AND stg.idempotent_key >= tgt.idempotent_key THEN UPDATE SET `order_id`=stg.`order_id`,`name`=stg.`name`,`__artie_delete`=stg.`__artie_delete`",
"WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"},
"WHEN NOT MATCHED THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"},
strings.Split(strings.TrimSpace(statements[0]), "\n"))
}

Expand Down
6 changes: 3 additions & 3 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ USING %s AS %s ON %s`,
if softDelete {
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,
// WHEN MATCHED %sTHEN UPDATE SET %s
idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md),
// WHEN NOT MATCHED AND COALESCE(%s, 0) = 0 THEN INSERT (%s)
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), strings.Join(sql.QuoteColumns(cols, md), ","),
// WHEN NOT MATCHED THEN INSERT (%s)
strings.Join(sql.QuoteColumns(cols, md), ","),
// VALUES (%s);
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, md), ","),
)}, nil
Expand Down
2 changes: 1 addition & 1 deletion clients/mssql/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,6 @@ WHEN NOT MATCHED AND COALESCE(stg."__artie_delete", 1) = 0 THEN INSERT ("id","ba
MERGE INTO database.schema.table tgt
USING {SUB_QUERY} AS stg ON tgt."id" = stg."id"
WHEN MATCHED THEN UPDATE SET "id"=stg."id","bar"=stg."bar","updated_at"=stg."updated_at","start"=stg."start","__artie_delete"=stg."__artie_delete"
WHEN NOT MATCHED AND COALESCE(stg."__artie_delete", 0) = 0 THEN INSERT ("id","bar","updated_at","start","__artie_delete") VALUES (stg."id",stg."bar",stg."updated_at",stg."start",stg."__artie_delete");`, queries[0])
WHEN NOT MATCHED THEN INSERT ("id","bar","updated_at","start","__artie_delete") VALUES (stg."id",stg."bar",stg."updated_at",stg."start",stg."__artie_delete");`, queries[0])
}
}
1 change: 1 addition & 0 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt
}
primaryKeys = append(primaryKeys, column)
}

if len(primaryKeys) == 0 {
return fmt.Errorf("primary keys cannot be empty")
}
Expand Down
4 changes: 2 additions & 2 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,11 @@ MERGE INTO %s %s USING ( %s ) AS %s ON %s`,
if softDelete {
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,
// Update + Soft Deletion
idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd),
// Insert
sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), strings.Join(sql.QuoteColumns(cols, sd), ","),
strings.Join(sql.QuoteColumns(cols, sd), ","),
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, sd), ","),
)}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestSnowflakeDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,__artie_delete from (values ('1', '456', '2001-02-03T04:05:06Z', false),('2', 'bb', '2001-02-03T04:05:06Z', true),('3', 'dd', '2001-02-03T04:05:06Z', false)) as _tbl(id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID"
WHEN MATCHED THEN UPDATE SET "ID"=stg."ID","__ARTIE_DELETE"=stg."__ARTIE_DELETE"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0])
WHEN NOT MATCHED THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0])
}
{
statements, err := SnowflakeDialect{}.BuildMergeQueries(
Expand All @@ -319,7 +319,7 @@ WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("I
assert.Equal(t, `
MERGE INTO database.schema.table tgt USING ( SELECT id,bar,updated_at,__artie_delete from (values ('1', '456', '2001-02-03T04:05:06Z', false),('2', 'bb', '2001-02-03T04:05:06Z', true),('3', 'dd', '2001-02-03T04:05:06Z', false)) as _tbl(id,bar,updated_at,__artie_delete) ) AS stg ON tgt."ID" = stg."ID"
WHEN MATCHED AND stg.updated_at >= tgt.updated_at THEN UPDATE SET "ID"=stg."ID","__ARTIE_DELETE"=stg."__ARTIE_DELETE"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0])
WHEN NOT MATCHED THEN INSERT ("ID","__ARTIE_DELETE") VALUES (stg."ID",stg."__ARTIE_DELETE");`, statements[0])
}
}

Expand Down

0 comments on commit 6b979d7

Please sign in to comment.