Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DISTINCT for Distributed and optimize_skip_unused_shards #9808

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
}

if (storage && !options.only_analyze)
from_stage = storage->getQueryProcessingStage(*context);
from_stage = storage->getQueryProcessingStage(*context, query_ptr);

/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,12 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/** Returns stage to which query is going to be processed in read() function.
* (Normally, the function only reads the columns from the list, but in other cases,
* for example, the request can be partially processed on a remote server.)
*
* The query AST is required since the stage can depend on the query
* (see the Distributed() engine and optimize_skip_unused_shards).
*/
virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const { return QueryProcessingStage::FetchColumns; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const { return getQueryProcessingStage(context, {}); }
virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const { return QueryProcessingStage::FetchColumns; }

/** Watch live changes to the table.
* Accepts a list of columns to read, as well as a description of the query,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/LiveView/StorageBlocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class StorageBlocks : public IStorage
return std::make_shared<StorageBlocks>(table_id, columns, std::move(pipes), to_stage);
}
std::string getName() const override { return "Blocks"; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const override { return to_stage; }

Pipes read(
const Names & /*column_names*/,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/StorageBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class BufferSource : public SourceWithProgress
};


QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
if (destination_id)
{
Expand All @@ -144,7 +144,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

return destination->getQueryProcessingStage(context);
return destination->getQueryProcessingStage(context, query_ptr);
}

return QueryProcessingStage::FetchColumns;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ friend class BufferBlockOutputStream;

std::string getName() const override { return "Buffer"; }

QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;

Pipes read(
const Names & column_names,
Expand Down
101 changes: 51 additions & 50 deletions dbms/src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,9 @@ static QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & co
: QueryProcessingStage::WithMergeableState;
}

QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context) const
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
auto cluster = getCluster();
auto cluster = getOptimizedCluster(context, query_ptr);
return getQueryProcessingStageImpl(context, cluster);
}

Expand All @@ -383,9 +383,7 @@ Pipes StorageDistributed::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
auto cluster = getCluster();

const Settings & settings = context.getSettingsRef();
auto cluster = getOptimizedCluster(context, query_info.query);

const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
Expand All @@ -405,50 +403,8 @@ Pipes StorageDistributed::read(
: ClusterProxy::SelectStreamFactory(
header, processed_stage, StorageID{remote_database, remote_table}, scalars, has_virtual_shard_num_column, context.getExternalTables());

UInt64 force = settings.force_optimize_skip_unused_shards;
if (settings.optimize_skip_unused_shards)
{
ClusterPtr smaller_cluster;
auto table_id = getStorageID();

if (has_sharding_key)
{
smaller_cluster = skipUnusedShards(cluster, query_info, context);

if (smaller_cluster)
{
cluster = smaller_cluster;
LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() << ": "
"Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
" " << makeFormattedListOfShards(cluster));
}
}

if (!smaller_cluster)
{
LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
(has_sharding_key ? "" : " (no sharding key)") << ": "
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
"the query will be sent to all shards of the cluster");

if (force)
{
std::stringstream exception_message;
if (!has_sharding_key)
exception_message << "No sharding key";
else
exception_message << "Sharding key " << sharding_key_column_name << " is not used";

if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS)
throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key)
throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
}
}
}

return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings, query_info);
select_stream_factory, cluster, modified_query_ast, context, context.getSettingsRef(), query_info);
}


Expand Down Expand Up @@ -631,6 +587,51 @@ ClusterPtr StorageDistributed::getCluster() const
return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
}

/// Apply optimize_skip_unused_shards / force_optimize_skip_unused_shards:
/// returns a cluster restricted to the shards that can match the query,
/// or the full cluster when pruning is disabled or impossible.
/// Throws UNABLE_TO_SKIP_UNUSED_SHARDS when force_optimize_skip_unused_shards demands it.
ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const ASTPtr & query_ptr) const
{
    ClusterPtr cluster = getCluster();
    const Settings & settings = context.getSettingsRef();

    if (!settings.optimize_skip_unused_shards)
        return cluster;

    auto table_id = getStorageID();

    if (has_sharding_key)
    {
        /// Constant-fold the sharding key from WHERE/PREWHERE; nullptr means pruning failed.
        ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, context);

        if (optimized)
        {
            /// Log the pruned cluster (not the original one), so the message
            /// reflects the shards the query is actually sent to.
            LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() << ": "
                "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
                " " << makeFormattedListOfShards(optimized));
            return optimized;
        }
    }

    LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
        (has_sharding_key ? "" : " (no sharding key)") << ": "
        "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
        "the query will be sent to all shards of the cluster");

    /// force_optimize_skip_unused_shards turns a failed pruning into an error.
    UInt64 force = settings.force_optimize_skip_unused_shards;
    if (force)
    {
        std::stringstream exception_message;
        if (!has_sharding_key)
            exception_message << "No sharding key";
        else
            exception_message << "Sharding key " << sharding_key_column_name << " is not used";

        if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS)
            throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
        if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key)
            throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
    }

    return cluster;
}

