Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove more old code of projection analysis #55579

Merged
merged 16 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1583,22 +1583,17 @@ bool Context::hasScalar(const String & name) const
void Context::addQueryAccessInfo(
const String & quoted_database_name,
const String & full_quoted_table_name,
const Names & column_names,
const String & projection_name,
const String & view_name)
const Names & column_names)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");

std::lock_guard lock(query_access_info.mutex);
query_access_info.databases.emplace(quoted_database_name);
query_access_info.tables.emplace(full_quoted_table_name);

for (const auto & column_name : column_names)
query_access_info.columns.emplace(full_quoted_table_name + "." + backQuoteIfNeed(column_name));
if (!projection_name.empty())
query_access_info.projections.emplace(full_quoted_table_name + "." + backQuoteIfNeed(projection_name));
if (!view_name.empty())
query_access_info.views.emplace(view_name);
}

void Context::addQueryAccessInfo(const Names & partition_names)
Expand All @@ -1611,6 +1606,15 @@ void Context::addQueryAccessInfo(const Names & partition_names)
query_access_info.partitions.emplace(partition_name);
}

void Context::addViewAccessInfo(const String & view_name)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");

std::lock_guard<std::mutex> lock(query_access_info.mutex);
query_access_info.views.emplace(view_name);
}

void Context::addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name)
{
if (!qualified_projection_name)
Expand Down
9 changes: 5 additions & 4 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -693,22 +693,23 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
void addSpecialScalar(const String & name, const Block & block);

const QueryAccessInfo & getQueryAccessInfo() const { return query_access_info; }

void addQueryAccessInfo(
const String & quoted_database_name,
const String & full_quoted_table_name,
const Names & column_names,
const String & projection_name = {},
const String & view_name = {});
const Names & column_names);

void addQueryAccessInfo(const Names & partition_names);
void addViewAccessInfo(const String & view_name);

struct QualifiedProjectionName
{
StorageID storage_id = StorageID::createEmpty();
String projection_name;
explicit operator bool() const { return !projection_name.empty(); }
};
void addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name);

void addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name);

/// Supported factories for records in query_log
enum class QueryLogFactories
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
join_element.table_expression,
context,
original_right_column_names,
query_options.copy().setWithAllColumns().ignoreProjections(false).ignoreAlias(false));
query_options.copy().setWithAllColumns().ignoreAlias(false));
auto joined_plan = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*joined_plan);
{
Expand Down
162 changes: 27 additions & 135 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (!prepared_sets)
prepared_sets = std::make_shared<PreparedSets>();

query_info.ignore_projections = options.ignore_projections;
query_info.is_projection_query = options.is_projection_query;
query_info.is_internal = options.is_internal;

initSettings();
Expand All @@ -416,7 +414,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}

query_info.query = query_ptr->clone();
query_info.original_query = query_ptr->clone();

if (settings.count_distinct_optimization)
{
Expand Down Expand Up @@ -855,9 +852,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
analysis_result.required_columns = required_columns;
}

if (query_info.projection)
storage_snapshot->addProjection(query_info.projection->desc);

/// Blocks used in expression analysis contains size 1 const columns for constant folding and
/// null non-const columns to avoid useless memory allocations. However, a valid block sample
/// requires all columns to be of size 0, thus we need to sanitize the block here.
Expand Down Expand Up @@ -964,10 +958,7 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
executeImpl(query_plan, std::move(input_pipe));

/// We must guarantee that result structure is the same as in getSampleBlock()
///
/// But if it's a projection query, plan header does not match result_header.
/// TODO: add special stage for InterpreterSelectQuery?
if (!options.is_projection_query && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
Expand Down Expand Up @@ -1475,12 +1466,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (expressions.hasHaving() && query.group_by_with_totals && (query.group_by_with_rollup || query.group_by_with_cube))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING");

if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
query_info.projection->aggregate_overflow_row = aggregate_overflow_row;
query_info.projection->aggregate_final = aggregate_final;
}

