Skip to content

Commit

Permalink
Merge branch 'master' into drop-temp-table-fix
Browse files (browse the repository at this point in the history)
  • Loading branch information
Tang8330 committed Jun 17, 2024
2 parents 61fc310 + deb0ce0 commit 4d5a11c
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 38 deletions.
4 changes: 1 addition & 3 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log/slog"
"os"
"strings"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
)

const (
Expand Down Expand Up @@ -181,7 +179,7 @@ func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableI
}

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt)

Expand Down
10 changes: 4 additions & 6 deletions clients/bigquery/bigquery_dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bigquery

import (
"fmt"
"strings"
"testing"
"time"

Expand All @@ -11,14 +10,13 @@ import (
"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/stringutil"
)

func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with one primary key + no `__artie_updated_at` flag.
tableID := NewTableIdentifier("project12", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false)
assert.Len(t, parts, 3)
Expand All @@ -36,7 +34,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with one primary key + `__artie_updated_at` flag.
tableID := NewTableIdentifier("project12", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true)
assert.Len(t, parts, 3)
Expand All @@ -54,7 +52,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with composite keys + no `__artie_updated_at` flag.
tableID := NewTableIdentifier("project123", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false)
assert.Len(t, parts, 3)
Expand All @@ -72,7 +70,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with composite keys + `__artie_updated_at` flag.
tableID := NewTableIdentifier("project123", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true)
assert.Len(t, parts, 3)
Expand Down
4 changes: 2 additions & 2 deletions clients/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestTempTableName(t *testing.T) {
func TestTempTableIDWithSuffix(t *testing.T) {
trimTTL := func(tableName string) string {
lastUnderscore := strings.LastIndex(tableName, "_")
assert.GreaterOrEqual(t, lastUnderscore, 0)
Expand All @@ -26,6 +26,6 @@ func TestTempTableName(t *testing.T) {
store := &Store{config: config.Config{BigQuery: &config.BigQuery{ProjectID: "123454321"}}}
tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table")
tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName()
tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName()
assert.Equal(t, "`123454321`.`db`.`table___artie_sUfFiX`", trimTTL(tempTableName))
}
6 changes: 3 additions & 3 deletions clients/mssql/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestTempTableName(t *testing.T) {
func TestTempTableIDWithSuffix(t *testing.T) {
trimTTL := func(tableName string) string {
lastUnderscore := strings.LastIndex(tableName, "_")
assert.GreaterOrEqual(t, lastUnderscore, 0)
Expand All @@ -28,14 +28,14 @@ func TestTempTableName(t *testing.T) {
// Schema is "schema":
tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table")
tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName()
tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName()
assert.Equal(t, `"schema"."table___artie_sUfFiX"`, trimTTL(tempTableName))
}
{
// Schema is "public" -> "dbo":
tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "public"}, "table")
tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName()
tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName()
assert.Equal(t, `"dbo"."table___artie_sUfFiX"`, trimTTL(tempTableName))
}
}
4 changes: 1 addition & 3 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package redshift

import (
"fmt"
"strings"

_ "github.com/lib/pq"

Expand All @@ -17,7 +16,6 @@ import (
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
)

type Store struct {
Expand Down Expand Up @@ -111,7 +109,7 @@ WHERE
}

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)
dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt)
return destination.ExecStatements(s, dedupeQueries)
}
Expand Down
10 changes: 4 additions & 6 deletions clients/redshift/redshift_dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@ package redshift

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/clients/redshift/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/stretchr/testify/assert"
)

func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() {
{
// Dedupe with one primary key + no `__artie_updated_at` flag.
tableID := NewTableIdentifier("public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false)
assert.Len(r.T(), parts, 3)
Expand All @@ -29,7 +27,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() {
{
// Dedupe with one primary key + `__artie_updated_at` flag.
tableID := NewTableIdentifier("public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true)
assert.Len(r.T(), parts, 3)
Expand All @@ -44,7 +42,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() {
{
// Dedupe with composite keys + no `__artie_updated_at` flag.
tableID := NewTableIdentifier("public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false)
assert.Len(r.T(), parts, 3)
Expand All @@ -59,7 +57,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() {
{
// Dedupe with composite keys + `__artie_updated_at` flag.
tableID := NewTableIdentifier("public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true)
assert.Len(r.T(), parts, 3)
Expand Down
4 changes: 2 additions & 2 deletions clients/redshift/redshift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestTempTableName(t *testing.T) {
func TestTempTableIDWithSuffix(t *testing.T) {
trimTTL := func(tableName string) string {
lastUnderscore := strings.LastIndex(tableName, "_")
assert.GreaterOrEqual(t, lastUnderscore, 0)
Expand All @@ -25,6 +25,6 @@ func TestTempTableName(t *testing.T) {

tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table")
tableID := (&Store{}).IdentifierFor(tableData.TopicConfig(), tableData.Name())
tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName()
tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName()
assert.Equal(t, `schema."table___artie_suffix"`, trimTTL(tempTableName))
}
2 changes: 1 addition & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

temporaryTableID := TempTableID(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix())
temporaryTableID := TempTableIDWithSuffix(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix())
if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil {
return fmt.Errorf("failed to prepare temporary table: %w", err)
}
Expand Down
8 changes: 7 additions & 1 deletion clients/shared/temp_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package shared

import (
"fmt"
"strings"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
)

func TempTableID(tableID sql.TableIdentifier, suffix string) sql.TableIdentifier {
func TempTableID(tableID sql.TableIdentifier) sql.TableIdentifier {
return TempTableIDWithSuffix(tableID, strings.ToLower(stringutil.Random(5)))
}

func TempTableIDWithSuffix(tableID sql.TableIdentifier, suffix string) sql.TableIdentifier {
tempTable := fmt.Sprintf(
"%s_%s_%s_%d",
tableID.Table(),
Expand Down
4 changes: 1 addition & 3 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package snowflake

import (
"fmt"
"strings"

"github.com/snowflakedb/gosnowflake"

Expand All @@ -17,7 +16,6 @@ import (
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
)

const maxRetries = 10
Expand Down Expand Up @@ -131,7 +129,7 @@ func (s *Store) reestablishConnection() error {
// Dedupe takes a table and will remove duplicates based on the primary key(s).
// These queries are inspired and modified from: https://stackoverflow.com/a/71515946
func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)
dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt)
return destination.ExecStatements(s, dedupeQueries)
}
Expand Down
10 changes: 4 additions & 6 deletions clients/snowflake/snowflake_dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@ package snowflake

import (
"fmt"
"strings"
"testing"

"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/clients/snowflake/dialect"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/stretchr/testify/assert"
)

func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with one primary key + no `__artie_updated_at` flag.
tableID := NewTableIdentifier("db", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false)
assert.Len(t, parts, 3)
Expand All @@ -30,7 +28,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with one primary key + `__artie_updated_at` flag.
tableID := NewTableIdentifier("db", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true)
assert.Len(t, parts, 3)
Expand All @@ -45,7 +43,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with composite keys + no `__artie_updated_at` flag.
tableID := NewTableIdentifier("db", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false)
assert.Len(t, parts, 3)
Expand All @@ -60,7 +58,7 @@ func TestGenerateDedupeQueries(t *testing.T) {
{
// Dedupe with composite keys + `__artie_updated_at` flag.
tableID := NewTableIdentifier("db", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))
stagingTableID := shared.TempTableID(tableID)

parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true)
assert.Len(t, parts, 3)
Expand Down
4 changes: 2 additions & 2 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (s *SnowflakeTestSuite) TestStore_AdditionalEqualityStrings() {
}
}

func TestTempTableName(t *testing.T) {
func TestTempTableIDWithSuffix(t *testing.T) {
trimTTL := func(tableName string) string {
lastUnderscore := strings.LastIndex(tableName, "_")
assert.GreaterOrEqual(t, lastUnderscore, 0)
Expand All @@ -328,6 +328,6 @@ func TestTempTableName(t *testing.T) {

tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table")
tableID := (&Store{}).IdentifierFor(tableData.TopicConfig(), tableData.Name())
tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName()
tempTableName := shared.TempTableIDWithSuffix(tableID, "sUfFiX").FullyQualifiedName()
assert.Equal(t, `db.schema."TABLE___ARTIE_SUFFIX"`, trimTTL(tempTableName))
}

0 comments on commit 4d5a11c

Please sign in to comment.