Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make custom key for parallel replicas work in new analyzer #48054

Merged
merged 6 commits into from
Apr 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,15 +518,13 @@ InterpreterSelectQuery::InterpreterSelectQuery(
settings.additional_table_filters, joined_tables.tablesWithColumns().front().table, *context);

ASTPtr parallel_replicas_custom_filter_ast = nullptr;
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY && !joined_tables.tablesWithColumns().empty())
if (storage && context->getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY && !joined_tables.tablesWithColumns().empty())
{
if (settings.parallel_replicas_count > 1)
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *context))
{
LOG_TRACE(log, "Processing query on a replica using custom_key '{}'", settings.parallel_replicas_custom_key.value);
if (!storage)
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Storage is unknown when trying to parse custom key for parallel replica");

parallel_replicas_custom_filter_ast = getCustomKeyFilterForParallelReplica(
settings.parallel_replicas_count,
Expand Down
13 changes: 13 additions & 0 deletions src/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,19 @@ void Planner::buildPlanForQueryNode()
collectSets(query_tree, *planner_context);
collectTableExpressionData(query_tree, planner_context);

const auto & settings = query_context->getSettingsRef();

if (planner_context->getTableExpressionNodeToData().size() > 1
&& (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
{
LOG_WARNING(
&Poco::Logger::get("Planner"), "Joins are not supported with parallel replicas. Query will be executed without using them.");

auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
mutable_context->setSetting("parallel_replicas_custom_key", String{""});
}

auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
auto join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
select_query_info,
Expand Down
157 changes: 114 additions & 43 deletions src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <Storages/IStorage.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageDistributed.h>

#include <Analyzer/ConstantNode.h>
#include <Analyzer/ColumnNode.h>
Expand Down Expand Up @@ -47,6 +48,7 @@
#include <Interpreters/TableJoin.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>

#include <Planner/CollectColumnIdentifiers.h>
#include <Planner/Planner.h>
Expand Down Expand Up @@ -381,49 +383,89 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
prewhere_outputs.insert(prewhere_outputs.end(), required_output_nodes.begin(), required_output_nodes.end());
}

FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
{
auto storage_id = storage->getStorageID();
const auto & query_context = planner_context->getQueryContext();

auto row_policy_filter = query_context->getRowPolicyFilter(storage_id.getDatabaseName(), storage_id.getTableName(), RowPolicyFilterType::SELECT_FILTER);
if (!row_policy_filter)
return {};

auto row_policy_filter_query_tree = buildQueryTree(row_policy_filter->expression, query_context);
auto filter_query_tree = buildQueryTree(filter_expression, query_context);

QueryAnalysisPass query_analysis_pass(table_expression_query_info.table_expression);
query_analysis_pass.run(row_policy_filter_query_tree, query_context);
query_analysis_pass.run(filter_query_tree, query_context);

auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression_query_info.table_expression);
const auto table_expression_names = table_expression_data.getColumnNames();
NameSet table_expression_required_names_without_row_policy(table_expression_names.begin(), table_expression_names.end());
NameSet table_expression_required_names_without_filter(table_expression_names.begin(), table_expression_names.end());

collectSourceColumns(row_policy_filter_query_tree, planner_context);
collectSets(row_policy_filter_query_tree, *planner_context);
collectSourceColumns(filter_query_tree, planner_context);
collectSets(filter_query_tree, *planner_context);

auto row_policy_actions_dag = std::make_shared<ActionsDAG>();
auto filter_actions_dag = std::make_shared<ActionsDAG>();

PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = actions_visitor.visit(row_policy_actions_dag, row_policy_filter_query_tree);
auto expression_nodes = actions_visitor.visit(filter_actions_dag, filter_query_tree);
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Row policy filter actions must return single output node. Actual {}",
"Filter actions must return single output node. Actual {}",
expression_nodes.size());

auto & row_policy_actions_outputs = row_policy_actions_dag->getOutputs();
row_policy_actions_outputs = std::move(expression_nodes);
auto & filter_actions_outputs = filter_actions_dag->getOutputs();
filter_actions_outputs = std::move(expression_nodes);

std::string filter_node_name = row_policy_actions_outputs[0]->result_name;
std::string filter_node_name = filter_actions_outputs[0]->result_name;
bool remove_filter_column = true;

for (const auto & row_policy_input_node : row_policy_actions_dag->getInputs())
if (table_expression_required_names_without_row_policy.contains(row_policy_input_node->result_name))
row_policy_actions_outputs.push_back(row_policy_input_node);
for (const auto & filter_input_node : filter_actions_dag->getInputs())
if (table_expression_required_names_without_filter.contains(filter_input_node->result_name))
filter_actions_outputs.push_back(filter_input_node);

return {std::move(row_policy_actions_dag), std::move(filter_node_name), remove_filter_column};
return {std::move(filter_actions_dag), std::move(filter_node_name), remove_filter_column};
}

FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
{
auto storage_id = storage->getStorageID();
const auto & query_context = planner_context->getQueryContext();

auto row_policy_filter = query_context->getRowPolicyFilter(storage_id.getDatabaseName(), storage_id.getTableName(), RowPolicyFilterType::SELECT_FILTER);
if (!row_policy_filter)
return {};

return buildFilterInfo(row_policy_filter->expression, table_expression_query_info, planner_context);
}

FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
{
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();

if (settings.parallel_replicas_count <= 1 || settings.parallel_replicas_custom_key.value.empty())
return {};

auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *query_context);
if (!custom_key_ast)
throw DB::Exception(
ErrorCodes::BAD_ARGUMENTS,
"Parallel replicas processing with custom_key has been requested "
"(setting 'max_parallel_replcias'), but the table does not have custom_key defined for it "
" or it's invalid (setting 'parallel_replicas_custom_key')");

LOG_TRACE(&Poco::Logger::get("Planner"), "Processing query on a replica using custom_key '{}'", settings.parallel_replicas_custom_key.value);

auto parallel_replicas_custom_filter_ast = getCustomKeyFilterForParallelReplica(
settings.parallel_replicas_count,
settings.parallel_replica_offset,
std::move(custom_key_ast),
settings.parallel_replicas_custom_key_filter_type,
*storage,
query_context);

return buildFilterInfo(parallel_replicas_custom_filter_ast, table_expression_query_info, planner_context);
}

JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
Expand Down Expand Up @@ -596,11 +638,14 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres

updatePrewhereOutputsIfNeeded(table_expression_query_info, table_expression_data.getColumnNames(), storage_snapshot);

auto row_policy_filter_info = buildRowPolicyFilterIfNeeded(storage, table_expression_query_info, planner_context);
bool moved_row_policy_to_prewhere = false;
const auto & columns_names = table_expression_data.getColumnNames();

if (row_policy_filter_info.actions)
std::vector<std::pair<FilterDAGInfo, std::string>> where_filters;
const auto add_filter = [&](const FilterDAGInfo & filter_info, std::string description)
{
if (!filter_info.actions)
return;

bool is_final = table_expression_query_info.table_expression_modifiers &&
table_expression_query_info.table_expression_modifiers->hasFinal();
bool optimize_move_to_prewhere = settings.optimize_move_to_prewhere && (!is_final || settings.optimize_move_to_prewhere_if_final);
Expand All @@ -612,36 +657,62 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres

if (!table_expression_query_info.prewhere_info->prewhere_actions)
{
table_expression_query_info.prewhere_info->prewhere_actions = row_policy_filter_info.actions;
table_expression_query_info.prewhere_info->prewhere_column_name = row_policy_filter_info.column_name;
table_expression_query_info.prewhere_info->remove_prewhere_column = row_policy_filter_info.do_remove_column;
table_expression_query_info.prewhere_info->prewhere_actions = filter_info.actions;
table_expression_query_info.prewhere_info->prewhere_column_name = filter_info.column_name;
table_expression_query_info.prewhere_info->remove_prewhere_column = filter_info.do_remove_column;
}
else
{
table_expression_query_info.prewhere_info->row_level_filter = row_policy_filter_info.actions;
table_expression_query_info.prewhere_info->row_level_column_name = row_policy_filter_info.column_name;
table_expression_query_info.prewhere_info->row_level_filter = filter_info.actions;
table_expression_query_info.prewhere_info->row_level_column_name = filter_info.column_name;
}

table_expression_query_info.prewhere_info->need_filter = true;
moved_row_policy_to_prewhere = true;
}
else
{
where_filters.emplace_back(filter_info, std::move(description));
}
};

auto row_policy_filter_info = buildRowPolicyFilterIfNeeded(storage, table_expression_query_info, planner_context);
add_filter(row_policy_filter_info, "Row-level security filter");

if (query_context->getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY)
{
if (settings.parallel_replicas_count > 1)
{
auto parallel_replicas_custom_key_filter_info = buildCustomKeyFilterIfNeeded(storage, table_expression_query_info, planner_context);
add_filter(parallel_replicas_custom_key_filter_info, "Parallel replicas custom key filter");
}
else
{
if (auto * distributed = typeid_cast<StorageDistributed *>(storage.get());
distributed && canUseCustomKey(settings, *distributed->getCluster(), *query_context))
{
table_expression_query_info.use_custom_key = true;
planner_context->getMutableQueryContext()->setSetting("distributed_group_by_no_merge", 2);
}
}
}

const auto & columns_names = table_expression_data.getColumnNames();
from_stage = storage->getQueryProcessingStage(query_context, select_query_options.to_stage, storage_snapshot, table_expression_query_info);
storage->read(query_plan, columns_names, storage_snapshot, table_expression_query_info, query_context, from_stage, max_block_size, max_streams);

if (query_plan.isInitialized() &&
from_stage == QueryProcessingStage::FetchColumns &&
row_policy_filter_info.actions &&
!moved_row_policy_to_prewhere)
for (const auto & filter_info_and_description : where_filters)
{
auto row_level_filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
row_policy_filter_info.actions,
row_policy_filter_info.column_name,
row_policy_filter_info.do_remove_column);
row_level_filter_step->setStepDescription("Row-level security filter");
query_plan.addStep(std::move(row_level_filter_step));
const auto & [filter_info, description] = filter_info_and_description;
if (query_plan.isInitialized() &&
from_stage == QueryProcessingStage::FetchColumns &&
filter_info.actions)
{
auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
filter_info.actions,
filter_info.column_name,
filter_info.do_remove_column);
filter_step->setStepDescription(description);
query_plan.addStep(std::move(filter_step));
}
}

if (query_context->hasQueryContext() && !select_query_options.is_internal)
Expand Down