Skip to content

Commit

Permalink
Merge pull request #61666 from ClickHouse/fix-not-ready-set-system-parts
Browse files Browse the repository at this point in the history
Fix Non-ready set for system.parts.
  • Loading branch information
KochetovNicolai committed Mar 22, 2024
2 parents 3bae0fc + e828acd commit 3ed3095
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 20 deletions.
21 changes: 18 additions & 3 deletions src/Storages/System/StorageSystemDetachedParts.cpp
Expand Up @@ -4,10 +4,12 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/System/StorageSystemPartsBase.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Storages/VirtualColumnUtils.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <IO/SharedThreadPools.h>
Expand Down Expand Up @@ -320,7 +322,7 @@ class ReadFromSystemDetachedParts : public SourceStepWithFilter
std::shared_ptr<StorageSystemDetachedParts> storage;
std::vector<UInt8> columns_mask;

const ActionsDAG::Node * predicate = nullptr;
ActionsDAGPtr filter;
const size_t max_block_size;
const size_t num_streams;
};
Expand All @@ -329,7 +331,20 @@ void ReadFromSystemDetachedParts::applyFilters(ActionDAGNodes added_filter_nodes
{
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
{
const auto * predicate = filter_actions_dag->getOutputs().at(0);

Block block;
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "database"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "table"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "engine"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUInt8>(), "active"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUUID>(), "uuid"));

filter = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block);
if (filter)
VirtualColumnUtils::buildSetsForDAG(filter, context);
}
}

void StorageSystemDetachedParts::read(
Expand Down Expand Up @@ -358,7 +373,7 @@ void StorageSystemDetachedParts::read(

void ReadFromSystemDetachedParts::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto state = std::make_shared<SourceState>(StoragesInfoStream(predicate, context));
auto state = std::make_shared<SourceState>(StoragesInfoStream(nullptr, filter, context));

Pipe pipe;

Expand Down
5 changes: 3 additions & 2 deletions src/Storages/System/StorageSystemDroppedTablesParts.cpp
Expand Up @@ -11,7 +11,7 @@ namespace DB
{


StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAGPtr & filter, ContextPtr context)
: StoragesInfoStreamBase(context)
{
/// Will apply WHERE to subset of columns and then add more columns.
Expand Down Expand Up @@ -74,7 +74,8 @@ StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAG::Node * pr
if (block_to_filter.rows())
{
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active', 'uuid'.
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
if (filter)
VirtualColumnUtils::filterBlockWithDAG(filter, block_to_filter, context);
rows = block_to_filter.rows();
}

Expand Down
6 changes: 3 additions & 3 deletions src/Storages/System/StorageSystemDroppedTablesParts.h
Expand Up @@ -9,7 +9,7 @@ namespace DB
class StoragesDroppedInfoStream : public StoragesInfoStreamBase
{
public:
StoragesDroppedInfoStream(const ActionsDAG::Node * predicate, ContextPtr context);
StoragesDroppedInfoStream(const ActionsDAGPtr & filter, ContextPtr context);
protected:
bool tryLockTable(StoragesInfo &) override
{
Expand All @@ -30,9 +30,9 @@ class StorageSystemDroppedTablesParts final : public StorageSystemParts

std::string getName() const override { return "SystemDroppedTablesParts"; }
protected:
std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context) override
std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAGPtr &, const ActionsDAGPtr & filter, ContextPtr context) override
{
return std::make_unique<StoragesDroppedInfoStream>(predicate, context);
return std::make_unique<StoragesDroppedInfoStream>(filter, context);
}
};

Expand Down
33 changes: 27 additions & 6 deletions src/Storages/System/StorageSystemPartsBase.cpp
Expand Up @@ -83,7 +83,7 @@ StoragesInfo::getProjectionParts(MergeTreeData::DataPartStateVector & state, boo
return data->getProjectionPartsVectorForInternalUsage({State::Active}, &state);
}

StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context)
: StoragesInfoStreamBase(context)
{
/// Will apply WHERE to subset of columns and then add more columns.
Expand Down Expand Up @@ -115,7 +115,8 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte
std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));

/// Filter block_to_filter with column 'database'.
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
if (filter_by_database)
VirtualColumnUtils::filterBlockWithDAG(filter_by_database, block_to_filter, context);
rows = block_to_filter.rows();

/// Block contains new columns, update database_column.
Expand Down Expand Up @@ -194,7 +195,8 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte
if (rows)
{
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active', 'uuid'.
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
if (filter_by_other_columns)
VirtualColumnUtils::filterBlockWithDAG(filter_by_other_columns, block_to_filter, context);
rows = block_to_filter.rows();
}

