diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 434f263e..0fe35f6f 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -9,6 +9,7 @@ import ( "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/ext" "github.com/stretchr/testify/assert" ) @@ -205,3 +206,76 @@ func TestBuildProcessToastColExpression(t *testing.T) { func TestBuildProcessToastStructColExpression(t *testing.T) { assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, BigQueryDialect{}.BuildProcessToastStructColExpression("foo")) } + +func TestQuoteColumns(t *testing.T) { + assert.Equal(t, []string{}, columns.QuoteColumns(nil, BigQueryDialect{})) + cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)} + assert.Equal(t, []string{"`a`", "`b`"}, columns.QuoteColumns(cols, BigQueryDialect{})) +} + +func TestBuildColumnsUpdateFragment(t *testing.T) { + var lastCaseColTypes []columns.Column + lastCaseCols := []string{"a1", "b2", "c3"} + for _, lastCaseCol := range lastCaseCols { + kd := typing.String + var toast bool + // a1 - struct + toast, b2 - string + toast, c3 = regular string. + if lastCaseCol == "a1" { + kd = typing.Struct + toast = true + } else if lastCaseCol == "b2" { + toast = true + } + + column := columns.NewColumn(lastCaseCol, kd) + column.ToastColumn = toast + lastCaseColTypes = append(lastCaseColTypes, column) + } + + var lastCaseEscapeTypes []columns.Column + lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"} + for _, lastCaseColEsc := range lastCaseColsEsc { + kd := typing.String + var toast bool + // a1 - struct + toast, b2 - string + toast, c3 = regular string. + if lastCaseColEsc == "a1" { + kd = typing.Struct + toast = true + } else if lastCaseColEsc == "b2" { + toast = true + } else if lastCaseColEsc == "start" { + kd = typing.Struct + toast = true + } + + column := columns.NewColumn(lastCaseColEsc, kd) + column.ToastColumn = toast + lastCaseEscapeTypes = append(lastCaseEscapeTypes, column) + } + + lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + key := `{"key":"__debezium_unavailable_value"}` + testCases := []struct { + name string + columns []columns.Column + expectedString string + }{ + { + name: "struct, string and toast string (bigquery)", + columns: lastCaseColTypes, + expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", + }, + { + name: "struct, string and toast string (bigquery) w/ reserved keywords", + columns: lastCaseEscapeTypes, + expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", + key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), + }, + } + + for _, _testCase := range testCases { + actualQuery := columns.BuildColumnsUpdateFragment(_testCase.columns, BigQueryDialect{}) + assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) + } +} diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index 595a7330..ffd7a13a 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -11,6 +11,7 @@ import ( "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" ) func TestRedshiftDialect_QuoteIdentifier(t *testing.T) { @@ -210,3 +211,52 @@ func TestBuildProcessToastColExpression(t *testing.T) { func TestBuildProcessToastStructColExpression(t *testing.T) { assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, RedshiftDialect{}.BuildProcessToastStructColExpression("foo")) } + +func TestBuildColumnsUpdateFragment(t *testing.T) { + var happyPathCols []columns.Column + for _, col := range []string{"foo", "bar"} { + column := columns.NewColumn(col, typing.String) + column.ToastColumn = false + happyPathCols = append(happyPathCols, column) + } + + var lastCaseColTypes []columns.Column + lastCaseCols := []string{"a1", "b2", "c3"} + for _, lastCaseCol := range lastCaseCols { + kd := typing.String + var toast bool + // a1 - struct + toast, b2 - string + toast, c3 = regular string. + if lastCaseCol == "a1" { + kd = typing.Struct + toast = true + } else if lastCaseCol == "b2" { + toast = true + } + + column := columns.NewColumn(lastCaseCol, kd) + column.ToastColumn = toast + lastCaseColTypes = append(lastCaseColTypes, column) + } + + testCases := []struct { + name string + columns []columns.Column + expectedString string + }{ + { + name: "happy path", + columns: happyPathCols, + expectedString: `"foo"=cc."foo","bar"=cc."bar"`, + }, + { + name: "struct, string and toast string", + columns: lastCaseColTypes, + expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`, + }, + } + + for _, _testCase := range testCases { + actualQuery := columns.BuildColumnsUpdateFragment(_testCase.columns, RedshiftDialect{}) + assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) + } +} diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 3f842ac1..31685fe5 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -8,6 +8,7 @@ import ( "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/ext" "github.com/stretchr/testify/assert" ) @@ -250,3 +251,26 @@ func TestBuildProcessToastColExpression(t *testing.T) { func TestBuildProcessToastStructColExpression(t *testing.T) { assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, SnowflakeDialect{}.BuildProcessToastStructColExpression("foo")) } + +func TestQuoteColumns(t *testing.T) { + assert.Equal(t, []string{}, columns.QuoteColumns(nil, SnowflakeDialect{})) + cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)} + assert.Equal(t, []string{`"A"`, `"B"`}, columns.QuoteColumns(cols, SnowflakeDialect{})) +} + +func TestBuildColumnsUpdateFragment(t *testing.T) { + var stringAndToastCols []columns.Column + for _, col := range []string{"foo", "bar"} { + var toastCol bool + if col == "foo" { + toastCol = true + } + + column := columns.NewColumn(col, typing.String) + column.ToastColumn = toastCol + stringAndToastCols = append(stringAndToastCols, column) + } + + actualQuery := columns.BuildColumnsUpdateFragment(stringAndToastCols, SnowflakeDialect{}) + assert.Equal(t, `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, actualQuery) +} diff --git a/lib/destination/dml/columns.go b/lib/destination/dml/columns.go index 335e2a9f..5103e494 100644 --- a/lib/destination/dml/columns.go +++ b/lib/destination/dml/columns.go @@ -1,33 +1 @@ package dml - -import ( - "fmt" - "strings" - - "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" -) - -// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email -// NOTE: This should only be used with valid columns. -func buildColumnsUpdateFragment(columns []columns.Column, dialect sql.Dialect) string { - var cols []string - for _, column := range columns { - colName := dialect.QuoteIdentifier(column.Name()) - if column.ToastColumn { - var colValue string - if column.KindDetails == typing.Struct { - colValue = dialect.BuildProcessToastStructColExpression(colName) - } else { - colValue = dialect.BuildProcessToastColExpression(colName) - } - cols = append(cols, fmt.Sprintf("%s= %s", colName, colValue)) - } else { - // This is to make it look like: objCol = cc.objCol - cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) - } - } - - return strings.Join(cols, ",") -} diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go deleted file mode 100644 index 5ca77e80..00000000 --- a/lib/destination/dml/columns_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package dml - -import ( - "fmt" - "testing" - - bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" - redshiftDialect "github.com/artie-labs/transfer/clients/redshift/dialect" - snowflakeDialect "github.com/artie-labs/transfer/clients/snowflake/dialect" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/stretchr/testify/assert" -) - -func TestQuoteColumns(t *testing.T) { - assert.Equal(t, []string{}, columns.QuoteColumns(nil, bigQueryDialect.BigQueryDialect{})) - assert.Equal(t, []string{}, columns.QuoteColumns(nil, snowflakeDialect.SnowflakeDialect{})) - - cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)} - assert.Equal(t, []string{"`a`", "`b`"}, columns.QuoteColumns(cols, bigQueryDialect.BigQueryDialect{})) - assert.Equal(t, []string{`"A"`, `"B"`}, columns.QuoteColumns(cols, snowflakeDialect.SnowflakeDialect{})) -} - -func TestBuildColumnsUpdateFragment(t *testing.T) { - type testCase struct { - name string - columns []columns.Column - expectedString string - dialect sql.Dialect - } - - fooBarCols := []string{"foo", "bar"} - - var ( - happyPathCols []columns.Column - stringAndToastCols []columns.Column - lastCaseColTypes []columns.Column - lastCaseEscapeTypes []columns.Column - ) - for _, col := range fooBarCols { - column := columns.NewColumn(col, typing.String) - column.ToastColumn = false - happyPathCols = append(happyPathCols, column) - } - for _, col := range fooBarCols { - var toastCol bool - if col == "foo" { - toastCol = true - } - - column := columns.NewColumn(col, typing.String) - column.ToastColumn = toastCol - stringAndToastCols = append(stringAndToastCols, column) - } - - lastCaseCols := []string{"a1", "b2", "c3"} - for _, lastCaseCol := range lastCaseCols { - kd := typing.String - var toast bool - // a1 - struct + toast, b2 - string + toast, c3 = regular string. - if lastCaseCol == "a1" { - kd = typing.Struct - toast = true - } else if lastCaseCol == "b2" { - toast = true - } - - column := columns.NewColumn(lastCaseCol, kd) - column.ToastColumn = toast - lastCaseColTypes = append(lastCaseColTypes, column) - } - - lastCaseColsEsc := []string{"a1", "b2", "c3", "start", "select"} - for _, lastCaseColEsc := range lastCaseColsEsc { - kd := typing.String - var toast bool - // a1 - struct + toast, b2 - string + toast, c3 = regular string. - if lastCaseColEsc == "a1" { - kd = typing.Struct - toast = true - } else if lastCaseColEsc == "b2" { - toast = true - } else if lastCaseColEsc == "start" { - kd = typing.Struct - toast = true - } - - column := columns.NewColumn(lastCaseColEsc, kd) - column.ToastColumn = toast - lastCaseEscapeTypes = append(lastCaseEscapeTypes, column) - } - - lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - - key := `{"key":"__debezium_unavailable_value"}` - testCases := []testCase{ - { - name: "happy path", - columns: happyPathCols, - dialect: redshiftDialect.RedshiftDialect{}, - expectedString: `"foo"=cc."foo","bar"=cc."bar"`, - }, - { - name: "string and toast", - columns: stringAndToastCols, - dialect: snowflakeDialect.SnowflakeDialect{}, - expectedString: `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, - }, - { - name: "struct, string and toast string", - columns: lastCaseColTypes, - dialect: redshiftDialect.RedshiftDialect{}, - expectedString: `"a1"= CASE WHEN COALESCE(cc."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."a1" ELSE c."a1" END,"b2"= CASE WHEN COALESCE(cc."b2" != '__debezium_unavailable_value', true) THEN cc."b2" ELSE c."b2" END,"c3"=cc."c3"`, - }, - { - name: "struct, string and toast string (bigquery)", - columns: lastCaseColTypes, - dialect: bigQueryDialect.BigQueryDialect{}, - expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", - }, - { - name: "struct, string and toast string (bigquery) w/ reserved keywords", - columns: lastCaseEscapeTypes, - dialect: bigQueryDialect.BigQueryDialect{}, - expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", - key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), - }, - } - - for _, _testCase := range testCases { - actualQuery := buildColumnsUpdateFragment(_testCase.columns, _testCase.dialect) - assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) - } -} diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 251b0d3f..e04292ce 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -108,7 +108,7 @@ func (m *MergeArgument) buildRedshiftUpdateQuery(cols []columns.Column) string { return fmt.Sprintf(`UPDATE %s AS c SET %s FROM %s AS cc WHERE %s;`, // UPDATE table set col1 = cc. col1 - m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(cols, m.Dialect), + m.TableID.FullyQualifiedName(), columns.BuildColumnsUpdateFragment(cols, m.Dialect), // FROM staging WHERE join on PK(s) m.SubQuery, strings.Join(clauses, " AND "), ) @@ -209,7 +209,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect), + idempotentClause, columns.BuildColumnsUpdateFragment(m.Columns, m.Dialect), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(columns.QuoteColumns(m.Columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -234,7 +234,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(cols, m.Dialect), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, columns.BuildColumnsUpdateFragment(cols, m.Dialect), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(columns.QuoteColumns(cols, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -266,7 +266,7 @@ WHEN MATCHED %sTHEN UPDATE SET %s WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, buildColumnsUpdateFragment(m.Columns, m.Dialect), + idempotentClause, columns.BuildColumnsUpdateFragment(m.Columns, m.Dialect), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(columns.QuoteColumns(m.Columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -292,7 +292,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Delete m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), // Update - m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, buildColumnsUpdateFragment(cols, m.Dialect), + m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, columns.BuildColumnsUpdateFragment(cols, m.Dialect), // Insert m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(columns.QuoteColumns(cols, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index ad4bc1ed..ea3a22c7 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -1,6 +1,7 @@ package columns import ( + "fmt" "slices" "strings" "sync" @@ -264,3 +265,26 @@ func RemoveDeleteColumnMarker(cols []Column) ([]Column, bool) { cols = slices.DeleteFunc(slices.Clone(cols), func(col Column) bool { return col.Name() == constants.DeleteColumnMarker }) return cols, len(cols) != origLength } + +// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email +// NOTE: This should only be used with valid columns. +func BuildColumnsUpdateFragment(columns []Column, dialect sql.Dialect) string { + var cols []string + for _, column := range columns { + colName := dialect.QuoteIdentifier(column.Name()) + if column.ToastColumn { + var colValue string + if column.KindDetails == typing.Struct { + colValue = dialect.BuildProcessToastStructColExpression(colName) + } else { + colValue = dialect.BuildProcessToastColExpression(colName) + } + cols = append(cols, fmt.Sprintf("%s= %s", colName, colValue)) + } else { + // This is to make it look like: objCol = cc.objCol + cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) + } + } + + return strings.Join(cols, ",") +}