Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove more old code of projection analysis #55579

Merged
merged 16 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ShardNumColumnToFunctionVisitor : public InDepthQueryTreeVisitorWithContex
return;

const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
if (!storage->isVirtualColumn(column.name, storage_snapshot->getMetadataForQuery()))
if (!storage->isVirtualColumn(column.name, storage_snapshot->metadata))
return;

auto function_node = std::make_shared<FunctionNode>("shardNum");
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
join_element.table_expression,
context,
original_right_column_names,
query_options.copy().setWithAllColumns().ignoreProjections(false).ignoreAlias(false));
query_options.copy().setWithAllColumns().ignoreAlias(false));
auto joined_plan = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*joined_plan);
{
Expand Down
147 changes: 22 additions & 125 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (!prepared_sets)
prepared_sets = std::make_shared<PreparedSets>();

query_info.ignore_projections = options.ignore_projections;
query_info.is_projection_query = options.is_projection_query;
query_info.is_internal = options.is_internal;

initSettings();
Expand All @@ -413,7 +411,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}

query_info.query = query_ptr->clone();
query_info.original_query = query_ptr->clone();

if (settings.count_distinct_optimization)
{
Expand Down Expand Up @@ -854,9 +851,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
analysis_result.required_columns = required_columns;
}

if (query_info.projection)
storage_snapshot->addProjection(query_info.projection->desc);

/// Blocks used in expression analysis contains size 1 const columns for constant folding and
/// null non-const columns to avoid useless memory allocations. However, a valid block sample
/// requires all columns to be of size 0, thus we need to sanitize the block here.
Expand All @@ -868,10 +862,7 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
executeImpl(query_plan, std::move(input_pipe));

/// We must guarantee that result structure is the same as in getSampleBlock()
///
/// But if it's a projection query, plan header does not match result_header.
/// TODO: add special stage for InterpreterSelectQuery?
if (!options.is_projection_query && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
Expand Down Expand Up @@ -1370,12 +1361,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (expressions.hasHaving() && query.group_by_with_totals && (query.group_by_with_rollup || query.group_by_with_cube))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING");

if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
query_info.projection->aggregate_overflow_row = aggregate_overflow_row;
query_info.projection->aggregate_final = aggregate_final;
}

if (options.only_analyze)
{
auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
Expand Down Expand Up @@ -1444,11 +1429,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
LOG_TRACE(log, "{} -> {}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage));
}

if (query_info.projection && query_info.projection->input_order_info && query_info.input_order_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "InputOrderInfo is set for projection and for query");
InputOrderInfoPtr input_order_info_for_order;
if (!expressions.need_aggregate)
input_order_info_for_order = query_info.projection ? query_info.projection->input_order_info : query_info.input_order_info;
input_order_info_for_order = query_info.input_order_info;

if (options.to_stage > QueryProcessingStage::FetchColumns)
{
Expand Down Expand Up @@ -1505,7 +1488,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
{
// If there is a storage that supports prewhere, this will always be nullptr
// Thus, we don't actually need to check if projection is active.
if (!query_info.projection && expressions.filter_info)
if (expressions.filter_info)
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
Expand Down Expand Up @@ -1679,7 +1662,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
}
}

if (!query_info.projection && expressions.hasWhere())
if (expressions.hasWhere())
executeWhere(query_plan, expressions.before_where, expressions.remove_where_filter);

if (expressions.need_aggregate)
Expand Down Expand Up @@ -1947,15 +1930,13 @@ static void executeMergeAggregatedImpl(
query_plan.addStep(std::move(merging_aggregated));
}

void InterpreterSelectQuery::addEmptySourceToQueryPlan(
QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info, const ContextPtr & context_)
void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info)
{
Pipe pipe(std::make_shared<NullSource>(source_header));

PrewhereInfoPtr prewhere_info_ptr = query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info;
if (prewhere_info_ptr)
if (query_info.prewhere_info)
{
auto & prewhere_info = *prewhere_info_ptr;
auto & prewhere_info = *query_info.prewhere_info;

if (prewhere_info.row_level_filter)
{
Expand All @@ -1978,50 +1959,6 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
read_from_pipe->setStepDescription("Read from NullSource");
query_plan.addStep(std::move(read_from_pipe));

if (query_info.projection)
{
if (query_info.projection->before_where)
{
auto where_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
query_info.projection->before_where,
query_info.projection->where_column_name,
query_info.projection->remove_where_filter);

where_step->setStepDescription("WHERE");
query_plan.addStep(std::move(where_step));
}

if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
if (query_info.projection->before_aggregation)
{
auto expression_before_aggregation
= std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), query_info.projection->before_aggregation);
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));
}

