From 740e3eea2b290b8aa411ae9f0b214736f1ac3f82 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Mon, 13 May 2024 23:14:58 -0700 Subject: [PATCH] [sql] Kill `Dialect.BuildProcessToastStructColExpression` (#651) --- clients/bigquery/dialect/dialect.go | 12 +++++------- clients/bigquery/dialect/dialect_test.go | 11 +++-------- clients/mssql/dialect/dialect.go | 11 ++++------- clients/mssql/dialect/dialect_test.go | 11 +++-------- clients/redshift/dialect/dialect.go | 10 ++++------ clients/redshift/dialect/dialect_test.go | 11 +++-------- clients/snowflake/dialect/dialect.go | 10 ++++------ clients/snowflake/dialect/dialect_test.go | 11 +++-------- lib/sql/columns.go | 9 +-------- lib/sql/dialect.go | 1 - 10 files changed, 30 insertions(+), 67 deletions(-) diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 97b64aa2f..f511e9bcf 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -146,17 +146,15 @@ func (BigQueryDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, column func (bd BigQueryDialect) BuildProcessToastColExpression(column columns.Column) string { colName := bd.QuoteIdentifier(column.Name()) + if column.KindDetails == typing.Struct { + return fmt.Sprintf(`CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, + colName, constants.ToastUnavailableValuePlaceholder, + colName, colName) + } return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } -func (bd BigQueryDialect) BuildProcessToastStructColExpression(column columns.Column) string { - colName := bd.QuoteIdentifier(column.Name()) - return fmt.Sprintf(`CASE WHEN COALESCE(TO_JSON_STRING(cc.%s) != '{"key":"%s"}', true) THEN cc.%s ELSE c.%s END`, - colName, constants.ToastUnavailableValuePlaceholder, - colName, colName) -} - func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd) diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 9a8ee684e..dde38e205 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -201,18 +201,13 @@ func TestBigQueryDialect_BuildAlterColumnQuery(t *testing.T) { } func TestBigQueryDialect_BuildProcessToastColExpression(t *testing.T) { - assert.Equal( - t, + assert.Equal(t, "CASE WHEN COALESCE(cc.`bar` != '__debezium_unavailable_value', true) THEN cc.`bar` ELSE c.`bar` END", BigQueryDialect{}.BuildProcessToastColExpression(columns.NewColumn("bar", typing.Invalid)), ) -} - -func TestBigQueryDialect_ProcessToastStructColExpression(t *testing.T) { - assert.Equal( - t, + assert.Equal(t, "CASE WHEN COALESCE(TO_JSON_STRING(cc.`foo`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`foo` ELSE c.`foo` END", - BigQueryDialect{}.BuildProcessToastStructColExpression(columns.NewColumn("foo", typing.Invalid)), + BigQueryDialect{}.BuildProcessToastColExpression(columns.NewColumn("foo", typing.Struct)), ) } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 8ecfafb68..2a51dee1f 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -164,17 +164,14 @@ func (MSSQLDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp func (md MSSQLDialect) BuildProcessToastColExpression(column columns.Column) string { colName := md.QuoteIdentifier(column.Name()) // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. + if column.KindDetails == typing.Struct { + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + } return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, '') != '%s' THEN cc.%s ELSE c.%s END", colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } -func (md MSSQLDialect) BuildProcessToastStructColExpression(column columns.Column) string { - colName := md.QuoteIdentifier(column.Name()) - // Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement. - return fmt.Sprintf("CASE WHEN COALESCE(cc.%s, {}) != {'key': '%s'} THEN cc.%s ELSE c.%s END", - colName, constants.ToastUnavailableValuePlaceholder, colName, colName) -} - func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { panic("not implemented") // We don't currently support deduping for MS SQL. } diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index 003049cd0..5865bbed2 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -150,18 +150,13 @@ func TestMSSQLDialect_BuildAlterColumnQuery(t *testing.T) { } func TestMSSQLDialect_BuildProcessToastColExpression(t *testing.T) { - assert.Equal( - t, + assert.Equal(t, `CASE WHEN COALESCE(cc."bar", '') != '__debezium_unavailable_value' THEN cc."bar" ELSE c."bar" END`, MSSQLDialect{}.BuildProcessToastColExpression(columns.NewColumn("bar", typing.Invalid)), ) -} - -func TestMSSQLDialect_BuildProcessToastStructColExpression(t *testing.T) { - assert.Equal( - t, + assert.Equal(t, `CASE WHEN COALESCE(cc."foo", {}) != {'key': '__debezium_unavailable_value'} THEN cc."foo" ELSE c."foo" END`, - MSSQLDialect{}.BuildProcessToastStructColExpression(columns.NewColumn("foo", typing.Invalid)), + MSSQLDialect{}.BuildProcessToastColExpression(columns.NewColumn("foo", typing.Struct)), ) } diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index ab98661f2..b9bee17ad 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -129,16 +129,14 @@ func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, column func (rd RedshiftDialect) BuildProcessToastColExpression(column columns.Column) string { colName := rd.QuoteIdentifier(column.Name()) + if column.KindDetails == typing.Struct { + return fmt.Sprintf(`CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + } return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } -func (rd RedshiftDialect) BuildProcessToastStructColExpression(column columns.Column) string { - colName := rd.QuoteIdentifier(column.Name()) - return fmt.Sprintf(`CASE WHEN COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true) THEN cc.%s ELSE c.%s END`, - colName, constants.ToastUnavailableValuePlaceholder, colName, colName) -} - func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd) diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index d57d18efe..21e9ae6ca 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -205,18 +205,13 @@ func TestQuoteIdentifiers(t *testing.T) { } func TestRedshiftDialect_BuildProcessToastColExpression(t *testing.T) { - assert.Equal( - t, + assert.Equal(t, `CASE WHEN COALESCE(cc."bar" != '__debezium_unavailable_value', true) THEN cc."bar" ELSE c."bar" END`, RedshiftDialect{}.BuildProcessToastColExpression(columns.NewColumn("bar", typing.Invalid)), ) -} - -func TestRedshiftDialect_BuildProcessToastStructColExpression(t *testing.T) { - assert.Equal( - t, + assert.Equal(t, `CASE WHEN COALESCE(cc."foo" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc."foo" ELSE c."foo" END`, - RedshiftDialect{}.BuildProcessToastStructColExpression(columns.NewColumn("foo", typing.Invalid)), + RedshiftDialect{}.BuildProcessToastColExpression(columns.NewColumn("foo", typing.Struct)), ) } diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 2b8edf4ec..fd13aae1a 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -148,16 +148,14 @@ func (SnowflakeDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, colum func (sd SnowflakeDialect) BuildProcessToastColExpression(column columns.Column) string { colName := sd.QuoteIdentifier(column.Name()) + if column.KindDetails == typing.Struct { + return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", + colName, constants.ToastUnavailableValuePlaceholder, colName, colName) + } return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != '%s', true) THEN cc.%s ELSE c.%s END", colName, constants.ToastUnavailableValuePlaceholder, colName, colName) } -func (sd SnowflakeDialect) BuildProcessToastStructColExpression(column columns.Column) string { - colName := sd.QuoteIdentifier(column.Name()) - return fmt.Sprintf("CASE WHEN COALESCE(cc.%s != {'key': '%s'}, true) THEN cc.%s ELSE c.%s END", - colName, constants.ToastUnavailableValuePlaceholder, colName, colName) -} - func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd) diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 156de6962..896ff3ead 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -248,18 +248,13 @@ func TestSnowflakeDialect_BuildAlterColumnQuery(t *testing.T) { } func TestSnowflakeDialect_BuildProcessToastColExpression(t *testing.T) { - assert.Equal( - t, + assert.Equal(t, `CASE WHEN COALESCE(cc."BAR" != '__debezium_unavailable_value', true) THEN cc."BAR" ELSE c."BAR" END`, SnowflakeDialect{}.BuildProcessToastColExpression(columns.NewColumn("bar", typing.Invalid)), ) -} - -func TestSnowflakeDialect_BuildProcessToastStructColExpression(t *testing.T) { - assert.Equal( - t, + assert.Equal(t, `CASE WHEN COALESCE(cc."FOO" != {'key': '__debezium_unavailable_value'}, true) THEN cc."FOO" ELSE c."FOO" END`, - SnowflakeDialect{}.BuildProcessToastStructColExpression(columns.NewColumn("foo", typing.Invalid)), + SnowflakeDialect{}.BuildProcessToastColExpression(columns.NewColumn("foo", typing.Struct)), ) } diff --git a/lib/sql/columns.go b/lib/sql/columns.go index dc4a373b9..fdb33c16f 100644 --- a/lib/sql/columns.go +++ b/lib/sql/columns.go @@ -4,7 +4,6 @@ import ( "fmt" "strings" - "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -23,13 +22,7 @@ func BuildColumnsUpdateFragment(columns []columns.Column, dialect Dialect) strin for _, column := range columns { colName := dialect.QuoteIdentifier(column.Name()) if column.ToastColumn { - var colValue string - if column.KindDetails == typing.Struct { - colValue = dialect.BuildProcessToastStructColExpression(column) - } else { - colValue = dialect.BuildProcessToastColExpression(column) - } - cols = append(cols, fmt.Sprintf("%s= %s", colName, colValue)) + cols = append(cols, fmt.Sprintf("%s= %s", colName, dialect.BuildProcessToastColExpression(column))) } else { // This is to make it look like: objCol = cc.objCol cols = append(cols, fmt.Sprintf("%s=cc.%s", colName, colName)) diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index a0be91e67..09287dd6a 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -24,7 +24,6 @@ type Dialect interface { BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string BuildProcessToastColExpression(column columns.Column) string - BuildProcessToastStructColExpression(column columns.Column) string BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string BuildMergeQueries( tableID TableIdentifier,