New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add additional data types are supported in hash join #2721
Conversation
Hi @alamb , as discussed before, more common used data types, such as Date32/Date64/Decimal/Dictionary(integer/unsigned integer, Utf8) are supported in hash join. |
Grate work 👍 This change looks good to me. I try to trace the failing case and think it's not introduced here? Log shows that right join only matched twice. Need more investigation. |
@waynexia Thanks!I run the test case on my server, and get the expected results. I'll check it later. |
Please let me know if I can provide some info you need🧐 |
Do you have any ideas about the test failure? I can not reproduce that case. |
This seems to be a flaky case. I got a passing run today 😢 Here is my log: Error log
|
Cannot reproduce the success and I start to doubt whether that passing run is the correct test case... |
the left side should be null if not matched for right join, maybe some corner case |
Yes, the log shows there are only two matches. And that decimal number does not like a valid row. So I suspect this is not related to the PR... |
@AssHero would you like some help with this PR? @waynexia points out that @andygrove proposes adding support for date32/date64 in #2746 which is already covered by this PR I will try and review this PR tomorrow -- I am sorry I haven't done it until now: It looked like you and @waynexia were making good progress |
sure, I'll merge this with the new changes in master. And recheck the test cases. |
merge the new changes in master |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @AssHero -- I think other than the Dictionary comparison, this PR is looking good. I also have a suggestion for what to do with the CI test failure.
datafusion/expr/src/utils.rs
Outdated
DataType::Dictionary(key_type, value_type) | ||
if *value_type.as_ref() == DataType::Utf8 => | ||
{ | ||
matches!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comment: could potentially use DataType::is_dictionary_key_type
here: https://docs.rs/arrow/16.0.0/arrow/datatypes/enum.DataType.html#method.is_dictionary_key_type
@@ -1054,6 +1079,102 @@ fn equal_rows( | |||
DataType::LargeUtf8 => { | |||
equal_rows_elem!(LargeStringArray, l, r, left, right, null_equals_null) | |||
} | |||
DataType::Decimal(_, _) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For decimal, I wonder if we also need to ensure that the precision and scale are the same (e.g. l.data_type() == r.data_type()
) 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the scale should be the same, precision is insignificant. what is your suggestion?
@@ -947,6 +951,27 @@ macro_rules! equal_rows_elem { | |||
}}; | |||
} | |||
|
|||
macro_rules! equal_rows_elem_with_string_dict { | |||
($key_array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, $null_equals_null: ident) => {{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For dictionaries, I think $left
and $right
are actually indexes into the keys array, and then the keys array contains the corresponding index into values
.
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┌─────────────────┐ ┌─────────┐ │ ┌─────────────────┐
│ │ A │ │ 0 │ │ A │ values[keys[0]]
├─────────────────┤ ├─────────┤ │ ├─────────────────┤
│ │ D │ │ 2 │ │ B │ values[keys[1]]
├─────────────────┤ ├─────────┤ │ ├─────────────────┤
│ │ B │ │ 2 │ │ B │ values[keys[2]]
├─────────────────┤ ├─────────┤ │ ├─────────────────┤
│ │ C │ │ 1 │ │ D │ values[keys[3]]
├─────────────────┤ └─────────┘ │ └─────────────────┘
│ │ E │ keys
└─────────────────┘ │
│ values Logical array
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ Contents
DictionaryArray
length = 4
In other words, I think you need to compare the values using something like:
https://github.com/AssHero/arrow-datafusion/blob/hashjoin/datafusion/common/src/scalar.rs#L338-L361
"| | | | | | | 1970-01-04 | 0.00 | qwerty | 74776f |", | ||
"| | 1970-01-04 | 789.00 | ghi | | 1970-01-04 | | 789.00 | | 7468726565 |", | ||
"| 1970-01-04 | | -123.12 | jkl | 7468726565 | 1970-01-02 | 1970-01-02 | -123.12 | abc | 6f6e65 |", | ||
"+------------+------------+---------+-----+------------+------------+------------+-----------+---------+------------+", | ||
]; | ||
|
||
let results = execute_to_batches(&ctx, sql).await; | ||
assert_batches_sorted_eq!(expected, &results); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is failing on CI: https://github.com/apache/arrow-datafusion/runs/6953453815?check_suite_focus=true
Looking at the diff it appears to be related to column c5:
And thus it seems unrelated to this PR (though a real bug). Thus I personally suggest you change the query to not select c5
and we then file ticket to chase down the real problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. That wrong c5 is not actually a correct row in t1.
and we then file ticket to chase down the real problem.
@AssHero cannot reproduce this might also be a clue. And I hope we can add c5 back in the next fixing PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/apache/arrow-datafusion/runs/6953453815?check_suite_focus=tr
Thanks! Let me fix the test case.
Codecov Report
@@ Coverage Diff @@
## master #2721 +/- ##
==========================================
- Coverage 84.96% 84.95% -0.02%
==========================================
Files 271 271
Lines 48053 48142 +89
==========================================
+ Hits 40827 40897 +70
- Misses 7226 7245 +19
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I have reviewed the new changes and they look good to me, thanks @AssHero
@@ -1054,6 +1110,116 @@ fn equal_rows( | |||
DataType::LargeUtf8 => { | |||
equal_rows_elem!(LargeStringArray, l, r, left, right, null_equals_null) | |||
} | |||
DataType::Decimal(_, lscale) => match r.data_type() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my knowledge in the datafusion, we have converted the left and right to the same data type in the planner.
The data type of left and right must be same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I create two table in the spark,
spark-sql> desc t1;
c1 decimal(10,3)
spark-sql> desc t2;
c1 decimal(10,4)
and join two table with the eq condition: t1.c1 = t2.c1, get the plan
spark-sql> explain select * from t1,t2 where t1.c1 = t2.c1;
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [cast(c1#111 as decimal(11,4))], [cast(c1#112 as decimal(11,4))], Inner
:- Sort [cast(c1#111 as decimal(11,4)) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(c1#111 as decimal(11,4)), 200), ENSURE_REQUIREMENTS, [id=#293]
: +- Filter isnotnull(c1#111)
: +- Scan hive default.t1 [c1#111], HiveTableRelation [`default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#111], Partition Cols: []]
+- Sort [cast(c1#112 as decimal(11,4)) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(c1#112 as decimal(11,4)), 200), ENSURE_REQUIREMENTS, [id=#294]
+- Filter isnotnull(c1#112)
+- Scan hive default.t2 [c1#112], HiveTableRelation [`default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#112], Partition Cols: []]
From the from, we can get the the t1.c1 and t2.c1 will be casted to coerced type(decimal(11,4));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I create two tables in datafusion
❯ describe t1;
+-------------+----------------+-------------+
| column_name | data_type | is_nullable |
+-------------+----------------+-------------+
| c1 | Decimal(10, 2) | NO |
+-------------+----------------+-------------+
select * from t1;
+------+
| c1 |
+------+
| 1.00 |
+------+
❯ describe t2;
+-------------+----------------+-------------+
| column_name | data_type | is_nullable |
+-------------+----------------+-------------+
| c1 | Decimal(10, 4) | NO |
+-------------+----------------+-------------+
select * from t2;
+--------+
| c1 |
+--------+
| 1.0000 |
+--------+
this query gets 0 rows
select * from t1 join t2 on t1.c1 = t2.c1;
0 rows in set. Query took 0.005 seconds.
but this query gets one row
select * from t1 join t2 on t1.c1::decimal(10,4) = t2.c1;
+------+--------+
| c1 | c1 |
+------+--------+
| 1.00 | 1.0000 |
+------+--------+
Does the planner still not converts the data types between left and right for decimal ? If so, perhaps we can add this to planner.
please let me know if I miss something? thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make the data type is same between left and right.
In this pr, we should better forbid calculate the join operation for diff data type, For example
Decimal(10,3) join Decimal(11,3)
I think you can create a follow up pr to make data type coercion in the planner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we should check the data types and make the coercion in the planner. I'll create an issue and follow up the pr later. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @AssHero
I'll try and take a look at this tomorrow. Sorry for the delay in review, but it is a bit of a crunch time at work (e.g. https://github.com/influxdata/influxdb_iox/issues/4658) so I don't have as much time to devote to things in arrow and datafusion that are not directly connected for the next few weeks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again @AssHero -- let me know if you would like help filing the issue for the strange behavior we were seeing with c5
|
||
match values_index { | ||
Ok(index) => (as_string_array(left_array.values()), Some(index)), | ||
_ => (as_string_array(left_array.values()), None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this code silently ignores failures
Filed #2767 to make this a panic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Thanks again @AssHero -- let me know if you would like help filing the issue for the strange behavior we were seeing with
c5
yes, i'll trace the issue and try to solve this issue.
Which issue does this PR close?
Closes #2145
Rationale for this change
More data types are supported in hash join. such as Date32、Date64、Decimal、Dictionary(_, Utf8)
What changes are included in this PR?
support Date32/Date64/Decimal/Dictionary data types in equal_rows, also add those data types in can_hash function.