[CALCITE-2018] Queries failed with AssertionError: rel has lower cost than best cost of subset. (Second attempt)

(cherry picked from commit 1c795d7)
vvysotskyi committed Jan 4, 2018
1 parent e780b6f commit af04ef4
Showing 18 changed files with 91 additions and 47 deletions.
@@ -151,7 +151,7 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
inputJavaType);

final RexBuilder rexBuilder = getCluster().getRexBuilder();
-final RelMetadataQuery mq = RelMetadataQuery.instance();
+final RelMetadataQuery mq = getCluster().getMetadataQuery();
final RelOptPredicateList predicates = mq.getPulledUpPredicates(child);
final RexSimplify simplify =
new RexSimplify(rexBuilder, predicates, false, RexUtil.EXECUTOR);
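
The replacement above is the pattern applied throughout this commit: instead of creating a fresh RelMetadataQuery, and with it an empty metadata cache, on every call via RelMetadataQuery.instance(), callers reuse the query attached to the RelNode's cluster, whose cache the planner can now invalidate selectively. A minimal sketch of the resulting usage pattern; the wrapper class and method below are illustrative, not part of the commit:

import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

class MetadataQueryUsageSketch {
  /** Cost lookup through the cluster-scoped metadata query, mirroring the
   * replacement made across these files. */
  static RelOptCost selfCost(RelNode rel, RelOptPlanner planner) {
    final RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
    return rel.computeSelfCost(planner, mq);
  }
}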
@@ -246,7 +246,7 @@ public RelOptCost getCost(RelNode rel, RelMetadataQuery mq) {

@SuppressWarnings("deprecation")
public RelOptCost getCost(RelNode rel) {
-final RelMetadataQuery mq = RelMetadataQuery.instance();
+final RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
return getCost(rel, mq);
}

2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
@@ -227,7 +227,7 @@ public static Set<RelOptTable> findTables(RelNode rel) {
*/
public static List<RelOptTable> findAllTables(RelNode rel) {
final Multimap<Class<? extends RelNode>, RelNode> nodes =
-RelMetadataQuery.instance().getNodeTypes(rel);
+rel.getCluster().getMetadataQuery().getNodeTypes(rel);
final List<RelOptTable> usedTables = new ArrayList<>();
for (Entry<Class<? extends RelNode>, Collection<RelNode>> e : nodes.asMap().entrySet()) {
if (TableScan.class.isAssignableFrom(e.getKey())) {
@@ -60,6 +60,8 @@
import java.util.Map;
import java.util.Set;

+import static org.apache.calcite.rel.metadata.RelMdUtil.clearCache;

/**
* HepPlanner is a heuristic implementation of the {@link RelOptPlanner}
* interface.
@@ -812,6 +814,7 @@ private void contractVertices(
}
parentRel.replaceInput(i, preservedVertex);
}
+clearCache(parentRel);
graph.removeEdge(parent, discardedVertex);
graph.addEdge(parent, preservedVertex);
updateVertex(parent, parentRel);
@@ -873,8 +876,9 @@ private RelNode buildFinalPlan(HepRelVertex vertex) {
}
child = buildFinalPlan((HepRelVertex) child);
rel.replaceInput(i, child);
-rel.recomputeDigest();
}
+clearCache(rel);
+rel.recomputeDigest();

return rel;
}
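
The clearCache calls added above guard against stale metadata in HepPlanner: once a vertex's inputs are rewired, any row counts or costs cached for the parent were computed against the old child. A hedged sketch of that pattern, reusing the statically imported RelMdUtil.clearCache shown in the diff; the wrapper class is illustrative only:

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMdUtil;

class RewireSketch {
  /** Replace one input of {@code parent}, then drop its cached metadata so the
   * next metadata query recomputes against the new child. */
  static void replaceInputAndInvalidate(RelNode parent, int ordinal, RelNode newInput) {
    parent.replaceInput(ordinal, newInput);
    RelMdUtil.clearCache(parent);
  }
}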
23 changes: 19 additions & 4 deletions core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
@@ -28,12 +28,14 @@
import org.apache.calcite.util.trace.CalciteTrace;

import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;

import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;

/**
@@ -311,21 +313,24 @@ void mergeWith(
assert otherSet.equivalentSet == null;
LOGGER.trace("Merge set#{} into set#{}", otherSet.id, id);
otherSet.equivalentSet = this;
+final RelMetadataQuery mq = rel.getCluster().getMetadataQuery();

// remove from table
boolean existed = planner.allSets.remove(otherSet);
assert existed : "merging with a dead otherSet";

+final Map<RelSubset, RelNode> changedSubsets = Maps.newIdentityHashMap();

// merge subsets
for (RelSubset otherSubset : otherSet.subsets) {
planner.ruleQueue.subsetImportances.remove(otherSubset);
RelSubset subset =
getOrCreateSubset(
otherSubset.getCluster(),
otherSubset.getTraitSet());
+// collect RelSubset instances whose best should be changed
if (otherSubset.bestCost.isLt(subset.bestCost)) {
-subset.bestCost = otherSubset.bestCost;
-subset.best = otherSubset.best;
+changedSubsets.put(subset, otherSubset.best);
}
for (RelNode otherRel : otherSubset.getRels()) {
planner.reregister(this, otherRel);
@@ -335,6 +340,18 @@ void mergeWith(
// Has another set merged with this?
assert equivalentSet == null;

+// Call propagateCostImprovements() for RelSubset instances whose best
+// should be changed, to check whether that subset's parents have gotten
+// cheaper.
+final Set<RelSubset> activeSet = new HashSet<>();

chunhui-shi (Jan 11, 2018):

My understanding is that this change allows the lower cost from otherSubset to update the parents of the merged 'this'. If so, could we skip the next propagateCostImprovements at line 375?

vvysotskyi (Author, Jan 12, 2018):

After calling planner.rename(parentRel), a scenario may occur in which RelNodes from the equivalent RelSet have been added into this RelSet. Therefore the getParentRels() method at line 373 may return parent rels that were not taken into account by this block of code. However, this block should be executed before calling planner.rename(parentRel), since that is the line where all rels which have a child in the other set are updated.

+for (Map.Entry<RelSubset, RelNode> subsetBestPair : changedSubsets.entrySet()) {
+RelSubset relSubset = subsetBestPair.getKey();
+relSubset.propagateCostImprovements(
+planner, mq, subsetBestPair.getValue(),
+activeSet);
+}
+assert activeSet.isEmpty();

// Update all rels which have a child in the other set, to reflect the
// fact that the child has been renamed.
//
@@ -353,8 +370,6 @@
}

// Make sure the cost changes as a result of merging are propagated.
-final Set<RelSubset> activeSet = new HashSet<>();
-final RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
for (RelNode parentRel : getParentRels()) {
final RelSubset parentSubset = planner.getSubset(parentRel);
parentSubset.propagateCostImprovements(
@@ -195,7 +195,8 @@ Set<RelNode> getParents() {
final Set<RelNode> list = new LinkedHashSet<>();
for (RelNode parent : set.getParentRels()) {
for (RelSubset rel : inputSubsets(parent)) {
-if (rel.set == set && traitSet.satisfies(rel.getTraitSet())) {
+// see usage of this method in propagateCostImprovements0()
+if (rel == this) {

chunhui-shi (Jan 11, 2018):

Should we keep the original condition? I feel that we may want to clean other rels that have the same set and traitSet as well.

vvysotskyi (Author, Jan 12, 2018):

This change was made because this method is used only in propagateCostImprovements0(). It is assumed there that the method will return the RelNodes which have this RelSubset as an input. There is no point in clearing the cache for other RelNodes, or in calling propagateCostImprovements() on their RelSubsets, since their cost was not changed.

This commit was cherry-picked from Calcite pull request apache#552. It would be great if we moved our discussion there, or simply continued it there, so the Calcite community can also take part; for now the PR is just being ignored there.

list.add(parent);
}
}
@@ -337,12 +338,17 @@ void propagateCostImprovements0(VolcanoPlanner planner, RelMetadataQuery mq,

bestCost = cost;
best = rel;
+// since best was changed, cached metadata for this subset should be removed
+mq.clearCache(this);

// Lower cost means lower importance. Other nodes will change
// too, but we'll get to them later.
planner.ruleQueue.recompute(this);
for (RelNode parent : getParents()) {
+// remove the parent's cached metadata since its input was changed
+mq.clearCache(parent);
final RelSubset parentSubset = planner.getSubset(parent);
+// the parent subset will clear its cache in the propagateCostImprovements0 method itself
parentSubset.propagateCostImprovements(planner, mq, parent,
activeSet);
}
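
Taken together, the RelSubset changes make the propagation order explicit: when a cheaper plan arrives, the subset records it, clears its own cached metadata, and then visits only the parents that take this subset as an input, clearing their caches before recursing. A simplified, self-contained model of that flow, using toy classes rather than the Calcite ones:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CostPropagationSketch {
  static class Node {
    double selfCost;
    double bestCost = Double.POSITIVE_INFINITY;
    final List<Node> inputs = new ArrayList<>();
    final List<Node> parents = new ArrayList<>();
  }

  /** Stand-in for the per-node metadata cache cleared via mq.clearCache(rel). */
  final Map<Node, Object> metadataCache = new HashMap<>();

  /** Analogue of propagateCostImprovements0: record the improvement, invalidate
   * caches, then revisit only parents that use this node as an input. */
  void propagate(Node node, double improvedCost, Set<Node> active) {
    if (improvedCost >= node.bestCost || !active.add(node)) {
      return;                        // no improvement, or already being processed
    }
    node.bestCost = improvedCost;
    metadataCache.remove(node);      // best changed, so cached metadata is stale
    for (Node parent : node.parents) {
      metadataCache.remove(parent);  // its input changed, so also stale
      propagate(parent, cost(parent), active);
    }
    active.remove(node);
  }

  double cost(Node node) {
    double total = node.selfCost;
    for (Node input : node.inputs) {
      total += input.bestCost;
    }
    return total;
  }
}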
@@ -94,6 +94,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

+import static org.apache.calcite.rel.metadata.RelMdUtil.clearCache;

/**
* VolcanoPlanner optimizes queries by transforming expressions selectively
* according to a dynamic programming algorithm.
@@ -1382,6 +1384,7 @@ private boolean fixUpInputs(RelNode rel) {
}
}
}
+clearCache(rel);
return changeCount > 0;
}

@@ -20,8 +20,8 @@
import org.apache.calcite.interpreter.BindableConvention;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptMaterialization;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
@@ -61,10 +61,10 @@ class CalciteMaterializer extends CalcitePrepareImpl.CalcitePreparingStmt {
CalciteMaterializer(CalcitePrepareImpl prepare,
CalcitePrepare.Context context,
CatalogReader catalogReader, CalciteSchema schema,
-RelOptPlanner planner, SqlRexConvertletTable convertletTable) {
+SqlRexConvertletTable convertletTable, RelOptCluster cluster) {
super(prepare, context, catalogReader, catalogReader.getTypeFactory(),
-schema, EnumerableRel.Prefer.ANY, planner, BindableConvention.INSTANCE,
-convertletTable);
+schema, EnumerableRel.Prefer.ANY, BindableConvention.INSTANCE,
+convertletTable, cluster);
}

/** Populates a materialization record, converting a table path
@@ -314,8 +314,8 @@ private ParseResult convert_(Context context, String sql, boolean analyze,

final CalcitePreparingStmt preparingStmt =
new CalcitePreparingStmt(this, context, catalogReader, typeFactory,
-context.getRootSchema(), null, planner, resultConvention,
-createConvertletTable());
+context.getRootSchema(), null, resultConvention, createConvertletTable(),
+createCluster(planner, new RexBuilder(typeFactory)));
final SqlToRelConverter converter =
preparingStmt.getSqlToRelConverter(validator, catalogReader,
configBuilder.build());
@@ -731,8 +731,8 @@ <T> CalciteSignature<T> prepare2_(
: EnumerableConvention.INSTANCE;
final CalcitePreparingStmt preparingStmt =
new CalcitePreparingStmt(this, context, catalogReader, typeFactory,
-context.getRootSchema(), prefer, planner, resultConvention,
-createConvertletTable());
+context.getRootSchema(), prefer, resultConvention, createConvertletTable(),
+createCluster(planner, new RexBuilder(typeFactory)));

final RelDataType x;
final Prepare.PreparedResult preparedResult;
@@ -998,7 +998,7 @@ private static String getTypeName(RelDataType type) {
}

protected void populateMaterializations(Context context,
-RelOptPlanner planner, Prepare.Materialization materialization) {
+Prepare.Materialization materialization, RelOptCluster cluster) {
// REVIEW: initialize queryRel and tableRel inside MaterializationService,
// not here?
try {
@@ -1010,8 +1010,8 @@ protected void populateMaterializations(Context context,
context.getTypeFactory(),
context.config());
final CalciteMaterializer materializer =
-new CalciteMaterializer(this, context, catalogReader, schema, planner,
-createConvertletTable());
+new CalciteMaterializer(this, context, catalogReader, schema,
+createConvertletTable(), cluster);
materializer.populate(materialization);
} catch (Exception e) {
throw new RuntimeException("While populating materialization "
@@ -1063,6 +1063,7 @@ static class CalcitePreparingStmt extends Prepare
protected final RelDataTypeFactory typeFactory;
protected final SqlRexConvertletTable convertletTable;
private final EnumerableRel.Prefer prefer;
+private final RelOptCluster cluster;
private final Map<String, Object> internalParameters =
Maps.newLinkedHashMap();
private int expansionDepth;
@@ -1074,17 +1075,18 @@ static class CalcitePreparingStmt extends Prepare
RelDataTypeFactory typeFactory,
CalciteSchema schema,
EnumerableRel.Prefer prefer,
-RelOptPlanner planner,
Convention resultConvention,
-SqlRexConvertletTable convertletTable) {
+SqlRexConvertletTable convertletTable,
+RelOptCluster cluster) {
super(context, catalogReader, resultConvention);
this.prepare = prepare;
this.schema = schema;
this.prefer = prefer;
-this.planner = planner;
+this.cluster = cluster;
+this.planner = cluster.getPlanner();
+this.rexBuilder = cluster.getRexBuilder();
this.typeFactory = typeFactory;
this.convertletTable = convertletTable;
-this.rexBuilder = new RexBuilder(typeFactory);
}

@Override protected void init(Class runtimeContextClass) {
@@ -1096,8 +1098,6 @@ public PreparedResult prepareQueryable(
return prepare_(
new Supplier<RelNode>() {
public RelNode get() {
-final RelOptCluster cluster =
-prepare.createCluster(planner, rexBuilder);
return new LixToRelTranslator(cluster, CalcitePreparingStmt.this)
.translate(queryable);
}
@@ -1161,7 +1161,6 @@ private PreparedResult prepare_(Supplier<RelNode> fn,
SqlValidator validator,
CatalogReader catalogReader,
SqlToRelConverter.Config config) {
-final RelOptCluster cluster = prepare.createCluster(planner, rexBuilder);
return new SqlToRelConverter(this, validator, catalogReader, cluster,
convertletTable, config);
}
@@ -1296,7 +1295,7 @@ public Type getElementType() {
? MaterializationService.instance().query(schema)
: ImmutableList.<Prepare.Materialization>of();
for (Prepare.Materialization materialization : materializations) {
-prepare.populateMaterializations(context, planner, materialization);
+prepare.populateMaterializations(context, materialization, cluster);
}
return materializations;
}
@@ -163,13 +163,13 @@ public String getCorrelVariable() {

@SuppressWarnings("deprecation")
public boolean isDistinct() {
-final RelMetadataQuery mq = RelMetadataQuery.instance();
+final RelMetadataQuery mq = cluster.getMetadataQuery();
return Boolean.TRUE.equals(mq.areRowsUnique(this));
}

@SuppressWarnings("deprecation")
public boolean isKey(ImmutableBitSet columns) {
-final RelMetadataQuery mq = RelMetadataQuery.instance();
+final RelMetadataQuery mq = cluster.getMetadataQuery();
return Boolean.TRUE.equals(mq.areColumnsUnique(this, columns));
}

@@ -243,7 +243,7 @@ public List<RelNode> getInputs() {

@SuppressWarnings("deprecation")
public final double getRows() {
-return estimateRowCount(RelMetadataQuery.instance());
+return estimateRowCount(cluster.getMetadataQuery());
}

public double estimateRowCount(RelMetadataQuery mq) {
@@ -285,7 +285,7 @@ public RelNode accept(RexShuttle shuttle) {

@SuppressWarnings("deprecation")
public final RelOptCost computeSelfCost(RelOptPlanner planner) {
-return computeSelfCost(planner, RelMetadataQuery.instance());
+return computeSelfCost(planner, cluster.getMetadataQuery());
}

public RelOptCost computeSelfCost(RelOptPlanner planner,
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/calcite/rel/core/Filter.java
@@ -138,13 +138,13 @@ public RexNode getCondition() {

@Deprecated // to be removed before 2.0
public static double estimateFilteredRows(RelNode child, RexProgram program) {
-final RelMetadataQuery mq = RelMetadataQuery.instance();
+final RelMetadataQuery mq = child.getCluster().getMetadataQuery();
return RelMdUtil.estimateFilteredRows(child, program, mq);
}

@Deprecated // to be removed before 2.0
public static double estimateFilteredRows(RelNode child, RexNode condition) {
-final RelMetadataQuery mq = RelMetadataQuery.instance();
+final RelMetadataQuery mq = child.getCluster().getMetadataQuery();
return RelMdUtil.estimateFilteredRows(child, condition, mq);
}

2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/calcite/rel/core/Join.java
@@ -189,7 +189,7 @@ public JoinRelType getJoinType() {
public static double estimateJoinedRows(
Join joinRel,
RexNode condition) {
-final RelMetadataQuery mq = RelMetadataQuery.instance();
+final RelMetadataQuery mq = joinRel.getCluster().getMetadataQuery();
return Util.first(RelMdUtil.getJoinRowCount(mq, joinRel, condition), 1D);
}

2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/calcite/rel/core/Union.java
@@ -62,7 +62,7 @@ protected Union(RelInput input) {

@Deprecated // to be removed before 2.0
public static double estimateRowCount(RelNode rel) {
-final RelMetadataQuery mq = RelMetadataQuery.instance();
+final RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
return RelMdUtil.getUnionAllRowCount(mq, (Union) rel);
}
}
@@ -274,10 +274,9 @@ private static <M extends Metadata> MetadataHandler<M> load3(
.append(method.i)
.append(")");
}
buff.append(", r");
safeArgList(buff, method.e)
.append(");\n")
.append(" final Object v = mq.map.get(key);\n")
.append(" final Object v = mq.map.get(r, key);\n")
.append(" if (v != null) {\n")
.append(" if (v == ")
.append(NullSentinel.class.getName())
@@ -295,7 +294,7 @@ private static <M extends Metadata> MetadataHandler<M> load3(
.append(method.e.getReturnType().getName())
.append(") v;\n")
.append(" }\n")
.append(" mq.map.put(key,")
.append(" mq.map.put(r, key,")
.append(NullSentinel.class.getName())
.append(".ACTIVE);\n")
.append(" try {\n")
@@ -306,14 +305,14 @@
.append("_(r, mq");
argList(buff, method.e)
.append(");\n")
.append(" mq.map.put(key, ")
.append(" mq.map.put(r, key, ")
.append(NullSentinel.class.getName())
.append(".mask(x));\n")
.append(" return x;\n")
.append(" } catch (")
.append(Exception.class.getName())
.append(" e) {\n")
.append(" mq.map.remove(key);\n")
.append(" mq.map.row(r).clear();\n")
.append(" throw e;\n")
.append(" }\n")
.append(" }\n")