diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 727d5d63..d426a4a5 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -219,7 +219,7 @@ func (bd BigQueryDialect) BuildMergeQueries( // This is because Snowflake does not respect NS granularity. var idempotentClause string if idempotentKey != "" { - idempotentClause = fmt.Sprintf("AND cc.%s >= c.%s ", idempotentKey, idempotentKey) + idempotentClause = fmt.Sprintf("AND %s.%s >= %s.%s ", stagingAlias, idempotentKey, targetAlias, idempotentKey) } var equalitySQLParts []string @@ -231,7 +231,7 @@ func (bd BigQueryDialect) BuildMergeQueries( if primaryKey.KindDetails.Kind == typing.Struct.Kind { // BigQuery requires special casting to compare two JSON objects. - equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", quotedPrimaryKey, quotedPrimaryKey) + equalitySQL = fmt.Sprintf("TO_JSON_STRING(%s.%s) = TO_JSON_STRING(%s.%s)", targetAlias, quotedPrimaryKey, stagingAlias, quotedPrimaryKey) } equalitySQLParts = append(equalitySQLParts, equalitySQL) @@ -243,18 +243,18 @@ func (bd BigQueryDialect) BuildMergeQueries( if softDelete { return []string{fmt.Sprintf(` -MERGE INTO %s c USING %s AS cc ON %s +MERGE INTO %s %s USING %s AS %s ON %s WHEN MATCHED %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, - tableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " AND "), +WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`, + tableID.FullyQualifiedName(), targetAlias, subQuery, stagingAlias, strings.Join(equalitySQLParts, " AND "), // Update + Soft Deletion idempotentClause, sql.BuildColumnsUpdateFragment(cols, stagingAlias, targetAlias, bd), // Insert - bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","), + stagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ Vals: sql.QuoteColumns(cols, bd), Separator: ",", - Prefix: "cc.", + Prefix: stagingAlias + ".", }))}, nil } @@ -265,20 +265,20 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` } return []string{fmt.Sprintf(` -MERGE INTO %s c USING %s AS cc ON %s -WHEN MATCHED AND cc.%s THEN DELETE -WHEN MATCHED AND IFNULL(cc.%s, false) = false %sTHEN UPDATE SET %s -WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`, - tableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " AND "), +MERGE INTO %s %s USING %s AS %s ON %s +WHEN MATCHED AND %s.%s THEN DELETE +WHEN MATCHED AND IFNULL(%s.%s, false) = false %sTHEN UPDATE SET %s +WHEN NOT MATCHED AND IFNULL(%s.%s, false) = false THEN INSERT (%s) VALUES (%s);`, + tableID.FullyQualifiedName(), targetAlias, subQuery, stagingAlias, strings.Join(equalitySQLParts, " AND "), // Delete - bd.QuoteIdentifier(constants.DeleteColumnMarker), + stagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), // Update - bd.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, stagingAlias, targetAlias, bd), + stagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, sql.BuildColumnsUpdateFragment(cols, stagingAlias, targetAlias, bd), // Insert - bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","), + stagingAlias, bd.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteColumns(cols, bd), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ Vals: sql.QuoteColumns(cols, bd), Separator: ",", - Prefix: "cc.", + Prefix: stagingAlias + ".", }))}, nil } diff --git a/lib/sql/columns.go b/lib/sql/columns.go index 74fdfb5f..5a12fc10 100644 --- a/lib/sql/columns.go +++ b/lib/sql/columns.go @@ -15,7 +15,7 @@ func QuoteColumns(cols []columns.Column, dialect Dialect) []string { return result } -// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: cc.first_name=c.first_name,cc.last_name=c.last_name,cc.email=c.email +// buildColumnsUpdateFragment will parse the columns and then returns a list of strings like: first_name=tgt.first_name,last_name=stg.last_name,email=tgt.email // NOTE: This should only be used with valid columns. func BuildColumnsUpdateFragment(columns []columns.Column, stagingAlias, targetAlias string, dialect Dialect) string { var cols []string @@ -25,7 +25,7 @@ func BuildColumnsUpdateFragment(columns []columns.Column, stagingAlias, targetAl cols = append(cols, fmt.Sprintf("%s= CASE WHEN %s THEN %s.%s ELSE %s.%s END", colName, dialect.BuildIsNotToastValueExpression(stagingAlias, column), stagingAlias, colName, targetAlias, colName)) } else { - // This is to make it look like: objCol = cc.objCol + // This is to make it look like: objCol=stg.objCol cols = append(cols, fmt.Sprintf("%s=%s.%s", colName, stagingAlias, colName)) } }