From 7f4a3c8a0dd66da452d35648fd12494d1bea196c Mon Sep 17 00:00:00 2001 From: yujun Date: Thu, 23 Apr 2026 10:20:09 +0800 Subject: [PATCH] [fix](nereids) Block push-down of filters containing unique functions through Generate and CTE consumer (#62705) Issue Number: close #25201, close #25202 Problem Summary: Two Nereids rewrite rules moved filter conjuncts that contain non-idempotent `UniqueFunction` calls (`rand` / `uuid` / `random_bytes` / `uuid_numeric`) across operators that change how many times the unique function is evaluated, producing wrong results. 1. `PushDownFilterThroughGenerate` pushed a conjunct like `t1.id + rand(1,100) > 5` below `LogicalGenerate`. Before the push, `rand` is evaluated per generated row; after, it is evaluated per base row and then the result is duplicated for every row produced by generate, so groups of N generated rows share a single rand value instead of N independent ones. 2. `CollectFilterAboveConsumer` registered filter conjuncts above a CTE consumer into `cascadesContext.putConsumerIdToFilter(...)`, after which `RewriteCteChildren.tryToConstructFilter` would OR them up and push them into the CTE producer. For a conjunct like `rand() > 0.1`, that causes the random filter to run on both the producer scan and each consumer filter, and different consumers would see inconsistent rows. Fix: in both rules, skip conjuncts whose `containsUniqueFunction()` is true so they stay above the operator and are evaluated once per output row. Adjacent rules (`PushDownFilterThroughRepeat/Window/PartitionTopN/SetOperation`, `PullUpPredicates` and its consumers) have the same class of bug but are out of scope for this PR and will be addressed separately. --- .../rewrite/CollectFilterAboveConsumer.java | 3 + .../PushDownFilterThroughGenerate.java | 3 +- .../CollectFilterAboveConsumerTest.java | 87 ++++++++++++++++++ .../PushDownFilterThroughGenerateTest.java | 92 +++++++++++++++++++ ...er_above_consumer_with_unique_function.out | 24 +++++ ..._through_generate_with_unique_function.out | 26 ++++++ ...above_consumer_with_unique_function.groovy | 49 ++++++++++ ...rough_generate_with_unique_function.groovy | 49 ++++++++++ 8 files changed, 332 insertions(+), 1 deletion(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerateTest.java create mode 100644 regression-test/data/nereids_rules_p0/unique_function/collect_filter_above_consumer_with_unique_function.out create mode 100644 regression-test/data/nereids_rules_p0/unique_function/push_down_filter_through_generate_with_unique_function.out create mode 100644 regression-test/suites/nereids_rules_p0/unique_function/collect_filter_above_consumer_with_unique_function.groovy create mode 100644 regression-test/suites/nereids_rules_p0/unique_function/push_down_filter_through_generate_with_unique_function.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java index d2e61f9e665d6d..62f29f40441626 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java @@ -38,6 +38,9 @@ public Rule build() { LogicalCTEConsumer cteConsumer = filter.child(); Set exprs = filter.getConjuncts(); for (Expression expr : exprs) { + if (expr.containsUniqueFunction()) { + continue; + } Expression rewrittenExpr = expr.rewriteUp(e -> { if (e instanceof Slot) { return cteConsumer.getProducerSlot((Slot) e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java index 9e2c980acb5311..851aba0e21ae07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java @@ -49,7 +49,8 @@ public Rule build() { Set remainPredicates = Sets.newHashSet(); filter.getConjuncts().forEach(conjunct -> { Set conjunctSlots = conjunct.getInputSlots(); - if (!conjunctSlots.isEmpty() && childOutputs.containsAll(conjunctSlots)) { + if (!conjunctSlots.isEmpty() && childOutputs.containsAll(conjunctSlots) + && !conjunct.containsUniqueFunction()) { pushDownPredicates.add(conjunct); } else { remainPredicates.add(conjunct); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java new file mode 100644 index 00000000000000..a12704bd238284 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.trees.expressions.CTEId; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThan; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Random; +import org.apache.doris.nereids.trees.expressions.literal.DoubleLiteral; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; +import org.apache.doris.nereids.trees.plans.RelationId; +import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.util.MemoTestUtils; +import org.apache.doris.nereids.util.PlanConstructor; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.ImmutableSet; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Set; + +/** + * Tests for {@link CollectFilterAboveConsumer}. + */ +class CollectFilterAboveConsumerTest { + + /** + * Filter has two conjuncts: one deterministic, one containing a unique (non-idempotent) function. + * After applying the rule, only the deterministic conjunct should be collected into + * consumerIdToFilter. Collecting the unique-function conjunct would cause it to be pushed into + * the CTE producer while still remaining on the consumer side, resulting in double execution + * (bug DORIS-25202). + */ + @Test + void testDoNotCollectUniqueFunctionConjunct() { + ConnectContext connectContext = new ConnectContext(); + LogicalOlapScan producerPlan = PlanConstructor.newLogicalOlapScan(0, "t1", 0); + LogicalCTEConsumer consumer = new LogicalCTEConsumer( + PlanConstructor.getNextRelationId(), new CTEId(1), "cte1", producerPlan); + + Slot idSlot = consumer.getOutput().get(0); + Expression deterministic = new EqualTo(idSlot, new IntegerLiteral(1)); + Expression uniqueFn = new GreaterThan(new Random(), new DoubleLiteral(0.5)); + LogicalFilter filter = new LogicalFilter<>( + ImmutableSet.of(deterministic, uniqueFn), consumer); + + CascadesContext cascadesContext = MemoTestUtils.createCascadesContext(connectContext, filter); + Rule rule = new CollectFilterAboveConsumer().build(); + rule.transform(filter, cascadesContext); + + Map> collected = cascadesContext.getStatementContext() + .getConsumerIdToFilters(); + Set filters = collected.get(consumer.getRelationId()); + Assertions.assertNotNull(filters, "deterministic conjunct must be collected"); + // Only the deterministic conjunct (after slot-rewriting to producer slot) should be present, + // with the unique-function conjunct skipped. + Assertions.assertEquals(1, filters.size(), + "exactly one conjunct (the deterministic one) should be collected, " + + "unique-function conjunct must NOT be collected"); + Expression onlyCollected = filters.iterator().next(); + Assertions.assertFalse(onlyCollected.containsUniqueFunction(), + "collected conjunct must not contain a unique function"); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerateTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerateTest.java new file mode 100644 index 00000000000000..8b1c2db5bb5f6b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerateTest.java @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.functions.generator.Unnest; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Random; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.types.IntegerType; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.nereids.util.PlanConstructor; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link PushDownFilterThroughGenerate}. + */ +class PushDownFilterThroughGenerateTest implements MemoPatternMatchSupported { + + private final LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0); + + /** Deterministic filter whose slots only reference generate's child output should be pushed down. */ + @Test + void testPushDownDeterministicFilter() { + SlotReference idSlot = (SlotReference) scan.getOutput().get(0); + SlotReference genOut = new SlotReference("g1", IntegerType.INSTANCE); + Unnest gen = new Unnest(new IntegerLiteral(1)); + LogicalGenerate generate = new LogicalGenerate<>( + ImmutableList.of(gen), ImmutableList.of(genOut), scan); + + Expression predicate = new EqualTo(idSlot, new IntegerLiteral(1)); + LogicalFilter filter = new LogicalFilter<>(ImmutableSet.of(predicate), generate); + + PlanChecker.from(new ConnectContext(), filter) + .applyTopDown(new PushDownFilterThroughGenerate()) + .matchesFromRoot( + logicalGenerate( + logicalFilter(logicalOlapScan()) + .when(f -> f.getConjuncts().contains(predicate)) + ) + ); + } + + /** Filter containing a unique (non-idempotent) function must NOT be pushed through generate. */ + @Test + void testDoNotPushUniqueFunctionThroughGenerate() { + SlotReference idSlot = (SlotReference) scan.getOutput().get(0); + SlotReference genOut = new SlotReference("g1", IntegerType.INSTANCE); + Unnest gen = new Unnest(new IntegerLiteral(1)); + LogicalGenerate generate = new LogicalGenerate<>( + ImmutableList.of(gen), ImmutableList.of(genOut), scan); + + // random() depends only on idSlot-like pattern but contains a UniqueFunction. + // Use EqualTo(id, random()) so the conjunct's input slots are a subset of scan output + // (would otherwise satisfy the push-down condition). + Expression predicate = new EqualTo(idSlot, new Random()); + LogicalFilter filter = new LogicalFilter<>(ImmutableSet.of(predicate), generate); + + PlanChecker.from(new ConnectContext(), filter) + .applyTopDown(new PushDownFilterThroughGenerate()) + .matchesFromRoot( + logicalFilter( + logicalGenerate(logicalOlapScan()) + ).when(f -> f.getConjuncts().contains(predicate)) + ); + } +} diff --git a/regression-test/data/nereids_rules_p0/unique_function/collect_filter_above_consumer_with_unique_function.out b/regression-test/data/nereids_rules_p0/unique_function/collect_filter_above_consumer_with_unique_function.out new file mode 100644 index 00000000000000..0b6c440c0268f0 --- /dev/null +++ b/regression-test/data/nereids_rules_p0/unique_function/collect_filter_above_consumer_with_unique_function.out @@ -0,0 +1,24 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !collect_filter_above_consumer_unique_1 -- +PhysicalCteAnchor ( cteId=CTEId#0 ) +--PhysicalCteProducer ( cteId=CTEId#0 ) +----PhysicalOlapScan[t1] +--PhysicalResultSink +----PhysicalUnion +------filter((random() > 0.1)) +--------PhysicalCteConsumer ( cteId=CTEId#0 ) +------filter((random() > 0.2)) +--------PhysicalCteConsumer ( cteId=CTEId#0 ) + +-- !collect_filter_above_consumer_unique_2 -- +PhysicalCteAnchor ( cteId=CTEId#0 ) +--PhysicalCteProducer ( cteId=CTEId#0 ) +----filter((cte1.id > 10)) +------PhysicalOlapScan[t1] +--PhysicalResultSink +----PhysicalUnion +------filter((cte1.id > 10) and (random() > 0.1)) +--------PhysicalCteConsumer ( cteId=CTEId#0 ) +------filter((cte1.id > 100) and (random() > 0.2)) +--------PhysicalCteConsumer ( cteId=CTEId#0 ) + diff --git a/regression-test/data/nereids_rules_p0/unique_function/push_down_filter_through_generate_with_unique_function.out b/regression-test/data/nereids_rules_p0/unique_function/push_down_filter_through_generate_with_unique_function.out new file mode 100644 index 00000000000000..a2506f5d8a7673 --- /dev/null +++ b/regression-test/data/nereids_rules_p0/unique_function/push_down_filter_through_generate_with_unique_function.out @@ -0,0 +1,26 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !filter_through_generate_unique_1 -- +PhysicalResultSink +--PhysicalProject[tmp1.e1] +----filter((random() > 0.1)) +------PhysicalGenerate +--------PhysicalProject[1 AS `1`] +----------PhysicalStorageLayerAggregate[t1] + +-- !filter_through_generate_unique_2 -- +PhysicalResultSink +--PhysicalProject[tmp1.e1] +----filter(((cast(id as BIGINT) + random(1, 100)) > 5)) +------PhysicalGenerate +--------PhysicalProject[t1.id] +----------filter((t1.id > 10)) +------------PhysicalOlapScan[t1] + +-- !filter_through_generate_unique_3 -- +PhysicalResultSink +--PhysicalProject[tmp1.e1] +----filter(((cast(id as BIGINT) + random(1, 100)) > 5)) +------PhysicalGenerate +--------PhysicalProject[t1.id] +----------PhysicalOlapScan[t1] + diff --git a/regression-test/suites/nereids_rules_p0/unique_function/collect_filter_above_consumer_with_unique_function.groovy b/regression-test/suites/nereids_rules_p0/unique_function/collect_filter_above_consumer_with_unique_function.groovy new file mode 100644 index 00000000000000..a31a665550596c --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/unique_function/collect_filter_above_consumer_with_unique_function.groovy @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('collect_filter_above_consumer_with_unique_function') { + sql 'SET enable_nereids_planner=true' + sql 'SET runtime_filter_mode=OFF' + sql 'SET enable_fallback_to_original_planner=false' + // keep CTE materialized so CollectFilterAboveConsumer actually runs. + sql 'SET inline_cte_referenced_threshold=0' + sql "SET ignore_shape_nodes='PhysicalDistribute'" + sql "SET detail_shape_nodes='PhysicalProject'" + sql 'SET disable_nereids_rules=PRUNE_EMPTY_PARTITION' + + // filter with unique function above a CTE consumer must NOT be collected and + // pushed into the CTE producer; otherwise the producer would be re-filtered + // and other consumers would see inconsistent rows. + qt_collect_filter_above_consumer_unique_1 ''' + explain shape plan + with cte1 as (select id, msg from t1) + select * from cte1 where rand() > 0.1 + union all + select * from cte1 where rand() > 0.2 + ''' + + // mixed conjuncts: deterministic parts can be collected and pushed into the CTE producer, + // unique parts must stay above the consumer only. Use predicates that survive simplification + // so the collected-into-producer OR shape is visible. + qt_collect_filter_above_consumer_unique_2 ''' + explain shape plan + with cte1 as (select id, msg from t1) + select * from cte1 where id > 10 and rand() > 0.1 + union all + select * from cte1 where id > 100 and rand() > 0.2 + ''' +} diff --git a/regression-test/suites/nereids_rules_p0/unique_function/push_down_filter_through_generate_with_unique_function.groovy b/regression-test/suites/nereids_rules_p0/unique_function/push_down_filter_through_generate_with_unique_function.groovy new file mode 100644 index 00000000000000..7f602cde34a715 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/unique_function/push_down_filter_through_generate_with_unique_function.groovy @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('push_down_filter_through_generate_with_unique_function') { + sql 'SET enable_nereids_planner=true' + sql 'SET runtime_filter_mode=OFF' + sql 'SET enable_fallback_to_original_planner=false' + sql "SET ignore_shape_nodes='PhysicalDistribute'" + sql "SET detail_shape_nodes='PhysicalProject'" + sql 'SET disable_nereids_rules=PRUNE_EMPTY_PARTITION' + + // unique function in filter must NOT be pushed below LogicalGenerate, + // otherwise call count / state of the unique function changes semantically. + qt_filter_through_generate_unique_1 ''' + explain shape plan + select e1 from t1 lateral view explode_numbers(3) tmp1 as e1 + where rand() > 0.1 + ''' + + // mixed: deterministic conjunct pushable, unique conjunct in same WHERE must stay above generate. + // Here `t1.id > 10` should be pushed below generate while `t1.id + rand(1,100) > 5` (a conjunct + // that does reference a base slot, so old code would have pushed it) must stay above. + qt_filter_through_generate_unique_2 ''' + explain shape plan + select e1 from t1 lateral view explode_numbers(3) tmp1 as e1 + where t1.id > 10 and t1.id + rand(1,100) > 5 + ''' + + // unique function combined with base slot in the same conjunct. + qt_filter_through_generate_unique_3 ''' + explain shape plan + select e1 from t1 lateral view explode_numbers(3) tmp1 as e1 + where t1.id + rand(1,100) > 5 + ''' +}