Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
Expand All @@ -59,7 +58,6 @@
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ViewExpander;
Expand Down Expand Up @@ -551,12 +549,26 @@ public RelNode visit(TableScan scan) {
}
}

/** Try to optimize the plan by using HepPlanner */
private static final List<RelOptRule> hepRuleList =
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryan-gh-bot just for my knowledge why are we removing it?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That hepRuleList field only existed as the argument to the old addRuleCollection(hepRuleList) call. Once the builder switched to two sequential addRuleInstance(...) calls (which take single rules directly), nothing else referenced the list, so keeping it would just be dead code — and dropping it also lets the java.util.List and org.apache.calcite.plan.RelOptRule imports go away. The Javadoc that used to sit on the field moved up onto HEP_PROGRAM, which is now the thing whose ordering actually matters.

List.of(FilterMergeRule.Config.DEFAULT.toRule(), PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE);

/**
* Try to optimize the plan by using HepPlanner.
*
* <p>Rule order matters: {@link PPLSimplifyDedupRule#DEDUP_SIMPLIFY_RULE} must run to fixpoint
* before {@link FilterMergeRule}. The simplify rule's bottom operand only matches a pure {@code
* IS NOT NULL} (or AND-of-{@code IS NOT NULL}) bucket-non-null filter; if {@code FilterMergeRule}
* runs first when a user {@code WHERE} sits below the synthetic {@code IS NOT NULL} filter that
* PPL emits as part of {@code dedup}, the two adjacent filters are merged into a single filter
* whose condition includes the user predicate, the simplify rule's predicate fails, no {@link
* org.opensearch.sql.calcite.plan.rel.LogicalDedup} is produced, and dedup pushdown to the
* OpenSearch storage engine is silently disabled. Using separate {@code addRuleInstance} calls
* (rather than {@code addRuleCollection}) enforces deterministic ordering: dedup simplification
* fires first against the original adjacent-filter shape, then any remaining adjacent filters are
* merged.
*/
private static final HepProgram HEP_PROGRAM =
new HepProgramBuilder().addRuleCollection(hepRuleList).build();
new HepProgramBuilder()
.addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE)
.addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule())
.build();