Expand Down Expand Up @@ -226,7 +228,8 @@ class ReadFromSystemPartsBase : public SourceStepWithFilter
std::shared_ptr<StorageSystemPartsBase> storage;
std::vector<UInt8> columns_mask;
const bool has_state_column;
const ActionsDAG::Node * predicate = nullptr;
ActionsDAGPtr filter_by_database;
ActionsDAGPtr filter_by_other_columns;
};

ReadFromSystemPartsBase::ReadFromSystemPartsBase(
Expand Down Expand Up @@ -254,7 +257,25 @@ void ReadFromSystemPartsBase::applyFilters(ActionDAGNodes added_filter_nodes)
{
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
{
const auto * predicate = filter_actions_dag->getOutputs().at(0);

Block block;
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "database"));

filter_by_database = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block);
if (filter_by_database)
VirtualColumnUtils::buildSetsForDAG(filter_by_database, context);

block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "table"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "engine"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUInt8>(), "active"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUUID>(), "uuid"));

filter_by_other_columns = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block);
if (filter_by_other_columns)
VirtualColumnUtils::buildSetsForDAG(filter_by_other_columns, context);
}
}

void StorageSystemPartsBase::read(
Expand Down Expand Up @@ -288,7 +309,7 @@ void StorageSystemPartsBase::read(

void ReadFromSystemPartsBase::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto stream = storage->getStoragesInfoStream(predicate, context);
auto stream = storage->getStoragesInfoStream(filter_by_database, filter_by_other_columns, context);
auto header = getOutputStream().header;

MutableColumns res_columns = header.cloneEmptyColumns();
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/System/StorageSystemPartsBase.h
Expand Up @@ -115,7 +115,7 @@ class StoragesInfoStreamBase
class StoragesInfoStream : public StoragesInfoStreamBase
{
public:
StoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context);
StoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context);
};

/** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family.
Expand Down Expand Up @@ -145,9 +145,9 @@ class StorageSystemPartsBase : public IStorage

StorageSystemPartsBase(const StorageID & table_id_, ColumnsDescription && columns);

virtual std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
virtual std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context)
{
return std::make_unique<StoragesInfoStream>(predicate, context);
return std::make_unique<StoragesInfoStream>(filter_by_database, filter_by_other_columns, context);
}

virtual void
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/VirtualColumnUtils.cpp
Expand Up @@ -53,9 +53,9 @@ namespace DB
namespace VirtualColumnUtils
{

static void makeSets(const ExpressionActionsPtr & actions, const ContextPtr & context)
void buildSetsForDAG(const ActionsDAGPtr & dag, const ContextPtr & context)
{
for (const auto & node : actions->getNodes())
for (const auto & node : dag->getNodes())
{
if (node.type == ActionsDAG::ActionType::COLUMN)
{
Expand All @@ -78,8 +78,8 @@ static void makeSets(const ExpressionActionsPtr & actions, const ContextPtr & co

void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context)
{
buildSetsForDAG(dag, context);
auto actions = std::make_shared<ExpressionActions>(dag);
makeSets(actions, context);
Block block_with_filter = block;
actions->execute(block_with_filter, /*dry_run=*/ false, /*allow_duplicates_in_input=*/ true);

Expand Down
3 changes: 3 additions & 0 deletions src/Storages/VirtualColumnUtils.h
Expand Up @@ -25,6 +25,9 @@ void filterBlockWithPredicate(const ActionsDAG::Node * predicate, Block & block,
/// Just filters block. Block should contain all the required columns.
void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context);

/// Builds sets used by ActionsDAG in place.
void buildSetsForDAG(const ActionsDAGPtr & dag, const ContextPtr & context);

/// Recursively checks if all functions used in DAG are deterministic in scope of query.
bool isDeterministicInScopeOfQuery(const ActionsDAG::Node * node);

Expand Down
1 change: 1 addition & 0 deletions tests/queries/0_stateless/02841_not_ready_set_bug.sh
Expand Up @@ -10,3 +10,4 @@ $CLICKHOUSE_CLIENT -q "insert into t1 select number from numbers(10);"
$CLICKHOUSE_CLIENT --max_threads=2 --max_result_rows=1 --result_overflow_mode=break -q "with tab as (select min(number) from t1 prewhere number in (select number from view(select number, row_number() OVER (partition by number % 2 ORDER BY number DESC) from numbers_mt(1e4)) where number != 2 order by number)) select number from t1 union all select * from tab;" > /dev/null

$CLICKHOUSE_CLIENT -q "SELECT * FROM system.tables WHERE 1 in (SELECT number from numbers(2)) AND database = currentDatabase() format Null"
$CLICKHOUSE_CLIENT -q "SELECT xor(1, 0) FROM system.parts WHERE 1 IN (SELECT 1) FORMAT Null"

0 comments on commit 3ed3095

Please sign in to comment.