Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Dialect methods to accept a TableIdentifier #625

Merged
merged 3 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (BigQueryDialect) IsTableDoesNotExistErr(err error) bool {
return false
}

func (BigQueryDialect) BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", fqTableName, strings.Join(colSQLParts, ","))
func (BigQueryDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))

if temporary {
return fmt.Sprintf(
Expand All @@ -137,8 +137,8 @@ func (BigQueryDialect) BuildCreateTableQuery(fqTableName string, temporary bool,
}
}

func (BigQueryDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart)
func (BigQueryDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (BigQueryDialect) BuildProcessToastStructColExpression(colName string) string {
Expand Down
13 changes: 10 additions & 3 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
Expand Down Expand Up @@ -172,21 +173,27 @@ func TestBQExpiresDate(t *testing.T) {
}

func TestBigQueryDialect_BuildCreateTableQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

// Temporary:
assert.Contains(t,
BigQueryDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}),
BigQueryDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}),
`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) OPTIONS (expiration_timestamp = TIMESTAMP(`,
)
// Not temporary:
assert.Equal(t,
`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2})`,
BigQueryDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}),
BigQueryDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}),
)
}

func TestBigQueryDialect_BuildAlterColumnQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

assert.Equal(t,
"ALTER TABLE {TABLE} drop COLUMN {SQL_PART}",
BigQueryDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"),
BigQueryDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"),
)
}
8 changes: 4 additions & 4 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,15 @@ func (MSSQLDialect) IsTableDoesNotExistErr(err error) bool {
return false
}

func (MSSQLDialect) BuildCreateTableQuery(fqTableName string, _ bool, colSQLParts []string) string {
func (MSSQLDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Microsoft SQL Server uses the same syntax for temporary and permanant tables.
// Microsoft SQL Server doesn't support IF NOT EXISTS
return fmt.Sprintf("CREATE TABLE %s (%s);", fqTableName, strings.Join(colSQLParts, ","))
return fmt.Sprintf("CREATE TABLE %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))
}

func (MSSQLDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string {
func (MSSQLDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
// Microsoft SQL Server doesn't support the COLUMN keyword
return fmt.Sprintf("ALTER TABLE %s %s %s", fqTableName, columnOp, colSQLPart)
return fmt.Sprintf("ALTER TABLE %s %s %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (MSSQLDialect) BuildProcessToastStructColExpression(colName string) string {
Expand Down
13 changes: 10 additions & 3 deletions clients/mssql/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
Expand Down Expand Up @@ -120,21 +121,27 @@ func TestMSSQLDialect_IsColumnAlreadyExistsErr(t *testing.T) {
}

func TestMSSQLDialect_BuildCreateTableQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

// Temporary:
assert.Equal(t,
`CREATE TABLE {TABLE} ({PART_1},{PART_2});`,
MSSQLDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}),
MSSQLDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}),
)
// Not temporary:
assert.Equal(t,
`CREATE TABLE {TABLE} ({PART_1},{PART_2});`,
MSSQLDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}),
MSSQLDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}),
)
}

func TestMSSQLDialect_BuildAlterColumnQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

assert.Equal(t,
"ALTER TABLE {TABLE} drop {SQL_PART}",
MSSQLDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"),
MSSQLDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"),
)
}
8 changes: 4 additions & 4 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ func (RedshiftDialect) IsTableDoesNotExistErr(err error) bool {
return false
}

func (RedshiftDialect) BuildCreateTableQuery(fqTableName string, _ bool, colSQLParts []string) string {
func (RedshiftDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Redshift uses the same syntax for temporary and permanant tables.
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", fqTableName, strings.Join(colSQLParts, ","))
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))
}

