Skip to content

Commit

Permalink
Merge pull request #119600 from rytaft/backport23.1-119104-119597
Browse files Browse the repository at this point in the history
release-23.1: sql, opt: support index hints for insert and upsert (#119600)
  • Loading branch information
rytaft committed Feb 27, 2024
2 parents 6546367 + 39311f7 commit e38b7be
Show file tree
Hide file tree
Showing 15 changed files with 1,066 additions and 36 deletions.
4 changes: 2 additions & 2 deletions docs/generated/sql/bnf/insert_stmt.bnf
@@ -1,3 +1,3 @@
insert_stmt ::=
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'INSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) ( 'RETURNING' ( ( target_elem ) ( ( ',' target_elem ) )* ) | 'RETURNING' 'NOTHING' | )
| ( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'INSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) on_conflict ( 'RETURNING' ( ( target_elem ) ( ( ',' target_elem ) )* ) | 'RETURNING' 'NOTHING' | )
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'INSERT' 'INTO' ( table_name_opt_idx | table_name_opt_idx 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) ( 'RETURNING' ( ( target_elem ) ( ( ',' target_elem ) )* ) | 'RETURNING' 'NOTHING' | )
| ( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'INSERT' 'INTO' ( table_name_opt_idx | table_name_opt_idx 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) on_conflict ( 'RETURNING' ( ( target_elem ) ( ( ',' target_elem ) )* ) | 'RETURNING' 'NOTHING' | )
4 changes: 2 additions & 2 deletions docs/generated/sql/bnf/stmt_block.bnf
Expand Up @@ -670,8 +670,8 @@ string_or_placeholder_list ::=
( string_or_placeholder ) ( ( ',' string_or_placeholder ) )*

insert_target ::=
table_name
| table_name 'AS' table_alias_name
table_name_opt_idx
| table_name_opt_idx 'AS' table_alias_name

insert_rest ::=
select_stmt
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/upsert_stmt.bnf
@@ -1,2 +1,2 @@
upsert_stmt ::=
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'UPSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) ( 'RETURNING' target_list | 'RETURNING' 'NOTHING' | )
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'UPSERT' 'INTO' ( table_name_opt_idx | table_name_opt_idx 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) ( 'RETURNING' target_list | 'RETURNING' 'NOTHING' | )
14 changes: 11 additions & 3 deletions pkg/sql/importer/read_import_pgdump.go
Expand Up @@ -1084,9 +1084,17 @@ func (m *pgDumpReader) readFile(
}
switch i := stmt.(type) {
case *tree.Insert:
n, ok := i.Table.(*tree.TableName)
if !ok {
return errors.Errorf("unexpected: %T", i.Table)
n, isTableName := i.Table.(*tree.TableName)
if !isTableName {
// We might have the table name wrapped in an AliasedTableExpr.
a, isAliasedTableExpr := i.Table.(*tree.AliasedTableExpr)
if !isAliasedTableExpr {
return errors.Errorf("unexpected: %T", i.Table)
}
n, isTableName = a.Expr.(*tree.TableName)
if !isTableName {
return errors.Errorf("unexpected: %T", a.Expr)
}
}
name, err := getSchemaAndTableName(n)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/insert
Expand Up @@ -480,6 +480,18 @@ vectorized: true
table: kv@kv_pkey
spans: FULL SCAN

# Index hints should be a no-op for normal inserts without ON CONFLICT.
query T
EXPLAIN (PLAN) INSERT INTO kv@a VALUES (1,1), (2,2)
----
distribution: local
vectorized: true
·
• insert fast path
into: kv(k, v)
auto commit
size: 2 columns, 2 rows

# ------------------------------------------------------------------------------
# Insert rows into table during schema changes.
# ------------------------------------------------------------------------------
Expand Down
228 changes: 228 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/upsert
Expand Up @@ -319,6 +319,234 @@ vectorized: true
spans: /1/0
locking strength: for update

# Use index hints.
query T
EXPLAIN (VERBOSE) UPSERT INTO indexed@secondary VALUES (1)
----
distribution: local
vectorized: true
·
• upsert
│ columns: ()
│ estimated row count: 0 (missing stats)
│ into: indexed(a, b, c, d)
│ auto commit
│ arbiter indexes: indexed_pkey
└── • project
│ columns: (column1, b_default, c_default, d_comp, a, b, c, d, b_default, c_default, d_comp, a, check1)
└── • render
│ columns: (check1, column1, b_default, c_default, d_comp, a, b, c, d)
│ render check1: c_default > 0
│ render column1: column1
│ render b_default: b_default
│ render c_default: c_default
│ render d_comp: d_comp
│ render a: a
│ render b: b
│ render c: c
│ render d: d
└── • cross join (left outer)
│ columns: (column1, b_default, c_default, d_comp, a, b, c, d)
│ estimated row count: 1 (missing stats)
├── • values
│ columns: (column1, b_default, c_default, d_comp)
│ size: 4 columns, 1 row
│ row 0, expr 0: 1
│ row 0, expr 1: CAST(NULL AS INT8)
│ row 0, expr 2: 10
│ row 0, expr 3: 11
└── • filter
│ columns: (a, b, c, d)
│ estimated row count: 1 (missing stats)
│ filter: a = 1
└── • index join
│ columns: (a, b, c, d)
│ estimated row count: 1,000 (missing stats)
│ table: indexed@indexed_pkey
│ key columns: a
└── • scan
columns: (a, b, d)
estimated row count: 1,000 (missing stats)
table: indexed@secondary
spans: FULL SCAN

query T
EXPLAIN (VERBOSE) UPSERT INTO indexed@{NO_FULL_SCAN} VALUES (1)
----
distribution: local
vectorized: true
·
• upsert
│ columns: ()
│ estimated row count: 0 (missing stats)
│ into: indexed(a, b, c, d)
│ auto commit
│ arbiter indexes: indexed_pkey
└── • project
│ columns: (column1, b_default, c_default, d_comp, a, b, c, d, b_default, c_default, d_comp, a, check1)
└── • render
│ columns: (check1, column1, b_default, c_default, d_comp, a, b, c, d)
│ render check1: c_default > 0
│ render column1: column1
│ render b_default: b_default
│ render c_default: c_default
│ render d_comp: d_comp
│ render a: a
│ render b: b
│ render c: c
│ render d: d
└── • cross join (left outer)
│ columns: (column1, b_default, c_default, d_comp, a, b, c, d)
│ estimated row count: 1 (missing stats)
├── • values
│ columns: (column1, b_default, c_default, d_comp)
│ size: 4 columns, 1 row
│ row 0, expr 0: 1
│ row 0, expr 1: CAST(NULL AS INT8)
│ row 0, expr 2: 10
│ row 0, expr 3: 11
└── • scan
columns: (a, b, c, d)
estimated row count: 1 (missing stats)
table: indexed@indexed_pkey
spans: /1/0
locking strength: for update

query error could not produce a query plan conforming to the NO_FULL_SCAN hint
EXPLAIN (VERBOSE) UPSERT INTO indexed@{FORCE_INDEX=secondary,NO_FULL_SCAN} VALUES (1)

query T
EXPLAIN (VERBOSE)
INSERT INTO indexed@indexed_pkey AS indexed_pk
VALUES (1, 2, 3)
ON CONFLICT (a)
DO UPDATE SET b = 2, c = 3
----
distribution: local
vectorized: true
·
• upsert
│ columns: ()
│ estimated row count: 0 (missing stats)
│ into: indexed(a, b, c, d)
│ auto commit
│ arbiter indexes: indexed_pkey
└── • project
│ columns: (column1, column2, column3, d_comp, a, b, c, d, upsert_b, upsert_c, upsert_d, a, check1)
└── • render
│ columns: (check1, column1, column2, column3, d_comp, a, b, c, d, upsert_b, upsert_c, upsert_d)
│ render check1: upsert_c > 0
│ render column1: column1
│ render column2: column2
│ render column3: column3
│ render d_comp: d_comp
│ render a: a
│ render b: b
│ render c: c
│ render d: d
│ render upsert_b: upsert_b
│ render upsert_c: upsert_c
│ render upsert_d: upsert_d
└── • render
│ columns: (upsert_b, upsert_c, upsert_d, column1, column2, column3, d_comp, a, b, c, d)
│ render upsert_b: CASE WHEN a IS NULL THEN column2 ELSE 2 END
│ render upsert_c: CASE WHEN a IS NULL THEN column3 ELSE 3 END
│ render upsert_d: CASE WHEN a IS NULL THEN d_comp ELSE a + 3 END
│ render column1: column1
│ render column2: column2
│ render column3: column3
│ render d_comp: d_comp
│ render a: a
│ render b: b
│ render c: c
│ render d: d
└── • cross join (left outer)
│ columns: (column1, column2, column3, d_comp, a, b, c, d)
│ estimated row count: 1 (missing stats)
├── • values
│ columns: (column1, column2, column3, d_comp)
│ size: 4 columns, 1 row
│ row 0, expr 0: 1
│ row 0, expr 1: 2
│ row 0, expr 2: 3
│ row 0, expr 3: 4
└── • scan
columns: (a, b, c, d)
estimated row count: 1 (missing stats)
table: indexed@indexed_pkey
spans: /1/0
locking strength: for update

let $t_id
SELECT id FROM system.namespace WHERE name='indexed'

query T
EXPLAIN (VERBOSE)
INSERT INTO [$t_id AS t]@{NO_FULL_SCAN}
VALUES (1, 2, 3)
ON CONFLICT DO NOTHING
----
distribution: local
vectorized: true
·
• insert
│ columns: ()
│ estimated row count: 0 (missing stats)
│ into: indexed(a, b, c, d)
│ auto commit
│ arbiter indexes: indexed_pkey, secondary
└── • render
│ columns: (column1, column2, column3, d_comp, check1)
│ render check1: column3 > 0
│ render column1: column1
│ render column2: column2
│ render column3: column3
│ render d_comp: d_comp
└── • lookup join (anti)
│ columns: (column1, column2, column3, d_comp)
│ estimated row count: 0 (missing stats)
│ table: indexed@secondary
│ equality: (d_comp, column2) = (d,b)
│ equality cols are key
└── • cross join (anti)
│ columns: (column1, column2, column3, d_comp)
│ estimated row count: 0 (missing stats)
├── • values
│ columns: (column1, column2, column3, d_comp)
│ size: 4 columns, 1 row
│ row 0, expr 0: 1
│ row 0, expr 1: 2
│ row 0, expr 2: 3
│ row 0, expr 3: 4
└── • scan
columns: (a)
estimated row count: 1 (missing stats)
table: indexed@indexed_pkey
spans: /1/0

# Drop index and verify that existing values no longer need to be fetched.
statement ok
DROP INDEX indexed@secondary CASCADE
Expand Down
16 changes: 9 additions & 7 deletions pkg/sql/opt/optbuilder/insert.go
Expand Up @@ -303,7 +303,7 @@ func (b *Builder) buildInsert(ins *tree.Insert, inScope *scope) (outScope *scope
// Wrap the input in one ANTI JOIN per UNIQUE index, and filter out rows
// that have conflicts. See the buildInputForDoNothing comment for more
// details.
mb.buildInputForDoNothing(inScope, ins.OnConflict)
mb.buildInputForDoNothing(inScope, ins.Table, ins.OnConflict)

// Since buildInputForDoNothing filters out rows with conflicts, always
// insert rows that are not filtered.
Expand All @@ -320,7 +320,7 @@ func (b *Builder) buildInsert(ins *tree.Insert, inScope *scope) (outScope *scope
if mb.needExistingRows() {
// Left-join each input row to the target table, using conflict columns
// derived from the primary index as the join condition.
mb.buildInputForUpsert(inScope, nil /* onConflict */, nil /* whereClause */)
mb.buildInputForUpsert(inScope, ins.Table, nil /* onConflict */, nil /* whereClause */)

// Add additional columns for computed expressions that may depend on any
// updated columns, as well as mutation columns with default values.
Expand All @@ -334,7 +334,7 @@ func (b *Builder) buildInsert(ins *tree.Insert, inScope *scope) (outScope *scope
default:
// Left-join each input row to the target table, using the conflict columns
// as the join condition.
mb.buildInputForUpsert(inScope, ins.OnConflict, ins.OnConflict.Where)
mb.buildInputForUpsert(inScope, ins.Table, ins.OnConflict, ins.OnConflict.Where)

// Derive the columns that will be updated from the SET expressions.
mb.addTargetColsForUpdate(ins.OnConflict.Exprs)
Expand Down Expand Up @@ -701,7 +701,9 @@ func (mb *mutationBuilder) buildInsert(returning *tree.ReturningExprs) {
// buildInputForDoNothing wraps the input expression in ANTI JOIN expressions,
// one for each arbiter on the target table. See the comment header for
// Builder.buildInsert for an example.
func (mb *mutationBuilder) buildInputForDoNothing(inScope *scope, onConflict *tree.OnConflict) {
func (mb *mutationBuilder) buildInputForDoNothing(
inScope *scope, texpr tree.TableExpr, onConflict *tree.OnConflict,
) {
// Determine the set of arbiter indexes and constraints to use to check for
// conflicts.
mb.arbiters = mb.findArbiters(onConflict)
Expand All @@ -714,7 +716,7 @@ func (mb *mutationBuilder) buildInputForDoNothing(inScope *scope, onConflict *tr

// Create an anti-join for each arbiter.
mb.arbiters.ForEach(func(name string, conflictOrds intsets.Fast, pred tree.Expr, canaryOrd int) {
mb.buildAntiJoinForDoNothingArbiter(inScope, conflictOrds, pred)
mb.buildAntiJoinForDoNothingArbiter(inScope, texpr, conflictOrds, pred)
})

// Create an UpsertDistinctOn for each arbiter. This must happen after all
Expand Down Expand Up @@ -746,7 +748,7 @@ func (mb *mutationBuilder) buildInputForDoNothing(inScope *scope, onConflict *tr
// given insert row conflicts with an existing row in the table. If it is null,
// then there is no conflict.
func (mb *mutationBuilder) buildInputForUpsert(
inScope *scope, onConflict *tree.OnConflict, whereClause *tree.Where,
inScope *scope, texpr tree.TableExpr, onConflict *tree.OnConflict, whereClause *tree.Where,
) {
// Determine the set of arbiter indexes and constraints to use to check for
// conflicts.
Expand Down Expand Up @@ -796,7 +798,7 @@ func (mb *mutationBuilder) buildInputForUpsert(

// Create a left-join for the arbiter.
mb.buildLeftJoinForUpsertArbiter(
inScope, conflictOrds, pred,
inScope, texpr, conflictOrds, pred,
)

// Record a not-null "canary" column. After the left-join, this will be
Expand Down

0 comments on commit e38b7be

Please sign in to comment.