Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix union all rewrite bugs in pulling up predicates and add materialized_view_union_rewrite_mode param #42229

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String NESTED_MV_REWRITE_MAX_LEVEL = "nested_mv_rewrite_max_level";
public static final String ENABLE_MATERIALIZED_VIEW_REWRITE = "enable_materialized_view_rewrite";
public static final String ENABLE_MATERIALIZED_VIEW_UNION_REWRITE = "enable_materialized_view_union_rewrite";
public static final String MATERIALIZED_VIEW_UNION_REWRITE_MODE = "materialized_view_union_rewrite_mode";
public static final String ENABLE_MATERIALIZED_VIEW_REWRITE_PARTITION_COMPENSATE =
"enable_materialized_view_rewrite_partition_compensate";

Expand Down Expand Up @@ -1494,6 +1495,15 @@ public String getConnectorSinkCompressionCodec() {
@VarAttr(name = ENABLE_MATERIALIZED_VIEW_UNION_REWRITE)
private boolean enableMaterializedViewUnionRewrite = true;

/**
* <= 0: default mode, only try to union all rewrite by logical plan tree after partition compensate
* 1: eager mode v1, try to pull up query's filter after union when query's output matches mv's define query
* which will increase union rewrite's ability.
* 2: eager mode v2, try to pull up query's filter after union as much as possible.
*/
@VarAttr(name = MATERIALIZED_VIEW_UNION_REWRITE_MODE)
private int materializedViewUnionRewriteMode = 0;

/**
* Whether to compensate partition predicates in mv rewrite, see
* <code>Materialization#isCompensatePartitionPredicate</code> for more details.
Expand Down Expand Up @@ -2922,6 +2932,14 @@ public void setEnableMaterializedViewUnionRewrite(boolean enableMaterializedView
this.enableMaterializedViewUnionRewrite = enableMaterializedViewUnionRewrite;
}

public int getMaterializedViewUnionRewriteMode() {
return materializedViewUnionRewriteMode;
}

public void setMaterializedViewUnionRewriteMode(int materializedViewUnionRewriteMode) {
this.materializedViewUnionRewriteMode = materializedViewUnionRewriteMode;
}

public boolean isEnableSyncMaterializedViewRewrite() {
return enableSyncMaterializedViewRewrite;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code appears to be a part of a larger Java class focused on managing session variables, specifically for handling materialized view configurations in a database or data processing application. Without the full context, identifying a "bug" can be challenging since it often depends on how these variables and methods interact with the rest of the application. However, based on standard coding practices and the changes highlighted in the diff, there isn't an immediately obvious "risky bug" in terms of syntax or Java language rules.

Potential areas of concern that are more contextual or logical rather than syntactical might include:

  1. Ensuring proper synchronization or thread safety when mutating session variables - If this class is accessed by multiple threads, unsynchronized access to the materializedViewUnionRewriteMode variable could lead to inconsistent state.
  2. Validation of materializedViewUnionRewriteMode values - The comments describe valid ranges and their meanings, but no validation logic is present in the setter method. Setting an invalid value might lead to undefined behavior elsewhere in the application.

Assuming validation of input values for the materializedViewUnionRewriteMode field might be beneficial to prevent potential issues, a constructive suggestion would focus on adding such validation:

The most risky bug in this code is:
Lack of validation for materializedViewUnionRewriteMode values which may lead to unintended application behavior if invalid values are set.

You can modify the code like this:

public void setMaterializedViewUnionRewriteMode(int materializedViewUnionRewriteMode) {
    if (materializedViewUnionRewriteMode < 0 || materializedViewUnionRewriteMode > 2) {
        throw new IllegalArgumentException("Invalid value for materializedViewUnionRewriteMode. Valid values are 0 (default mode), 1 (eager mode v1), and 2 (eager mode v2).");
    }
    this.materializedViewUnionRewriteMode = materializedViewUnionRewriteMode;
}

This change introduces a basic check to ensure that only valid values as described in the comments are accepted, helping to avoid potential misconfigurations or logical errors in handling the union rewrite modes.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ public class QueryDebugOptions {
@SerializedName(value = "maxRefreshMaterializedViewRetryNum")
private int maxRefreshMaterializedViewRetryNum = 1;

@SerializedName(value = "enableMVEagerUnionAllRewrite")
private boolean enableMVEagerUnionAllRewrite = false;

@SerializedName(value = "enableQueryTraceLog")
private boolean enableQueryTraceLog = false;

Expand All @@ -57,14 +54,6 @@ public void setMaxRefreshMaterializedViewRetryNum(int maxRefreshMaterializedView
this.maxRefreshMaterializedViewRetryNum = maxRefreshMaterializedViewRetryNum;
}

public boolean isEnableMVEagerUnionAllRewrite() {
return enableMVEagerUnionAllRewrite;
}

public void setEnableMVEagerUnionAllRewrite(boolean enableMVEagerUnionAllRewrite) {
this.enableMVEagerUnionAllRewrite = enableMVEagerUnionAllRewrite;
}

public boolean isEnableQueryTraceLog() {
return enableQueryTraceLog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.google.common.collect.Lists;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.Operator;
import com.starrocks.sql.optimizer.operator.logical.LogicalFilterOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalSetOperator;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
Expand All @@ -28,15 +30,24 @@
import java.util.Map;

public class PushDownPredicateSetRule {
static List<OptExpression> process(OptExpression input, OptimizerContext context) {
public static List<OptExpression> process(OptExpression input, OptimizerContext context) {
LogicalFilterOperator filterOperator = (LogicalFilterOperator) input.getOp();

OptExpression setOptExpression = input.getInputs().get(0);
return doProcess(filterOperator, setOptExpression, true);
}

public static List<OptExpression> doProcess(LogicalFilterOperator filterOperator,
OptExpression setOptExpression,
boolean addNewFilterOp) {
LogicalSetOperator setOperator = (LogicalSetOperator) setOptExpression.getOp();
ScalarOperator filterPredicate = filterOperator.getPredicate();
if (setOperator.getProjection() != null) {
ReplaceColumnRefRewriter rewriter = new ReplaceColumnRefRewriter(setOperator.getProjection().getColumnRefMap());
filterPredicate = rewriter.rewrite(filterPredicate);
}

for (int setChildIdx = 0; setChildIdx < setOptExpression.getInputs().size(); ++setChildIdx) {
Map<ColumnRefOperator, ScalarOperator> operatorMap = new HashMap<>();

for (int i = 0; i < setOperator.getOutputColumnRefOp().size(); ++i) {
/*
* getChildOutputColumns records the output list of child children.
Expand All @@ -48,13 +59,23 @@ static List<OptExpression> process(OptExpression input, OptimizerContext context
}

ReplaceColumnRefRewriter rewriter = new ReplaceColumnRefRewriter(operatorMap);
ScalarOperator rewriteExpr = rewriter.rewrite(filterOperator.getPredicate());
ScalarOperator rewriteExpr = rewriter.rewrite(filterPredicate);

OptExpression filterOpExpression =
OptExpression.create(new LogicalFilterOperator(rewriteExpr), setOptExpression.inputAt(setChildIdx));
setOptExpression.setChild(setChildIdx, filterOpExpression);
}
Operator child = setOptExpression.inputAt(setChildIdx).getOp();
if (child.getProjection() != null) {
rewriteExpr = new ReplaceColumnRefRewriter(child.getProjection().getColumnRefMap())
.rewrite(rewriteExpr);
}

if (addNewFilterOp) {
OptExpression filterOpExpression =
OptExpression.create(new LogicalFilterOperator(rewriteExpr), setOptExpression.inputAt(setChildIdx));
setOptExpression.setChild(setChildIdx, filterOpExpression);
} else {
ScalarOperator finalPredicate = Utils.compoundAnd(child.getPredicate(), rewriteExpr);
child.setPredicate(finalPredicate);
}
}
return Lists.newArrayList(setOptExpression);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ protected OptExpression createUnion(OptExpression queryInput,
unionExpr, newProjection);
// Add extra union all predicates above union all operator.
if (rewriteContext.getUnionRewriteQueryExtraPredicate() != null) {
addUnionAllExtraPredicate(result.getOp(), rewriteContext.getUnionRewriteQueryExtraPredicate());
addUnionAllExtraPredicate(result, rewriteContext.getUnionRewriteQueryExtraPredicate());
}
deriveLogicalProperty(result);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,23 @@ public class MVColumnPruner {
private ColumnRefSet requiredOutputColumns;

public OptExpression pruneColumns(OptExpression queryExpression) {
Projection projection = queryExpression.getOp().getProjection();
if (queryExpression.getOp() instanceof LogicalFilterOperator) {
OptExpression newQueryOptExpression = doPruneColumns(queryExpression.inputAt(0));
Operator filterOp = queryExpression.getOp();
Operator.Builder opBuilder = OperatorBuilderFactory.build(filterOp);
opBuilder.withOperator(filterOp);
return OptExpression.create(opBuilder.build(), newQueryOptExpression);
} else {
return doPruneColumns(queryExpression);
}
}

public OptExpression doPruneColumns(OptExpression project) {
Projection projection = project.getOp().getProjection();
// OptExpression after mv rewrite must have projection.
Preconditions.checkState(projection != null);
requiredOutputColumns = new ColumnRefSet(projection.getOutputColumns());
return queryExpression.getOp().accept(new ColumnPruneVisitor(), queryExpression, null);
return project.getOp().accept(new ColumnPruneVisitor(), project, null);
}

// Prune columns by top-down, only support SPJG/union operators.
LiShuMing marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import com.starrocks.qe.SessionVariable;
import com.starrocks.sql.common.ErrorType;
import com.starrocks.sql.common.PermutationGenerator;
import com.starrocks.sql.common.QueryDebugOptions;
import com.starrocks.sql.common.StarRocksPlannerException;
import com.starrocks.sql.optimizer.ExpressionContext;
import com.starrocks.sql.optimizer.MaterializationContext;
Expand All @@ -63,10 +62,12 @@
import com.starrocks.sql.optimizer.operator.OperatorBuilderFactory;
import com.starrocks.sql.optimizer.operator.Projection;
import com.starrocks.sql.optimizer.operator.logical.LogicalAggregationOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalFilterOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalJoinOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalScanOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalSetOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalUnionOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalViewScanOperator;
import com.starrocks.sql.optimizer.operator.scalar.BinaryPredicateOperator;
Expand All @@ -80,6 +81,7 @@
import com.starrocks.sql.optimizer.rewrite.ReplaceColumnRefRewriter;
import com.starrocks.sql.optimizer.rewrite.scalar.MvNormalizePredicateRule;
import com.starrocks.sql.optimizer.rule.mv.JoinDeriveContext;
import com.starrocks.sql.optimizer.rule.transformation.PushDownPredicateSetRule;
import org.apache.commons.collections4.ListUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -112,6 +114,8 @@ public class MaterializedViewRewriter {
protected final OptimizerContext optimizerContext;
// Mark whether query's plan is rewritten by materialized view.
public static final String REWRITE_SUCCESS = "Rewrite Succeed";
public static final int UNION_REWRITE_EAGER_MODE_1 = 1;
public static final int UNION_REWRITE_EAGER_MODE_2 = 2;

private static final Map<JoinOperator, List<JoinOperator>> JOIN_COMPATIBLE_MAP =
ImmutableMap.<JoinOperator, List<JoinOperator>>builder()
Expand Down Expand Up @@ -1544,6 +1548,7 @@ private boolean isPullUpQueryPredicate(ScalarOperator predicate,
// t where t.a < 1
// If you want to use this feature, you can use it by the setting:
// set query_debug_options = "{'enableMVEagerUnionAllRewrite':true}";
// use union-all rewrite eagerly if union-all rewrite can be used.
if (queryOutputColumnRefs != null && queryOutputColumnRefs.contains(col)) {
return true;
}
Expand All @@ -1562,8 +1567,14 @@ private PredicateSplit getUnionRewriteQueryCompensation(RewriteContext rewriteCo
if (mvCompensationToQuery != null) {
return mvCompensationToQuery;
}
SessionVariable sessionVariable = optimizerContext.getSessionVariable();
int unionRewriteMode = sessionVariable.getMaterializedViewUnionRewriteMode();
if (unionRewriteMode != UNION_REWRITE_EAGER_MODE_1 && unionRewriteMode != UNION_REWRITE_EAGER_MODE_2) {
return null;
}

logMVRewrite(mvRewriteContext, "Try to pull up query's predicates to make possible for union rewrite");
logMVRewrite(mvRewriteContext, "Try to pull up query's predicates to make possible for union rewrite, " +
"unionRewriteMode:{}", unionRewriteMode);
// try to pull up query's predicates to make possible for rewrite from mv to query.
PredicateSplit mvPredicateSplit = rewriteContext.getMvPredicateSplit();
PredicateSplit queryPredicateSplit = rewriteContext.getQueryPredicateSplit();
Expand All @@ -1580,15 +1591,13 @@ private PredicateSplit getUnionRewriteQueryCompensation(RewriteContext rewriteCo
ColumnRefSet queryOnPredicateUsedColRefs = Optional.ofNullable(Utils.compoundAnd(queryOnPredicates))
.map(x -> x.getUsedColumns())
.orElse(new ColumnRefSet());
// use union-all rewrite eagerly if union-all rewrite can be used.
QueryDebugOptions debugOptions = optimizerContext.getSessionVariable().getQueryDebugOptions();
ColumnRefSet queryOutputColumnRefs = debugOptions.isEnableMVEagerUnionAllRewrite() ?

ColumnRefSet queryOutputColumnRefs = unionRewriteMode == UNION_REWRITE_EAGER_MODE_2 ?
rewriteContext.getQueryExpression().getOutputColumns() : null;
Set<ScalarOperator> queryExtraPredicates = queryPredicates.stream()
.filter(pred -> isPullUpQueryPredicate(pred, mvPredicateUsedColRefs,
queryOnPredicateUsedColRefs, queryOutputColumnRefs))
.collect(Collectors.toSet());

if (queryExtraPredicates.isEmpty()) {
return null;
}
Expand Down Expand Up @@ -2106,20 +2115,28 @@ protected OptExpression createUnion(OptExpression queryInput, OptExpression view

// Add extra union all predicates above union all operator.
if (rewriteContext.getUnionRewriteQueryExtraPredicate() != null) {
addUnionAllExtraPredicate(result.getOp(), rewriteContext.getUnionRewriteQueryExtraPredicate());
result = addUnionAllExtraPredicate(result, rewriteContext.getUnionRewriteQueryExtraPredicate());
}

deriveLogicalProperty(result);
return result;
}

protected void addUnionAllExtraPredicate(Operator unionOp,
ScalarOperator extraPredicate) {
Preconditions.checkState(unionOp.getProjection() != null);
ScalarOperator origPredicate = unionOp.getPredicate();
ReplaceColumnRefRewriter rewriter = new ReplaceColumnRefRewriter(unionOp.getProjection().getColumnRefMap());
ScalarOperator rewrittenExtraPredicate = rewriter.rewrite(extraPredicate);
unionOp.setPredicate(Utils.compoundAnd(origPredicate, rewrittenExtraPredicate));
protected OptExpression addUnionAllExtraPredicate(OptExpression result,
ScalarOperator extraPredicate) {
Operator op = result.getOp();
if (op instanceof LogicalSetOperator) {
LogicalFilterOperator filter = new LogicalFilterOperator(extraPredicate);
// TODO: Refactor this so can use PUSH_DOWN_PREDICATE rule set after mv rewrite rule.
return PushDownPredicateSetRule.doProcess(filter, result, false).get(0);
} else {
// If op is aggregate operator, use setPredicate directly.
ScalarOperator origPredicate = op.getPredicate();
ReplaceColumnRefRewriter rewriter = new ReplaceColumnRefRewriter(op.getProjection().getColumnRefMap());
ScalarOperator rewrittenExtraPredicate = rewriter.rewrite(extraPredicate);
op.setPredicate(Utils.compoundAnd(origPredicate, rewrittenExtraPredicate));
return result;
}
}

protected EquationRewriter buildEquationRewriter(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Incorrect handling of unionRewriteMode variable leading to potential logical errors or incorrect query optimizations.

You can modify the code like this:

// Earlier in the code where unionRewriteMode is set based on session variables:
int unionRewriteMode = sessionVariable.getMaterializedViewUnionRewriteMode();
// Ensure the validation check for unionRewriteMode is correct and covers all necessary conditions, 
// and use it consistently throughout your implementation. For example:
if (!(unionRewriteMode >= 0 && unionRewriteMode <= 2)) {
    // Handle invalid mode appropriately
}

// When checking for `unionRewriteMode`, instead of blindly using it, introduce additional checks or logic as needed to ensure its value is correctly interpreted and applied.
// Example modification (hypothetical, depends on actual intended use of `unionRewriteMode`):
+        if (unionRewriteMode == 1 || (unionRewriteMode == 2 && someOtherCondition)) {
             return null;
         }

// Make sure the usage of `unionRewriteMode` further down the code respects its meaning and implements the intended logic correctly.

Note: Since the provided code snippet is quite specific and targeted at a particular system's internals (seemingly related to SQL optimization and query rewriting in StarRocks), a precise fix requires deeper knowledge of the entire system and how unionRewriteMode is meant to influence the behavior. The suggested modification intends to highlight the importance of validating and correctly interpreting session or context-specific settings before applying them within complex logical operations.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public void testPartitionPrune_SingleTable2() throws Exception {
// mv's partition columns is the same with the base table, mv table has no partition filter.
@Test
public void testPartitionPrune_SingleTable3() throws Exception {
starRocksAssert.getCtx().getSessionVariable().setEnableMaterializedViewUnionRewrite(true);
connectContext.getSessionVariable().setEnableMaterializedViewUnionRewrite(true);
connectContext.getSessionVariable().setMaterializedViewUnionRewriteMode(1);

String partial_mv_6 = "create materialized view partial_mv_6" +
" partition by c3" +
Expand Down Expand Up @@ -191,12 +192,14 @@ public void testPartitionPrune_SingleTable3() throws Exception {
" tabletRatio=6/6");

starRocksAssert.dropMaterializedView("partial_mv_6");
connectContext.getSessionVariable().setMaterializedViewUnionRewriteMode(0);
}

// MV has multi tables and partition columns.
@Test
public void testPartitionPrune_MultiTables1() throws Exception {
starRocksAssert.getCtx().getSessionVariable().setEnableMaterializedViewUnionRewrite(true);
connectContext.getSessionVariable().setEnableMaterializedViewUnionRewrite(true);
connectContext.getSessionVariable().setMaterializedViewUnionRewriteMode(1);

// test partition prune
String partial_mv_6 = "create materialized view partial_mv_6" +
Expand Down Expand Up @@ -287,14 +290,15 @@ public void testPartitionPrune_MultiTables1() throws Exception {
" partitions=1/5");

connectContext.getSessionVariable().setMaterializedViewRewriteMode("default");
connectContext.getSessionVariable().setMaterializedViewUnionRewriteMode(0);
starRocksAssert.dropMaterializedView("partial_mv_6");
}

// MV has no partitions and has joins.
@Test
public void testPartitionPrune_MultiTables2() throws Exception {
starRocksAssert.getCtx().getSessionVariable().setEnableMaterializedViewUnionRewrite(true);

connectContext.getSessionVariable().setMaterializedViewUnionRewriteMode(1);
// test partition prune
String partial_mv_6 = "create materialized view partial_mv_6" +
" distributed by hash(c1) as" +
Expand Down Expand Up @@ -375,6 +379,7 @@ public void testPartitionPrune_MultiTables2() throws Exception {
" partitions=1/5");

connectContext.getSessionVariable().setMaterializedViewRewriteMode("default");
connectContext.getSessionVariable().setMaterializedViewUnionRewriteMode(0);
starRocksAssert.dropMaterializedView("partial_mv_6");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class MvRefreshAndRewriteIcebergTest extends MvRewriteTestBase {
public static void beforeClass() throws Exception {
MvRewriteTestBase.beforeClass();
ConnectorPlanTestBase.mockCatalog(connectContext, MockIcebergMetadata.MOCKED_ICEBERG_CATALOG_NAME);
connectContext.getSessionVariable().setMaterializedViewUnionRewriteMode(1);
}

@Test
Expand Down