From 642cb0a5b9911cae7275e280896f924462743d71 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Fri, 10 May 2024 22:30:58 -0700 Subject: [PATCH 1/3] Change `BuildAlterColumnQuery` to accept a `TableIdentifier` --- clients/bigquery/dialect/dialect.go | 4 ++-- clients/bigquery/dialect/dialect_test.go | 6 +++++- clients/mssql/dialect/dialect.go | 4 ++-- clients/mssql/dialect/dialect_test.go | 6 +++++- clients/redshift/dialect/dialect.go | 4 ++-- clients/redshift/dialect/dialect_test.go | 6 +++++- clients/snowflake/dialect/dialect.go | 4 ++-- clients/snowflake/dialect/snowflake_test.go | 6 +++++- lib/destination/ddl/ddl.go | 2 +- lib/sql/dialect.go | 2 +- 10 files changed, 30 insertions(+), 14 deletions(-) diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 0173f51d3..e87f0417d 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -137,8 +137,8 @@ func (BigQueryDialect) BuildCreateTableQuery(fqTableName string, temporary bool, } } -func (BigQueryDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string { - return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart) +func (BigQueryDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { + return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } func (BigQueryDialect) BuildProcessToastStructColExpression(colName string) string { diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 89c341427..df7d67a0a 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -185,8 +186,11 @@ func TestBigQueryDialect_BuildCreateTableQuery(t *testing.T) { } func TestBigQueryDialect_BuildAlterColumnQuery(t *testing.T) { + fakeTableID := mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + assert.Equal(t, "ALTER TABLE {TABLE} drop COLUMN {SQL_PART}", - BigQueryDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"), + BigQueryDialect{}.BuildAlterColumnQuery(&fakeTableID, constants.Delete, "{SQL_PART}"), ) } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 7589ccd28..6b7b3181b 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -153,9 +153,9 @@ func (MSSQLDialect) BuildCreateTableQuery(fqTableName string, _ bool, colSQLPart return fmt.Sprintf("CREATE TABLE %s (%s);", fqTableName, strings.Join(colSQLParts, ",")) } -func (MSSQLDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string { +func (MSSQLDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { // Microsoft SQL Server doesn't support the COLUMN keyword - return fmt.Sprintf("ALTER TABLE %s %s %s", fqTableName, columnOp, colSQLPart) + return fmt.Sprintf("ALTER TABLE %s %s %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } func (MSSQLDialect) BuildProcessToastStructColExpression(colName string) string { diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index b98dd10ec..404696fa6 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -133,8 +134,11 @@ func TestMSSQLDialect_BuildCreateTableQuery(t *testing.T) { } func TestMSSQLDialect_BuildAlterColumnQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + assert.Equal(t, "ALTER TABLE {TABLE} drop {SQL_PART}", - MSSQLDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"), + MSSQLDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"), ) } diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index e25766ce2..c2eb90172 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -120,8 +120,8 @@ func (RedshiftDialect) BuildCreateTableQuery(fqTableName string, _ bool, colSQLP return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", fqTableName, strings.Join(colSQLParts, ",")) } -func (RedshiftDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string { - return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart) +func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { + return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } func (RedshiftDialect) BuildProcessToastStructColExpression(colName string) string { diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index fdfe29b2e..c95baa93a 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -185,9 +186,12 @@ func TestRedshiftDialect_BuildCreateTableQuery(t *testing.T) { } func TestRedshiftDialect_BuildAlterColumnQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + assert.Equal(t, "ALTER TABLE {TABLE} drop COLUMN {SQL_PART}", - RedshiftDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"), + RedshiftDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"), ) } diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index c1f072942..41950a257 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -139,8 +139,8 @@ func (SnowflakeDialect) BuildCreateTableQuery(fqTableName string, temporary bool } } -func (SnowflakeDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string { - return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart) +func (SnowflakeDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { + return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } func (SnowflakeDialect) BuildProcessToastStructColExpression(colName string) string { diff --git a/clients/snowflake/dialect/snowflake_test.go b/clients/snowflake/dialect/snowflake_test.go index 80f2b0722..8956e0ad5 100644 --- a/clients/snowflake/dialect/snowflake_test.go +++ b/clients/snowflake/dialect/snowflake_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -230,8 +231,11 @@ func TestSnowflakeDialect_BuildCreateTableQuery(t *testing.T) { } func TestSnowflakeDialect_BuildAlterColumnQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + assert.Equal(t, "ALTER TABLE {TABLE} drop COLUMN {SQL_PART}", - SnowflakeDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"), + SnowflakeDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"), ) } diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 4011eac3f..883349d4d 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -123,7 +123,7 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col alterStatements = []string{a.Dialect.BuildCreateTableQuery(fqTableName, a.TemporaryTable, colSQLParts)} } else { for _, colSQLPart := range colSQLParts { - alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(fqTableName, a.ColumnOp, colSQLPart)) + alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart)) } } diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index af0fb14f3..078632a10 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -21,7 +21,7 @@ type Dialect interface { IsColumnAlreadyExistsErr(err error) bool IsTableDoesNotExistErr(err error) bool BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string - BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string + BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string BuildProcessToastStructColExpression(colName string) string BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string } From e758f79e65c902b1def9419ca7b41cacdc83ba4f Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Fri, 10 May 2024 22:32:21 -0700 Subject: [PATCH 2/3] Move --- clients/bigquery/dialect/dialect_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index df7d67a0a..3f1fd2fd9 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -186,11 +186,11 @@ func TestBigQueryDialect_BuildCreateTableQuery(t *testing.T) { } func TestBigQueryDialect_BuildAlterColumnQuery(t *testing.T) { - fakeTableID := mocks.FakeTableIdentifier{} + fakeTableID := &mocks.FakeTableIdentifier{} fakeTableID.FullyQualifiedNameReturns("{TABLE}") assert.Equal(t, "ALTER TABLE {TABLE} drop COLUMN {SQL_PART}", - BigQueryDialect{}.BuildAlterColumnQuery(&fakeTableID, constants.Delete, "{SQL_PART}"), + BigQueryDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"), ) } From 2ea50d9312aaaa4e22ebe55f39620434338ee001 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Fri, 10 May 2024 22:36:44 -0700 Subject: [PATCH 3/3] More --- clients/bigquery/dialect/dialect.go | 4 ++-- clients/bigquery/dialect/dialect_test.go | 7 +++++-- clients/mssql/dialect/dialect.go | 4 ++-- clients/mssql/dialect/dialect_test.go | 7 +++++-- clients/redshift/dialect/dialect.go | 4 ++-- clients/redshift/dialect/dialect_test.go | 7 +++++-- clients/snowflake/dialect/dialect.go | 4 ++-- clients/snowflake/dialect/snowflake_test.go | 7 +++++-- lib/destination/ddl/ddl.go | 4 +--- lib/sql/dialect.go | 2 +- 10 files changed, 30 insertions(+), 20 deletions(-) diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index e87f0417d..d38dddd99 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -123,8 +123,8 @@ func (BigQueryDialect) IsTableDoesNotExistErr(err error) bool { return false } -func (BigQueryDialect) BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string { - query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", fqTableName, strings.Join(colSQLParts, ",")) +func (BigQueryDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string { + query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ",")) if temporary { return fmt.Sprintf( diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 3f1fd2fd9..7294a7531 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -173,15 +173,18 @@ func TestBQExpiresDate(t *testing.T) { } func TestBigQueryDialect_BuildCreateTableQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + // Temporary: assert.Contains(t, - BigQueryDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}), + BigQueryDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}), `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) OPTIONS (expiration_timestamp = TIMESTAMP(`, ) // Not temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2})`, - BigQueryDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}), + BigQueryDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}), ) } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 6b7b3181b..33d60c019 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -147,10 +147,10 @@ func (MSSQLDialect) IsTableDoesNotExistErr(err error) bool { return false } -func (MSSQLDialect) BuildCreateTableQuery(fqTableName string, _ bool, colSQLParts []string) string { +func (MSSQLDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string { // Microsoft SQL Server uses the same syntax for temporary and permanant tables. // Microsoft SQL Server doesn't support IF NOT EXISTS - return fmt.Sprintf("CREATE TABLE %s (%s);", fqTableName, strings.Join(colSQLParts, ",")) + return fmt.Sprintf("CREATE TABLE %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ",")) } func (MSSQLDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index 404696fa6..305fc2772 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -121,15 +121,18 @@ func TestMSSQLDialect_IsColumnAlreadyExistsErr(t *testing.T) { } func TestMSSQLDialect_BuildCreateTableQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + // Temporary: assert.Equal(t, `CREATE TABLE {TABLE} ({PART_1},{PART_2});`, - MSSQLDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}), + MSSQLDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}), ) // Not temporary: assert.Equal(t, `CREATE TABLE {TABLE} ({PART_1},{PART_2});`, - MSSQLDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}), + MSSQLDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}), ) } diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index c2eb90172..b3442df65 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -115,9 +115,9 @@ func (RedshiftDialect) IsTableDoesNotExistErr(err error) bool { return false } -func (RedshiftDialect) BuildCreateTableQuery(fqTableName string, _ bool, colSQLParts []string) string { +func (RedshiftDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string { // Redshift uses the same syntax for temporary and permanant tables. - return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", fqTableName, strings.Join(colSQLParts, ",")) + return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ",")) } func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index c95baa93a..fe0c64562 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -173,15 +173,18 @@ func TestRedshifgDialect_IsColumnAlreadyExistsErr(t *testing.T) { } func TestRedshiftDialect_BuildCreateTableQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + // Temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2});`, - RedshiftDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}), + RedshiftDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}), ) // Not temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2});`, - RedshiftDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}), + RedshiftDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}), ) } diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 41950a257..af76a22f7 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -126,8 +126,8 @@ func (SnowflakeDialect) IsTableDoesNotExistErr(err error) bool { return strings.Contains(err.Error(), "does not exist or not authorized") } -func (SnowflakeDialect) BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string { - query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", fqTableName, strings.Join(colSQLParts, ",")) +func (SnowflakeDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string { + query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ",")) if temporary { // TEMPORARY Table syntax - https://docs.snowflake.com/en/sql-reference/sql/create-table diff --git a/clients/snowflake/dialect/snowflake_test.go b/clients/snowflake/dialect/snowflake_test.go index 8956e0ad5..f5e1eef05 100644 --- a/clients/snowflake/dialect/snowflake_test.go +++ b/clients/snowflake/dialect/snowflake_test.go @@ -218,15 +218,18 @@ func TestSnowflakeDialect_IsTableDoesNotExistErr(t *testing.T) { } func TestSnowflakeDialect_BuildCreateTableQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + // Temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`, - SnowflakeDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}), + SnowflakeDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}), ) // Not temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2})`, - SnowflakeDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}), + SnowflakeDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}), ) } diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 883349d4d..45fb3970c 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -116,11 +116,9 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col colSQLParts = append(colSQLParts, pkStatement) } - fqTableName := a.TableID.FullyQualifiedName() - var alterStatements []string if a.CreateTable { - alterStatements = []string{a.Dialect.BuildCreateTableQuery(fqTableName, a.TemporaryTable, colSQLParts)} + alterStatements = []string{a.Dialect.BuildCreateTableQuery(a.TableID, a.TemporaryTable, colSQLParts)} } else { for _, colSQLPart := range colSQLParts { alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart)) diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index 078632a10..cc5e599bc 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -20,7 +20,7 @@ type Dialect interface { KindForDataType(_type string, stringPrecision string) (typing.KindDetails, error) IsColumnAlreadyExistsErr(err error) bool IsTableDoesNotExistErr(err error) bool - BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string + BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string BuildProcessToastStructColExpression(colName string) string BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string