From fb666e137c86339b532f9becc956eb4ef60971d3 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 12 Oct 2023 23:25:20 +0000 Subject: [PATCH 1/8] remove old code of projection analysis --- src/Interpreters/ExpressionAnalyzer.cpp | 2 +- src/Interpreters/InterpreterSelectQuery.cpp | 147 ++------ src/Interpreters/InterpreterSelectQuery.h | 2 +- src/Interpreters/MutationsInterpreter.cpp | 6 +- src/Interpreters/SelectQueryOptions.h | 20 - src/Planner/Utils.cpp | 3 +- .../Optimizations/optimizeReadInOrder.cpp | 5 +- .../QueryPlan/ReadFromMergeTree.cpp | 37 +- src/Processors/QueryPlan/ReadFromMergeTree.h | 5 +- src/Storages/IStorage.cpp | 5 +- src/Storages/IStorage.h | 1 - src/Storages/MergeTree/MergeTreeData.cpp | 5 - .../MergeTree/MergeTreeDataSelectExecutor.cpp | 348 +----------------- .../MergeTree/MergeTreeDataSelectExecutor.h | 1 - src/Storages/NATS/StorageNATS.cpp | 2 +- src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 4 +- .../ReadFinalForExternalReplicaStorage.cpp | 2 +- src/Storages/SelectQueryInfo.h | 42 --- src/Storages/StorageBuffer.cpp | 8 +- src/Storages/StorageExecutable.cpp | 2 +- src/Storages/StorageExternalDistributed.cpp | 2 +- src/Storages/StorageMaterializedView.cpp | 4 - src/Storages/StorageMerge.cpp | 6 +- src/Storages/StorageMergeTree.cpp | 2 +- src/Storages/StorageProxy.h | 2 - src/Storages/StorageReplicatedMergeTree.cpp | 16 +- src/Storages/StorageReplicatedMergeTree.h | 2 - 27 files changed, 79 insertions(+), 602 deletions(-) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 5b26084e4402..b581d1523695 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1040,7 +1040,7 @@ static std::unique_ptr buildJoinedPlan( join_element.table_expression, context, original_right_column_names, - query_options.copy().setWithAllColumns().ignoreProjections(false).ignoreAlias(false)); + query_options.copy().setWithAllColumns().ignoreAlias(false)); auto joined_plan = std::make_unique(); interpreter->buildQueryPlan(*joined_plan); { diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 5e7ece5912f3..cb8653907d33 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -386,8 +386,6 @@ InterpreterSelectQuery::InterpreterSelectQuery( if (!prepared_sets) prepared_sets = std::make_shared(); - query_info.ignore_projections = options.ignore_projections; - query_info.is_projection_query = options.is_projection_query; query_info.is_internal = options.is_internal; initSettings(); @@ -413,7 +411,6 @@ InterpreterSelectQuery::InterpreterSelectQuery( } query_info.query = query_ptr->clone(); - query_info.original_query = query_ptr->clone(); if (settings.count_distinct_optimization) { @@ -854,9 +851,6 @@ InterpreterSelectQuery::InterpreterSelectQuery( analysis_result.required_columns = required_columns; } - if (query_info.projection) - storage_snapshot->addProjection(query_info.projection->desc); - /// Blocks used in expression analysis contains size 1 const columns for constant folding and /// null non-const columns to avoid useless memory allocations. However, a valid block sample /// requires all columns to be of size 0, thus we need to sanitize the block here. @@ -868,10 +862,7 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan) executeImpl(query_plan, std::move(input_pipe)); /// We must guarantee that result structure is the same as in getSampleBlock() - /// - /// But if it's a projection query, plan header does not match result_header. - /// TODO: add special stage for InterpreterSelectQuery? - if (!options.is_projection_query && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header)) + if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header)) { auto convert_actions_dag = ActionsDAG::makeConvertingActions( query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(), @@ -1370,12 +1361,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

desc->type == ProjectionDescription::Type::Aggregate) - { - query_info.projection->aggregate_overflow_row = aggregate_overflow_row; - query_info.projection->aggregate_final = aggregate_final; - } - if (options.only_analyze) { auto read_nothing = std::make_unique(source_header); @@ -1444,11 +1429,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

{}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage)); } - if (query_info.projection && query_info.projection->input_order_info && query_info.input_order_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "InputOrderInfo is set for projection and for query"); InputOrderInfoPtr input_order_info_for_order; if (!expressions.need_aggregate) - input_order_info_for_order = query_info.projection ? query_info.projection->input_order_info : query_info.input_order_info; + input_order_info_for_order = query_info.input_order_info; if (options.to_stage > QueryProcessingStage::FetchColumns) { @@ -1505,7 +1488,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

( query_plan.getCurrentDataStream(), @@ -1679,7 +1662,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

(source_header)); - PrewhereInfoPtr prewhere_info_ptr = query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info; - if (prewhere_info_ptr) + if (query_info.prewhere_info) { - auto & prewhere_info = *prewhere_info_ptr; + auto & prewhere_info = *query_info.prewhere_info; if (prewhere_info.row_level_filter) { @@ -1978,50 +1959,6 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan( auto read_from_pipe = std::make_unique(std::move(pipe)); read_from_pipe->setStepDescription("Read from NullSource"); query_plan.addStep(std::move(read_from_pipe)); - - if (query_info.projection) - { - if (query_info.projection->before_where) - { - auto where_step = std::make_unique( - query_plan.getCurrentDataStream(), - query_info.projection->before_where, - query_info.projection->where_column_name, - query_info.projection->remove_where_filter); - - where_step->setStepDescription("WHERE"); - query_plan.addStep(std::move(where_step)); - } - - if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate) - { - if (query_info.projection->before_aggregation) - { - auto expression_before_aggregation - = std::make_unique(query_plan.getCurrentDataStream(), query_info.projection->before_aggregation); - expression_before_aggregation->setStepDescription("Before GROUP BY"); - query_plan.addStep(std::move(expression_before_aggregation)); - } - - // Let's just choose the safe option since we don't know the value of `to_stage` here. - const bool should_produce_results_in_order_of_bucket_number = true; - - // It is used to determine if we should use memory bound merging strategy. Maybe it makes sense for projections, but so far this case is just left untouched. - SortDescription group_by_sort_description; - - executeMergeAggregatedImpl( - query_plan, - query_info.projection->aggregate_overflow_row, - query_info.projection->aggregate_final, - false, - false, - context_->getSettingsRef(), - query_info.projection->aggregation_keys, - query_info.projection->aggregate_descriptions, - should_produce_results_in_order_of_bucket_number, - std::move(group_by_sort_description)); - } - } } RowPolicyFilterPtr InterpreterSelectQuery::getRowPolicyFilter() const @@ -2428,56 +2365,28 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc /// Create optimizer with prepared actions. /// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge. - if ((optimize_read_in_order || optimize_aggregation_in_order) - && (!query_info.projection || query_info.projection->complete)) + if (optimize_read_in_order || optimize_aggregation_in_order) { if (optimize_read_in_order) { - if (query_info.projection) - { - query_info.projection->order_optimizer = std::make_shared( - // TODO Do we need a projection variant for this field? - query, - analysis_result.order_by_elements_actions, - getSortDescription(query, context), - query_info.syntax_analyzer_result); - } - else - { - query_info.order_optimizer = std::make_shared( - query, - analysis_result.order_by_elements_actions, - getSortDescription(query, context), - query_info.syntax_analyzer_result); - } + query_info.order_optimizer = std::make_shared( + query, + analysis_result.order_by_elements_actions, + getSortDescription(query, context), + query_info.syntax_analyzer_result); } - else if (optimize_aggregation_in_order) + else { - if (query_info.projection) - { - query_info.projection->order_optimizer = std::make_shared( - query, - query_info.projection->group_by_elements_actions, - query_info.projection->group_by_elements_order_descr, - query_info.syntax_analyzer_result); - } - else - { - query_info.order_optimizer = std::make_shared( - query, - analysis_result.group_by_elements_actions, - getSortDescriptionFromGroupBy(query), - query_info.syntax_analyzer_result); - } + query_info.order_optimizer = std::make_shared( + query, + analysis_result.group_by_elements_actions, + getSortDescriptionFromGroupBy(query), + query_info.syntax_analyzer_result); } /// If we don't have filtration, we can pushdown limit to reading stage for optimizations. UInt64 limit = (query.hasFiltration() || query.groupBy()) ? 0 : getLimitForSorting(query, context); - if (query_info.projection) - query_info.projection->input_order_info - = query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit); - else - query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit); + query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit); } query_info.storage_limits = std::make_shared(storage_limits); @@ -2493,7 +2402,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc backQuoteIfNeed(local_storage_id.getDatabaseName()), local_storage_id.getFullTableName(), required_columns, - query_info.projection ? query_info.projection->desc->name : "", + /*projection_name=*/ "", view_name); } @@ -2501,7 +2410,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc if (!query_plan.isInitialized()) { auto header = storage_snapshot->getSampleBlockForColumns(required_columns); - addEmptySourceToQueryPlan(query_plan, header, query_info, context); + addEmptySourceToQueryPlan(query_plan, header, query_info); } } else @@ -2609,13 +2518,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac expression_before_aggregation->setStepDescription("Before GROUP BY"); query_plan.addStep(std::move(expression_before_aggregation)); - if (options.is_projection_query) - return; - AggregateDescriptions aggregates = query_analyzer->aggregates(); - const Settings & settings = context->getSettingsRef(); - const auto & keys = query_analyzer->aggregationKeys().getNames(); auto aggregator_params = getAggregatorParams( @@ -2679,13 +2583,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final, bool has_grouping_sets) { - /// If aggregate projection was chosen for table, avoid adding MergeAggregated. - /// It is already added by storage (because of performance issues). - /// TODO: We should probably add another one processing stage for storage? - /// WithMergeableStateAfterAggregation is not ok because, e.g., it skips sorting after aggregation. - if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate) - return; - const Settings & settings = context->getSettingsRef(); /// Used to determine if we should use memory bound merging strategy. diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h index 41f43f4c4b42..9ea3972977fa 100644 --- a/src/Interpreters/InterpreterSelectQuery.h +++ b/src/Interpreters/InterpreterSelectQuery.h @@ -117,7 +117,7 @@ class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery bool hasAggregation() const { return query_analyzer->hasAggregation(); } static void addEmptySourceToQueryPlan( - QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info, const ContextPtr & context_); + QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info); Names getRequiredColumns() { return required_columns; } diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 2db4fce81f0d..3bcd1e247970 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -178,7 +178,7 @@ bool isStorageTouchedByMutations( if (context->getSettingsRef().allow_experimental_analyzer) { auto select_query_tree = prepareQueryAffectedQueryTree(commands, storage.shared_from_this(), context); - InterpreterSelectQueryAnalyzer interpreter(select_query_tree, context, SelectQueryOptions().ignoreLimits().ignoreProjections()); + InterpreterSelectQueryAnalyzer interpreter(select_query_tree, context, SelectQueryOptions().ignoreLimits()); io = interpreter.execute(); } else @@ -188,7 +188,7 @@ bool isStorageTouchedByMutations( /// For some reason it may copy context and give it into ExpressionTransform /// after that we will use context from destroyed stack frame in our stream. interpreter_select_query.emplace( - select_query, context, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections()); + select_query, context, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits()); io = interpreter_select_query->execute(); } @@ -367,7 +367,7 @@ MutationsInterpreter::MutationsInterpreter( , available_columns(std::move(available_columns_)) , context(Context::createCopy(context_)) , settings(std::move(settings_)) - , select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits().ignoreProjections()) + , select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits()) { prepare(!settings.can_execute); } diff --git a/src/Interpreters/SelectQueryOptions.h b/src/Interpreters/SelectQueryOptions.h index c91329c869c1..1e08aec38135 100644 --- a/src/Interpreters/SelectQueryOptions.h +++ b/src/Interpreters/SelectQueryOptions.h @@ -33,14 +33,6 @@ struct SelectQueryOptions bool remove_duplicates = false; bool ignore_quota = false; bool ignore_limits = false; - /// This flag is needed to analyze query ignoring table projections. - /// It is needed because we build another one InterpreterSelectQuery while analyzing projections. - /// It helps to avoid infinite recursion. - bool ignore_projections = false; - /// This flag is also used for projection analysis. - /// It is needed because lazy normal projections require special planning in FetchColumns stage, such as adding WHERE transform. - /// It is also used to avoid adding aggregating step when aggregate projection is chosen. - bool is_projection_query = false; /// This flag is needed for projection description. /// Otherwise, keys for GROUP BY may be removed as constants. bool ignore_ast_optimizations = false; @@ -119,18 +111,6 @@ struct SelectQueryOptions return *this; } - SelectQueryOptions & ignoreProjections(bool value = true) - { - ignore_projections = value; - return *this; - } - - SelectQueryOptions & projectionQuery(bool value = true) - { - is_projection_query = value; - return *this; - } - SelectQueryOptions & ignoreAlias(bool value = true) { ignore_alias = value; diff --git a/src/Planner/Utils.cpp b/src/Planner/Utils.cpp index 733db0f00bc2..ae94092de4bf 100644 --- a/src/Planner/Utils.cpp +++ b/src/Planner/Utils.cpp @@ -419,8 +419,7 @@ QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTyp SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context) { SelectQueryInfo select_query_info; - select_query_info.original_query = queryNodeToSelectQuery(query_tree); - select_query_info.query = select_query_info.original_query; + select_query_info.query = queryNodeToSelectQuery(query_tree); select_query_info.query_tree = query_tree; select_query_info.planner_context = planner_context; return select_query_info; diff --git a/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp b/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp index 655cb1fdb808..c970589ea33d 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp @@ -1073,10 +1073,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, /// If we don't have filtration, we can pushdown limit to reading stage for optimizations. UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context); - auto order_info = order_optimizer->getInputOrder( - query_info.projection ? query_info.projection->desc->metadata : read_from_merge_tree->getStorageMetadata(), - context, - limit); + auto order_info = order_optimizer->getInputOrder(read_from_merge_tree->getStorageMetadata(), context, limit); if (order_info) { diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 10f51563e9b8..f743f9171f85 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -121,12 +121,6 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings( }; } -static const PrewhereInfoPtr & getPrewhereInfoFromQueryInfo(const SelectQueryInfo & query_info) -{ - return query_info.projection ? query_info.projection->prewhere_info - : query_info.prewhere_info; -} - static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts) { for (const auto & part : parts) @@ -253,7 +247,7 @@ ReadFromMergeTree::ReadFromMergeTree( bool enable_parallel_reading) : SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader( storage_snapshot_->getSampleBlockForColumns(real_column_names_), - getPrewhereInfoFromQueryInfo(query_info_), + query_info_.prewhere_info, data_.getPartitionValueType(), virt_column_names_)}) , reader_settings(getMergeTreeReaderSettings(context_, query_info_)) @@ -263,7 +257,7 @@ ReadFromMergeTree::ReadFromMergeTree( , virt_column_names(std::move(virt_column_names_)) , data(data_) , query_info(query_info_) - , prewhere_info(getPrewhereInfoFromQueryInfo(query_info)) + , prewhere_info(query_info_.prewhere_info) , actions_settings(ExpressionActionsSettings::fromContext(context_)) , storage_snapshot(std::move(storage_snapshot_)) , metadata_for_reading(storage_snapshot->getMetadataForQuery()) @@ -318,7 +312,7 @@ ReadFromMergeTree::ReadFromMergeTree( *output_stream, storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(), getSortDirection(), - query_info.getInputOrderInfo(), + query_info.input_order_info, prewhere_info); } @@ -1618,10 +1612,10 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl( result.total_marks_pk = total_marks_pk; result.selected_rows = sum_rows; - const auto & input_order_info = query_info.getInputOrderInfo(); - if (input_order_info) - result.read_type = (input_order_info->direction > 0) ? ReadType::InOrder - : ReadType::InReverseOrder; + if (query_info.input_order_info) + result.read_type = (query_info.input_order_info->direction > 0) + ? ReadType::InOrder + : ReadType::InReverseOrder; return std::make_shared(MergeTreeDataSelectAnalysisResult{.result = std::move(result)}); } @@ -1637,12 +1631,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, if (direction != 1 && query_info.isFinal()) return false; - auto order_info = std::make_shared(SortDescription{}, prefix_size, direction, limit); - if (query_info.projection) - query_info.projection->input_order_info = order_info; - else - query_info.input_order_info = order_info; - + query_info.input_order_info = std::make_shared(SortDescription{}, prefix_size, direction, limit); reader_settings.read_in_order = true; /// In case or read-in-order, don't create too many reading streams. @@ -1664,7 +1653,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, } if (!sort_description.empty()) { - const size_t used_prefix_of_sorting_key_size = order_info->used_prefix_of_sorting_key_size; + const size_t used_prefix_of_sorting_key_size = query_info.input_order_info->used_prefix_of_sorting_key_size; if (sort_description.size() > used_prefix_of_sorting_key_size) sort_description.resize(used_prefix_of_sorting_key_size); output_stream->sort_description = std::move(sort_description); @@ -1694,7 +1683,7 @@ void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info *output_stream, storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(), getSortDirection(), - query_info.getInputOrderInfo(), + query_info.input_order_info, prewhere_info); } @@ -1789,8 +1778,6 @@ Pipe ReadFromMergeTree::spreadMarkRanges( RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, ActionsDAGPtr & result_projection) { const bool final = isQueryWithFinal(); - const auto & input_order_info = query_info.getInputOrderInfo(); - Names column_names_to_read = result.column_names_to_read; NameSet names(column_names_to_read.begin(), column_names_to_read.end()); @@ -1831,10 +1818,10 @@ Pipe ReadFromMergeTree::spreadMarkRanges( return spreadMarkRangesAmongStreamsFinal(std::move(parts_with_ranges), num_streams, result.column_names_to_read, column_names_to_read, result_projection); } - else if (input_order_info) + else if (query_info.input_order_info) { return spreadMarkRangesAmongStreamsWithOrder( - std::move(parts_with_ranges), num_streams, column_names_to_read, result_projection, input_order_info); + std::move(parts_with_ranges), num_streams, column_names_to_read, result_projection, query_info.input_order_info); } else { diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index ab8a37e03230..ee8f8895bed3 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -229,9 +229,8 @@ class ReadFromMergeTree final : public SourceStepWithFilter int getSortDirection() const { - const InputOrderInfoPtr & order_info = query_info.getInputOrderInfo(); - if (order_info) - return order_info->direction; + if (query_info.input_order_info) + return query_info.input_order_info->direction; return 1; } diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index ae7659e074f5..a435eb498d98 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -149,7 +149,7 @@ void IStorage::read( if (parallelize_output && parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < num_streams) pipe.resize(num_streams); - readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, context, getName()); + readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, getName()); } void IStorage::readFromPipe( @@ -158,13 +158,12 @@ void IStorage::readFromPipe( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, - ContextPtr context, std::string storage_name) { if (pipe.empty()) { auto header = storage_snapshot->getSampleBlockForColumns(column_names); - InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context); + InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info); } else { diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index fcf7675d15d6..eee7bf7aa1d5 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -681,7 +681,6 @@ class IStorage : public std::enable_shared_from_this, public TypePromo const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, - ContextPtr context, std::string storage_name); private: diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 889dcfa537f2..a4e88ebe936a 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6596,11 +6596,6 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage( return QueryProcessingStage::Enum::WithMergeableState; } - if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) - { - query_info.projection = std::nullopt; - } - return QueryProcessingStage::Enum::FetchColumns; } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 7e028f8c65cb..5c9226ec3706 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -124,22 +124,6 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTSampleRatio::Ra return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows)); } -static SortDescription getSortDescriptionFromGroupBy(const ASTSelectQuery & query) -{ - SortDescription order_descr; - order_descr.reserve(query.groupBy()->children.size()); - - for (const auto & elem : query.groupBy()->children) - { - /// Note, here aliases should not be used, since there will be no such column in a block. - String name = elem->getColumnNameWithoutAlias(); - order_descr.emplace_back(name, 1, 1); - } - - return order_descr; -} - - QueryPlanPtr MergeTreeDataSelectExecutor::read( const Names & column_names_to_return, const StorageSnapshotPtr & storage_snapshot, @@ -147,336 +131,32 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( ContextPtr context, const UInt64 max_block_size, const size_t num_streams, - QueryProcessingStage::Enum processed_stage, std::shared_ptr max_block_numbers_to_read, bool enable_parallel_reading) const { if (query_info.merge_tree_empty_result) return std::make_unique(); - const auto & settings = context->getSettingsRef(); - - const auto & metadata_for_reading = storage_snapshot->getMetadataForQuery(); - const auto & snapshot_data = assert_cast(*storage_snapshot->data); - const auto & parts = snapshot_data.parts; const auto & alter_conversions = snapshot_data.alter_conversions; - if (!query_info.projection) - { - auto step = readFromParts( - query_info.merge_tree_select_result_ptr ? MergeTreeData::DataPartsVector{} : parts, - query_info.merge_tree_select_result_ptr ? std::vector{} : alter_conversions, - column_names_to_return, - storage_snapshot, - query_info, - context, - max_block_size, - num_streams, - max_block_numbers_to_read, - query_info.merge_tree_select_result_ptr, - enable_parallel_reading); - - auto plan = std::make_unique(); - if (step) - plan->addStep(std::move(step)); - return plan; - } - - LOG_DEBUG( - log, - "Choose {} {} projection {}", - query_info.projection->complete ? "complete" : "incomplete", - query_info.projection->desc->type, - query_info.projection->desc->name); - - const ASTSelectQuery & select_query = query_info.query->as(); - QueryPlanResourceHolder resources; - - auto projection_plan = std::make_unique(); - if (query_info.projection->desc->is_minmax_count_projection) - { - Pipe pipe(std::make_shared(query_info.minmax_count_projection_block)); - auto read_from_pipe = std::make_unique(std::move(pipe)); - projection_plan->addStep(std::move(read_from_pipe)); - } - else if (query_info.projection->merge_tree_projection_select_result_ptr) - { - LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", ")); - projection_plan->addStep(readFromParts( - /*parts=*/ {}, - /*alter_conversions=*/ {}, - query_info.projection->required_columns, - storage_snapshot, - query_info, - context, - max_block_size, - num_streams, - max_block_numbers_to_read, - query_info.projection->merge_tree_projection_select_result_ptr, - enable_parallel_reading)); - } - - if (projection_plan->isInitialized()) - { - if (query_info.projection->before_where) - { - auto where_step = std::make_unique( - projection_plan->getCurrentDataStream(), - query_info.projection->before_where, - query_info.projection->where_column_name, - query_info.projection->remove_where_filter); - - where_step->setStepDescription("WHERE"); - projection_plan->addStep(std::move(where_step)); - } - - if (query_info.projection->before_aggregation) - { - auto expression_before_aggregation - = std::make_unique(projection_plan->getCurrentDataStream(), query_info.projection->before_aggregation); - expression_before_aggregation->setStepDescription("Before GROUP BY"); - projection_plan->addStep(std::move(expression_before_aggregation)); - } - - /// NOTE: input_order_info (for projection and not) is set only if projection is complete - if (query_info.has_order_by && !query_info.need_aggregate && query_info.projection->input_order_info) - { - chassert(query_info.projection->complete); - - SortDescription output_order_descr = InterpreterSelectQuery::getSortDescription(select_query, context); - UInt64 limit = InterpreterSelectQuery::getLimitForSorting(select_query, context); - - auto sorting_step = std::make_unique( - projection_plan->getCurrentDataStream(), - query_info.projection->input_order_info->sort_description_for_merging, - output_order_descr, - settings.max_block_size, - limit); - - sorting_step->setStepDescription("ORDER BY for projections"); - projection_plan->addStep(std::move(sorting_step)); - } - } - - auto ordinary_query_plan = std::make_unique(); - if (query_info.projection->merge_tree_normal_select_result_ptr) - { - auto storage_from_base_parts_of_projection - = std::make_shared(data, query_info.projection->merge_tree_normal_select_result_ptr); - auto interpreter = InterpreterSelectQuery( - query_info.query, - context, - storage_from_base_parts_of_projection, - nullptr, - SelectQueryOptions{processed_stage}.projectionQuery()); - - interpreter.buildQueryPlan(*ordinary_query_plan); - - const auto & expressions = interpreter.getAnalysisResult(); - if (processed_stage == QueryProcessingStage::Enum::FetchColumns && expressions.before_where) - { - auto where_step = std::make_unique( - ordinary_query_plan->getCurrentDataStream(), - expressions.before_where, - expressions.where_column_name, - expressions.remove_where_filter); - where_step->setStepDescription("WHERE"); - ordinary_query_plan->addStep(std::move(where_step)); - } - } - - Pipe projection_pipe; - Pipe ordinary_pipe; - if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate) - { - auto make_aggregator_params = [&](bool projection) - { - const auto & keys = query_info.projection->aggregation_keys.getNames(); - - AggregateDescriptions aggregates = query_info.projection->aggregate_descriptions; - - /// This part is hacky. - /// We want AggregatingTransform to work with aggregate states instead of normal columns. - /// It is almost the same, just instead of adding new data to aggregation state we merge it with existing. - /// - /// It is needed because data in projection: - /// * is not merged completely (we may have states with the same key in different parts) - /// * is not split into buckets (so if we just use MergingAggregated, it will use single thread) - const bool only_merge = projection; - - Aggregator::Params params( - keys, - aggregates, - query_info.projection->aggregate_overflow_row, - settings.max_rows_to_group_by, - settings.group_by_overflow_mode, - settings.group_by_two_level_threshold, - settings.group_by_two_level_threshold_bytes, - settings.max_bytes_before_external_group_by, - settings.empty_result_for_aggregation_by_empty_set, - context->getTempDataOnDisk(), - settings.max_threads, - settings.min_free_disk_space_for_temporary_data, - settings.compile_aggregate_expressions, - settings.min_count_to_compile_aggregate_expression, - settings.max_block_size, - settings.enable_software_prefetch_in_aggregation, - only_merge, - settings.optimize_group_by_constant_keys); - - return std::make_pair(params, only_merge); - }; - - if (ordinary_query_plan->isInitialized() && projection_plan->isInitialized()) - { - auto projection_builder = projection_plan->buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); - projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources); - - auto ordinary_builder = ordinary_query_plan->buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); - ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources); - - /// Here we create shared ManyAggregatedData for both projection and ordinary data. - /// For ordinary data, AggregatedData is filled in a usual way. - /// For projection data, AggregatedData is filled by merging aggregation states. - /// When all AggregatedData is filled, we merge aggregation states together in a usual way. - /// Pipeline will look like: - /// ReadFromProjection -> Aggregating (only merge states) -> - /// ReadFromProjection -> Aggregating (only merge states) -> - /// ... -> Resize -> ConvertingAggregatedToChunks - /// ReadFromOrdinaryPart -> Aggregating (usual) -> (added by last Aggregating) - /// ReadFromOrdinaryPart -> Aggregating (usual) -> - /// ... - auto many_data = std::make_shared(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts()); - size_t counter = 0; - - AggregatorListPtr aggregator_list_ptr = std::make_shared(); - - /// TODO apply optimize_aggregation_in_order here too (like below) - auto build_aggregate_pipe = [&](Pipe & pipe, bool projection) - { - auto [params, only_merge] = make_aggregator_params(projection); - - AggregatingTransformParamsPtr transform_params = std::make_shared( - pipe.getHeader(), std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final); - - pipe.resize(pipe.numOutputPorts(), true, true); - - auto merge_threads = num_streams; - auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads - ? static_cast(settings.aggregation_memory_efficient_merge_threads) - : static_cast(settings.max_threads); - - pipe.addSimpleTransform([&](const Block & header) - { - return std::make_shared( - header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads); - }); - }; - - if (!projection_pipe.empty()) - build_aggregate_pipe(projection_pipe, true); - if (!ordinary_pipe.empty()) - build_aggregate_pipe(ordinary_pipe, false); - } - else - { - auto add_aggregating_step = [&](QueryPlanPtr & query_plan, bool projection) - { - auto [params, only_merge] = make_aggregator_params(projection); - - auto merge_threads = num_streams; - auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads - ? static_cast(settings.aggregation_memory_efficient_merge_threads) - : static_cast(settings.max_threads); - - InputOrderInfoPtr group_by_info = query_info.projection->input_order_info; - SortDescription sort_description_for_merging; - SortDescription group_by_sort_description; - if (group_by_info && settings.optimize_aggregation_in_order) - { - group_by_sort_description = getSortDescriptionFromGroupBy(select_query); - sort_description_for_merging = group_by_info->sort_description_for_merging; - } - else - group_by_info = nullptr; - - // We don't have information regarding the `to_stage` of the query processing, only about `from_stage` (which is passed through `processed_stage` argument). - // Thus we cannot assign false here since it may be a query over distributed table. - const bool should_produce_results_in_order_of_bucket_number = true; - - auto aggregating_step = std::make_unique( - query_plan->getCurrentDataStream(), - std::move(params), - /* grouping_sets_params_= */ GroupingSetsParamsList{}, - query_info.projection->aggregate_final, - settings.max_block_size, - settings.aggregation_in_order_max_block_bytes, - merge_threads, - temporary_data_merge_threads, - /* storage_has_evenly_distributed_read_= */ false, - /* group_by_use_nulls */ false, - std::move(sort_description_for_merging), - std::move(group_by_sort_description), - should_produce_results_in_order_of_bucket_number, - settings.enable_memory_bound_merging_of_aggregation_results, - !group_by_info && settings.force_aggregation_in_order); - query_plan->addStep(std::move(aggregating_step)); - }; - - if (projection_plan->isInitialized()) - { - add_aggregating_step(projection_plan, true); - - auto projection_builder = projection_plan->buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); - projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources); - } - if (ordinary_query_plan->isInitialized()) - { - add_aggregating_step(ordinary_query_plan, false); - - auto ordinary_builder = ordinary_query_plan->buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); - ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources); - } - } - } - else - { - if (projection_plan->isInitialized()) - { - auto projection_builder = projection_plan->buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); - projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources); - } - - if (ordinary_query_plan->isInitialized()) - { - auto ordinary_builder = ordinary_query_plan->buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); - ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources); - } - } + auto step = readFromParts( + parts, + alter_conversions, + column_names_to_return, + storage_snapshot, + query_info, + context, + max_block_size, + num_streams, + max_block_numbers_to_read, + /*merge_tree_select_result_ptr=*/ nullptr, + enable_parallel_reading); - Pipes pipes; - pipes.emplace_back(std::move(projection_pipe)); - pipes.emplace_back(std::move(ordinary_pipe)); - auto pipe = Pipe::unitePipes(std::move(pipes)); auto plan = std::make_unique(); - if (pipe.empty()) - return plan; - - pipe.resize(1); - auto step = std::make_unique( - std::move(pipe), - fmt::format("MergeTree(with {} projection {})", query_info.projection->desc->type, query_info.projection->desc->name), - query_info.storage_limits); - plan->addStep(std::move(step)); - plan->addInterpreterContext(query_info.projection->context); + if (step) + plan->addStep(std::move(step)); return plan; } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index d5d8107db485..1e57fd71bd1e 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -34,7 +34,6 @@ class MergeTreeDataSelectExecutor ContextPtr context, UInt64 max_block_size, size_t num_streams, - QueryProcessingStage::Enum processed_stage, std::shared_ptr max_block_numbers_to_read = nullptr, bool enable_parallel_reading = false) const; diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp index a3478069356c..92f4abdf390e 100644 --- a/src/Storages/NATS/StorageNATS.cpp +++ b/src/Storages/NATS/StorageNATS.cpp @@ -342,7 +342,7 @@ void StorageNATS::read( if (pipe.empty()) { auto header = storage_snapshot->getSampleBlockForColumns(column_names); - InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, local_context); + InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info); } else { diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index ec552dd10326..5a11bdf5fc0e 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -695,7 +695,7 @@ void StorageRabbitMQ::read( if (num_created_consumers == 0) { auto header = storage_snapshot->getSampleBlockForColumns(column_names); - InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, local_context); + InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info); return; } @@ -753,7 +753,7 @@ void StorageRabbitMQ::read( if (pipe.empty()) { auto header = storage_snapshot->getSampleBlockForColumns(column_names); - InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, local_context); + InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info); } else { diff --git a/src/Storages/ReadFinalForExternalReplicaStorage.cpp b/src/Storages/ReadFinalForExternalReplicaStorage.cpp index 28053c84e204..e1d52eefc205 100644 --- a/src/Storages/ReadFinalForExternalReplicaStorage.cpp +++ b/src/Storages/ReadFinalForExternalReplicaStorage.cpp @@ -64,7 +64,7 @@ void readFinalFromNestedStorage( if (!query_plan.isInitialized()) { - InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, nested_header, query_info, context); + InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, nested_header, query_info); return; } diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 6d52d45c6a9c..69dbb64db38d 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -9,7 +9,6 @@ #include #include #include -#include #include @@ -142,32 +141,6 @@ class IMergeTreeDataPart; using ManyExpressionActions = std::vector; -// The projection selected to execute current query -struct ProjectionCandidate -{ - ProjectionDescriptionRawPtr desc{}; - PrewhereInfoPtr prewhere_info; - ActionsDAGPtr before_where; - String where_column_name; - bool remove_where_filter = false; - ActionsDAGPtr before_aggregation; - Names required_columns; - NamesAndTypesList aggregation_keys; - AggregateDescriptions aggregate_descriptions; - bool aggregate_overflow_row = false; - bool aggregate_final = false; - bool complete = false; - ReadInOrderOptimizerPtr order_optimizer; - InputOrderInfoPtr input_order_info; - ManyExpressionActions group_by_elements_actions; - SortDescription group_by_elements_order_descr; - MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr; - MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr; - - /// Because projection analysis uses a separate interpreter. - ContextPtr context; -}; - /** Query along with some additional data, * that can be used during query processing * inside storage engines. @@ -180,7 +153,6 @@ struct SelectQueryInfo ASTPtr query; ASTPtr view_query; /// Optimized VIEW query - ASTPtr original_query; /// Unmodified query for projection analysis /// Query tree QueryTreeNodePtr query_tree; @@ -242,20 +214,11 @@ struct SelectQueryInfo ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; } - /// If not null, it means we choose a projection to execute current query. - std::optional projection; - bool ignore_projections = false; - bool is_projection_query = false; bool merge_tree_empty_result = false; bool settings_limit_offset_done = false; bool is_internal = false; - Block minmax_count_projection_block; - MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr; - bool parallel_replicas_disabled = false; - bool is_parameterized_view = false; - bool optimize_trivial_count = false; // If limit is not 0, that means it's a trivial limit query. @@ -264,11 +227,6 @@ struct SelectQueryInfo /// For IStorageSystemOneBlock std::vector columns_mask; - InputOrderInfoPtr getInputOrderInfo() const - { - return input_order_info ? input_order_info : (projection ? projection->input_order_info : nullptr); - } - bool isFinal() const; }; } diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index e011565edc17..b27b28e5a535 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -210,8 +210,6 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage( { if (auto destination = getDestinationTable()) { - /// TODO: Find a way to support projections for StorageBuffer - query_info.ignore_projections = true; const auto & destination_metadata = destination->getInMemoryMetadataPtr(); return destination->getQueryProcessingStage(local_context, to_stage, destination->getStorageSnapshot(destination_metadata, local_context), query_info); } @@ -335,12 +333,12 @@ void StorageBuffer::read( pipes_from_buffers.emplace_back(std::make_shared(column_names, buf, storage_snapshot)); pipe_from_buffers = Pipe::unitePipes(std::move(pipes_from_buffers)); - if (query_info.getInputOrderInfo()) + if (query_info.input_order_info) { /// Each buffer has one block, and it not guaranteed that rows in each block are sorted by order keys pipe_from_buffers.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, query_info.getInputOrderInfo()->sort_description_for_merging, 0); + return std::make_shared(header, query_info.input_order_info->sort_description_for_merging, 0); }); } } @@ -358,7 +356,7 @@ void StorageBuffer::read( /// TODO: Find a way to support projections for StorageBuffer auto interpreter = InterpreterSelectQuery( query_info.query, local_context, std::move(pipe_from_buffers), - SelectQueryOptions(processed_stage).ignoreProjections()); + SelectQueryOptions(processed_stage)); interpreter.addStorageLimits(*query_info.storage_limits); interpreter.buildQueryPlan(buffers_plan); } diff --git a/src/Storages/StorageExecutable.cpp b/src/Storages/StorageExecutable.cpp index df03301b5e89..ee3b63f85860 100644 --- a/src/Storages/StorageExecutable.cpp +++ b/src/Storages/StorageExecutable.cpp @@ -166,7 +166,7 @@ void StorageExecutable::read( } auto pipe = coordinator->createPipe(script_path, settings.script_arguments, std::move(inputs), std::move(sample_block), context, configuration); - IStorage::readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, context, getName()); + IStorage::readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, getName()); query_plan.addResources(std::move(resources)); } diff --git a/src/Storages/StorageExternalDistributed.cpp b/src/Storages/StorageExternalDistributed.cpp index d493fead9930..beb93afc9729 100644 --- a/src/Storages/StorageExternalDistributed.cpp +++ b/src/Storages/StorageExternalDistributed.cpp @@ -73,7 +73,7 @@ void StorageExternalDistributed::read( if (plans.empty()) { auto header = storage_snapshot->getSampleBlockForColumns(column_names); - InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context); + InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info); } if (plans.size() == 1) diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 7354dd565522..3c68219d6d4b 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -134,10 +134,6 @@ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage( const StorageSnapshotPtr &, SelectQueryInfo & query_info) const { - /// TODO: Find a way to support projections for StorageMaterializedView. Why do we use different - /// metadata for materialized view and target table? If they are the same, we can get rid of all - /// converting and use it just like a normal view. - query_info.ignore_projections = true; const auto & target_metadata = getTargetTable()->getInMemoryMetadataPtr(); return getTargetTable()->getQueryProcessingStage(local_context, to_stage, getTargetTable()->getStorageSnapshot(target_metadata, local_context), query_info); } diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index b6253fa6daf7..ad1808d90a08 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -272,8 +272,6 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage( size_t selected_table_size = 0; - /// TODO: Find a way to support projections for StorageMerge - query_info.ignore_projections = true; for (const auto & iterator : database_table_iterators) { while (iterator->isValid()) @@ -711,7 +709,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( { InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query_tree, modified_context, - SelectQueryOptions(processed_stage).ignoreProjections()); + SelectQueryOptions(processed_stage)); builder = std::make_unique(interpreter.buildQueryPipeline()); plan = std::move(interpreter.getPlanner()).extractQueryPlan(); } @@ -721,7 +719,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( /// TODO: Find a way to support projections for StorageMerge InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, - SelectQueryOptions(processed_stage).ignoreProjections()}; + SelectQueryOptions(processed_stage)}; builder = std::make_unique(interpreter.buildQueryPipeline(plan)); } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 470e30b7947b..0f0093b6da9d 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -254,7 +254,7 @@ void StorageMergeTree::read( if (auto plan = reader.read( column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, - processed_stage, nullptr, enable_parallel_reading)) + nullptr, enable_parallel_reading)) query_plan = std::move(*plan); } diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index a4304faeaecf..2fb5bbde79bf 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -38,8 +38,6 @@ class StorageProxy : public IStorage const StorageSnapshotPtr &, SelectQueryInfo & info) const override { - /// TODO: Find a way to support projections for StorageProxy - info.ignore_projections = true; const auto & nested_metadata = getNested()->getInMemoryMetadataPtr(); return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getStorageSnapshot(nested_metadata, context), info); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 63f785a93e54..35d5925e465b 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5150,12 +5150,12 @@ void StorageReplicatedMergeTree::read( /// 2. Do not read parts that have not yet been written to the quorum of the replicas. /// For this you have to synchronously go to ZooKeeper. if (settings.select_sequential_consistency) - return readLocalSequentialConsistencyImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams); + return readLocalSequentialConsistencyImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams); if (local_context->canUseParallelReplicasOnInitiator()) return readParallelReplicasImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams); - readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams); + readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams); } void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl( @@ -5164,14 +5164,15 @@ void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl( const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, - QueryProcessingStage::Enum processed_stage, size_t max_block_size, size_t num_streams) { auto max_added_blocks = std::make_shared(getMaxAddedBlocks()); - auto plan = reader.read(column_names, storage_snapshot, query_info, local_context, - max_block_size, num_streams, processed_stage, std::move(max_added_blocks), - /* enable_parallel_reading= */false); + auto plan = reader.read( + column_names, storage_snapshot, query_info, local_context, + max_block_size, num_streams, std::move(max_added_blocks), + /* enable_parallel_reading=*/ false); + if (plan) query_plan = std::move(*plan); } @@ -5232,16 +5233,15 @@ void StorageReplicatedMergeTree::readLocalImpl( const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, - QueryProcessingStage::Enum processed_stage, const size_t max_block_size, const size_t num_streams) { auto plan = reader.read( column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, - processed_stage, /* max_block_numbers_to_read= */ nullptr, /* enable_parallel_reading= */ local_context->canUseParallelReplicasOnFollower()); + if (plan) query_plan = std::move(*plan); } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 794991d8e062..9e342c044bd4 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -564,7 +564,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, - QueryProcessingStage::Enum processed_stage, size_t max_block_size, size_t num_streams); @@ -574,7 +573,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, - QueryProcessingStage::Enum processed_stage, size_t max_block_size, size_t num_streams); From a01acf5d2a650fa7e7d4a2a7426fbdc29c5142c5 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 13 Oct 2023 03:18:43 +0000 Subject: [PATCH 2/8] remove projection from StorageSnapshot --- .../Passes/ShardNumColumnToFunctionPass.cpp | 2 +- src/Interpreters/TreeRewriter.cpp | 2 +- src/Planner/PlannerJoinTree.cpp | 6 +-- .../Optimizations/optimizePrewhere.cpp | 2 +- .../optimizeUseAggregateProjection.cpp | 42 +++++++++---------- .../optimizeUseNormalProjection.cpp | 24 +++++------ .../QueryPlan/ReadFromMergeTree.cpp | 11 ++--- .../QueryPlan/ReadFromPreparedSource.cpp | 7 +--- .../QueryPlan/ReadFromPreparedSource.h | 3 +- .../MergeTree/MergeTreeBlockReadUtils.cpp | 2 +- src/Storages/MergeTree/MergeTreeData.cpp | 2 +- src/Storages/StorageMerge.cpp | 2 +- src/Storages/StorageSnapshot.cpp | 10 ++--- src/Storages/StorageSnapshot.h | 8 ---- 14 files changed, 50 insertions(+), 73 deletions(-) diff --git a/src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp b/src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp index 52c30b7b35d1..f3ff7fe06b13 100644 --- a/src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp +++ b/src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp @@ -46,7 +46,7 @@ class ShardNumColumnToFunctionVisitor : public InDepthQueryTreeVisitorWithContex return; const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot(); - if (!storage->isVirtualColumn(column.name, storage_snapshot->getMetadataForQuery())) + if (!storage->isVirtualColumn(column.name, storage_snapshot->metadata)) return; auto function_node = std::make_shared("shardNum"); diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index 67187be962ca..6fe5ec2d55af 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -1114,7 +1114,7 @@ bool TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select { for (const auto & name_type : storage_virtuals) { - if (name_type.name == "_shard_num" && storage->isVirtualColumn("_shard_num", storage_snapshot->getMetadataForQuery())) + if (name_type.name == "_shard_num" && storage->isVirtualColumn("_shard_num", storage_snapshot->metadata)) { has_virtual_shard_num = true; break; diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index c95671da6be8..d0b850d987d3 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -391,20 +391,20 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info /// We evaluate sampling for Merge lazily so we need to get all the columns if (storage_snapshot->storage.getName() == "Merge") { - const auto columns = storage_snapshot->getMetadataForQuery()->getColumns().getAll(); + const auto columns = storage_snapshot->metadata->getColumns().getAll(); for (const auto & column : columns) required_columns.insert(column.name); } else { - auto columns_required_for_sampling = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForSampling(); + auto columns_required_for_sampling = storage_snapshot->metadata->getColumnsRequiredForSampling(); required_columns.insert(columns_required_for_sampling.begin(), columns_required_for_sampling.end()); } } if (table_expression_modifiers->hasFinal()) { - auto columns_required_for_final = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForFinal(); + auto columns_required_for_final = storage_snapshot->metadata->getColumnsRequiredForFinal(); required_columns.insert(columns_required_for_final.begin(), columns_required_for_final.end()); } } diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp index 3352567943ab..d79cf65c366b 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp @@ -137,7 +137,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes) if (table_expression_modifiers && table_expression_modifiers->hasSampleSizeRatio()) { - const auto & sampling_key = storage_snapshot->getMetadataForQuery()->getSamplingKey(); + const auto & sampling_key = storage_snapshot->metadata->getSamplingKey(); const auto & sampling_source_columns = sampling_key.expression->getRequiredColumnsWithTypes(); for (const auto & column : sampling_source_columns) required_columns_after_filter.push_back(ColumnWithTypeAndName(column.type, column.name)); diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp index 0599a0fa369d..3e9d3e06493c 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp @@ -612,6 +612,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & return false; } + Context::QualifiedProjectionName projection_name; QueryPlanStepPtr projection_reading; bool has_ordinary_parts; @@ -622,26 +623,21 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & // candidates.minmax_projection->block.dumpStructure()); Pipe pipe(std::make_shared(std::move(candidates.minmax_projection->block))); - projection_reading = std::make_unique( - std::move(pipe), - context, - query_info.is_internal - ? Context::QualifiedProjectionName{} - : Context::QualifiedProjectionName - { - .storage_id = reading->getMergeTreeData().getStorageID(), - .projection_name = candidates.minmax_projection->candidate.projection->name, - }); + projection_reading = std::make_unique(std::move(pipe)); has_ordinary_parts = !candidates.minmax_projection->normal_parts.empty(); if (has_ordinary_parts) reading->resetParts(std::move(candidates.minmax_projection->normal_parts)); + + projection_name = Context::QualifiedProjectionName + { + .storage_id = reading->getMergeTreeData().getStorageID(), + .projection_name = candidates.minmax_projection->candidate.projection->name, + }; } else { auto storage_snapshot = reading->getStorageSnapshot(); - auto proj_snapshot = std::make_shared( - storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); - proj_snapshot->addProjection(best_candidate->projection); + auto proj_snapshot = std::make_shared(storage_snapshot->storage, best_candidate->projection->metadata); auto query_info_copy = query_info; query_info_copy.prewhere_info = nullptr; @@ -663,23 +659,23 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & { auto header = proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames()); Pipe pipe(std::make_shared(std::move(header))); - projection_reading = std::make_unique( - std::move(pipe), - context, - query_info.is_internal - ? Context::QualifiedProjectionName{} - : Context::QualifiedProjectionName - { - .storage_id = reading->getMergeTreeData().getStorageID(), - .projection_name = best_candidate->projection->name, - }); + projection_reading = std::make_unique(std::move(pipe)); } has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr; if (has_ordinary_parts) reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr)); + + projection_name = Context::QualifiedProjectionName + { + .storage_id = reading->getMergeTreeData().getStorageID(), + .projection_name = best_candidate->projection->name, + }; } + if (!query_info.is_internal && context->hasQueryContext()) + context->getQueryContext()->addQueryAccessInfo(projection_name); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", // projection_reading->getOutputStream().header.dumpStructure()); diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp index 727afcb1a994..c326ff43c9e2 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp @@ -164,9 +164,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) } auto storage_snapshot = reading->getStorageSnapshot(); - auto proj_snapshot = std::make_shared( - storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data); - proj_snapshot->addProjection(best_candidate->projection); + auto proj_snapshot = std::make_shared(storage_snapshot->storage, best_candidate->projection->metadata); auto query_info_copy = query_info; query_info_copy.prewhere_info = nullptr; @@ -187,16 +185,16 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) if (!projection_reading) { Pipe pipe(std::make_shared(proj_snapshot->getSampleBlockForColumns(required_columns))); - projection_reading = std::make_unique( - std::move(pipe), - context, - query_info.is_internal - ? Context::QualifiedProjectionName{} - : Context::QualifiedProjectionName - { - .storage_id = reading->getMergeTreeData().getStorageID(), - .projection_name = best_candidate->projection->name, - }); + projection_reading = std::make_unique(std::move(pipe)); + } + + if (!query_info.is_internal && context->hasQueryContext()) + { + context->getQueryContext()->addQueryAccessInfo(Context::QualifiedProjectionName + { + .storage_id = reading->getMergeTreeData().getStorageID(), + .projection_name = best_candidate->projection->name, + }); } bool has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index f743f9171f85..e7943fca112d 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -260,7 +260,7 @@ ReadFromMergeTree::ReadFromMergeTree( , prewhere_info(query_info_.prewhere_info) , actions_settings(ExpressionActionsSettings::fromContext(context_)) , storage_snapshot(std::move(storage_snapshot_)) - , metadata_for_reading(storage_snapshot->getMetadataForQuery()) + , metadata_for_reading(storage_snapshot->metadata) , context(std::move(context_)) , block_size{ .max_block_size_rows = max_block_size_, @@ -310,7 +310,7 @@ ReadFromMergeTree::ReadFromMergeTree( updateSortDescriptionForOutputStream( *output_stream, - storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(), + storage_snapshot->metadata->getSortingKeyColumns(), getSortDirection(), query_info.input_order_info, prewhere_info); @@ -1681,7 +1681,7 @@ void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info updateSortDescriptionForOutputStream( *output_stream, - storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(), + storage_snapshot->metadata->getSortingKeyColumns(), getSortDirection(), query_info.input_order_info, prewhere_info); @@ -1884,11 +1884,8 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons partition_names.emplace_back( fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id)); } - context->getQueryContext()->addQueryAccessInfo(partition_names); - if (storage_snapshot->projection) - context->getQueryContext()->addQueryAccessInfo( - Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name}); + context->getQueryContext()->addQueryAccessInfo(partition_names); } ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts); diff --git a/src/Processors/QueryPlan/ReadFromPreparedSource.cpp b/src/Processors/QueryPlan/ReadFromPreparedSource.cpp index a24c4dbe4d08..7446203ec358 100644 --- a/src/Processors/QueryPlan/ReadFromPreparedSource.cpp +++ b/src/Processors/QueryPlan/ReadFromPreparedSource.cpp @@ -4,19 +4,14 @@ namespace DB { -ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, ContextPtr context_, Context::QualifiedProjectionName qualified_projection_name_) +ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_) : ISourceStep(DataStream{.header = pipe_.getHeader()}) , pipe(std::move(pipe_)) - , context(std::move(context_)) - , qualified_projection_name(std::move(qualified_projection_name_)) { } void ReadFromPreparedSource::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - if (context && context->hasQueryContext()) - context->getQueryContext()->addQueryAccessInfo(qualified_projection_name); - for (const auto & processor : pipe.getProcessors()) processors.emplace_back(processor); diff --git a/src/Processors/QueryPlan/ReadFromPreparedSource.h b/src/Processors/QueryPlan/ReadFromPreparedSource.h index 2606f5010099..03831ef62077 100644 --- a/src/Processors/QueryPlan/ReadFromPreparedSource.h +++ b/src/Processors/QueryPlan/ReadFromPreparedSource.h @@ -11,8 +11,7 @@ namespace DB class ReadFromPreparedSource : public ISourceStep { public: - explicit ReadFromPreparedSource( - Pipe pipe_, ContextPtr context_ = nullptr, Context::QualifiedProjectionName qualified_projection_name_ = {}); + explicit ReadFromPreparedSource(Pipe pipe_); String getName() const override { return "ReadFromPreparedSource"; } diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index f5f0fa6f726b..e0a0a3313fe8 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -70,7 +70,7 @@ bool injectRequiredColumnsRecursively( /// Column doesn't have default value and don't exist in part /// don't need to add to required set. - auto metadata_snapshot = storage_snapshot->getMetadataForQuery(); + auto metadata_snapshot = storage_snapshot->metadata; const auto column_default = metadata_snapshot->getColumns().getDefault(column_name); if (!column_default) return false; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a4e88ebe936a..1579409e3a4a 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6612,7 +6612,7 @@ bool MergeTreeData::canUseParallelReplicasBasedOnPKAnalysis( auto result_ptr = reader.estimateNumMarksToRead( parts, query_info.prewhere_info, - storage_snapshot->getMetadataForQuery()->getColumns().getAll().getNames(), + storage_snapshot->metadata->getColumns().getAll().getNames(), storage_snapshot->metadata, storage_snapshot->metadata, query_info, diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index ad1808d90a08..c3019b49f1fd 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -436,7 +436,7 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu query_info.input_order_info = input_sorting_info; } - auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock(); + auto sample_block = merge_storage_snapshot->metadata->getSampleBlock(); std::vector> pipelines; QueryPlanResourceHolder resources; diff --git a/src/Storages/StorageSnapshot.cpp b/src/Storages/StorageSnapshot.cpp index 0c19634f50c2..3df4ad4a862d 100644 --- a/src/Storages/StorageSnapshot.cpp +++ b/src/Storages/StorageSnapshot.cpp @@ -30,7 +30,7 @@ void StorageSnapshot::init() NamesAndTypesList StorageSnapshot::getColumns(const GetColumnsOptions & options) const { - auto all_columns = getMetadataForQuery()->getColumns().get(options); + auto all_columns = metadata->getColumns().get(options); if (options.with_extended_objects) extendObjectColumns(all_columns, object_columns, options.with_subcolumns); @@ -77,7 +77,7 @@ NamesAndTypesList StorageSnapshot::getColumnsByNames(const GetColumnsOptions & o std::optional StorageSnapshot::tryGetColumn(const GetColumnsOptions & options, const String & column_name) const { - const auto & columns = getMetadataForQuery()->getColumns(); + const auto & columns = metadata->getColumns(); auto column = columns.tryGetColumn(options, column_name); if (column && (!column->type->hasDynamicSubcolumns() || !options.with_extended_objects)) return column; @@ -119,7 +119,7 @@ Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) cons { Block res; - const auto & columns = getMetadataForQuery()->getColumns(); + const auto & columns = metadata->getColumns(); for (const auto & column_name : column_names) { auto column = columns.tryGetColumnOrSubcolumn(GetColumnsOptions::All, column_name); @@ -151,7 +151,7 @@ Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) cons ColumnsDescription StorageSnapshot::getDescriptionForColumns(const Names & column_names) const { ColumnsDescription res; - const auto & columns = getMetadataForQuery()->getColumns(); + const auto & columns = metadata->getColumns(); for (const auto & name : column_names) { auto column = columns.tryGetColumnOrSubcolumnDescription(GetColumnsOptions::All, name); @@ -188,7 +188,7 @@ namespace void StorageSnapshot::check(const Names & column_names) const { - const auto & columns = getMetadataForQuery()->getColumns(); + const auto & columns = metadata->getColumns(); auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns(); if (column_names.empty()) diff --git a/src/Storages/StorageSnapshot.h b/src/Storages/StorageSnapshot.h index a69f9b959551..a07479f9372e 100644 --- a/src/Storages/StorageSnapshot.h +++ b/src/Storages/StorageSnapshot.h @@ -25,9 +25,6 @@ struct StorageSnapshot using DataPtr = std::unique_ptr; DataPtr data; - /// Projection that is used in query. - mutable const ProjectionDescription * projection = nullptr; - StorageSnapshot( const IStorage & storage_, StorageMetadataPtr metadata_) @@ -81,11 +78,6 @@ struct StorageSnapshot DataTypePtr getConcreteType(const String & column_name) const; - void addProjection(const ProjectionDescription * projection_) const { projection = projection_; } - - /// If we have a projection then we should use its metadata. - StorageMetadataPtr getMetadataForQuery() const { return projection ? projection->metadata : metadata; } - private: void init(); From 69896d94bead717ebc75bdf1907935e6057ae98c Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Mon, 4 Dec 2023 21:40:19 +0000 Subject: [PATCH 3/8] fix tests --- src/Interpreters/Context.cpp | 18 +++++++++++------- src/Interpreters/Context.h | 9 +++++---- src/Interpreters/InterpreterSelectQuery.cpp | 5 +---- src/Planner/PlannerJoinTree.cpp | 4 +--- .../QueryPlan/ReadFromPreparedSource.cpp | 18 +++++++++++++++--- .../QueryPlan/ReadFromPreparedSource.h | 14 ++------------ .../QueryPlan/ReadFromSystemNumbersStep.cpp | 2 +- .../Transforms/buildPushingToViewsChain.cpp | 6 +++++- src/Storages/IStorage.cpp | 5 +++-- src/Storages/IStorage.h | 1 + src/Storages/NATS/StorageNATS.cpp | 2 +- src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 2 +- src/Storages/StorageExecutable.cpp | 2 +- 13 files changed, 48 insertions(+), 40 deletions(-) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 0a8a8f1f5297..70d0e6e9b275 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1553,9 +1553,7 @@ bool Context::hasScalar(const String & name) const void Context::addQueryAccessInfo( const String & quoted_database_name, const String & full_quoted_table_name, - const Names & column_names, - const String & projection_name, - const String & view_name) + const Names & column_names) { if (isGlobalContext()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info"); @@ -1563,12 +1561,9 @@ void Context::addQueryAccessInfo( std::lock_guard lock(query_access_info.mutex); query_access_info.databases.emplace(quoted_database_name); query_access_info.tables.emplace(full_quoted_table_name); + for (const auto & column_name : column_names) query_access_info.columns.emplace(full_quoted_table_name + "." + backQuoteIfNeed(column_name)); - if (!projection_name.empty()) - query_access_info.projections.emplace(full_quoted_table_name + "." + backQuoteIfNeed(projection_name)); - if (!view_name.empty()) - query_access_info.views.emplace(view_name); } void Context::addQueryAccessInfo(const Names & partition_names) @@ -1581,6 +1576,15 @@ void Context::addQueryAccessInfo(const Names & partition_names) query_access_info.partitions.emplace(partition_name); } +void Context::addViewAccessInfo(const String & view_name) +{ + if (isGlobalContext()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info"); + + std::lock_guard lock(query_access_info.mutex); + query_access_info.views.emplace(view_name); +} + void Context::addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name) { if (!qualified_projection_name) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 8c169dd664f3..5ec555c55963 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -690,13 +690,14 @@ class Context: public ContextData, public std::enable_shared_from_this void addSpecialScalar(const String & name, const Block & block); const QueryAccessInfo & getQueryAccessInfo() const { return query_access_info; } + void addQueryAccessInfo( const String & quoted_database_name, const String & full_quoted_table_name, - const Names & column_names, - const String & projection_name = {}, - const String & view_name = {}); + const Names & column_names); + void addQueryAccessInfo(const Names & partition_names); + void addViewAccessInfo(const String & view_name); struct QualifiedProjectionName { @@ -704,8 +705,8 @@ class Context: public ContextData, public std::enable_shared_from_this String projection_name; explicit operator bool() const { return !projection_name.empty(); } }; - void addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name); + void addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name); /// Supported factories for records in query_log enum class QueryLogFactories diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 26ec8c2ce346..98fae8e362cb 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2493,14 +2493,11 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc if (context->hasQueryContext() && !options.is_internal) { - const String view_name{}; auto local_storage_id = storage->getStorageID(); context->getQueryContext()->addQueryAccessInfo( backQuoteIfNeed(local_storage_id.getDatabaseName()), local_storage_id.getFullTableName(), - required_columns, - /*projection_name=*/ "", - view_name); + required_columns); } /// Create step which reads from empty source if storage has no data. diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 99b4468c63c9..c063d36e288b 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -841,9 +841,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres query_context->getQueryContext()->addQueryAccessInfo( backQuoteIfNeed(local_storage_id.getDatabaseName()), local_storage_id.getFullTableName(), - columns_names, - {}, - {}); + columns_names); } } diff --git a/src/Processors/QueryPlan/ReadFromPreparedSource.cpp b/src/Processors/QueryPlan/ReadFromPreparedSource.cpp index d0a97f5c74b9..38945f4945a5 100644 --- a/src/Processors/QueryPlan/ReadFromPreparedSource.cpp +++ b/src/Processors/QueryPlan/ReadFromPreparedSource.cpp @@ -20,11 +20,23 @@ void ReadFromPreparedSource::initializePipeline(QueryPipelineBuilder & pipeline, pipeline.init(std::move(pipe)); } -void ReadFromStorageStep::applyFilters() +ReadFromStorageStep::ReadFromStorageStep( + Pipe pipe_, + String storage_name, + ContextPtr context_, + const SelectQueryInfo & query_info_) + : ReadFromPreparedSource(std::move(pipe_)) + , context(std::move(context_)) + , query_info(query_info_) { - if (!context) - return; + setStepDescription(storage_name); + + for (const auto & processor : pipe.getProcessors()) + processor->setStorageLimits(query_info.storage_limits); +} +void ReadFromStorageStep::applyFilters() +{ std::shared_ptr key_condition; if (!context->getSettingsRef().allow_experimental_analyzer) { diff --git a/src/Processors/QueryPlan/ReadFromPreparedSource.h b/src/Processors/QueryPlan/ReadFromPreparedSource.h index 3b6ff2f06960..2eea48553b3a 100644 --- a/src/Processors/QueryPlan/ReadFromPreparedSource.h +++ b/src/Processors/QueryPlan/ReadFromPreparedSource.h @@ -16,32 +16,22 @@ class ReadFromPreparedSource : public SourceStepWithFilter explicit ReadFromPreparedSource(Pipe pipe_); String getName() const override { return "ReadFromPreparedSource"; } - void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; protected: Pipe pipe; - ContextPtr context; - Context::QualifiedProjectionName qualified_projection_name; }; class ReadFromStorageStep : public ReadFromPreparedSource { public: - ReadFromStorageStep(Pipe pipe_, String storage_name, const SelectQueryInfo & query_info_) - : ReadFromPreparedSource(std::move(pipe_)), query_info(query_info_) - { - setStepDescription(storage_name); - - for (const auto & processor : pipe.getProcessors()) - processor->setStorageLimits(query_info.storage_limits); - } + ReadFromStorageStep(Pipe pipe_, String storage_name, ContextPtr context_, const SelectQueryInfo & query_info_); String getName() const override { return "ReadFromStorage"; } - void applyFilters() override; private: + ContextPtr context; SelectQueryInfo query_info; }; diff --git a/src/Processors/QueryPlan/ReadFromSystemNumbersStep.cpp b/src/Processors/QueryPlan/ReadFromSystemNumbersStep.cpp index 41690c1b1327..67b7e3f4a80f 100644 --- a/src/Processors/QueryPlan/ReadFromSystemNumbersStep.cpp +++ b/src/Processors/QueryPlan/ReadFromSystemNumbersStep.cpp @@ -331,7 +331,7 @@ ReadFromSystemNumbersStep::ReadFromSystemNumbersStep( , storage{std::move(storage_)} , storage_snapshot{storage_snapshot_} , context{std::move(context_)} - , key_expression{KeyDescription::parse(column_names[0], storage_snapshot->getMetadataForQuery()->columns, context).expression} + , key_expression{KeyDescription::parse(column_names[0], storage_snapshot->metadata->columns, context).expression} , max_block_size{max_block_size_} , num_streams{num_streams_} , limit_length_and_offset(InterpreterSelectQuery::getLimitLengthAndOffset(query_info.query->as(), context)) diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index ea10b025e878..d808966bb6a8 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -405,7 +405,11 @@ Chain buildPushingToViewsChain( if (!no_destination && context->hasQueryContext()) { context->getQueryContext()->addQueryAccessInfo( - backQuoteIfNeed(view_id.getDatabaseName()), views_data->views.back().runtime_stats->target_name, {}, "", view_id.getFullTableName()); + backQuoteIfNeed(view_id.getDatabaseName()), + views_data->views.back().runtime_stats->target_name, + /*column_names=*/ {}); + + context->getQueryContext()->addViewAccessInfo(view_id.getFullTableName()); } } diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 16fc0f6c09b5..ce8176c1fc10 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -150,7 +150,7 @@ void IStorage::read( if (parallelize_output && parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < num_streams) pipe.resize(num_streams); - readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, getName()); + readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, context, getName()); } void IStorage::readFromPipe( @@ -159,6 +159,7 @@ void IStorage::readFromPipe( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, + ContextPtr context, std::string storage_name) { if (pipe.empty()) @@ -168,7 +169,7 @@ void IStorage::readFromPipe( } else { - auto read_step = std::make_unique(std::move(pipe), storage_name, query_info); + auto read_step = std::make_unique(std::move(pipe), storage_name, context, query_info); query_plan.addStep(std::move(read_step)); } } diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 1e6fbc6a75d1..2a705b801dae 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -731,6 +731,7 @@ class IStorage : public std::enable_shared_from_this, public TypePromo const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, + ContextPtr context, std::string storage_name); private: diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp index cd4d6382bee7..9cb1fbd85061 100644 --- a/src/Storages/NATS/StorageNATS.cpp +++ b/src/Storages/NATS/StorageNATS.cpp @@ -351,7 +351,7 @@ void StorageNATS::read( } else { - auto read_step = std::make_unique(std::move(pipe), getName(), query_info); + auto read_step = std::make_unique(std::move(pipe), getName(), local_context, query_info); query_plan.addStep(std::move(read_step)); query_plan.addInterpreterContext(modified_context); } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 03c46f8699e3..fce2d775b157 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -762,7 +762,7 @@ void StorageRabbitMQ::read( } else { - auto read_step = std::make_unique(std::move(pipe), getName(), query_info); + auto read_step = std::make_unique(std::move(pipe), getName(), local_context, query_info); query_plan.addStep(std::move(read_step)); query_plan.addInterpreterContext(modified_context); } diff --git a/src/Storages/StorageExecutable.cpp b/src/Storages/StorageExecutable.cpp index c3d1f39a9f6a..2acbf3f46106 100644 --- a/src/Storages/StorageExecutable.cpp +++ b/src/Storages/StorageExecutable.cpp @@ -170,7 +170,7 @@ void StorageExecutable::read( } auto pipe = coordinator->createPipe(script_path, settings.script_arguments, std::move(inputs), std::move(sample_block), context, configuration); - IStorage::readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, getName()); + IStorage::readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, context, getName()); query_plan.addResources(std::move(resources)); } From 7dd128f90f2810ce640f841a05bd0733594ae1ec Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 3 Jan 2024 16:00:50 +0000 Subject: [PATCH 4/8] Revert "remove projection from StorageSnapshot" This reverts commit a01acf5d2a650fa7e7d4a2a7426fbdc29c5142c5. --- .../Passes/ShardNumColumnToFunctionPass.cpp | 2 +- src/Interpreters/TreeRewriter.cpp | 2 +- src/Planner/PlannerJoinTree.cpp | 6 +++--- .../QueryPlan/Optimizations/optimizePrewhere.cpp | 2 +- .../optimizeUseAggregateProjection.cpp | 15 +++++++-------- .../Optimizations/optimizeUseNormalProjection.cpp | 4 +++- src/Processors/QueryPlan/ReadFromMergeTree.cpp | 11 +++++++---- .../MergeTree/MergeTreeBlockReadUtils.cpp | 2 +- src/Storages/MergeTree/MergeTreeData.cpp | 2 +- src/Storages/StorageMerge.cpp | 8 ++++++++ src/Storages/StorageSnapshot.cpp | 10 +++++----- src/Storages/StorageSnapshot.h | 8 ++++++++ 12 files changed, 46 insertions(+), 26 deletions(-) diff --git a/src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp b/src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp index f3ff7fe06b13..52c30b7b35d1 100644 --- a/src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp +++ b/src/Analyzer/Passes/ShardNumColumnToFunctionPass.cpp @@ -46,7 +46,7 @@ class ShardNumColumnToFunctionVisitor : public InDepthQueryTreeVisitorWithContex return; const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot(); - if (!storage->isVirtualColumn(column.name, storage_snapshot->metadata)) + if (!storage->isVirtualColumn(column.name, storage_snapshot->getMetadataForQuery())) return; auto function_node = std::make_shared("shardNum"); diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index 89b014a0360d..9cbf24091e33 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -1146,7 +1146,7 @@ bool TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select { for (const auto & name_type : storage_virtuals) { - if (name_type.name == "_shard_num" && storage->isVirtualColumn("_shard_num", storage_snapshot->metadata)) + if (name_type.name == "_shard_num" && storage->isVirtualColumn("_shard_num", storage_snapshot->getMetadataForQuery())) { has_virtual_shard_num = true; break; diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 2736f8052716..56ac3f54b065 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -397,20 +397,20 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info /// We evaluate sampling for Merge lazily so we need to get all the columns if (storage_snapshot->storage.getName() == "Merge") { - const auto columns = storage_snapshot->metadata->getColumns().getAll(); + const auto columns = storage_snapshot->getMetadataForQuery()->getColumns().getAll(); for (const auto & column : columns) required_columns.insert(column.name); } else { - auto columns_required_for_sampling = storage_snapshot->metadata->getColumnsRequiredForSampling(); + auto columns_required_for_sampling = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForSampling(); required_columns.insert(columns_required_for_sampling.begin(), columns_required_for_sampling.end()); } } if (table_expression_modifiers->hasFinal()) { - auto columns_required_for_final = storage_snapshot->metadata->getColumnsRequiredForFinal(); + auto columns_required_for_final = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForFinal(); required_columns.insert(columns_required_for_final.begin(), columns_required_for_final.end()); } } diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp index f7d31a54373b..5c5171d4296d 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp @@ -137,7 +137,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes) if (table_expression_modifiers && table_expression_modifiers->hasSampleSizeRatio()) { - const auto & sampling_key = storage_snapshot->metadata->getSamplingKey(); + const auto & sampling_key = storage_snapshot->getMetadataForQuery()->getSamplingKey(); const auto & sampling_source_columns = sampling_key.expression->getRequiredColumnsWithTypes(); for (const auto & column : sampling_source_columns) required_columns_after_filter.push_back(ColumnWithTypeAndName(column.type, column.name)); diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp index f7d97f71d087..169ef13e732d 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp @@ -669,7 +669,9 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & else { auto storage_snapshot = reading->getStorageSnapshot(); - auto proj_snapshot = std::make_shared(storage_snapshot->storage, best_candidate->projection->metadata); + auto proj_snapshot = std::make_shared( + storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); + proj_snapshot->addProjection(best_candidate->projection); auto query_info_copy = query_info; query_info_copy.prewhere_info = nullptr; @@ -694,19 +696,16 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & projection_reading = std::make_unique(std::move(pipe)); } - has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr; - if (has_ordinary_parts) - reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr)); - projection_name = Context::QualifiedProjectionName { .storage_id = reading->getMergeTreeData().getStorageID(), .projection_name = best_candidate->projection->name, }; - } - if (!query_info.is_internal && context->hasQueryContext()) - context->getQueryContext()->addQueryAccessInfo(projection_name); + has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr; + if (has_ordinary_parts) + reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr)); + } // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", // projection_reading->getOutputStream().header.dumpStructure()); diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp index f821e9c7b42e..90de861651a2 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp @@ -196,7 +196,9 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) } auto storage_snapshot = reading->getStorageSnapshot(); - auto proj_snapshot = std::make_shared(storage_snapshot->storage, best_candidate->projection->metadata); + auto proj_snapshot = std::make_shared( + storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data); + proj_snapshot->addProjection(best_candidate->projection); auto query_info_copy = query_info; query_info_copy.prewhere_info = nullptr; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index d446ca176345..9f133ff075f4 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -261,7 +261,7 @@ ReadFromMergeTree::ReadFromMergeTree( , prewhere_info(query_info_.prewhere_info) , actions_settings(ExpressionActionsSettings::fromContext(context_)) , storage_snapshot(std::move(storage_snapshot_)) - , metadata_for_reading(storage_snapshot->metadata) + , metadata_for_reading(storage_snapshot->getMetadataForQuery()) , context(std::move(context_)) , block_size{ .max_block_size_rows = max_block_size_, @@ -311,7 +311,7 @@ ReadFromMergeTree::ReadFromMergeTree( updateSortDescriptionForOutputStream( *output_stream, - storage_snapshot->metadata->getSortingKeyColumns(), + storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(), getSortDirection(), query_info.input_order_info, prewhere_info); @@ -1695,7 +1695,7 @@ void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info updateSortDescriptionForOutputStream( *output_stream, - storage_snapshot->metadata->getSortingKeyColumns(), + storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(), getSortDirection(), query_info.input_order_info, prewhere_info); @@ -1896,8 +1896,11 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons partition_names.emplace_back( fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id)); } - context->getQueryContext()->addQueryAccessInfo(partition_names); + + if (storage_snapshot->projection) + context->getQueryContext()->addQueryAccessInfo( + Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name}); } ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts); diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index e0a0a3313fe8..f5f0fa6f726b 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -70,7 +70,7 @@ bool injectRequiredColumnsRecursively( /// Column doesn't have default value and don't exist in part /// don't need to add to required set. - auto metadata_snapshot = storage_snapshot->metadata; + auto metadata_snapshot = storage_snapshot->getMetadataForQuery(); const auto column_default = metadata_snapshot->getColumns().getDefault(column_name); if (!column_default) return false; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 3546e3c0e882..2dec0f8257e0 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6896,7 +6896,7 @@ UInt64 MergeTreeData::estimateNumberOfRowsToRead( auto result_ptr = reader.estimateNumMarksToRead( parts, query_info.prewhere_info, - storage_snapshot->metadata->getColumns().getAll().getNames(), + storage_snapshot->getMetadataForQuery()->getColumns().getAll().getNames(), storage_snapshot->metadata, storage_snapshot->metadata, query_info, diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 4b89dc5fc7ca..a03ad365767d 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -545,6 +545,14 @@ void ReadFromMerge::createChildPlans() query_info.input_order_info = input_sorting_info; } +<<<<<<< HEAD +======= + auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock(); + + std::vector> pipelines; + QueryPlanResourceHolder resources; + +>>>>>>> parent of a01acf5d2a6 (remove projection from StorageSnapshot) for (const auto & table : selected_tables) { size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count); diff --git a/src/Storages/StorageSnapshot.cpp b/src/Storages/StorageSnapshot.cpp index af9d5fd1b34c..ada3252630fb 100644 --- a/src/Storages/StorageSnapshot.cpp +++ b/src/Storages/StorageSnapshot.cpp @@ -38,7 +38,7 @@ void StorageSnapshot::init() NamesAndTypesList StorageSnapshot::getColumns(const GetColumnsOptions & options) const { - auto all_columns = metadata->getColumns().get(options); + auto all_columns = getMetadataForQuery()->getColumns().get(options); if (options.with_extended_objects) extendObjectColumns(all_columns, object_columns, options.with_subcolumns); @@ -85,7 +85,7 @@ NamesAndTypesList StorageSnapshot::getColumnsByNames(const GetColumnsOptions & o std::optional StorageSnapshot::tryGetColumn(const GetColumnsOptions & options, const String & column_name) const { - const auto & columns = metadata->getColumns(); + const auto & columns = getMetadataForQuery()->getColumns(); auto column = columns.tryGetColumn(options, column_name); if (column && (!column->type->hasDynamicSubcolumns() || !options.with_extended_objects)) return column; @@ -127,7 +127,7 @@ Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) cons { Block res; - const auto & columns = metadata->getColumns(); + const auto & columns = getMetadataForQuery()->getColumns(); for (const auto & column_name : column_names) { auto column = columns.tryGetColumnOrSubcolumn(GetColumnsOptions::All, column_name); @@ -159,7 +159,7 @@ Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) cons ColumnsDescription StorageSnapshot::getDescriptionForColumns(const Names & column_names) const { ColumnsDescription res; - const auto & columns = metadata->getColumns(); + const auto & columns = getMetadataForQuery()->getColumns(); for (const auto & name : column_names) { auto column = columns.tryGetColumnOrSubcolumnDescription(GetColumnsOptions::All, name); @@ -196,7 +196,7 @@ namespace void StorageSnapshot::check(const Names & column_names) const { - const auto & columns = metadata->getColumns(); + const auto & columns = getMetadataForQuery()->getColumns(); auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns(); if (column_names.empty()) diff --git a/src/Storages/StorageSnapshot.h b/src/Storages/StorageSnapshot.h index 9856164bc9f7..d62e118e1f28 100644 --- a/src/Storages/StorageSnapshot.h +++ b/src/Storages/StorageSnapshot.h @@ -25,6 +25,9 @@ struct StorageSnapshot using DataPtr = std::unique_ptr; DataPtr data; + /// Projection that is used in query. + mutable const ProjectionDescription * projection = nullptr; + StorageSnapshot( const IStorage & storage_, StorageMetadataPtr metadata_) @@ -80,6 +83,11 @@ struct StorageSnapshot DataTypePtr getConcreteType(const String & column_name) const; + void addProjection(const ProjectionDescription * projection_) const { projection = projection_; } + + /// If we have a projection then we should use its metadata. + StorageMetadataPtr getMetadataForQuery() const { return projection ? projection->metadata : metadata; } + private: void init(); From cf4604bfb69c7450e79f9f791a257fd3fa8c3c0f Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 3 Jan 2024 16:53:47 +0000 Subject: [PATCH 5/8] fix build and tests --- .../Optimizations/optimizeUseAggregateProjection.cpp | 3 +-- .../Optimizations/optimizeUseNormalProjection.cpp | 3 +-- src/Storages/StorageMerge.cpp | 10 +--------- src/Storages/StorageSnapshot.cpp | 3 +++ 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp index 169ef13e732d..198bdbbb71d7 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp @@ -669,8 +669,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & else { auto storage_snapshot = reading->getStorageSnapshot(); - auto proj_snapshot = std::make_shared( - storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); + auto proj_snapshot = std::make_shared(storage_snapshot->storage, storage_snapshot->metadata); proj_snapshot->addProjection(best_candidate->projection); auto query_info_copy = query_info; diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp index 90de861651a2..05afc80cba03 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp @@ -196,8 +196,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) } auto storage_snapshot = reading->getStorageSnapshot(); - auto proj_snapshot = std::make_shared( - storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data); + auto proj_snapshot = std::make_shared(storage_snapshot->storage, storage_snapshot->metadata); proj_snapshot->addProjection(best_candidate->projection); auto query_info_copy = query_info; diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index e0ef21106067..0d67403fa2f2 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -488,14 +488,6 @@ std::vector ReadFromMerge::createChildrenPlans(SelectQ query_info_.input_order_info = input_sorting_info; } -<<<<<<< HEAD -======= - auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock(); - - std::vector> pipelines; - QueryPlanResourceHolder resources; - ->>>>>>> parent of a01acf5d2a6 (remove projection from StorageSnapshot) for (const auto & table : selected_tables) { size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count); @@ -548,7 +540,7 @@ std::vector ReadFromMerge::createChildrenPlans(SelectQ ASTPtr required_columns_expr_list = std::make_shared(); ASTPtr column_expr; - auto sample_block = merge_storage_snapshot->metadata->getSampleBlock(); + auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock(); for (const auto & column : real_column_names) { diff --git a/src/Storages/StorageSnapshot.cpp b/src/Storages/StorageSnapshot.cpp index ada3252630fb..34c092c72087 100644 --- a/src/Storages/StorageSnapshot.cpp +++ b/src/Storages/StorageSnapshot.cpp @@ -21,7 +21,10 @@ namespace ErrorCodes std::shared_ptr StorageSnapshot::clone(DataPtr data_) const { auto res = std::make_shared(storage, metadata, object_columns); + + res->projection = projection; res->data = std::move(data_); + return res; } From 48a3968d5a7d955c3acbbecea963d2b60a20f076 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 3 Jan 2024 20:06:36 +0000 Subject: [PATCH 6/8] fix query info with projection --- .../Optimizations/optimizeUseAggregateProjection.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp index 198bdbbb71d7..f0cf52aa46d5 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp @@ -706,6 +706,15 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr)); } + if (!query_info.is_internal && context->hasQueryContext()) + { + context->getQueryContext()->addQueryAccessInfo(Context::QualifiedProjectionName + { + .storage_id = reading->getMergeTreeData().getStorageID(), + .projection_name = best_candidate->projection->name, + }); + } + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", // projection_reading->getOutputStream().header.dumpStructure()); From 24fe5f4d534ed8ec2b3df36514660af08faa5367 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 5 Jan 2024 20:41:58 +0000 Subject: [PATCH 7/8] slightly better --- src/Interpreters/InterpreterSelectQuery.cpp | 36 ++++++++++----------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 746f419fa1d1..ed1e65a6b648 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2493,32 +2493,30 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc /// Create optimizer with prepared actions. /// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge. - if (optimize_read_in_order || optimize_aggregation_in_order) + if (optimize_read_in_order) { - if (optimize_read_in_order) - { - query_info.order_optimizer = std::make_shared( - query, - analysis_result.order_by_elements_actions, - getSortDescription(query, context), - query_info.syntax_analyzer_result); - } - else - { - query_info.order_optimizer = std::make_shared( - query, - analysis_result.group_by_elements_actions, - getSortDescriptionFromGroupBy(query), - query_info.syntax_analyzer_result); - } + query_info.order_optimizer = std::make_shared( + query, + analysis_result.order_by_elements_actions, + getSortDescription(query, context), + query_info.syntax_analyzer_result); /// If we don't have filtration, we can pushdown limit to reading stage for optimizations. - UInt64 limit = (query.hasFiltration() || query.groupBy()) ? 0 : getLimitForSorting(query, context); + UInt64 limit = query.hasFiltration() ? 0 : getLimitForSorting(query, context); query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit); } + else if (optimize_aggregation_in_order) + { + query_info.order_optimizer = std::make_shared( + query, + analysis_result.group_by_elements_actions, + getSortDescriptionFromGroupBy(query), + query_info.syntax_analyzer_result); - query_info.storage_limits = std::make_shared(storage_limits); + query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, /*limit=*/ 0); + } + query_info.storage_limits = std::make_shared(storage_limits); query_info.settings_limit_offset_done = options.settings_limit_offset_done; storage->read(query_plan, required_columns, storage_snapshot, query_info, context, processing_stage, max_block_size, max_streams); From 48a7402b34a1fac93b08827f4f590b5b3f84c54e Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Sat, 6 Jan 2024 16:39:27 +0000 Subject: [PATCH 8/8] fix build --- src/Storages/StorageFile.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index f3917b878d6b..5c068ae26dfb 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1398,7 +1398,7 @@ void StorageFile::read( throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist", p->at(0)); auto header = storage_snapshot->getSampleBlockForColumns(column_names); - InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context); + InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info); return; } }