diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 0173f51d3..d38dddd99 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -123,8 +123,8 @@ func (BigQueryDialect) IsTableDoesNotExistErr(err error) bool { return false } -func (BigQueryDialect) BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string { - query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", fqTableName, strings.Join(colSQLParts, ",")) +func (BigQueryDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string { + query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ",")) if temporary { return fmt.Sprintf( @@ -137,8 +137,8 @@ func (BigQueryDialect) BuildCreateTableQuery(fqTableName string, temporary bool, } } -func (BigQueryDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string { - return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart) +func (BigQueryDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { + return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } func (BigQueryDialect) BuildProcessToastStructColExpression(colName string) string { diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 89c341427..7294a7531 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -172,21 +173,27 @@ func TestBQExpiresDate(t *testing.T) { } func TestBigQueryDialect_BuildCreateTableQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + // Temporary: assert.Contains(t, - BigQueryDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}), + BigQueryDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}), `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) OPTIONS (expiration_timestamp = TIMESTAMP(`, ) // Not temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2})`, - BigQueryDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}), + BigQueryDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}), ) } func TestBigQueryDialect_BuildAlterColumnQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + assert.Equal(t, "ALTER TABLE {TABLE} drop COLUMN {SQL_PART}", - BigQueryDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"), + BigQueryDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"), ) } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 7589ccd28..33d60c019 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -147,15 +147,15 @@ func (MSSQLDialect) IsTableDoesNotExistErr(err error) bool { return false } -func (MSSQLDialect) BuildCreateTableQuery(fqTableName string, _ bool, colSQLParts []string) string { +func (MSSQLDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string { // Microsoft SQL Server uses the same syntax for temporary and permanant tables. // Microsoft SQL Server doesn't support IF NOT EXISTS - return fmt.Sprintf("CREATE TABLE %s (%s);", fqTableName, strings.Join(colSQLParts, ",")) + return fmt.Sprintf("CREATE TABLE %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ",")) } -func (MSSQLDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string { +func (MSSQLDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { // Microsoft SQL Server doesn't support the COLUMN keyword - return fmt.Sprintf("ALTER TABLE %s %s %s", fqTableName, columnOp, colSQLPart) + return fmt.Sprintf("ALTER TABLE %s %s %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } func (MSSQLDialect) BuildProcessToastStructColExpression(colName string) string { diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index b98dd10ec..305fc2772 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -120,21 +121,27 @@ func TestMSSQLDialect_IsColumnAlreadyExistsErr(t *testing.T) { } func TestMSSQLDialect_BuildCreateTableQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + // Temporary: assert.Equal(t, `CREATE TABLE {TABLE} ({PART_1},{PART_2});`, - MSSQLDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}), + MSSQLDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}), ) // Not temporary: assert.Equal(t, `CREATE TABLE {TABLE} ({PART_1},{PART_2});`, - MSSQLDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}), + MSSQLDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}), ) } func TestMSSQLDialect_BuildAlterColumnQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + assert.Equal(t, "ALTER TABLE {TABLE} drop {SQL_PART}", - MSSQLDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"), + MSSQLDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"), ) } diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index e25766ce2..b3442df65 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -115,13 +115,13 @@ func (RedshiftDialect) IsTableDoesNotExistErr(err error) bool { return false } -func (RedshiftDialect) BuildCreateTableQuery(fqTableName string, _ bool, colSQLParts []string) string { +func (RedshiftDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string { // Redshift uses the same syntax for temporary and permanant tables. - return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", fqTableName, strings.Join(colSQLParts, ",")) + return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ",")) } -func (RedshiftDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string { - return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart) +func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { + return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } func (RedshiftDialect) BuildProcessToastStructColExpression(colName string) string { diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index fdfe29b2e..fe0c64562 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -172,22 +173,28 @@ func TestRedshifgDialect_IsColumnAlreadyExistsErr(t *testing.T) { } func TestRedshiftDialect_BuildCreateTableQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + // Temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2});`, - RedshiftDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}), + RedshiftDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}), ) // Not temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2});`, - RedshiftDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}), + RedshiftDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}), ) } func TestRedshiftDialect_BuildAlterColumnQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + assert.Equal(t, "ALTER TABLE {TABLE} drop COLUMN {SQL_PART}", - RedshiftDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"), + RedshiftDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"), ) } diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index c1f072942..af76a22f7 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -126,8 +126,8 @@ func (SnowflakeDialect) IsTableDoesNotExistErr(err error) bool { return strings.Contains(err.Error(), "does not exist or not authorized") } -func (SnowflakeDialect) BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string { - query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", fqTableName, strings.Join(colSQLParts, ",")) +func (SnowflakeDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string { + query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ",")) if temporary { // TEMPORARY Table syntax - https://docs.snowflake.com/en/sql-reference/sql/create-table @@ -139,8 +139,8 @@ func (SnowflakeDialect) BuildCreateTableQuery(fqTableName string, temporary bool } } -func (SnowflakeDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string { - return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart) +func (SnowflakeDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { + return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } func (SnowflakeDialect) BuildProcessToastStructColExpression(colName string) string { diff --git a/clients/snowflake/dialect/snowflake_test.go b/clients/snowflake/dialect/snowflake_test.go index 80f2b0722..f5e1eef05 100644 --- a/clients/snowflake/dialect/snowflake_test.go +++ b/clients/snowflake/dialect/snowflake_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -217,21 +218,27 @@ func TestSnowflakeDialect_IsTableDoesNotExistErr(t *testing.T) { } func TestSnowflakeDialect_BuildCreateTableQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + // Temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`, - SnowflakeDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}), + SnowflakeDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}), ) // Not temporary: assert.Equal(t, `CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2})`, - SnowflakeDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}), + SnowflakeDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}), ) } func TestSnowflakeDialect_BuildAlterColumnQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + assert.Equal(t, "ALTER TABLE {TABLE} drop COLUMN {SQL_PART}", - SnowflakeDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"), + SnowflakeDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"), ) } diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 4011eac3f..45fb3970c 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -116,14 +116,12 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col colSQLParts = append(colSQLParts, pkStatement) } - fqTableName := a.TableID.FullyQualifiedName() - var alterStatements []string if a.CreateTable { - alterStatements = []string{a.Dialect.BuildCreateTableQuery(fqTableName, a.TemporaryTable, colSQLParts)} + alterStatements = []string{a.Dialect.BuildCreateTableQuery(a.TableID, a.TemporaryTable, colSQLParts)} } else { for _, colSQLPart := range colSQLParts { - alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(fqTableName, a.ColumnOp, colSQLPart)) + alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart)) } } diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index af0fb14f3..cc5e599bc 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -20,8 +20,8 @@ type Dialect interface { KindForDataType(_type string, stringPrecision string) (typing.KindDetails, error) IsColumnAlreadyExistsErr(err error) bool IsTableDoesNotExistErr(err error) bool - BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string - BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string + BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string + BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string BuildProcessToastStructColExpression(colName string) string BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string }