Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 13 additions & 26 deletions datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, StructArray};
use arrow::compute::cast;
use arrow::datatypes::{Field, FieldRef, Fields};
use arrow_schema::DataType;
use datafusion_common::Result;
Expand All @@ -33,19 +32,6 @@ pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result<Fields> {
.collect()
}

/// Casts dictionary-encoded arrays to their underlying value type, preserving row count.
/// Non-dictionary arrays are returned as-is.
fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
match array.data_type() {
DataType::Dictionary(_, value_type) => {
let casted = cast(array, value_type)?;
// Recursively flatten in case of nested dictionaries
flatten_dictionary_array(&casted)
}
_ => Ok(Arc::clone(array)),
}
}

/// Builds InList values from join key column arrays.
///
/// If `join_key_arrays` is:
Expand All @@ -65,20 +51,14 @@ fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
pub(super) fn build_struct_inlist_values(
join_key_arrays: &[ArrayRef],
) -> Result<Option<ArrayRef>> {
// Flatten any dictionary-encoded arrays
let flattened_arrays: Vec<ArrayRef> = join_key_arrays
.iter()
.map(flatten_dictionary_array)
.collect::<Result<Vec<_>>>()?;

// Build the source array/struct
let source_array: ArrayRef = if flattened_arrays.len() == 1 {
let source_array: ArrayRef = if join_key_arrays.len() == 1 {
// Single column: use directly
Arc::clone(&flattened_arrays[0])
Arc::clone(&join_key_arrays[0])
} else {
// Multi-column: build StructArray once from all columns
let fields = build_struct_fields(
&flattened_arrays
&join_key_arrays
.iter()
.map(|arr| arr.data_type().clone())
.collect::<Vec<_>>(),
Expand All @@ -88,7 +68,7 @@ pub(super) fn build_struct_inlist_values(
let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields
.iter()
.cloned()
.zip(flattened_arrays.iter().cloned())
.zip(join_key_arrays.iter().cloned())
.collect();

Arc::new(StructArray::from(arrays_with_fields))
Expand Down Expand Up @@ -152,7 +132,14 @@ mod tests {
assert_eq!(
*result.data_type(),
DataType::Struct(
build_struct_fields(&[DataType::Utf8, DataType::Int32]).unwrap()
build_struct_fields(&[
DataType::Dictionary(
Box::new(DataType::Int8),
Box::new(DataType::Utf8)
),
DataType::Int32
])
.unwrap()
)
);
}
Expand All @@ -168,6 +155,6 @@ mod tests {
.unwrap();

assert_eq!(result.len(), 3);
assert_eq!(*result.data_type(), DataType::Utf8);
assert_eq!(result.data_type(), dict_array.data_type());
}
}
100 changes: 100 additions & 0 deletions datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,103 @@ ORDER BY start_timestamp, trace_id
LIMIT 1;
----
2024-10-01T00:00:00


statement ok
set datafusion.execution.parquet.pushdown_filters = false;

# Regression test for https://github.com/apache/datafusion/issues/20696
# A multi-column INNER JOIN on dictionary-encoded join keys fails
# when parquet filter pushdown is enabled.

statement ok
COPY (
SELECT
to_timestamp_nanos(time_ns) AS time,
arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
temp
FROM (
VALUES
(200, 'CA', 'LA', 90.0),
(250, 'MA', 'Boston', 72.4),
(100, 'MA', 'Boston', 70.4),
(350, 'CA', 'LA', 90.0)
) AS t(time_ns, state, city, temp)
)
TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/data.parquet';

statement ok
COPY (
SELECT
to_timestamp_nanos(time_ns) AS time,
arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
temp,
reading
FROM (
VALUES
(250, 'MA', 'Boston', 53.4, 51.0),
(100, 'MA', 'Boston', 50.4, 50.0)
) AS t(time_ns, state, city, temp, reading)
)
TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/data.parquet';

statement ok
CREATE EXTERNAL TABLE h2o_parquet_20696 STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/';

statement ok
CREATE EXTERNAL TABLE o2_parquet_20696 STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/';

# Query should work both with and without filters
statement ok
set datafusion.execution.parquet.pushdown_filters = false;

query RRR
SELECT
h2o_parquet_20696.temp AS h2o_temp,
o2_parquet_20696.temp AS o2_temp,
o2_parquet_20696.reading
FROM h2o_parquet_20696
INNER JOIN o2_parquet_20696
ON h2o_parquet_20696.time = o2_parquet_20696.time
AND h2o_parquet_20696.state = o2_parquet_20696.state
AND h2o_parquet_20696.city = o2_parquet_20696.city
WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
----
72.4 53.4 51
70.4 50.4 50


statement ok
set datafusion.execution.parquet.pushdown_filters = true;

query RRR
SELECT
h2o_parquet_20696.temp AS h2o_temp,
o2_parquet_20696.temp AS o2_temp,
o2_parquet_20696.reading
FROM h2o_parquet_20696
INNER JOIN o2_parquet_20696
ON h2o_parquet_20696.time = o2_parquet_20696.time
AND h2o_parquet_20696.state = o2_parquet_20696.state
AND h2o_parquet_20696.city = o2_parquet_20696.city
WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
----
72.4 53.4 51
70.4 50.4 50

# Cleanup
statement ok
DROP TABLE h2o_parquet_20696;

statement ok
DROP TABLE o2_parquet_20696;

# Cleanup settings
statement ok
set datafusion.execution.parquet.pushdown_filters = false;