Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible incorrect result with SimpleAggregateFunction in PREWHERE and FINAL #54436

Merged
merged 1 commit into from Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 52 additions & 11 deletions src/Interpreters/ExpressionAnalyzer.cpp
Expand Up @@ -62,6 +62,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>

#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/GetAggregatesVisitor.h>
Expand Down Expand Up @@ -1209,32 +1210,72 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
}

{
ActionsDAGPtr actions;

auto required_columns = prewhere_actions->getRequiredColumns();
NameSet prewhere_input_names;
for (const auto & col : required_columns)
prewhere_input_names.insert(col.name);

NameSet unused_source_columns;

/// Add empty action with input = {prewhere actions output} + {unused source columns}
/// Reasons:
/// 1. Remove source columns which are used only in prewhere actions during prewhere actions execution.
/// Example: select A prewhere B > 0. B can be removed at prewhere step.
/// 2. Store side columns which were calculated during prewhere actions execution if they are used.
/// Example: select F(A) prewhere F(A) > 0. F(A) can be saved from prewhere step.
///
/// NOTE: this cannot be done for queries with FINAL and PREWHERE over SimpleAggregateFunction,
/// since the value of such a column can change after the merge algorithm is applied.
///
/// 3. Check if we can remove filter column at prewhere step. If we can, action will store single REMOVE_COLUMN.
ColumnsWithTypeAndName columns = prewhere_actions->getResultColumns();
auto required_columns = prewhere_actions->getRequiredColumns();
NameSet prewhere_input_names;
NameSet unused_source_columns;
bool columns_from_prewhere_can_be_reused = true;
if (storage() && getSelectQuery()->final())
{
for (const auto & column : metadata_snapshot->getColumns().getOrdinary())
{
if (!prewhere_input_names.contains(column.name))
continue;
if (dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(column.type->getCustomName()))
{
columns_from_prewhere_can_be_reused = false;
break;
}
}
}

for (const auto & col : required_columns)
prewhere_input_names.insert(col.name);
if (!columns_from_prewhere_can_be_reused)
{
for (const auto & column : sourceColumns())
{
if (!prewhere_input_names.contains(column.name))
{
required_columns.emplace_back(NameAndTypePair{column.name, column.type});
unused_source_columns.emplace(column.name);
}
}

for (const auto & column : sourceColumns())
actions = std::make_shared<ActionsDAG>(std::move(required_columns));
}
else
{
if (!prewhere_input_names.contains(column.name))
ColumnsWithTypeAndName columns = prewhere_actions->getResultColumns();

for (const auto & column : sourceColumns())
{
columns.emplace_back(column.type, column.name);
unused_source_columns.emplace(column.name);
if (!prewhere_input_names.contains(column.name))
{
columns.emplace_back(column.type, column.name);
unused_source_columns.emplace(column.name);
}
}

actions = std::make_shared<ActionsDAG>(std::move(columns));
}

chain.steps.emplace_back(
std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(std::make_shared<ActionsDAG>(std::move(columns))));
std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(std::move(actions)));
chain.steps.back()->additional_input = std::move(unused_source_columns);
chain.getLastActions();
chain.addStep();
Expand Down
Empty file.
9 changes: 9 additions & 0 deletions tests/queries/0_stateless/02872_prewhere_filter.sql
@@ -0,0 +1,9 @@
-- Regression test: SELECT ... FINAL with PREWHERE over SimpleAggregateFunction columns.
drop table if exists data;

-- val1 is declared with the max function and val2 with the min function.
create table data (key Int, val1 SimpleAggregateFunction(max, Nullable(Int)), val2 SimpleAggregateFunction(min, Int)) engine=AggregatingMergeTree() order by key;
-- NOTE(review): presumably merges are stopped so that the two inserts below are not
-- combined in the background and only FINAL combines them at read time -- confirm.
system stop merges data;

-- Two rows sharing key = 1, written by separate inserts.
insert into data values (1,10,100);
insert into data values (1,20,10);

-- x1 and x2 are the same comparison, except that x1 uses assumeNotNull(val1) (the
-- expression that also appears in PREWHERE) while x2 uses val1 directly.
-- WHERE keeps only rows where the two disagree.
-- NOTE(review): presumably the expected output is empty -- confirm against the .reference file.
select key, val1, val2, assumeNotNull(val1) > val2 x1, val1 > val2 x2 from data final prewhere assumeNotNull(val1) > 0 where x1 != x2 settings max_threads=1;