Skip to content

Commit

Permalink
Merge #58679
Browse files Browse the repository at this point in the history
58679: opt: use anti-join for INSERT ON CONFLICT DO NOTHING conflict detection r=rytaft a=mgartner

#### opt: use anti-join for INSERT ON CONFLICT DO NOTHING conflict detection

The addition of lookup anti-joins enables the usage of anti-joins,
instead of left joins and filters, for detecting conflicts in an
`INSERT ... ON CONFLICT ... DO NOTHING` statement. This simplifies the
query plans for these statements.

As a result of this change two normalization rules had to be updated:

  1. `EliminateGroupByProject` no longer matches `UpsertDistinctOn`
     expressions because they no longer are built with a `Project` as
     a child.
  2. The `AreValuesDistinct` helper function for
     `EliminateDistinctOnValues` now detects when the left child of an
     AntiJoin has distinct values. This is required for eliminating
     `UpsertDistinctOn` unnecessary operators now that their inputs are
     AntiJoins.

Release note (performance improvement): INSERT ... ON CONFLICT ... DO
NOTHING statements now use anti-joins for detecting conflicts. This
simplifies the query plan for these statements, which may result in more
efficient execution.

#### opt: replace &memo.JoinPrivate{} with memo.EmptyJoinPrivate

Release note: None

Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
  • Loading branch information
craig[bot] and mgartner committed Jan 12, 2021
2 parents d113ccb + 05a9be1 commit ee5c4e6
Show file tree
Hide file tree
Showing 12 changed files with 699 additions and 1,064 deletions.
30 changes: 12 additions & 18 deletions pkg/sql/opt/exec/execbuilder/testdata/explain
Expand Up @@ -1411,26 +1411,20 @@ vectorized: true
│ auto commit
│ arbiter indexes: primary, u_v_key
└── • filter
│ filter: k IS NULL
└── • lookup join (anti)
│ table: u@u_v_key
│ equality: (column2) = (v)
│ equality cols are key
└── • lookup join (left outer)
│ table: u@u_v_key
│ equality: (column2) = (v)
│ equality cols are key
└── • cross join (anti)
└── • filter
│ filter: k IS NULL
└── • cross join (left outer)
├── • values
│ size: 2 columns, 1 row
└── • scan
missing stats
table: u@primary
spans: [/1 - /1]
├── • values
│ size: 2 columns, 1 row
└── • scan
missing stats
table: u@primary
spans: [/1 - /1]

