Skip to content

Commit

Permalink
[snowflake] Add tests for building additional equality strings (#666)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 15, 2024
1 parent d20de18 commit e18ceb0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
20 changes: 20 additions & 0 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/kafkalib/partition"
"github.com/artie-labs/transfer/lib/sql"

"github.com/artie-labs/transfer/lib/typing/columns"
Expand Down Expand Up @@ -296,6 +297,25 @@ func (s *SnowflakeTestSuite) TestExecuteMergeExitEarly() {
assert.Nil(s.T(), err)
}

func (s *SnowflakeTestSuite) TestStore_AdditionalEqualityStrings() {
{
// No additional equality strings:
tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{}, "foo")
assert.Empty(s.T(), s.stageStore.additionalEqualityStrings(tableData))
}
{
// Additional equality strings:
topicConfig := kafkalib.TopicConfig{}
topicConfig.AdditionalMergePredicates = []partition.MergePredicates{
{PartitionField: "foo"},
{PartitionField: "bar"},
}
tableData := optimization.NewTableData(nil, config.Replication, nil, topicConfig, "foo")
actual := s.stageStore.additionalEqualityStrings(tableData)
assert.Equal(s.T(), []string{`c."FOO" = cc."FOO"`, `c."BAR" = cc."BAR"`}, actual)
}
}

func TestTempTableName(t *testing.T) {
trimTTL := func(tableName string) string {
lastUnderscore := strings.LastIndex(tableName, "_")
Expand Down
21 changes: 12 additions & 9 deletions clients/snowflake/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ func (s *Store) Append(tableData *optimization.TableData) error {
return err
}

func (s *Store) additionalEqualityStrings(tableData *optimization.TableData) []string {
var additionalEqualityStrings []string
if len(tableData.TopicConfig().AdditionalMergePredicates) > 0 {
for _, additionalMergePredicate := range tableData.TopicConfig().AdditionalMergePredicates {
mergePredicateCol := s.Dialect().QuoteIdentifier(additionalMergePredicate.PartitionField)
additionalEqualityStrings = append(additionalEqualityStrings, fmt.Sprintf("c.%s = cc.%s", mergePredicateCol, mergePredicateCol))
}
}
return additionalEqualityStrings
}

func (s *Store) Merge(tableData *optimization.TableData) error {
var err error
for i := 0; i < maxRetries; i++ {
Expand All @@ -49,16 +60,8 @@ func (s *Store) Merge(tableData *optimization.TableData) error {
}
}

var additionalEqualityStrings []string
if len(tableData.TopicConfig().AdditionalMergePredicates) > 0 {
for _, additionalMergePredicate := range tableData.TopicConfig().AdditionalMergePredicates {
mergePredicateCol := s.Dialect().QuoteIdentifier(additionalMergePredicate.PartitionField)
additionalEqualityStrings = append(additionalEqualityStrings, fmt.Sprintf("c.%s = cc.%s", mergePredicateCol, mergePredicateCol))
}
}

err = shared.Merge(s, tableData, types.MergeOpts{
AdditionalEqualityStrings: additionalEqualityStrings,
AdditionalEqualityStrings: s.additionalEqualityStrings(tableData),
})
}
return err
Expand Down

0 comments on commit e18ceb0

Please sign in to comment.