diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 51afc8e6..5b61b709 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -245,11 +245,11 @@ MERGE INTO %s %s USING %s AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, // Update + Soft Deletion idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd), // Insert - constants.StagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ Vals: sql.QuoteColumns(cols, bd), Separator: ",", @@ -264,15 +264,15 @@ WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);` } return []string{baseQuery + fmt.Sprintf(` -WHEN MATCHED AND %s.%s THEN DELETE -WHEN MATCHED AND IFNULL(%s.%s, false) = false %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`, +WHEN MATCHED AND %s THEN DELETE +WHEN MATCHED AND IFNULL(%s, false) = false %sTHEN UPDATE SET %s +WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, // Delete - constants.StagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), // Update - constants.StagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd), // Insert - constants.StagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ Vals: sql.QuoteColumns(cols, bd), Separator: ",", diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index f4a20378..0c2bc179 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -200,11 +200,11 @@ USING %s AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND COALESCE(%s.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED AND COALESCE(%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, // Update + Soft Deletion idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md), // Insert - constants.StagingAlias, md.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, md), ","), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), strings.Join(sql.QuoteColumns(cols, md), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ Vals: sql.QuoteColumns(cols, md), Separator: ",", @@ -219,15 +219,15 @@ WHEN NOT MATCHED AND COALESCE(%s.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, } return []string{baseQuery + fmt.Sprintf(` -WHEN MATCHED AND %s.%s = 1 THEN DELETE -WHEN MATCHED AND COALESCE(%s.%s, 0) = 0 %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND COALESCE(%s.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, +WHEN MATCHED AND %s = 1 THEN DELETE +WHEN MATCHED AND COALESCE(%s, 0) = 0 %sTHEN UPDATE SET %s +WHEN NOT MATCHED AND COALESCE(%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete - constants.StagingAlias, md.QuoteIdentifier(constants.DeleteColumnMarker), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), // Update - constants.StagingAlias, md.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md), // Insert - constants.StagingAlias, md.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, md), ","), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), strings.Join(sql.QuoteColumns(cols, md), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ Vals: sql.QuoteColumns(cols, md), Separator: ",", diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 29740384..9c0c7923 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -225,7 +225,7 @@ func (rd RedshiftDialect) buildMergeUpdateQuery( } if !softDelete { - clauses = append(clauses, fmt.Sprintf("COALESCE(%s.%s, false) = false", constants.StagingAlias, rd.QuoteIdentifier(constants.DeleteColumnMarker))) + clauses = append(clauses, fmt.Sprintf("COALESCE(%s, false) = false", sql.QuotedDeleteColumnMarker(constants.StagingAlias, rd))) } return fmt.Sprintf(`UPDATE %s AS %s SET %s FROM %s AS %s WHERE %s;`, @@ -237,7 +237,7 @@ func (rd RedshiftDialect) buildMergeUpdateQuery( } func (rd RedshiftDialect) buildMergeDeleteQuery(tableID sql.TableIdentifier, subQuery string, primaryKeys []columns.Column) string { - return fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s AS %s WHERE %s.%s = true);`, + return fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s AS %s WHERE %s = true);`, // DELETE from table where (pk_1, pk_2) tableID.FullyQualifiedName(), strings.Join(sql.QuoteColumns(primaryKeys, rd), ","), // IN (stg.pk_1, stg.pk_2) FROM staging @@ -245,7 +245,7 @@ func (rd RedshiftDialect) buildMergeDeleteQuery(tableID sql.TableIdentifier, sub Vals: sql.QuoteColumns(primaryKeys, rd), Separator: ",", Prefix: constants.StagingAlias + ".", - }), subQuery, constants.StagingAlias, constants.StagingAlias, rd.QuoteIdentifier(constants.DeleteColumnMarker), + }), subQuery, constants.StagingAlias, sql.QuotedDeleteColumnMarker(constants.StagingAlias, rd), ) } diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index fca0703a..65142488 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -226,11 +226,11 @@ MERGE INTO %s %s USING ( %s ) AS %s ON %s`, if softDelete { return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`, +WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, // Update + Soft Deletion idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd), // Insert - constants.StagingAlias, sd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, sd), ","), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), strings.Join(sql.QuoteColumns(cols, sd), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ Vals: sql.QuoteColumns(cols, sd), Separator: ",", @@ -245,15 +245,15 @@ WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);` } return []string{baseQuery + fmt.Sprintf(` -WHEN MATCHED AND %s.%s THEN DELETE -WHEN MATCHED AND IFNULL(%s.%s, false) = false %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`, +WHEN MATCHED AND %s THEN DELETE +WHEN MATCHED AND IFNULL(%s, false) = false %sTHEN UPDATE SET %s +WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, // Delete - constants.StagingAlias, sd.QuoteIdentifier(constants.DeleteColumnMarker), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), // Update - constants.StagingAlias, sd.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, sd), // Insert - constants.StagingAlias, sd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, sd), ","), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, sd), strings.Join(sql.QuoteColumns(cols, sd), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ Vals: sql.QuoteColumns(cols, sd), Separator: ",", diff --git a/lib/sql/columns.go b/lib/sql/columns.go index e53570aa..f10d6a1a 100644 --- a/lib/sql/columns.go +++ b/lib/sql/columns.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -20,6 +21,10 @@ func QuoteTableAliasColumn(tableAlias constants.TableAlias, column columns.Colum return fmt.Sprintf("%s.%s", tableAlias, dialect.QuoteIdentifier(column.Name())) } +func QuotedDeleteColumnMarker(tableAlias constants.TableAlias, dialect Dialect) string { + return QuoteTableAliasColumn(tableAlias, columns.NewColumn(constants.DeleteColumnMarker, typing.Invalid), dialect) +} + // BuildColumnsUpdateFragment will parse the columns and return a string like: first_name=tgt."first_name",last_name=stg."last_name",email=tgt."email" // NOTE: This should only be used with valid columns. func BuildColumnsUpdateFragment(columns []columns.Column, stagingAlias, targetAlias constants.TableAlias, dialect Dialect) string { diff --git a/lib/sql/tests/columns_test.go b/lib/sql/tests/columns_test.go index e4bfe476..a2dc896e 100644 --- a/lib/sql/tests/columns_test.go +++ b/lib/sql/tests/columns_test.go @@ -35,6 +35,13 @@ func TestQuoteTableAliasColumn(t *testing.T) { assert.Equal(t, `tbl."COL"`, sql.QuoteTableAliasColumn("tbl", column, snowflakeDialect.SnowflakeDialect{})) } +func TestQuotedDeleteColumnMarker(t *testing.T) { + // BigQuery: + assert.Equal(t, "tbl.`__artie_delete`", sql.QuotedDeleteColumnMarker("tbl", bigqueryDialect.BigQueryDialect{})) + // Snowflake: + assert.Equal(t, `tbl."__ARTIE_DELETE"`, sql.QuotedDeleteColumnMarker("tbl", snowflakeDialect.SnowflakeDialect{})) +} + func TestBuildColumnsUpdateFragment_BigQuery(t *testing.T) { var lastCaseColTypes []columns.Column lastCaseCols := []string{"a1", "b2", "c3"}