// Let's just choose the safe option since we don't know the value of `to_stage` here.
const bool should_produce_results_in_order_of_bucket_number = true;

// It is used to determine if we should use memory bound merging strategy. Maybe it makes sense for projections, but so far this case is just left untouched.
SortDescription group_by_sort_description;

executeMergeAggregatedImpl(
query_plan,
query_info.projection->aggregate_overflow_row,
query_info.projection->aggregate_final,
false,
false,
context_->getSettingsRef(),
query_info.projection->aggregation_keys,
query_info.projection->aggregate_descriptions,
should_produce_results_in_order_of_bucket_number,
std::move(group_by_sort_description));
}
}
}

RowPolicyFilterPtr InterpreterSelectQuery::getRowPolicyFilter() const
Expand Down Expand Up @@ -2428,56 +2365,28 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc

/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.
if ((optimize_read_in_order || optimize_aggregation_in_order)
&& (!query_info.projection || query_info.projection->complete))
if (optimize_read_in_order || optimize_aggregation_in_order)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if now would probably be easier to read if it was:

    if (optimize_read_in_order)
         ...
    else if (optimize_aggregation_in_order)
         ...

As both options are separate there is no need of doing (if (a || b) { if (a) else ...)

{
if (optimize_read_in_order)
{
if (query_info.projection)
{
query_info.projection->order_optimizer = std::make_shared<ReadInOrderOptimizer>(
// TODO Do we need a projection variant for this field?
query,
analysis_result.order_by_elements_actions,
getSortDescription(query, context),
query_info.syntax_analyzer_result);
}
else
{
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
analysis_result.order_by_elements_actions,
getSortDescription(query, context),
query_info.syntax_analyzer_result);
}
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
analysis_result.order_by_elements_actions,
getSortDescription(query, context),
query_info.syntax_analyzer_result);
}
else if (optimize_aggregation_in_order)
else
{
if (query_info.projection)
{
query_info.projection->order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
query_info.projection->group_by_elements_actions,
query_info.projection->group_by_elements_order_descr,
query_info.syntax_analyzer_result);
}
else
{
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
analysis_result.group_by_elements_actions,
getSortDescriptionFromGroupBy(query),
query_info.syntax_analyzer_result);
}
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
analysis_result.group_by_elements_actions,
getSortDescriptionFromGroupBy(query),
query_info.syntax_analyzer_result);
}

/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
UInt64 limit = (query.hasFiltration() || query.groupBy()) ? 0 : getLimitForSorting(query, context);
if (query_info.projection)
query_info.projection->input_order_info
= query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit);
else
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit);
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit);
}

query_info.storage_limits = std::make_shared<StorageLimitsList>(storage_limits);
Expand All @@ -2493,15 +2402,15 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
backQuoteIfNeed(local_storage_id.getDatabaseName()),
local_storage_id.getFullTableName(),
required_columns,
query_info.projection ? query_info.projection->desc->name : "",
/*projection_name=*/ "",
CurtizJ marked this conversation as resolved.
Show resolved Hide resolved
view_name);
}

/// Create step which reads from empty source if storage has no data.
if (!query_plan.isInitialized())
{
auto header = storage_snapshot->getSampleBlockForColumns(required_columns);
addEmptySourceToQueryPlan(query_plan, header, query_info, context);
addEmptySourceToQueryPlan(query_plan, header, query_info);
}
}
else
Expand Down Expand Up @@ -2609,13 +2518,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));

if (options.is_projection_query)
return;

AggregateDescriptions aggregates = query_analyzer->aggregates();

const Settings & settings = context->getSettingsRef();

const auto & keys = query_analyzer->aggregationKeys().getNames();

auto aggregator_params = getAggregatorParams(
Expand Down Expand Up @@ -2679,13 +2583,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac

void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final, bool has_grouping_sets)
{
/// If aggregate projection was chosen for table, avoid adding MergeAggregated.
/// It is already added by storage (because of performance issues).
/// TODO: We should probably add another one processing stage for storage?
/// WithMergeableStateAfterAggregation is not ok because, e.g., it skips sorting after aggregation.
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
return;

const Settings & settings = context->getSettingsRef();

/// Used to determine if we should use memory bound merging strategy.
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery
bool hasAggregation() const { return query_analyzer->hasAggregation(); }

static void addEmptySourceToQueryPlan(
QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info, const ContextPtr & context_);
QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info);

