Skip to content

Commit

Permalink
Analyzer support recursive CTEs
Browse files Browse the repository at this point in the history
  • Loading branch information
kitaisreal committed Mar 28, 2024
1 parent b5b347e commit 6afea0f
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 28 deletions.
135 changes: 131 additions & 4 deletions src/Analyzer/Passes/QueryAnalysisPass.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <Analyzer/Passes/QueryAnalysisPass.h>

#include <boost/algorithm/string.hpp>

#include <Common/checkStackSize.h>
#include <Common/NamePrompter.h>
#include <Common/ProfileEvents.h>
Expand Down Expand Up @@ -53,6 +55,13 @@
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>

#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sources/BlocksListSource.h>

#include <QueryPipeline/QueryPipelineBuilder.h">

#include <Planner/Utils.h>

#include <Analyzer/createUniqueTableAliases.h>
#include <Analyzer/Utils.h>
Expand Down Expand Up @@ -82,7 +91,6 @@
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/Identifier.h>

#include <boost/algorithm/string.hpp>

namespace ProfileEvents
{
Expand Down Expand Up @@ -1234,6 +1242,8 @@ class QueryAnalyzer

void evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, IdentifierResolveScope & scope);

[[maybe_unused]] BlocksList evaluateSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, IdentifierResolveScope & scope);

static void mergeWindowWithParentWindow(const QueryTreeNodePtr & window_node, const QueryTreeNodePtr & parent_window_node, IdentifierResolveScope & scope);

void replaceNodesWithPositionalArguments(QueryTreeNodePtr & node_list, const QueryTreeNodes & projection_nodes, IdentifierResolveScope & scope);
Expand Down Expand Up @@ -2197,6 +2207,48 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
node = std::move(get_scalar_function_node);
}

BlocksList QueryAnalyzer::evaluateSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, IdentifierResolveScope & scope)
{
auto * query_node = query_tree_node->as<QueryNode>();
auto * union_node = query_tree_node->as<UnionNode>();
if (!query_node && !union_node)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Node must have query or union type. Actual {} {}",
query_tree_node->getNodeTypeName(),
query_tree_node->formatASTForErrorMessage());

auto & context = scope.context;
auto subquery_context = Context::createCopy(context);

Settings subquery_settings = context->getSettings();
subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings);
/// When execute `INSERT INTO t WITH ... SELECT ...`, it may lead to `Unknown columns`
/// exception with this settings enabled(https://github.com/ClickHouse/ClickHouse/issues/52494).
subquery_context->setSetting("use_structure_from_insertion_table_in_table_functions", false);

auto options = SelectQueryOptions(QueryProcessingStage::Complete, scope.subquery_depth, true /*is_subquery*/);
auto interpreter = std::make_unique<InterpreterSelectQueryAnalyzer>(query_tree_node->toAST(), subquery_context, subquery_context->getViewSource(), options);

auto io = interpreter->execute();
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(context->getProgressCallback());
io.pipeline.setProcessListElement(context->getProcessListElement());

BlocksList result;
Block block;

while (executor.pull(block))
{
if (block.rows() == 0)
continue;

result.push_back(std::move(block));
}

return result;
}

void QueryAnalyzer::mergeWindowWithParentWindow(const QueryTreeNodePtr & window_node, const QueryTreeNodePtr & parent_window_node, IdentifierResolveScope & scope)
{
auto & window_node_typed = window_node->as<WindowNode &>();
Expand Down Expand Up @@ -3855,8 +3907,9 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromJoinTreeNode(const Ident
default:
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Scope FROM section expected table, table function, query, union, join or array join. Actual {}. In scope {}",
"Scope FROM section expected table, table function, query, union, join or array join. Actual {} type {}. In scope {}",
join_tree_node->formatASTForErrorMessage(),
toString(join_tree_node->getNodeType()),
scope.scope_node->formatASTForErrorMessage());
}
}
Expand Down Expand Up @@ -7887,7 +7940,7 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (query_node_typed.hasOffset())
visitor.visit(query_node_typed.getOffset());

/// Register CTE subqueries and remove them from WITH section
/// Register CTE subqueries, evaluate recursive CTE subqueries and remove them from WITH section

auto & with_nodes = query_node_typed.getWith().getNodes();

Expand All @@ -7903,12 +7956,86 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier

const auto & cte_name = subquery_node ? subquery_node->getCTEName() : union_node->getCTEName();

auto [_, inserted] = scope.cte_name_to_query_node.emplace(cte_name, node);
auto [it, inserted] = scope.cte_name_to_query_node.emplace(cte_name, node);
if (!inserted)
throw Exception(ErrorCodes::MULTIPLE_EXPRESSIONS_FOR_ALIAS,
"CTE with name {} already exists. In scope {}",
cte_name,
scope.scope_node->formatASTForErrorMessage());

if (union_node && union_node->isRecursiveCTE())
{
chassert(union_node->isCTE());
chassert(union_node->isCTE());

auto & non_recursive_query = union_node->getQueries().getNodes()[0];
auto & recursive_query = union_node->getQueries().getNodes()[1];

IdentifierResolveScope non_recursive_subquery_scope(non_recursive_query, &scope /*parent_scope*/);
non_recursive_subquery_scope.subquery_depth = scope.subquery_depth + 1;

std::cerr << "Try to resolve non recursive query\n" << non_recursive_query->dumpTree() << '\n';
resolveQuery(non_recursive_query, non_recursive_subquery_scope);
std::cerr << "Non recursive query\n" << non_recursive_query->dumpTree() << '\n';

auto & projection_columns = non_recursive_query->as<QueryNode &>().getProjectionColumns();

auto & context = non_recursive_subquery_scope.context;
auto external_storage_holder = TemporaryTableHolder(
context,
ColumnsDescription{NamesAndTypesList{projection_columns.begin(), projection_columns.end()}},
ConstraintsDescription{},
nullptr /*query*/,
true /*create_for_global_subquery*/);

StoragePtr external_storage = external_storage_holder.getTable();

auto subquery_context = Context::createCopy(context);
subquery_context->addExternalTable(union_node->getCTEName(), std::move(external_storage_holder));

IdentifierResolveScope recursive_subquery_scope(recursive_query, &scope /*parent_scope*/);
recursive_subquery_scope.context = subquery_context;
recursive_subquery_scope.subquery_depth = scope.subquery_depth + 1;

auto temporary_table_expression_node = std::make_shared<TableNode>(external_storage, subquery_context);
temporary_table_expression_node->setTemporaryTableName(union_node->getCTEName());

recursive_subquery_scope.expression_argument_name_to_node[union_node->getCTEName()] = temporary_table_expression_node;

std::cerr << "Try to resolve recursive query\n" << recursive_query->dumpTree() << '\n';
resolveQuery(recursive_query, recursive_subquery_scope);
std::cerr << "Recursive query\n" << recursive_query->dumpTree() << '\n';

size_t recursive_step = 0;

while (true)
{
if (recursive_step > 100)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Too deep recusion durings");
++recursive_step;

auto & query_to_execute = recursive_step > 0 ? recursive_query : non_recursive_query;
auto & query_scope = recursive_step > 0 ? recursive_subquery_scope : non_recursive_subquery_scope;

auto result_blocks = evaluateSubqueryIfNeeded(query_to_execute, query_scope);
if (result_blocks.empty())
break;

Pipe source(std::make_shared<BlocksListSource>(std::move(result_blocks)));

QueryPipelineBuilder builder;
builder.init(std::move(source));

auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
pipeline.complete(external_storage->write({}, external_storage->getInMemoryMetadataPtr(), context, /*async_insert=*/false));

CompletedPipelineExecutor executor(pipeline);
executor.execute();
}

it->second = buildSubqueryToReadColumnsFromTableExpression(projection_columns, temporary_table_expression_node, subquery_context);
std::cerr << "Result subquery " << it->second->dumpTree() << '\n';
}
}

/** WITH section can be safely removed, because WITH section only can provide aliases to query expressions
Expand Down
13 changes: 13 additions & 0 deletions src/Analyzer/QueryNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ class QueryNode final : public IQueryTreeNode
cte_name = std::move(cte_name_value);
}

/// Returns true if query node is RECURSIVE CTE, false otherwise
bool isRecursiveCTE() const
{
return is_cte;
}

/// Set query node is RECURSIVE CTE
void setIsRecursiveCTE(bool is_recursive_cte_value)
{
is_recursive_cte = is_recursive_cte_value;
}

/// Returns true if query node has DISTINCT, false otherwise
bool isDistinct() const
{
Expand Down Expand Up @@ -600,6 +612,7 @@ class QueryNode final : public IQueryTreeNode
private:
bool is_subquery = false;
bool is_cte = false;
bool is_recursive_cte = false;
bool is_distinct = false;
bool is_limit_with_ties = false;
bool is_group_by_with_totals = false;
Expand Down

0 comments on commit 6afea0f

Please sign in to comment.