diff --git a/src/query/service/src/physical_plans/runtime_filter/builder.rs b/src/query/service/src/physical_plans/runtime_filter/builder.rs index 51c7024c61a6d..60cf42e3685b5 100644 --- a/src/query/service/src/physical_plans/runtime_filter/builder.rs +++ b/src/query/service/src/physical_plans/runtime_filter/builder.rs @@ -102,10 +102,12 @@ pub async fn build_runtime_filter( let build_side = s_expr.build_side_child(); let build_side_data_distribution = build_side.get_data_distribution()?; - if build_side_data_distribution - .as_ref() - .is_some_and(|e| !matches!(e, Exchange::Broadcast | Exchange::NodeToNodeHash(_))) - { + if build_side_data_distribution.as_ref().is_some_and(|e| { + !matches!( + e, + Exchange::Broadcast | Exchange::NodeToNodeHash(_) | Exchange::Merge + ) + }) { return Ok(Default::default()); } @@ -124,9 +126,16 @@ pub async fn build_runtime_filter( }) }) { - // Skip if not a column reference - if probe_key.as_column_ref().is_none() { - continue; + // Skip if the probe expression is neither a direct column reference nor a + // cast from not null to nullable type (e.g. CAST(col AS Nullable(T))). + match &probe_key { + RemoteExpr::ColumnRef { .. } => {} + RemoteExpr::Cast { + expr: box RemoteExpr::ColumnRef { data_type, .. }, + dest_type, + .. + } if &dest_type.remove_nullable() == data_type => {} + _ => continue, } let probe_targets = diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs index f58e01bcfaa83..b5a9ca264fa10 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs @@ -123,7 +123,15 @@ fn build_inlist_filter(inlist: Column, probe_key: &Expr) -> Result col, + // Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T)) + Expr::Cast(cast) => match cast.expr.as_ref() { + Expr::ColumnRef(col) => col, + _ => unreachable!(), + }, + _ => unreachable!(), + }; let raw_probe_key = RawExpr::ColumnRef { span: probe_key.span, @@ -249,7 +257,15 @@ async fn build_bloom_filter( probe_key: &Expr, max_threads: usize, ) -> Result { - let probe_key = probe_key.as_column_ref().unwrap(); + let probe_key = match probe_key { + Expr::ColumnRef(col) => col, + // Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T)) + Expr::Cast(cast) => match cast.expr.as_ref() { + Expr::ColumnRef(col) => col, + _ => unreachable!(), + }, + _ => unreachable!(), + }; let column_name = probe_key.id.to_string(); let total_items = bloom.len(); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs index 497b3f1d576d3..a30412e074cd7 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs @@ -132,7 +132,15 @@ pub(crate) fn min_max_filter( max: Scalar, probe_key: &Expr, ) -> Result> { - let probe_key = probe_key.as_column_ref().unwrap(); + let probe_key = match probe_key { + Expr::ColumnRef(col) => col, + // Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T)) + Expr::Cast(cast) => match cast.expr.as_ref() { + Expr::ColumnRef(col) => col, + _ => unreachable!(), + }, + _ => unreachable!(), + }; let raw_probe_key = RawExpr::ColumnRef { span: probe_key.span, id: probe_key.id.to_string(), diff --git a/tests/sqllogictests/suites/mode/cluster/exchange.test b/tests/sqllogictests/suites/mode/cluster/exchange.test index 0b95a85d1b1fe..f5011c457591e 100644 --- a/tests/sqllogictests/suites/mode/cluster/exchange.test +++ b/tests/sqllogictests/suites/mode/cluster/exchange.test @@ -189,6 +189,8 @@ Exchange ├── probe keys: [CAST(t1.number (#2) AS UInt64 NULL)] ├── keys is null equal: [false] ├── filters: [] + ├── build join filters: + │ └── filter id:0, build key:t.number (#1), probe targets:[CAST(t1.number (#2) AS UInt64 NULL)@scan1], filter type:bloom,inlist,min_max ├── estimated rows: 2.00 ├── Exchange(Build) │ ├── output columns: [sum(number) (#1), numbers.number (#0)] @@ -224,6 +226,7 @@ Exchange ├── partitions total: 1 ├── partitions scanned: 1 ├── push downs: [filters: [], limit: NONE] + ├── apply join filters: [#0] └── estimated rows: 2.00 query T @@ -277,6 +280,8 @@ Fragment 2: ├── probe keys: [CAST(t1.number (#2) AS UInt64 NULL)] ├── keys is null equal: [false] ├── filters: [] + ├── build join filters: + │ └── filter id:0, build key:t.number (#1), probe targets:[CAST(t1.number (#2) AS UInt64 NULL)@scan1], filter type:bloom,inlist,min_max ├── estimated rows: 2.00 ├── ExchangeSource(Build) │ ├── output columns: [sum(number) (#1), numbers.number (#0)] @@ -290,6 +295,7 @@ Fragment 2: ├── partitions total: 1 ├── partitions scanned: 1 ├── push downs: [filters: [], limit: NONE] + ├── apply join filters: [#0] └── estimated rows: 2.00 (empty) (empty) diff --git a/tests/sqllogictests/suites/mode/standalone/explain/infer_filter.test b/tests/sqllogictests/suites/mode/standalone/explain/infer_filter.test index 35e7386952d55..763f74776b7f9 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/infer_filter.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/infer_filter.test @@ -882,6 +882,8 @@ HashJoin ├── probe keys: [CAST(t1.id (#0) AS Int64 NULL)] ├── keys is null equal: [false] ├── filters: [] +├── build join filters: +│ └── filter id:0, build key:CAST(t2.id (#1) AS Int64 NULL), probe targets:[CAST(t1.id (#0) AS Int64 NULL)@scan0], filter type:bloom,inlist,min_max ├── estimated rows: 1.00 ├── Filter(Build) │ ├── output columns: [t2.id (#1)] @@ -912,6 +914,7 @@ HashJoin ├── partitions scanned: 1 ├── pruning stats: [segments: , blocks: ] ├── push downs: [filters: [t1.id (#0) = 869550529], limit: NONE] + ├── apply join filters: [#0] └── estimated rows: 1.00 statement ok diff --git a/tests/sqllogictests/suites/mode/standalone/explain/runtime_filter_cast.test b/tests/sqllogictests/suites/mode/standalone/explain/runtime_filter_cast.test new file mode 100644 index 0000000000000..a1a445134d3a1 --- /dev/null +++ b/tests/sqllogictests/suites/mode/standalone/explain/runtime_filter_cast.test @@ -0,0 +1,63 @@ +statement ok +drop table if exists rf_cast_probe; + +statement ok +drop table if exists rf_cast_build; + +statement ok +create table rf_cast_build (id Nullable(Int32)); + +statement ok +create table rf_cast_probe (a Int not null); + +statement ok +insert into rf_cast_build values (1), (3), (5); + +statement ok +insert into rf_cast_probe values (1), (2), (3); + +query T +EXPLAIN SELECT * +FROM rf_cast_probe p +JOIN rf_cast_build b + ON CAST(p.a AS Nullable(Int32)) = b.id; +---- +HashJoin +├── output columns: [p.a (#0), b.id (#1)] +├── join type: INNER +├── build keys: [b.id (#1)] +├── probe keys: [CAST(p.a (#0) AS Int32 NULL)] +├── keys is null equal: [false] +├── filters: [] +├── build join filters: +│ └── filter id:0, build key:b.id (#1), probe targets:[CAST(p.a (#0) AS Int32 NULL)@scan0], filter type:bloom,inlist,min_max +├── estimated rows: 1.80 +├── TableScan(Build) +│ ├── table: default.default.rf_cast_build +│ ├── scan id: 1 +│ ├── output columns: [id (#1)] +│ ├── read rows: 3 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [], limit: NONE] +│ └── estimated rows: 3.00 +└── TableScan(Probe) + ├── table: default.default.rf_cast_probe + ├── scan id: 0 + ├── output columns: [a (#0)] + ├── read rows: 3 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + ├── apply join filters: [#0] + └── estimated rows: 3.00 + +statement ok +drop table rf_cast_probe; + +statement ok +drop table rf_cast_build; diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/infer_filter.test b/tests/sqllogictests/suites/mode/standalone/explain_native/infer_filter.test index d57e1b871c5b2..026d0bb8bf2ab 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain_native/infer_filter.test +++ b/tests/sqllogictests/suites/mode/standalone/explain_native/infer_filter.test @@ -748,6 +748,8 @@ HashJoin ├── probe keys: [CAST(t1.id (#0) AS Int64 NULL)] ├── keys is null equal: [false] ├── filters: [] +├── build join filters: +│ └── filter id:0, build key:CAST(t2.id (#1) AS Int64 NULL), probe targets:[CAST(t1.id (#0) AS Int64 NULL)@scan0], filter type:bloom,inlist,min_max ├── estimated rows: 1.00 ├── TableScan(Build) │ ├── table: default.default.t2 @@ -770,6 +772,7 @@ HashJoin ├── partitions scanned: 1 ├── pruning stats: [segments: , blocks: ] ├── push downs: [filters: [t1.id (#0) = 869550529], limit: NONE] + ├── apply join filters: [#0] └── estimated rows: 1.00 statement ok