Skip to content

Commit

Permalink
[sql] Add QuotedDeleteColumnMarker function (#685)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 16, 2024
1 parent 8a39e45 commit 7118f6b
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 27 deletions.
16 changes: 8 additions & 8 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ MERGE INTO %s %s USING %s AS %s ON %s`,
if softDelete {
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
// Update + Soft Deletion
idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
// Insert
constants.StagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, bd),
Separator: ",",
Expand All @@ -264,15 +264,15 @@ WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`
}

return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND %s.%s THEN DELETE
WHEN MATCHED AND IFNULL(%s.%s, false) = false %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
WHEN MATCHED AND %s THEN DELETE
WHEN MATCHED AND IFNULL(%s, false) = false %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
// Delete
constants.StagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd),
// Update
constants.StagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
// Insert
constants.StagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, bd),
Separator: ",",
Expand Down
16 changes: 8 additions & 8 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,11 @@ USING %s AS %s ON %s`,
if softDelete {
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(%s.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
WHEN NOT MATCHED AND COALESCE(%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
// Update + Soft Deletion
idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md),
// Insert
constants.StagingAlias, md.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, md), ","),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), strings.Join(sql.QuoteColumns(cols, md), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, md),
Separator: ",",
Expand All @@ -219,15 +219,15 @@ WHEN NOT MATCHED AND COALESCE(%s.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
}

return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND %s.%s = 1 THEN DELETE
WHEN MATCHED AND COALESCE(%s.%s, 0) = 0 %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(%s.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
WHEN MATCHED AND %s = 1 THEN DELETE
WHEN MATCHED AND COALESCE(%s, 0) = 0 %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
// Delete
constants.StagingAlias, md.QuoteIdentifier(constants.DeleteColumnMarker),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md),
// Update
constants.StagingAlias, md.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md),
// Insert
constants.StagingAlias, md.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, md), ","),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), strings.Join(sql.QuoteColumns(cols, md), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, md),
Separator: ",",
Expand Down
6 changes: 3 additions & 3 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (rd RedshiftDialect) buildMergeUpdateQuery(
}

if !softDelete {
clauses = append(clauses, fmt.Sprintf("COALESCE(%s.%s, false) = false", constants.StagingAlias, rd.QuoteIdentifier(constants.DeleteColumnMarker)))
clauses = append(clauses, fmt.Sprintf("COALESCE(%s, false) = false", sql.QuotedDeleteColumnMarker(constants.StagingAlias, rd)))
}

return fmt.Sprintf(`UPDATE %s AS %s SET %s FROM %s AS %s WHERE %s;`,
Expand All @@ -237,15 +237,15 @@ func (rd RedshiftDialect) buildMergeUpdateQuery(
}

func (rd RedshiftDialect) buildMergeDeleteQuery(tableID sql.TableIdentifier, subQuery string, primaryKeys []columns.Column) string {
return fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s AS %s WHERE %s.%s = true);`,
return fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s AS %s WHERE %s = true);`,
// DELETE from table where (pk_1, pk_2)
tableID.FullyQualifiedName(), strings.Join(sql.QuoteColumns(primaryKeys, rd), ","),
// IN (stg.pk_1, stg.pk_2) FROM staging
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(primaryKeys, rd),
Separator: ",",
Prefix: constants.StagingAlias + ".",
}), subQuery, constants.StagingAlias, constants.StagingAlias, rd.QuoteIdentifier(constants.DeleteColumnMarker),
}), subQuery, constants.StagingAlias, sql.QuotedDeleteColumnMarker(constants.StagingAlias, rd),
)
}

Expand Down
16 changes: 8 additions & 8 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ MERGE INTO %s %s USING ( %s ) AS %s ON %s`,
if softDelete {
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
// Update + Soft Deletion
idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd),
// Insert
constants.StagingAlias, sd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, sd), ","),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), strings.Join(sql.QuoteColumns(cols, sd), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, sd),
Separator: ",",
Expand All @@ -245,15 +245,15 @@ WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`
}

return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND %s.%s THEN DELETE
WHEN MATCHED AND IFNULL(%s.%s, false) = false %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
WHEN MATCHED AND %s THEN DELETE
WHEN MATCHED AND IFNULL(%s, false) = false %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
// Delete
constants.StagingAlias, sd.QuoteIdentifier(constants.DeleteColumnMarker),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd),
// Update
constants.StagingAlias, sd.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd),
// Insert
constants.StagingAlias, sd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, sd), ","),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), strings.Join(sql.QuoteColumns(cols, sd), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, sd),
Separator: ",",
Expand Down
5 changes: 5 additions & 0 deletions lib/sql/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

Expand All @@ -20,6 +21,10 @@ func QuoteTableAliasColumn(tableAlias constants.TableAlias, column columns.Colum
return fmt.Sprintf("%s.%s", tableAlias, dialect.QuoteIdentifier(column.Name()))
}

func QuotedDeleteColumnMarker(tableAlias constants.TableAlias, dialect Dialect) string {
return QuoteTableAliasColumn(tableAlias, columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid), dialect)
}

// BuildColumnsUpdateFragment will parse the columns and return a string like: first_name=tgt."first_name",last_name=stg."last_name",email=tgt."email"
// NOTE: This should only be used with valid columns.
func BuildColumnsUpdateFragment(columns []columns.Column, stagingAlias, targetAlias constants.TableAlias, dialect Dialect) string {
Expand Down
7 changes: 7 additions & 0 deletions lib/sql/tests/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ func TestQuoteTableAliasColumn(t *testing.T) {
assert.Equal(t, `tbl."COL"`, sql.QuoteTableAliasColumn("tbl", column, snowflakeDialect.SnowflakeDialect{}))
}

func TestQuotedDeleteColumnMarker(t *testing.T) {
// BigQuery:
assert.Equal(t, "tbl.`__artie_delete`", sql.QuotedDeleteColumnMarker("tbl", bigqueryDialect.BigQueryDialect{}))
// Snowflake:
assert.Equal(t, `tbl."__ARTIE_DELETE"`, sql.QuotedDeleteColumnMarker("tbl", snowflakeDialect.SnowflakeDialect{}))
}

func TestBuildColumnsUpdateFragment_BigQuery(t *testing.T) {
var lastCaseColTypes []columns.Column
lastCaseCols := []string{"a1", "b2", "c3"}
Expand Down

0 comments on commit 7118f6b

Please sign in to comment.