Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions src/query/service/src/physical_plans/runtime_filter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,12 @@ pub async fn build_runtime_filter(

let build_side = s_expr.build_side_child();
let build_side_data_distribution = build_side.get_data_distribution()?;
if build_side_data_distribution
.as_ref()
.is_some_and(|e| !matches!(e, Exchange::Broadcast | Exchange::NodeToNodeHash(_)))
{
if build_side_data_distribution.as_ref().is_some_and(|e| {
!matches!(
e,
Exchange::Broadcast | Exchange::NodeToNodeHash(_) | Exchange::Merge
)
}) {
return Ok(Default::default());
}

Expand All @@ -124,9 +126,16 @@ pub async fn build_runtime_filter(
})
})
{
// Skip if not a column reference
if probe_key.as_column_ref().is_none() {
continue;
// Skip if the probe expression is neither a direct column reference nor a
// cast from not null to nullable type (e.g. CAST(col AS Nullable(T))).
match &probe_key {
RemoteExpr::ColumnRef { .. } => {}
RemoteExpr::Cast {
expr: box RemoteExpr::ColumnRef { data_type, .. },
dest_type,
..
} if &dest_type.remove_nullable() == data_type => {}
_ => continue,
}

let probe_targets =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,15 @@ fn build_inlist_filter(inlist: Column, probe_key: &Expr<String>) -> Result<Expr<
data_type: DataType::Boolean,
}));
}
let probe_key = probe_key.as_column_ref().unwrap();
let probe_key = match probe_key {
Expr::ColumnRef(col) => col,
// Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T))
Expr::Cast(cast) => match cast.expr.as_ref() {
Expr::ColumnRef(col) => col,
_ => unreachable!(),
},
_ => unreachable!(),
};

let raw_probe_key = RawExpr::ColumnRef {
span: probe_key.span,
Expand Down Expand Up @@ -249,7 +257,15 @@ async fn build_bloom_filter(
probe_key: &Expr<String>,
max_threads: usize,
) -> Result<RuntimeFilterBloom> {
let probe_key = probe_key.as_column_ref().unwrap();
let probe_key = match probe_key {
Expr::ColumnRef(col) => col,
// Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T))
Expr::Cast(cast) => match cast.expr.as_ref() {
Expr::ColumnRef(col) => col,
_ => unreachable!(),
},
_ => unreachable!(),
};
let column_name = probe_key.id.to_string();
let total_items = bloom.len();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,15 @@ pub(crate) fn min_max_filter(
max: Scalar,
probe_key: &Expr<String>,
) -> Result<Expr<String>> {
let probe_key = probe_key.as_column_ref().unwrap();
let probe_key = match probe_key {
Expr::ColumnRef(col) => col,
// Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T))
Expr::Cast(cast) => match cast.expr.as_ref() {
Expr::ColumnRef(col) => col,
_ => unreachable!(),
},
_ => unreachable!(),
};
let raw_probe_key = RawExpr::ColumnRef {
span: probe_key.span,
id: probe_key.id.to_string(),
Expand Down
6 changes: 6 additions & 0 deletions tests/sqllogictests/suites/mode/cluster/exchange.test
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ Exchange
├── probe keys: [CAST(t1.number (#2) AS UInt64 NULL)]
├── keys is null equal: [false]
├── filters: []
├── build join filters:
│ └── filter id:0, build key:t.number (#1), probe targets:[CAST(t1.number (#2) AS UInt64 NULL)@scan1], filter type:bloom,inlist,min_max
├── estimated rows: 2.00
├── Exchange(Build)
│ ├── output columns: [sum(number) (#1), numbers.number (#0)]
Expand Down Expand Up @@ -224,6 +226,7 @@ Exchange
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [], limit: NONE]
├── apply join filters: [#0]
└── estimated rows: 2.00

query T
Expand Down Expand Up @@ -277,6 +280,8 @@ Fragment 2:
├── probe keys: [CAST(t1.number (#2) AS UInt64 NULL)]
├── keys is null equal: [false]
├── filters: []
├── build join filters:
│ └── filter id:0, build key:t.number (#1), probe targets:[CAST(t1.number (#2) AS UInt64 NULL)@scan1], filter type:bloom,inlist,min_max
├── estimated rows: 2.00
├── ExchangeSource(Build)
│ ├── output columns: [sum(number) (#1), numbers.number (#0)]
Expand All @@ -290,6 +295,7 @@ Fragment 2:
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [], limit: NONE]
├── apply join filters: [#0]
└── estimated rows: 2.00
(empty)
(empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,8 @@ HashJoin
├── probe keys: [CAST(t1.id (#0) AS Int64 NULL)]
├── keys is null equal: [false]
├── filters: []
├── build join filters:
│ └── filter id:0, build key:CAST(t2.id (#1) AS Int64 NULL), probe targets:[CAST(t1.id (#0) AS Int64 NULL)@scan0], filter type:bloom,inlist,min_max
├── estimated rows: 1.00
├── Filter(Build)
│ ├── output columns: [t2.id (#1)]
Expand Down Expand Up @@ -912,6 +914,7 @@ HashJoin
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1, bloom pruning: 1 to 1>]
├── push downs: [filters: [t1.id (#0) = 869550529], limit: NONE]
├── apply join filters: [#0]
└── estimated rows: 1.00

statement ok
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
statement ok
drop table if exists rf_cast_probe;

statement ok
drop table if exists rf_cast_build;

statement ok
create table rf_cast_build (id Nullable(Int32));

statement ok
create table rf_cast_probe (a Int not null);

statement ok
insert into rf_cast_build values (1), (3), (5);

statement ok
insert into rf_cast_probe values (1), (2), (3);

query T
EXPLAIN SELECT *
FROM rf_cast_probe p
JOIN rf_cast_build b
ON CAST(p.a AS Nullable(Int32)) = b.id;
----
HashJoin
├── output columns: [p.a (#0), b.id (#1)]
├── join type: INNER
├── build keys: [b.id (#1)]
├── probe keys: [CAST(p.a (#0) AS Int32 NULL)]
├── keys is null equal: [false]
├── filters: []
├── build join filters:
│ └── filter id:0, build key:b.id (#1), probe targets:[CAST(p.a (#0) AS Int32 NULL)@scan0], filter type:bloom,inlist,min_max
├── estimated rows: 1.80
├── TableScan(Build)
│ ├── table: default.default.rf_cast_build
│ ├── scan id: 1
│ ├── output columns: [id (#1)]
│ ├── read rows: 3
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 3.00
└── TableScan(Probe)
├── table: default.default.rf_cast_probe
├── scan id: 0
├── output columns: [a (#0)]
├── read rows: 3
├── read size: < 1 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
├── apply join filters: [#0]
└── estimated rows: 3.00

statement ok
drop table rf_cast_probe;

statement ok
drop table rf_cast_build;
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,8 @@ HashJoin
├── probe keys: [CAST(t1.id (#0) AS Int64 NULL)]
├── keys is null equal: [false]
├── filters: []
├── build join filters:
│ └── filter id:0, build key:CAST(t2.id (#1) AS Int64 NULL), probe targets:[CAST(t1.id (#0) AS Int64 NULL)@scan0], filter type:bloom,inlist,min_max
├── estimated rows: 1.00
├── TableScan(Build)
│ ├── table: default.default.t2
Expand All @@ -770,6 +772,7 @@ HashJoin
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1, bloom pruning: 1 to 1>]
├── push downs: [filters: [t1.id (#0) = 869550529], limit: NONE]
├── apply join filters: [#0]
└── estimated rows: 1.00

statement ok
Expand Down