void StorageDistributed::ClusterNodeData::flushAllData()
{
directory_monitor->flushAllData();
Expand All @@ -643,9 +644,9 @@ void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()

/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info, const Context & context)
ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const Context & context) const
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
const auto & select = query_ptr->as<ASTSelectQuery &>();

if (!select.prewhere() && !select.where())
{
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Storages/StorageDistributed.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class StorageDistributed final : public ext::shared_ptr_helper<StorageDistribute

bool isRemote() const override { return true; }

QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;

Pipes read(
const Names & column_names,
Expand Down Expand Up @@ -114,6 +114,12 @@ class StorageDistributed final : public ext::shared_ptr_helper<StorageDistribute

ClusterPtr getCluster() const;

/// Apply the following settings:
/// - optimize_skip_unused_shards
/// - force_optimize_skip_unused_shards
ClusterPtr getOptimizedCluster(const Context &, const ASTPtr & query_ptr) const;
ClusterPtr skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const Context & context) const;

ActionLock getActionLock(StorageActionBlockType type) override;

String remote_database;
Expand Down Expand Up @@ -164,8 +170,6 @@ class StorageDistributed final : public ext::shared_ptr_helper<StorageDistribute
const String & relative_data_path_,
bool attach);

ClusterPtr skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info, const Context & context);

void createStorage();

String storage_policy;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/StorageMaterializedView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ StorageInMemoryMetadata StorageMaterializedView::getInMemoryMetadata() const
return result;
}

QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context) const
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
return getTargetTable()->getQueryProcessingStage(context);
return getTargetTable()->getQueryProcessingStage(context, query_ptr);
}

Pipes StorageMaterializedView::read(
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageMaterializedView.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StorageMaterializedView final : public ext::shared_ptr_helper<StorageMater
void checkTableCanBeDropped() const override;
void checkPartitionCanBeDropped(const ASTPtr & partition) override;

QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;

StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/StorageMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, cons
}


QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;

Expand All @@ -150,7 +150,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
if (table.get() != this)
{
++selected_table_size;
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context));
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, query_ptr));
}

iterator->next();
Expand Down Expand Up @@ -287,15 +287,15 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
return pipes;
}

if (processed_stage <= storage->getQueryProcessingStage(*modified_context))
if (processed_stage <= storage->getQueryProcessingStage(*modified_context, query_info.query))
{
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));

pipes = storage->read(real_column_names, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
}
else if (processed_stage > storage->getQueryProcessingStage(*modified_context))
else if (processed_stage > storage->getQueryProcessingStage(*modified_context, query_info.query))
{
modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, table_name);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class StorageMerge final : public ext::shared_ptr_helper<StorageMerge>, public I
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;

QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const override;

Pipes read(
const Names & column_names,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
distributed_group_by_no_merge
1
1
optimize_skip_unused_shards
1
optimize_skip_unused_shards lack of WHERE
0
1
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Regression test for "Fix DISTINCT for Distributed and optimize_skip_unused_shards":
-- the processing stage for a Distributed table must account for the query,
-- otherwise DISTINCT can return duplicate rows when shards are pruned.
CREATE TABLE IF NOT EXISTS local_01213 (id Int) ENGINE = MergeTree ORDER BY tuple();
-- Distributed table over test_cluster_two_shards_localhost (presumably two shards
-- that both point at the same local table on localhost), sharded by id.
CREATE TABLE IF NOT EXISTS dist_01213 AS local_01213 ENGINE = Distributed(test_cluster_two_shards_localhost, currentDatabase(), local_01213, id);

-- at least two parts
INSERT INTO local_01213 SELECT toString(number) FROM numbers(2);
INSERT INTO local_01213 SELECT toString(number) FROM numbers(2);

-- check that without merge we will have two rows
SELECT 'distributed_group_by_no_merge';
SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS distributed_group_by_no_merge=1;
-- check that with merge there will be only one
SELECT 'optimize_skip_unused_shards';
SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS optimize_skip_unused_shards=1;
-- check that querying all shards is ok
SELECT 'optimize_skip_unused_shards lack of WHERE';
SELECT DISTINCT id FROM dist_01213 SETTINGS optimize_skip_unused_shards=1;

DROP TABLE local_01213;
DROP TABLE dist_01213;