Motivation
At present, Doris supports three types of join: shuffle join, broadcast join, and colocate join.
Except for colocate join, these joins incur a lot of network consumption.
For example, for a SQL query A join B, the network cost is:
broadcast join: if table A is divided into three parts, the network cost is 3B
shuffle join: the network cost is A + B.
This network consumption not only slows down the query but also causes extra memory consumption during the join.
Each Doris table has distribution info. If the join expression hits the distribution info, we should use it to reduce the network consumption.
What is bucket shuffle join
Bucket shuffle join is similar to Hive's bucket map join; the picture shows how it works. Given a SQL query A join B whose join expression hits the distribution info of A, bucket shuffle join only needs to distribute table B, sending each row to the proper part of table A. So the network cost is always B.
Compared with the original joins, bucket shuffle join therefore incurs less network cost:
B < min(3B, A + B)
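The comparison above can be checked with a few lines of arithmetic. This is a minimal sketch: the function names and the sample sizes are illustrative assumptions, and sizes are in arbitrary units of data sent over the network.

```python
def broadcast_cost(size_b: float, left_parts: int) -> float:
    """Broadcast join: every part of the left table receives a full copy of B."""
    return left_parts * size_b


def shuffle_cost(size_a: float, size_b: float) -> float:
    """Shuffle join: both A and B are redistributed over the network."""
    return size_a + size_b


def bucket_shuffle_cost(size_b: float) -> float:
    """Bucket shuffle join: only B is sent, exactly once."""
    return size_b


# Hypothetical sizes: A = 100 units, B = 10 units, A split into 3 parts.
size_a, size_b, left_parts = 100.0, 10.0, 3
costs = {
    "broadcast": broadcast_cost(size_b, left_parts),
    "shuffle": shuffle_cost(size_a, size_b),
    "bucket_shuffle": bucket_shuffle_cost(size_b),
}
print(costs)
```

The inequality holds whenever A is non-empty and the left table has more than one part.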
It brings the following benefits:
First, Bucket Shuffle Join reduces the network cost and gives better performance for some joins, especially when buckets are pruned.
It does not strongly rely on the colocate mechanism, so it is transparent to users. It places no mandatory requirement on data distribution, so it does not lead to data skew.
It provides more query optimization space for join reorder.
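How Bucket Shuffle Join works in Doris
The key idea and challenge
To keep the left table's data local in such a join, the join expression must contain the left table's distribution columns
The BE must know how to send the right table's data according to the distribution of the left table
1. How data is distributed in the left table when it is loaded into Doris
First, we compute the hash value of the left table's distribution column and take it modulo the bucket number, as in the picture below:
![image.png](https://camo.githubusercontent.com/082d4fa719e5b82ed05ef730271d2d5a5f587d1f734a0da20f6fb299c9ae2929/68747470733a2f2f75706c6f61642d696d616765732e6a69616e7368752e696f2f75706c6f61645f696d616765732f383535323230312d366365393138356638333132376636352e706e673f696d6167654d6f6772322f6175746f2d6f7269656e742f7374726970253743696d61676556696577322f322f772f31323430)
Second, we send the left table's data distribution to the right table. The right table distributes its data with the same hash, so only one copy of the right table's data is sent when the join is computed.
![image.png](https://camo.githubusercontent.com/b50c4363b8d3e3137305e10fc5b2862202585af976c17a6287f73ba286e26745/68747470733a2f2f75706c6f61642d696d616765732e6a69616e7368752e696f2f75706c6f61645f696d616765732f383535323230312d326439323433623463323931363134332e706e673f696d6167654d6f6772322f6175746f2d6f7269656e742f7374726970253743696d61676556696577322f322f772f31323430)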
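The two steps above can be sketched as follows. This is an illustration only: `zlib.crc32` stands in for whatever hash function Doris actually uses (the text does not name it), and `bucket_of` and `route_right_rows` are hypothetical helper names.

```python
import zlib


def bucket_of(key, bucket_num: int) -> int:
    """Hash the distribution-column value and take it modulo the bucket number.

    crc32 is an assumed stand-in for the real hash function.
    """
    return zlib.crc32(repr(key).encode("utf-8")) % bucket_num


def route_right_rows(rows, key_index: int, bucket_num: int):
    """Group right-table rows by the bucket that holds the matching left rows.

    Each row is placed in exactly one bucket, so the right table is sent once.
    """
    buckets = {seq: [] for seq in range(bucket_num)}
    for row in rows:
        buckets[bucket_of(row[key_index], bucket_num)].append(row)
    return buckets


# Hypothetical right-table rows: (join_key, payload).
right_rows = [(1, "x"), (2, "y"), (1, "z")]
routed = route_right_rows(right_rows, key_index=0, bucket_num=4)
for seq, rows in routed.items():
    print(seq, rows)
```

Because both sides apply the same function to the same key, a right-table row always lands in the bucket that already holds the left-table rows it can join with.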
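2. Bucket Shuffle Join query plan
For the Bucket Shuffle Join, we change the partition type of the DataStreamSender to BUCKET_SHFFULE_HASH_PARTITIONED, so the right table distributes its data according to the distribution of the left table.
![image.png](https://camo.githubusercontent.com/fed1e72099ce605df73ab792ed7df05caf1419df547ac7ff58ae18e66fc89b70/68747470733a2f2f75706c6f61642d696d616765732e6a69616e7368752e696f2f75706c6f61645f696d616765732f383535323230312d663434353730633834663863613139322e706e673f696d6167654d6f6772322f6175746f2d6f7269656e742f7374726970253743696d61676556696577322f322f772f31323430)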
3. Bucket Shuffle Join query schedule
The schedule goal: the DataStreamSender should know how the left table's buckets are distributed and send the proper data to the proper instance.
The schedule strategy: the left table's ScanNode chooses which bucket seqs it owns, and each instance of the ScanNode needs to own the same number of bucket seqs. The FE sends the distribution of bucket seqs across BEs to the DataStreamSender so that it knows how to send the data.
Different fragments have different bucket seq distributions, so we need to distinguish them by FragmentId
We need to make sure each BucketID has the same bucket count to keep the data computation balanced
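A rough sketch of the scheduling idea follows. It assumes a simple round-robin assignment of bucket seqs to instances, which is a simplification and not necessarily how the FE picks owners; `assign_buckets` and `build_fragment_maps` are hypothetical names.

```python
from collections import Counter


def assign_buckets(bucket_num: int, instances: list) -> dict:
    """Map each bucket seq to an instance, round-robin (an assumed policy).

    Round-robin keeps the per-instance bucket count within one of each other.
    """
    if not instances:
        raise ValueError("at least one instance is required")
    return {seq: instances[seq % len(instances)] for seq in range(bucket_num)}


def build_fragment_maps(fragments: dict) -> dict:
    """Keep one bucket-seq map per fragment, keyed by fragment id.

    `fragments` maps fragment id -> (bucket_num, instances).
    """
    return {
        fragment_id: assign_buckets(bucket_num, instances)
        for fragment_id, (bucket_num, instances) in fragments.items()
    }


# Hypothetical plan: two fragments with different bucket layouts.
maps = build_fragment_maps({
    1: (6, ["be1", "be2", "be3"]),
    2: (4, ["be1", "be2"]),
})
for fragment_id, bucket_map in maps.items():
    print(fragment_id, bucket_map, Counter(bucket_map.values()))
```

A sender that holds the map for its fragment can look up the target instance from the bucket seq of each row.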
4. How to decide whether a query can use Bucket Shuffle Join
The eqJoinConjuncts must contain the left table's distribution columns
The left table columns and right table columns in the eqJoinConjuncts must have the same type
5. Others
Add a session variable enable_bucket_shuffle_join; its default value is true.
The limits of the current bucket shuffle join
The left table of a bucket shuffle join must be an OLAP table. (the reason is obvious)
The join columns must contain all of the left table's distribution columns to enable bucket shuffle join. (the reason is obvious)
After partition pruning, only one partition of the left table may remain. (the reason is obvious)
The left table's distribution columns used in the join and the matching right table columns must have the same data type, because different data types produce different hash results. (I will solve this problem in the next version)
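The limits listed above can be read as one eligibility check. The sketch below is a hypothetical model, not the planner's code: `LeftTable`, its fields, and `can_bucket_shuffle` are invented for illustration, and column types are plain strings.

```python
from dataclasses import dataclass, field


@dataclass
class LeftTable:
    """Assumed, simplified description of the left table of the join."""
    is_olap: bool
    partitions_after_pruning: int
    # distribution column name -> type name
    distribution_columns: dict = field(default_factory=dict)


def can_bucket_shuffle(left: LeftTable, eq_join: dict, enabled: bool = True) -> bool:
    """Return True if the join satisfies every limit listed above.

    `eq_join` maps a left-table column name to the type of the right-table
    column it is equated with.
    """
    if not enabled or not left.is_olap:
        return False
    if left.partitions_after_pruning != 1:
        return False
    for column, column_type in left.distribution_columns.items():
        if column not in eq_join:          # a distribution column is not joined on
            return False
        if eq_join[column] != column_type:  # type mismatch changes the hash result
            return False
    return True


left = LeftTable(is_olap=True, partitions_after_pruning=1,
                 distribution_columns={"user_id": "BIGINT"})
print(can_bucket_shuffle(left, {"user_id": "BIGINT"}))
print(can_bucket_shuffle(left, {"user_id": "INT"}))
```

The `enabled` flag plays the role of the enable_bucket_shuffle_join session variable.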
Original Join vs. Bucket Shuffle Join
Test Data:
TPC-DS 1 TB data
Cluster Info:
10 BEs; each BE is a physical machine with 48 CPUs and 96 GB of memory.
Test Result of TPC-DS
If the picture does not show a time for a query, that query failed because the memory limit was exceeded.