Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust joins cost estimation [HZ-2658, HZ-2494] #25249

Closed
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
57b9c02
Adjust joins cost estimation
Fly-Style Aug 17, 2023
7d4410f
Fix test
Fly-Style Aug 18, 2023
187c05d
Introduce QueryPlanListener for further optimizer tests
Fly-Style Aug 18, 2023
1775447
Adjust cost model
Fly-Style Aug 18, 2023
48abe08
Brackets
Fly-Style Aug 18, 2023
98ad736
Revert test change back, and actually it is correct
Fly-Style Aug 18, 2023
d1a1b16
Address review comments
Fly-Style Sep 4, 2023
c4bd3ff
Divide processed and produced rows
Fly-Style Sep 5, 2023
83dc9b5
Apply suggestions from code review
Fly-Style Sep 7, 2023
dc4e5dc
Merge branch 'master' into fix/5.4/hash-join-cost-adj
Fly-Style Sep 7, 2023
d003fc7
Adjust costs after the discussion
Fly-Style Sep 8, 2023
f2b9e19
Merge branch 'master' into fix/5.4/hash-join-cost-adj
Fly-Style Sep 8, 2023
bd9efeb
Remove selectivity from cpu calculation
Fly-Style Sep 18, 2023
50fee00
Fix algorithm even better
Fly-Style Sep 18, 2023
6044a69
Update comment
Fly-Style Sep 18, 2023
516ab7b
Fix HazelcastTable ctor workflow
Fly-Style Sep 18, 2023
05ec305
Update hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/phy…
Fly-Style Sep 19, 2023
68def60
Multiply
Fly-Style Sep 28, 2023
b06e92b
Cost readjustment
Fly-Style Sep 28, 2023
4933580
Merge branch 'master' into fix/5.4/hash-join-cost-adj
Fly-Style Oct 4, 2023
fbd5492
Docs and constant adjustments
Fly-Style Oct 4, 2023
ae0bd63
Typos
Fly-Style Oct 4, 2023
f65293f
Revert back rows estimation of IMap; add broadcast factor for hash LE…
Fly-Style Oct 19, 2023
491e5fd
Clarify doc for hash join
Fly-Style Jan 12, 2024
a716a94
Checkstyle
Fly-Style Jan 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ public SqlPlan prepare(OptimizationTask task) {
int memberCount = nodeEngine.getClusterService().getSize(MemberSelectors.DATA_MEMBER_SELECTOR);

OptimizerContext context = OptimizerContext.create(
nodeEngine.getHazelcastInstance(),
task.getSchema(),
task.getSearchPaths(),
task.getArguments(),
Expand Down Expand Up @@ -804,8 +805,7 @@ private PhysicalRel optimize(
logger.fine("After physical opt:\n" + RelOptUtil.toString(physicalRel));
}

PhysicalRel finalPhysicalRel = physicalRel;
queryPlanListeners.forEach(l -> l.onQueryPlanBuilt(finalPhysicalRel));
// TODO[sasha]: capture final physical rel for listeners here.
Fly-Style marked this conversation as resolved.
Show resolved Hide resolved
return physicalRel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hazelcast.jet.sql.impl;

import com.google.common.collect.ImmutableList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.sql.impl.opt.cost.CostFactory;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdBoundedness;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdPrunability;
Expand Down Expand Up @@ -107,14 +108,15 @@ private OptimizerContext(
* @return Context.
*/
public static OptimizerContext create(
        HazelcastInstance hz,
        SqlCatalog schema,
        List<List<String>> searchPaths,
        List<Object> arguments,
        int memberCount,
        IMapResolver iMapResolver
) {
    // Resolve all catalog tables into a Calcite root schema. The Hazelcast
    // instance is forwarded so IMap-backed tables can derive their row-count
    // statistic from the live map size.
    HazelcastSchema rootSchema = HazelcastSchemaUtils.createRootSchema(hz, schema);
    return create(rootSchema, searchPaths, arguments, memberCount, iMapResolver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ public class Cost implements RelOptCost {
public static final Cost HUGE = new Cost(Double.MAX_VALUE / 100, Double.MAX_VALUE / 100, Double.MAX_VALUE / 100);
public static final Cost INFINITY = new Cost(Double.MAX_VALUE, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);

/**
* Multiplier representing the per-row CPU cost of hash table building actions:
* - row hash computation; (estimate - 3 ops, the process itself is heavier from CPU ops POV)
* - probe hash table; (estimate - 1 op)
* - walk through hash chain (estimate - 1 op, assuming hash collision may happen)
* - and compare with each element; (estimate - 1 op, assuming hash collision may happen)
* - add the k-v to the table. (estimate - 1 op).
*/
public static final double HASH_JOIN_MULTIPLIER = 7;
Fly-Style marked this conversation as resolved.
Show resolved Hide resolved

/**
* Multiplier representing the CPU cost of a single row comparison.
*/
public static final double JOIN_ROW_CMP_MULTIPLIER = 2;

private final double rows;
private final double cpu;
private final double network;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.jet.sql.impl.opt.physical;

import com.hazelcast.jet.sql.impl.opt.cost.Cost;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
Expand All @@ -28,7 +29,6 @@
import org.checkerframework.checker.nullness.qual.Nullable;

public class JoinHashPhysicalRel extends JoinPhysicalRel {
private static final double COST_FACTOR = 1.1;

JoinHashPhysicalRel(
RelOptCluster cluster,
Expand Down Expand Up @@ -58,9 +58,27 @@ public Join copy(
return new JoinHashPhysicalRel(getCluster(), traitSet, left, right, conditionExpr, joinType);
}

/**
* Cost calculation of Hash Join relation. It does not rely on children cost.
* <p>
* Hash Join is a more advanced join algorithm: it builds a hash table from the left
* row set and then compares each row from the right side against it. Cost estimation is the following: <ol>
* <li> Produced row count is L * R * (join selectivity).
* <li> Processed row count is L + R because we traverse both sides once per join.
* <li> CPU is L * (hash table build cost) + R * (row comparison cost). </ol>
* <p>
* A perfect estimation must also include memory (occupied by the hash table) and IO costs.
*/
@Override
@Nullable
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
return super.computeSelfCost(planner, mq).multiplyBy(COST_FACTOR);
double leftRowCount = mq.getRowCount(getLeft());
double rightRowCount = mq.getRowCount(getRight());

double producedRowCount = mq.getRowCount(this);
double cpu = leftRowCount * Cost.HASH_JOIN_MULTIPLIER
Copy link
Contributor

@k-jamroz k-jamroz Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case of outer join one of the sides is broadcast. This requires more processing. Shouldn't we include that in cost calculation (I don't mean network - however that would nice too, but CPU)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we include that in cost calculation

Yes, makes sense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not what I meant: in case of outer non-equi join, outer side is broadcast. So we do rightCount*memberCount lookups which directly translates to CPU cost of lookups. However, they are also different than in case of equi-join. Eg. for query like select * from m1 left join m2 on m1.__key<>m2.__key hashmap in SqlHashJoinP degenerates to single key, and we iterate over all rows (this is a de-facto nested loop with spooling left side in memory)

Additionally, those rows have to be sent over the network, but AFAIR we do not add network cost to CPU cost. We have a separate network cost, which has its own multiplier but is generally 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hashmap in SqlHashJoinP degenerates to single key

If I see correctly in case of non-equi hash join the hashmap is always degenerate. So we:

  1. build memberCount single-unique-key hash maps (multimaps to be precise) each containing rightCount items
  2. then we do leftCount lookups (in single-key hash map) and iterate over all found rows (ie. rightCount rows). This gives leftCount lookups (are they constant time with regard to rightCount?) and leftCount*rightCount comparisons for iterating over elements found in multimap (ie. all right side rows).
  3. then projection of matching rows

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

equi-join case is somewhere on the continuum between "hashmap lookup gives always single right row" and "hashmap lookup gives all right rows" depending on selectivity of equijoin columns (which we cannot estimate because we do not have column histograms).
also in equi-join hash table is partitioned, but might be skewed or even degenerate for the same reason

+ rightRowCount * Cost.JOIN_ROW_CMP_MULTIPLIER;

return planner.getCostFactory().makeCost(producedRowCount, cpu, 0.);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@
import com.google.common.collect.ImmutableList;
import com.hazelcast.jet.sql.impl.HazelcastPhysicalScan;
import com.hazelcast.jet.sql.impl.opt.OptUtils;
import com.hazelcast.jet.sql.impl.opt.cost.Cost;
import com.hazelcast.jet.sql.impl.schema.HazelcastTable;
import com.hazelcast.jet.sql.impl.validate.HazelcastSqlOperatorTable;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.util.ImmutableIntList;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -109,6 +114,46 @@ public <V> V accept(CreateDagVisitor<V> visitor) {
return visitor.onNestedLoopJoin(this);
}

/**
* Cost calculation of Nested Loop Join relation.
* <p>
* Nested Loop Join algorithm is a simple join algorithm, where for each left row,
* we are traversing the whole right row set. Cost estimation is the following: <ol>
* <li> Produced row count is L * R * (join selectivity).
* <li> Processed row count is L * k * R, where k is 1 for non-equi-join,
* (join selectivity) ≤ k ≤ 1 for equi-join and 1/R for key lookup.
* <li> CPU is L * (join selectivity) * R * (row comparison cost) assuming k
* converges to the join selectivity on average. </ol>
* <p>
* A perfect estimation must also include memory and IO costs.
*/
@Override
@Nullable
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
final double leftRowCount = mq.getRowCount(left);
final double rightRowCount = mq.getRowCount(right);
if (Double.isInfinite(leftRowCount) || Double.isInfinite(rightRowCount)) {
Fly-Style marked this conversation as resolved.
Show resolved Hide resolved
return planner.getCostFactory().makeInfiniteCost();
}

RelOptCost rightCost = planner.getCost(getRight(), mq);
if (rightCost == null) {
return planner.getCostFactory().makeInfiniteCost();
}

Double selectivity = mq.getSelectivity(this, condition);
if (selectivity == null) {
selectivity = 1.;
}

// TODO: introduce selectivity estimator, but ATM we taking the worst case scenario : selectivity = 1.0.
double producedRows = mq.getRowCount(this);
double processedRowsEstimate = leftRowCount * selectivity * rightRowCount;
double cpuEstimate = Math.max(1.0, processedRowsEstimate - 1) * Cost.JOIN_ROW_CMP_MULTIPLIER;

return planner.getCostFactory().makeCost(producedRows, cpuEstimate, 0);
}

@Override
public Join copy(
RelTraitSet traitSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.hazelcast.jet.sql.impl.schema;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.sql.impl.QueryUtils;
import com.hazelcast.sql.impl.schema.SqlCatalog;
import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.schema.map.PartitionedMapTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Statistic;

Expand Down Expand Up @@ -56,7 +58,7 @@ public static HazelcastSchema createCatalog(Schema schema) {
*
* @return Top-level schema.
*/
public static HazelcastSchema createRootSchema(SqlCatalog catalog) {
public static HazelcastSchema createRootSchema(HazelcastInstance hz, SqlCatalog catalog) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not need hz anymore

// Create schemas.
Map<String, Schema> schemaMap = new HashMap<>();

Expand All @@ -68,11 +70,9 @@ public static HazelcastSchema createRootSchema(SqlCatalog catalog) {
for (Map.Entry<String, Table> tableEntry : currentSchemaEntry.getValue().entrySet()) {
String tableName = tableEntry.getKey();
Table table = tableEntry.getValue();

HazelcastTable convertedTable = new HazelcastTable(
table,
createTableStatistic(table)
);
HazelcastTable convertedTable = table instanceof PartitionedMapTable
? new HazelcastTable(table, hz)
k-jamroz marked this conversation as resolved.
Show resolved Hide resolved
: new HazelcastTable(table, createTableStatistic(table));

schemaTables.put(tableName, convertedTable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.jet.sql.impl.schema;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.sql.impl.opt.OptUtils;
import com.hazelcast.jet.sql.impl.opt.common.CalcIntoScanRule;
import com.hazelcast.jet.sql.impl.opt.cost.CostUtils;
Expand Down Expand Up @@ -46,6 +47,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.function.Supplier;

import static java.util.stream.Collectors.joining;

Expand Down Expand Up @@ -83,30 +85,33 @@
* properties, thus making further optimization more complex.
*/
public class HazelcastTable extends AbstractTable {

private final Table target;
private final Statistic statistic;
private final Supplier<Statistic> statisticSupplier;
private final RexNode filter;
private List<RexNode> projects;

private RelDataType rowType;
private final Set<String> hiddenFieldNames = new HashSet<>();

/**
 * Creates a table whose row-count statistic is derived from the current size
 * of the map named by the table's SQL name on the given instance.
 */
public HazelcastTable(Table target, HazelcastInstance instance) {
    this(target, createTableStatistic(target, instance));
}

/**
 * Creates a table backed by a fixed, precomputed statistic.
 */
public HazelcastTable(Table target, Statistic statistic) {
    this.target = target;
    this.statisticSupplier = () -> statistic;
    this.filter = null;
}

private HazelcastTable(
Table target,
Statistic statistic,
@Nonnull List<RexNode> projects,
Supplier<Statistic> statisticSupplier,
List<RexNode> projects,
@Nullable RelDataType rowType,
@Nullable RexNode filter
) {
this.target = target;
this.statistic = statistic;
this.statisticSupplier = statisticSupplier;
this.projects = projects;
this.rowType = rowType == null ? computeRowType(projects) : rowType;
this.filter = filter;
Expand All @@ -127,11 +132,11 @@ private void initRowType() {
}

/** Returns a copy of this table with the given projection (and optional precomputed row type). */
public HazelcastTable withProject(List<RexNode> projects, @Nullable RelDataType rowType) {
    return new HazelcastTable(target, statisticSupplier, projects, rowType, filter);
}

/** Returns a copy of this table with the given filter applied. */
public HazelcastTable withFilter(RexNode filter) {
    return new HazelcastTable(target, statisticSupplier, projects, rowType, filter);
}

@Nonnull
Expand All @@ -158,6 +163,7 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {

@Override
public Statistic getStatistic() {
Statistic statistic = statisticSupplier.get();
if (filter == null) {
return statistic;
} else {
Expand All @@ -167,8 +173,9 @@ public Statistic getStatistic() {
}
}

@SuppressWarnings("DataFlowIssue")
public double getTotalRowCount() {
    // Unfiltered row count taken straight from the supplied statistic.
    final Statistic stats = statisticSupplier.get();
    return stats.getRowCount();
}

public boolean isHidden(String fieldName) {
Expand Down Expand Up @@ -212,12 +219,17 @@ private RelDataType computeRowType(List<RexNode> projects) {
return new RelRecordType(StructKind.PEEK_FIELDS, typeFields, false);
}

/** Builds a statistic whose row count is the current size of the table's backing map. */
private static Statistic createTableStatistic(Table table, HazelcastInstance instance) {
    final int rowCount = instance.getMap(table.getSqlName()).size();
    return new HazelcastTableStatistic(rowCount);
}

/**
* Statistics that takes into account the row count after the filter is applied.
*/
private final class AdjustedStatistic implements Statistic {

private final Double rowCount;
private final Statistic statistic = statisticSupplier.get();

private AdjustedStatistic(Double rowCount) {
this.rowCount = rowCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,7 @@ protected static PlanRow parse(String input) {

@Override
public String toString() {
    // Indent by `level` spaces (guarding against a negative level), then append the node text.
    final String indent = " ".repeat(Math.max(0, level));
    return indent + node;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ private static OptimizerContext createContext() {
List<List<String>> searchPaths = QueryUtils.prepareSearchPaths(emptyList(), tableResolvers);

return OptimizerContext.create(
instance(),
new SqlCatalog(tableResolvers),
searchPaths,
emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ private static OptimizerContext createContext() {
List<List<String>> searchPaths = QueryUtils.prepareSearchPaths(emptyList(), tableResolvers);

return OptimizerContext.create(
instance(),
new SqlCatalog(tableResolvers),
searchPaths,
emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private void checkQuery(String query) {

private static OptimizerContext createContext() {
return OptimizerContext.create(
instance(),
new SqlCatalog(emptyList()),
emptyList(),
emptyList(),
Expand Down