func (RedshiftDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart)
func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (RedshiftDialect) BuildProcessToastStructColExpression(colName string) string {
Expand Down
13 changes: 10 additions & 3 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
Expand Down Expand Up @@ -172,22 +173,28 @@ func TestRedshifgDialect_IsColumnAlreadyExistsErr(t *testing.T) {
}

func TestRedshiftDialect_BuildCreateTableQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

// Temporary:
assert.Equal(t,
`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2});`,
RedshiftDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}),
RedshiftDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}),
)
// Not temporary:
assert.Equal(t,
`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2});`,
RedshiftDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}),
RedshiftDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}),
)
}

func TestRedshiftDialect_BuildAlterColumnQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

assert.Equal(t,
"ALTER TABLE {TABLE} drop COLUMN {SQL_PART}",
RedshiftDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"),
RedshiftDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"),
)
}

Expand Down
8 changes: 4 additions & 4 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func (SnowflakeDialect) IsTableDoesNotExistErr(err error) bool {
return strings.Contains(err.Error(), "does not exist or not authorized")
}

func (SnowflakeDialect) BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", fqTableName, strings.Join(colSQLParts, ","))
func (SnowflakeDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))

if temporary {
// TEMPORARY Table syntax - https://docs.snowflake.com/en/sql-reference/sql/create-table
Expand All @@ -139,8 +139,8 @@ func (SnowflakeDialect) BuildCreateTableQuery(fqTableName string, temporary bool
}
}

func (SnowflakeDialect) BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", fqTableName, columnOp, colSQLPart)
func (SnowflakeDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (SnowflakeDialect) BuildProcessToastStructColExpression(colName string) string {
Expand Down
13 changes: 10 additions & 3 deletions clients/snowflake/dialect/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
Expand Down Expand Up @@ -217,21 +218,27 @@ func TestSnowflakeDialect_IsTableDoesNotExistErr(t *testing.T) {
}

func TestSnowflakeDialect_BuildCreateTableQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

// Temporary:
assert.Equal(t,
`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`,
SnowflakeDialect{}.BuildCreateTableQuery("{TABLE}", true, []string{"{PART_1}", "{PART_2}"}),
SnowflakeDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}),
)
// Not temporary:
assert.Equal(t,
`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2})`,
SnowflakeDialect{}.BuildCreateTableQuery("{TABLE}", false, []string{"{PART_1}", "{PART_2}"}),
SnowflakeDialect{}.BuildCreateTableQuery(fakeTableID, false, []string{"{PART_1}", "{PART_2}"}),
)
}

func TestSnowflakeDialect_BuildAlterColumnQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

assert.Equal(t,
"ALTER TABLE {TABLE} drop COLUMN {SQL_PART}",
SnowflakeDialect{}.BuildAlterColumnQuery("{TABLE}", constants.Delete, "{SQL_PART}"),
SnowflakeDialect{}.BuildAlterColumnQuery(fakeTableID, constants.Delete, "{SQL_PART}"),
)
}
6 changes: 2 additions & 4 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col
colSQLParts = append(colSQLParts, pkStatement)
}

fqTableName := a.TableID.FullyQualifiedName()

var alterStatements []string
if a.CreateTable {
alterStatements = []string{a.Dialect.BuildCreateTableQuery(fqTableName, a.TemporaryTable, colSQLParts)}
alterStatements = []string{a.Dialect.BuildCreateTableQuery(a.TableID, a.TemporaryTable, colSQLParts)}
} else {
for _, colSQLPart := range colSQLParts {
alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(fqTableName, a.ColumnOp, colSQLPart))
alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart))
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/sql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Dialect interface {
KindForDataType(_type string, stringPrecision string) (typing.KindDetails, error)
IsColumnAlreadyExistsErr(err error) bool
IsTableDoesNotExistErr(err error) bool
BuildCreateTableQuery(fqTableName string, temporary bool, colSQLParts []string) string
BuildAlterColumnQuery(fqTableName string, columnOp constants.ColumnOperation, colSQLPart string) string
BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string
BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string
BuildProcessToastStructColExpression(colName string) string
BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string
}