Skip to content

Commit

Permalink
[sql] Kill Dialect.BuildProcessToastStructColExpression (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 14, 2024
1 parent c503317 commit 740e3ee
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 67 deletions.
12 changes: 5 additions & 7 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,15 @@ func (BigQueryDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, column

func (bd BigQueryDialect) BuildProcessToastColExpression(column columns.Column) string {
colName := bd.QuoteIdentifier(column.Name())
if column.KindDetails == typing.Struct {
return fmt.Sprintf(`CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`,
colName, constants.ToastUnavailableValuePlaceholder,
colName, colName)
}
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}

func (bd BigQueryDialect) BuildProcessToastStructColExpression(column columns.Column) string {
colName := bd.QuoteIdentifier(column.Name())
return fmt.Sprintf(`CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`,
colName, constants.ToastUnavailableValuePlaceholder,
colName, colName)
}

func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)

Expand Down
11 changes: 3 additions & 8 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,13 @@ func TestBigQueryDialect_BuildAlterColumnQuery(t *testing.T) {
}

func TestBigQueryDialect_BuildProcessToastColExpression(t *testing.T) {
assert.Equal(
t,
assert.Equal(t,
"CASE WHEN COALESCE(cc.`bar` != '__debezium_unavailable_value', true) THEN cc.`bar` ELSE c.`bar` END",
BigQueryDialect{}.BuildProcessToastColExpression(columns.NewColumn("bar", typing.Invalid)),
)
}

func TestBigQueryDialect_ProcessToastStructColExpression(t *testing.T) {
assert.Equal(
t,
assert.Equal(t,
"CASE WHEN COALESCE(TO_JSON_STRING(cc.`foo`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`foo` ELSE c.`foo` END",
BigQueryDialect{}.BuildProcessToastStructColExpression(columns.NewColumn("foo", typing.Invalid)),
BigQueryDialect{}.BuildProcessToastColExpression(columns.NewColumn("foo", typing.Struct)),
)
}

Expand Down
11 changes: 4 additions & 7 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,14 @@ func (MSSQLDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp
func (md MSSQLDialect) BuildProcessToastColExpression(column columns.Column) string {
colName := md.QuoteIdentifier(column.Name())
// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
if column.KindDetails == typing.Struct {
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName,
constants.ToastUnavailableValuePlaceholder, colName, colName)
}

func (md MSSQLDialect) BuildProcessToastStructColExpression(column columns.Column) string {
colName := md.QuoteIdentifier(column.Name())
// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}

func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
panic("not implemented") // We don't currently support deduping for MS SQL.
}
Expand Down
11 changes: 3 additions & 8 deletions clients/mssql/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,13 @@ func TestMSSQLDialect_BuildAlterColumnQuery(t *testing.T) {
}

func TestMSSQLDialect_BuildProcessToastColExpression(t *testing.T) {
assert.Equal(
t,
assert.Equal(t,
`CASE WHEN COALESCE(cc."bar", '') != '__debezium_unavailable_value' THEN cc."bar" ELSE c."bar" END`,
MSSQLDialect{}.BuildProcessToastColExpression(columns.NewColumn("bar", typing.Invalid)),
)
}

func TestMSSQLDialect_BuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(
t,
assert.Equal(t,
`CASE WHEN COALESCE(cc."foo", {}) != {'key': '__debezium_unavailable_value'} THEN cc."foo" ELSE c."foo" END`,
MSSQLDialect{}.BuildProcessToastStructColExpression(columns.NewColumn("foo", typing.Invalid)),
MSSQLDialect{}.BuildProcessToastColExpression(columns.NewColumn("foo", typing.Struct)),
)
}

Expand Down
10 changes: 4 additions & 6 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,14 @@ func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, column

func (rd RedshiftDialect) BuildProcessToastColExpression(column columns.Column) string {
colName := rd.QuoteIdentifier(column.Name())
if column.KindDetails == typing.Struct {
return fmt.Sprintf(`CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`,
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}

func (rd RedshiftDialect) BuildProcessToastStructColExpression(column columns.Column) string {
colName := rd.QuoteIdentifier(column.Name())
return fmt.Sprintf(`CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`,
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}

func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd)

Expand Down
11 changes: 3 additions & 8 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,13 @@ func TestQuoteIdentifiers(t *testing.T) {
}

func TestRedshiftDialect_BuildProcessToastColExpression(t *testing.T) {
assert.Equal(
t,
assert.Equal(t,
`CASE WHEN COALESCE(cc."bar" != '__debezium_unavailable_value', true) THEN cc."bar" ELSE c."bar" END`,
RedshiftDialect{}.BuildProcessToastColExpression(columns.NewColumn("bar", typing.Invalid)),
)
}

func TestRedshiftDialect_BuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(
t,
assert.Equal(t,
`CASE WHEN COALESCE(cc."foo" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."foo" ELSE c."foo" END`,
RedshiftDialect{}.BuildProcessToastStructColExpression(columns.NewColumn("foo", typing.Invalid)),
RedshiftDialect{}.BuildProcessToastColExpression(columns.NewColumn("foo", typing.Struct)),
)
}

Expand Down
10 changes: 4 additions & 6 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,14 @@ func (SnowflakeDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, colum

func (sd SnowflakeDialect) BuildProcessToastColExpression(column columns.Column) string {
colName := sd.QuoteIdentifier(column.Name())
if column.KindDetails == typing.Struct {
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}

func (sd SnowflakeDialect) BuildProcessToastStructColExpression(column columns.Column) string {
colName := sd.QuoteIdentifier(column.Name())
return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END",
colName, constants.ToastUnavailableValuePlaceholder, colName, colName)
}

func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd)

Expand Down
11 changes: 3 additions & 8 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,13 @@ func TestSnowflakeDialect_BuildAlterColumnQuery(t *testing.T) {
}

func TestSnowflakeDialect_BuildProcessToastColExpression(t *testing.T) {
assert.Equal(
t,
assert.Equal(t,
`CASE WHEN COALESCE(cc."BAR" != '__debezium_unavailable_value', true) THEN cc."BAR" ELSE c."BAR" END`,
SnowflakeDialect{}.BuildProcessToastColExpression(columns.NewColumn("bar", typing.Invalid)),
)
}

func TestSnowflakeDialect_BuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(
t,
assert.Equal(t,
`CASE WHEN COALESCE(cc."FOO" != {'key': '__debezium_unavailable_value'}, true) THEN cc."FOO" ELSE c."FOO" END`,
SnowflakeDialect{}.BuildProcessToastStructColExpression(columns.NewColumn("foo", typing.Invalid)),
SnowflakeDialect{}.BuildProcessToastColExpression(columns.NewColumn("foo", typing.Struct)),
)
}

Expand Down
9 changes: 1 addition & 8 deletions lib/sql/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

Expand All @@ -23,13 +22,7 @@ func BuildColumnsUpdateFragment(columns []columns.Column, dialect Dialect) strin
for _, column := range columns {
colName := dialect.QuoteIdentifier(column.Name())
if column.ToastColumn {
var colValue string
if column.KindDetails == typing.Struct {
colValue = dialect.BuildProcessToastStructColExpression(column)
} else {
colValue = dialect.BuildProcessToastColExpression(column)
}
cols = append(cols, fmt.Sprintf("%s= %s", colName, colValue))
cols = append(cols, fmt.Sprintf("%s= %s", colName, dialect.BuildProcessToastColExpression(column)))
} else {
// This is to make it look like: objCol = cc.objCol
cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName))
Expand Down
1 change: 0 additions & 1 deletion lib/sql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type Dialect interface {
BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string
BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string
BuildProcessToastColExpression(column columns.Column) string
BuildProcessToastStructColExpression(column columns.Column) string
BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string
BuildMergeQueries(
tableID TableIdentifier,
Expand Down

0 comments on commit 740e3ee

Please sign in to comment.