Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PREWHERE for Merge with different default types #46454

Merged
merged 2 commits into from Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 38 additions & 13 deletions src/Interpreters/InterpreterSelectQuery.cpp
Expand Up @@ -586,12 +586,17 @@ InterpreterSelectQuery::InterpreterSelectQuery(
current_info.query = query_ptr;
current_info.syntax_analyzer_result = syntax_analyzer_result;

Names queried_columns = syntax_analyzer_result->requiredSourceColumns();
const auto & supported_prewhere_columns = storage->supportedPrewhereColumns();
if (supported_prewhere_columns.has_value())
std::erase_if(queried_columns, [&](const auto & name) { return !supported_prewhere_columns->contains(name); });

MergeTreeWhereOptimizer{
current_info,
context,
std::move(column_compressed_sizes),
metadata_snapshot,
syntax_analyzer_result->requiredSourceColumns(),
queried_columns,
log};
}
}
Expand Down Expand Up @@ -1994,31 +1999,39 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
}
}

/// There are multiple sources of required columns:
/// - raw required columns,
/// - columns deduced from ALIAS columns,
/// - raw required columns from PREWHERE,
/// - columns deduced from ALIAS columns from PREWHERE.
/// PREWHERE is a special case, since we need to resolve it and pass directly to `IStorage::read()`
/// before any other executions.
if (alias_columns_required)
/// Set of all (including ALIAS) required columns for PREWHERE
auto get_prewhere_columns = [&]()
{
NameSet required_columns_from_prewhere; /// Set of all (including ALIAS) required columns for PREWHERE
NameSet required_aliases_from_prewhere; /// Set of ALIAS required columns for PREWHERE
NameSet columns;

if (prewhere_info)
{
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());

if (prewhere_info->row_level_filter)
{
auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(row_level_required_columns.begin(), row_level_required_columns.end());
columns.insert(row_level_required_columns.begin(), row_level_required_columns.end());
}
}

return columns;
};

/// There are multiple sources of required columns:
/// - raw required columns,
/// - columns deduced from ALIAS columns,
/// - raw required columns from PREWHERE,
/// - columns deduced from ALIAS columns from PREWHERE.
/// PREWHERE is a special case, since we need to resolve it and pass directly to `IStorage::read()`
/// before any other executions.
if (alias_columns_required)
{
NameSet required_columns_from_prewhere = get_prewhere_columns();
NameSet required_aliases_from_prewhere; /// Set of ALIAS required columns for PREWHERE

/// Expression, that contains all raw required columns
ASTPtr required_columns_all_expr = std::make_shared<ASTExpressionList>();

Expand Down Expand Up @@ -2114,6 +2127,18 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
required_columns.push_back(column);
}
}

const auto & supported_prewhere_columns = storage->supportedPrewhereColumns();
if (supported_prewhere_columns.has_value())
{
NameSet required_columns_from_prewhere = get_prewhere_columns();

for (const auto & column_name : required_columns_from_prewhere)
{
if (!supported_prewhere_columns->contains(column_name))
throw Exception(ErrorCodes::ILLEGAL_PREWHERE, "Storage {} doesn't support PREWHERE for {}", storage->getName(), column_name);
}
}
}

void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan)
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/IStorage.h
Expand Up @@ -135,6 +135,10 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// Returns true if the storage supports queries with the PREWHERE section.
virtual bool supportsPrewhere() const { return false; }

/// Returns which columns supports PREWHERE, or empty std::nullopt if all columns is supported.
/// This is needed for engines whose aggregates data from multiple tables, like Merge.
virtual std::optional<NameSet> supportedPrewhereColumns() const { return std::nullopt; }

/// Returns true if the storage supports optimization of moving conditions to PREWHERE section.
virtual bool canMoveConditionsToPrewhere() const { return supportsPrewhere(); }

Expand Down
65 changes: 35 additions & 30 deletions src/Storages/StorageMerge.cpp
Expand Up @@ -24,6 +24,7 @@
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include <Common/checkStackSize.h>
#include "DataTypes/IDataType.h"
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
Expand Down Expand Up @@ -145,48 +146,58 @@ bool StorageMerge::tableSupportsPrewhere() const
/// If new table that matches regexp for current storage and doesn't support PREWHERE
/// will appear after this check and before calling "read" method, the optimized query may fail.
/// Since it's quite rare case, we just ignore this possibility.
const auto & table_doesnt_support_prewhere = getFirstTable([](const auto & table) { return !table->canMoveConditionsToPrewhere(); });
bool supports_prewhere = (table_doesnt_support_prewhere == nullptr);
///
/// NOTE: Type can be different, and in this case, PREWHERE cannot be
/// applied for those columns, but there a separate method to return
/// supported columns for PREWHERE - supportedPrewhereColumns().
return getFirstTable([](const auto & table) { return !table->canMoveConditionsToPrewhere(); }) == nullptr;
}

