Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 17, 2024
1 parent 355beb5 commit 8eeff08
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
11 changes: 10 additions & 1 deletion clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (BigQueryDialect) IsColumnAlreadyExistsErr(err error) bool {
return strings.Contains(err.Error(), "Column already exists")
}

func (BigQueryDialect) IsTableDoesNotExistErr(err error) bool {
func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool {
return false
}

Expand Down Expand Up @@ -154,6 +154,15 @@ func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (bd BigQueryDialect) GetDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)
return fmt.Sprintf(`(SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 1)`,
tableID.FullyQualifiedName(),
strings.Join(primaryKeysEscaped, ", "),
strings.Join(primaryKeysEscaped, ", "),
)
}

func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)

Expand Down
2 changes: 1 addition & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt
temporaryTableName := temporaryTableID.FullyQualifiedName()
subQuery := temporaryTableName
if opts.SubQueryDedupe {
subQuery = fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, temporaryTableName)
subQuery = dwh.Dialect().GetDedupeTableQuery(temporaryTableID, tableData.PrimaryKeys())
}

if subQuery == "" {
Expand Down
1 change: 1 addition & 0 deletions lib/sql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Dialect interface {
BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string
BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string
BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string
GetDedupeTableQuery(tableID TableIdentifier, primaryKeys []string) string
BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string
BuildMergeQueries(
tableID TableIdentifier,
Expand Down

0 comments on commit 8eeff08

Please sign in to comment.