Skip to content

Commit

Permalink
[redshift] Use table alias constants for merge queries (#656)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 14, 2024
1 parent ea97ab3 commit 976e90f
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, column
func (rd RedshiftDialect) BuildIsNotToastValueExpression(column columns.Column) string {
colName := rd.QuoteIdentifier(column.Name())
if column.KindDetails == typing.Struct {
return fmt.Sprintf(`COALESCE(cc.%s != JSON_PARSE('{"key":"%s"}'), true)`,
colName, constants.ToastUnavailableValuePlaceholder)
return fmt.Sprintf(`COALESCE(%s.%s != JSON_PARSE('{"key":"%s"}'), true)`,
stagingAlias, colName, constants.ToastUnavailableValuePlaceholder)
}
return fmt.Sprintf("COALESCE(cc.%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
return fmt.Sprintf("COALESCE(%s.%s != '%s', true)", stagingAlias, colName, constants.ToastUnavailableValuePlaceholder)
}

func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
Expand Down Expand Up @@ -198,18 +198,19 @@ func (rd RedshiftDialect) buildMergeInsertQuery(
primaryKeys []columns.Column,
cols []columns.Column,
) string {
return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s AS cc LEFT JOIN %s AS c ON %s WHERE c.%s IS NULL;`,
return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s AS %s LEFT JOIN %s AS %s ON %s WHERE %s.%s IS NULL;`,
// insert into target (col1, col2, col3)
tableID.FullyQualifiedName(), strings.Join(sql.QuoteColumns(cols, rd), ","),
// SELECT cc.col1, cc.col2, ... FROM staging as CC
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(cols, rd),
Separator: ",",
Prefix: "cc.",
}), subQuery,
Prefix: stagingAlias + ".",
}), subQuery, stagingAlias,
// LEFT JOIN table on pk(s)
tableID.FullyQualifiedName(), strings.Join(sql.BuildColumnComparisons(primaryKeys, targetAlias, stagingAlias, sql.Equal, rd), " AND "),
tableID.FullyQualifiedName(), targetAlias, strings.Join(sql.BuildColumnComparisons(primaryKeys, targetAlias, stagingAlias, sql.Equal, rd), " AND "),
// Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts)
targetAlias,
rd.QuoteIdentifier(primaryKeys[0].Name()),
)
}
Expand All @@ -222,34 +223,34 @@ func (rd RedshiftDialect) buildMergeUpdateQuery(
idempotentKey string,
softDelete bool,
) string {
clauses := sql.BuildColumnComparisons(primaryKeys, "c", "cc", sql.Equal, rd)
clauses := sql.BuildColumnComparisons(primaryKeys, targetAlias, stagingAlias, sql.Equal, rd)

if idempotentKey != "" {
clauses = append(clauses, fmt.Sprintf("cc.%s >= c.%s", idempotentKey, idempotentKey))
clauses = append(clauses, fmt.Sprintf("%s.%s >= %s.%s", stagingAlias, idempotentKey, targetAlias, idempotentKey))
}

if !softDelete {
clauses = append(clauses, fmt.Sprintf("COALESCE(cc.%s, false) = false", rd.QuoteIdentifier(constants.DeleteColumnMarker)))
clauses = append(clauses, fmt.Sprintf("COALESCE(%s.%s, false) = false", stagingAlias, rd.QuoteIdentifier(constants.DeleteColumnMarker)))
}

return fmt.Sprintf(`UPDATE %s AS c SET %s FROM %s AS cc WHERE %s;`,
return fmt.Sprintf(`UPDATE %s AS %s SET %s FROM %s AS %s WHERE %s;`,
// UPDATE table set col1 = cc. col1
tableID.FullyQualifiedName(), sql.BuildColumnsUpdateFragment(cols, rd),
tableID.FullyQualifiedName(), targetAlias, sql.BuildColumnsUpdateFragment(cols, rd),
// FROM staging WHERE join on PK(s)
subQuery, strings.Join(clauses, " AND "),
subQuery, stagingAlias, strings.Join(clauses, " AND "),
)
}

func (rd RedshiftDialect) buildMergeDeleteQuery(tableID sql.TableIdentifier, subQuery string, primaryKeys []columns.Column) string {
return fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s AS cc WHERE cc.%s = true);`,
return fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s AS %s WHERE %s.%s = true);`,
// DELETE from table where (pk_1, pk_2)
tableID.FullyQualifiedName(), strings.Join(sql.QuoteColumns(primaryKeys, rd), ","),
// IN (cc.pk_1, cc.pk_2) FROM staging
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteColumns(primaryKeys, rd),
Separator: ",",
Prefix: "cc.",
}), subQuery, rd.QuoteIdentifier(constants.DeleteColumnMarker),
Prefix: stagingAlias + ".",
}), subQuery, stagingAlias, stagingAlias, rd.QuoteIdentifier(constants.DeleteColumnMarker),
)
}

Expand Down

0 comments on commit 976e90f

Please sign in to comment.