From 7b0ac7d18f5c546ba0ea2c7d5ab807b5524b33d7 Mon Sep 17 00:00:00 2001 From: Pxl Date: Fri, 5 Jun 2026 18:23:25 +0800 Subject: [PATCH] [fix](fe) Prevent unsafe runtime filter pushdown through outer joins (#64102) related with: #57425 Runtime filters from a parent inner join could be pushed through an outer join into the null-generating child even when the probe expression was not null-propagating for that child. The problem can be reproduced with this SQL shape: ```sql create table rf_outer_join_nullable_a (pk int) duplicate key(pk) distributed by hash(pk) buckets 1 properties("replication_num" = "1"); create table rf_outer_join_nullable_b (pk int) duplicate key(pk) distributed by hash(pk) buckets 1 properties("replication_num" = "1"); create table rf_outer_join_nullable_c (pk int) duplicate key(pk) distributed by hash(pk) buckets 1 properties("replication_num" = "1"); insert into rf_outer_join_nullable_a values (1); insert into rf_outer_join_nullable_b values (1); insert into rf_outer_join_nullable_c values (0); set disable_join_reorder = true; select coalesce(b.pk, 0) as k, count(*) as cnt from rf_outer_join_nullable_a a left join rf_outer_join_nullable_b b on a.pk = b.pk inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk group by 1 order by 1; ``` The correct result is empty. `a.pk = 1` matches `b.pk = 1` in the left outer join, then the parent inner join evaluates `coalesce(1, 0) = 0`, which is false. The wrong plan generated a runtime filter from the parent inner join, effectively `c.pk -> coalesce(b.pk, 0)`, and pushed it through the lower `LEFT OUTER JOIN` into the right side scan of `b`. If `b.pk = 1` is pre-filtered before the left outer join, the join emits a NULL-extended row for `b`; then `coalesce(NULL, 0) = 0` becomes true and incorrectly returns `(0, 1)`. Therefore the runtime filter `c.pk -> coalesce(b.pk, 0)` must not be planned on the null-generating side of the lower outer join. This PR blocks runtime filter pushdown through an outer join's null-generating child unless the probe expression preserves NULL semantics for slots from that child. Normal pushdown through preserved sides and null-propagating expressions is kept unchanged. The bug became observable after #57425 changed the target lookup for expression runtime filters from `ctx.probeExpr` to `ctx.probeSlot`. Before that change, an expression such as `coalesce(b.pk, 0)` could not resolve the target relation in this path and the unsafe pushdown was not generated. None - Test - [x] Regression test - [x] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason Added regression case with `disable_join_reorder`, `qt_shape`, and empty result verification: `regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy` Unit test: `./run-fe-ut.sh --run org.apache.doris.nereids.postprocess.RuntimeFilterTest#testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin,org.apache.doris.nereids.postprocess.RuntimeFilterTest#testPushDownNullPropagatingRuntimeFilterThroughOuterJoin` The SQL regression case was not run locally against the available 9333 cluster because that cluster was the unpatched repro cluster. - Behavior changed: - [ ] No. - [x] Yes. Runtime filters are no longer pushed through an outer join into its null-generating child when the probe expression can convert NULL to a non-NULL value. - Does this need documentation? - [x] No. - [ ] Yes. - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label (cherry picked from commit 691189aaad64774a4f9ed6c522939ac8cb1b5407) --- .../post/RuntimeFilterPushDownVisitor.java | 52 +++++++++++- .../postprocess/RuntimeFilterTest.java | 19 +++++ ...untime_filter_outer_join_nullable_side.out | 18 +++++ ...ime_filter_outer_join_nullable_side.groovy | 79 +++++++++++++++++++ 4 files changed, 164 insertions(+), 4 deletions(-) create mode 100644 regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out create mode 100644 regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java index 7c7f94db657c01..9175058cfcf07f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.NullSafeEqual; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitors; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; @@ -309,10 +310,12 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin join, + boolean isLeftChild, PushDownContext ctx) { + if (join.equals(ctx.builderNode) || !isNullGeneratingChild(join.getJoinType(), isLeftChild)) { + return true; + } + // A runtime filter is still safe on the null-generating side if generated NULL rows + // cannot become non-NULL before the parent join condition is evaluated. For example, + // `b.pk = c.pk` rejects generated NULLs, while `coalesce(b.pk, 0) = c.pk` may match them. + return isNullPropagating(ctx.probeExpr); + } + + private boolean isNullGeneratingChild(JoinType joinType, boolean isLeftChild) { + if (joinType.isFullOuterJoin()) { + return true; + } + if (isLeftChild) { + return joinType.isRightOuterJoin() || joinType.isAsofRightOuterJoin(); + } + return joinType.isLeftOuterJoin() || joinType.isAsofLeftOuterJoin(); + } + + private boolean isNullPropagating(Expression expression) { + if (expression instanceof Slot) { + return true; + } + if (expression instanceof Cast) { + return isNullPropagating(((Cast) expression).child()); + } + if (expression instanceof PropagateNullable) { + for (Expression child : expression.children()) { + if (!child.getInputSlots().isEmpty() && !isNullPropagating(child)) { + return false; + } + } + return true; + } + return false; + } + @Override public Boolean visitPhysicalProject(PhysicalProject project, PushDownContext ctx) { if (!project.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java index 3d6964d8d7e8b2..a8ee2a64f4e77a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java @@ -126,6 +126,25 @@ public void testPushDownEncounterUnsupportedJoinType() { Pair.of("s_suppkey", "c_custkey"), Pair.of("c_custkey", "lo_custkey"))); } + @Test + public void testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin() { + String sql = "select * from lineorder left outer join customer on lo_custkey = c_custkey" + + " inner join supplier on coalesce(c_custkey, 0) = s_suppkey"; + List filters = getRuntimeFilters(sql).get(); + Assertions.assertEquals(0, filters.size()); + } + + @Test + public void testPushDownNullPropagatingRuntimeFilterThroughOuterJoin() { + String sql = "select * from lineorder left outer join customer on lo_custkey = c_custkey" + + " inner join supplier on c_custkey = s_suppkey"; + List filters = getRuntimeFilters(sql).get(); + Assertions.assertEquals(2, filters.size()); + checkRuntimeFilterExprs(filters, ImmutableList.of( + Pair.of("c_custkey", "lo_custkey"), + Pair.of("s_suppkey", "c_custkey"))); + } + @Test public void testPushDownThroughAggNode() { String sql = "select profit" diff --git a/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out new file mode 100644 index 00000000000000..37e22cca87b25f --- /dev/null +++ b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out @@ -0,0 +1,18 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !shape -- +PhysicalResultSink +--PhysicalQuickSort[MERGE_SORT] +----PhysicalDistribute[DistributionSpecGather] +------PhysicalQuickSort[LOCAL_SORT] +--------hashAgg[GLOBAL] +----------PhysicalDistribute[DistributionSpecHash] +------------hashAgg[LOCAL] +--------------PhysicalProject +----------------hashJoin[INNER_JOIN broadcast] hashCondition=((expr_coalesce(pk, 0) = c.pk)) otherCondition=() +------------------PhysicalProject +--------------------hashJoin[LEFT_OUTER_JOIN broadcast] hashCondition=((a.pk = b.pk)) otherCondition=() +----------------------PhysicalOlapScan[rf_outer_join_nullable_a] +----------------------PhysicalOlapScan[rf_outer_join_nullable_b] +------------------PhysicalOlapScan[rf_outer_join_nullable_c] + +-- !result -- diff --git a/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy new file mode 100644 index 00000000000000..49fa677d0ed224 --- /dev/null +++ b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_runtime_filter_outer_join_nullable_side") { + sql "drop table if exists rf_outer_join_nullable_a" + sql "drop table if exists rf_outer_join_nullable_b" + sql "drop table if exists rf_outer_join_nullable_c" + + sql """ + create table rf_outer_join_nullable_a ( + pk int + ) + duplicate key(pk) + distributed by hash(pk) buckets 1 + properties("replication_num" = "1") + """ + + sql """ + create table rf_outer_join_nullable_b ( + pk int + ) + duplicate key(pk) + distributed by hash(pk) buckets 1 + properties("replication_num" = "1") + """ + + sql """ + create table rf_outer_join_nullable_c ( + pk int + ) + duplicate key(pk) + distributed by hash(pk) buckets 1 + properties("replication_num" = "1") + """ + + sql "insert into rf_outer_join_nullable_a values (1)" + sql "insert into rf_outer_join_nullable_b values (1)" + sql "insert into rf_outer_join_nullable_c values (0)" + sql "sync" + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set disable_join_reorder=true" + sql "set runtime_filter_mode='GLOBAL'" + sql "set runtime_filter_type='IN_OR_BLOOM_FILTER'" + sql "set runtime_filter_wait_infinitely=true" + sql "set enable_runtime_filter_prune=false" + sql "set parallel_pipeline_task_num=1" + + def query = """ + select coalesce(b.pk, 0) as k, count(*) as cnt + from rf_outer_join_nullable_a a + left join rf_outer_join_nullable_b b on a.pk = b.pk + inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk + group by 1 + order by 1 + """ + + qt_shape """ + explain shape plan + ${query} + """ + + qt_result query +}