# Make sure EXPLAIN (VERBOSE) works when there is a constrained scan of a
# virtual table (#58193).
Expand Down
314 changes: 126 additions & 188 deletions pkg/sql/opt/memo/testdata/logprops/upsert

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions pkg/sql/opt/norm/groupby_funcs.go
Expand Up @@ -141,8 +141,8 @@ func (c *CustomFuncs) ConstructProjectionFromDistinctOn(

// AreValuesDistinct returns true if a constant Values operator input contains
// only rows that are already distinct with respect to the given grouping
// columns. The Values operator can be wrapped by Select, Project, and/or
// LeftJoin operators.
// columns. The Values operator can be wrapped by Select, Project, LeftJoin
// and/or AntiJoin operators.
//
// If nullsAreDistinct is true, then NULL values are treated as not equal to one
// another, and therefore rows containing a NULL value in any grouping column
Expand Down Expand Up @@ -190,6 +190,13 @@ func (c *CustomFuncs) AreValuesDistinct(

return c.AreValuesDistinct(t.Left, groupingCols, nullsAreDistinct)

case *memo.AntiJoinExpr:
// Pass through call to left input if grouping on its columns.
leftCols := t.Left.Relational().OutputCols
if groupingCols.SubsetOf(leftCols) {
return c.AreValuesDistinct(t.Left, groupingCols, nullsAreDistinct)
}

case *memo.UpsertDistinctOnExpr:
// Pass through call to input if grouping on passthrough columns.
if groupingCols.SubsetOf(t.Input.Relational().OutputCols) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/opt/norm/rules/groupby.opt
Expand Up @@ -11,13 +11,15 @@

# EliminateGroupByProject discards a nested Project operator that is only
# removing columns from its input (and not synthesizing new ones). That's
# something the GroupBy operators can do on their own.
# something the GroupBy operators can do on their own. This rule does not match
# UpsertDistinctOn expressions because they are not built with a Project as a
# child, so there is no Project to eliminate.
#
# Note: EliminateGroupByProject should be located above
# EliminateJoinUnderGroupByLeft so that it can remove any interfering Projects.
[EliminateGroupByProject, Normalize]
(GroupBy | ScalarGroupBy | DistinctOn | EnsureDistinctOn
| UpsertDistinctOn | EnsureUpsertDistinctOn
| EnsureUpsertDistinctOn
$input:(Project $innerInput:*) &
(ColsAreSubset
(OutputCols $input)
Expand Down
703 changes: 259 additions & 444 deletions pkg/sql/opt/norm/testdata/rules/groupby

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/sql/opt/optbuilder/fk_cascade.go
Expand Up @@ -547,7 +547,7 @@ func (b *Builder) buildDeleteCascadeMutationInput(
))
}
outScope.expr = b.factory.ConstructSemiJoin(
outScope.expr, mutationInput, on, &memo.JoinPrivate{},
outScope.expr, mutationInput, on, memo.EmptyJoinPrivate,
)
return outScope
}
Expand Down Expand Up @@ -855,7 +855,7 @@ func (b *Builder) buildUpdateCascadeMutationInput(
// inner join is equivalent.
// Note that this is very similar to the UPDATE ... FROM syntax.
outScope.expr = f.ConstructInnerJoin(
outScope.expr, mutationInput, on, &memo.JoinPrivate{},
outScope.expr, mutationInput, on, memo.EmptyJoinPrivate,
)
// Append the columns from the right-hand side to the scope.
for _, col := range outCols {
Expand Down
73 changes: 25 additions & 48 deletions pkg/sql/opt/optbuilder/insert.go
Expand Up @@ -160,8 +160,8 @@ func init() {
//
// If the ON CONFLICT clause contains a DO NOTHING clause, then each UNIQUE
// index on the target table requires its own DISTINCT ON to ensure that the
// input has no duplicates, and its own LEFT OUTER JOIN to check whether a
// conflict exists. For example:
// input has no duplicates, and an ANTI JOIN to check whether a conflict exists.
// For example:
//
// CREATE TABLE ab (a INT PRIMARY KEY, b INT)
// INSERT INTO ab (a, b) VALUES (1, 2), (1, 3) ON CONFLICT DO NOTHING
Expand All @@ -170,9 +170,9 @@ func init() {
//
// SELECT x, y
// FROM (SELECT DISTINCT ON (x) * FROM (VALUES (1, 2), (1, 3))) AS input(x, y)
// LEFT OUTER JOIN ab
// ON input.x = ab.a
// WHERE ab.a IS NULL
// WHERE NOT EXISTS(
// SELECT ab.a WHERE input.x = ab.a
// )
//
// Note that an ordered input to the INSERT does not provide any guarantee about
// the order in which mutations are applied, or the order of any returned rows
Expand Down Expand Up @@ -287,9 +287,9 @@ func (b *Builder) buildInsert(ins *tree.Insert, inScope *scope) (outScope *scope

// Case 2: INSERT..ON CONFLICT DO NOTHING.
case ins.OnConflict.DoNothing:
// Wrap the input in one LEFT OUTER JOIN per UNIQUE index, and filter out
// rows that have conflicts. See the buildInputForDoNothing comment for
// more details.
// Wrap the input in one ANTI JOIN per UNIQUE index, and filter out rows
// that have conflicts. See the buildInputForDoNothing comment for more
// details.
conflictOrds := mb.mapPublicColumnNamesToOrdinals(ins.OnConflict.Columns)
mb.buildInputForDoNothing(inScope, conflictOrds, ins.OnConflict.ArbiterPredicate)

Expand Down Expand Up @@ -658,28 +658,25 @@ func (mb *mutationBuilder) buildInsert(returning tree.ReturningExprs) {
mb.buildReturning(returning)
}

// buildInputForDoNothing wraps the input expression in LEFT OUTER JOIN
// expressions, one for each UNIQUE index on the target table. It then adds a
// filter that discards rows that have a conflict (by checking a not-null table
// column to see if it was null-extended by the left join). See the comment
// header for Builder.buildInsert for an example.
// buildInputForDoNothing wraps the input expression in ANTI JOIN expressions,
// one for each UNIQUE index on the target table. See the comment header for
// Builder.buildInsert for an example.
func (mb *mutationBuilder) buildInputForDoNothing(
inScope *scope, conflictOrds util.FastIntSet, arbiterPredicate tree.Expr,
) {
// Determine the set of arbiter indexes to use to check for conflicts.
arbiterIndexes := mb.arbiterIndexes(conflictOrds, arbiterPredicate)
mb.arbiters = arbiterIndexes.Ordered()

insertColSet := mb.outScope.expr.Relational().OutputCols
insertColScope := mb.outScope.replace()
insertColScope.appendColumnsFromScope(mb.outScope)

// Ignore any ordering requested by the input.
// TODO(andyk): do we need to do more here?
mb.outScope.ordering = nil

// Loop over each arbiter index, potentially creating a left join + filter
// for each one.
// Loop over each arbiter index, potentially creating an anti-join for each
// one.
for idx, idxCount := 0, mb.tab.IndexCount(); idx < idxCount; idx++ {
// Skip non-arbiter indexes.
if !arbiterIndexes.Contains(idx) {
Expand All @@ -693,7 +690,7 @@ func (mb *mutationBuilder) buildInputForDoNothing(
predExpr = mb.parsePartialIndexPredicateExpr(idx)
}

// Build the right side of the left outer join. Use a new metadata instance
// Build the right side of the anti-join. Use a new metadata instance
// of the mutation table so that a different set of column IDs are used for
// the two tables in the self-join.
fetchScope := mb.b.buildScan(
Expand All @@ -711,8 +708,8 @@ func (mb *mutationBuilder) buildInputForDoNothing(

// If the index is a unique partial index, then rows that are not in the
// partial index cannot conflict with insert rows. Therefore, a Select
// wraps the scan on the right side of the left outer join with the
// partial index predicate expression as the filter.
// wraps the scan on the right side of the anti-join with the partial
// index predicate expression as the filter.
if isPartial {
texpr := fetchScope.resolveAndRequireType(predExpr, types.Bool)
predScalar := mb.b.buildScalar(texpr, fetchScope, nil, nil, nil)
Expand All @@ -722,12 +719,6 @@ func (mb *mutationBuilder) buildInputForDoNothing(
)
}

// Remember the column ID of a scan column that is not null. This will be
// used to detect whether a conflict was detected for a row. Such a column
// must always exist, since the index always contains the primary key
// columns, either explicitly or implicitly.
notNullColID := fetchScope.cols[findNotNullIndexCol(index)].id

// Build the join condition by creating a conjunction of equality conditions
// that test each conflict column:
//
Expand Down Expand Up @@ -755,26 +746,12 @@ func (mb *mutationBuilder) buildInputForDoNothing(
on = append(on, mb.b.factory.ConstructFiltersItem(predScalar))
}

// Construct the left join + filter.
// TODO(andyk): Convert this to use anti-join once we have support for
// lookup anti-joins.
mb.outScope.expr = mb.b.factory.ConstructProject(
mb.b.factory.ConstructSelect(
mb.b.factory.ConstructLeftJoin(
mb.outScope.expr,
fetchScope.expr,
on,
memo.EmptyJoinPrivate,
),
memo.FiltersExpr{mb.b.factory.ConstructFiltersItem(
mb.b.factory.ConstructIs(
mb.b.factory.ConstructVariable(notNullColID),
memo.NullSingleton,
),
)},
),
memo.EmptyProjectionsExpr,
insertColSet,
// Construct the anti-join.
mb.outScope.expr = mb.b.factory.ConstructAntiJoin(
mb.outScope.expr,
fetchScope.expr,
on,
memo.EmptyJoinPrivate,
)

// If the index is a partial index, project a new column that allows the
Expand Down Expand Up @@ -1180,9 +1157,9 @@ func (mb *mutationBuilder) projectUpsertColumns() {
// arbiter indexes are found.
//
// Arbiter indexes ensure that the columns designated by conflictOrds reference
// at most one target row of a UNIQUE index. Using LEFT OUTER JOINs to detect
// conflicts relies upon this being true (otherwise result cardinality could
// increase). This is also a Postgres requirement.
// at most one target row of a UNIQUE index. Using ANTI JOINs and LEFT OUTER
// JOINs to detect conflicts relies upon this being true (otherwise result
// cardinality could increase). This is also a Postgres requirement.
//
// An arbiter index:
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/optbuilder/mutation_builder_unique.go
Expand Up @@ -235,7 +235,7 @@ func (h *uniqueCheckHelper) buildInsertionCheck() memo.UniqueChecksItem {
}
semiJoinFilters = append(semiJoinFilters, f.ConstructFiltersItem(pkFilter))

semiJoin := f.ConstructSemiJoin(checkInput, scanScope.expr, semiJoinFilters, &memo.JoinPrivate{})
semiJoin := f.ConstructSemiJoin(checkInput, scanScope.expr, semiJoinFilters, memo.EmptyJoinPrivate)

return f.ConstructUniqueChecksItem(semiJoin, &memo.UniqueChecksItemPrivate{
Table: h.mb.tabID,
Expand Down
24 changes: 9 additions & 15 deletions pkg/sql/opt/optbuilder/testdata/fk-checks-insert
Expand Up @@ -45,22 +45,16 @@ insert child
├── upsert-distinct-on
│ ├── columns: column1:4!null column2:5!null
│ ├── grouping columns: column1:4!null
│ ├── project
│ ├── anti-join (hash)
│ │ ├── columns: column1:4!null column2:5!null
│ │ └── select
│ │ ├── columns: column1:4!null column2:5!null c:6 child.p:7
│ │ ├── left-join (hash)
│ │ │ ├── columns: column1:4!null column2:5!null c:6 child.p:7
│ │ │ ├── values
│ │ │ │ ├── columns: column1:4!null column2:5!null
│ │ │ │ ├── (100, 1)
│ │ │ │ └── (200, 1)
│ │ │ ├── scan child
│ │ │ │ └── columns: c:6!null child.p:7!null
│ │ │ └── filters
│ │ │ └── column1:4 = c:6
│ │ └── filters
│ │ └── c:6 IS NULL
│ │ ├── values
│ │ │ ├── columns: column1:4!null column2:5!null
│ │ │ ├── (100, 1)
│ │ │ └── (200, 1)
│ │ ├── scan child
│ │ │ └── columns: c:6!null child.p:7!null
│ │ └── filters
│ │ └── column1:4 = c:6
│ └── aggregations
│ └── first-agg [as=column2:5]
│ └── column2:5
Expand Down
22 changes: 8 additions & 14 deletions pkg/sql/opt/optbuilder/testdata/inverted-indexes
Expand Up @@ -95,21 +95,15 @@ insert kj
└── upsert-distinct-on
├── columns: column1:5!null column2:6!null
├── grouping columns: column1:5!null
├── project
├── anti-join (hash)
│ ├── columns: column1:5!null column2:6!null
│ └── select
│ ├── columns: column1:5!null column2:6!null k:7 j:8
│ ├── left-join (hash)
│ │ ├── columns: column1:5!null column2:6!null k:7 j:8
│ │ ├── values
│ │ │ ├── columns: column1:5!null column2:6!null
│ │ │ └── (1, '{"a": 2}')
│ │ ├── scan kj
│ │ │ └── columns: k:7!null j:8
│ │ └── filters
│ │ └── column1:5 = k:7
│ └── filters
│ └── k:7 IS NULL
│ ├── values
│ │ ├── columns: column1:5!null column2:6!null
│ │ └── (1, '{"a": 2}')
│ ├── scan kj
│ │ └── columns: k:7!null j:8
│ └── filters
│ └── column1:5 = k:7
└── aggregations
└── first-agg [as=column2:6]
└── column2:6
Expand Down

0 comments on commit ee5c4e6

Please sign in to comment.