Skip to content

Commit

Permalink
opt: support recursive CTEs
Browse files Browse the repository at this point in the history
A recursive CTE is of the form:
```
WITH RECURSIVE cte AS (
    <initial query>
  UNION ALL
    <recursive query>
)
```

Recursive CTE evaluation (paraphrased from postgres docs):
 1. Evaluate the initial query; emit the results and also save them in
    a "working" table.
 2. So long as the working table is not empty:
    - evaluate the recursive query, substituting the current contents of
      the working table for the recursive self-reference;
    - emit all resulting rows, and save them as the next iteration's
      working table.

This change adds optimizer and execution support for recursive CTEs.

Some notes for the various components:

 - optbuilder:  We build the recursive query using a `cteSource` that
   corresponds to the "working table". This code turned out to be
   tricky, in part because we want to allow non-recursive CTEs even if
   RECURSIVE is used (which is necessary when defining multiple CTEs,
   some of which are not recursive).

 - execution: the execution operator is somewhat similar to
   `applyJoinNode` (but simpler). The execbuilder provides a
   callback that can be used to regenerate the right-hand-side plan
   each time; this callback takes a reference to the working table (as
   a `bufferNode`).

 - execbuilder: to implement the callback mentioned above, we create
   an "inner" builder inside the callback which uses the same factory
   but is otherwise independent from the "outer" builder.

Fixes #21085.

Release note (sql change): WITH RECURSIVE is now supported (only the
UNION ALL variant).
  • Loading branch information
