Allow parallel replicas for JOIN with analyzer [part 2] #58916
In general LGTM
auto ast = queryNodeToSelectQuery(query_node);
/// Remove CTEs information from distributed queries.
/// Now, if cte_name is set for subquery node, AST -> String serialization will only print cte name.
/// But CTE is defined only for top-level query part, so may not be sent.
/// Removing cte_name forces subquery to be always printed.
removeCTEs(ast);
Maybe better to add a flag to ConvertToASTOptions and not add CTEs in QueryNode::toASTImpl at all?
I've tried, but it did not work for some reason.
Will do it later.
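The behaviour described in the quoted code comment can be illustrated with a toy model. This is a minimal sketch, not ClickHouse's real AST: `ToyNode`, `toString`, `removeCTENames` and `makeExample` are hypothetical names introduced only for illustration, and the real `removeCTEs` is not shown in this thread. The assumption is the one stated in the comment: a subquery that carries a `cte_name` is serialized as the name alone.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical, heavily simplified AST node (not ClickHouse's IAST).
struct ToyNode
{
    std::string text;        // query text owned by this node
    std::string cte_name;    // non-empty if the subquery came from a WITH clause
    bool is_subquery = false;
    std::vector<std::shared_ptr<ToyNode>> children;
};

// Serialize the toy AST. A subquery with cte_name prints only the name.
std::string toString(const ToyNode & node)
{
    if (node.is_subquery && !node.cte_name.empty())
        return node.cte_name;

    std::string res = node.is_subquery ? "(" + node.text : node.text;
    for (const auto & child : node.children)
        res += " " + toString(*child);
    if (node.is_subquery)
        res += ")";
    return res;
}

// Clear cte_name everywhere so that every subquery is printed in full.
void removeCTENames(ToyNode & node)
{
    node.cte_name.clear();
    for (auto & child : node.children)
    {
        assert(child && "children are never null in this sketch");
        removeCTENames(*child);
    }
}

// Builds "SELECT x FROM <subquery>", where the subquery was defined as CTE `t`.
ToyNode makeExample()
{
    auto sub = std::make_shared<ToyNode>();
    sub->text = "SELECT 1 AS x";
    sub->cte_name = "t";
    sub->is_subquery = true;

    ToyNode root;
    root.text = "SELECT x FROM";
    root.children.push_back(sub);
    return root;
}
```

Before `removeCTENames`, the example serializes to a query that refers to `t` without defining it, which is the situation a remote replica could not resolve. After the call, the subquery body is printed inline.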
class GlobalPlannerContext
{
public:
GlobalPlannerContext() = default;
explicit GlobalPlannerContext(const QueryNode * parallel_replicas_node_, const TableNode * parallel_replicas_table_)
Please, add a comment here.
There is a comment below ... Should I add another one?
/// The query which will be executed with parallel replicas.
/// In case if only the most inner subquery can be executed with parallel replicas, node is nullptr.
const QueryNode * const parallel_replicas_node = nullptr;
/// Table which is used with parallel replicas reading. Now, only one table is supported by the protocol.
/// It is the left-most table of the query (in JOINs, UNIONs and subqueries).
const TableNode * const parallel_replicas_table = nullptr;
Is it really necessary in global context?
Discussed it. Could not find a better place so far.
CollectStoragesVisitor collect_storages;
collect_storages.visit(node);
This is not optimal and leads to O(N^2) traversal. It's better to collect this info in enterImpl and do the check in leaveImpl. We can leave it as is, but it's better to rewrite it in a follow-up PR.
I don't see why.
We traverse only the left table expression in RewriteJoinToGlobalJoinVisitor, but check allStoragesAreMergeTree only for the right table expressions.
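For reference, the enter/leave pattern the reviewer mentions can be sketched on a toy tree. Whether the quadratic case can actually occur in this PR is disputed in the reply above; the sketch only shows how a per-subtree property can be computed in one pass instead of running a separate collecting visitor from every JOIN. `ToyNode`, `SinglePassChecker`, `makeTable` and `makeJoin` are hypothetical names, and the engine check is a stand-in for `allStoragesAreMergeTree`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified query tree node.
struct ToyNode
{
    enum class Kind { Join, Table };
    Kind kind = Kind::Table;
    std::string engine;  // meaningful for tables only
    std::vector<std::unique_ptr<ToyNode>> children;
};

std::unique_ptr<ToyNode> makeTable(std::string engine)
{
    auto node = std::make_unique<ToyNode>();
    node->engine = std::move(engine);
    return node;
}

std::unique_ptr<ToyNode> makeJoin(std::unique_ptr<ToyNode> left, std::unique_ptr<ToyNode> right)
{
    auto node = std::make_unique<ToyNode>();
    node->kind = ToyNode::Kind::Join;
    node->children.push_back(std::move(left));
    node->children.push_back(std::move(right));
    return node;
}

// Visits every node exactly once. On enter, a flag for the node is pushed;
// on leave, the flag is checked and merged into the parent's flag.
class SinglePassChecker
{
public:
    std::size_t visited = 0;                // number of nodes entered
    std::size_t merge_tree_only_joins = 0;  // JOINs whose subtree reads only MergeTree tables

    void visit(const ToyNode & node)
    {
        enter(node);
        for (const auto & child : node.children)
            visit(*child);
        leave(node);
    }

private:
    std::vector<char> flags;  // one "all MergeTree so far" flag per node on the current path

    void enter(const ToyNode & node)
    {
        ++visited;
        bool ok = node.kind != ToyNode::Kind::Table || node.engine == "MergeTree";
        flags.push_back(ok);
    }

    void leave(const ToyNode & node)
    {
        assert(!flags.empty());
        bool ok = flags.back();
        flags.pop_back();
        if (node.kind == ToyNode::Kind::Join && ok)
            ++merge_tree_only_joins;
        if (!flags.empty())
            flags.back() = flags.back() && ok;
    }
};
```

With this shape the cost is linear in the number of nodes, because each node contributes its flag to its parent once instead of being re-collected for every enclosing JOIN.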
/// Find a query which can be executed with parallel replicas up to WithMergableStage.
/// Returned query will always contain some (>1) subqueries, possibly with joins.
const QueryNode * findParallelReplicasQuery(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options);
Maybe better to call it findQueryForParallelReplicas?
return res;
}

static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * query_tree_node)
I'd prefer to have a non recursive implementation.
Agree.
However, it would make sense if we rewrite all the visitors to non-recursive mode implementation :)
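A non-recursive search for the left-most table could look roughly like the loop below. This is a sketch on a toy tree, assuming that the first child of a query, JOIN or UNION node is always its left-most input. `ToyNode`, `makeNode` and `findLeftMostTable` are hypothetical names, and the real `findTableForParallelReplicas` very likely handles more node types and rejection cases than this.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified query tree node.
struct ToyNode
{
    enum class Kind { Query, Join, Union, Table };
    Kind kind = Kind::Query;
    std::string name;  // table name, meaningful for tables only
    std::vector<std::unique_ptr<ToyNode>> children;
};

std::unique_ptr<ToyNode> makeNode(ToyNode::Kind kind, std::string name = {})
{
    auto node = std::make_unique<ToyNode>();
    node->kind = kind;
    node->name = std::move(name);
    return node;
}

// Follow the first child until a table is reached. Returns nullptr if the
// left-most path ends without a table (or if the input is nullptr).
const ToyNode * findLeftMostTable(const ToyNode * node)
{
    while (node)
    {
        if (node->kind == ToyNode::Kind::Table)
            return node;
        node = node->children.empty() ? nullptr : node->children.front().get();
    }
    return nullptr;
}
```

Because only one path is followed, no explicit stack is needed here; a general visitor rewritten to non-recursive form, as mentioned in the reply above, would need one.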
@KochetovNicolai Please add new settings to the history
@nikitamikhaylov I've added it a long time ago. Looks like the check is broken somehow.
Build is green. Merging.
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Support LEFT JOIN, ALL INNER JOIN, and simple subqueries for parallel replicas (only with analyzer). New setting parallel_replicas_prefer_local_join chooses local JOIN execution (by default) vs GLOBAL JOIN. All tables should exist on every replica from cluster_for_parallel_replicas. New settings min_external_table_block_size_rows and min_external_table_block_size_bytes are used to squash small blocks that are sent for temporary tables (only with analyzer).
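To make the squashing of small blocks concrete, here is a minimal sketch of threshold-based squashing. It is not the ClickHouse implementation: `ToyBlock` and `squashBlocks` are hypothetical, blocks are reduced to row and byte counters, and the semantics are assumptions (a block is emitted once either non-zero threshold is reached, and zero for both thresholds disables squashing).

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a data block: only its size is tracked.
struct ToyBlock
{
    std::size_t rows;
    std::size_t bytes;
};

// Merge consecutive small blocks until the accumulated block reaches
// min_rows or min_bytes. A threshold of 0 is ignored; if both are 0,
// every block is passed through unchanged.
std::vector<ToyBlock> squashBlocks(const std::vector<ToyBlock> & input, std::size_t min_rows, std::size_t min_bytes)
{
    std::vector<ToyBlock> result;
    ToyBlock accumulated{0, 0};
    bool has_pending = false;

    for (const auto & block : input)
    {
        accumulated.rows += block.rows;
        accumulated.bytes += block.bytes;
        has_pending = true;

        bool no_limits = min_rows == 0 && min_bytes == 0;
        bool enough_rows = min_rows != 0 && accumulated.rows >= min_rows;
        bool enough_bytes = min_bytes != 0 && accumulated.bytes >= min_bytes;

        if (no_limits || enough_rows || enough_bytes)
        {
            result.push_back(accumulated);
            accumulated = ToyBlock{0, 0};
            has_pending = false;
        }
    }

    // Emit whatever is left, even if it is below the thresholds.
    if (has_pending)
        result.push_back(accumulated);

    return result;
}
```

For example, with `min_rows = 6` and `min_bytes = 0`, four input blocks of 3, 4, 5 and 1 rows are merged into two blocks of 7 and 6 rows, so fewer and larger blocks would be sent for the temporary table.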