if (options.only_analyze)
{
auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
Expand Down Expand Up @@ -1549,11 +1534,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
LOG_TRACE(log, "{} -> {}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage));
}

if (query_info.projection && query_info.projection->input_order_info && query_info.input_order_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "InputOrderInfo is set for projection and for query");
InputOrderInfoPtr input_order_info_for_order;
if (!expressions.need_aggregate)
input_order_info_for_order = query_info.projection ? query_info.projection->input_order_info : query_info.input_order_info;
input_order_info_for_order = query_info.input_order_info;

if (options.to_stage > QueryProcessingStage::FetchColumns)
{
Expand Down Expand Up @@ -1614,7 +1597,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
{
// If there is a storage that supports prewhere, this will always be nullptr
// Thus, we don't actually need to check if projection is active.
if (!query_info.projection && expressions.filter_info)
if (expressions.filter_info)
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
Expand Down Expand Up @@ -1788,7 +1771,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
}
}

if (!query_info.projection && expressions.hasWhere())
if (expressions.hasWhere())
executeWhere(query_plan, expressions.before_where, expressions.remove_where_filter);

if (expressions.need_aggregate)
Expand Down Expand Up @@ -2056,15 +2039,13 @@ static void executeMergeAggregatedImpl(
query_plan.addStep(std::move(merging_aggregated));
}

void InterpreterSelectQuery::addEmptySourceToQueryPlan(
QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info, const ContextPtr & context_)
void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info)
{
Pipe pipe(std::make_shared<NullSource>(source_header));

PrewhereInfoPtr prewhere_info_ptr = query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info;
if (prewhere_info_ptr)
if (query_info.prewhere_info)
{
auto & prewhere_info = *prewhere_info_ptr;
auto & prewhere_info = *query_info.prewhere_info;

if (prewhere_info.row_level_filter)
{
Expand All @@ -2087,50 +2068,6 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
read_from_pipe->setStepDescription("Read from NullSource");
query_plan.addStep(std::move(read_from_pipe));

if (query_info.projection)
{
if (query_info.projection->before_where)
{
auto where_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
query_info.projection->before_where,
query_info.projection->where_column_name,
query_info.projection->remove_where_filter);

where_step->setStepDescription("WHERE");
query_plan.addStep(std::move(where_step));
}

if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
if (query_info.projection->before_aggregation)
{
auto expression_before_aggregation
= std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), query_info.projection->before_aggregation);
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));
}

// Let's just choose the safe option since we don't know the value of `to_stage` here.
const bool should_produce_results_in_order_of_bucket_number = true;

// It is used to determine if we should use memory bound merging strategy. Maybe it makes sense for projections, but so far this case is just left untouched.
SortDescription group_by_sort_description;

executeMergeAggregatedImpl(
query_plan,
query_info.projection->aggregate_overflow_row,
query_info.projection->aggregate_final,
false,
false,
context_->getSettingsRef(),
query_info.projection->aggregation_keys,
query_info.projection->aggregate_descriptions,
should_produce_results_in_order_of_bucket_number,
std::move(group_by_sort_description));
}
}
}

RowPolicyFilterPtr InterpreterSelectQuery::getRowPolicyFilter() const
Expand Down Expand Up @@ -2574,80 +2511,47 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc

/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.
if ((optimize_read_in_order || optimize_aggregation_in_order)
&& (!query_info.projection || query_info.projection->complete))
if (optimize_read_in_order)
{
if (optimize_read_in_order)
{
if (query_info.projection)
{
query_info.projection->order_optimizer = std::make_shared<ReadInOrderOptimizer>(
// TODO Do we need a projection variant for this field?
query,
analysis_result.order_by_elements_actions,
getSortDescription(query, context),
query_info.syntax_analyzer_result);
}
else
{
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
analysis_result.order_by_elements_actions,
getSortDescription(query, context),
query_info.syntax_analyzer_result);
}
}
else if (optimize_aggregation_in_order)
{
if (query_info.projection)
{
query_info.projection->order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
query_info.projection->group_by_elements_actions,
query_info.projection->group_by_elements_order_descr,
query_info.syntax_analyzer_result);
}
else
{
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
analysis_result.group_by_elements_actions,
getSortDescriptionFromGroupBy(query),
query_info.syntax_analyzer_result);
}
}
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
analysis_result.order_by_elements_actions,
getSortDescription(query, context),
query_info.syntax_analyzer_result);

