Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 17, 2024
1 parent 8eeff08 commit 8a7d7e9
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 2 deletions.
4 changes: 4 additions & 0 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func (md MSSQLDialect) BuildIsNotToastValueExpression(tableAlias constants.Table
return fmt.Sprintf("COALESCE(%s, '') != '%s'", colName, constants.ToastUnavailableValuePlaceholder)
}

func (MSSQLDialect) GetDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
panic("not implemented")
}

func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
panic("not implemented") // We don't currently support deduping for MS SQL.
}
Expand Down
4 changes: 4 additions & 0 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (rd RedshiftDialect) GetDedupeTableQuery(tableID sql.TableIdentifier, _ []string) string {
return fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, tableID.FullyQualifiedName())
}

func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd)

Expand Down
3 changes: 1 addition & 2 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt
}
}

temporaryTableName := temporaryTableID.FullyQualifiedName()
subQuery := temporaryTableName
subQuery := temporaryTableID.FullyQualifiedName()
if opts.SubQueryDedupe {
subQuery = dwh.Dialect().GetDedupeTableQuery(temporaryTableID, tableData.PrimaryKeys())
}
Expand Down
4 changes: 4 additions & 0 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func (sd SnowflakeDialect) BuildIsNotToastValueExpression(tableAlias constants.T
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (SnowflakeDialect) GetDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
panic("not implemented")
}

func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd)

Expand Down

0 comments on commit 8a7d7e9

Please sign in to comment.