[Feature] Support RuntimeFilter In Doris #6075

Closed
stdpain opened this issue Jun 22, 2021 · 3 comments · Fixed by #6077

Comments

@stdpain
Contributor

stdpain commented Jun 22, 2021

What is Runtime Filter?

Suppose you have the following SQL to execute:

select * from table1 join table2 on table1.id = table2.id;

The planner usually estimates the sizes of the two tables and then joins the big table against the small one. If nothing is optimized, the big table on the left has to scan all of its data and send it to the join node, where the rows are joined one by one. This wastes a lot of overhead along the way, for example network IO.


If we scan the small table first and collect some information about it, such as the upper and lower bounds of the join key, or build a bloomfilter, the big table on the left can filter out some data in advance, or even scan far less data. (From the upper and lower bounds we can tell which partitions do not need to be scanned, and if some columns are ordered, the index can also be used for filtering.)
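
The idea can be illustrated with a minimal Python sketch. It is not Doris code: the BloomFilter class, build_runtime_filter, probe_scan, and the sample ids are all hypothetical names chosen for illustration, and the build side is assumed to be non-empty.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: never a false negative, sometimes a false positive."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, value):
        # Derive several bit positions from seeded SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos] = 1

    def might_contain(self, value):
        return all(self.bits[pos] for pos in self._positions(value))


def build_runtime_filter(build_keys):
    """Scan the small (build) table once and summarize its join keys."""
    bloom = BloomFilter()
    for key in build_keys:
        bloom.add(key)
    return min(build_keys), max(build_keys), bloom


def probe_scan(probe_keys, runtime_filter):
    """Drop big-table rows that cannot match before they reach the join."""
    lo, hi, bloom = runtime_filter
    return [k for k in probe_keys if lo <= k <= hi and bloom.might_contain(k)]


small_table_ids = [10, 20, 30]          # hypothetical table2.id values
big_table_ids = list(range(100))        # hypothetical table1.id values
runtime_filter = build_runtime_filter(small_table_ids)
survivors = probe_scan(big_table_ids, runtime_filter)
```

Every id of the small table survives the scan, while rows outside the min/max range are dropped without ever being sent to the join node. The Bloom filter may let a few non-matching rows through, which the join itself then discards.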


The current runtime filter of Doris

Doris currently has some runtimefilter-like logic: hash_join tries to build IN expressions and push them down to scan_node.
However, pushdown for shuffle join is not supported.

How Doris implements RuntimeFilter

Step 1: Implement a RuntimeFilter that supports broadcast join. It is better to push the filter down as an expression, so that later we can take advantage of our storage engine to do some optimization.
No coordination between nodes is involved in broadcast join, so we don't need to design a lot of classes. We can reuse the existing logic and finish the development quickly (bloomfilter and minmaxFilter are currently finished and being tested).
Step 2: Support shuffle join.
Doris BE doesn't have a Coordinator, so in this step we need to push all the filters built by JoinNode to scan_node and aggregate them there.
The filters must be aggregated; applying a filter that has not been aggregated causes scan_node to lose data when scanning.
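
Why an unmerged filter loses data can be shown with a small Python sketch. It assumes two join instances that each see only their own shuffle partition of the build side; build_partial_filter, merge_filters, and apply_filter are hypothetical helpers, not Doris functions.

```python
def build_partial_filter(build_keys):
    """Each join instance only sees its own shuffle partition of the build side."""
    return {"min": min(build_keys), "max": max(build_keys), "keys": set(build_keys)}


def merge_filters(partials):
    """Union the partial filters so the result covers every build-side key."""
    merged_keys = set()
    for partial in partials:
        merged_keys |= partial["keys"]
    return {
        "min": min(p["min"] for p in partials),
        "max": max(p["max"] for p in partials),
        "keys": merged_keys,
    }


def apply_filter(rows, runtime_filter):
    """Keep only the scanned rows that the filter says may find a join partner."""
    return [
        r for r in rows
        if runtime_filter["min"] <= r <= runtime_filter["max"]
        and r in runtime_filter["keys"]
    ]


partition_a = [1, 3, 5]   # build keys hashed to join instance A
partition_b = [2, 4, 6]   # build keys hashed to join instance B
partials = [build_partial_filter(partition_a), build_partial_filter(partition_b)]

scan_rows = [1, 2, 3, 4, 5, 6, 7]
only_a = apply_filter(scan_rows, partials[0])            # wrongly drops 2, 4, 6
merged = apply_filter(scan_rows, merge_filters(partials))  # keeps all matching rows
```

The scan_node reads rows before they are shuffled, so it sees keys destined for every join instance. Applying only instance A's filter would drop rows that instance B still needs, which is why the filters have to be merged first.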

Some components

RuntimeFilterMgr: A component that manages RuntimeFilters (all filters are managed by it). Its lifecycle is strongly bound to RuntimeState. All filters in a fragment_instance need to be registered with this class, and filters are also retrieved from it.
Why is it strongly bound to the RuntimeState lifecycle? Because when we update a filter via rpc, we need to locate this class by fragment_instance_id and then assign the value to the filter.

RuntimeFilterMergeControllerEntity: The naming of this class is not very friendly; it can be understood as the context of a merge node.
Its lifecycle can for now be understood as covering the entire query (as long as the query has not stopped, the entity stays alive).
FE must select the last destroyed sink node as the merge node. Each time, query-id -> weak_this is stored in the controller, and a shared_ptr is stored in each fragment_state. When all the strong references are destroyed, the entity is automatically removed from the map (override close).

RuntimeFilterMergeController: The controller used for filter merging across all filter-producing fragments. Its lifecycle is strongly bound to FragmentMgr.
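
The weak-reference bookkeeping described for the merge entity can be approximated in Python. This is only an analogy for the shared_ptr / weak_this scheme: the class and method names (MergeController, MergeControllerEntity, acquire, has) are hypothetical, and weakref.WeakValueDictionary stands in for the query-id map.

```python
import gc
import weakref


class MergeControllerEntity:
    """Per-query merge context; lives as long as some fragment state holds it."""

    def __init__(self, query_id):
        self.query_id = query_id


class MergeController:
    """Maps query_id to a weakly referenced entity, so it never keeps one alive."""

    def __init__(self):
        self._entities = weakref.WeakValueDictionary()

    def acquire(self, query_id):
        # Return the existing entity for this query, or create and register one.
        entity = self._entities.get(query_id)
        if entity is None:
            entity = MergeControllerEntity(query_id)
            self._entities[query_id] = entity
        return entity  # the caller holds the strong reference

    def has(self, query_id):
        return query_id in self._entities


controller = MergeController()

# Three fragment states of the same query each hold a strong reference.
fragment_states = [controller.acquire("query-1") for _ in range(3)]
same_entity = all(e is fragment_states[0] for e in fragment_states)
alive_while_referenced = controller.has("query-1")

# Once every fragment state is gone, the entry disappears from the map.
fragment_states.clear()
gc.collect()
alive_after_release = controller.has("query-1")
```

The design choice is that the controller never owns the entity; ownership lives with the fragment states, so no explicit unregister call is needed when the last one finishes.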

Filter Construction
RuntimeFilter is built in HashJoinNode.
The specific filter type to build, some other properties, and the build_expr are obtained from FE.
Since expression computation in Doris is currently a rather heavyweight operation, FE needs to provide an expr_order attribute that indicates which join condition the filter belongs to (for example, the first one).
For example: A.a = B.a and A.b + 1 = B.b and A.c = B.c*2+1

Here the prob_expr is [SlotRef(A.a),BinaryPredicate(A.b,1),SlotRef(A.c)]
Here the build_expr is [SlotRef(B.a),SlotRef(B.b),XXXPredicate]
If we build two RuntimeFilters on the equality condition A.b + 1 = B.b, we can use expr_order to avoid one expression computation.
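
One possible reading of expr_order, sketched in Python: every build expression is evaluated once per row, and each filter picks up its value by index instead of re-evaluating the expression. The InFilter and MinMaxFilter classes, the stats counter, and the sample rows are hypothetical and only illustrate the indexing idea.

```python
stats = {"evals": 0}  # counts how many times a build expression is evaluated


def counted(fn):
    """Wrap an expression so that each evaluation is counted."""
    def wrapper(row):
        stats["evals"] += 1
        return fn(row)
    return wrapper


# build_expr list for: A.a = B.a and A.b + 1 = B.b and A.c = B.c*2+1
build_exprs = [
    counted(lambda row: row["a"]),          # expr_order 0: B.a
    counted(lambda row: row["b"]),          # expr_order 1: B.b
    counted(lambda row: row["c"] * 2 + 1),  # expr_order 2: B.c*2+1
]


class InFilter:
    def __init__(self, expr_order):
        self.expr_order = expr_order
        self.values = set()

    def insert(self, value):
        self.values.add(value)


class MinMaxFilter:
    def __init__(self, expr_order):
        self.expr_order = expr_order
        self.min = None
        self.max = None

    def insert(self, value):
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)


# Two filters on the same condition (A.b + 1 = B.b) share expr_order 1.
filters = [InFilter(expr_order=1), MinMaxFilter(expr_order=1)]

build_rows = [{"a": 1, "b": 10, "c": 3}, {"a": 2, "b": 20, "c": 4}]
for row in build_rows:
    values = [expr(row) for expr in build_exprs]  # each expression runs once per row
    for f in filters:
        f.insert(values[f.expr_order])            # reuse the value by index
```

With two rows and three expressions, the expressions are evaluated six times in total, regardless of how many filters share an expr_order.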

Filter merge

The join types that can currently generate a RuntimeFilter are Broadcast Join, Shuffle Join, and Colocation Join.

| Join type | Merge runtime filter? | Where the filter is applied |
| --- | --- | --- |
| Broadcast | no | may be remote or local (in the same fragment_instance) |
| Shuffle | yes | remote |
| Colocation | no | the same fragment_instance |

For Broadcast Join, the filter could in theory be sent directly to the node where it is applied when that node is remote, but for convenience it is sent to the merge node, which then forwards it to the nodes that need to receive it.

Filter apply

The current logic is that the target scan_node tries to wait for a while in the open phase, and if the filter arrives in time, it tries to push the filter down to the storage engine. If the configuration is to wait at most 1s per filter, and there are 3 filters on a particular scan_node, then it will wait at most 3s.
The wait here is implemented with await/notify.
But not all conditions will be pushed to the storage engine, for example the following sql:

select count(*) from line_order join date where line_order.date=date.date_str
and date.year=1993
-- type line_order.date -> int
-- type date.date_str -> varchar

In this case, since the equality condition is prob_expr: CastExpr(SlotExpr()) , build_expr: SlotExpr() , the filter will not be pushed to the storage engine level, but rows can still be filtered out in the scan_node expression filtering phase.

If the scan_node does not wait in the open phase, it checks whether the filter is ready at the start of each data scan phase. If the filter is ready, it is used, but this time it is not pushed down to the storage engine; it is applied only as an expression filter.
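
The wait-then-fall-back behaviour can be sketched in Python with threading.Event playing the role of await/notify. PendingFilter, open_scan_node, and the 0.05s timeout are hypothetical and chosen only to keep the example fast; this is a sketch of the described logic, not the Doris implementation.

```python
import threading
import time


class PendingFilter:
    """A runtime filter that a scan node may have to wait for."""

    def __init__(self):
        self._ready = threading.Event()
        self.payload = None

    def publish(self, payload):
        # Called when the filter arrives (for example via rpc); wakes any waiter.
        self.payload = payload
        self._ready.set()

    def wait(self, timeout_s):
        # Returns True if the filter arrived within timeout_s seconds.
        return self._ready.wait(timeout_s)

    def is_ready(self):
        return self._ready.is_set()


def open_scan_node(filters, wait_per_filter_s):
    """Wait up to wait_per_filter_s for each filter, one after another."""
    pushed_down, late = [], []
    for f in filters:
        if f.wait(wait_per_filter_s):
            pushed_down.append(f)  # arrived in time: candidate for storage pushdown
        else:
            late.append(f)         # re-checked at the start of each scan phase
    return pushed_down, late


fast, slow = PendingFilter(), PendingFilter()
fast.publish({"min": 1, "max": 9})

start = time.monotonic()
pushed_down, late = open_scan_node([fast, slow], wait_per_filter_s=0.05)
elapsed = time.monotonic() - start

# The late filter arrives after open; it can now only act as an expression filter.
slow.publish({"min": 3, "max": 7})
usable_as_expr_filter = [f for f in late if f.is_ready()]
```

Because the filters are awaited sequentially, the worst-case wait in open is the per-filter timeout multiplied by the number of filters, which matches the 3 x 1s = 3s bound above.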

reference

https://bbs.huaweicloud.com/blogs/174769
https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html#runtime_filtering_file_formats

@xinghuayu007
Contributor

Is there a measure of how much bigger the big table has to be than the small table? Is RuntimeFilter suitable for all join operations? For example, if the bigger table has 2000 rows of data and the left table has 1000 rows, only a 2x difference in data size, will RuntimeFilter be faster than a normal broadcast?

@stdpain
Contributor Author

stdpain commented Jun 24, 2021

@xinghuayu007
It is difficult to find a single metric that determines the effectiveness of a runtime filter; the actual benefit of RuntimeFilter depends on the filtering rate and on how much data the filtering saves. So by default RuntimeFilter takes a conservative approach, generating an IN filter only when the hash table is not large, which keeps it consistent with the old logic.
After a few iterations of RuntimeFilter, the speed of building RuntimeFilter has improved significantly compared to 0.14.12. Also, the storage engine has fewer cache misses when using short-circuit filtering, so the original query is slowed down very little.
RuntimeFilter was tested with mode=7 on TPCDS, and no significant query slowdown was found.

@xinyiZzz
Contributor

@xinghuayu007 If the left table has only 1000 rows of data, using a runtime filter may degrade performance, even if only the IN filter (the one with the smallest overhead) is used and the actual filtering rate is very high. In most cases, however, this degradation is negligible.

Detailed introduction, usage, and test results of Runtime Filtering for Doris will be gradually added on #6116