public static RelNode optimize(RelNode plan, CalcitePlanContext context) {
Util.discard(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
import org.opensearch.sql.calcite.SysLimit;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.ExpressionEvaluationException;
Expand Down Expand Up @@ -110,6 +111,20 @@ public RelNode getRelNode(String ppl) {
return root;
}

/**
* Get the root RelNode of the given PPL query after running the production HEP program from
* {@code CalciteToolsHelper}. Use this in regression tests that exercise rules registered in the
* production HEP program (e.g. {@code PPLSimplifyDedupRule}) — those rules need to see the raw
* planner output, not the post-FilterMerge form returned by {@link #getRelNode(String)}.
*/
public RelNode getRelNodeAfterCalciteHep(String ppl) {
CalcitePlanContext context = createBuilderContext();
Query query = (Query) plan(pplParser, ppl);
planTransformer.analyze(query.getPlan(), context);
RelNode root = context.relBuilder.build();
return CalciteToolsHelper.optimize(root, context);
}

private RelNode mergeAdjacentFilters(RelNode relNode) {
HepProgram program =
new HepProgramBuilder().addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@

package org.opensearch.sql.ppl.calcite;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.test.CalciteAssert;
import org.junit.Test;
import org.opensearch.sql.calcite.plan.rel.LogicalDedup;
import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule;

public class CalcitePPLDedupTest extends CalcitePPLAbstractTest {

Expand Down Expand Up @@ -353,4 +361,72 @@ public void testSortFieldProjectedAwayBeforeDedup() {
+ " LogicalTableScan(table=[[scott, EMP]])\n";
verifyLogical(root, expectedLogical);
}

/**
* Regression test for issue #7: when a user {@code where} sits below {@code dedup}, the HEP
* program in {@code CalciteToolsHelper} must still produce a {@link LogicalDedup}. Before the
* fix, both rules were registered via {@code addRuleCollection}, so {@code FilterMergeRule} could
* fire ahead of {@code PPLSimplifyDedupRule} and merge the user predicate into the
* bucket-non-null filter; the simplify rule's bottom operand then rejected the merged condition
* (it only accepts pure {@code IS NOT NULL}/AND-of-{@code IS NOT NULL}), no {@code LogicalDedup}
* was produced, and dedup pushdown to the OpenSearch storage engine was silently disabled. The
* fix is to register the two rules with separate {@code addRuleInstance} calls in the order
* simplify-dedup first (to fixpoint), then filter-merge.
*/
@Test
public void testWhereThenDedupProducesLogicalDedup() {
// Use a where predicate on a DIFFERENT column from the dedup column. With the same column,
// Calcite's RexSimplify can fold AND(IS_NOT_NULL(x), >(x, c)) down to >(x, c), masking the
// bug. The issue's reproducer (where on @timestamp, dedup on namespace) hits this exact
// shape.
String ppl = "source=EMP | where SAL > 1000 | dedup 1 DEPTNO | fields DEPTNO";
RelNode optimized = getRelNodeAfterCalciteHep(ppl);
String optimizedPlan = optimized.explain();
assertTrue(
"where + dedup must produce a LogicalDedup so OpenSearch DedupPushdownRule can match;"
+ " actual plan was:\n"
+ optimizedPlan,
optimizedPlan.contains("LogicalDedup"));
// The window-form leftover would indicate the simplify rule did not fire — assert it is gone.
assertFalse(
"ROW_NUMBER window must be consumed by PPLSimplifyDedupRule when where + dedup are"
+ " combined; actual plan was:\n"
+ optimizedPlan,
optimizedPlan.contains("ROW_NUMBER"));
}

/**
* Adversarial regression test: simulates the pathological order described in issue #7 by forcing
* FilterMergeRule to run to fixpoint BEFORE PPLSimplifyDedupRule. This documents the failure mode
* the fix in {@code CalciteToolsHelper} prevents — once the bucket-non-null filter has been
* merged with the user {@code WHERE}, {@code mayBeFilterFromBucketNonNull} can never accept the
* combined condition, so {@code PPLSimplifyDedupRule} is permanently unable to produce a {@code
* LogicalDedup}. The production fix enforces order at the program level (sequential {@code
* addRuleInstance} calls), making this hazard unreachable.
*/
@Test
public void testFilterMergeBeforeSimplifyDedupBreaksPattern() {
String ppl = "source=EMP | where SAL > 1000 | dedup 1 DEPTNO | fields DEPTNO";
// getRelNode already runs FilterMergeRule on the raw plan, simulating the pathological
// schedule where FilterMergeRule fires before PPLSimplifyDedupRule.
RelNode mergedFirst = getRelNode(ppl);
HepProgram simplifyOnly =
new HepProgramBuilder().addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE).build();
HepPlanner planner = new HepPlanner(simplifyOnly);
planner.setRoot(mergedFirst);
RelNode result = planner.findBestExp();
String plan = result.explain();
assertFalse(
"If FilterMergeRule runs before PPLSimplifyDedupRule, the simplify rule must NOT recover"
+ " — the merged AND(IS_NOT_NULL, user_predicate) filter fails the bucket-non-null"
+ " predicate. This documents why the production HEP program enforces ordering via"
+ " separate addRuleInstance calls (PPLSimplifyDedupRule first, then FilterMergeRule)."
+ " Actual plan was:\n"
+ plan,
plan.contains("LogicalDedup"));
assertTrue(
"Plan should still contain ROW_NUMBER window form when simplify fails. Actual plan was:\n"
+ plan,
plan.contains("ROW_NUMBER"));
}
}
Loading