Skip to content

Commit

Permalink
[bigquery] Use table alias constants for merge queries (#662)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 14, 2024
1 parent 0bbe68e commit 9da4771
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
32 changes: 16 additions & 16 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (bd BigQueryDialect) BuildMergeQueries(
// This is because Snowflake does not respect NS granularity.
var idempotentClause string
if idempotentKey != "" {
idempotentClause = fmt.Sprintf("AND cc.%s >= c.%s ", idempotentKey, idempotentKey)
idempotentClause = fmt.Sprintf("AND %s.%s >= %s.%s ", stagingAlias, idempotentKey, targetAlias, idempotentKey)
}

var equalitySQLParts []string
Expand All @@ -231,7 +231,7 @@ func (bd BigQueryDialect) BuildMergeQueries(

if primaryKey.KindDetails.Kind == typing.Struct.Kind {
// BigQuery requires special casting to compare two JSON objects.
equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", quotedPrimaryKey, quotedPrimaryKey)
equalitySQL = fmt.Sprintf("TO_JSON_STRING(%s.%s) = TO_JSON_STRING(%s.%s)", targetAlias, quotedPrimaryKey, stagingAlias, quotedPrimaryKey)
}

equalitySQLParts = append(equalitySQLParts, equalitySQL)
Expand All @@ -243,18 +243,18 @@ func (bd BigQueryDialect) BuildMergeQueries(

if softDelete {
return []string{fmt.Sprintf(`
MERGE INTO %s c USING %s AS cc ON %s
MERGE INTO %s %s USING %s AS %s ON %s
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
tableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " AND "),
WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
tableID.FullyQualifiedName(), targetAlias, subQuery, stagingAlias, strings.Join(equalitySQLParts, " AND "),
// Update + Soft Deletion
idempotentClause, sql.BuildColumnsUpdateFragment(cols, stagingAlias, targetAlias, bd),
// Insert
bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","),
stagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, bd),
Separator: ",",
Prefix: "cc.",
Prefix: stagingAlias + ".",
}))}, nil
}

Expand All @@ -265,20 +265,20 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
}

return []string{fmt.Sprintf(`
MERGE INTO %s c USING %s AS cc ON %s
WHEN MATCHED AND cc.%s THEN DELETE
WHEN MATCHED AND IFNULL(cc.%s, false) = false %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
tableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " AND "),
MERGE INTO %s %s USING %s AS %s ON %s
WHEN MATCHED AND %s.%s THEN DELETE
WHEN MATCHED AND IFNULL(%s.%s, false) = false %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
tableID.FullyQualifiedName(), targetAlias, subQuery, stagingAlias, strings.Join(equalitySQLParts, " AND "),
// Delete
bd.QuoteIdentifier(constants.DeleteColumnMarker),
stagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
bd.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, stagingAlias, targetAlias, bd),
stagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, stagingAlias, targetAlias, bd),
// Insert
bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","),
stagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, bd),
Separator: ",",
Prefix: "cc.",
Prefix: stagingAlias + ".",
}))}, nil
}
4 changes: 2 additions & 2 deletions lib/sql/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func QuoteColumns(cols []columns.Column, dialect Dialect) []string {
return result
}

// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email
// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: first_name=tgt.first_name,last_name=stg.last_name,email=tgt.email
// NOTE: This should only be used with valid columns.
func BuildColumnsUpdateFragment(columns []columns.Column, stagingAlias, targetAlias string, dialect Dialect) string {
var cols []string
Expand All @@ -25,7 +25,7 @@ func BuildColumnsUpdateFragment(columns []columns.Column, stagingAlias, targetAl
cols = append(cols, fmt.Sprintf("%s= CASE WHEN %s THEN %s.%s ELSE %s.%s END",
colName, dialect.BuildIsNotToastValueExpression(stagingAlias, column), stagingAlias, colName, targetAlias, colName))
} else {
// This is to make it look like: objCol = cc.objCol
// This is to make it look like: objCol=stg.objCol
cols = append(cols, fmt.Sprintf("%s=%s.%s", colName, stagingAlias, colName))
}
}
Expand Down

0 comments on commit 9da4771

Please sign in to comment.