Refactor the Hash Join #4377
| Some(result.map(|x| x.0)) | ||
| } | ||
| // the right side has been consumed | ||
| // TODO: Some(Err) case |
| Some(result) | ||
| } | ||
| Err(_) => { | ||
| // TODO why the type of result stream is `Result<T, ArrowError>`, and not the `DataFusionError` |
@alamb @Dandandan why do we use `ArrowError` instead of `DataFusionError` here?
Is this the same as #4172?
> Is this the same as #4172?

Yes.
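For readers following the error-type question above, here is a minimal sketch of how a kernel-level error can be carried inside an operator-level error, so that `?` converts between them. The two enums are hypothetical stand-ins written for this example, not the real `ArrowError` and `DataFusionError` definitions, and `take_row` and `join_step` are invented names.

```rust
// Hypothetical stand-in for the kernel-level error type (not the real arrow type).
#[derive(Debug, PartialEq)]
enum ArrowError {
    ComputeError(String),
}

// Hypothetical stand-in for the operator-level error type (not the real datafusion type).
#[derive(Debug, PartialEq)]
enum DataFusionError {
    ArrowError(ArrowError),
}

// Wrapping conversion, so `?` can lift a kernel error into an operator error.
impl From<ArrowError> for DataFusionError {
    fn from(e: ArrowError) -> Self {
        DataFusionError::ArrowError(e)
    }
}

// A kernel-level step that reports failures as ArrowError.
fn take_row(rows: &[i32], idx: usize) -> Result<i32, ArrowError> {
    rows.get(idx)
        .copied()
        .ok_or_else(|| ArrowError::ComputeError(format!("index {} out of bounds", idx)))
}

// An operator-level step: `?` applies the From impl above.
fn join_step(rows: &[i32], idx: usize) -> Result<i32, DataFusionError> {
    let value = take_row(rows, idx)?;
    Ok(value)
}
```

With this shape, a stream could yield either error type at its boundary; which one the real stream should use is the open question in this thread.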
|
cc @alamb @Dandandan @mingmwang @jackwener PTAL |
|
I will try and find time to review this carefully over the next few days -- joins are a complicated subject so thanks for taking them on. However they aren't very high priority to my day job at InfluxData yet so finding time to review this kind of PR is hard for me |
|
I will review this PR carefully tomorrow, thanks @liukun4515 |
|
|
||
| #[tokio::test] | ||
| #[ignore = "Test ignored, will be enabled after fixing right semi join bug"] | ||
| // https://github.com/apache/arrow-datafusion/issues/4247 |
This fixes the bug for right semi join. cc @mingmwang
jackwener left a comment
Great job! 👍 The whole work is very clear.
| // "+----+----+-----+----+----+-----+" | ||
| // And the result of left and right indices | ||
| // left indices: 5,6,6,4 | ||
| // right indices: 3,4,5,3 |
👍
| RecordBatch::try_new(Arc::new(schema.clone()), columns) | ||
| } | ||
|
|
||
| // Get left and right indices which is satisfies the on condition in the Join |
It is easy to forget that the filter and the equal condition are both part of the on condition, and that they are implicitly combined with AND.
| // Get left and right indices which is satisfies the on condition in the Join | |
| // Get left and right indices which satisfy the on condition (including both equal_condition and filter_in_join) in the Join |
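To make the point about the implicit AND concrete, here is a small sketch using plain slices instead of Arrow arrays. `on_condition_indices` is a hypothetical helper written for this comment, and it uses a nested loop rather than a hash table to keep the example short; it is not the PR's implementation.

```rust
// Hypothetical helper: returns (left_indices, right_indices) for every row pair
// that satisfies BOTH the equality condition and the extra join filter.
fn on_condition_indices<F>(
    left_keys: &[i32],
    right_keys: &[i32],
    filter: F,
) -> (Vec<u64>, Vec<u32>)
where
    F: Fn(usize, usize) -> bool,
{
    let mut left_indices = Vec::new();
    let mut right_indices = Vec::new();
    for (r, right_key) in right_keys.iter().enumerate() {
        for (l, left_key) in left_keys.iter().enumerate() {
            // equal condition AND filter: a pair is kept only if both hold
            if left_key == right_key && filter(l, r) {
                left_indices.push(l as u64);
                right_indices.push(r as u32);
            }
        }
    }
    (left_indices, right_indices)
}
```

Passing a filter that always returns `true` gives the plain equi-join pairs; any stricter filter can only remove pairs from that set.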
| JoinType::LeftSemi | JoinType::LeftAnti => { | ||
| // matched or unmatched left row will be produced in the end of loop | ||
| ( | ||
| UInt64Array::from_iter_values(vec![]), | ||
| UInt32Array::from_iter_values(vec![]), | ||
| ) |
We could add a TODO here noting a possible optimization, because a semi join doesn't need to wait until the end.
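As a rough illustration of the "produced at the end of the loop" step that the code comment describes, the sketch below tracks which left rows were matched and derives the semi or anti output from that bitmap. Both function names are hypothetical, and a plain `Vec<bool>` stands in for whatever structure the operator actually uses.

```rust
// Hypothetical helper: record every left row that matched at least once.
fn mark_visited(visited: &mut [bool], matched_left: &[u64]) {
    for &idx in matched_left {
        visited[idx as usize] = true;
    }
}

// Hypothetical helper: at the end of the probe loop, emit the matched left rows
// (semi join, want_matched = true) or the unmatched ones (anti join, want_matched = false).
fn left_indices_from_visited(visited: &[bool], want_matched: bool) -> Vec<u64> {
    visited
        .iter()
        .enumerate()
        .filter_map(|(i, &v)| if v == want_matched { Some(i as u64) } else { None })
        .collect()
}
```

An anti join has to wait for the full bitmap, since a row is only known to be unmatched after every probe batch has been seen. A semi join could in principle emit a left row the first time it is marked, which seems to be the optimization suggested here.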
alamb left a comment
Thank you @liukun4515 -- I reviewed this code and tests carefully; I found it easier to read / understand. 🏅
| PrimitiveArray::<UInt32Type>::from(right), | ||
| )) | ||
| } | ||
| JoinType::RightSemi => { |
❤️
| let matched_size = left_indices.len(); | ||
| let unmatched_size = appended_right_indices.len(); | ||
| let total_size = matched_size + unmatched_size; | ||
| // the new left indices: left_indices + null array | ||
| // the new right indices: right_indices + appended_right_indices | ||
| let new_left_indices = (0..total_size) | ||
| .map(|pos| { | ||
| if pos < matched_size { | ||
| unsafe { Some(left_indices.value_unchecked(pos)) } | ||
| } else { | ||
| None | ||
| } | ||
| }) | ||
| .collect::<UInt64Array>(); | ||
| let new_right_indices = (0..total_size) | ||
| .map(|pos| { | ||
| if pos < matched_size { | ||
| unsafe { Some(right_indices.value_unchecked(pos)) } | ||
| } else { | ||
| unsafe { | ||
| Some(appended_right_indices.value_unchecked(pos - matched_size)) | ||
| } | ||
| } | ||
| }) | ||
| .collect::<UInt32Array>(); | ||
| (new_left_indices, new_right_indices) |
I think you might be able to do this without unsafe and more concisely:
| let matched_size = left_indices.len(); | |
| let unmatched_size = appended_right_indices.len(); | |
| // the new left indices: left_indices + null array | |
| // the new right indices: right_indices + appended_right_indices | |
| let new_left_indices = left_indices | |
| .iter() | |
| .chain(std::iter::repeat(None).take(unmatched_size)) | |
| .collect::<UInt64Array>(); | |
| let new_right_indices = right_indices | |
| .iter() | |
| .chain(appended_right_indices.iter()) | |
| .collect::<UInt32Array>(); | |
| (new_left_indices, new_right_indices) |
👍
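The same chaining idea can be tried without any Arrow dependency. This is a minimal sketch that uses `Vec<Option<_>>` as a stand-in for the nullable index arrays; `append_unmatched_right` is a hypothetical name, and only standard library iterators are used.

```rust
// Hypothetical helper mirroring the suggestion above with plain vectors:
// pad the left indices with nulls and append the unmatched right indices.
fn append_unmatched_right(
    left_indices: &[Option<u64>],
    right_indices: &[Option<u32>],
    appended_right_indices: &[Option<u32>],
) -> (Vec<Option<u64>>, Vec<Option<u32>>) {
    let unmatched_size = appended_right_indices.len();
    // the new left indices: left_indices + one null per unmatched right row
    let new_left: Vec<Option<u64>> = left_indices
        .iter()
        .copied()
        .chain(std::iter::repeat(None).take(unmatched_size))
        .collect();
    // the new right indices: right_indices + appended_right_indices
    let new_right: Vec<Option<u32>> = right_indices
        .iter()
        .copied()
        .chain(appended_right_indices.iter().copied())
        .collect();
    (new_left, new_right)
}
```

Because both outputs are built by chaining, they have the same length by construction, with no position arithmetic and no `unsafe`.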
| // b2 = 10 | ||
| build_table( | ||
| ("a2", &vec![2, 4, 6, 8, 10, 12]), | ||
| ("b2", &vec![2, 4, 6, 8, 10, 10]), |
I recommend changing the order of the inputs so they are not sorted, to add additional coverage
For example:
| ("b2", &vec![2, 4, 6, 8, 10, 10]), | |
| ("b2", &vec![8, 10, 6, 2, 10, 4]), |
Bonus points for fuzzing and trying several different combinations
If I change the order of b2, I need to apply the same change to a2, because there are filters on a2 and the join equality condition is between b2 and b1.
The result for c2 in the right semi and right anti cases will also change.
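One way to reorder the test input while keeping a2 and b2 aligned row by row is to generate a single permutation and apply it to every column. This is only a sketch: `permutation` and `reorder` are hypothetical helpers, and the small linear congruential generator is just a dependency-free way to get a repeatable shuffle, not a recommendation over a proper fuzzing or random crate.

```rust
// Hypothetical helper: deterministic Fisher-Yates shuffle of 0..n,
// driven by a small linear congruential generator so no external crate is needed.
fn permutation(n: usize, seed: u64) -> Vec<usize> {
    let mut perm: Vec<usize> = (0..n).collect();
    let mut state = seed;
    for i in (1..n).rev() {
        state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        let j = ((state >> 33) as usize) % (i + 1);
        perm.swap(i, j);
    }
    perm
}

// Hypothetical helper: apply the same row permutation to one column.
fn reorder<T: Copy>(column: &[T], perm: &[usize]) -> Vec<T> {
    perm.iter().map(|&i| column[i]).collect()
}
```

Applying `reorder` with the same `perm` to a2, b2, and c2 keeps each row intact, so the filter on a2 and the equality on b2 still see the same pairs. The expected output rows would still need to be updated for the new order, as noted above.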
|
Benchmark runs are scheduled for baseline = 48f0f3a and contender = 8e0556b. 8e0556b is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4355
Closes #4247
Closes #4356
Rationale for this change
What changes are included in this PR?
Described in issue #4356.
join type to adjust left/right indices join type
Are these changes tested?
Are there any user-facing changes?