Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash in EXPLAIN PIPELINE for Merge over Distributed #48320

Merged
merged 1 commit into from
Apr 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/Interpreters/IInterpreterUnionOrSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ namespace DB
QueryPipelineBuilder IInterpreterUnionOrSelectQuery::buildQueryPipeline()
{
QueryPlan query_plan;
return buildQueryPipeline(query_plan);
}

QueryPipelineBuilder IInterpreterUnionOrSelectQuery::buildQueryPipeline(QueryPlan & query_plan)
{
buildQueryPlan(query_plan);

return std::move(*query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)));
}
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/IInterpreterUnionOrSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class IInterpreterUnionOrSelectQuery : public IInterpreter

virtual void buildQueryPlan(QueryPlan & query_plan) = 0;
QueryPipelineBuilder buildQueryPipeline();
QueryPipelineBuilder buildQueryPipeline(QueryPlan & query_plan);

virtual void ignoreWithTotals() = 0;

Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/InterpreterSelectQueryAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class InterpreterSelectQueryAnalyzer : public IInterpreter
void setProperClientInfo(size_t replica_number, size_t count_participating_replicas);

const Planner & getPlanner() const { return planner; }
Planner & getPlanner() { return planner; }

private:
ASTPtr query;
Expand Down
8 changes: 5 additions & 3 deletions src/Storages/StorageMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,14 +649,13 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
QueryProcessingStage::Complete,
storage_snapshot,
modified_query_info);

if (processed_stage <= storage_stage || (allow_experimental_analyzer && processed_stage == QueryProcessingStage::FetchColumns))
{
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);

/// Steps for reading from child tables should have the same lifetime as the current step
/// because `builder` can have references to them (mainly for EXPLAIN PIPELINE).
QueryPlan & plan = child_plans.emplace_back();

StorageView * view = dynamic_cast<StorageView *>(storage.get());
Expand Down Expand Up @@ -709,12 +708,15 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
modified_context->setSetting("max_threads", streams_num);
modified_context->setSetting("max_streams_to_max_threads_ratio", 1);

QueryPlan & plan = child_plans.emplace_back();

if (allow_experimental_analyzer)
{
InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query_tree,
modified_context,
SelectQueryOptions(processed_stage).ignoreProjections());
builder = std::make_unique<QueryPipelineBuilder>(interpreter.buildQueryPipeline());
plan = std::move(interpreter.getPlanner()).extractQueryPlan();
}
else
{
Expand All @@ -723,7 +725,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
InterpreterSelectQuery interpreter{modified_query_info.query,
modified_context,
SelectQueryOptions(processed_stage).ignoreProjections()};
builder = std::make_unique<QueryPipelineBuilder>(interpreter.buildQueryPipeline());
builder = std::make_unique<QueryPipelineBuilder>(interpreter.buildQueryPipeline(plan));
}

/** Materialization is needed, since from distributed storage the constants come materialized.
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class ReadFromMerge final : public SourceStepWithFilter
StorageSnapshotPtr merge_storage_snapshot;

/// Store read plan for each child table.
/// It's needed to guarantee lifetime for child steps to be the same as for this step.
/// It's needed to guarantee lifetime for child steps to be the same as for this step (mainly for EXPLAIN PIPELINE).
std::vector<QueryPlan> child_plans;

SelectQueryInfo query_info;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
DROP TABLE IF EXISTS foo;
DROP TABLE IF EXISTS foo2;
DROP TABLE IF EXISTS foo2_dist;
DROP TABLE IF EXISTS merge1;

CREATE TABLE foo (`Id` Int32, `Val` Int32) ENGINE = MergeTree ORDER BY Id;
INSERT INTO foo SELECT number, number FROM numbers(100);

CREATE TABLE merge1 AS foo ENGINE = Merge(currentDatabase(), '^foo');
CREATE TABLE foo2 (`Id` Int32, `Val` Int32) ENGINE = MergeTree ORDER BY Id;
INSERT INTO foo2 SELECT number, number FROM numbers(100);
CREATE TABLE foo2_dist (`Id` UInt32, `Val` String) ENGINE = Distributed(test_shard_localhost, currentDatabase(), foo2);

CREATE TABLE merge1 AS foo ENGINE = Merge(currentDatabase(), '^(foo|foo2_dist)$');

EXPLAIN PIPELINE graph = 1, compact = 1 SELECT * FROM merge1 FORMAT Null;
EXPLAIN PIPELINE graph = 1, compact = 1 SELECT * FROM merge1 FORMAT Null SETTINGS allow_experimental_analyzer=1;