diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java index 7c7f94db657c01..9175058cfcf07f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPushDownVisitor.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.NullSafeEqual; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitors; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; @@ -309,10 +310,12 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin join, + boolean isLeftChild, PushDownContext ctx) { + if (join.equals(ctx.builderNode) || !isNullGeneratingChild(join.getJoinType(), isLeftChild)) { + return true; + } + // A runtime filter is still safe on the null-generating side if generated NULL rows + // cannot become non-NULL before the parent join condition is evaluated. For example, + // `b.pk = c.pk` rejects generated NULLs, while `coalesce(b.pk, 0) = c.pk` may match them. + return isNullPropagating(ctx.probeExpr); + } + + private boolean isNullGeneratingChild(JoinType joinType, boolean isLeftChild) { + if (joinType.isFullOuterJoin()) { + return true; + } + if (isLeftChild) { + return joinType.isRightOuterJoin() || joinType.isAsofRightOuterJoin(); + } + return joinType.isLeftOuterJoin() || joinType.isAsofLeftOuterJoin(); + } + + private boolean isNullPropagating(Expression expression) { + if (expression instanceof Slot) { + return true; + } + if (expression instanceof Cast) { + return isNullPropagating(((Cast) expression).child()); + } + if (expression instanceof PropagateNullable) { + for (Expression child : expression.children()) { + if (!child.getInputSlots().isEmpty() && !isNullPropagating(child)) { + return false; + } + } + return true; + } + return false; + } + @Override public Boolean visitPhysicalProject(PhysicalProject project, PushDownContext ctx) { if (!project.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java index f1164ece13e14a..d58fba9afef496 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java @@ -134,6 +134,25 @@ public void testPushDownEncounterUnsupportedJoinType() { Pair.of("s_suppkey", "c_custkey"), Pair.of("c_custkey", "lo_custkey"))); } + @Test + public void testDoNotPushDownNonNullPropagatingRuntimeFilterThroughOuterJoin() { + String sql = "select * from lineorder left outer join customer on lo_custkey = c_custkey" + + " inner join supplier on coalesce(c_custkey, 0) = s_suppkey"; + List filters = getRuntimeFilters(sql).get(); + Assertions.assertEquals(0, filters.size()); + } + + @Test + public void testPushDownNullPropagatingRuntimeFilterThroughOuterJoin() { + String sql = "select * from lineorder left outer join customer on lo_custkey = c_custkey" + + " inner join supplier on c_custkey = s_suppkey"; + List filters = getRuntimeFilters(sql).get(); + Assertions.assertEquals(2, filters.size()); + checkRuntimeFilterExprs(filters, ImmutableList.of( + Pair.of("c_custkey", "lo_custkey"), + Pair.of("s_suppkey", "c_custkey"))); + } + @Test public void testPushDownThroughAggNode() { String sql = "select profit" diff --git a/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out new file mode 100644 index 00000000000000..ded64f1038283a --- /dev/null +++ b/regression-test/data/correctness_p0/test_runtime_filter_outer_join_nullable_side.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !shape -- +PhysicalResultSink +--PhysicalQuickSort[MERGE_SORT] +----PhysicalDistribute[DistributionSpecGather] +------PhysicalQuickSort[LOCAL_SORT] +--------hashAgg[GLOBAL] +----------PhysicalDistribute[DistributionSpecHash] +------------PhysicalProject +--------------hashJoin[INNER_JOIN broadcast] hashCondition=((expr_coalesce(pk, 0) = c.pk)) otherCondition=() +----------------PhysicalProject +------------------hashJoin[LEFT_OUTER_JOIN broadcast] hashCondition=((a.pk = b.pk)) otherCondition=() +--------------------PhysicalOlapScan[rf_outer_join_nullable_a(a)] +--------------------PhysicalOlapScan[rf_outer_join_nullable_b(b)] +----------------PhysicalOlapScan[rf_outer_join_nullable_c(c)] + +-- !result -- diff --git a/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy new file mode 100644 index 00000000000000..49fa677d0ed224 --- /dev/null +++ b/regression-test/suites/correctness_p0/test_runtime_filter_outer_join_nullable_side.groovy @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_runtime_filter_outer_join_nullable_side") { + sql "drop table if exists rf_outer_join_nullable_a" + sql "drop table if exists rf_outer_join_nullable_b" + sql "drop table if exists rf_outer_join_nullable_c" + + sql """ + create table rf_outer_join_nullable_a ( + pk int + ) + duplicate key(pk) + distributed by hash(pk) buckets 1 + properties("replication_num" = "1") + """ + + sql """ + create table rf_outer_join_nullable_b ( + pk int + ) + duplicate key(pk) + distributed by hash(pk) buckets 1 + properties("replication_num" = "1") + """ + + sql """ + create table rf_outer_join_nullable_c ( + pk int + ) + duplicate key(pk) + distributed by hash(pk) buckets 1 + properties("replication_num" = "1") + """ + + sql "insert into rf_outer_join_nullable_a values (1)" + sql "insert into rf_outer_join_nullable_b values (1)" + sql "insert into rf_outer_join_nullable_c values (0)" + sql "sync" + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set disable_join_reorder=true" + sql "set runtime_filter_mode='GLOBAL'" + sql "set runtime_filter_type='IN_OR_BLOOM_FILTER'" + sql "set runtime_filter_wait_infinitely=true" + sql "set enable_runtime_filter_prune=false" + sql "set parallel_pipeline_task_num=1" + + def query = """ + select coalesce(b.pk, 0) as k, count(*) as cnt + from rf_outer_join_nullable_a a + left join rf_outer_join_nullable_b b on a.pk = b.pk + inner join rf_outer_join_nullable_c c on coalesce(b.pk, 0) = c.pk + group by 1 + order by 1 + """ + + qt_shape """ + explain shape plan + ${query} + """ + + qt_result query +}