if (!supports_prewhere)
return false;
bool StorageMerge::canMoveConditionsToPrewhere() const
{
return tableSupportsPrewhere();
}

if (!getInMemoryMetadataPtr())
return false;
std::optional<NameSet> StorageMerge::supportedPrewhereColumns() const
{
bool supports_prewhere = true;

std::unordered_map<std::string, const IDataType *> column_types;
for (const auto & name_type : getInMemoryMetadataPtr()->getColumns().getAll())
const auto & metadata = getInMemoryMetadata();
const auto & columns = metadata.getColumns();

NameSet supported_columns;

std::unordered_map<std::string, std::pair<const IDataType *, std::optional<ColumnDefault>>> column_type_default;
for (const auto & name_type : columns.getAll())
{
column_types.emplace(name_type.name, name_type.type.get());
column_type_default.emplace(name_type.name, std::make_pair(
name_type.type.get(), columns.getDefault(name_type.name)));
supported_columns.emplace(name_type.name);
}

/// Check that all tables have the same column types, otherwise prewhere will fail
forEachTable([&](const StoragePtr & table)
{
const auto & metadata_ptr = table->getInMemoryMetadataPtr();
if (!metadata_ptr)
const auto & table_metadata_ptr = table->getInMemoryMetadataPtr();
if (!table_metadata_ptr)
supports_prewhere = false;

if (!supports_prewhere)
return;

for (const auto & column : metadata_ptr->getColumns().getAll())
const auto & table_columns = table_metadata_ptr->getColumns();
for (const auto & column : table_columns.getAll())
{
const auto * src_type = column_types[column.name];
if (src_type && !src_type->equals(*column.type))
const auto & root_type_default = column_type_default[column.name];
const IDataType * root_type = root_type_default.first;
const std::optional<ColumnDefault> & src_default = root_type_default.second;
if ((root_type && !root_type->equals(*column.type)) ||
src_default != table_columns.getDefault(column.name))
{
supports_prewhere = false;
return;
supported_columns.erase(column.name);
}
}
});

return supports_prewhere;
}

bool StorageMerge::canMoveConditionsToPrewhere() const
{
return tableSupportsPrewhere();
return supported_columns;
}

bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & /*metadata_snapshot*/) const
Expand Down Expand Up @@ -300,12 +311,6 @@ void StorageMerge::read(
auto modified_context = Context::createCopy(local_context);
modified_context->setSetting("optimize_move_to_prewhere", false);

if (query_info.prewhere_info && !tableSupportsPrewhere())
throw DB::Exception(
DB::ErrorCodes::ILLEGAL_PREWHERE,
"Cannot use PREWHERE with table {}, probably some columns don't have same type or an underlying table doesn't support PREWHERE",
getStorageID().getTableName());

bool has_database_virtual_column = false;
bool has_table_virtual_column = false;
Names real_column_names;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/StorageMerge.h
Expand Up @@ -47,6 +47,7 @@ class StorageMerge final : public IStorage, WithContext
bool supportsIndexForIn() const override { return true; }
bool supportsSubcolumns() const override { return true; }
bool supportsPrewhere() const override { return true; }
std::optional<NameSet> supportedPrewhereColumns() const override;

bool canMoveConditionsToPrewhere() const override;

Expand Down
8 changes: 4 additions & 4 deletions tests/queries/0_stateless/00717_merge_and_distributed.sql
Expand Up @@ -20,9 +20,9 @@ SELECT * FROM merge(currentDatabase(), 'test_local_1');
SELECT *, _table FROM merge(currentDatabase(), 'test_local_1') ORDER BY _table;
SELECT sum(value), _table FROM merge(currentDatabase(), 'test_local_1') GROUP BY _table ORDER BY _table;
SELECT * FROM merge(currentDatabase(), 'test_local_1') WHERE _table = 'test_local_1';
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table = 'test_local_1'; -- { serverError NOT_FOUND_COLUMN_IN_BLOCK }
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table = 'test_local_1'; -- { serverError ILLEGAL_PREWHERE }
SELECT * FROM merge(currentDatabase(), 'test_local_1') WHERE _table in ('test_local_1', 'test_local_2');
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); -- { serverError NOT_FOUND_COLUMN_IN_BLOCK }
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); -- { serverError ILLEGAL_PREWHERE }