/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
UInt64 limit = (query.hasFiltration() || query.groupBy()) ? 0 : getLimitForSorting(query, context);
if (query_info.projection)
query_info.projection->input_order_info
= query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit);
else
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit);
UInt64 limit = query.hasFiltration() ? 0 : getLimitForSorting(query, context);
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit);
}
else if (optimize_aggregation_in_order)
{
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
analysis_result.group_by_elements_actions,
getSortDescriptionFromGroupBy(query),
query_info.syntax_analyzer_result);

query_info.storage_limits = std::make_shared<StorageLimitsList>(storage_limits);
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, /*limit=*/ 0);
}

query_info.storage_limits = std::make_shared<StorageLimitsList>(storage_limits);
query_info.settings_limit_offset_done = options.settings_limit_offset_done;
storage->read(query_plan, required_columns, storage_snapshot, query_info, context, processing_stage, max_block_size, max_streams);

if (context->hasQueryContext() && !options.is_internal)
{
const String view_name{};
auto local_storage_id = storage->getStorageID();
context->getQueryContext()->addQueryAccessInfo(
backQuoteIfNeed(local_storage_id.getDatabaseName()),
local_storage_id.getFullTableName(),
required_columns,
query_info.projection ? query_info.projection->desc->name : "",
view_name);
required_columns);
}

/// Create step which reads from empty source if storage has no data.
if (!query_plan.isInitialized())
{
auto header = storage_snapshot->getSampleBlockForColumns(required_columns);
addEmptySourceToQueryPlan(query_plan, header, query_info, context);
addEmptySourceToQueryPlan(query_plan, header, query_info);
}
}
else
Expand Down Expand Up @@ -2756,13 +2660,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));

if (options.is_projection_query)
return;

AggregateDescriptions aggregates = query_analyzer->aggregates();

const Settings & settings = context->getSettingsRef();

const auto & keys = query_analyzer->aggregationKeys().getNames();

auto aggregator_params = getAggregatorParams(
Expand Down Expand Up @@ -2826,13 +2725,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac

void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final, bool has_grouping_sets)
{
/// If aggregate projection was chosen for table, avoid adding MergeAggregated.
/// It is already added by storage (because of performance issues).
/// TODO: We should probably add another one processing stage for storage?
/// WithMergeableStateAfterAggregation is not ok because, e.g., it skips sorting after aggregation.
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
return;

const Settings & settings = context->getSettingsRef();

/// Used to determine if we should use memory bound merging strategy.
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery
bool hasAggregation() const { return query_analyzer->hasAggregation(); }

static void addEmptySourceToQueryPlan(
QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info, const ContextPtr & context_);
QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info);

Names getRequiredColumns() { return required_columns; }

Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/MutationsInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ bool isStorageTouchedByMutations(
if (context->getSettingsRef().allow_experimental_analyzer)
{
auto select_query_tree = prepareQueryAffectedQueryTree(commands, storage.shared_from_this(), context);
InterpreterSelectQueryAnalyzer interpreter(select_query_tree, context, SelectQueryOptions().ignoreLimits().ignoreProjections());
InterpreterSelectQueryAnalyzer interpreter(select_query_tree, context, SelectQueryOptions().ignoreLimits());
io = interpreter.execute();
}
else
Expand All @@ -200,7 +200,7 @@ bool isStorageTouchedByMutations(
/// For some reason it may copy context and give it into ExpressionTransform
/// after that we will use context from destroyed stack frame in our stream.
interpreter_select_query.emplace(
select_query, context, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
select_query, context, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits());

io = interpreter_select_query->execute();
}
Expand Down Expand Up @@ -404,7 +404,7 @@ MutationsInterpreter::MutationsInterpreter(
, available_columns(std::move(available_columns_))
, context(Context::createCopy(context_))
, settings(std::move(settings_))
, select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits().ignoreProjections())
, select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits())
{
prepare(!settings.can_execute);
}
Expand Down
Loading
Loading