
Commit

Refactor dedupe signature (#730)
Tang8330 committed Jun 17, 2024
1 parent 50ca572 commit 61b279c
Showing 13 changed files with 28 additions and 36 deletions.
4 changes: 2 additions & 2 deletions clients/bigquery/bigquery.go
@@ -180,10 +180,10 @@ func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableI
return nil
}

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error {
func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig)
dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt)

defer func() { _ = ddl.DropTemporaryTable(s, stagingTableID, false) }()

9 changes: 4 additions & 5 deletions clients/bigquery/bigquery_dedupe_test.go
@@ -11,7 +11,6 @@ import (
"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/stringutil"
)

@@ -21,7 +20,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("project12", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{})
parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -39,7 +38,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("project12", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -57,7 +56,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("project123", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{})
parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -75,7 +74,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("project123", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true)
assert.Len(t, parts, 3)
assert.Equal(
t,
5 changes: 2 additions & 3 deletions clients/bigquery/dialect/dialect.go
@@ -7,7 +7,6 @@ import (
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
@@ -166,11 +165,11 @@ func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, pri
)
}

func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)

orderColsToIterate := primaryKeysEscaped
if topicConfig.IncludeArtieUpdatedAt {
if includeArtieUpdatedAt {
orderColsToIterate = append(orderColsToIterate, bd.QuoteIdentifier(constants.UpdateColumnMarker))
}

3 changes: 1 addition & 2 deletions clients/mssql/dialect/dialect.go
@@ -7,7 +7,6 @@ import (
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
@@ -173,7 +172,7 @@ func (MSSQLDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKe
panic("not implemented")
}

func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
func (MSSQLDialect) BuildDedupeQueries(_, _ sql.TableIdentifier, _ []string, _ bool) []string {
panic("not implemented") // We don't currently support deduping for MS SQL.
}

2 changes: 1 addition & 1 deletion clients/mssql/store.go
@@ -70,7 +70,7 @@ func (s *Store) Sweep() error {
return shared.Sweep(s, tcs, queryFunc)
}

func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ kafkalib.TopicConfig) error {
func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ bool) error {
return nil // dedupe is not necessary for MS SQL
}

5 changes: 2 additions & 3 deletions clients/redshift/dialect/dialect.go
@@ -7,7 +7,6 @@ import (
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
@@ -139,11 +138,11 @@ func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ [
return fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, tableID.FullyQualifiedName())
}

func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd)

orderColsToIterate := primaryKeysEscaped
if topicConfig.IncludeArtieUpdatedAt {
if includeArtieUpdatedAt {
orderColsToIterate = append(orderColsToIterate, rd.QuoteIdentifier(constants.UpdateColumnMarker))
}

4 changes: 2 additions & 2 deletions clients/redshift/redshift.go
@@ -110,9 +110,9 @@ WHERE
return shared.Sweep(s, tcs, queryFunc)
}

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error {
func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig)
dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt)
return destination.ExecStatements(s, dedupeQueries)
}

9 changes: 4 additions & 5 deletions clients/redshift/redshift_dedupe_test.go
@@ -6,7 +6,6 @@ import (

"github.com/artie-labs/transfer/clients/redshift/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/stretchr/testify/assert"
)
@@ -17,7 +16,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() {
tableID := NewTableIdentifier("public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{})
parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false)
assert.Len(r.T(), parts, 3)
assert.Equal(
r.T(),
@@ -32,7 +31,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() {
tableID := NewTableIdentifier("public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true)
assert.Len(r.T(), parts, 3)
assert.Equal(
r.T(),
@@ -47,7 +46,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() {
tableID := NewTableIdentifier("public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{})
parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false)
assert.Len(r.T(), parts, 3)
assert.Equal(
r.T(),
@@ -62,7 +61,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() {
tableID := NewTableIdentifier("public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true)
assert.Len(r.T(), parts, 3)
assert.Equal(
r.T(),
5 changes: 2 additions & 3 deletions clients/snowflake/dialect/dialect.go
@@ -7,7 +7,6 @@ import (
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
@@ -157,11 +156,11 @@ func (SnowflakeDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, prima
panic("not implemented")
}

func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd)

orderColsToIterate := primaryKeysEscaped
if topicConfig.IncludeArtieUpdatedAt {
if includeArtieUpdatedAt {
orderColsToIterate = append(orderColsToIterate, sd.QuoteIdentifier(constants.UpdateColumnMarker))
}

4 changes: 2 additions & 2 deletions clients/snowflake/snowflake.go
@@ -130,9 +130,9 @@ func (s *Store) reestablishConnection() error {

// Dedupe takes a table and will remove duplicates based on the primary key(s).
// These queries are inspired and modified from: https://stackoverflow.com/a/71515946
func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error {
func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig)
dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt)
return destination.ExecStatements(s, dedupeQueries)
}

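The StackOverflow answer referenced in the comment above describes a row-numbering approach to deduplication. As a rough illustration only (the exact SQL is collapsed in this view, and the table names, identifier quoting, and staging predicate below are assumptions modeled on that answer rather than copied from this repository), the three statements that the dedupe tests in this diff assert on typically take a shape like this:

    // Illustrative sketch only; not the exact SQL generated by this repo.
    // Pattern: stage one copy of each duplicated row, delete every row that
    // shares those keys, then insert the staged copies back.
    package main

    import "fmt"

    func main() {
        // Hypothetical names: the real code derives the staging table via
        // shared.TempTableID and quotes identifiers through the dialect.
        queries := []string{
            // 1. Stage duplicates. ROW_NUMBER() = 2 exists only for keys that
            //    actually have duplicates, so unique rows are never touched.
            //    With includeArtieUpdatedAt = true, the ORDER BY would also
            //    carry the update-marker column, which changes which copy of
            //    a duplicated row survives.
            `CREATE OR REPLACE TABLE customers_staging AS (
                SELECT * FROM customers
                QUALIFY ROW_NUMBER() OVER (PARTITION BY "ID" ORDER BY "ID" ASC) = 2)`,
            // 2. Remove every row whose key appears in the staging table.
            `DELETE FROM customers WHERE "ID" IN (SELECT "ID" FROM customers_staging)`,
            // 3. Re-insert exactly one row per formerly duplicated key.
            `INSERT INTO customers SELECT * FROM customers_staging`,
        }
        for _, q := range queries {
            fmt.Println(q)
        }
    }
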
9 changes: 4 additions & 5 deletions clients/snowflake/snowflake_dedupe_test.go
@@ -7,7 +7,6 @@ import (

"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/clients/snowflake/dialect"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/stretchr/testify/assert"
)
@@ -18,7 +17,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("db", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{})
parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -33,7 +32,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("db", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -48,7 +47,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("db", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{})
parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false)
assert.Len(t, parts, 3)
assert.Equal(
t,
@@ -63,7 +62,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("db", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true)
assert.Len(t, parts, 3)
assert.Equal(
t,
2 changes: 1 addition & 1 deletion lib/destination/dwh.go
@@ -15,7 +15,7 @@ type DataWarehouse interface {
Dialect() sqllib.Dialect
Merge(tableData *optimization.TableData) error
Append(tableData *optimization.TableData) error
Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error
Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error
Exec(query string, args ...any) (sql.Result, error)
Query(query string, args ...any) (*sql.Rows, error)
Begin() (*sql.Tx, error)
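Callers that previously handed the whole topic config to Dedupe now extract the single flag before calling. The call sites themselves are not part of this diff, so the helper below is a hypothetical sketch (the import paths are inferred from the file locations shown in this commit):

    // Hypothetical call site, not part of this commit, showing the new shape.
    package example

    import (
        "fmt"

        "github.com/artie-labs/transfer/lib/destination"
        "github.com/artie-labs/transfer/lib/kafkalib"
        "github.com/artie-labs/transfer/lib/sql"
    )

    func dedupeTable(dwh destination.DataWarehouse, tableID sql.TableIdentifier, primaryKeys []string, tc kafkalib.TopicConfig) error {
        // Before this change the whole config was passed: dwh.Dedupe(tableID, primaryKeys, tc)
        // Now only the boolean the destinations actually read is passed down.
        if err := dwh.Dedupe(tableID, primaryKeys, tc.IncludeArtieUpdatedAt); err != nil {
            return fmt.Errorf("failed to dedupe %s: %w", tableID.FullyQualifiedName(), err)
        }
        return nil
    }
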
3 changes: 1 addition & 2 deletions lib/sql/dialect.go
@@ -2,7 +2,6 @@ package sql

import (
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)
@@ -25,7 +24,7 @@ type Dialect interface {
BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string
BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string
BuildDedupeTableQuery(tableID TableIdentifier, primaryKeys []string) string
BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string
BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string
BuildMergeQueries(
tableID TableIdentifier,
subQuery string,
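Passing a plain bool also lets lib/sql and the dialect packages drop their kafkalib imports, which is why that import disappears from several files above. Any downstream implementation or test double of the Dialect interface needs the same one-line signature update; below is a hypothetical sketch (not part of this commit) of a test fake that only cares about the dedupe statements, using interface embedding so the remaining methods can stay unimplemented:

    // Hypothetical test fake; not part of this commit.
    package example

    import "github.com/artie-labs/transfer/lib/sql"

    type fakeDialect struct {
        sql.Dialect // embedded, so methods that are not overridden panic if called
    }

    func (fakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string {
        // Canned statements; a real dialect builds these from the primary keys
        // and, when includeArtieUpdatedAt is set, the update column marker.
        return []string{"-- create staging table", "-- delete duplicated keys", "-- re-insert staged rows"}
    }
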