SELECT '--------------Single Distributed------------';
SELECT * FROM merge(currentDatabase(), 'test_distributed_1');
Expand All @@ -38,9 +38,9 @@ SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') ORDER BY _ta
SELECT *, _table FROM merge(currentDatabase(), 'test_local_1|test_local_2') ORDER BY _table;
SELECT sum(value), _table FROM merge(currentDatabase(), 'test_local_1|test_local_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') WHERE _table = 'test_local_1';
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; -- { serverError NOT_FOUND_COLUMN_IN_BLOCK }
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; -- { serverError ILLEGAL_PREWHERE }
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') WHERE _table in ('test_local_1', 'test_local_2') ORDER BY value;
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; -- { serverError NOT_FOUND_COLUMN_IN_BLOCK }
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; -- { serverError ILLEGAL_PREWHERE }

SELECT '--------------Local Merge Distributed------------';
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_distributed_2') ORDER BY _table;
Expand Down
Expand Up @@ -9,7 +9,10 @@ ENGINE = MergeTree()
ORDER BY f1;

-- In version 20.12 this query sometimes produces an exception "Cannot find column"
SELECT f2 FROM merge(currentDatabase(), '^abc$') PREWHERE _table = 'abc' AND f1 = 'a' AND rand() % 100 < 20;
SELECT f2 FROM merge(currentDatabase(), '^abc$') PREWHERE _table = 'abc' AND f1 = 'a';
SELECT f2 FROM merge(currentDatabase(), '^abc$') PREWHERE _table = 'abc' AND f1 = 'a' AND rand() % 100 < 20; -- { serverError ILLEGAL_PREWHERE }
azat marked this conversation as resolved.
Show resolved Hide resolved
SELECT f2 FROM merge(currentDatabase(), '^abc$') PREWHERE _table = 'abc' AND f1 = 'a'; -- { serverError ILLEGAL_PREWHERE }

SELECT f2 FROM merge(currentDatabase(), '^abc$') PREWHERE f1 = 'a' AND rand() % 100 < 20 WHERE _table = 'abc';
SELECT f2 FROM merge(currentDatabase(), '^abc$') PREWHERE f1 = 'a' WHERE _table = 'abc';

DROP TABLE abc;
3 changes: 2 additions & 1 deletion tests/queries/0_stateless/01931_storage_merge_no_columns.sql
@@ -1,4 +1,5 @@
drop table if exists data;
create table data (key Int) engine=MergeTree() order by key;
select 1 from merge(currentDatabase(), '^data$') prewhere _table in (NULL);
select 1 from merge(currentDatabase(), '^data$') prewhere _table in (NULL); -- { serverError ILLEGAL_PREWHERE }
select 1 from merge(currentDatabase(), '^data$') where _table in (NULL);
drop table data;
12 changes: 12 additions & 0 deletions tests/queries/0_stateless/02570_merge_alias_prewhere.reference
@@ -0,0 +1,12 @@
-- { echoOn }
-- for pure PREWHERE it is not addressed yet.
SELECT * FROM m PREWHERE a = 'OK';
OK 0
SELECT * FROM m PREWHERE f = 0; -- { serverError ILLEGAL_PREWHERE }
SELECT * FROM m WHERE f = 0 SETTINGS optimize_move_to_prewhere=0;
OK 0
SELECT * FROM m WHERE f = 0 SETTINGS optimize_move_to_prewhere=1;
OK 0
-- { echoOn }
SELECT * FROM m WHERE f = 0 SETTINGS optimize_move_to_prewhere=1;
OK 0
42 changes: 42 additions & 0 deletions tests/queries/0_stateless/02570_merge_alias_prewhere.sql
@@ -0,0 +1,42 @@
DROP TABLE IF EXISTS m;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

CREATE TABLE m
(
`a` String,
`f` UInt8
)
ENGINE = Merge(currentDatabase(), '^(t1|t2)$');

CREATE TABLE t1
(
a String,
f UInt8 ALIAS 0
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS index_granularity = 8192;
INSERT INTO t1 VALUES ('OK');

-- { echoOn }
-- for pure PREWHERE it is not addressed yet.
SELECT * FROM m PREWHERE a = 'OK';
SELECT * FROM m PREWHERE f = 0; -- { serverError ILLEGAL_PREWHERE }
SELECT * FROM m WHERE f = 0 SETTINGS optimize_move_to_prewhere=0;
SELECT * FROM m WHERE f = 0 SETTINGS optimize_move_to_prewhere=1;
-- { echoOff }

CREATE TABLE t2
(
a String,
f UInt8,
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS index_granularity = 8192;
INSERT INTO t2 VALUES ('OK', 1);

-- { echoOn }
SELECT * FROM m WHERE f = 0 SETTINGS optimize_move_to_prewhere=1;
-- { echoOff }