Skip to content

Commit

Permalink
Merge pull request #54436 from azat/prewhere-filter-fix
Browse files Browse the repository at this point in the history
Fix possible incorrect result with SimpleAggregateFunction in PREWHERE and FINAL
  • Loading branch information
alexey-milovidov committed Sep 24, 2023
2 parents ff66d29 + 24c0d53 commit 7bef66a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 11 deletions.
63 changes: 52 additions & 11 deletions src/Interpreters/ExpressionAnalyzer.cpp
Expand Up @@ -62,6 +62,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>

#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/GetAggregatesVisitor.h>
Expand Down Expand Up @@ -1209,32 +1210,72 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
}

{
ActionsDAGPtr actions;

auto required_columns = prewhere_actions->getRequiredColumns();
NameSet prewhere_input_names;
for (const auto & col : required_columns)
prewhere_input_names.insert(col.name);

NameSet unused_source_columns;

/// Add empty action with input = {prewhere actions output} + {unused source columns}
/// Reasons:
/// 1. Remove source columns which are used only in prewhere actions during prewhere actions execution.
/// Example: select A prewhere B > 0. B can be removed at prewhere step.
/// 2. Store side columns which were calculated during prewhere actions execution if they are used.
/// Example: select F(A) prewhere F(A) > 0. F(A) can be saved from prewhere step.
///
/// NOTE: this cannot be done for queries with FINAL and PREWHERE over SimpleAggregateFunction,
/// since the column values can change after the merge algorithm is applied.
///
/// 3. Check if we can remove filter column at prewhere step. If we can, action will store single REMOVE_COLUMN.
ColumnsWithTypeAndName columns = prewhere_actions->getResultColumns();
auto required_columns = prewhere_actions->getRequiredColumns();
NameSet prewhere_input_names;
NameSet unused_source_columns;
bool columns_from_prewhere_can_be_reused = true;
if (storage() && getSelectQuery()->final())
{
for (const auto & column : metadata_snapshot->getColumns().getOrdinary())
{
if (!prewhere_input_names.contains(column.name))
continue;
if (dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(column.type->getCustomName()))
{
columns_from_prewhere_can_be_reused = false;
break;
}
}
}

for (const auto & col : required_columns)
prewhere_input_names.insert(col.name);
if (!columns_from_prewhere_can_be_reused)
{
for (const auto & column : sourceColumns())
{
if (!prewhere_input_names.contains(column.name))
{
required_columns.emplace_back(NameAndTypePair{column.name, column.type});
unused_source_columns.emplace(column.name);
}
}

for (const auto & column : sourceColumns())
actions = std::make_shared<ActionsDAG>(std::move(required_columns));
}
else
{
if (!prewhere_input_names.contains(column.name))
ColumnsWithTypeAndName columns = prewhere_actions->getResultColumns();

for (const auto & column : sourceColumns())
{
columns.emplace_back(column.type, column.name);
unused_source_columns.emplace(column.name);
if (!prewhere_input_names.contains(column.name))
{
columns.emplace_back(column.type, column.name);
unused_source_columns.emplace(column.name);
}
}

actions = std::make_shared<ActionsDAG>(std::move(columns));
}

chain.steps.emplace_back(
std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(std::make_shared<ActionsDAG>(std::move(columns))));
std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(std::move(actions)));
chain.steps.back()->additional_input = std::move(unused_source_columns);
chain.getLastActions();
chain.addStep();
Expand Down
Empty file.
9 changes: 9 additions & 0 deletions tests/queries/0_stateless/02872_prewhere_filter.sql
@@ -0,0 +1,9 @@
-- Regression test: PREWHERE combined with FINAL over SimpleAggregateFunction columns.
drop table if exists data;

-- val1 is aggregated with max and val2 with min; both rows inserted below share key = 1.
create table data (key Int, val1 SimpleAggregateFunction(max, Nullable(Int)), val2 SimpleAggregateFunction(min, Int)) engine=AggregatingMergeTree() order by key;
-- Stop background merges for this table; presumably this keeps the two inserts below
-- as separate parts, so that they are combined only by FINAL at query time.
system stop merges data;

insert into data values (1,10,100);
insert into data values (1,20,10);

-- x1 and x2 are the same comparison with and without assumeNotNull(val1); the PREWHERE
-- expression also evaluates assumeNotNull(val1). The WHERE clause keeps only rows where
-- x1 and x2 disagree. Presumably no rows are expected -- verify against the .reference file.
select key, val1, val2, assumeNotNull(val1) > val2 x1, val1 > val2 x2 from data final prewhere assumeNotNull(val1) > 0 where x1 != x2 settings max_threads=1;

0 comments on commit 7bef66a

Please sign in to comment.