diff --git a/src/Storages/System/StorageSystemDetachedParts.cpp b/src/Storages/System/StorageSystemDetachedParts.cpp index 1eb79744022f..ebcd8d63a52a 100644 --- a/src/Storages/System/StorageSystemDetachedParts.cpp +++ b/src/Storages/System/StorageSystemDetachedParts.cpp @@ -4,10 +4,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -320,7 +322,7 @@ class ReadFromSystemDetachedParts : public SourceStepWithFilter std::shared_ptr storage; std::vector columns_mask; - const ActionsDAG::Node * predicate = nullptr; + ActionsDAGPtr filter; const size_t max_block_size; const size_t num_streams; }; @@ -329,7 +331,20 @@ void ReadFromSystemDetachedParts::applyFilters(ActionDAGNodes added_filter_nodes { filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes); if (filter_actions_dag) - predicate = filter_actions_dag->getOutputs().at(0); + { + const auto * predicate = filter_actions_dag->getOutputs().at(0); + + Block block; + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "database")); + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "table")); + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "engine")); + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "active")); + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "uuid")); + + filter = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block); + if (filter) + VirtualColumnUtils::buildSetsForDAG(filter, context); + } } void StorageSystemDetachedParts::read( @@ -358,7 +373,7 @@ void StorageSystemDetachedParts::read( void ReadFromSystemDetachedParts::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - auto state = std::make_shared(StoragesInfoStream(predicate, context)); + auto state = std::make_shared(StoragesInfoStream(nullptr, filter, context)); Pipe pipe; diff --git a/src/Storages/System/StorageSystemDroppedTablesParts.cpp b/src/Storages/System/StorageSystemDroppedTablesParts.cpp index 344c653f7e48..c17d6402d883 100644 --- a/src/Storages/System/StorageSystemDroppedTablesParts.cpp +++ b/src/Storages/System/StorageSystemDroppedTablesParts.cpp @@ -11,7 +11,7 @@ namespace DB { -StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAG::Node * predicate, ContextPtr context) +StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAGPtr & filter, ContextPtr context) : StoragesInfoStreamBase(context) { /// Will apply WHERE to subset of columns and then add more columns. @@ -74,7 +74,8 @@ StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAG::Node * pr if (block_to_filter.rows()) { /// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'. - VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context); + if (filter) + VirtualColumnUtils::filterBlockWithDAG(filter, block_to_filter, context); rows = block_to_filter.rows(); } diff --git a/src/Storages/System/StorageSystemDroppedTablesParts.h b/src/Storages/System/StorageSystemDroppedTablesParts.h index a44abea7285b..dff9e41cce3b 100644 --- a/src/Storages/System/StorageSystemDroppedTablesParts.h +++ b/src/Storages/System/StorageSystemDroppedTablesParts.h @@ -9,7 +9,7 @@ namespace DB class StoragesDroppedInfoStream : public StoragesInfoStreamBase { public: - StoragesDroppedInfoStream(const ActionsDAG::Node * predicate, ContextPtr context); + StoragesDroppedInfoStream(const ActionsDAGPtr & filter, ContextPtr context); protected: bool tryLockTable(StoragesInfo &) override { @@ -30,9 +30,9 @@ class StorageSystemDroppedTablesParts final : public StorageSystemParts std::string getName() const override { return "SystemDroppedTablesParts"; } protected: - std::unique_ptr getStoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context) override + std::unique_ptr getStoragesInfoStream(const ActionsDAGPtr &, const ActionsDAGPtr & filter, ContextPtr context) override { - return std::make_unique(predicate, context); + return std::make_unique(filter, context); } }; diff --git a/src/Storages/System/StorageSystemPartsBase.cpp b/src/Storages/System/StorageSystemPartsBase.cpp index 3ee309eba7e6..4855dcfd6ed6 100644 --- a/src/Storages/System/StorageSystemPartsBase.cpp +++ b/src/Storages/System/StorageSystemPartsBase.cpp @@ -83,7 +83,7 @@ StoragesInfo::getProjectionParts(MergeTreeData::DataPartStateVector & state, boo return data->getProjectionPartsVectorForInternalUsage({State::Active}, &state); } -StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context) +StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context) : StoragesInfoStreamBase(context) { /// Will apply WHERE to subset of columns and then add more columns. @@ -115,7 +115,8 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte std::move(database_column_mut), std::make_shared(), "database")); /// Filter block_to_filter with column 'database'. - VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context); + if (filter_by_database) + VirtualColumnUtils::filterBlockWithDAG(filter_by_database, block_to_filter, context); rows = block_to_filter.rows(); /// Block contains new columns, update database_column. @@ -194,7 +195,8 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte if (rows) { /// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'. - VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context); + if (filter_by_other_columns) + VirtualColumnUtils::filterBlockWithDAG(filter_by_other_columns, block_to_filter, context); rows = block_to_filter.rows(); } @@ -226,7 +228,8 @@ class ReadFromSystemPartsBase : public SourceStepWithFilter std::shared_ptr storage; std::vector columns_mask; const bool has_state_column; - const ActionsDAG::Node * predicate = nullptr; + ActionsDAGPtr filter_by_database; + ActionsDAGPtr filter_by_other_columns; }; ReadFromSystemPartsBase::ReadFromSystemPartsBase( @@ -254,7 +257,25 @@ void ReadFromSystemPartsBase::applyFilters(ActionDAGNodes added_filter_nodes) { filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes); if (filter_actions_dag) - predicate = filter_actions_dag->getOutputs().at(0); + { + const auto * predicate = filter_actions_dag->getOutputs().at(0); + + Block block; + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "database")); + + filter_by_database = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block); + if (filter_by_database) + VirtualColumnUtils::buildSetsForDAG(filter_by_database, context); + + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "table")); + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "engine")); + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "active")); + block.insert(ColumnWithTypeAndName({}, std::make_shared(), "uuid")); + + filter_by_other_columns = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block); + if (filter_by_other_columns) + VirtualColumnUtils::buildSetsForDAG(filter_by_other_columns, context); + } } void StorageSystemPartsBase::read( @@ -288,7 +309,7 @@ void StorageSystemPartsBase::read( void ReadFromSystemPartsBase::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - auto stream = storage->getStoragesInfoStream(predicate, context); + auto stream = storage->getStoragesInfoStream(filter_by_database, filter_by_other_columns, context); auto header = getOutputStream().header; MutableColumns res_columns = header.cloneEmptyColumns(); diff --git a/src/Storages/System/StorageSystemPartsBase.h b/src/Storages/System/StorageSystemPartsBase.h index 10d1a3a2e0e7..be945162b394 100644 --- a/src/Storages/System/StorageSystemPartsBase.h +++ b/src/Storages/System/StorageSystemPartsBase.h @@ -115,7 +115,7 @@ class StoragesInfoStreamBase class StoragesInfoStream : public StoragesInfoStreamBase { public: - StoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context); + StoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context); }; /** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family. @@ -145,9 +145,9 @@ class StorageSystemPartsBase : public IStorage StorageSystemPartsBase(const StorageID & table_id_, ColumnsDescription && columns); - virtual std::unique_ptr getStoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context) + virtual std::unique_ptr getStoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context) { - return std::make_unique(predicate, context); + return std::make_unique(filter_by_database, filter_by_other_columns, context); } virtual void diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp index c3ac27903c99..e3cbff5f01b4 100644 --- a/src/Storages/VirtualColumnUtils.cpp +++ b/src/Storages/VirtualColumnUtils.cpp @@ -53,9 +53,9 @@ namespace DB namespace VirtualColumnUtils { -static void makeSets(const ExpressionActionsPtr & actions, const ContextPtr & context) +void buildSetsForDAG(const ActionsDAGPtr & dag, const ContextPtr & context) { - for (const auto & node : actions->getNodes()) + for (const auto & node : dag->getNodes()) { if (node.type == ActionsDAG::ActionType::COLUMN) { @@ -78,8 +78,8 @@ static void makeSets(const ExpressionActionsPtr & actions, const ContextPtr & co void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context) { + buildSetsForDAG(dag, context); auto actions = std::make_shared(dag); - makeSets(actions, context); Block block_with_filter = block; actions->execute(block_with_filter, /*dry_run=*/ false, /*allow_duplicates_in_input=*/ true); diff --git a/src/Storages/VirtualColumnUtils.h b/src/Storages/VirtualColumnUtils.h index 83494872cacc..62f2e4855b5c 100644 --- a/src/Storages/VirtualColumnUtils.h +++ b/src/Storages/VirtualColumnUtils.h @@ -25,6 +25,9 @@ void filterBlockWithPredicate(const ActionsDAG::Node * predicate, Block & block, /// Just filters block. Block should contain all the required columns. void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context); +/// Builds sets used by ActionsDAG inplace. +void buildSetsForDAG(const ActionsDAGPtr & dag, const ContextPtr & context); + /// Recursively checks if all functions used in DAG are deterministic in scope of query. bool isDeterministicInScopeOfQuery(const ActionsDAG::Node * node); diff --git a/tests/queries/0_stateless/02841_not_ready_set_bug.sh b/tests/queries/0_stateless/02841_not_ready_set_bug.sh index 3aaffe515789..9b2f3b0698e1 100755 --- a/tests/queries/0_stateless/02841_not_ready_set_bug.sh +++ b/tests/queries/0_stateless/02841_not_ready_set_bug.sh @@ -10,3 +10,4 @@ $CLICKHOUSE_CLIENT -q "insert into t1 select number from numbers(10);" $CLICKHOUSE_CLIENT --max_threads=2 --max_result_rows=1 --result_overflow_mode=break -q "with tab as (select min(number) from t1 prewhere number in (select number from view(select number, row_number() OVER (partition by number % 2 ORDER BY number DESC) from numbers_mt(1e4)) where number != 2 order by number)) select number from t1 union all select * from tab;" > /dev/null $CLICKHOUSE_CLIENT -q "SELECT * FROM system.tables WHERE 1 in (SELECT number from numbers(2)) AND database = currentDatabase() format Null" +$CLICKHOUSE_CLIENT -q "SELECT xor(1, 0) FROM system.parts WHERE 1 IN (SELECT 1) FORMAT Null"