RaduBerinde committed Oct 10, 2019
1 parent eecd7db commit 7bc04d7
Show file tree
Hide file tree
Showing 41 changed files with 1,300 additions and 133 deletions.
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/delete_stmt.bnf
@@ -1,2 +1,2 @@
delete_stmt ::=
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'DELETE' 'FROM' ( ( table_name opt_index_flags ) | ( table_name opt_index_flags ) table_alias_name | ( table_name opt_index_flags ) 'AS' table_alias_name ) ( ( 'WHERE' a_expr ) | ) ( sort_clause | ) ( limit_clause | ) ( 'RETURNING' target_list | 'RETURNING' 'NOTHING' | )
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'DELETE' 'FROM' ( ( table_name opt_index_flags ) | ( table_name opt_index_flags ) table_alias_name | ( table_name opt_index_flags ) 'AS' table_alias_name ) ( ( 'WHERE' a_expr ) | ) ( sort_clause | ) ( limit_clause | ) ( 'RETURNING' target_list | 'RETURNING' 'NOTHING' | )
4 changes: 2 additions & 2 deletions docs/generated/sql/bnf/insert_stmt.bnf
@@ -1,3 +1,3 @@
insert_stmt ::=
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'INSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) ( 'RETURNING' ( ( target_elem ) ( ( ',' target_elem ) )* ) | 'RETURNING' 'NOTHING' | )
| ( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'INSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) on_conflict ( 'RETURNING' ( ( target_elem ) ( ( ',' target_elem ) )* ) | 'RETURNING' 'NOTHING' | )
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'INSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) ( 'RETURNING' ( ( target_elem ) ( ( ',' target_elem ) )* ) | 'RETURNING' 'NOTHING' | )
| ( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'INSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) on_conflict ( 'RETURNING' ( ( target_elem ) ( ( ',' target_elem ) )* ) | 'RETURNING' 'NOTHING' | )
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/select_stmt.bnf
@@ -1,3 +1,3 @@
select_stmt ::=
( simple_select locking_clause | select_clause sort_clause locking_clause | select_clause ( sort_clause | ) ( limit_clause offset_clause | offset_clause limit_clause | limit_clause | offset_clause ) locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause sort_clause locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause ( sort_clause | ) ( limit_clause offset_clause | offset_clause limit_clause | limit_clause | offset_clause ) locking_clause )
( simple_select locking_clause | select_clause sort_clause locking_clause | select_clause ( sort_clause | ) ( limit_clause offset_clause | offset_clause limit_clause | limit_clause | offset_clause ) locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause sort_clause locking_clause | ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) select_clause ( sort_clause | ) ( limit_clause offset_clause | offset_clause limit_clause | limit_clause | offset_clause ) locking_clause )

1 change: 1 addition & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Expand Up @@ -1036,6 +1036,7 @@ opt_create_stats_options ::=

with_clause ::=
'WITH' cte_list
| 'WITH' 'RECURSIVE' cte_list

table_name_expr_with_index ::=
table_name opt_index_flags
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/update_stmt.bnf
@@ -1,2 +1,2 @@
update_stmt ::=
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'UPDATE' ( ( table_name opt_index_flags ) | ( table_name opt_index_flags ) table_alias_name | ( table_name opt_index_flags ) 'AS' table_alias_name ) 'SET' ( ( ( ( column_name '=' a_expr ) | ( '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' '=' ( '(' select_stmt ')' | ( '(' ')' | '(' ( a_expr | a_expr ',' | a_expr ',' ( ( a_expr ) ( ( ',' a_expr ) )* ) ) ')' ) ) ) ) ) ( ( ',' ( ( column_name '=' a_expr ) | ( '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' '=' ( '(' select_stmt ')' | ( '(' ')' | '(' ( a_expr | a_expr ',' | a_expr ',' ( ( a_expr ) ( ( ',' a_expr ) )* ) ) ')' ) ) ) ) ) )* ) opt_from_list ( ( 'WHERE' a_expr ) | ) ( sort_clause | ) ( limit_clause | ) ( 'RETURNING' target_list | 'RETURNING' 'NOTHING' | )
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'UPDATE' ( ( table_name opt_index_flags ) | ( table_name opt_index_flags ) table_alias_name | ( table_name opt_index_flags ) 'AS' table_alias_name ) 'SET' ( ( ( ( column_name '=' a_expr ) | ( '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' '=' ( '(' select_stmt ')' | ( '(' ')' | '(' ( a_expr | a_expr ',' | a_expr ',' ( ( a_expr ) ( ( ',' a_expr ) )* ) ) ')' ) ) ) ) ) ( ( ',' ( ( column_name '=' a_expr ) | ( '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' '=' ( '(' select_stmt ')' | ( '(' ')' | '(' ( a_expr | a_expr ',' | a_expr ',' ( ( a_expr ) ( ( ',' a_expr ) )* ) ) ')' ) ) ) ) ) )* ) opt_from_list ( ( 'WHERE' a_expr ) | ) ( sort_clause | ) ( limit_clause | ) ( 'RETURNING' target_list | 'RETURNING' 'NOTHING' | )
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/upsert_stmt.bnf
@@ -1,2 +1,2 @@
upsert_stmt ::=
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'UPSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) ( 'RETURNING' target_list | 'RETURNING' 'NOTHING' | )
( ( 'WITH' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) | 'WITH' 'RECURSIVE' ( ( common_table_expr ) ( ( ',' common_table_expr ) )* ) ) | ) 'UPSERT' 'INTO' ( table_name | table_name 'AS' table_alias_name ) ( select_stmt | '(' ( ( ( column_name ) ) ( ( ',' ( column_name ) ) )* ) ')' select_stmt | 'DEFAULT' 'VALUES' ) ( 'RETURNING' target_list | 'RETURNING' 'NOTHING' | )
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/with_clause.bnf
@@ -1,2 +1,3 @@
with_clause ::=
'WITH' ( ( ( table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) 'AS' '(' preparable_stmt ')' ) ) ( ( ',' ( table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) 'AS' '(' preparable_stmt ')' ) ) )* ) ( insert_stmt | update_stmt | delete_stmt | upsert_stmt | select_stmt )
| 'WITH' 'RECURSIVE' ( ( ( table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) 'AS' '(' preparable_stmt ')' ) ) ( ( ',' ( table_alias_name ( '(' ( ( name ) ( ( ',' name ) )* ) ')' | ) 'AS' '(' preparable_stmt ')' ) ) )* ) ( insert_stmt | update_stmt | delete_stmt | upsert_stmt | select_stmt )
11 changes: 9 additions & 2 deletions pkg/sql/apply_join.go
Expand Up @@ -294,7 +294,15 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) {
func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planTop) error {
a.run.curRightRow = 0
a.run.rightRows.Clear(params.ctx)
rowResultWriter := NewRowResultWriter(a.run.rightRows)
return runPlanInsidePlan(params, plan, a.run.rightRows)
}

// runPlanInsidePlan is used to run a plan and gather the results in a row
// container, as part of the execution of an "outer" plan.
func runPlanInsidePlan(
params runParams, plan *planTop, rowContainer *rowcontainer.RowContainer,
) error {
rowResultWriter := NewRowResultWriter(rowContainer)
recv := MakeDistSQLReceiver(
params.ctx, rowResultWriter, tree.Rows,
params.extendedEvalCtx.ExecCfg.RangeDescriptorCache,
Expand Down Expand Up @@ -339,7 +347,6 @@ func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planTop) error
return recv.commErr
}
return rowResultWriter.err

}

func (a *applyJoinNode) Values() tree.Datums {
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/buffer.go
Expand Up @@ -67,11 +67,7 @@ func (n *bufferNode) Values() tree.Datums {

func (n *bufferNode) Close(ctx context.Context) {
n.plan.Close(ctx)
// It's valid to be Closed without startExec having been called, in which
// case n.bufferedRows will be nil.
if n.bufferedRows != nil {
n.bufferedRows.Close(ctx)
}
n.bufferedRows.Close(ctx)
}

// scanBufferNode behaves like an iterator into the bufferNode it is
Expand Down

0 comments on commit 7bc04d7

Please sign in to comment.