Names getRequiredColumns() { return required_columns; }

Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/MutationsInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ bool isStorageTouchedByMutations(
if (context->getSettingsRef().allow_experimental_analyzer)
{
auto select_query_tree = prepareQueryAffectedQueryTree(commands, storage.shared_from_this(), context);
InterpreterSelectQueryAnalyzer interpreter(select_query_tree, context, SelectQueryOptions().ignoreLimits().ignoreProjections());
InterpreterSelectQueryAnalyzer interpreter(select_query_tree, context, SelectQueryOptions().ignoreLimits());
io = interpreter.execute();
}
else
Expand All @@ -188,7 +188,7 @@ bool isStorageTouchedByMutations(
/// For some reason it may copy context and give it into ExpressionTransform
/// after that we will use context from destroyed stack frame in our stream.
interpreter_select_query.emplace(
select_query, context, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
select_query, context, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits());

io = interpreter_select_query->execute();
}
Expand Down Expand Up @@ -367,7 +367,7 @@ MutationsInterpreter::MutationsInterpreter(
, available_columns(std::move(available_columns_))
, context(Context::createCopy(context_))
, settings(std::move(settings_))
, select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits().ignoreProjections())
, select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits())
{
prepare(!settings.can_execute);
}
Expand Down
20 changes: 0 additions & 20 deletions src/Interpreters/SelectQueryOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ struct SelectQueryOptions
bool remove_duplicates = false;
bool ignore_quota = false;
bool ignore_limits = false;
/// This flag is needed to analyze query ignoring table projections.
/// It is needed because we build another one InterpreterSelectQuery while analyzing projections.
/// It helps to avoid infinite recursion.
bool ignore_projections = false;
/// This flag is also used for projection analysis.
/// It is needed because lazy normal projections require special planning in FetchColumns stage, such as adding WHERE transform.
/// It is also used to avoid adding aggregating step when aggregate projection is chosen.
bool is_projection_query = false;
/// This flag is needed for projection description.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the comments around ignore_ast_optimizations and ignore_setting_constraints related to projections?

Copy link
Member Author

@CurtizJ CurtizJ Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no, because we still use them in ProjectionDescription.

/// Otherwise, keys for GROUP BY may be removed as constants.
bool ignore_ast_optimizations = false;
Expand Down Expand Up @@ -119,18 +111,6 @@ struct SelectQueryOptions
return *this;
}

SelectQueryOptions & ignoreProjections(bool value = true)
{
ignore_projections = value;
return *this;
}

SelectQueryOptions & projectionQuery(bool value = true)
{
is_projection_query = value;
return *this;
}

SelectQueryOptions & ignoreAlias(bool value = true)
{
ignore_alias = value;
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/TreeRewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ bool TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
{
for (const auto & name_type : storage_virtuals)
{
if (name_type.name == "_shard_num" && storage->isVirtualColumn("_shard_num", storage_snapshot->getMetadataForQuery()))
if (name_type.name == "_shard_num" && storage->isVirtualColumn("_shard_num", storage_snapshot->metadata))
{
has_virtual_shard_num = true;
break;
Expand Down
6 changes: 3 additions & 3 deletions src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,20 +391,20 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
/// We evaluate sampling for Merge lazily so we need to get all the columns
if (storage_snapshot->storage.getName() == "Merge")
{
const auto columns = storage_snapshot->getMetadataForQuery()->getColumns().getAll();
const auto columns = storage_snapshot->metadata->getColumns().getAll();
for (const auto & column : columns)
required_columns.insert(column.name);
}
else
{
auto columns_required_for_sampling = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForSampling();
auto columns_required_for_sampling = storage_snapshot->metadata->getColumnsRequiredForSampling();
required_columns.insert(columns_required_for_sampling.begin(), columns_required_for_sampling.end());
}
}

if (table_expression_modifiers->hasFinal())
{
auto columns_required_for_final = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForFinal();
auto columns_required_for_final = storage_snapshot->metadata->getColumnsRequiredForFinal();
required_columns.insert(columns_required_for_final.begin(), columns_required_for_final.end());
}
}
Expand Down