Describe the bug
Dictionary types aren't supported by the hasher. Attempting to join on two partitioned columns panics with:
thread 'parquet_join_on_partition_columns' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', datafusion/src/physical_plan/hash_join.rs:628:6
This isn't an issue when joining a partition column with another (non-partition) string column, presumably because the type coercion rules cast the partition column to Utf8.
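To make the hashing problem concrete, here is a minimal std-only sketch (not DataFusion or Arrow code; `DictColumn`, `row_hashes`, and the sample data are hypothetical) of why a dictionary-encoded column has to be hashed on its resolved values rather than its raw keys: two columns that are logically equal can have differently ordered dictionaries.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical, simplified dictionary-encoded column:
// each entry of `keys` indexes into `values`.
struct DictColumn<'a> {
    keys: Vec<usize>,
    values: Vec<&'a str>,
}

fn hash_str(s: &str) -> u64 {
    let mut h = DefaultHasher::new();
    s.hash(&mut h);
    h.finish()
}

// Hash each row by resolving the key to its underlying value first,
// so columns with differently ordered dictionaries still hash equally.
fn row_hashes(col: &DictColumn) -> Vec<u64> {
    col.keys.iter().map(|&k| hash_str(col.values[k])).collect()
}

fn main() {
    // The same logical column ["09", "10", "10"] with two dictionary layouts.
    let left = DictColumn { keys: vec![0, 1, 1], values: vec!["09", "10"] };
    let right = DictColumn { keys: vec![1, 0, 0], values: vec!["10", "09"] };

    // Value-based hashes agree even though the raw keys differ.
    assert_eq!(row_hashes(&left), row_hashes(&right));
    assert_ne!(left.keys, right.keys);
}
```

A value-resolving path like this is essentially what the hasher would need to learn for dictionary types, whatever form the real fix takes.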
To Reproduce
I did this using the path_partition tests, with a slightly modified version of register_partitioned_alltypes_parquet that allows setting the name of the registered table.
async fn parquet_join_on_partition_columns() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    register_partitioned_alltypes_parquet_with_name(
        &mut ctx,
        &[
            "year=2021/month=09/day=09/file.parquet",
            "year=2021/month=10/day=09/file.parquet",
            "year=2021/month=10/day=28/file.parquet",
        ],
        &["year", "month", "day"],
        "",
        "alltypes_plain.parquet",
        "left",
    )
    .await;
    register_partitioned_alltypes_parquet_with_name(
        &mut ctx,
        &[
            "year=2021/month=09/day=09/file.parquet",
            "year=2021/month=10/day=09/file.parquet",
            "year=2021/month=10/day=28/file.parquet",
        ],
        &["year", "month", "day"],
        "",
        "alltypes_plain.parquet",
        "right",
    )
    .await;
    let _results = ctx
        .sql("SELECT left.id as lid, right.id as rid from left inner join right on left.month = right.month")
        .await?
        .collect()
        .await?;
    Ok(())
}
Expected behavior
Query should return results without error.
I'm not sure how this should be solved. It would be fairly straightforward to add casts to a string array, in either the planner or the join implementation, but this would increase memory usage. Another option would be to add dictionary implementations to the hasher, but this will add a large number of additional branches if other partition column types are eventually supported. I know that a row-based format for joins is being implemented, but I think the same issue applies there: having to cast from dictionary to a concrete type will increase memory usage in the row-based version as well. This would be solvable there with a hashmap that is external to the row, which globally manages an integer -> T map and encodes the value of the partition column as an integer in the row.
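The external integer -> T map idea can be sketched with a small std-only interner (the `Interner` type and its API are hypothetical illustration, not an existing DataFusion structure): rows store a fixed-width integer per partition value, and the map, shared across rows, recovers the original value when needed.

```rust
use std::collections::HashMap;

// Hypothetical interner: a map external to the row format that assigns each
// distinct partition value a small integer, so a row stores the integer
// instead of repeating the (possibly long) string value.
#[derive(Default)]
struct Interner {
    ids: HashMap<String, u32>,
    values: Vec<String>,
}

impl Interner {
    // Return the id for `v`, inserting it on first sight.
    fn intern(&mut self, v: &str) -> u32 {
        if let Some(&id) = self.ids.get(v) {
            return id;
        }
        let id = self.values.len() as u32;
        self.ids.insert(v.to_string(), id);
        self.values.push(v.to_string());
        id
    }

    // Decode an id back to the original value.
    fn resolve(&self, id: u32) -> &str {
        &self.values[id as usize]
    }
}

fn main() {
    let mut interner = Interner::default();
    // Rows for the partition column "month": only an integer is stored per row.
    let encoded: Vec<u32> = ["09", "10", "10"].iter().map(|m| interner.intern(m)).collect();
    assert_eq!(encoded, vec![0, 1, 1]);
    assert_eq!(interner.resolve(encoded[2]), "10");
}
```

Because equal values always intern to the same integer, the row-based join could hash and compare the integers directly, avoiding the dictionary-to-concrete-type cast and its memory cost.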