You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
At present, Doirs will judge the size of T1 and T2, put the large table on the left side of the join, assuming that the number of data rows in T1 is greater than T2, then when joining, Doris will scan all the data of T1 and T2 and send it to HashJoinNode for use on HashJoinNode T2 data build hash table, and then use T1 data probe hash table.
In the probe, the hash table cannot be hit, which means that some data in T1 does not exist in T2. For join types such as inner join and right outer join, these join key values that do not exist in T2 will not be retained in the end In the join result, it is meaningless to send it from the T1 scan to the HashJoinNode. If this part of the join key value can be filtered out during the scan, it will reduce the scan time and network overhead.
What is Runtime Filter
Assuming that in the above query, T1 is a fact table with 100000 data rows, and T2 is a dimension table with 100 data rows. The current actual situation of Doris join is:
Obviously, scanning data for T2 is much faster than T1. If we take the initiative to wait for a while and then scan T1, after T2 sends the scanned data record to HashJoinNode, HashJoinNode calculates a filter condition based on the data of T2, such as the maximum value of T2 data And the minimum value, or build a Bloom Filter, and then send this filter condition to ScanNode waiting to scan T1, the latter applies this filter condition and delivers the filtered data to HashJoinNode, thereby reducing the number of probe hash tables and network overhead. This filter condition is Runtime Filter, and the effect is as follows:
If the filter condition (Runtime Filter) can be pushed down to the storage engine, in some cases, the index can be used to directly reduce the amount of scanned data, thereby greatly reducing the scanning time. The effect is as follows:
It can be seen that, unlike predicate push-down and partition cutting, Runtime Filter is a filter condition dynamically generated at runtime, that is, when the query is run, the join on clause is parsed to determine the filter expression, and the expression is broadcast to ScanNode that is reading the left table , Thereby reducing the amount of scanned data, thereby reducing the number of probe hash table, avoiding unnecessary I/O and network transmission.
Runtime Filter is mainly used to optimize joins for large tables. If the amount of data in the left table is too small, or the amount of data in the right table is too large, the Runtime Filter may not achieve the expected effect.
How Doris implements Runtime Filter
Runtime Filter is generated during FE query planning, constructed in BE HashJoinNode, and applied in BE ScanNode.
Doris' current Runtime Filter
Doris currently has a simple Runtime Filter capability. Only when the amount of data on the right side of the join (may be a table or a subquery) is small (the default is less than 1024), HashJoinNode will try to build an IN predicate based on the data on the right side of the join. The method is similar to key in [xx, xx, ……]). If it is a broadcast join, it will be pushed down to the left side of the join (it may also be a table or a subquery). ScanNode filters data in the same Fragment, if it is shuffle join will push down to the nearest ExchangeNode to filter data.
Key points and challenges
Support for larger data volume generation Runtime Filter: The size of IN predicate increases linearly with the increase of data volume, so the introduction of Bloom Filter and MinMax Filter (including the maximum and minimum values of the data set), and supports Bloom Filter and MinMax Filter is pushed down to the storage engine.
Extended Runtime Filter application scenarios: The join types supported by the target include boardcast join, shuffle join, colocation join, bucket shuffle join. Runtime Filter has different behaviors in the construction and application of broadcast join and shuffle join, which is mainly reflected in the fact that shuffle join requires all The Runtime Filter built by HashJoinNode is merged.
Why does Runtime Filter in Shuffle Join need to be merged?
The filter constructed by HashJoinNode in Broadcast join contains all the values in the join conjuncts whose key is listed on the right side of the join, and can be directly handed over to the ScanNode application on the left side of the join. At this time, HashJoinNode and ScanNode are located in the same Fragment;
The filter constructed by each HashJoinNode in the shuffle join only contains the partial value of the key column in the join conjuncts on the right side of the join, that is, the shuffle partition processed by this HashJoinNode. In distributed execution, if the filters constructed by multiple HashJoinNodes are not merged, the ScanNode using the filter constructed by HashJoinNode in the same Fragment will lose data.
Therefore, it is necessary to merge the filters constructed by HashJoinNode in all nodes into one, and push them down to ScanNodes in different Fragments on the left side of the join. The combined filter can accurately filter the scan data.
It should be noted that if the left side of the broadcast join is a shuffle join, then the filter constructed by the broadcast join needs to be pushed down to the ScanNode on the left side of the shuffle join, so it needs to cross Fragment, so the filter still needs to be merged and pushed down, that is, whether the filter is required The judgment condition for merging is whether the HashJoinNode that constructs the filter and the ScanNode that applies the filter are located in different Fragments;
The instance in the fragment with ID 0 at the top of the query plan tree is selected as the filter merge node, that is, the rusult sink node, because the fragment with ID 0 has one and only one instance, and it is the last instance destroyed during the query operation phase.
Structure of Runtime Filter
Runtime Filter is generated by equi-join conjuncts, where the right expr of conjunct is called src expr, which is used to construct Runtime Filter and can be bound to any number of tuple IDs on the right side of the join; the left expr of conjunct is Called target expr, used to apply Runtime Filter, and must be bound to the only tuple ID on the left side of the join. The Runtime Filter records the HashJoinNode ID that builds the Runtime Filter and the ScanNode ID that applies the Runtime Filter.
Some other attributes of Runtime Filter, such as ID, filter type, order of Expr in join conjunct, join type, whether need to merge, bloom filter size bytes, and the correspondence between ID and Fragment instance addr, are all generated during FE query planning.
The essence of the filter transmitted between query plan Fragments is the value list of the key column in join conjuncts. When this value list is delivered to ScanNode in time, they can be used to filter out unmatched values immediately, instead of transmitting all data to HashJoinNode. probe.
Types of Runtime Filter
Runtime Filter includes three types: Bloom Filter, MinMax Filter, and IN predicate
Bloom Filter: There is a certain misjudgment rate, resulting in filtered data less than expected, but it will not lead to inaccurate final results. In most cases, Bloom Filter can improve performance or have no significant impact on performance , But in some cases it will cause performance degradation.
Bloom Filter construction and application overhead is high, so when the filtering rate is low, or the amount of data in the left table is small, Bloom Filter may cause performance degradation.
At present, only the Key column of the left table can be pushed down to the storage engine if the Bloom Filter is applied, and the test results show that the performance of the Bloom Filter is not pushed down to the storage engine.
Currently Bloom Filter only has short-circuit logic when using expression filtering on ScanNode, that is, when the false positive rate is too high, the Bloom Filter will not continue to be used, but there is no short-circuit logic when the Bloom Filter is pushed down to the storage engine , So when the filtration rate is low, it may cause performance degradation.
MinMax Filter: Contains the maximum value and the minimum value, thereby filtering data smaller than the minimum value and greater than the maximum value. The filtering effect of the MinMax Filter is related to the type of the Key column in the join on clause and the data distribution of the left and right tables.
When the type of the Key column in the join on clause is int/bigint/double, etc., in extreme cases, if the maximum and minimum values of the left and right tables are the same, there is no effect, otherwise the maximum value of the right table is less than the minimum value of the left table, or the minimum of the right table The value is greater than the maximum value in the left table, the effect is best.
When the type of the Key column in the join on clause is varchar, etc., applying the MinMax Filter will often cause performance degradation.
IN predicate: Construct the IN predicate based on all the values of the Key listed in the join on clause on the right table, and use the constructed IN predicate to filter on the left table. Compared with the loom filter, the cost of construction and application is lower. The table on the right tends to have higher performance when the amount of data is small.
By default, only the number of data rows in the right table is less than 1024 will be pushed down (can be adjusted by the runtime_filter_max_in_num in the session variable).
Currently IN predicate does not implement a merge method, that is, it cannot be pushed down across Fragments, so currently when it is necessary to push down to the ScanNode of the left table of shuffle join, if Bloom Filter is not generated, then we will convert IN predicate to Bloom Filter for Process pushdown across Fragments, so even if the type only selects IN predicate, Bloom Filter may actually be applied;
How to judge whether a query can generate Runtime Filter
Only support the generation of Runtime Filter for the equivalent conditions in the join on clause, excluding the Null-safe condition, because it may filter out the null value of the join left table.
Does not support pushing down Runtime Filter to the left table of left outer, full outer, and anti join;
Does not support src expr or target expr is constant;
The equality of src expr and target expr is not supported;
The type of src expr is not supported to be equal to HLL or BITMAP;
Currently only supports pushing down Runtime Filter to OlapScanNode;
Target expr does not support NULL-checking expressions, such as COALESCE/IFNULL/CASE, because when the join on clause of other joins at the upper level of the outer join contains NULL-checking expressions and a Runtime Filter is generated, this Runtime Filter is downloaded Pushing to the left table of outer join may cause incorrect results;
The column (slot) in target expr is not supported, and an equivalent column cannot be found in the original table;
Column conduction is not supported. This includes two cases:
First, when the join on clause contains A.k = B.k and B.k = C.k, currently C.k can only be pushed down to B.k, but not to A.k;
Second, for example, the join on clause contains Aa + Bb = Cc. If Aa can be transmitted to Ba, that is, Aa and Ba are equivalent columns, then you can replace Aa with Ba, and then you can try to push the Runtime Filter down to B ( If Aa and Ba are not equivalent columns, they cannot be pushed down to B, because target expr must be bound to the only join left table);
The types of Target expr and src expr must be equal, because Bloom Filter is based on hash, if the types are not equal, it will try to convert the type of target expr to the type of src expr;
The Runtime Filter generated by PlanNode.Conjuncts is not supported. Unlike HashJoinNode's eqJoinConjuncts and otherJoinConjuncts, the Runtime Filter generated by PlanNode.Conjuncts found in the test that it may cause incorrect results, such as When an IN subquery is converted to a join, the automatically generated join on clause will be stored in PlanNode.Conjuncts. At this time, applying Runtime Filter may result in missing some rows in the result.
Some components of Runtime filter
BE
RuntimeFilterMgr is a component that manages Runtime Filter. All filters constructed by HashJoinNode in Fragment need to be registered in this class, ScanNode obtains the filter application from it, and its life cycle is strongly bound to RuntimeState.
RuntimeFilterMergeControllerEntity can be understood as the context of the node that merges the filter. The last rusult sink node is used as the merge node, and the life cycle is valid throughout the query phase.
RuntimeFilterMergeController is a filter merged controller, and the life cycle FragmentMgr is strongly bound.
FE
RuntimeFilterGenerator is used to generate Runtime Filter and assign it to the nodes that use Runtime Filter in the query plan.
RuntimeFilter represents a filter in the query plan, including the specific properties of the filter, the binding method of expr and tuple slot, etc.
RuntimeFilterTarget represents the filter information provided to ScanNode, including target expr, whether it needs to be merged, etc.
Runtime Filter Query Plans
The query plan that can be displayed by the explain command includes the join on clause information used by each Fragment, as well as comments on the generation and use of the Runtime Filter by the Fragment, so as to confirm whether the Runtime Filter is applied to the desired join on clause.
The comment contained in the plan fragment that generates the Runtime Filter, such as runtime filters: filter_id[type] <- table.column.
The comment contained in the plan fragment using Runtime Filter, such as runtime filters: filter_id[type] -> table.column.
The query in the following example uses a Runtime Filter with ID RF000.
CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2 PROPERTIES("replication_num" = "1");
INSERT INTO test VALUES (1), (2), (3), (4);
CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2 PROPERTIES("replication_num" = "1");
INSERT INTO test2 VALUES (3), (4), (5);
EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
+-------------------------------------------------------------------+
| Explain String |
+-------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`t1` |
| |
| 4:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test`.`t1` |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BUCKET_SHUFFLE) |
| | equal join conjunct: `test`.`t1` = `test2`.`t2` |
| | runtime filters: RF000[in] <- `test2`.`t2` |
| | |
| |----3:EXCHANGE |
| | |
| 0:OlapScanNode |
| TABLE: test |
| runtime filters: RF000[in] -> `test`.`t1` |
| |
| PLAN FRAGMENT 2 |
| OUTPUT EXPRS: |
| PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test2`.`t2` |
| |
| 1:OlapScanNode |
| TABLE: test2 |
+-------------------------------------------------------------------+
-- The line of `runtime filters` above shows that `2:HASH JOIN` of `PLAN FRAGMENT 1` generates IN predicate with ID RF000,
-- Among them, the key values of `test2`.`t2` are only known at runtime,
-- This IN predicate is used in `0:OlapScanNode` to filter unnecessary data when reading `test`.`t1`.
SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
-- Return 2 rows of results [3, 4];
-- Through the query profile (set is_report_success=true;) you can view the detailed information of the internal work of the query,
-- Including whether each Runtime Filter is pushed down, waiting time,
-- and the total time from prepare to receiving Runtime Filter for OLAP_SCAN_NODE.
RuntimeFilter:in:
- HasPushDownToEngine: true
- AWaitTimeCost: 0ns
- EffectTimeCost: 2.76ms
-- In addition, in the OLAP_SCAN_NODE of the profile, you can also view the filtering effect
-- and time consumption after the Runtime Filter is pushed down.
- RowsVectorPredFiltered: 9.320008M (9320008)
- VectorPredEvalTime: 364.39ms
Runtime Filter Session Variable
For the query options (session variable) that control the Runtime Filter, please refer to the following section. In addition to runtime_filter_type, other query options are used to adjust the Runtime Filter to achieve the best performance in a specific scenario, usually only after performance testing. Optimize queries that are intensive, take a long enough time to run, and have a high enough frequency.
1. runtime_filter_type
The types of Runtime Filter used include Bloom Filter, MinMax Filter, and IN predicate. When multiple types are used, they are separated by commas. Note that you need to add quotation marks. For example:
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
By default, only IN predicate is used conservatively. In some cases, the performance is higher when Bloom Filter, MinMax Filter, and IN predicate are used at the same time.
2. runtime_filter_mode
It is used to adjust the push-down strategy of Runtime Filter, including two strategies, LOCAL and GLOBAL, and the default setting is GLOBAL strategy.
LOCAL: Relatively conservative, the constructed Runtime Filter can only be used in the same Fragment on the same instance (the smallest unit of query execution), that is, the Runtime Filter producer (the HashJoinNode that constructs the Filter) and the consumer (the ScanNode that uses the RuntimeFilter) The same Fragment, such as the general scene of broadcast join;
GLOBAL: Relatively radical. In addition to satisfying the scenario of the LOCAL strategy, the Runtime Filter can also be combined and transmitted to different Fragments on different instances via the network. For example, Runtime Filter producers and consumers are in different Fragments, such as shuffle join. The query can be optimized in a wider range of scenarios.
3. runtime_filter_wait_time_ms
After the Runtime Filter is turned on, the ScanNode that joins the left table will wait for a period of time for each Runtime Filter assigned to itself and then scan the data. The default wait is 1s (1000ms, in milliseconds), that is, if the ScanNode is assigned 3 Runtime Filters , Then it will wait at most 3s.
Because it takes time to build and merge the Runtime Filter, ScanNode will try to push down the Runtime Filter that arrives within the waiting time to the storage engine. If the waiting time is exceeded, ScanNode will directly start scanning data using the Runtime Filter that has arrived.
If the Runtime Filter arrives after ScanNode starts scanning, ScanNode will not push the Runtime Filter down to the storage engine. Instead, it will use expression filtering on ScanNode based on the Runtime Filter for the data that has been scanned from the storage engine. The scanned data will not apply the Runtime Filter, so the intermediate data size obtained will be larger than the optimal solution, but serious cracking can be avoided.
If the cluster is busy and there are many resource-intensive or long-time-consuming queries on the cluster, consider increasing the waiting time to avoid missing optimization opportunities for complex queries. If the cluster load is light, and there are many small queries on the cluster that only take a few seconds, you can consider reducing the waiting time to avoid an increase of 1s for each query.
4. runtime_filters_max_num
The maximum number of Bloom Filters in the Runtime Filter that can be applied to each query, the default is 10. Because Bloom Filter construction and application costs are high, so if the generated Bloom Filter exceeds the maximum allowed number, the Bloom Filter with large selectivity is retained. Currently, only the number of Bloom Filters is limited, because compared to MinMax Filter and IN predicate It is more expensive to build and apply.
Selectivity = (HashJoinNode Cardinality / HashJoinNode left child Cardinality)
Because the cardinality of FE is currently inaccurate, the selectivity of Bloom Filter calculation here is inaccurate, so in the end it may only randomly reserve part of Bloom Filter.
5. Bloom Filter related parameters
runtime_bloom_filter_min_size: the minimum length of Bloom Filter in Runtime Filter (in bytes), the default is 1048576 (1M);
runtime_bloom_filter_max_size: the maximum length of Bloom Filter in Runtime Filter (in bytes), the default is 16777216 (16M);
runtime_bloom_filter_size: The default length of Bloom Filter in Runtime Filter (in bytes), the default is 2097152 (2M);
Because it is necessary to ensure that the length of the Bloom Filter constructed by each HashJoinNode is the same to be merged, the length of the Bloom Filter is currently calculated in the FE query planning.
If you can get the number of data rows (Cardinality) in the statistical information of the join table on the right, you will try to use Cardinality as NDV, the default false detection rate fpp is 0.05, and the Bloom that contains NDV unique elements and the false detection rate is lower than fpp is calculated by the formula The minimum number of bytes required by the filter, rounded to the nearest power of 2 (log value with 2 as the base), and limits the upper and lower limits of the length of the final Bloom Filter.
If the Cardinality of the join right table is not available, the default Bloom Filter length will be used.
6. runtime_filter_max_in_num
If the number of rows in the right table of the join is greater than this value, we will not generate an IN predicate, and the default is 1024;
Runtime Filter performance test
In the actual production environment, it can bring 1-20 times performance improvement.
SSB stand-alone performance test:
Test Results
Shuffle Performance
SQL 1
SQL 2
SQL 3
SQL 4
SQL 5
SQL 6
SQL 7
SQL 8
SQL 9
MinMax
38.12%
66.67%
-14.31%
>99.9%
63.9%
81.3%
-3.47%
-7.94%
33.57%
Bloom Filter
25.53%
59.85%
-18.4%
>99.9%
63.3%
74.75%
68.39%
-6.01%
71.2%
MinMax + Bloom Filter
32.5%
62.19%
-18.04%
>99.9%
62.87%
80.63%
67.94%
-7.51%
69.72%
Broadcast Performance
SQL 1
SQL 2
SQL 3
SQL 4
SQL 5
SQL 6
SQL 7
SQL 8
SQL 9
MinMax
10.26%
49.26%
-2.75%
>99.9%
Bloom Filter
-5.62%
38.78%
-8.54%
>99.9%
MinMax + Bloom Filter + IN
-11%
34.81%
-7.09%
>99.9%
Test conclusion
The performance improvement effect of Runtime Filter under different filter rates:
The greater the filter rate of Runtime Filter, the better the filtering effect, and the greater the performance improvement. In most cases, as long as part of the data can be filtered, the performance will be improved to a certain extent. In extreme cases, when the filter rate is 0%, it is about The value of the join conjuncts key column of the table is exactly the same. A single Runtime Filter will reduce the performance by about 10%. Therefore, if the filtering effect is expected to be poor, consider turning off the Runtime Filter (query option RUNTIME_FILTER_MODE=OFF).
Simultaneous use of different types of Runtime Filter:
If the data that can be filtered by multiple Runtime Filters contains each other, the performance is worse than using only one filter, that is, after one filter is filtered first, the other types of filters actually do not work (or filter very little), which brings extra Overhead
The impact of Runtime Filter pushdown on performance:
At present, both MinMax Filter and IN predicate can be pushed down to the storage engine. Bloom Filter can only push down the Key column to the storage engine. In the actual test, pushing the filter down to the storage engine is much faster than using expression filtering on ScanNode. And when the Bloom Filter is not pushed down, the performance will be reduced in most cases, so the effect of using Bloom Filter for non-Key columns may not be satisfactory.
The impact of different lengths of Bloom Filter on performance:
When the large amount of data on the right side of join leads to more false positives in the Bloom Filter, as the length of the Bloom Filter increases, the filtering rate increases, and the construction and merging time becomes longer. In some cases, the length of the Bloom Filter is 2M The best performance;
When a large table join conjuncts keys contains unique columns:
If a join query involving a large table uses unique columns as join conjuncts keys, the cost of constructing and transmitting the filter may exceed its performance benefits, and too many key values will cause the false detection rate of the Bloom Filter to be too large, which will not effectively reduce the join. The amount of data in the scan on the left. For this type of query, consider turning off the Runtime Filter.
The influence of the base size of Join conjuncts keys on Runtime Filter:
In Inner Join, the smaller the base of join conjuncts keys, that is, the smaller the count (distinct join conjuncts keys), the smaller the false detection rate of the Bloom Filter, and the greater the performance improvement of the Runtime Filter.
Whether to wait for Runtime Filter performance comparison:
Waiting for the Runtime Filter to arrive and push it down to the storage engine before starting the scan is better than directly scanning and then waiting for the filter to arrive before using it (which cannot be pushed down to the storage engine). If it takes a few seconds to wait for the construction of the Runtime Filter, but you may save a few minutes after trimming unnecessary data using the Runtime Filter, then it is worthwhile to wait a few more seconds.
Comparison of the performance improvement effect of Runtime Filter on broadcast join, shuffle join, and bucket shuffle join:
In some cases, the performance improvement of Runtime Filter on shuffle join is higher than that of broadcast join because of the time-consuming effect of constructing filter. Each BE in broadcast join must use all the data on the right side of the join to construct the filter; each of the shuffle joins BE only needs to use 1/N of the data in the table on the right to construct. The network time caused by filter merging is usually negligible, so the construction speed is faster;
In some cases, bucket shuffle join has a higher filtering rate than broadcast join and shuffle join when the Bloom Filter length is the same. The reason is that the Bloom Filter used by each ScanNode of broadcast join and shuffle join contains all the data of the right table, while bucket shuffle join reduces the amount of data in the shuffle of the right table. The Bloom Filter used by each ScanNode only contains one-Nth of the right table. Data, so the false detection rate of Bloom Filter is lower;
But in some cases, the effect of Runtime Filter on shuffle join is better than bucket shuffle join, because bucket shuffle join has data skew;
Comparison of IN predicate and BloomFilter effects:
IN predicate has lower construction and application overhead than Bloom Filter, and tends to have higher performance when the amount of data in the table on the right is small;
Bloom Filter has a wider application range than IN predicate. The memory occupied by IN predicate will increase linearly with the number of data rows in the right table. At present, by default, the IN predicate is only used when the number of data rows in the right table is less than 1024.
The total time for a successful query is reduced from 4192s to 3133s, the performance is improved by 25.3%, 30 queries become faster, and 13 queries become slower;
Test conclusion
Query analysis with reduced performance:
After IN predicate filtering, Bloom Filter and MinMax Filter actually have no effect;
Runtime Filter filter rate is low;
The time consuming to wait and apply the Runtime Filter is greater than the time saved by the Runtime Filter;
Three types of Runtime Filters, Bloom Filter, MinMax Filter, and IN predicate, have an impact on performance:
Bloom Filter reduced the overall performance before the pushdown (reduced by 5.3%, and only counts the time spent on successful queries), and the performance increased by 7.5% after the pushdown;
Compared with only using IN predicate, the performance is increased by 3.3% after adding Bloom Filter and MinMax Filter;
Combined with the SSB stand-alone test, when using Bloom Filter, MinMax Filter, and IN predicate at the same time, there is a certain chance that the query will slow down, but it will not slow down when only the IN predicate is used;
Test the ScanNode waiting time, which is different from Bloom Filter, and its impact on performance:
The default runtime_filter_wait_time_ms=1000, runtime_bloom_filter_size=2048000 (2M), changed to runtime_filter_wait_time_ms=4000, runtime_bloom_filter_size=4194304 (4M), the performance increased by 5.5%, and the proportion of Runtime Filter waiting for success increased from 40% to 50%;
Comparing 125 with 120, changing to runtime_filter_wait_time_ms=1000000 and runtime_bloom_filter_size=4194304, the performance is reduced by 178%, and the proportion of Runtime Filter waiting for success has increased from 50% to 89%;
TODO
Runtime Filter supports column conduction, two scenarios:
When the join on clause contains A.k = B.k and B.k = C.k, the filter can be pushed down to B.k and A.k at the same time;
When the join on clause contains A.a + B.b = C.c, if A.a can be transmitted to B.a, and A.a and B.a are equivalent columns, then the filter can be pushed down to B.
To support the push-down of the filter generated by PlanNode.Conjuncts, 2 issues need to be confirmed:
Whether the filter generated by PlanNode.Conjuncts should limit the join type;
Under what circumstances will generate PlanNode.Conjuncts, currently known to be generated when the in subquery is converted to join;
Support FE to obtain accurate Cardinality of the data sheet.
Currently the Cardinality obtained by FE is inaccurate, so the length of Bloom Filter calculated by Cardinality and fpp on the right side of join is inaccurate. Moreover, affected by data skew, there may be problems in unifying the length of Bloom Filter;
Add the switch of whether to allow only the Key column of the join left table to apply Bloom Filter.
Because Bloom Filter can only be pushed down to the storage engine when it is applied to the Key column of the left table, but Bloom Filter often brings negative effects when it cannot be pushed down to the storage engine;
After Bloom Filter is pushed down to the storage engine, short-circuit logic is added.
When the false positive rate is too high, do not continue to use Bloom Filter;
When the amount of data on the left side of the join is too small, no runtime filter is generated, and when the amount of data on the right side of the join is too large, no runtime filter is generated;
MinMax Filter is only used when the join keys type is int/bigint/double.
Because when the type of join keys is varchar and other types, applying MinMax Filter often results in performance degradation.
Only use IN predicate when the number of data rows in the right table is less than 1024.
Difference from Impala Runtime Filtering
Doris' current runtime filter and Impala are quite different in design goals and specific implementation.
The runtime filter generation process in FE refers to Impala, and the logic of generation rules, merging strategies, configuration methods, etc. are modified in specific implementation;
Support Runtime Filter for Doris
Motivation
Suppose the following SQL needs to be executed:
At present, Doirs will judge the size of T1 and T2, put the large table on the left side of the join, assuming that the number of data rows in T1 is greater than T2, then when joining, Doris will scan all the data of T1 and T2 and send it to HashJoinNode for use on HashJoinNode T2 data build hash table, and then use T1 data probe hash table.
In the probe, the hash table cannot be hit, which means that some data in T1 does not exist in T2. For join types such as inner join and right outer join, these join key values that do not exist in T2 will not be retained in the end In the join result, it is meaningless to send it from the T1 scan to the HashJoinNode. If this part of the join key value can be filtered out during the scan, it will reduce the scan time and network overhead.
What is Runtime Filter
Assuming that in the above query, T1 is a fact table with 100000 data rows, and T2 is a dimension table with 100 data rows. The current actual situation of Doris join is:
Obviously, scanning data for T2 is much faster than T1. If we take the initiative to wait for a while and then scan T1, after T2 sends the scanned data record to HashJoinNode, HashJoinNode calculates a filter condition based on the data of T2, such as the maximum value of T2 data And the minimum value, or build a Bloom Filter, and then send this filter condition to ScanNode waiting to scan T1, the latter applies this filter condition and delivers the filtered data to HashJoinNode, thereby reducing the number of probe hash tables and network overhead. This filter condition is Runtime Filter, and the effect is as follows:
If the filter condition (Runtime Filter) can be pushed down to the storage engine, in some cases, the index can be used to directly reduce the amount of scanned data, thereby greatly reducing the scanning time. The effect is as follows:
Expected return
It can be seen that, unlike predicate push-down and partition cutting, Runtime Filter is a filter condition dynamically generated at runtime, that is, when the query is run, the join on clause is parsed to determine the filter expression, and the expression is broadcast to ScanNode that is reading the left table , Thereby reducing the amount of scanned data, thereby reducing the number of probe hash table, avoiding unnecessary I/O and network transmission.
Runtime Filter is mainly used to optimize joins for large tables. If the amount of data in the left table is too small, or the amount of data in the right table is too large, the Runtime Filter may not achieve the expected effect.
How Doris implements Runtime Filter
Runtime Filter is generated during FE query planning, constructed in BE HashJoinNode, and applied in BE ScanNode.
Doris' current Runtime Filter
Doris currently has a simple Runtime Filter capability. Only when the amount of data on the right side of the join (may be a table or a subquery) is small (the default is less than 1024), HashJoinNode will try to build an IN predicate based on the data on the right side of the join. The method is similar to key in [xx, xx, ……]). If it is a broadcast join, it will be pushed down to the left side of the join (it may also be a table or a subquery). ScanNode filters data in the same Fragment, if it is shuffle join will push down to the nearest ExchangeNode to filter data.
Key points and challenges
Why does Runtime Filter in Shuffle Join need to be merged?
The filter constructed by HashJoinNode in Broadcast join contains all the values in the join conjuncts whose key is listed on the right side of the join, and can be directly handed over to the ScanNode application on the left side of the join. At this time, HashJoinNode and ScanNode are located in the same Fragment;
The filter constructed by each HashJoinNode in the shuffle join only contains the partial value of the key column in the join conjuncts on the right side of the join, that is, the shuffle partition processed by this HashJoinNode. In distributed execution, if the filters constructed by multiple HashJoinNodes are not merged, the ScanNode using the filter constructed by HashJoinNode in the same Fragment will lose data.
Therefore, it is necessary to merge the filters constructed by HashJoinNode in all nodes into one, and push them down to ScanNodes in different Fragments on the left side of the join. The combined filter can accurately filter the scan data.
It should be noted that if the left side of the broadcast join is a shuffle join, then the filter constructed by the broadcast join needs to be pushed down to the ScanNode on the left side of the shuffle join, so it needs to cross Fragment, so the filter still needs to be merged and pushed down, that is, whether the filter is required The judgment condition for merging is whether the HashJoinNode that constructs the filter and the ScanNode that applies the filter are located in different Fragments;
The instance in the fragment with ID 0 at the top of the query plan tree is selected as the filter merge node, that is, the rusult sink node, because the fragment with ID 0 has one and only one instance, and it is the last instance destroyed during the query operation phase.
Structure of Runtime Filter
Runtime Filter is generated by equi-join conjuncts, where the right expr of conjunct is called src expr, which is used to construct Runtime Filter and can be bound to any number of tuple IDs on the right side of the join; the left expr of conjunct is Called target expr, used to apply Runtime Filter, and must be bound to the only tuple ID on the left side of the join. The Runtime Filter records the HashJoinNode ID that builds the Runtime Filter and the ScanNode ID that applies the Runtime Filter.
Some other attributes of Runtime Filter, such as ID, filter type, order of Expr in join conjunct, join type, whether need to merge, bloom filter size bytes, and the correspondence between ID and Fragment instance addr, are all generated during FE query planning.
The essence of the filter transmitted between query plan Fragments is the value list of the key column in join conjuncts. When this value list is delivered to ScanNode in time, they can be used to filter out unmatched values immediately, instead of transmitting all data to HashJoinNode. probe.
Types of Runtime Filter
Runtime Filter includes three types: Bloom Filter, MinMax Filter, and IN predicate
runtime_filter_max_in_num
in the session variable).How to judge whether a query can generate Runtime Filter
HLL
orBITMAP
;COALESCE/IFNULL/CASE
, because when the join on clause of other joins at the upper level of the outer join contains NULL-checking expressions and a Runtime Filter is generated, this Runtime Filter is downloaded Pushing to the left table of outer join may cause incorrect results;PlanNode.Conjuncts
is not supported. Unlike HashJoinNode'seqJoinConjuncts
andotherJoinConjuncts
, the Runtime Filter generated byPlanNode.Conjuncts
found in the test that it may cause incorrect results, such asWhen an IN
subquery is converted to a join, the automatically generated join on clause will be stored inPlanNode.Conjuncts
. At this time, applying Runtime Filter may result in missing some rows in the result.Some components of Runtime filter
BE
RuntimeFilterMgr
is a component that manages Runtime Filter. All filters constructed by HashJoinNode in Fragment need to be registered in this class, ScanNode obtains the filter application from it, and its life cycle is strongly bound toRuntimeState
.RuntimeFilterMergeControllerEntity
can be understood as the context of the node that merges the filter. The last rusult sink node is used as the merge node, and the life cycle is valid throughout the query phase.RuntimeFilterMergeController
is a filter merged controller, and the life cycleFragmentMgr
is strongly bound.FE
RuntimeFilterGenerator
is used to generate Runtime Filter and assign it to the nodes that use Runtime Filter in the query plan.RuntimeFilter
represents a filter in the query plan, including the specific properties of the filter, the binding method of expr and tuple slot, etc.RuntimeFilterTarget
represents the filter information provided to ScanNode, including target expr, whether it needs to be merged, etc.Runtime Filter Query Plans
The query plan that can be displayed by the
explain
command includes the join on clause information used by each Fragment, as well as comments on the generation and use of the Runtime Filter by the Fragment, so as to confirm whether the Runtime Filter is applied to the desired join on clause.runtime filters: filter_id[type] <- table.column
.runtime filters: filter_id[type] -> table.column
.The query in the following example uses a Runtime Filter with ID RF000.
Runtime Filter Session Variable
For the query options (session variable) that control the Runtime Filter, please refer to the following section. In addition to
runtime_filter_type
, other query options are used to adjust the Runtime Filter to achieve the best performance in a specific scenario, usually only after performance testing. Optimize queries that are intensive, take a long enough time to run, and have a high enough frequency.1. runtime_filter_type
The types of Runtime Filter used include Bloom Filter, MinMax Filter, and IN predicate. When multiple types are used, they are separated by commas. Note that you need to add quotation marks. For example:
By default, only IN predicate is used conservatively. In some cases, the performance is higher when Bloom Filter, MinMax Filter, and IN predicate are used at the same time.
2. runtime_filter_mode
It is used to adjust the push-down strategy of Runtime Filter, including two strategies, LOCAL and GLOBAL, and the default setting is GLOBAL strategy.
LOCAL: Relatively conservative, the constructed Runtime Filter can only be used in the same Fragment on the same instance (the smallest unit of query execution), that is, the Runtime Filter producer (the HashJoinNode that constructs the Filter) and the consumer (the ScanNode that uses the RuntimeFilter) The same Fragment, such as the general scene of broadcast join;
GLOBAL: Relatively radical. In addition to satisfying the scenario of the LOCAL strategy, the Runtime Filter can also be combined and transmitted to different Fragments on different instances via the network. For example, Runtime Filter producers and consumers are in different Fragments, such as shuffle join. The query can be optimized in a wider range of scenarios.
3. runtime_filter_wait_time_ms
After the Runtime Filter is turned on, the ScanNode that joins the left table will wait for a period of time for each Runtime Filter assigned to itself and then scan the data. The default wait is 1s (1000ms, in milliseconds), that is, if the ScanNode is assigned 3 Runtime Filters , Then it will wait at most 3s.
Because it takes time to build and merge the Runtime Filter, ScanNode will try to push down the Runtime Filter that arrives within the waiting time to the storage engine. If the waiting time is exceeded, ScanNode will directly start scanning data using the Runtime Filter that has arrived.
If the Runtime Filter arrives after ScanNode starts scanning, ScanNode will not push the Runtime Filter down to the storage engine. Instead, it will use expression filtering on ScanNode based on the Runtime Filter for the data that has been scanned from the storage engine. The scanned data will not apply the Runtime Filter, so the intermediate data size obtained will be larger than the optimal solution, but serious cracking can be avoided.
If the cluster is busy and there are many resource-intensive or long-time-consuming queries on the cluster, consider increasing the waiting time to avoid missing optimization opportunities for complex queries. If the cluster load is light, and there are many small queries on the cluster that only take a few seconds, you can consider reducing the waiting time to avoid an increase of 1s for each query.
4. runtime_filters_max_num
The maximum number of Bloom Filters in the Runtime Filter that can be applied to each query, the default is 10. Because Bloom Filter construction and application costs are high, so if the generated Bloom Filter exceeds the maximum allowed number, the Bloom Filter with large selectivity is retained. Currently, only the number of Bloom Filters is limited, because compared to MinMax Filter and IN predicate It is more expensive to build and apply.
Because the cardinality of FE is currently inaccurate, the selectivity of Bloom Filter calculation here is inaccurate, so in the end it may only randomly reserve part of Bloom Filter.
5. Bloom Filter related parameters
runtime_bloom_filter_min_size
: the minimum length of Bloom Filter in Runtime Filter (in bytes), the default is 1048576 (1M);runtime_bloom_filter_max_size
: the maximum length of Bloom Filter in Runtime Filter (in bytes), the default is 16777216 (16M);runtime_bloom_filter_size
: The default length of Bloom Filter in Runtime Filter (in bytes), the default is 2097152 (2M);Because it is necessary to ensure that the length of the Bloom Filter constructed by each HashJoinNode is the same to be merged, the length of the Bloom Filter is currently calculated in the FE query planning.
If you can get the number of data rows (Cardinality) in the statistical information of the join table on the right, you will try to use Cardinality as NDV, the default false detection rate fpp is 0.05, and the Bloom that contains NDV unique elements and the false detection rate is lower than fpp is calculated by the formula The minimum number of bytes required by the filter, rounded to the nearest power of 2 (log value with 2 as the base), and limits the upper and lower limits of the length of the final Bloom Filter.
If the Cardinality of the join right table is not available, the default Bloom Filter length will be used.
6. runtime_filter_max_in_num
If the number of rows in the right table of the join is greater than this value, we will not generate an IN predicate, and the default is 1024;
Runtime Filter performance test
In the actual production environment, it can bring 1-20 times performance improvement.
SSB stand-alone performance test:
Test Results
Test conclusion
The performance improvement effect of Runtime Filter under different filter rates:
The greater the filter rate of Runtime Filter, the better the filtering effect, and the greater the performance improvement. In most cases, as long as part of the data can be filtered, the performance will be improved to a certain extent. In extreme cases, when the filter rate is 0%, it is about The value of the join conjuncts key column of the table is exactly the same. A single Runtime Filter will reduce the performance by about 10%. Therefore, if the filtering effect is expected to be poor, consider turning off the Runtime Filter (query option
RUNTIME_FILTER_MODE
=OFF).Simultaneous use of different types of Runtime Filter:
If the data that can be filtered by multiple Runtime Filters contains each other, the performance is worse than using only one filter, that is, after one filter is filtered first, the other types of filters actually do not work (or filter very little), which brings extra Overhead
The impact of Runtime Filter pushdown on performance:
At present, both MinMax Filter and IN predicate can be pushed down to the storage engine. Bloom Filter can only push down the Key column to the storage engine. In the actual test, pushing the filter down to the storage engine is much faster than using expression filtering on ScanNode. And when the Bloom Filter is not pushed down, the performance will be reduced in most cases, so the effect of using Bloom Filter for non-Key columns may not be satisfactory.
The impact of different lengths of Bloom Filter on performance:
When the large amount of data on the right side of join leads to more false positives in the Bloom Filter, as the length of the Bloom Filter increases, the filtering rate increases, and the construction and merging time becomes longer. In some cases, the length of the Bloom Filter is 2M The best performance;
When a large table join conjuncts keys contains unique columns:
If a join query involving a large table uses unique columns as join conjuncts keys, the cost of constructing and transmitting the filter may exceed its performance benefits, and too many key values will cause the false detection rate of the Bloom Filter to be too large, which will not effectively reduce the join. The amount of data in the scan on the left. For this type of query, consider turning off the Runtime Filter.
The influence of the base size of Join conjuncts keys on Runtime Filter:
In Inner Join, the smaller the base of join conjuncts keys, that is, the smaller the count (distinct join conjuncts keys), the smaller the false detection rate of the Bloom Filter, and the greater the performance improvement of the Runtime Filter.
Whether to wait for Runtime Filter performance comparison:
Waiting for the Runtime Filter to arrive and push it down to the storage engine before starting the scan is better than directly scanning and then waiting for the filter to arrive before using it (which cannot be pushed down to the storage engine). If it takes a few seconds to wait for the construction of the Runtime Filter, but you may save a few minutes after trimming unnecessary data using the Runtime Filter, then it is worthwhile to wait a few more seconds.
Comparison of the performance improvement effect of Runtime Filter on broadcast join, shuffle join, and bucket shuffle join:
Comparison of IN predicate and BloomFilter effects:
For details, see: https://docs.google.com/document/d/1Jc48T4aKxFv_QgFJjsQwVgkXuh9Re4kiAhEWBS8R1MI/edit?usp=sharing
TPC-DS multi-machine performance test:
Test parameters
Test Results
The total time for a successful query is reduced from 4192s to 3133s, the performance is improved by 25.3%, 30 queries become faster, and 13 queries become slower;
Test conclusion
Query analysis with reduced performance:
Three types of Runtime Filters, Bloom Filter, MinMax Filter, and IN predicate, have an impact on performance:
Test the ScanNode waiting time, which is different from Bloom Filter, and its impact on performance:
TODO
PlanNode.Conjuncts
, 2 issues need to be confirmed:PlanNode.Conjuncts
should limit the join type;PlanNode.Conjuncts
, currently known to be generated when thein
subquery is converted to join;Difference from Impala Runtime Filtering
Doris' current runtime filter and Impala are quite different in design goals and specific implementation.
For the source code analysis of Impala runtime filter, see: https://docs.google.com/document/d/1MbtorqoXqowW5JQZOtJtDs4lsDMsWK-IaviPvM3n0PI/edit
中文原稿: https://docs.google.com/document/d/1yqUM49qUtiOszaxeiwsey0ZU9dPut1oM4NCi6I6C3cE/edit?usp=sharing
The text was updated successfully, but these errors were encountered: